Core Execution Framework
The core execution framework in flyte-sdk provides the primitives for defining where and how your code runs. It centers around the TaskEnvironment, which groups tasks and defines their default execution settings, and the Resources class, which specifies the hardware requirements for those tasks.
Task Environments
A TaskEnvironment acts as a container for configuration. When you define multiple tasks that share the same Docker image, resource requirements, or secrets, you group them within an environment to avoid repetitive configuration.
Defining an Environment
You create an environment by instantiating TaskEnvironment, which is defined in src/flyte/_task_environment.py. Every environment requires a name, which serves as a prefix for the names of all tasks defined within it.
import flyte
# Define a base environment with shared resources
base_env = flyte.TaskEnvironment(
    name="data-processing",
    image="ghcr.io/flyteorg/flyte-worker:v1.0.0",
    resources=flyte.Resources(cpu=2, memory="4Gi"),
)
@base_env.task
async def process_data(data: list[int]) -> int:
    return sum(data)
In this example, the task's fully qualified name (FQN) becomes data-processing.process_data. Because the environment name is part of the FQN, tasks that share a function name but live in different environments are still uniquely identified.
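The naming rule can be modeled in a few lines of plain Python. The sketch below is a hypothetical stand-in (EnvironmentSketch is not part of flyte-sdk and does not import it); it only shows how prefixing the environment name keeps two tasks with the same function name distinct.

```python
from typing import Callable


class EnvironmentSketch:
    """Hypothetical stand-in for TaskEnvironment that models only task naming."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.tasks: dict[str, Callable] = {}

    def task(self, func: Callable) -> Callable:
        # Build the FQN by prefixing the function name with the environment name.
        fqn = f"{self.name}.{func.__name__}"
        self.tasks[fqn] = func
        return func


data_env = EnvironmentSketch("data-processing")
reporting_env = EnvironmentSketch("reporting")


@data_env.task
def process_data(data: list[int]) -> int:
    return sum(data)


# Registering the same function name in another environment yields a distinct FQN.
reporting_env.task(process_data)
```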
Environment Inheritance
If you need a specialized version of an existing environment (for example, one that adds a GPU or changes the cache policy), use the clone_with method. It creates a new environment that inherits all settings from the parent unless they are explicitly overridden.
# Create a GPU-enabled version of the base environment
gpu_env = base_env.clone_with(
    name="data-processing-gpu",
    resources=flyte.Resources(cpu=4, memory="16Gi", gpu="T4:1"),
    cache="auto",
)
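A minimal sketch of the inherit-unless-overridden behavior, assuming a hypothetical frozen EnvConfigSketch dataclass in place of TaskEnvironment (its fields and default values are illustrative, not flyte-sdk's). dataclasses.replace copies every field that is not explicitly passed, which mirrors the semantics described above; like the example, the sketch requires a new name for the clone.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class EnvConfigSketch:
    """Hypothetical model of environment settings (not flyte's TaskEnvironment)."""

    name: str
    image: str
    cpu: int = 1
    memory: str = "1Gi"
    gpu: Optional[str] = None
    cache: str = "disable"

    def clone_with(self, name: str, **overrides) -> "EnvConfigSketch":
        # Fields missing from `overrides` are copied unchanged from the parent.
        return replace(self, name=name, **overrides)


base_cfg = EnvConfigSketch(
    name="data-processing",
    image="ghcr.io/flyteorg/flyte-worker:v1.0.0",
    cpu=2,
    memory="4Gi",
)
gpu_cfg = base_cfg.clone_with(
    name="data-processing-gpu", cpu=4, memory="16Gi", gpu="T4:1", cache="auto"
)
```

Because the dataclass is frozen, cloning never mutates the parent, so the base configuration stays safe to share between environments.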
Defining Tasks
Tasks are defined by decorating Python functions with the @env.task decorator. flyte-sdk supports both synchronous and asynchronous functions.
Sync vs. Async Tasks
Although flyte-sdk supports standard synchronous functions, async functions are preferred for high-concurrency scenarios, especially with container reuse, where only async tasks can run more than one execution per replica.
@base_env.task
def sync_task(x: int) -> int:
    return x + 1
@base_env.task
async def async_task(x: int) -> int:
    return x + 1
Internally, the decorator converts your function into an AsyncFunctionTaskTemplate (found in src/flyte/_task.py). If you provide a synchronous function, flyte-sdk wraps it to ensure it can be executed within the internal async execution loop using run_sync_with_loop.
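The general idea of accepting both kinds of functions can be sketched with the standard library. This is not flyte-sdk's run_sync_with_loop: it swaps in asyncio.to_thread, and the as_async helper is hypothetical. It shows how a decorator can give sync and async functions a single awaitable interface.

```python
import asyncio
import functools
import inspect
from typing import Any, Awaitable, Callable


def as_async(func: Callable[..., Any]) -> Callable[..., Awaitable[Any]]:
    """Return an async callable for either a sync or an async function (sketch)."""
    if inspect.iscoroutinefunction(func):
        return func  # already awaitable; nothing to wrap

    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Run the blocking function in a worker thread so the event loop stays free.
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper


def sync_task(x: int) -> int:
    return x + 1


async def async_task(x: int) -> int:
    return x + 1


async def main() -> list[int]:
    # Both calls are awaited the same way, regardless of how they were defined.
    return [await as_async(sync_task)(1), await as_async(async_task)(1)]
```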
Task-Level Overrides
The @env.task decorator allows you to override environment-level defaults for a specific task. Common overrides include cache, retries, and timeout.
@base_env.task(cache="auto", retries=3, timeout=600)
async def flaky_task(x: int) -> int:
    ...
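One way to picture the precedence between task-level and environment-level settings is a dictionary merge in which explicit task values win. This sketch is hypothetical (effective_settings and the default values are illustrative), and treating None as "inherit" is an assumption of the sketch, not documented flyte-sdk behavior.

```python
from typing import Any, Optional

# Hypothetical environment-level defaults, for illustration only.
ENV_DEFAULTS: dict[str, Any] = {
    "cache": "disable",
    "retries": 0,
    "timeout": None,
    "cpu": 2,
    "memory": "4Gi",
}


def effective_settings(
    env_defaults: dict[str, Any], **task_overrides: Optional[Any]
) -> dict[str, Any]:
    # Explicit task-level values win; None is treated as "not set, inherit".
    explicit = {key: value for key, value in task_overrides.items() if value is not None}
    return {**env_defaults, **explicit}


settings = effective_settings(ENV_DEFAULTS, cache="auto", retries=3, timeout=600)
```

Settings the task does not mention (cpu and memory here) fall through from the environment unchanged.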
Compute Resources
The Resources class in src/flyte/_resources.py defines the hardware requirements for a task. It supports standard Kubernetes units for CPU and memory, as well as specialized accelerators.
Basic Resources
You can specify CPU and memory as integers, floats, or strings with units.
resources = flyte.Resources(
    cpu=2,          # 2 cores
    memory="4Gi",   # 4 GiB
    disk="10Gi",    # 10 GiB ephemeral storage
    shm="2Gi",      # 2 GiB shared memory (/dev/shm)
)
For CPU and memory, you can also provide a tuple to specify a (request, limit) range:
flyte.Resources(cpu=(1, 4), memory=("2Gi", "8Gi"))
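To make the units concrete, the sketch below converts Kubernetes-style quantities to plain numbers and validates a (request, limit) pair. parse_quantity and request_limit are hypothetical helpers, not flyte-sdk functions; they cover only a subset of Kubernetes suffixes, and treating a scalar as request == limit is an assumption.

```python
from typing import Tuple, Union

BINARY_SUFFIXES = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}
DECIMAL_SUFFIXES = {"k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12}

Quantity = Union[int, float, str]


def parse_quantity(value: Quantity) -> float:
    """Convert a Kubernetes-style quantity such as "4Gi" or "500m" to a number."""
    if isinstance(value, (int, float)):
        return float(value)
    # Binary suffixes are checked first so "Mi" is never read as "M".
    for suffix, factor in BINARY_SUFFIXES.items():
        if value.endswith(suffix):
            return float(value[: -len(suffix)]) * factor
    if value.endswith("m"):  # milli-units, common for CPU ("500m" == 0.5 cores)
        return float(value[:-1]) / 1000
    for suffix, factor in DECIMAL_SUFFIXES.items():
        if value.endswith(suffix):
            return float(value[: -len(suffix)]) * factor
    return float(value)


def request_limit(spec: Union[Quantity, Tuple[Quantity, Quantity]]) -> Tuple[float, float]:
    """Normalize a scalar or (request, limit) pair and check request <= limit."""
    if isinstance(spec, tuple):
        request, limit = (parse_quantity(v) for v in spec)
        if request > limit:
            raise ValueError(f"request {spec[0]!r} exceeds limit {spec[1]!r}")
        return request, limit
    quantity = parse_quantity(spec)
    return quantity, quantity  # assumption: a scalar sets request == limit
```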
Accelerators (GPU and TPU)
flyte-sdk provides first-class support for various accelerators. You can request them with simple strings, or use the GPU and TPU helper functions for advanced configurations such as NVIDIA Multi-Instance GPU (MIG) partitions and TPU slice topologies.
# Simple GPU request
gpu_res = flyte.Resources(gpu="A100:1")
# Advanced GPU with partitioning (MIG)
mig_res = flyte.Resources(
    gpu=flyte.GPU(device="A100", quantity=1, partition="1g.5gb")
)
# TPU request with specific slice
tpu_res = flyte.Resources(
    gpu=flyte.TPU(device="V5P", partition="2x2x1")
)
Supported GPU types include T4, L4, A100, H100, and more. TPU support includes V5P and V6E.
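The "DEVICE:COUNT" string form can be read as a device name plus a count. The parser below is a hypothetical sketch (AcceleratorRequest, parse_gpu_string, and with_mig_partition are not flyte-sdk APIs); defaulting a bare device name to one unit is an assumption, and the MIG check only accepts profile names shaped like "1g.5gb".

```python
import re
from dataclasses import dataclass
from typing import Optional

# MIG profile names look like "<compute slices>g.<memory>gb", e.g. "1g.5gb".
MIG_PROFILE = re.compile(r"^\d+g\.\d+gb$")


@dataclass(frozen=True)
class AcceleratorRequest:
    device: str
    quantity: int
    partition: Optional[str] = None


def parse_gpu_string(spec: str) -> AcceleratorRequest:
    """Split a "DEVICE:COUNT" string such as "A100:1" into its parts."""
    device, sep, count = spec.partition(":")
    quantity = int(count) if sep else 1  # assumption: a bare device name means one unit
    if quantity < 1:
        raise ValueError(f"quantity must be positive, got {quantity}")
    return AcceleratorRequest(device=device, quantity=quantity)


def with_mig_partition(req: AcceleratorRequest, partition: str) -> AcceleratorRequest:
    """Attach a MIG partition after checking that its name is well-formed."""
    if not MIG_PROFILE.match(partition):
        raise ValueError(f"not a MIG profile name: {partition!r}")
    return AcceleratorRequest(req.device, req.quantity, partition)
```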
Performance Optimization with Container Reuse
When task initialization (like loading large ML models) is expensive, you can use ReusePolicy to keep containers "warm" between executions. This significantly reduces cold-start overhead.
Configuring Reuse
Pass a ReusePolicy to the TaskEnvironment to enable reuse.
env = flyte.TaskEnvironment(
    name="ml-inference",
    reusable=flyte.ReusePolicy(
        replicas=(1, 5),  # Min 1, Max 5 replicas
        concurrency=2,    # 2 concurrent tasks per replica
        idle_ttl=300,     # Shut down after 5 minutes of inactivity
    ),
)
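A quick way to reason about a reuse configuration is to compute how many task executions it can run at once: replicas multiplied by concurrency. The ReuseSketch dataclass below is hypothetical and only does this arithmetic; reading a single integer as a fixed replica count is an assumption.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class ReuseSketch:
    """Hypothetical model of a reuse policy's capacity; not flyte's ReusePolicy."""

    replicas: Union[int, Tuple[int, int]]
    concurrency: int = 1
    idle_ttl: int = 300  # seconds

    def replica_range(self) -> Tuple[int, int]:
        # A single int is read as a fixed replica count (min == max).
        if isinstance(self.replicas, int):
            return self.replicas, self.replicas
        return self.replicas

    def slot_range(self) -> Tuple[int, int]:
        # Each replica runs up to `concurrency` task executions at once.
        low, high = self.replica_range()
        return low * self.concurrency, high * self.concurrency


policy = ReuseSketch(replicas=(1, 5), concurrency=2, idle_ttl=300)
```

For the policy above, capacity scales from 2 concurrent executions at 1 replica to 10 at 5 replicas.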
Important Constraints
- Concurrency: concurrency > 1 is only supported for async tasks. Synchronous tasks are limited to a concurrency of 1 per replica.
- Starvation: It is highly recommended to use at least 2 replicas (replicas=2 or replicas=(2, X)) to prevent starvation, where a parent task occupies the only available replica and cannot schedule its own child tasks.
- Override restrictions: When an environment is reusable, you cannot override resources, env_vars, or secrets at the call site unless you also disable reuse for that specific call by passing reusable="off".
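The starvation problem can be reproduced with plain asyncio by modeling each replica as one permit of a semaphore (a concurrency of 1 per replica). This is a hypothetical simulation, not flyte-sdk's scheduler: the parent holds its permit while waiting for a child, so with a single replica the child can never start and the wait times out.

```python
import asyncio


async def run_parent_child(replicas: int, timeout: float = 0.1) -> str:
    # Each semaphore permit stands for one replica slot.
    pool = asyncio.Semaphore(replicas)

    async def child() -> str:
        async with pool:  # the child needs a free replica to start
            return "child done"

    async def parent() -> str:
        async with pool:  # the parent keeps its replica for its whole lifetime
            try:
                return await asyncio.wait_for(child(), timeout)
            except asyncio.TimeoutError:
                return "starved"

    return await parent()
```

With replicas=1 the call returns "starved"; with replicas=2 the child gets the second slot and the call returns "child done".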
Dynamic Execution Overrides
Sometimes you need to change a task's requirements at the moment you call it—for example, retrying a failed task with more memory. The task.override() method creates a new template with modified settings.
async def run_with_retry(x: int):
    try:
        await process_data(data=[x])
    except flyte.errors.OOMError:
        # Retry with more memory and caching disabled
        await process_data.override(
            resources=flyte.Resources(memory="16Gi"),
            cache="disable",
        )(data=[x])
The override method (implemented in TaskTemplate.override in src/flyte/_task.py) returns a new instance of the task template. Note that certain attributes like name, image, and the function interface cannot be overridden.
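The copy-on-override behavior can be modeled with a frozen dataclass. In the hypothetical TaskTemplateSketch below, override() returns a new instance and leaves the original untouched, and it rejects changes to name and image to mirror the restriction described above; the field set is illustrative, not TaskTemplate's real attributes.

```python
from dataclasses import dataclass, replace

# Attributes this sketch refuses to override, mirroring the restriction above.
LOCKED_FIELDS = frozenset({"name", "image"})


@dataclass(frozen=True)
class TaskTemplateSketch:
    """Hypothetical, simplified task template; not flyte-sdk's TaskTemplate."""

    name: str
    image: str
    memory: str = "4Gi"
    cache: str = "auto"
    retries: int = 0

    def override(self, **changes) -> "TaskTemplateSketch":
        locked = LOCKED_FIELDS.intersection(changes)
        if locked:
            raise ValueError(f"cannot override: {sorted(locked)}")
        # replace() builds a new frozen instance; self is never modified.
        return replace(self, **changes)


original = TaskTemplateSketch(
    name="data-processing.process_data",
    image="ghcr.io/flyteorg/flyte-worker:v1.0.0",
)
bigger = original.override(memory="16Gi", cache="disable")
```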
Sync-to-Async Migration
If you are migrating legacy synchronous tasks and need to call them from an asynchronous parent task, use the .aio() method. It lets you call a synchronous task and receive an awaitable, which makes the call compatible with asyncio.gather.
import asyncio
@env.task
def legacy_sync_task(x: int) -> int:
    return x * 2
@env.task
async def modern_parent_task(inputs: list[int]) -> list[int]:
    # Call sync tasks as if they were async
    tasks = [legacy_sync_task.aio(x) for x in inputs]
    results = await asyncio.gather(*tasks)
    return results
The .aio() method ensures that even synchronous tasks behave like coroutines when invoked, facilitating a smooth transition to asynchronous patterns within flyte-sdk.
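Because .aio() returns awaitables, the parent task relies on asyncio.gather, which returns results in the order the awaitables were passed, not the order in which they finish. The standalone sketch below demonstrates this with plain asyncio; double_after_delay is a hypothetical stand-in for a task call, and no flyte-sdk code is involved.

```python
import asyncio


async def fan_out(inputs: list[int]) -> tuple[list[int], list[int]]:
    completed: list[int] = []  # records inputs in the order their calls finish

    async def double_after_delay(x: int, delay: float) -> int:
        # Hypothetical stand-in for a task call that takes `delay` seconds.
        await asyncio.sleep(delay)
        completed.append(x)
        return x * 2

    # Earlier inputs get longer delays, so they finish last.
    delays = [0.01 * (len(inputs) - i) for i in range(len(inputs))]
    results = await asyncio.gather(
        *(double_after_delay(x, d) for x, d in zip(inputs, delays))
    )
    return results, completed


results, completed = asyncio.run(fan_out([1, 2, 3]))
```

Here the calls finish in reverse order, yet results still lines up with inputs, which is what lets modern_parent_task return a list that corresponds index by index to its inputs.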