
Implementing Human-in-the-Loop Workflows

Human-in-the-Loop (HITL) workflows in flyte-sdk let you pause a running task until a person supplies input. The plugin implements this with a FastAPI application served inside your cluster, which provides both a web form for manual entry and a JSON API for programmatic submissions.

By the end of this tutorial, you will build a Flyte task that pauses to ask a user for an integer value and then continues execution once the value is provided.

Prerequisites

To use the HITL plugin, ensure you have the following installed in your environment:

  • flyteplugins-hitl
  • fastapi
  • uvicorn
  • python-multipart

Step 1: Configure the Task Environment

The HITL plugin requires a specific environment to serve the input application. You must include flyteplugins.hitl.env in the depends_on list of your TaskEnvironment. This ensures the necessary infrastructure is initialized when your task runs.

import flyte
import flyteplugins.hitl as hitl

# Define the environment for the HITL task
task_env = flyte.TaskEnvironment(
    name="hitl-tutorial",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "flyteplugins-hitl",
        "fastapi",
        "uvicorn",
        "python-multipart",
    ),
    depends_on=[hitl.env],
)

Step 2: Create and Wait for an Event

In your task, use hitl.new_event.aio to create a new Event. This call automatically triggers the deployment of the HITL FastAPI app if it is not already running. Once the event is created, call event.wait.aio() to pause the task and wait for input.

Note: You must set report=True in the @task_env.task decorator. This allows flyte-sdk to generate a Flyte Report containing the link to the input form.

@task_env.task(report=True)
async def ask_for_number() -> int:
    # 1. Create the event
    event = await hitl.new_event.aio(
        name="user_number_input",
        data_type=int,
        prompt="Please provide an integer to continue the workflow.",
        timeout_seconds=600,  # Wait up to 10 minutes
    )

    print(f"Event created. Access the form at: {event.form_url}")

    # 2. Wait for the human to submit a value
    # This polls object storage and is crash-resilient
    user_value = await event.wait.aio()

    return user_value

Step 3: Provide Input via the Web Form

When the task reaches event.wait.aio(), it generates a task report. In the Flyte UI, you will see a link labeled Event Form (generated by the EventFormLink class).

  1. Click the link to open the HITL web interface.
  2. The interface displays your prompt and an input field tailored to your data_type (e.g., a numeric input for int, or a dropdown for bool).
  3. Enter the value and click Submit.

The task will detect the submission in object storage during its next poll (default interval is 5 seconds) and resume execution.
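The polling behavior described above can be illustrated with a minimal, standard-library-only sketch. It is not the plugin's implementation: the local filesystem stands in for object storage, and the function name wait_for_response, the JSON layout with a "value" key, and the parameter names are all assumptions made for illustration.

```python
import json
import tempfile
import time
from pathlib import Path


def wait_for_response(
    response_path: Path,
    timeout_seconds: float = 600.0,
    poll_interval_seconds: float = 5.0,
):
    """Poll until a JSON response file appears, then return its "value" field.

    Hypothetical helper: a local file stands in for the object-storage
    response that the real plugin polls for.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        if response_path.exists():
            return json.loads(response_path.read_text())["value"]
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"No response at {response_path} after {timeout_seconds}s"
            )
        time.sleep(poll_interval_seconds)


# Usage: simulate a submission that has already landed in storage.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "response.json"
    path.write_text(json.dumps({"value": 42}))
    result = wait_for_response(path, timeout_seconds=1.0, poll_interval_seconds=0.1)
```

Because the loop only looks at the storage path, a restarted process can call the same function with the same path and pick up a response that was written while it was down.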

Step 4: Programmatic Submission via API

If you want to submit values from an external system or a custom UI instead of the built-in form, you can use the JSON API endpoint. The Event object provides the base URL via event.api_url.

Submissions must follow the HITLSubmissionTyped schema. You will need the request_id and response_path from the Event instance.

import requests

# Example of an external script submitting a value
def submit_value_programmatically(event_instance):
    # The payload mirrors the fields of the HITLSubmissionTyped schema
    submission = {
        "request_id": event_instance.request_id,
        "value": 42,
        "data_type": "int",
        "response_path": event_instance._response_path,
    }

    response = requests.post(
        f"{event_instance.endpoint}/submit/json",
        json=submission,
        timeout=30,  # Avoid hanging indefinitely if the app is unreachable
    )
    response.raise_for_status()  # Surface HTTP errors instead of parsing an error body
    return response.json()
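When an external system submits values of different types, it can help to derive the data_type string from the value rather than hard-coding it. The following is a sketch under stated assumptions: build_submission is a hypothetical helper, and the type names other than "int" are assumed to match the Python type names, which the example above does not confirm.

```python
from typing import Any

# Assumed mapping from Python types to the "data_type" strings in the payload.
# Only "int" appears in the example above; the other names are assumptions.
_TYPE_NAMES = {bool: "bool", int: "int", float: "float", str: "str"}


def build_submission(request_id: str, response_path: str, value: Any) -> dict:
    """Build a JSON-serializable submission payload (hypothetical helper)."""
    # Look up the exact type: bool is a subclass of int, so an isinstance
    # check against int would mislabel True/False as "int".
    type_name = _TYPE_NAMES.get(type(value))
    if type_name is None:
        raise TypeError(f"Unsupported value type: {type(value).__name__}")
    return {
        "request_id": request_id,
        "value": value,
        "data_type": type_name,
        "response_path": response_path,
    }


# Usage with placeholder identifiers (not real storage paths).
payload = build_submission("req-123", "example/response/path", True)
```

The exact-type lookup is a deliberate choice: it keeps an approval value such as True from being submitted as an integer.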

How it Works

  • Crash Resilience: The Event class does not store state in memory. Instead, it writes request metadata to object storage and polls for a response file. If the Flyte task pod crashes and restarts, it will recreate the Event object and resume polling the same storage path, allowing the workflow to continue without losing the human's input.
  • Data Types: The plugin supports int, float, bool, and str. The HITLSubmissionTyped model ensures that values submitted via the API are validated and converted to the correct Python type before being returned to your task.
  • Polling: The wait() method uses a durable sleep mechanism. The task remains in the "Running" state in Flyte but consumes minimal resources while it waits for the response file to appear in storage.
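The Data Types point above involves converting a submitted value into the requested Python type. A minimal sketch of such a conversion for form-style string input follows; coerce_value and the accepted boolean spellings are assumptions for illustration and are not taken from the plugin.

```python
def coerce_value(raw: str, data_type: type):
    """Convert a submitted string to int, float, bool, or str (hypothetical helper)."""
    if data_type is bool:
        # bool("false") is True in Python, so booleans need explicit parsing.
        lowered = raw.strip().lower()
        if lowered in ("true", "1", "yes"):
            return True
        if lowered in ("false", "0", "no"):
            return False
        raise ValueError(f"Cannot interpret {raw!r} as a boolean")
    if data_type in (int, float, str):
        # int() and float() raise ValueError on malformed input.
        return data_type(raw)
    raise TypeError(f"Unsupported data type: {data_type.__name__}")


# Usage: values as they might arrive from a text field.
number = coerce_value("42", int)
approved = coerce_value("false", bool)
```

Rejecting malformed input with an exception, rather than falling back to a default, keeps a mistyped entry from silently resuming the task with the wrong value.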

Next Steps

  • Try using data_type=bool to create an approval gate in your workflow.
  • Adjust timeout_seconds to accommodate slower human review processes.
  • Integrate the /submit/json endpoint into a Slack bot or custom dashboard to trigger workflow continuations.