Execution Controllers & Local Runtime
When you execute a Flyte task locally or remotely, the flyte-sdk uses specialized controllers to manage the lifecycle of that execution. These controllers abstract away the complexities of environment-specific task submission, input/output handling, and state tracking through a unified Controller protocol.
The Controller Protocol
The Controller protocol, defined in src/flyte/_internal/controllers/__init__.py, establishes the interface for all execution engines. It requires implementations to provide:
- submit: An asynchronous method that submits a task and waits for its result.
- submit_sync: A synchronous wrapper that returns a concurrent.futures.Future.
- finalize_parent_action: A cleanup method called after a parent task completes.
- record_trace: A method that records execution traces for observability.
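The four requirements above can be pictured as a typing.Protocol. This is a minimal sketch, not the definition in the SDK: the names ControllerSketch and InlineController are hypothetical, and every parameter list is an assumption, since this section names the methods but not their signatures.

```python
import asyncio
import concurrent.futures
from typing import Any, Callable, Dict, Protocol, runtime_checkable


@runtime_checkable
class ControllerSketch(Protocol):
    """Hypothetical stand-in for the Controller protocol; signatures are assumed."""

    async def submit(self, task: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: ...

    def submit_sync(
        self, task: Callable[..., Any], *args: Any, **kwargs: Any
    ) -> concurrent.futures.Future: ...

    async def finalize_parent_action(self, action_id: str) -> None: ...

    async def record_trace(self, info: Dict[str, Any]) -> None: ...


class InlineController:
    """Toy implementation that runs each task directly in the current process."""

    async def submit(self, task, *args, **kwargs):
        result = task(*args, **kwargs)
        # Support both plain functions and coroutine functions.
        if asyncio.iscoroutine(result):
            result = await result
        return result

    def submit_sync(self, task, *args, **kwargs):
        # Run eagerly and hand back an already-resolved Future.
        future: concurrent.futures.Future = concurrent.futures.Future()
        try:
            future.set_result(task(*args, **kwargs))
        except Exception as exc:
            future.set_exception(exc)
        return future

    async def finalize_parent_action(self, action_id):
        return None  # nothing to clean up in this toy version

    async def record_trace(self, info):
        return None  # traces are discarded in this toy version
```

Because the protocol is structural, InlineController satisfies it without inheriting from it, which is what lets local and remote engines be swapped behind the same interface.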
Local Execution with LocalController
The LocalController (found in src/flyte/_internal/controllers/_local_controller.py) is the primary engine for local development. It manages task execution within the same process or across local threads, providing features like caching and automatic retries.
Task Lifecycle and Determinism
When a task is submitted to the LocalController, it uses a TaskCallSequencer to ensure deterministic execution IDs. This is critical for nested tasks and loops where the same task might be called multiple times. The controller generates a sub_action_id based on the task name, input hash, and the sequence number provided by the sequencer.
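The idea can be illustrated with a small sketch. The class and function below are hypothetical, not the SDK's TaskCallSequencer: keying the counter on the task name and input hash, and hashing with SHA-256, are assumptions made only to show why a sequence number keeps repeated calls distinct while staying reproducible across runs.

```python
import hashlib
from collections import defaultdict
from typing import Dict, Tuple


class CallSequencerSketch:
    """Hypothetical sequencer: counts calls per (task name, input hash)."""

    def __init__(self) -> None:
        self._counters: Dict[Tuple[str, str], int] = defaultdict(int)

    def next_seq(self, task_name: str, input_hash: str) -> int:
        key = (task_name, input_hash)
        self._counters[key] += 1
        return self._counters[key]


def make_sub_action_id(task_name: str, input_hash: str, seq: int) -> str:
    """Derive a short, stable ID from the three components named above."""
    raw = f"{task_name}:{input_hash}:{seq}".encode("utf-8")
    return hashlib.sha256(raw).hexdigest()[:16]
```

Two calls to the same task with the same inputs receive sequence numbers 1 and 2, so their IDs differ. A fresh run that makes the same calls in the same order reproduces the same IDs.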
Caching and Persistence
The LocalController integrates with LocalTaskCache to avoid redundant computations.
- Cache Lookup: Before execution, it generates a cache_key using the task name, input hash, and cache version. If task_cache.behavior == "auto", it attempts to retrieve results from the cache.
- Recording: It uses a RunRecorder to track the start, attempts, and completion of actions. This data is used to populate the local execution UI (TUI).
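The cache lookup described above might look roughly like the following. This is a sketch under assumptions: InMemoryTaskCache, make_cache_key, and run_with_cache are hypothetical names, the in-memory dictionary stands in for whatever storage LocalTaskCache actually uses, and the key format is invented for illustration.

```python
import hashlib
from typing import Any, Callable, Dict

_MISSING = object()  # sentinel so that a cached None still counts as a hit


def make_cache_key(task_name: str, input_hash: str, cache_version: str) -> str:
    """Combine the three components named above into one key (format assumed)."""
    raw = "|".join((task_name, input_hash, cache_version))
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


class InMemoryTaskCache:
    """Hypothetical cache backed by a plain dictionary."""

    def __init__(self) -> None:
        self._store: Dict[str, Any] = {}

    def get(self, key: str) -> Any:
        return self._store.get(key, _MISSING)

    def set(self, key: str, value: Any) -> None:
        self._store[key] = value


def run_with_cache(
    cache: InMemoryTaskCache,
    behavior: str,
    task_name: str,
    input_hash: str,
    cache_version: str,
    compute: Callable[[], Any],
) -> Any:
    # Only the "auto" behavior consults the cache; anything else always runs.
    if behavior != "auto":
        return compute()
    key = make_cache_key(task_name, input_hash, cache_version)
    hit = cache.get(key)
    if hit is not _MISSING:
        return hit
    result = compute()
    cache.set(key, result)
    return result
```

Including the cache version in the key means that bumping the version invalidates earlier results without deleting them.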
Retry Logic
Local tasks support retries with exponential backoff. The controller runs a retry loop that stops on the first success or the first non-recoverable error, and sleeps between attempts otherwise:
# From LocalController.submit
for attempt_num in range(1, max_attempts + 1):
    # ... record attempt start ...
    out, err = await direct_dispatch(...)
    if not err:
        # ... record success ...
        break
    # ... record failure ...
    if not err.recoverable:
        break
    if attempt_num < max_attempts:
        backoff = _MIN_BACKOFF_ON_ERR_SEC * (_BACKOFF_MULTIPLIER ** (attempt_num - 1))
        await asyncio.sleep(backoff)
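To make the schedule concrete, the sketch below computes the delays that the backoff formula in the excerpt would produce between attempts. The values of _MIN_BACKOFF_ON_ERR_SEC and _BACKOFF_MULTIPLIER are not stated in this section, so the 0.5 seconds and multiplier of 2 used here are placeholders, and backoff_schedule is a hypothetical helper.

```python
from typing import List


def backoff_schedule(max_attempts: int, min_backoff: float, multiplier: float) -> List[float]:
    """Return the sleep before each retry, using the formula from the excerpt.

    No sleep follows the final attempt, so the list has max_attempts - 1 entries.
    """
    return [
        min_backoff * (multiplier ** (attempt_num - 1))
        for attempt_num in range(1, max_attempts)
    ]


# Placeholder constants, chosen only for illustration.
delays = backoff_schedule(max_attempts=4, min_backoff=0.5, multiplier=2.0)
print(delays)  # [0.5, 1.0, 2.0]
```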
Synchronous Execution Management
To handle synchronous task calls without blocking the main event loop, the LocalController maintains a _runner_map of _TaskRunner instances. Each runner operates its own event loop in a dedicated thread. The controller emits a warning if more than 100 runners are created, as a guard against runaway recursion.
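The runner pattern can be sketched with the standard library alone. TaskRunnerSketch and get_runner below are hypothetical and are not the SDK's _TaskRunner; keying the map by an arbitrary string is an assumption, since this section does not say how runners are keyed.

```python
import asyncio
import concurrent.futures
import logging
import threading
from typing import Any, Coroutine, Dict

logger = logging.getLogger(__name__)
_WARN_THRESHOLD = 100  # mirrors the limit described above


class TaskRunnerSketch:
    """Owns one event loop that runs forever on a dedicated daemon thread."""

    def __init__(self, name: str) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(
            target=self._loop.run_forever, name=name, daemon=True
        )
        self._thread.start()

    def run(self, coro: Coroutine[Any, Any, Any]) -> concurrent.futures.Future:
        # Schedule the coroutine on the runner's loop from any other thread.
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def close(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


_runner_map: Dict[str, TaskRunnerSketch] = {}
_runner_lock = threading.Lock()


def get_runner(key: str) -> TaskRunnerSketch:
    """Return the runner for `key`, creating it (and warning) if needed."""
    with _runner_lock:
        runner = _runner_map.get(key)
        if runner is None:
            runner = TaskRunnerSketch(key)
            _runner_map[key] = runner
            if len(_runner_map) > _WARN_THRESHOLD:
                logger.warning("More than %d runners created", _WARN_THRESHOLD)
        return runner


async def double(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2
```

A synchronous caller can then block on get_runner("demo").run(double(21)).result() while the coroutine itself runs on the runner's thread, leaving the caller's own event loop untouched.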
Remote Execution and the Core Engine
Remote execution involves a two-tier architecture: the RemoteController wrapper and the core Controller engine.
RemoteController Wrapper
The RemoteController (in src/flyte/_internal/controllers/remote/_controller.py) handles the high-level logic of preparing a task for the Flyte backend.
- Serialization: It translates local TaskTemplate objects into wire-format specs using translate_task_to_wire.
- IO Management: It uploads serialized inputs to the configured storage (e.g., S3/GCS) before submission and downloads outputs upon completion.
- Interactive Mode: If the SDK is in interactive mode, it automatically builds a new pickle bundle for the task using build_pkl_bundle.
Core Remote Engine
The core Controller (in src/flyte/_internal/controllers/remote/_core.py) is the low-level async engine that manages the actual communication with Flyte services.
- Dedicated Threading: The engine runs in a background ControllerThread with its own asyncio event loop to ensure that network IO and polling do not interfere with the user's main thread.
- Worker Pool: It maintains a pool of workers (defaulting to 20) that process actions from a shared queue.
- Informer Cache: To avoid excessive polling, the engine uses an InformerCache. This cache tracks the state of remote actions and provides a mechanism to wait for completion via wait_for_action_completion.
- Rate Limiting: All remote launches and cancellations are governed by an AsyncLimiter (defaulting to 100 QPS) to protect the Flyte control plane from bursts of requests.
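The combination of a worker pool and a rate limiter can be sketched as follows. This is not the engine's code: IntervalLimiter is a hand-rolled stand-in for the AsyncLimiter mentioned above, written so that the example needs only the standard library, and run_actions and square are hypothetical names.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, Iterable


class IntervalLimiter:
    """Spaces acquisitions evenly so that at most `rate` happen per second."""

    def __init__(self, rate: float) -> None:
        self._interval = 1.0 / rate
        self._next_slot = 0.0
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:
            wait = self._next_slot - time.monotonic()
            if wait > 0:
                await asyncio.sleep(wait)
            self._next_slot = max(time.monotonic(), self._next_slot) + self._interval


async def run_actions(
    actions: Iterable[Any],
    handler: Callable[[Any], Awaitable[Any]],
    num_workers: int = 20,
    max_qps: float = 100.0,
) -> Dict[Any, Any]:
    """Drain a shared queue with a fixed pool of workers, rate limiting each launch."""
    queue: asyncio.Queue = asyncio.Queue()
    limiter = IntervalLimiter(max_qps)
    results: Dict[Any, Any] = {}
    for action in actions:
        queue.put_nowait(action)

    async def worker() -> None:
        while True:
            action = await queue.get()
            try:
                await limiter.acquire()
                results[action] = await handler(action)
            except Exception as exc:
                results[action] = exc  # keep the worker alive; store the failure
            finally:
                queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(num_workers)]
    await queue.join()  # returns once every queued action has been processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


async def square(x: int) -> int:
    return x * x
```

For example, asyncio.run(run_actions([1, 2, 3], square, num_workers=2)) processes the three actions with two workers while the limiter spaces the handler calls at least 10 ms apart at the default 100 QPS.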
Configuration Parameters
The remote engine's behavior is tunable via environment variables:
- _F_P_CNC: Default concurrency for parent actions (default: 1000).
- _F_MAX_RETRIES: Maximum system retries for the controller (default: 10).
- _F_MAX_QPS: Maximum queries per second for the rate limiter (default: 100).
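A sketch of how such variables could be read with their documented defaults follows. The helper names and the returned dictionary keys are hypothetical, and parsing each value as an integer is an assumption; only the variable names and defaults come from the list above.

```python
import os
from typing import Dict, Mapping


def _int_env(env: Mapping[str, str], name: str, default: int) -> int:
    """Read an integer setting, falling back to the default when unset or blank."""
    raw = env.get(name)
    if raw is None or raw.strip() == "":
        return default
    return int(raw)


def load_engine_settings(env: Mapping[str, str] = os.environ) -> Dict[str, int]:
    return {
        "parent_concurrency": _int_env(env, "_F_P_CNC", 1000),
        "max_retries": _int_env(env, "_F_MAX_RETRIES", 10),
        "max_qps": _int_env(env, "_F_MAX_QPS", 100),
    }
```

Passing a plain dictionary instead of os.environ, as in load_engine_settings({"_F_MAX_QPS": "50"}), makes the override behavior easy to check without touching the real environment.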
Deterministic Trace Recording
Both controllers implement record_trace to support Flyte's observability features. When a task or a "traceable" function is executed, the controller records the start time, end time, inputs, and outputs. In the RemoteController, this involves submitting a specialized trace action to the backend, which allows the Flyte UI to display a detailed execution graph even for complex, nested local logic.
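What gets captured in a trace can be illustrated with a small decorator. Everything here is hypothetical: TraceRecord, TraceLog, and traced are illustrative names, the record fields simply mirror the four items listed above, and appending to an in-memory list stands in for the real record_trace.

```python
import functools
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class TraceRecord:
    name: str
    inputs: Dict[str, Any]
    output: Any
    start_time: float
    end_time: float


class TraceLog:
    """Hypothetical sink that keeps trace records in memory."""

    def __init__(self) -> None:
        self.records: List[TraceRecord] = []

    def record_trace(self, record: TraceRecord) -> None:
        self.records.append(record)


def traced(log: TraceLog) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Wrap a function so each successful call is recorded with timing and IO."""

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.time()
            output = fn(*args, **kwargs)
            end = time.time()
            log.record_trace(
                TraceRecord(fn.__name__, {"args": args, "kwargs": kwargs}, output, start, end)
            )
            return output

        return wrapper

    return decorator


log = TraceLog()


@traced(log)
def add(a: int, b: int) -> int:
    return a + b
```

Calling add(2, 3) returns 5 and leaves one record in log.records holding the function name, the arguments, the result, and the two timestamps.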