Long-running App Environments
Long-running services in flyte-sdk are managed through the AppEnvironment class and its specialized subclasses. These environments allow you to deploy APIs, dashboards, and model servers that scale automatically and integrate with the rest of your Flyte workflows.
In this tutorial, you will build a FastAPI service and a Streamlit dashboard, configure their scaling behavior, and learn how to test them locally using the flyte-sdk serving infrastructure.
Prerequisites
To follow this tutorial, you need the following packages installed:
pip install flyte-sdk fastapi uvicorn streamlit
Step 1: Define a FastAPI Service
The FastAPIAppEnvironment is a specialized environment that simplifies deploying FastAPI applications. It automatically configures uvicorn as the server and adds a link to the OpenAPI documentation in the Flyte console.
Create a file named app.py:
import flyte
from fastapi import FastAPI
from flyte.app.extras import FastAPIAppEnvironment

# 1. Define your FastAPI application
app = FastAPI(title="Math Service")

@app.get("/add")
async def add(x: int, y: int):
    return {"result": x + y}

# 2. Wrap it in a FastAPIAppEnvironment
app_env = FastAPIAppEnvironment(
    name="math-api",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        ["fastapi", "uvicorn"]
    ),
    requires_auth=False,  # Make the endpoint public for this tutorial
)
The FastAPIAppEnvironment handles a known issue where standard FastAPI/Starlette state objects cannot be pickled. It replaces the state with a PicklableState internally to ensure your app can be deployed to the Flyte backend.
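To make the pickling problem concrete, here is a minimal standard-library sketch of one way such a failure can arise. It assumes the state object proxies attribute access to an internal dict. NaiveState and SketchPicklableState are hypothetical names used only for illustration; this is not the flyte-sdk PicklableState implementation, whose internals are not shown in this tutorial.

```python
import pickle


class NaiveState:
    """Attribute bag that proxies reads and writes to an internal dict."""

    def __init__(self):
        # Bypass our own __setattr__ so the backing dict lands in __dict__.
        object.__setattr__(self, "_state", {})

    def __setattr__(self, key, value):
        self._state[key] = value

    def __getattr__(self, key):
        # Only called when normal attribute lookup fails. On a freshly
        # unpickled instance "_state" is not set yet, so self._state
        # re-enters __getattr__ and recurses.
        try:
            return self._state[key]
        except KeyError:
            raise AttributeError(key) from None


class SketchPicklableState(NaiveState):
    """Hypothetical fix: spell out how the state is saved and restored."""

    def __getstate__(self):
        return {"_state": dict(self._state)}

    def __setstate__(self, state):
        # Restore the backing dict without going through __setattr__.
        object.__setattr__(self, "_state", state["_state"])


def round_trips(obj):
    """Return True if obj survives pickle.dumps followed by pickle.loads."""
    try:
        return pickle.loads(pickle.dumps(obj)).db_url == obj.db_url
    except RecursionError:
        return False


naive = NaiveState()
naive.db_url = "sqlite://"
fixed = SketchPicklableState()
fixed.db_url = "sqlite://"
```

In this sketch the naive class serializes without complaint but fails when loaded, because pickle probes the new instance for __setstate__ before the backing dict exists. Defining __getstate__ and __setstate__ explicitly avoids that probe falling through to __getattr__.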
Step 2: Configure Scaling Policies
By default, apps use a "scale-to-zero" policy (Scaling(replicas=(0, 1))), meaning they only run when they receive traffic. You can customize this using the Scaling class.
Update your app_env to use a burstable scaling policy:
from flyte.app import Scaling

app_env.scaling = Scaling(
    replicas=(1, 5),  # Min 1 replica (always on), Max 5 replicas
    metric=Scaling.Concurrency(val=10),  # Scale up when a replica handles > 10 concurrent requests
    scaledown_after=600,  # Wait 10 minutes of inactivity before scaling down to min_replicas
)
Available scaling patterns include:
- Fixed size: Scaling(replicas=3)
- Always-on: Scaling(replicas=(1, 1))
- High-availability: Scaling(replicas=(2, 10))
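To build intuition for how a replica range and a concurrency target interact, the following sketch computes a replica count from the number of in-flight requests. It assumes a simple "divide by target, then clamp" rule in the style of concurrency-based autoscalers. The functions normalize_replicas and desired_replicas are hypothetical helpers, not part of flyte-sdk, and the algorithm the Flyte backend actually uses may differ.

```python
import math


def normalize_replicas(replicas):
    """Turn an int (fixed size) or a (min, max) tuple into a (min, max) pair."""
    if isinstance(replicas, int):
        return replicas, replicas
    low, high = replicas
    if low < 0 or high < low:
        raise ValueError(f"invalid replica range: {replicas!r}")
    return low, high


def desired_replicas(in_flight, target_per_replica, replicas):
    """Estimate the replica count needed for the current concurrent load."""
    low, high = normalize_replicas(replicas)
    # Enough replicas that each one handles at most target_per_replica requests.
    wanted = math.ceil(in_flight / target_per_replica)
    # Never go below the minimum or above the maximum of the range.
    return max(low, min(high, wanted))
```

Under these assumptions, 35 concurrent requests with a target of 10 and replicas=(1, 5) would call for 4 replicas, while an idle app with replicas=(0, 1) would drop to 0.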
Step 3: Local Development and Testing
Before deploying to a remote cluster, you can serve your app locally using flyte.serve. This returns an AppHandle (specifically a _LocalApp instance) that you can use to interact with the running service.
import httpx

def test_locally():
    # Serve the app in a background thread
    local_app = flyte.serve(app_env)

    # Wait for the health check to pass
    local_app.activate(wait=True)
    print(f"App is active at: {local_app.endpoint}")

    # Interact with the app
    response = httpx.get(f"{local_app.endpoint}/add", params={"x": 10, "y": 5})
    print(f"Response: {response.json()}")

    # Shut down the local server
    local_app.deactivate(wait=True)

if __name__ == "__main__":
    test_locally()
The activate method polls the app's health check endpoint (/health by default) until the app is ready. To customize the polling timeouts, use flyte.with_servecontext.
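The polling behavior described here can be sketched with the standard library alone. The example below starts a throwaway HTTP server that answers /health and then polls it until it responds or a deadline passes. HealthHandler and wait_until_healthy are hypothetical names, and the timeout and interval values are arbitrary; this illustrates the general pattern, not how flyte-sdk implements activate.

```python
import threading
import time
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


class HealthHandler(BaseHTTPRequestHandler):
    """Tiny stand-in app: 200 on /health, 404 everywhere else."""

    def do_GET(self):
        status = 200 if self.path == "/health" else 404
        self.send_response(status)
        self.end_headers()
        self.wfile.write(b"ok" if status == 200 else b"not found")

    def log_message(self, *args):
        pass  # Keep the example output quiet.


def wait_until_healthy(base_url, path="/health", timeout=10.0, interval=0.2):
    """Poll base_url + path until it returns 200 or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(f"{base_url}{path}", timeout=2) as resp:
                if resp.status == 200:
                    return True
        except OSError:
            # Covers connection errors, timeouts, and non-2xx HTTP errors.
            pass
        time.sleep(interval)
    return False


# Bind to port 0 so the operating system picks a free port.
server = HTTPServer(("127.0.0.1", 0), HealthHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
base_url = f"http://127.0.0.1:{server.server_port}"

healthy = wait_until_healthy(base_url)
missing = wait_until_healthy(base_url, path="/nope", timeout=0.5)

server.shutdown()
server.server_close()
```

A deadline based on time.monotonic keeps the loop robust against wall-clock adjustments, and returning False instead of raising leaves the caller free to decide how to report a startup failure.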
Step 4: Deploy Generic Apps (Streamlit)
For services that aren't based on FastAPI, use the base AppEnvironment and provide the execution args or command.
streamlit_env = flyte.app.AppEnvironment(
    name="data-dashboard",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        ["streamlit"]
    ),
    # Streamlit requires specific arguments to run on the correct port
    args=[
        "streamlit", "run", "dashboard.py",
        "--server.port", "8080",
        "--server.address", "0.0.0.0",
    ],
    port=8080,
    requires_auth=False,
)
Note on Reserved Ports: flyte-sdk reserves several ports for internal use. Do not configure your app to use ports 8012, 8022, 8112, 9090, or 9091.
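If you generate app configurations programmatically, a small guard can catch a reserved port before deployment. The sketch below hard-codes the port list from the note above. RESERVED_PORTS and check_app_port are hypothetical names for this example; flyte-sdk may perform its own validation with different behavior.

```python
# Ports listed in this tutorial as reserved for internal use.
RESERVED_PORTS = frozenset({8012, 8022, 8112, 9090, 9091})


def check_app_port(port):
    """Return port unchanged if it is usable, otherwise raise ValueError."""
    if not 1 <= port <= 65535:
        raise ValueError(f"port {port} is outside the valid range 1-65535")
    if port in RESERVED_PORTS:
        raise ValueError(f"port {port} is reserved for internal use")
    return port
```

For example, check_app_port(8080) returns 8080, while check_app_port(9090) raises ValueError.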
Step 5: Lifecycle Management with Context Managers
To ensure your local server is always cleaned up, even if an error occurs, use the ephemeral_ctx context manager provided by the AppHandle.
async def run_async_test():
    # with_servecontext allows overriding configuration like environment variables
    serve_ctx = flyte.with_servecontext(
        mode="local",
        env_vars={"DEBUG": "1"},
    )
    handle = serve_ctx.serve(app_env)

    # ephemeral_ctx automatically calls activate(wait=True) and deactivate(wait=True)
    async with handle.ephemeral_ctx():
        print(f"Testing app at {handle.url}")
        # ... perform tests ...

# For synchronous code, use ephemeral_ctx_sync()
def run_sync_test():
    handle = flyte.serve(app_env)
    with handle.ephemeral_ctx_sync():
        print(f"App is running at {handle.endpoint}")
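The cleanup guarantee comes from the usual try/finally shape of a context manager. The sketch below reproduces that shape with a stand-in handle so it runs without a Flyte installation. FakeHandle and ephemeral are hypothetical names; the real AppHandle methods may do considerably more.

```python
import contextlib


class FakeHandle:
    """Stand-in for an app handle that records lifecycle calls."""

    def __init__(self):
        self.active = False
        self.events = []

    def activate(self, wait=True):
        self.active = True
        self.events.append("activate")

    def deactivate(self, wait=True):
        self.active = False
        self.events.append("deactivate")


@contextlib.contextmanager
def ephemeral(handle):
    """Activate on entry and always deactivate on exit, even on error."""
    handle.activate(wait=True)
    try:
        yield handle
    finally:
        handle.deactivate(wait=True)


handle = FakeHandle()
try:
    with ephemeral(handle):
        was_active = handle.active
        raise RuntimeError("simulated test failure")
except RuntimeError:
    pass  # The failure still propagates; the app has already been shut down.
```

Because deactivate sits in the finally clause, it runs whether the body finishes normally or raises, which is the property that makes the pattern safe for tests.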
Summary of Results
You have now configured a long-running service that:
- Uses FastAPIAppEnvironment for automatic uvicorn integration.
- Implements a Scaling policy to handle bursty traffic.
- Can be tested locally using flyte.serve and AppHandle.
- Manages its own lifecycle via ephemeral_ctx.
To deploy this app to a remote Flyte cluster, simply change the mode in your serve context:
remote_app = flyte.with_servecontext(mode="remote").serve(app_env)
print(f"Remote App URL: {remote_app.url}")