
Remote Execution Management

To manage and monitor remote executions in flyte-sdk, you use the Run and Action classes to track state, stream logs, and retrieve results from the Flyte backend.

Monitoring Execution Lifecycle

When you have a run ID from a previous execution, you can retrieve the run and wait for it to reach a terminal state before accessing its outputs.

from flyte.remote import Run

# Retrieve an existing run by its name/ID
run = Run.get("a7q2z9xl5b")

# Wait for the run to complete
# This displays a rich progress panel by default
run.wait()

if run.done():
    print(f"Run finished with phase: {run.phase}")

    # Retrieve outputs once the run is terminal
    outputs = run.outputs()
    print(f"Execution Result: {outputs}")

The Run.wait() method in flyte-sdk supports different synchronization points via the wait_for parameter:

  • "terminal" (default): Waits until the run reaches a terminal phase, for example succeeded or failed.
  • "running": Returns as soon as the execution moves out of queued/initializing states.

Streaming Execution Logs

You can stream logs from a running or completed execution directly to your terminal. flyte-sdk provides both synchronous and asynchronous iterators for log retrieval.

Synchronous Streaming

from flyte.remote import Run

run = Run.get("a7q2z9xl5b")

# Stream logs with system lines filtered out
for line in run.get_logs(filter_system=True, show_ts=True):
    print(line)
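
To make the effect of the two flags concrete, here is a local sketch that processes records lazily, the way a stream would. The LogLine record and its is_system field are hypothetical, and the assumption that show_ts prefixes each line with its timestamp is an interpretation of the flag name, not documented flyte-sdk behavior.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Iterable, Iterator


@dataclass
class LogLine:
    """Hypothetical log record; flyte-sdk's actual record type may differ."""
    timestamp: datetime
    message: str
    is_system: bool = False


def render_logs(lines: Iterable[LogLine], filter_system: bool = False,
                show_ts: bool = False) -> Iterator[str]:
    """Lazily yield display strings, modeled on the two flags passed to get_logs()."""
    for line in lines:
        if filter_system and line.is_system:
            continue  # drop platform-generated lines
        yield f"{line.timestamp.isoformat()} {line.message}" if show_ts else line.message


if __name__ == "__main__":
    ts = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
    sample = [LogLine(ts, "pod scheduled", is_system=True), LogLine(ts, "epoch 1 done")]
    for text in render_logs(sample, filter_system=True, show_ts=True):
        print(text)  # 2024-01-01T12:00:00+00:00 epoch 1 done
```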

Asynchronous Streaming

In async applications, call the .aio() variant of each method:

from flyte.remote import Run

async def monitor_logs(run_id: str):
    run = await Run.get.aio(run_id)
    async for line in run.get_logs.aio(show_ts=True):
        print(f"Remote Log: {line}")
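
A coroutine like monitor_logs still has to be driven by an event loop, and the main payoff of the async variant is that several streams can be consumed concurrently. The sketch below shows that with the standard asyncio module; fake_log_stream() is a hypothetical stand-in for run.get_logs.aio() so the example runs without a Flyte backend.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_log_stream(run_id: str, n: int) -> AsyncIterator[str]:
    """Hypothetical stand-in for run.get_logs.aio(): yields n log lines."""
    for i in range(n):
        await asyncio.sleep(0)  # give control back to the event loop, as real I/O would
        yield f"{run_id}: line {i}"


async def collect(stream: AsyncIterator[str]) -> List[str]:
    """Drain one async log stream into a list."""
    return [line async for line in stream]


async def monitor_many() -> List[List[str]]:
    """Consume several log streams concurrently, capped at 5 seconds overall."""
    streams = [collect(fake_log_stream("run-a", 3)), collect(fake_log_stream("run-b", 2))]
    return await asyncio.wait_for(asyncio.gather(*streams), timeout=5.0)


if __name__ == "__main__":
    for lines in asyncio.run(monitor_many()):
        print(lines)
```

Against a real backend, the simplest entry point would be asyncio.run(monitor_logs("a7q2z9xl5b")) using the coroutine defined above.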

Analyzing Performance with Phase Transitions

To debug scheduling delays or resource initialization time, you can inspect the granular phase transitions stored in ActionDetails. This allows you to see exactly how long a task spent in QUEUED, INITIALIZING, or RUNNING states.

from flyte.remote import Run

run = Run.get("a7q2z9xl5b")
details = run.details() # Returns RunDetails containing ActionDetails

# Access specific timing metrics
print(f"Time spent in queue: {details.action_details.queued_time}")
print(f"Time spent initializing: {details.action_details.initializing_time}")
print(f"Actual execution time: {details.action_details.running_time}")

# Get the full breakdown of every transition
transitions = details.action_details.get_phase_transitions()
for t in transitions:
    duration = t.duration.total_seconds()
    print(f"Phase {t.phase.value}: {duration}s (Started: {t.start_time})")
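
Once you have the transitions, a useful summary is how much of the recorded time went to scheduling overhead rather than execution. The sketch below works on a hypothetical Transition record (with an assumed end_time field and a plain-string phase instead of the enum used above); it is not a flyte-sdk type.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List


@dataclass
class Transition:
    """Hypothetical record with the fields used above; phase is a plain string here."""
    phase: str
    start_time: datetime
    end_time: datetime

    @property
    def duration(self) -> timedelta:
        return self.end_time - self.start_time


def time_per_phase(transitions: List[Transition]) -> Dict[str, float]:
    """Total seconds per phase; repeated entries for a phase are summed."""
    totals: Dict[str, float] = defaultdict(float)
    for t in transitions:
        totals[t.phase] += t.duration.total_seconds()
    return dict(totals)


def overhead_ratio(totals: Dict[str, float]) -> float:
    """Fraction of the recorded time spent queued or initializing."""
    total = sum(totals.values())
    overhead = totals.get("QUEUED", 0.0) + totals.get("INITIALIZING", 0.0)
    return overhead / total if total else 0.0


if __name__ == "__main__":
    t0 = datetime(2024, 1, 1, 12, 0, 0)
    transitions = [
        Transition("QUEUED", t0, t0 + timedelta(seconds=30)),
        Transition("INITIALIZING", t0 + timedelta(seconds=30), t0 + timedelta(seconds=50)),
        Transition("RUNNING", t0 + timedelta(seconds=50), t0 + timedelta(seconds=150)),
    ]
    totals = time_per_phase(transitions)
    print(totals)  # {'QUEUED': 30.0, 'INITIALIZING': 20.0, 'RUNNING': 100.0}
    print(f"{overhead_ratio(totals):.0%} of recorded time was scheduling overhead")  # 33%
```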

Managing Active Executions

If an execution is no longer needed or is consuming excessive resources, you can terminate it using the abort method.

from flyte.remote import Run

run = Run.get("a7q2z9xl5b")

if not run.done():
    print(f"Aborting run {run.name}...")
    run.abort(reason="Resource optimization: manual termination.")
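
A common reason to abort is a runtime budget. The sketch below relies only on the done() and abort(reason=...) methods shown above; abort_if_overdue() and FakeRun are hypothetical helpers, and the start time is passed in explicitly rather than read from any flyte-sdk field.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Protocol


class Abortable(Protocol):
    """The two Run methods this sketch relies on."""
    def done(self) -> bool: ...
    def abort(self, reason: str) -> None: ...


def abort_if_overdue(run: Abortable, started_at: datetime, max_runtime: timedelta,
                     now: Optional[datetime] = None) -> bool:
    """Abort `run` if it is still active past its budget; return True if aborted.

    started_at must be timezone-aware, because the default `now` is an aware UTC datetime.
    """
    now = now or datetime.now(timezone.utc)
    if run.done():
        return False  # already terminal, nothing to abort
    elapsed = now - started_at
    if elapsed <= max_runtime:
        return False
    run.abort(reason=f"Exceeded runtime budget: {elapsed} > {max_runtime}")
    return True


@dataclass
class FakeRun:
    """Hypothetical stand-in for a remote Run that records abort() calls."""
    finished: bool = False
    abort_reasons: List[str] = field(default_factory=list)

    def done(self) -> bool:
        return self.finished

    def abort(self, reason: str) -> None:
        self.abort_reasons.append(reason)
        self.finished = True
```

With a real run you would pass the Run object directly, for example abort_if_overdue(run, started_at, timedelta(hours=1)), taking started_at from wherever you record the launch time.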

Troubleshooting and Gotchas

Output Availability

Calling run.outputs() or run.action.outputs() on an execution that is still running will raise a RuntimeError. Always ensure the run is in a terminal state:

# Correct pattern
run.wait()
if run.done():
    results = run.outputs()

Log Readiness

Logs may not be available the moment an action starts. If you need to make sure logs are ready before streaming, pass wait_for="logs-ready" to wait() on the underlying Action object:

from flyte.remote import Run

run = Run.get("a7q2z9xl5b")
# Wait specifically until the logging backend has initialized for this run
run.action.wait(wait_for="logs-ready")
run.show_logs()
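
If you would rather bound the wait with your own readiness check instead of a blocking call, a generic exponential-backoff loop is one option. This is a sketch under assumptions: wait_with_backoff() is a hypothetical helper, and the ready callable (for example, a hypothetical logs_available(run) predicate) is whatever check suits your setup.

```python
import time
from typing import Callable


def wait_with_backoff(ready: Callable[[], bool], initial_delay: float = 0.5,
                      max_delay: float = 8.0, timeout: float = 60.0,
                      sleep: Callable[[float], None] = time.sleep) -> int:
    """Call ready() until it returns True, doubling the delay between attempts.

    Returns the number of attempts made. Raises TimeoutError if the next sleep
    would push the total waiting time past `timeout`. `sleep` is injectable so
    the schedule can be tested without real delays.
    """
    delay, waited, attempts = initial_delay, 0.0, 0
    while True:
        attempts += 1
        if ready():
            return attempts
        if waited + delay > timeout:
            raise TimeoutError(f"not ready after {attempts} attempts (~{waited:.1f}s waited)")
        sleep(delay)
        waited += delay
        delay = min(delay * 2, max_delay)
```

Capping the delay at max_delay keeps a slow backend from stretching individual sleeps indefinitely, while the overall timeout guarantees the loop eventually gives up.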

Interactive vs. Non-Interactive Output

The wait() and show_logs() methods use the rich library to render interactive progress bars and log viewers. In CI/CD environments or other non-interactive shells, flyte-sdk automatically falls back to plain-text status updates. To request quieter output explicitly, pass quiet=True to wait().
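
If you want to make that choice yourself instead of relying on the automatic fallback, you can compute the quiet flag up front. In this sketch should_be_quiet() is a hypothetical helper; treating a non-empty CI environment variable as non-interactive follows a convention used by many CI services (GitHub Actions and GitLab CI among them), not a flyte-sdk requirement.

```python
import os
import sys
from typing import Mapping, Optional


def should_be_quiet(is_tty: Optional[bool] = None,
                    env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True when rich progress output is likely to be noise.

    Treats a non-empty CI variable, or stdout not attached to a terminal,
    as non-interactive.
    """
    env = os.environ if env is None else env
    if is_tty is None:
        is_tty = sys.stdout.isatty()
    return bool(env.get("CI")) or not is_tty
```

A call site would then look like run.wait(quiet=should_be_quiet()).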