Managing and Monitoring Runs
In flyte-sdk, executions are managed and monitored primarily through the Run class. It provides methods to retrieve existing runs, filter execution history, monitor progress in real time, and terminate active runs.
Retrieving and Listing Runs
You can retrieve a specific run by its name or list multiple runs using filters such as phase, task name, or creation time.
Fetching a Single Run
To retrieve a specific execution, use the Run.get method with the run's unique name.
from flyte.remote import Run
# Retrieve a run by its name
run = Run.get("a1b2c3d4e5f6g7h8")
print(f"Run {run.name} is currently in phase: {run.phase}")
Listing Runs with Filters
The Run.listall method allows you to query for runs based on various criteria. It returns an iterator (or an AsyncIterator when using .aio()).
from flyte.remote import Run
from flyte.models import ActionPhase
# List the 5 most recent successful runs for a specific task
runs = Run.listall(
    task_name="my_project.tasks.my_task",
    in_phase=(ActionPhase.SUCCEEDED,),
    sort_by=("created_at", "desc"),
    limit=5,
)
for run in runs:
    print(f"Found run: {run.name} at {run.url}")
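Since Run.listall yields runs one at a time, a common follow-up is to tally the results by phase. The sketch below assumes only that each item exposes the name and phase attributes used in the examples above. summarize_by_phase is a hypothetical helper, not part of flyte-sdk, and the SimpleNamespace objects are stand-ins for real runs.

```python
from collections import defaultdict
from types import SimpleNamespace
from typing import Any, Dict, Iterable, List


def summarize_by_phase(runs: Iterable[Any]) -> Dict[str, List[str]]:
    """Group run names by phase (hypothetical helper, not part of flyte-sdk)."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for run in runs:
        # str() keeps the key usable whether phase is a plain string or an enum.
        grouped[str(run.phase)].append(run.name)
    return dict(grouped)


# Stand-in objects; in practice, pass the iterator returned by Run.listall(...).
fake_runs = [
    SimpleNamespace(name="run-a", phase="succeeded"),
    SimpleNamespace(name="run-b", phase="failed"),
    SimpleNamespace(name="run-c", phase="succeeded"),
]
summary = summarize_by_phase(fake_runs)
for phase, names in summary.items():
    print(f"{phase}: {len(names)} run(s)")
```

The helper consumes the iterator once, so it works equally well on a list or on a lazily evaluated query result.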
Filtering by Time
You can use the TimeFilter class to restrict results to a specific time range using the created_at or updated_at parameters.
from datetime import datetime, timedelta, timezone
from flyte.remote import Run, TimeFilter
# Define a time range for the last 24 hours
yesterday = datetime.now(timezone.utc) - timedelta(days=1)
time_filter = TimeFilter(after=yesterday)
# List all runs created in the last 24 hours
recent_runs = Run.listall(created_at=time_filter)
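The example above builds its boundary from a timezone-aware UTC datetime. A minimal sketch of two helpers that keep to that convention follows; hours_ago and start_of_utc_day are hypothetical names, not part of flyte-sdk, and the code uses only the standard library.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def _aware_utc(now: Optional[datetime]) -> datetime:
    """Default to the current time and reject naive datetimes."""
    if now is None:
        return datetime.now(timezone.utc)
    if now.tzinfo is None:
        raise ValueError("now must be timezone-aware")
    return now.astimezone(timezone.utc)


def hours_ago(hours: float, now: Optional[datetime] = None) -> datetime:
    """Return the UTC timestamp `hours` before `now` (hypothetical helper)."""
    return _aware_utc(now) - timedelta(hours=hours)


def start_of_utc_day(now: Optional[datetime] = None) -> datetime:
    """Return midnight UTC of the day containing `now` (hypothetical helper)."""
    return _aware_utc(now).replace(hour=0, minute=0, second=0, microsecond=0)
```

Either result can be passed as the after argument shown above, for example TimeFilter(after=hours_ago(6)).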
Monitoring Execution State
flyte-sdk provides both blocking and streaming methods to monitor the progress of a run.
Waiting for Completion
Use Run.wait() to block execution until the run reaches a terminal state (Succeeded, Failed, or Aborted). This method displays a rich progress panel in the terminal.
from flyte.models import ActionPhase
from flyte.remote import Run
run = Run.get("my-run-id")
# Block until the run reaches a terminal state
run.wait()
if run.phase == ActionPhase.SUCCEEDED:
    print("Run completed successfully!")
Watching Real-time Updates
For more granular control or to stream updates as they happen, use Run.watch(). This returns an async generator of ActionDetails.
import asyncio
from flyte.remote import Run
async def monitor_run(run_name: str):
    run = await Run.get.aio(run_name)
    # Stream updates as the run progresses
    async for details in run.watch():
        print(f"Current phase: {details.pb2.status.phase}")
        if run.done():
            break
asyncio.run(monitor_run("my-run-id"))
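When streaming updates, it can be useful to stop after an overall deadline rather than wait indefinitely. The following is a minimal sketch using only asyncio. watch_until is a hypothetical helper, and fake_watch is a stand-in for the async generator returned by run.watch(); neither is part of flyte-sdk.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Optional, Sequence


async def watch_until(
    updates: AsyncIterator[Any],
    is_terminal: Callable[[Any], bool],
    timeout_s: float,
) -> Optional[Any]:
    """Consume updates until is_terminal(update) is true or timeout_s elapses.

    Returns the terminal update, or None on timeout or if the stream ends first.
    """

    async def _consume() -> Optional[Any]:
        async for update in updates:
            if is_terminal(update):
                return update
        return None

    try:
        return await asyncio.wait_for(_consume(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return None


async def fake_watch(phases: Sequence[str], delay_s: float = 0.01):
    """Stand-in for run.watch(): yields phase strings with a small delay."""
    for phase in phases:
        await asyncio.sleep(delay_s)
        yield phase


if __name__ == "__main__":
    final = asyncio.run(
        watch_until(
            fake_watch(["queued", "running", "succeeded"]),
            is_terminal=lambda phase: phase == "succeeded",
            timeout_s=1.0,
        )
    )
    print(f"Final update: {final}")
```

With a real run, the updates argument would be run.watch() and is_terminal would inspect each ActionDetails object; how to detect a terminal state from those objects is left to the caller here.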
Accessing Logs and Metadata
Once a run is retrieved, you can access its logs, inputs, outputs, and detailed configuration.
Retrieving Logs
You can stream logs to the console or retrieve them as an iterator of strings.
from flyte.remote import Run
run = Run.get("my-run-id")
# Display logs in the terminal with a limit of 100 lines
run.show_logs(max_lines=100)
# Or iterate over log lines programmatically
for line in run.get_logs(filter_system=True):
    print(f"Log: {line}")
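Because log lines arrive as an iterator of strings, ordinary Python tools can post-process them. As a sketch, the hypothetical helper below (not part of flyte-sdk) keeps only the most recent lines that contain a keyword, using a bounded deque so memory use stays small even for long logs.

```python
from collections import deque
from typing import Iterable, List


def tail_matching(lines: Iterable[str], keyword: str, last_n: int = 10) -> List[str]:
    """Return the last `last_n` lines containing `keyword` (case-insensitive)."""
    needle = keyword.lower()
    # deque(maxlen=...) discards older matches as newer ones arrive.
    recent = deque(
        (line for line in lines if needle in line.lower()),
        maxlen=last_n,
    )
    return list(recent)


# Stand-in log lines; in practice, pass the iterator of log strings.
sample_logs = [
    "INFO starting task",
    "ERROR failed to load shard 1",
    "INFO retrying",
    "error: failed to load shard 2",
    "INFO done",
]
errors = tail_matching(sample_logs, "error", last_n=1)
```

Here errors holds only the latest matching line; raising last_n widens the window.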
Accessing Inputs and Outputs
Inputs and outputs are available via the inputs() and outputs() methods, which return ActionInputs and ActionOutputs respectively.
from flyte.models import ActionPhase
from flyte.remote import Run
run = Run.get("my-run-id")
# Access inputs
run_inputs = run.inputs()
print(f"Input value for 'x': {run_inputs['x']}")
# Access outputs (only available if the run succeeded)
if run.done() and run.phase == ActionPhase.SUCCEEDED:
    run_outputs = run.outputs()
    print(f"Result: {run_outputs}")
Inspecting Run Details
The Run.details() method provides access to the RunDetails object, which contains metadata like labels, annotations, and environment variables.
from flyte.remote import Run
run = Run.get("my-run-id")
details = run.details()
print(f"Labels: {details.pb2.run_spec.labels}")
print(f"Is Interruptible: {details.pb2.run_spec.interruptible}")
Terminating Active Runs
If a run needs to be stopped manually, use the abort() method. This is an idempotent operation; if the run is already terminated or does not exist, it will not raise an error.
from flyte.remote import Run
run = Run.get("my-run-id")
if not run.done():
    print(f"Aborting run {run.name}...")
    run.abort(reason="Manual termination via SDK")
For bulk operations, you can combine listall with abort:
import asyncio
from flyte.remote import Run
async def abort_all_running():
    # Find all currently running executions
    running_runs = [r async for r in Run.listall.aio(in_phase=("running",))]
    for run in running_runs:
        await run.abort.aio(reason="Cleaning up active runs")
asyncio.run(abort_all_running())
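The loop above aborts runs one at a time. When many runs are involved, a bounded-concurrency variant may finish sooner and can report failures individually instead of stopping at the first one. The sketch below is one way to do this under stated assumptions: abort_many is a hypothetical helper, not part of flyte-sdk, and it takes the abort call as a parameter so the example does not depend on the SDK.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable, List, Tuple


async def abort_many(
    runs: Iterable[Any],
    abort_one: Callable[[Any], Awaitable[None]],
    max_concurrency: int = 5,
) -> List[Tuple[Any, BaseException]]:
    """Abort runs concurrently and return (run, exception) pairs for failures."""
    semaphore = asyncio.Semaphore(max_concurrency)
    run_list = list(runs)

    async def _guarded(run: Any) -> None:
        # At most max_concurrency abort calls are in flight at once.
        async with semaphore:
            await abort_one(run)

    results = await asyncio.gather(
        *(_guarded(run) for run in run_list),
        return_exceptions=True,  # collect errors instead of raising the first
    )
    return [
        (run, result)
        for run, result in zip(run_list, results)
        if isinstance(result, BaseException)
    ]
```

Following the pattern shown above, abort_one could be lambda r: r.abort.aio(reason="Cleaning up active runs"), with the run list collected from Run.listall.aio.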