Serving & Interactive Apps
Long-running applications in flyte-sdk, referred to as Apps, allow you to deploy services like model servers, interactive dashboards, and webhooks directly alongside your data workflows. Unlike standard tasks that run to completion, Apps stay active to handle requests and can be integrated into your workflows for human-in-the-loop interactions.
In this tutorial, you will learn how to wrap a CLI tool as a dashboard, build a FastAPI model server that consumes task outputs, and create an interactive workflow that pauses for human input.
Prerequisites
To follow this tutorial, ensure you have the following installed:
- flyte-sdk
- fastapi and uvicorn (for the model server)
- flyteplugins-hitl (for human-in-the-loop features)
Step 1: Deploying a Dashboard with AppEnvironment
The simplest way to deploy an app is by wrapping an existing CLI tool. For example, you can deploy a Streamlit dashboard by specifying the command and arguments in an AppEnvironment.
import flyte
import flyte.app
# Define the image with necessary dependencies
image = flyte.Image.from_debian_base(python="3.12").with_pip_packages("streamlit")
# Configure the AppEnvironment
app_env = flyte.app.AppEnvironment(
    name="streamlit-hello",
    image=image,
    args="streamlit hello --server.port 8080",
    resources=flyte.Resources(cpu="1", memory="1Gi"),
    port=8080,
)
if __name__ == "__main__":
    # Deploy the app to the cluster
    flyte.deploy(app_env)
The AppEnvironment manages the lifecycle of the container. By setting port=8080, flyte-sdk ensures traffic is routed to your Streamlit service. Note that ports 8012, 8022, 8112, 9090, and 9091 are reserved by the platform and cannot be used.
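A reserved port is easy to pick by accident, so it can help to check the value locally before deploying. The following is a minimal sketch of such a check: `validate_app_port` is a hypothetical helper, not part of flyte-sdk, and the reserved set is simply copied from the list above.

```python
# Hypothetical pre-deployment check; this helper is not part of flyte-sdk.
# The reserved ports are the ones named in this tutorial.
RESERVED_PORTS = frozenset({8012, 8022, 8112, 9090, 9091})


def validate_app_port(port: int) -> int:
    """Return the port unchanged if it is usable, otherwise raise ValueError."""
    if not 1 <= port <= 65535:
        raise ValueError(f"port {port} is outside the valid TCP range 1-65535")
    if port in RESERVED_PORTS:
        raise ValueError(f"port {port} is reserved by the platform")
    return port
```

Calling `validate_app_port(8080)` returns 8080, while `validate_app_port(9090)` raises a `ValueError` before anything is sent to the cluster.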
Step 2: Building a FastAPI Model Server
For more complex services, FastAPIAppEnvironment provides specialized support for FastAPI applications, including hooks for resource initialization and the ability to pass data from previous task runs.
Defining the App and Parameters
You can use Parameter and RunOutput to dynamically feed the output of a specific task into your app.
from fastapi import FastAPI
import flyte
import flyte.app
from flyte.app.extras import FastAPIAppEnvironment
fastapi_app = FastAPI(title="Model Server")
# Define the app environment with a dynamic parameter
env = FastAPIAppEnvironment(
    name="custom-model-server",
    app=fastapi_app,
    scaling=flyte.app.Scaling(replicas=(1, 4)),  # Min 1, Max 4 replicas
    parameters=[
        flyte.app.Parameter(
            name="model_file",
            value=flyte.app.RunOutput(
                task_name="train_model_task",
                type="file",
            ),
            download=True,
        )
    ],
)
Using the Startup Hook
The @env.on_startup decorator allows you to perform heavy initialization, such as loading a model into memory, before the server starts accepting traffic.
import torch
@env.on_startup
async def load_model(model_file: flyte.io.File):
    # The 'model_file' argument is automatically injected from the Parameter defined above
    fastapi_app.state.model = torch.load(model_file.path)
@fastapi_app.get("/predict")
async def predict(input_data: float):
    model = fastapi_app.state.model
    return {"prediction": model(input_data)}
FastAPIAppEnvironment automatically patches the FastAPI state object to ensure it remains picklable during deployment, though you should avoid adding unpicklable custom state before the deployment process begins.
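To find unpicklable state before deploying, you can try serializing each value yourself. The sketch below uses only the standard-library `pickle` module as a rough proxy. The serializer flyte-sdk actually uses is not stated here and may behave differently, and `find_unpicklable` is a hypothetical helper rather than an SDK function.

```python
import pickle
import threading


def find_unpicklable(state: dict) -> list[str]:
    """Return the keys in `state` whose values cannot be pickled."""
    bad_keys = []
    for key, value in state.items():
        try:
            pickle.dumps(value)
        except Exception:  # pickle can raise several exception types
            bad_keys.append(key)
    return bad_keys


# Plain data pickles fine; a thread lock does not.
example_state = {
    "threshold": 0.5,
    "labels": ["cat", "dog"],
    "lock": threading.Lock(),
}
print(find_unpicklable(example_state))  # prints ['lock']
```

Objects that hold OS-level handles (locks, open sockets, database connections) are the usual offenders. Creating them inside the startup hook instead of at import time keeps them out of the deployment payload.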
Step 3: Interactive Workflows with Human-in-the-Loop
flyte-sdk supports human-in-the-loop (HITL) patterns, in which a workflow pauses and waits for a human to provide input via a web form. This is implemented using the flyteplugins.hitl module.
import flyte
import flyteplugins.hitl as hitl
task_env = flyte.TaskEnvironment(
    name="hitl-example",
    image=flyte.Image.from_debian_base(python="3.12").with_pip_packages("flyteplugins-hitl"),
)
@task_env.task
async def interactive_workflow():
    print("Workflow started. Waiting for human input...")
    # Create an event that spawns a temporary FastAPI app for input
    event = await hitl.new_event.aio(
        "approval_event",
        data_type=bool,
        scope="run",
        prompt="Do you approve the deployment of this model?",
    )
    # This call blocks until the human submits the form
    approved = await event.wait.aio()
    if approved:
        print("Deployment approved!")
    else:
        print("Deployment rejected.")
if __name__ == "__main__":
    flyte.run(interactive_workflow)
When event.wait() is called, flyte-sdk provides a URL in the logs and UI. The human opens this URL, fills out the generated form, and the workflow resumes once the data is submitted.
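The pause-and-resume behavior can be pictured with plain `asyncio`, without the plugin. The sketch below is only an analogy, not how flyteplugins-hitl is implemented: a future stands in for the event, and a timer stands in for the human submitting the form. All names in it are hypothetical.

```python
import asyncio


async def wait_for_approval(decision: asyncio.Future) -> str:
    """Suspend until a decision arrives, then branch on it."""
    approved = await decision  # the coroutine is suspended here
    return "Deployment approved!" if approved else "Deployment rejected."


async def simulate(answer: bool) -> str:
    """Run the waiting coroutine and deliver `answer` shortly afterwards."""
    loop = asyncio.get_running_loop()
    decision = loop.create_future()
    # Stand-in for the human submitting the form a moment later.
    loop.call_later(0.01, decision.set_result, answer)
    return await wait_for_approval(decision)


if __name__ == "__main__":
    print(asyncio.run(simulate(True)))
```

The point to take away is that the awaiting coroutine consumes no CPU while suspended and continues from the same line once the result is set, which mirrors how the task above picks up after `event.wait`.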
Scaling and Production Configuration
When deploying Apps for production, you can control their scaling behavior using the Scaling class:
- Replicas: Scaling(replicas=(1, 10)) ensures at least one instance is always running to avoid cold starts, while allowing up to 10 instances under load.
- Scale-to-Zero: The default Scaling() configuration allows the app to scale down to zero replicas when idle, saving costs.
- Authentication: By default, requires_auth=True is set on AppEnvironment, protecting your endpoints with the platform's authentication layer.
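To make the effect of the replica bounds concrete, the sketch below clamps a desired replica count into a `(min, max)` range. It is an illustration only: `clamp_replicas` is a hypothetical function, and how the platform chooses the desired count (which metrics, which thresholds) is not covered in this tutorial.

```python
def clamp_replicas(desired: int, bounds: tuple[int, int]) -> int:
    """Clamp a desired replica count into the inclusive (min, max) range."""
    low, high = bounds
    if low < 0 or high < low:
        raise ValueError(f"invalid replica bounds: {bounds}")
    return max(low, min(desired, high))
```

With bounds of `(1, 10)`, an idle app that would otherwise want 0 replicas stays at 1, and a spike that would want 25 is capped at 10. With a lower bound of 0, the same idle app is allowed to drop to 0, which is the scale-to-zero case.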
Complete Example Result
By combining these components, you can build a system where a training task produces a model, a human approves it via an interactive event, and the approved model is then automatically deployed to a high-availability FastAPI server.
To deploy your finished app environment:
if __name__ == "__main__":
    flyte.deploy(env)
This call registers the app with the Flyte control plane, which manages the underlying Kubernetes resources and provides a stable endpoint for your service.