
Execution Environments

When you have a group of tasks that share the same Docker image, compute resources, or secrets, repeating those definitions for every task is error-prone and verbose. The TaskEnvironment class in flyte-sdk provides a centralized way to manage these infrastructure settings, allowing you to define defaults once and apply them to multiple tasks.

Defining a Task Environment

A TaskEnvironment acts as a container for task definitions. It sets the baseline for where and how your code executes.

import flyte

# Define the environment with shared resources
env = flyte.TaskEnvironment(
    name="data-processing",
    image=flyte.Image.from_debian_base(python="3.12").with_pip_packages("pandas"),
    resources=flyte.Resources(cpu="2", memory="4Gi"),
)

# Register a task within this environment
@env.task
async def process_data(data: dict) -> dict:
    # This task inherits the 2 CPU and 4Gi memory settings
    return {"status": "processed"}

Internally, TaskEnvironment inherits from the base Environment class (found in src/flyte/_environment.py), which handles the core infrastructure attributes like image, resources, env_vars, and secrets. When you use the @env.task decorator, the environment automatically generates a fully-qualified name (FQN) for the task using the pattern <env_name>.<function_name>.
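The naming rule is simple enough to state as code. The helper below, `task_fqn`, is hypothetical and written only for illustration; flyte-sdk builds the name itself when `@env.task` is applied, and its internal code may differ.

```python
from typing import Callable


def task_fqn(env_name: str, func: Callable) -> str:
    """Hypothetical helper: build '<env_name>.<function_name>' for a task."""
    # __name__ is the bare function name, e.g. "process_data".
    return f"{env_name}.{func.__name__}"


async def process_data(data: dict) -> dict:
    return {"status": "processed"}


print(task_fqn("data-processing", process_data))  # data-processing.process_data
```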

Configuration Hierarchy

flyte-sdk implements a three-level override system for task configuration:

  1. TaskEnvironment: Sets the defaults for every task in that environment.
  2. @env.task decorator: Overrides specific settings for a single task (e.g., retries, timeout).
  3. task.override(): Overrides settings at the specific point of invocation in a workflow.

The more specific level always takes precedence. For example, if the environment defines 2 CPUs but a task decorator specifies 4, the task will run with 4 CPUs.
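One way to picture the precedence rule is a chained lookup that consults the most specific level first. This sketch uses the standard library's `collections.ChainMap`, with plain dictionaries standing in for the three levels; the setting names and values are illustrative assumptions, and flyte-sdk itself works with typed objects such as `flyte.Resources` rather than flat dictionaries.

```python
from collections import ChainMap

# Hypothetical settings at each level; values mirror the example in the text.
environment = {"cpu": "2", "memory": "4Gi", "retries": 0}
decorator = {"cpu": "4", "retries": 3}
call_override = {"timeout": 600}

# ChainMap searches its mappings left to right, so the most specific level wins.
effective = ChainMap(call_override, decorator, environment)

print(effective["cpu"])      # 4   (decorator beats environment)
print(effective["memory"])   # 4Gi (only set on the environment)
print(effective["timeout"])  # 600 (only set at the call site)
```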

Container Reuse and Cold Starts

When environment creation is expensive (e.g., large images or complex setup), you can use a ReusePolicy to keep containers "warm" between task invocations. This avoids the overhead of starting a new container for every execution.

env = flyte.TaskEnvironment(
    name="fast-inference",
    reusable=flyte.ReusePolicy(
        replicas=(2, 5),  # Auto-scale between 2 and 5 replicas
        concurrency=10,  # Support 10 concurrent tasks per replica
        idle_ttl=60,  # Shut down environment after 60s of total idleness
    ),
)
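Assuming each replica runs at most `concurrency` tasks at once and the autoscaler stays within the `replicas` bounds, the policy above translates into a simple capacity range. This is plain arithmetic on the example's values, not a call into flyte-sdk.

```python
# Values taken from the ReusePolicy example above.
min_replicas, max_replicas = 2, 5
concurrency_per_replica = 10

# Each warm replica can run up to `concurrency` tasks simultaneously.
min_capacity = min_replicas * concurrency_per_replica  # 20
max_capacity = max_replicas * concurrency_per_replica  # 50

print(f"{min_capacity}-{max_capacity} concurrent tasks")  # 20-50 concurrent tasks
```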

The ReusePolicy (defined in src/flyte/_reusable_environment.py) has several critical constraints:

  • Async Requirement: If concurrency is greater than 1, the tasks must be defined as async functions; a synchronous task in such an environment raises a ValueError during registration.
  • Starvation Prevention: flyte-sdk recommends a minimum of 2 replicas. This prevents "starvation" where a parent task occupies the only available replica and cannot schedule its own child tasks.
  • Override Restrictions: When reusable is enabled, you cannot override resources, env_vars, or secrets via task.override() unless you also pass reusable="off" in that same call.
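The first and third constraints can be restated as explicit checks. The functions below are hypothetical and only make the conditions concrete; they do not reproduce the actual validation in src/flyte/_reusable_environment.py. `inspect.iscoroutinefunction` is the standard-library test for `async def` functions.

```python
import inspect
from typing import Any, Callable

RESTRICTED = {"resources", "env_vars", "secrets"}


def check_reusable_task(func: Callable, concurrency: int) -> None:
    """Hypothetical check: concurrency > 1 requires an async function."""
    if concurrency > 1 and not inspect.iscoroutinefunction(func):
        raise ValueError(f"{func.__name__} must be async when concurrency={concurrency}")


def check_override(reusable_enabled: bool, **overrides: Any) -> None:
    """Hypothetical check: restricted fields need reusable="off" in the same call."""
    blocked = RESTRICTED.intersection(overrides)
    if reusable_enabled and blocked and overrides.get("reusable") != "off":
        raise ValueError(f"cannot override {sorted(blocked)} while reusable is enabled")


async def fast_task(x: int) -> int:
    return x + 1


def slow_task(x: int) -> int:
    return x + 1


check_reusable_task(fast_task, concurrency=10)             # passes: async
check_override(True, env_vars={"A": "1"}, reusable="off")  # passes: reuse turned off
try:
    check_reusable_task(slow_task, concurrency=10)
except ValueError as err:
    print(err)  # slow_task must be async when concurrency=10
```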

Advanced Infrastructure Configuration

For complex Kubernetes requirements, you can attach a PodTemplate to an environment to configure sidecars, volumes, or specific node selectors.

from kubernetes.client.models import V1PodSpec, V1Container, V1EnvVar

pod_template = flyte.PodTemplate(
    primary_container_name="primary",
    pod_spec=V1PodSpec(
        containers=[
            V1Container(
                name="primary",
                env=[V1EnvVar(name="CUSTOM_VAR", value="flyte-val")],
            )
        ],
    ),
)

env = flyte.TaskEnvironment(
    name="k8s-specialized",
    pod_template=pod_template,
)

If your environment depends on other environments being deployed (for example, a driver environment that schedules tasks into a worker environment), use the depends_on parameter. This ensures that the worker infrastructure is provisioned alongside the driver.
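Conceptually, `depends_on` turns deployment into a reachability question: deploying one environment should also bring up everything it depends on, directly or indirectly. The sketch below models that with a hypothetical `Env` record and a depth-first walk; it illustrates the idea only, is not flyte-sdk's deployment code, and ignores ordering and any other details the real implementation may handle.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Env:
    """Hypothetical stand-in for an environment with a depends_on list."""
    name: str
    depends_on: tuple = ()


def envs_to_deploy(root: Env) -> list[str]:
    """Collect root plus everything it depends on, transitively (depth-first)."""
    seen = {}
    stack = [root]
    while stack:
        env = stack.pop()
        if env.name in seen:
            continue  # already collected via another path
        seen[env.name] = env
        stack.extend(env.depends_on)
    return sorted(seen)


worker = Env("worker")
driver = Env("driver", depends_on=(worker,))
print(envs_to_deploy(driver))  # ['driver', 'worker']
```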

Sandboxed Execution

The sandbox namespace on a TaskEnvironment allows you to run code in a restricted execution environment with strict resource limits. This is useful for running untrusted code or ensuring a pipeline doesn't exceed specific memory or time bounds.

You access this via env.sandbox.orchestrator, which supports three modes:

  1. Bare Decorator:
    @env.sandbox.orchestrator
    def restricted_task(n: int) -> int:
        return n * 2
  2. Decorator Factory:
    @env.sandbox.orchestrator(timeout_ms=5000, max_memory=100*1024*1024)
    def heavy_task(data: list) -> dict:
        ...
  3. Code String:
    # Dynamically create a task from a string
    dynamic_task = env.sandbox.orchestrator(
        "x + y",
        inputs={"x": int, "y": int},
        output=int,
    )
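Supporting all three call styles from one name is a common Python pattern: a single callable inspects its first argument to decide whether it is being used as a bare decorator, as a decorator factory, or as a plain function taking source code. The sketch below shows that dispatch in plain Python; `orchestrator` here is a hypothetical stand-in that returns dictionaries instead of tasks, and it makes no claim about how `_SandboxNamespace` is actually implemented.

```python
from typing import Any, Callable


def orchestrator(arg: Any = None, **options: Any) -> Any:
    """Hypothetical: one entry point for bare, factory, and code-string usage."""

    def wrap(func: Callable) -> dict:
        # Stand-in for building a task; we just record what was passed.
        return {"kind": "function", "func": func, "options": options}

    if callable(arg):  # Mode 1: used bare, so arg is the decorated function
        return wrap(arg)
    if isinstance(arg, str):  # Mode 3: arg is source code to wrap as a task
        return {"kind": "code", "source": arg, "options": options}
    return wrap  # Mode 2: called with options only, return the real decorator


@orchestrator
def restricted_task(n: int) -> int:
    return n * 2


@orchestrator(timeout_ms=5000, max_memory=100 * 1024 * 1024)
def heavy_task(data: list) -> dict:
    return {"items": len(data)}


dynamic_task = orchestrator("x + y", inputs={"x": int, "y": int}, output=int)

print(restricted_task["kind"], heavy_task["options"]["timeout_ms"], dynamic_task["kind"])
# function 5000 code
```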

The _SandboxNamespace class (in src/flyte/_task_environment.py) manages these templates. By default, sandboxed tasks are limited to 50 MiB of memory and a 30-second timeout unless configured otherwise via SandboxedConfig.
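Because the factory form takes `timeout_ms` in milliseconds and `max_memory` in what appears to be bytes (given `max_memory=100*1024*1024` above), it helps to see the defaults in the same units. The dataclass below is a hypothetical stand-in, not flyte's SandboxedConfig; its field names are borrowed from the decorator-factory example, and the real configuration object may name or structure them differently.

```python
from dataclasses import dataclass

MiB = 1024 * 1024


@dataclass(frozen=True)
class SandboxLimits:
    """Hypothetical stand-in for sandbox limits; not flyte's SandboxedConfig."""
    timeout_ms: int = 30_000    # 30-second default timeout, in milliseconds
    max_memory: int = 50 * MiB  # 50 MiB default memory cap, in bytes


default = SandboxLimits()
heavy = SandboxLimits(timeout_ms=5000, max_memory=100 * MiB)

print(default.max_memory)  # 52428800
print(heavy.timeout_ms / 1000, heavy.max_memory // MiB)  # 5.0 100
```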

Environment Composition

You can create new environments based on existing ones using clone_with(). This is useful for creating variations (e.g., a GPU version of a standard CPU environment) without repeating the base image or secret configuration.

base_env = flyte.TaskEnvironment(name="base", image="my-image:latest")

# Inherits image, but changes resources and name
gpu_env = base_env.clone_with(
    name="base-gpu",
    resources=flyte.Resources(gpu="1"),
)
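The inheritance behavior of `clone_with()` resembles `dataclasses.replace` from the standard library: copy every field, then overwrite only the ones you pass. The sketch below uses a hypothetical, simplified `EnvSpec` record to show that pattern; it does not reproduce `TaskEnvironment`'s real fields or implementation.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class EnvSpec:
    """Hypothetical, simplified environment record (not flyte.TaskEnvironment)."""
    name: str
    image: str
    resources: Optional[dict] = None
    secrets: tuple = ()


def clone_with(env: EnvSpec, **changes) -> EnvSpec:
    # replace() copies every field, then applies only the ones passed in.
    return replace(env, **changes)


base_env = EnvSpec(name="base", image="my-image:latest", secrets=("api-key",))
gpu_env = clone_with(base_env, name="base-gpu", resources={"gpu": "1"})

print(gpu_env.image, gpu_env.secrets)  # my-image:latest ('api-key',)
print(base_env.resources)              # None (the original is unchanged)
```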

Alternatively, if you have a set of existing task templates and want to group them into a new environment, use TaskEnvironment.from_task(). This factory method validates that all tasks share the same container image before creating the environment.
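The image check that `from_task()` performs can be phrased as "collect the images, expect exactly one." Below is a hypothetical sketch of that rule using a stub task type; `TaskTemplateStub` and `common_image` are invented for illustration and are not flyte-sdk names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskTemplateStub:
    """Hypothetical stand-in for an existing task template."""
    name: str
    image: str


def common_image(tasks: list) -> str:
    """Return the single image shared by all tasks, or raise ValueError."""
    images = {t.image for t in tasks}
    if len(images) != 1:
        raise ValueError(f"tasks must share one image, found {sorted(images)}")
    return images.pop()


tasks = [TaskTemplateStub("a", "img:1"), TaskTemplateStub("b", "img:1")]
print(common_image(tasks))  # img:1
```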