Long-running App Environments

Long-running services in flyte-sdk are managed through the AppEnvironment class and its specialized subclasses. These environments allow you to deploy APIs, dashboards, and model servers that scale automatically and integrate with the rest of your Flyte workflows.

In this tutorial, you will build a FastAPI service and a Streamlit dashboard, configure their scaling behavior, and learn how to test them locally using the flyte-sdk serving infrastructure.

Prerequisites

To follow this tutorial, you need the following packages installed:

pip install flyte-sdk fastapi uvicorn streamlit httpx

Step 1: Define a FastAPI Service

The FastAPIAppEnvironment is a specialized environment that simplifies deploying FastAPI applications. It automatically configures uvicorn as the server and adds a link to the OpenAPI documentation in the Flyte console.

Create a file named app.py:

import flyte
from fastapi import FastAPI
from flyte.app.extras import FastAPIAppEnvironment

# 1. Define your FastAPI application
app = FastAPI(title="Math Service")

@app.get("/add")
async def add(x: int, y: int):
    return {"result": x + y}

# 2. Wrap it in a FastAPIAppEnvironment
app_env = FastAPIAppEnvironment(
    name="math-api",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        ["fastapi", "uvicorn"]
    ),
    requires_auth=False,  # Make the endpoint public for this tutorial
)

The FastAPIAppEnvironment handles a known issue where standard FastAPI/Starlette state objects cannot be pickled. It replaces the state with a PicklableState internally to ensure your app can be deployed to the Flyte backend.

Step 2: Configure Scaling Policies

By default, apps use a "scale-to-zero" policy (Scaling(replicas=(0, 1))), meaning they only run when they receive traffic. You can customize this using the Scaling class.

Update your app_env to use a burstable scaling policy:

from flyte.app import Scaling

app_env.scaling = Scaling(
    replicas=(1, 5),  # Min 1 replica (always on), max 5 replicas
    metric=Scaling.Concurrency(val=10),  # Scale up when a replica handles > 10 concurrent requests
    scaledown_after=600,  # Wait 10 minutes of inactivity before scaling down to the minimum
)

Available scaling patterns include:

  • Fixed size: Scaling(replicas=3)
  • Always-on: Scaling(replicas=(1, 1))
  • High-availability: Scaling(replicas=(2, 10))
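
To make the replica bounds and the concurrency target concrete, here is a minimal sketch of how a concurrency-based autoscaler could pick a replica count. It is not flyte-sdk code: normalize_replicas and desired_replicas are hypothetical helpers, and the ceiling-then-clamp rule is an assumption about how such autoscalers typically behave, so the real backend may decide differently.

```python
import math


def normalize_replicas(replicas):
    """Return (min, max) from an int (fixed size) or a (min, max) tuple."""
    if isinstance(replicas, int):
        lo, hi = replicas, replicas
    else:
        lo, hi = replicas
    if lo < 0 or hi < lo:
        raise ValueError(f"invalid replica range: {replicas!r}")
    return lo, hi


def desired_replicas(in_flight, target_per_replica, replicas):
    """Assumed rule: enough replicas to keep each at or below the target, clamped to the bounds."""
    lo, hi = normalize_replicas(replicas)
    needed = math.ceil(in_flight / target_per_replica)
    return max(lo, min(hi, needed))


# With replicas=(1, 5) and a target of 10 concurrent requests per replica:
print(desired_replicas(0, 10, (1, 5)))    # 1  (never below the minimum)
print(desired_replicas(35, 10, (1, 5)))   # 4  (ceil(35 / 10))
print(desired_replicas(500, 10, (1, 5)))  # 5  (capped at the maximum)
# With the default scale-to-zero bounds and no traffic:
print(desired_replicas(0, 10, (0, 1)))    # 0
```

Under this model, the minimum bound decides whether the app can scale to zero, and the maximum bound caps cost during bursts.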

Step 3: Local Development and Testing

Before deploying to a remote cluster, you can serve your app locally using flyte.serve. This returns an AppHandle (specifically a _LocalApp instance) that you can use to interact with the running service.

import httpx

def test_locally():
    # Serve the app in a background thread
    local_app = flyte.serve(app_env)

    # Wait for the health check to pass
    local_app.activate(wait=True)

    print(f"App is active at: {local_app.endpoint}")

    # Interact with the app
    response = httpx.get(f"{local_app.endpoint}/add", params={"x": 10, "y": 5})
    print(f"Response: {response.json()}")

    # Shut down the local server
    local_app.deactivate(wait=True)

if __name__ == "__main__":
    test_locally()

The activate method polls the health check endpoint (defaulting to /health) until the app is ready. You can customize the polling timeouts using flyte.with_servecontext.
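
The polling behavior described above can be pictured with a small standard-library sketch. This is not the flyte-sdk implementation: wait_until and http_health_probe are hypothetical names, and the timeout and interval defaults are illustrative assumptions rather than the library's values.

```python
import time
import urllib.request


def wait_until(probe, timeout=30.0, interval=0.5):
    """Call probe() repeatedly until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if probe():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


def http_health_probe(base_url, path="/health"):
    """Build a probe that reports True when base_url + path answers HTTP 200."""
    url = base_url.rstrip("/") + path

    def probe():
        try:
            with urllib.request.urlopen(url, timeout=2.0) as resp:
                return resp.status == 200
        except OSError:
            # Connection refused, timeouts, and non-2xx responses all mean "not ready yet".
            return False

    return probe


# Example (the URL is a placeholder for a locally served app):
# ready = wait_until(http_health_probe("http://127.0.0.1:8080"), timeout=10.0)
```

Separating the loop from the probe keeps the waiting logic testable without a running server.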

Step 4: Deploy Generic Apps (Streamlit)

For services that aren't based on FastAPI, use the base AppEnvironment and provide the execution args or command.

streamlit_env = flyte.app.AppEnvironment(
    name="data-dashboard",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        ["streamlit"]
    ),
    # Streamlit requires specific arguments to run on the correct port
    args=[
        "streamlit", "run", "dashboard.py",
        "--server.port", "8080",
        "--server.address", "0.0.0.0",
    ],
    port=8080,
    requires_auth=False,
)
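
In the definition above, the port appears twice: once in the Streamlit arguments and once in the port parameter. The two values presumably need to agree, so one way to avoid drift is to derive both from a single constant. The helper below is a hypothetical convenience, not part of flyte-sdk.

```python
def streamlit_args(script, port):
    """Build the Streamlit command line for a script listening on the given port."""
    return [
        "streamlit", "run", script,
        "--server.port", str(port),
        "--server.address", "0.0.0.0",
    ]


APP_PORT = 8080
args = streamlit_args("dashboard.py", APP_PORT)
print(args)
```

You could then pass args=args and port=APP_PORT when constructing the AppEnvironment.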

Note on Reserved Ports: flyte-sdk reserves several ports for internal use. Do not configure your app to use ports 8012, 8022, 8112, 9090, or 9091.
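
A small guard can catch a reserved port before you deploy. The sketch below hard-codes the ports listed in the note; check_app_port is a hypothetical helper, and flyte-sdk may perform its own validation independently.

```python
# Ports listed above as reserved by flyte-sdk.
RESERVED_PORTS = frozenset({8012, 8022, 8112, 9090, 9091})


def check_app_port(port):
    """Return the port unchanged if it is usable, otherwise raise ValueError."""
    if not 1 <= port <= 65535:
        raise ValueError(f"port {port} is outside the valid range 1-65535")
    if port in RESERVED_PORTS:
        raise ValueError(f"port {port} is reserved for internal use")
    return port


print(check_app_port(8080))  # 8080
```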

Step 5: Lifecycle Management with Context Managers

To ensure your local server is always cleaned up, even if an error occurs, use the ephemeral_ctx context manager provided by the AppHandle.

async def run_async_test():
    # with_servecontext allows overriding configuration like environment variables
    serve_ctx = flyte.with_servecontext(
        mode="local",
        env_vars={"DEBUG": "1"},
    )

    handle = serve_ctx.serve(app_env)

    # ephemeral_ctx automatically calls activate(wait=True) and deactivate(wait=True)
    async with handle.ephemeral_ctx():
        print(f"Testing app at {handle.url}")
        # ... perform tests ...

# For synchronous code, use ephemeral_ctx_sync()
def run_sync_test():
    handle = flyte.serve(app_env)
    with handle.ephemeral_ctx_sync():
        print(f"App is running at {handle.endpoint}")
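
The cleanup guarantee comes from the usual try/finally pattern behind context managers. The sketch below illustrates that pattern with a stand-in object; FakeHandle and ephemeral are hypothetical names for illustration, and the real ephemeral_ctx may be implemented differently.

```python
from contextlib import contextmanager


class FakeHandle:
    """Hypothetical stand-in for an app handle that records lifecycle calls."""

    def __init__(self):
        self.calls = []

    def activate(self, wait=True):
        self.calls.append("activate")

    def deactivate(self, wait=True):
        self.calls.append("deactivate")


@contextmanager
def ephemeral(handle):
    """Activate on entry and always deactivate on exit, even if the body raises."""
    handle.activate(wait=True)
    try:
        yield handle
    finally:
        handle.deactivate(wait=True)


handle = FakeHandle()
try:
    with ephemeral(handle):
        raise RuntimeError("simulated test failure")
except RuntimeError:
    pass

print(handle.calls)  # ['activate', 'deactivate']
```

Even though the body raised, deactivate still ran, which is the behavior you rely on when wrapping tests in the context manager.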

Summary of Results

You have now configured a long-running service that:

  1. Uses FastAPIAppEnvironment for automatic uvicorn integration.
  2. Implements a Scaling policy to handle bursty traffic.
  3. Can be tested locally using flyte.serve and AppHandle.
  4. Is started and cleaned up reliably in tests via ephemeral_ctx.

To deploy this app to a remote Flyte cluster, simply change the mode in your serve context:

remote_app = flyte.with_servecontext(mode="remote").serve(app_env)
print(f"Remote App URL: {remote_app.url}")