Inspecting Actions and Execution Data
In flyte-sdk, a Run represents the execution of a workflow, while an Action represents the execution of an individual task or node within that run. When a task fails or takes longer than expected, you can use the Action and ActionDetails classes to inspect granular timing, retrieve logs, and access the exact inputs and outputs of the execution.
Retrieving Actions
When you need to inspect a specific part of a run, you first retrieve the corresponding Action object. Actions are identified by their name (e.g., "a0", "a1") and the name of the run they belong to.
To get a specific action, use the Action.get method:
from flyte.remote import Action
# Retrieve a specific action by run name and action name
action = Action.get(run_name="my-run-id", name="a0")
print(f"Action: {action.name}, Task: {action.task_name}, Phase: {action.phase}")
If you don't know the specific action name, you can list all actions associated with a run using Action.listall:
# List all actions for a run that are in a terminal state
actions = Action.listall(for_run_name="my-run-id", in_phase=["SUCCEEDED", "FAILED"])
for action in actions:
    print(f"{action.name}: {action.phase}")
Internally, Action.get calls ActionDetails.get_details.aio in src/flyte/remote/_action.py to fetch the full state from the remote service.
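When a run contains many actions, it can help to group the listing by phase before deciding which action to inspect. The following is a minimal sketch of that idea, not part of flyte-sdk: FakeAction is a hypothetical stand-in that exposes only the name and phase attributes shown above, and group_by_phase is an illustrative helper name.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class FakeAction:
    """Hypothetical stand-in exposing only the `name` and `phase` attributes."""
    name: str
    phase: str


def group_by_phase(actions):
    """Map each phase to the names of the actions currently in that phase."""
    grouped = defaultdict(list)
    for action in actions:
        grouped[action.phase].append(action.name)
    return dict(grouped)


# Made-up sample data standing in for the result of a listing call.
sample = [
    FakeAction("a0", "SUCCEEDED"),
    FakeAction("a1", "FAILED"),
    FakeAction("a2", "SUCCEEDED"),
]
summary = group_by_phase(sample)
print(summary)
```

Because the helper relies only on attribute access, it should work with any objects that carry a name and a hashable phase value.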
Monitoring Execution and Status
You can monitor the progress of an action in real time. The Action.wait() method displays a progress indicator (rendered with the rich library) that updates as the action transitions through phases.
# Block until the action reaches a terminal state
action.wait()
# Or wait for a specific milestone, like logs being ready
action.wait(wait_for="logs-ready")
The Action.phase property returns an ActionPhase enum (defined in flyte.models). Common phases include QUEUED, INITIALIZING, RUNNING, and terminal states like SUCCEEDED or FAILED.
Analyzing Performance with Phase Transitions
If an action is slow, you can analyze exactly where time was spent by inspecting its phase transitions. This data is stored in ActionDetails and provides a breakdown of every state change the action underwent.
# Get granular details about the execution
details = action.details()
# Access timing for the latest attempt
print(f"Time spent in queue: {details.queued_time}")
print(f"Time spent running: {details.running_time}")
# Iterate through all transitions for deep analysis
transitions = details.get_phase_transitions()
for t in transitions:
    print(f"Phase: {t.phase}, Duration: {t.duration.total_seconds()}s")
The PhaseTransitionInfo class in src/flyte/remote/_action.py tracks the start_time and end_time for each phase. This is particularly useful for identifying whether a delay comes from resource bottlenecks (long QUEUED time) or from the task logic itself (long RUNNING time).
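To turn a list of transitions into a quick diagnosis, the durations can be summed per phase and the largest total reported. The sketch below assumes only that each transition has a phase and a duration (a timedelta), as in the loop above. FakeTransition and both helper functions are hypothetical names used for illustration, and the sample numbers are made up.

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass
class FakeTransition:
    """Hypothetical stand-in with the `phase` and `duration` attributes used above."""
    phase: str
    duration: timedelta


def seconds_per_phase(transitions):
    """Sum the time spent in each phase, in seconds."""
    totals = {}
    for t in transitions:
        totals[t.phase] = totals.get(t.phase, 0.0) + t.duration.total_seconds()
    return totals


def slowest_phase(transitions):
    """Return the phase with the largest accumulated duration."""
    totals = seconds_per_phase(transitions)
    return max(totals, key=totals.get)


# Made-up sample data; a phase may appear more than once.
sample_transitions = [
    FakeTransition("QUEUED", timedelta(seconds=42)),
    FakeTransition("INITIALIZING", timedelta(seconds=5)),
    FakeTransition("RUNNING", timedelta(seconds=12.5)),
    FakeTransition("QUEUED", timedelta(seconds=8)),
]
totals = seconds_per_phase(sample_transitions)
bottleneck = slowest_phase(sample_transitions)
print(totals, bottleneck)
```

In this sample the accumulated QUEUED time dominates, which would point toward a resource bottleneck rather than slow task logic.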
Accessing Inputs and Outputs
flyte-sdk allows you to retrieve the actual data processed by an action. Inputs are available as soon as the action starts, while outputs are only available once the action reaches the SUCCEEDED state.
ActionInputs
ActionInputs behaves like a dictionary. When you call action.inputs(), flyte-sdk lazily fetches the data via the DataProxyService.
inputs = action.inputs()
# Access by key
val = inputs["my_input_key"]
print(f"Input data: {inputs}")
ActionOutputs
ActionOutputs behaves like a tuple but also provides named access. If you attempt to access outputs before the action is done, flyte-sdk raises a RuntimeError.
if action.done():
    outputs = action.outputs()
    # Access as a tuple
    first_result = outputs[0]
    # Access by auto-generated names (e.g., o0, o1)
    print(outputs.named_outputs)
else:
    print("Action is still running; outputs not yet available.")
The conversion from Flyte's internal literal format to native Python types is handled by convert_from_inputs_to_native and convert_outputs_to_native within the ActionDetails._cache_data method.
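The "tuple with named access" pattern described for ActionOutputs can be illustrated with a small standalone class. This is only a sketch of the access pattern, not the library's implementation: SketchOutputs is a hypothetical name, and the o0, o1 naming simply follows the example names mentioned above.

```python
class SketchOutputs(tuple):
    """Hypothetical tuple subclass that also exposes positional names."""

    @property
    def named_outputs(self):
        # Build names o0, o1, ... from each value's position in the tuple.
        return {f"o{i}": value for i, value in enumerate(self)}


# Made-up output values for illustration.
outputs = SketchOutputs((3.14, "done"))
first_result = outputs[0]
print(first_result, outputs.named_outputs)
```

Subclassing tuple keeps indexing, unpacking, and len() working unchanged, while the property adds a dictionary view for callers that prefer names.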
Retrieving Logs
To see the standard output or error logs of a task execution, use show_logs for an interactive viewer or get_logs to process log lines programmatically.
# Print the last 50 lines of logs to the console
action.show_logs(max_lines=50)
# Stream logs as they arrive
for line in action.get_logs():
    print(f"LOG: {line}")
The get_logs method returns an AsyncGenerator (or a standard Iterator if called synchronously) that tails logs from the remote backend using the Logs.tail utility in src/flyte/remote/_logs.py.
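When consuming an asynchronous log stream, a common need is to keep only the lines that match a pattern and stop after a bounded number of matches. The sketch below shows that with a plain asyncio async generator. fake_log_stream is a hypothetical stand-in that yields made-up lines, and collect_matching is an illustrative helper, not a flyte-sdk function.

```python
import asyncio


async def fake_log_stream():
    """Hypothetical stand-in for an async log source; yields made-up lines."""
    for line in ["starting task", "ERROR: disk full", "retrying", "ERROR: timeout"]:
        await asyncio.sleep(0)  # yield control, as a real stream would while waiting
        yield line


async def collect_matching(lines, needle, limit=10):
    """Collect up to `limit` lines from an async iterator that contain `needle`."""
    matches = []
    async for line in lines:
        if needle in line:
            matches.append(line)
            if len(matches) >= limit:
                break
    return matches


errors = asyncio.run(collect_matching(fake_log_stream(), "ERROR"))
print(errors)
```

The limit parameter keeps the helper from running indefinitely against a stream that never ends, which matters when tailing logs from a task that is still running.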