Implementing Human-in-the-Loop Workflows
Human-in-the-Loop (HITL) workflows in flyte-sdk allow you to pause a running task and wait for human intervention. This is implemented using a FastAPI application served within your cluster that provides both a web interface for manual entry and a JSON API for programmatic submissions.
By the end of this tutorial, you will have built a Flyte task that pauses to ask a user for an integer value and then continues execution once the value is provided.
Prerequisites
To use the HITL plugin, ensure you have the following installed in your environment:
- flyteplugins-hitl
- fastapi
- uvicorn
- python-multipart
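As a quick sanity check before deploying, the sketch below reports which of the listed distributions are missing from the current environment. It relies only on the standard library's importlib.metadata. The helper name missing_distributions is hypothetical and not part of the plugin.

```python
from importlib.metadata import PackageNotFoundError, version

# Distribution names as listed in the prerequisites above.
REQUIRED = ["flyteplugins-hitl", "fastapi", "uvicorn", "python-multipart"]


def missing_distributions(names: list[str]) -> list[str]:
    """Return the distribution names that are not installed (hypothetical helper)."""
    missing = []
    for name in names:
        try:
            version(name)  # raises PackageNotFoundError when the distribution is absent
        except PackageNotFoundError:
            missing.append(name)
    return missing


if __name__ == "__main__":
    absent = missing_distributions(REQUIRED)
    print("Missing:", ", ".join(absent) if absent else "none")
```

Running the script locally only checks your own interpreter. The task image is built separately from the packages passed to with_pip_packages in Step 1.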
Step 1: Configure the Task Environment
The HITL plugin requires a specific environment to serve the input application. You must include flyteplugins.hitl.env in the depends_on list of your TaskEnvironment. This ensures the necessary infrastructure is initialized when your task runs.
import flyte
import flyteplugins.hitl as hitl

# Define the environment for the HITL task
task_env = flyte.TaskEnvironment(
    name="hitl-tutorial",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "flyteplugins-hitl",
        "fastapi",
        "uvicorn",
        "python-multipart",
    ),
    depends_on=[hitl.env],
)
Step 2: Create and Wait for an Event
In your task, use hitl.new_event.aio to create a new Event. This call automatically triggers the deployment of the HITL FastAPI app if it is not already running. Once the event is created, call event.wait.aio() to pause the task and wait for input.
Note: You must set report=True in the @task_env.task decorator. This allows flyte-sdk to generate a Flyte Report containing the link to the input form.
@task_env.task(report=True)
async def ask_for_number() -> int:
    # 1. Create the event
    event = await hitl.new_event.aio(
        name="user_number_input",
        data_type=int,
        prompt="Please provide an integer to continue the workflow.",
        timeout_seconds=600,  # Wait up to 10 minutes
    )
    print(f"Event created. Access the form at: {event.form_url}")

    # 2. Wait for the human to submit a value
    # This polls object storage and is crash-resilient
    user_value = await event.wait.aio()
    return user_value
Step 3: Provide Input via the Web Form
When the task reaches event.wait.aio(), it generates a task report. In the Flyte UI, you will see a link labeled Event Form (generated by the EventFormLink class).
- Click the link to open the HITL web interface.
- The interface displays your prompt and an input field tailored to your data_type (e.g., a numeric input for int, or a dropdown for bool).
- Enter the value and click Submit.
The task will detect the submission in object storage during its next poll (default interval is 5 seconds) and resume execution.
Step 4: Programmatic Submission via API
If you want to submit values from an external system or a custom UI instead of the built-in form, you can use the JSON API endpoint. The Event object provides the base URL via event.api_url.
Submissions must follow the HITLSubmissionTyped schema. You will need the request_id and response_path from the Event instance.
import requests

# Example of an external script submitting a value
def submit_value_programmatically(event_instance):
    # The payload fields mirror the HITLSubmissionTyped schema
    # (defined in flyteplugins.hitl._app).
    submission = {
        "request_id": event_instance.request_id,
        "value": 42,
        "data_type": "int",
        "response_path": event_instance._response_path,
    }
    response = requests.post(
        f"{event_instance.endpoint}/submit/json",
        json=submission,
        timeout=10,  # avoid hanging forever if the app is unreachable
    )
    response.raise_for_status()  # surface HTTP errors instead of parsing an error body
    return response.json()
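If you submit more than one kind of value, it can help to build the payload in one place and reject unsupported types before making the request. The following is a minimal sketch, not part of the plugin: build_submission is a hypothetical helper, and the type-name strings other than "int" are an assumption based on the supported types listed in this tutorial.

```python
from typing import Any

# Assumption: the API names each supported type by its Python type name.
# Only "int" appears in the example above; the other three are guesses.
SUPPORTED_TYPES = {int: "int", float: "float", bool: "bool", str: "str"}


def build_submission(request_id: str, value: Any, response_path: str) -> dict:
    """Build a payload shaped like the example submission dict (hypothetical helper)."""
    value_type = type(value)  # exact type lookup, so True maps to "bool" and not "int"
    if value_type not in SUPPORTED_TYPES:
        raise TypeError(f"Unsupported value type: {value_type.__name__}")
    return {
        "request_id": request_id,
        "value": value,
        "data_type": SUPPORTED_TYPES[value_type],
        "response_path": response_path,
    }
```

The resulting dict can be passed as the json argument of requests.post in place of the hand-written submission. Looking up the exact type matters because bool is a subclass of int in Python, so an isinstance check would label an approval value as an integer.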
How it Works
- Crash Resilience: The Event class does not store state in memory. Instead, it writes request metadata to object storage and polls for a response file. If the Flyte task pod crashes and restarts, it will recreate the Event object and resume polling the same storage path, allowing the workflow to continue without losing the human's input.
- Data Types: The plugin supports int, float, bool, and str. The HITLSubmissionTyped model ensures that values submitted via the API are validated and converted to the correct Python type before being returned to your task.
- Polling: The wait() method uses a durable sleep mechanism. While the task remains in a "Running" state in Flyte, it consumes minimal resources while waiting for the response file to appear in storage.
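The storage-based polling idea can be illustrated with a small local simulation. This is a sketch under stated assumptions, not the plugin's implementation: a temporary directory stands in for object storage, the response file layout ({"value": ...}) is invented for the example, wait_for_response is a hypothetical name, and time.sleep replaces the durable sleep mechanism.

```python
import json
import tempfile
import time
from pathlib import Path


def wait_for_response(response_path: Path, timeout_seconds: float, poll_interval_seconds: float):
    """Poll until the response file exists, then return the value stored in it."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        if response_path.exists():
            # Assumed file layout: a JSON object with a single "value" key.
            return json.loads(response_path.read_text())["value"]
        if time.monotonic() >= deadline:
            raise TimeoutError(f"No response at {response_path}")
        time.sleep(poll_interval_seconds)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        response_path = Path(tmp) / "response.json"
        # Simulate the human submitting while no task process is waiting.
        response_path.write_text(json.dumps({"value": 42}))
        # A restarted task polls the same path and picks the value up.
        print(wait_for_response(response_path, timeout_seconds=1, poll_interval_seconds=0.1))
```

Because the only state is the file at a known path, the waiter can start before or after the submission arrives, which is the property that lets a restarted pod resume without losing input.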
Next Steps
- Try using data_type=bool to create an approval gate in your workflow.
- Customize the timeout_seconds to handle long-running human processes.
- Integrate the /submit/json endpoint into a Slack bot or custom dashboard to trigger workflow continuations.