Core Execution Framework

The core execution framework in flyte-sdk provides the primitives for defining where and how your code runs. It centers around the TaskEnvironment, which groups tasks and defines their default execution settings, and the Resources class, which specifies the hardware requirements for those tasks.

Task Environments

A TaskEnvironment acts as a container for configuration. When you define multiple tasks that share the same Docker image, resource requirements, or secrets, you group them within an environment to avoid repetitive configuration.

Defining an Environment

You create an environment by instantiating TaskEnvironment (defined in src/flyte/_task_environment.py). Every environment requires a name, which serves as a prefix for all tasks defined within it.

import flyte

# Define a base environment with shared resources
base_env = flyte.TaskEnvironment(
    name="data-processing",
    image="ghcr.io/flyteorg/flyte-worker:v1.0.0",
    resources=flyte.Resources(cpu=2, memory="4Gi"),
)

@base_env.task
async def process_data(data: list[int]) -> int:
    return sum(data)

In this example, the task's fully-qualified name (FQN) becomes data-processing.process_data. This naming convention ensures that tasks are uniquely identified across different environments.

Environment Inheritance

If you need a specialized version of an existing environment—for example, one that adds a GPU or changes the cache policy—use the clone_with method. This creates a new environment that inherits all settings from the parent unless explicitly overridden.

# Create a GPU-enabled version of the base environment
gpu_env = base_env.clone_with(
    name="data-processing-gpu",
    resources=flyte.Resources(cpu=4, memory="16Gi", gpu="T4:1"),
    cache="auto",
)
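To make the inheritance rule concrete, here is a minimal sketch of "copy everything, replace only what is passed" using a plain dataclass. ToyEnvironment and its fields are hypothetical stand-ins, not the flyte-sdk classes. The real clone_with may cover more settings and perform additional validation.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ToyEnvironment:
    """Hypothetical stand-in for an environment; not the flyte-sdk class."""

    name: str
    image: Optional[str] = None
    cpu: int = 1
    memory: str = "1Gi"
    cache: str = "disable"

    def clone_with(self, name: str, **overrides) -> "ToyEnvironment":
        # Copy every field from the parent, replacing only what was passed.
        return replace(self, name=name, **overrides)

    def task_name(self, func_name: str) -> str:
        # The environment name prefixes the function name.
        return f"{self.name}.{func_name}"


base = ToyEnvironment(
    name="data-processing",
    image="ghcr.io/flyteorg/flyte-worker:v1.0.0",
    cpu=2,
    memory="4Gi",
)
gpu = base.clone_with(name="data-processing-gpu", cpu=4, memory="16Gi", cache="auto")

print(gpu.image)                      # inherited from base
print(gpu.task_name("process_data"))  # data-processing-gpu.process_data
print(base.cpu)                       # 2, the parent is unchanged
```

Making the toy class frozen means a clone can never mutate its parent, which is the property that makes sharing a base environment safe.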

Defining Tasks

Tasks are defined by decorating Python functions with the @env.task decorator. flyte-sdk supports both synchronous and asynchronous functions.

Sync vs. Async Tasks

While flyte-sdk supports standard synchronous functions, async functions are preferred for high-concurrency scenarios, especially when using container reuse.

@base_env.task
def sync_task(x: int) -> int:
    return x + 1

@base_env.task
async def async_task(x: int) -> int:
    return x + 1

Internally, the decorator converts your function into an AsyncFunctionTaskTemplate (found in src/flyte/_task.py). If you provide a synchronous function, flyte-sdk wraps it with run_sync_with_loop so that it can run inside the internal async execution loop.
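The idea of normalizing both kinds of function to a single awaitable interface can be sketched with the standard library alone. This is an illustration only: as_async is a hypothetical helper, and it uses asyncio.to_thread rather than flyte-sdk's run_sync_with_loop, whose internals are not shown here.

```python
import asyncio
import functools
import inspect


def as_async(func):
    """Hypothetical helper: return a coroutine function for sync or async input."""
    if inspect.iscoroutinefunction(func):
        return func  # already async, nothing to wrap

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Run the blocking function in a worker thread so the event loop stays free.
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper


def sync_add_one(x: int) -> int:
    return x + 1


async def async_add_one(x: int) -> int:
    return x + 1


async def main() -> list:
    # Both callables can now be awaited the same way.
    return [await as_async(sync_add_one)(1), await as_async(async_add_one)(1)]


results = asyncio.run(main())
print(results)  # [2, 2]
```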

Task-Level Overrides

The @env.task decorator allows you to override environment-level defaults for a specific task. Common overrides include cache, retries, and timeout.

@base_env.task(cache="auto", retries=3, timeout=600)
async def flaky_task(x: int) -> int:
    ...

Compute Resources

The Resources class in src/flyte/_resources.py defines the hardware requirements for a task. It supports standard Kubernetes units for CPU and memory, as well as specialized accelerators.

Basic Resources

You can specify CPU and memory as integers, floats, or strings with units.

resources = flyte.Resources(
    cpu=2,         # 2 cores
    memory="4Gi",  # 4 gibibytes
    disk="10Gi",   # 10 GiB ephemeral storage
    shm="2Gi",     # 2 GiB shared memory (/dev/shm)
)

For CPU and memory, you can also provide a tuple to specify a (request, limit) range: Resources(cpu=(1, 4), memory=("2Gi", "8Gi")).
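For readers less familiar with Kubernetes quantity strings, here is a small sketch of how binary-suffixed values such as "4Gi" map to bytes, and how a single value or a (request, limit) tuple can be normalized. The helpers to_bytes and as_range are hypothetical and cover only the Ki/Mi/Gi/Ti suffixes. Treating a single value as both request and limit is an assumption of this sketch, not documented flyte-sdk behavior.

```python
from typing import Tuple, Union

# Binary suffixes used by Kubernetes memory quantities.
_BINARY_UNITS = {"Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4}

Quantity = Union[int, str]


def to_bytes(value: Quantity) -> int:
    """Hypothetical helper: convert 4, "4096", or "4Gi" to a byte count."""
    if isinstance(value, int):
        return value
    for suffix, factor in _BINARY_UNITS.items():
        if value.endswith(suffix):
            return int(float(value[: -len(suffix)]) * factor)
    return int(value)  # plain number of bytes given as a string


def as_range(value: Union[Quantity, Tuple[Quantity, Quantity]]) -> Tuple[int, int]:
    """Normalize a single value or a (request, limit) tuple to bytes."""
    # Assumption: a single value is used as both request and limit.
    request, limit = value if isinstance(value, tuple) else (value, value)
    lo, hi = to_bytes(request), to_bytes(limit)
    if lo > hi:
        raise ValueError("request must not exceed limit")
    return lo, hi


print(to_bytes("4Gi"))           # 4294967296
print(as_range(("2Gi", "8Gi")))  # (2147483648, 8589934592)
```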

Accelerators (GPU and TPU)

flyte-sdk provides first-class support for various accelerators. You can request them using simple strings, or use the GPU and TPU helper functions when you need partitioning, such as MIG profiles on NVIDIA GPUs or a specific TPU slice.

# Simple GPU request
gpu_res = flyte.Resources(gpu="A100:1")

# Advanced GPU with partitioning (MIG)
mig_res = flyte.Resources(
    gpu=flyte.GPU(device="A100", quantity=1, partition="1g.5gb")
)

# TPU request with specific slice
tpu_res = flyte.Resources(
    gpu=flyte.TPU(device="V5P", partition="2x2x1")
)

Supported GPU types include T4, L4, A100, H100, and more. TPU support includes V5P and V6E.
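The "DEVICE:QUANTITY" shorthand can be read as a device name followed by a count. The parser below is a hypothetical sketch of that reading, not the parsing code used by Resources. Defaulting to one device when no count is given is an assumption.

```python
from typing import NamedTuple


class AcceleratorRequest(NamedTuple):
    """Hypothetical container for a parsed accelerator string."""

    device: str
    quantity: int


def parse_accelerator(spec: str) -> AcceleratorRequest:
    # Split "A100:1" into its device and quantity parts.
    device, sep, count = spec.partition(":")
    if not device:
        raise ValueError(f"missing device name in {spec!r}")
    quantity = int(count) if sep else 1  # assumption: default to one device
    if quantity < 1:
        raise ValueError("quantity must be at least 1")
    return AcceleratorRequest(device, quantity)


print(parse_accelerator("A100:1"))  # AcceleratorRequest(device='A100', quantity=1)
print(parse_accelerator("T4:4"))    # AcceleratorRequest(device='T4', quantity=4)
```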

Performance Optimization with Container Reuse

When task initialization (like loading large ML models) is expensive, you can use ReusePolicy to keep containers "warm" between executions. This significantly reduces cold-start overhead.

Configuring Reuse

Pass a ReusePolicy to the TaskEnvironment to enable reuse.

env = flyte.TaskEnvironment(
    name="ml-inference",
    reusable=flyte.ReusePolicy(
        replicas=(1, 5),  # Min 1, max 5 replicas
        concurrency=2,    # 2 concurrent tasks per replica
        idle_ttl=300,     # Shut down after 5 minutes of inactivity
    ),
)

Important Constraints

  • Concurrency: concurrency > 1 is only supported for async tasks. Synchronous tasks are limited to a concurrency of 1 per replica.
  • Starvation: It is highly recommended to use at least 2 replicas (replicas=2 or replicas=(2, X)) to prevent starvation, where a parent task occupies the only available replica and cannot schedule its own child tasks.
  • Override Restrictions: When an environment is reusable, you cannot override resources, env_vars, or secrets at the call-site unless you also disable reuse for that specific call by passing reusable="off".
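The concurrency constraint can be pictured as a per-replica limit on in-flight coroutines. The sketch below models a single replica with asyncio.Semaphore and records the peak number of tasks running at once. It is a plain-asyncio illustration with hypothetical names, not a description of how flyte-sdk schedules work onto replicas.

```python
import asyncio

CONCURRENCY = 2  # mirrors concurrency=2 in the ReusePolicy example


async def run_on_replica(n_tasks: int, concurrency: int) -> int:
    """Run n_tasks under a concurrency limit and return the observed peak."""
    slots = asyncio.Semaphore(concurrency)
    running = 0
    peak = 0

    async def task() -> None:
        nonlocal running, peak
        async with slots:  # wait for a free slot on the replica
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulated non-blocking work
            running -= 1

    await asyncio.gather(*(task() for _ in range(n_tasks)))
    return peak


peak = asyncio.run(run_on_replica(n_tasks=6, concurrency=CONCURRENCY))
print(peak)  # 2
```

The interleaving here depends on each task yielding at an await point. A blocking function would hold the event loop for its full duration, so overlap of this kind only happens with coroutines.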

Dynamic Execution Overrides

Sometimes you need to change a task's requirements at the moment you call it—for example, retrying a failed task with more memory. The task.override() method creates a new template with modified settings.

import flyte.errors

async def run_with_retry(x: int) -> int:
    try:
        return await process_data(data=[x])
    except flyte.errors.OOMError:
        # Retry with more memory and caching disabled
        return await process_data.override(
            resources=flyte.Resources(memory="16Gi"),
            cache="disable",
        )(data=[x])

The override method (implemented in TaskTemplate.override in src/flyte/_task.py) returns a new instance of the task template. Note that certain attributes like name, image, and the function interface cannot be overridden.
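The "new instance, original untouched" behavior and the restriction on certain attributes can be sketched with a frozen dataclass. ToyTemplate and its _LOCKED set are hypothetical. The actual rules live in TaskTemplate.override and may differ in detail.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ToyTemplate:
    """Hypothetical stand-in for a task template; not the flyte-sdk class."""

    name: str
    image: str
    memory: str = "4Gi"
    cache: str = "auto"

    # Attributes this sketch refuses to override (a class attribute, not a field).
    _LOCKED = frozenset({"name", "image"})

    def override(self, **changes) -> "ToyTemplate":
        blocked = self._LOCKED.intersection(changes)
        if blocked:
            raise ValueError(f"cannot override: {sorted(blocked)}")
        return replace(self, **changes)  # returns a copy; self is unchanged


task = ToyTemplate(name="data-processing.process_data", image="worker:v1")
bigger = task.override(memory="16Gi", cache="disable")

print(task.memory, bigger.memory)  # 4Gi 16Gi
```

Returning a copy keeps the original definition stable for every other call site, so a one-off retry with more memory does not change the task for later callers.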

Sync-to-Async Migration

If you need to call legacy synchronous tasks from an asynchronous parent task, use the .aio() method. It invokes a synchronous task and returns an awaitable, which makes the call compatible with asyncio.gather.

import asyncio

@env.task
def legacy_sync_task(x: int) -> int:
    return x * 2

@env.task
async def modern_parent_task(inputs: list[int]) -> list[int]:
    # Call sync tasks as if they were async
    tasks = [legacy_sync_task.aio(x) for x in inputs]
    results = await asyncio.gather(*tasks)
    return results

The .aio() method ensures that even synchronous tasks behave like coroutines when invoked, facilitating a smooth transition to asynchronous patterns within flyte-sdk.
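As a rough mental model, .aio() can be thought of as an awaitable entry point attached to an otherwise synchronous callable. The sketch below builds such an object with the standard library and fans out over it with asyncio.gather. ToySyncTask is hypothetical and runs the function in a worker thread. It makes no claim about how flyte-sdk implements .aio().

```python
import asyncio
from typing import Callable


class ToySyncTask:
    """Hypothetical wrapper exposing a sync call and an awaitable .aio()."""

    def __init__(self, func: Callable[[int], int]) -> None:
        self._func = func

    def __call__(self, x: int) -> int:
        return self._func(x)  # ordinary blocking call

    async def aio(self, x: int) -> int:
        # Awaitable variant: run the blocking function off the event loop.
        return await asyncio.to_thread(self._func, x)


double = ToySyncTask(lambda x: x * 2)


async def parent(inputs: list) -> list:
    # gather returns results in the order the awaitables were passed in.
    return await asyncio.gather(*(double.aio(x) for x in inputs))


print(double(3))                       # 6
print(asyncio.run(parent([1, 2, 3])))  # [2, 4, 6]
```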