Remote Task Discovery and Overrides

When you need to use a task defined in a different project or environment, or when a task fails due to insufficient resources like memory, flyte-sdk provides mechanisms to discover remote tasks and modify their execution parameters at runtime.

Discovering Remote Tasks

You can locate tasks on a Flyte cluster using the Task class. This is useful for referencing existing logic without re-implementing it in your current project.

Fetching a Specific Task

Use Task.get() to retrieve a reference to a remote task. By default, this returns a LazyEntity, which defers the actual network request until you call the task or explicitly fetch its details.

import flyte.remote

# Fetch a task by name and specific version
my_task = flyte.remote.Task.get(
    name="my_project.tasks.compute_stats",
    project="flytesnacks",
    domain="development",
    version="v1",
)

# Use auto-versioning to get the latest version
latest_task = flyte.remote.Task.get(
    name="my_project.tasks.compute_stats",
    auto_version="latest",
)

The auto_version parameter supports two modes:

  • "latest": Queries the Flyte service for the most recently created version of the task.
  • "current": Derives the version from the current execution context. This is only valid when called from within a running Flyte task.
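The two modes can be pictured with a small stand-alone sketch. It does not use flyte-sdk: VersionRecord and resolve_version are hypothetical names introduced only for illustration, and the real lookup happens on the Flyte service rather than over an in-memory list.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class VersionRecord:
    """Hypothetical stand-in for one registered version of a task."""
    version: str
    created_at: datetime


def resolve_version(
    auto_version: str,
    registered: list[VersionRecord],
    current_task_version: Optional[str] = None,
) -> str:
    """Pick a version following the two auto_version modes described above."""
    if auto_version == "latest":
        if not registered:
            raise LookupError("no versions registered for this task")
        # The most recently created version wins, regardless of its label.
        return max(registered, key=lambda r: r.created_at).version
    if auto_version == "current":
        # Only meaningful inside a running task, where a version is known.
        if current_task_version is None:
            raise RuntimeError("'current' requires a running task context")
        return current_task_version
    raise ValueError(f"unsupported auto_version: {auto_version!r}")


history = [
    VersionRecord("v1", datetime(2024, 1, 1)),
    VersionRecord("v3", datetime(2024, 3, 1)),
    VersionRecord("v2", datetime(2024, 2, 1)),
]
latest = resolve_version("latest", history)
current = resolve_version("current", history, current_task_version="v2")
```

Note that "latest" is ordered by creation time in this sketch, not by sorting the version strings.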

Listing Available Tasks

If you need to browse tasks within a project and domain, use Task.listall(). This method returns an iterator of Task objects containing basic metadata.

import flyte.remote

# List all tasks in a project
tasks = flyte.remote.Task.listall(project="flytesnacks", domain="development")

for task in tasks:
    print(f"Found task: {task.name} (version: {task.version})")
    print(f"Console URL: {task.url}")

Internally, listall communicates with the TaskService via the list_tasks gRPC method, applying filters for project, domain, and optionally task name prefixes.
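As a rough illustration of that filtering, the sketch below applies project, domain, and optional name-prefix filters to an in-memory catalog. TaskSummary and filter_tasks are hypothetical stand-ins; the real filtering is done by the service, and this only mirrors the described semantics.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional


@dataclass(frozen=True)
class TaskSummary:
    """Hypothetical stand-in for the basic metadata of a listed task."""
    project: str
    domain: str
    name: str
    version: str


def filter_tasks(
    catalog: Iterable[TaskSummary],
    project: str,
    domain: str,
    name_prefix: Optional[str] = None,
) -> Iterator[TaskSummary]:
    """Yield tasks matching project and domain, optionally by name prefix."""
    for task in catalog:
        if task.project != project or task.domain != domain:
            continue
        if name_prefix is not None and not task.name.startswith(name_prefix):
            continue
        yield task


catalog = [
    TaskSummary("flytesnacks", "development", "stats.compute", "v1"),
    TaskSummary("flytesnacks", "production", "stats.compute", "v1"),
    TaskSummary("flytesnacks", "development", "etl.load", "v2"),
]
all_dev = list(filter_tasks(catalog, "flytesnacks", "development"))
stats_only = list(
    filter_tasks(catalog, "flytesnacks", "development", name_prefix="stats.")
)
```

Like the listing call it imitates, filter_tasks returns an iterator, so results are produced only as the caller consumes them.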

Inspecting Task Details

While a Task object provides basic identification, TaskDetails contains the full specification, including the interface (inputs/outputs), resource requirements, and security context.

To access these details from a LazyEntity (the object returned by Task.get), call .fetch(). From asynchronous code, await its .aio() variant:

# Fetch the full TaskDetails
details = await my_task.fetch.aio()

print(f"Task Type: {details.task_type}")
print(f"Required Inputs: {details.required_args}")
print(f"Resources (Requests, Limits): {details.resources}")
print(f"Secrets required: {details.secrets}")
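The deferred-fetch idea behind LazyEntity can be sketched without flyte-sdk. LazyRef and load_details below are hypothetical names, and caching the result after the first fetch is an assumption of this sketch rather than documented behavior.

```python
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class LazyRef(Generic[T]):
    """Hypothetical lazy reference: holds a name, loads details on demand."""

    def __init__(self, name: str, loader: Callable[[], T]) -> None:
        self.name = name
        self._loader = loader
        self._value: Optional[T] = None
        self._loaded = False

    def fetch(self) -> T:
        # The loader (a network request in a real client) runs only here.
        if not self._loaded:
            self._value = self._loader()
            self._loaded = True
        return self._value


calls = []


def load_details() -> dict:
    """Pretend network call that records each invocation."""
    calls.append("request")
    return {"task_type": "python"}


ref = LazyRef("my_project.tasks.compute_stats", load_details)
calls_before_fetch = len(calls)  # creating the reference triggers no request
details = ref.fetch()            # first fetch performs the request
ref.fetch()                      # second fetch reuses the cached result
```

Creating the reference is cheap, which is why a lookup can be placed at the top of a task without paying for a request until the details are needed.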

The TaskDetails.interface property uses flyte.types.guess_interface to parse the underlying protobuf definition into a NativeInterface, allowing you to programmatically inspect the expected types.

Applying Runtime Overrides

flyte-sdk lets you override task properties at the call site, without changing or re-registering the remote task. This is commonly used for OOM (Out of Memory) recovery or for adjusting timeouts for specific datasets.

Resource and Secret Overrides

You can use the .override() method on either a LazyEntity or a TaskDetails object. This method returns a new instance with the updated properties.

import flyte

# Adjust resources for a specific call
tuned_task = my_task.override(
    resources=flyte.Resources(cpu=2, memory="4Gi"),
    timeout=flyte.Timeout(minutes=10),
)

# Provide secrets at runtime
secured_task = my_task.override(
    secrets=flyte.Secret(key="api_key", group="my_secrets"),
)
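The "returns a new instance" behavior can be illustrated with a frozen dataclass. TaskRef and its fields are hypothetical and much simpler than the real objects; the point is only that the original reference is left untouched.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class TaskRef:
    """Hypothetical, simplified task reference with overridable settings."""
    name: str
    memory: str = "1Gi"
    timeout_minutes: Optional[int] = None

    def override(self, **changes) -> "TaskRef":
        # dataclasses.replace builds a copy with the given fields changed.
        return replace(self, **changes)


base = TaskRef(name="my_project.tasks.compute_stats")
tuned = base.override(memory="4Gi", timeout_minutes=10)
```

Because the base reference is never mutated, one lookup can feed several differently tuned calls, for example a default call and a high-memory retry.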

Cache Overrides

You can also modify caching behavior. Note that for remote tasks, cache.behavior must be set to either "disable" or "override". If using "override", you must provide a version_override.

# Disable caching for a specific execution
no_cache_task = my_task.override(
    cache=flyte.Cache(behavior="disable"),
)
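The constraint on cache overrides can be expressed as a small validation function. validate_remote_cache_override is a hypothetical helper written for this illustration, not part of flyte-sdk, and the error messages are invented.

```python
from typing import Optional


def validate_remote_cache_override(
    behavior: str, version_override: Optional[str] = None
) -> None:
    """Check a cache override against the rules stated for remote tasks."""
    if behavior not in ("disable", "override"):
        raise ValueError(
            f"remote tasks accept 'disable' or 'override', got {behavior!r}"
        )
    if behavior == "override" and not version_override:
        raise ValueError("'override' requires a version_override")


# Both of these satisfy the rules and return without raising.
validate_remote_cache_override("disable")
validate_remote_cache_override("override", version_override="v2")
```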

Executing Remote Tasks

Remote tasks in flyte-sdk are designed to run on the Flyte cluster, not locally. When you call one, flyte-sdk checks whether the call is being made from within a task context.

Keyword Argument Requirement

Remote tasks do not support positional arguments. You must pass all inputs as keyword arguments. This is enforced in TaskDetails.__call__, which raises a RemoteTaskUsageError if positional arguments are detected.

# Correct usage
result = await my_task(x=10, y=20)

# Incorrect usage - will raise RemoteTaskUsageError
# result = await my_task(10, 20)
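A keyword-only check of this kind might look like the sketch below. PositionalArgsError and call_remote are hypothetical stand-ins defined locally; they are not the classes flyte-sdk uses.

```python
class PositionalArgsError(TypeError):
    """Hypothetical stand-in for the usage error raised on positional input."""


def call_remote(*args, **kwargs) -> dict:
    """Reject positional arguments, then hand the keyword inputs onward."""
    if args:
        raise PositionalArgsError(
            f"got {len(args)} positional argument(s); pass inputs by keyword"
        )
    return dict(kwargs)


accepted = call_remote(x=10, y=20)

try:
    call_remote(10, 20)
    rejected = False
except PositionalArgsError:
    rejected = True
```

Requiring keywords is a reasonable fit for remote calls: inputs are matched by name, so the caller does not depend on a parameter order it cannot see.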

Execution Context

If you attempt to call a remote task outside of a Flyte execution (e.g., in a local script), flyte-sdk raises an error. Internally, the __call__ method obtains the controller through get_controller() and uses submit_task_ref to delegate the execution to the Flyte engine.

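import flyte
import flyte.remote

env = flyte.TaskEnvironment(name="remote_caller")  # illustrative environment name

@env.task
async def workflow_task(val: int) -> int:
    # This works because it's inside a task context
    remote_task = flyte.remote.Task.get("other_project.task", auto_version="latest")
    return await remote_task(input_val=val)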

When TaskDetails.__call__ is invoked, it verifies that all required_args (calculated by comparing the interface against default_inputs) are provided in the kwargs. If any are missing, it raises a ValueError before attempting to submit the task.
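The required-argument check reduces to a set difference followed by a lookup. In this sketch, required_args and check_inputs are hypothetical helpers operating on plain dictionaries rather than on the real interface objects.

```python
def required_args(interface_inputs: dict, default_inputs: dict) -> tuple:
    """Inputs declared in the interface that have no default value."""
    return tuple(n for n in interface_inputs if n not in default_inputs)


def check_inputs(interface_inputs: dict, default_inputs: dict, kwargs: dict) -> None:
    """Raise ValueError if any required input is absent from kwargs."""
    missing = [
        n for n in required_args(interface_inputs, default_inputs)
        if n not in kwargs
    ]
    if missing:
        raise ValueError(f"missing required inputs: {', '.join(missing)}")


interface = {"x": int, "y": int, "scale": float}
defaults = {"scale": 1.0}

needed = required_args(interface, defaults)
check_inputs(interface, defaults, {"x": 10, "y": 20})  # passes silently

try:
    check_inputs(interface, defaults, {"x": 10})
    message = ""
except ValueError as exc:
    message = str(exc)
```

Validating before submission means a missing input fails immediately in the calling task instead of after a remote execution has been scheduled.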