Execution Environments
When you have a group of tasks that share the same Docker image, compute resources, or secrets, repeating those definitions for every task is error-prone and verbose. The TaskEnvironment class in flyte-sdk provides a centralized way to manage these infrastructure settings, allowing you to define defaults once and apply them to multiple tasks.
Defining a Task Environment
A TaskEnvironment acts as a container for task definitions. It sets the baseline for where and how your code executes.
import flyte

# Define the environment with shared resources
env = flyte.TaskEnvironment(
    name="data-processing",
    image=flyte.Image.from_debian_base(python="3.12").with_pip_packages("pandas"),
    resources=flyte.Resources(cpu="2", memory="4Gi"),
)

# Register a task within this environment
@env.task
async def process_data(data: dict) -> dict:
    # This task inherits the 2 CPU and 4Gi memory settings
    return {"status": "processed"}
Internally, TaskEnvironment inherits from the base Environment class (found in src/flyte/_environment.py), which handles the core infrastructure attributes like image, resources, env_vars, and secrets. When you use the @env.task decorator, the environment automatically generates a fully-qualified name (FQN) for the task using the pattern <env_name>.<function_name>.
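To make the naming rule concrete, here is a minimal sketch of how a decorator can derive the <env_name>.<function_name> name. MiniEnvironment is a hypothetical stand-in written for illustration, not the flyte-sdk class; it only records names and does none of the infrastructure work.

```python
from typing import Callable, Dict


class MiniEnvironment:
    """Hypothetical stand-in for TaskEnvironment that only tracks task names."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.tasks: Dict[str, Callable] = {}

    def task(self, func: Callable) -> Callable:
        # Build the fully-qualified name as <env_name>.<function_name>
        fqn = f"{self.name}.{func.__name__}"
        self.tasks[fqn] = func
        return func


env = MiniEnvironment("data-processing")


@env.task
async def process_data(data: dict) -> dict:
    return {"status": "processed"}


print(list(env.tasks))  # ['data-processing.process_data']
```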
Configuration Hierarchy
flyte-sdk implements a three-level override system for task configuration:
- TaskEnvironment: Sets the global defaults for all tasks in that environment.
- @env.task decorator: Overrides specific settings for a single task (e.g., retries, timeout).
- task.override(): Overrides settings at the specific point of invocation in a workflow.
The more specific level always takes precedence. For example, if the environment defines 2 CPUs but a task decorator specifies 4, the task will run with 4 CPUs.
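The precedence rule can be modeled as a layered merge in which each more specific level replaces keys from the level before it. The sketch below is a conceptual model using plain dictionaries and a hypothetical resolve_config helper; it is not how flyte-sdk stores or merges configuration internally, and it treats each setting as a single flat value.

```python
from typing import Any, Dict, Optional


def resolve_config(
    env_defaults: Dict[str, Any],
    decorator_args: Optional[Dict[str, Any]] = None,
    call_overrides: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Merge the three levels; later (more specific) levels win key by key."""
    resolved: Dict[str, Any] = dict(env_defaults)
    for layer in (decorator_args or {}, call_overrides or {}):
        resolved.update(layer)
    return resolved


env_defaults = {"cpu": "2", "memory": "4Gi", "retries": 0}
decorator_args = {"cpu": "4", "retries": 3}
call_overrides = {"memory": "8Gi"}

print(resolve_config(env_defaults, decorator_args))
# {'cpu': '4', 'memory': '4Gi', 'retries': 3}
print(resolve_config(env_defaults, decorator_args, call_overrides))
# {'cpu': '4', 'memory': '8Gi', 'retries': 3}
```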
Container Reuse and Cold Starts
When environment creation is expensive (e.g., large images or complex setup), you can use a ReusePolicy to keep containers "warm" between task invocations. This avoids the overhead of starting a new container for every execution.
env = flyte.TaskEnvironment(
    name="fast-inference",
    reusable=flyte.ReusePolicy(
        replicas=(2, 5),  # Auto-scale between 2 and 5 replicas
        concurrency=10,   # Support 10 concurrent tasks per replica
        idle_ttl=60,      # Shut down environment after 60s of total idleness
    ),
)
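Assuming each replica runs up to concurrency tasks at once, as the comment in the example describes, the replica range translates directly into a range of concurrent task slots. A small hypothetical helper makes the arithmetic explicit:

```python
from typing import Tuple


def concurrent_capacity(replicas: Tuple[int, int], concurrency: int) -> Tuple[int, int]:
    """Hypothetical helper: (min, max) concurrent task slots for a replica range."""
    min_replicas, max_replicas = replicas
    # Each replica is assumed to serve up to `concurrency` tasks at the same time
    return min_replicas * concurrency, max_replicas * concurrency


print(concurrent_capacity((2, 5), 10))  # (20, 50)
```

With replicas=(2, 5) and concurrency=10, the two warm replicas cover 20 simultaneous tasks, and scaling out to 5 replicas raises that to 50.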
The ReusePolicy (defined in src/flyte/_reusable_environment.py) has several critical constraints:
- Async Requirement: If concurrency is greater than 1, the tasks must be defined as async functions. Non-async tasks raise a ValueError during registration when concurrency is greater than 1.
- Starvation Prevention: flyte-sdk recommends a minimum of 2 replicas. This prevents "starvation", where a parent task occupies the only available replica and cannot schedule its own child tasks.
- Override Restrictions: When reusable is enabled, you cannot override resources, env_vars, or secrets via task.override() unless you also pass reusable="off" in the same call.
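The three constraints can be read as registration-time and call-time checks. The sketch below encodes them in two hypothetical functions, check_reusable_task and check_override; the names, the use of a warning for the replica recommendation, and the error messages are assumptions for illustration, not flyte-sdk behavior.

```python
import inspect
import warnings
from typing import Any, Callable, Dict, Tuple

# Settings the section says cannot be overridden on a reusable task
RESTRICTED_OVERRIDES = {"resources", "env_vars", "secrets"}


def check_reusable_task(func: Callable, replicas: Tuple[int, int], concurrency: int) -> None:
    """Hypothetical registration-time checks for a task in a reusable environment."""
    # Async requirement: concurrency above 1 needs a coroutine function
    if concurrency > 1 and not inspect.iscoroutinefunction(func):
        raise ValueError(f"{func.__name__} must be async when concurrency > 1")
    # Starvation prevention: recommend at least 2 replicas (a warning, not an error)
    if replicas[0] < 2:
        warnings.warn("use at least 2 replicas to avoid starvation", stacklevel=2)


def check_override(overrides: Dict[str, Any]) -> None:
    """Hypothetical check for task.override() arguments on a reusable task."""
    blocked = RESTRICTED_OVERRIDES.intersection(overrides)
    if blocked and overrides.get("reusable") != "off":
        raise ValueError(f"cannot override {sorted(blocked)} unless reusable='off'")


async def infer(x: int) -> int:
    return x * 2


def infer_sync(x: int) -> int:
    return x * 2


check_reusable_task(infer, replicas=(2, 5), concurrency=10)  # passes
check_override({"resources": "cpu=4", "reusable": "off"})  # passes
try:
    check_reusable_task(infer_sync, replicas=(2, 5), concurrency=10)
except ValueError as err:
    print(err)  # infer_sync must be async when concurrency > 1
```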
Advanced Infrastructure Configuration
For complex Kubernetes requirements, you can attach a PodTemplate to an environment to configure sidecars, volumes, or specific node selectors.
import flyte
from kubernetes.client.models import V1Container, V1EnvVar, V1PodSpec

# Customize the primary container through a Kubernetes pod spec
pod_template = flyte.PodTemplate(
    primary_container_name="primary",
    pod_spec=V1PodSpec(
        containers=[
            V1Container(
                name="primary",
                env=[V1EnvVar(name="CUSTOM_VAR", value="flyte-val")],
            )
        ],
    ),
)

env = flyte.TaskEnvironment(
    name="k8s-specialized",
    pod_template=pod_template,
)
If your environment depends on other environments being deployed (for example, a driver environment that schedules tasks into a worker environment), use the depends_on parameter. This ensures that the worker infrastructure is provisioned alongside the driver.
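Conceptually, depends_on turns environments into a dependency graph, and deploying one environment means collecting everything it transitively depends on. The following sketch uses a hypothetical EnvSpec class and a depth-first walk that lists dependencies before their dependents; flyte-sdk's actual deployment logic may order or batch environments differently.

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class EnvSpec:
    """Hypothetical stand-in for an environment with a depends_on list."""
    name: str
    depends_on: List["EnvSpec"] = field(default_factory=list)


def deployment_order(root: EnvSpec) -> List[str]:
    """Return environment names so each dependency precedes its dependents."""
    order: List[str] = []
    seen: Set[str] = set()

    def visit(env: EnvSpec) -> None:
        # Mark before recursing so shared or cyclic dependencies are visited once
        if env.name in seen:
            return
        seen.add(env.name)
        for dep in env.depends_on:
            visit(dep)
        order.append(env.name)

    visit(root)
    return order


worker = EnvSpec("worker")
driver = EnvSpec("driver", depends_on=[worker])
print(deployment_order(driver))  # ['worker', 'driver']
```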
Sandboxed Execution
The sandbox namespace on a TaskEnvironment allows you to run code in a restricted execution environment with strict resource limits. This is useful for running untrusted code or ensuring a pipeline doesn't exceed specific memory or time bounds.
You access this via env.sandbox.orchestrator, which supports three modes:
- Bare Decorator:
@env.sandbox.orchestrator
def restricted_task(n: int) -> int:
    return n * 2
- Decorator Factory:
@env.sandbox.orchestrator(timeout_ms=5000, max_memory=100 * 1024 * 1024)
def heavy_task(data: list) -> dict:
    ...
- Code String:
# Dynamically create a task from a string
dynamic_task = env.sandbox.orchestrator(
    "x + y",
    inputs={"x": int, "y": int},
    output=int,
)
The _SandboxNamespace class (in src/flyte/_task_environment.py) manages these templates. By default, sandboxed tasks are limited to 50 MiB of memory and a 30-second timeout unless configured otherwise via SandboxedConfig.
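The code-string mode pairs an expression with declared input and output types. The sketch below models only that interface: ExpressionTask is hypothetical, and while it checks types and records the documented default limits (50 MiB, 30 seconds), it does not enforce those limits. Stripping builtins from eval is not a security boundary, so this sketch provides no real isolation.

```python
from dataclasses import dataclass
from typing import Any, Dict

DEFAULT_MAX_MEMORY = 50 * 1024 * 1024  # 50 MiB, the documented default (recorded only)
DEFAULT_TIMEOUT_MS = 30_000            # 30 seconds, the documented default (recorded only)


@dataclass
class ExpressionTask:
    """Hypothetical model of the code-string mode; provides NO isolation."""
    source: str
    inputs: Dict[str, type]
    output: type
    timeout_ms: int = DEFAULT_TIMEOUT_MS
    max_memory: int = DEFAULT_MAX_MEMORY

    def __call__(self, **kwargs: Any) -> Any:
        # The caller must supply exactly the declared inputs, with the declared types
        if set(kwargs) != set(self.inputs):
            raise TypeError(f"expected inputs {sorted(self.inputs)}, got {sorted(kwargs)}")
        for name, expected in self.inputs.items():
            if not isinstance(kwargs[name], expected):
                raise TypeError(f"{name} must be {expected.__name__}")
        # Evaluate with only the inputs in scope and no builtins
        result = eval(self.source, {"__builtins__": {}}, dict(kwargs))
        if not isinstance(result, self.output):
            raise TypeError(f"result must be {self.output.__name__}")
        return result


dynamic_task = ExpressionTask("x + y", inputs={"x": int, "y": int}, output=int)
print(dynamic_task(x=2, y=3))  # 5
```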
Environment Composition
You can create new environments based on existing ones using clone_with(). This is useful for creating variations (e.g., a GPU version of a standard CPU environment) without repeating the base image or secret configuration.
base_env = flyte.TaskEnvironment(name="base", image="my-image:latest")

# Inherits image, but changes resources and name
gpu_env = base_env.clone_with(
    name="base-gpu",
    resources=flyte.Resources(gpu="1"),
)
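clone_with behaves like copying a record and replacing selected fields. As an analogy only, Python's dataclasses.replace shows the same inherit-unless-specified behavior; EnvConfig is a hypothetical, simplified settings record, not the TaskEnvironment class.

```python
from dataclasses import dataclass, replace
from typing import Optional, Tuple


@dataclass(frozen=True)
class EnvConfig:
    """Hypothetical, simplified environment settings."""
    name: str
    image: str
    gpu: Optional[str] = None
    secrets: Tuple[str, ...] = ()


base_env = EnvConfig(name="base", image="my-image:latest", secrets=("db-password",))
# replace() copies every field and swaps only the ones passed in
gpu_env = replace(base_env, name="base-gpu", gpu="1")

print(gpu_env)
# EnvConfig(name='base-gpu', image='my-image:latest', gpu='1', secrets=('db-password',))
```

Because the record is frozen, the base environment is left untouched and can keep serving as a template for other variants.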
Alternatively, if you have a set of existing task templates and want to group them into a new environment, use TaskEnvironment.from_task(). This factory method validates that all tasks share the same container image before creating the environment.
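The validation that from_task performs can be pictured as collecting the image of every task and rejecting the group if more than one distinct image appears. TaskTemplateStub and check_shared_image are hypothetical names, and rejecting an empty list is an assumption of this sketch.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class TaskTemplateStub:
    """Hypothetical stand-in for a task template."""
    name: str
    image: str


def check_shared_image(tasks: List[TaskTemplateStub]) -> str:
    """Return the common image, or raise ValueError if the tasks disagree."""
    if not tasks:
        raise ValueError("at least one task is required")
    images = {t.image for t in tasks}
    if len(images) > 1:
        raise ValueError(f"tasks use different images: {sorted(images)}")
    return images.pop()


tasks = [TaskTemplateStub("load", "my-image:latest"), TaskTemplateStub("train", "my-image:latest")]
print(check_shared_image(tasks))  # my-image:latest
```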