
Task Resilience & Reliability

Distributed tasks often encounter transient failures, such as network blips or temporary resource unavailability, and may hang indefinitely while waiting on slow external services. flyte-sdk provides RetryStrategy and Timeout to manage these scenarios, so that tasks recover from transient errors and do not hold resources longer than necessary.

Retrying Failed Tasks

When a task fails with a non-system error (for example, an exception raised by your code, including one caused by a transient network error), flyte-sdk can automatically retry it. You can configure retries with either a plain integer or the RetryStrategy class.

Simple Retries

The most common way to enable retries is by passing an integer to the retries parameter of the @task decorator.

from flyte import task

@task(retries=3)
async def flaky_service_call():
    # This task will be retried up to 3 times if it fails
    ...
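To make the counting concrete, here is a minimal local sketch of retry semantics, assuming that `retries=3` means one initial attempt followed by at most three retries (the usual reading of "retried up to 3 times"). `run_with_retries` and `fake_service_call` are hypothetical names written for illustration; this is not how flyte-sdk implements retries.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[], T], retries: int) -> T:
    """Call fn once, then retry it up to `retries` more times on any exception.

    Hypothetical helper for illustration only; not part of flyte-sdk.
    """
    last_error: Optional[Exception] = None
    # One initial attempt plus up to `retries` retries.
    for _ in range(retries + 1):
        try:
            return fn()
        except Exception as exc:
            # Remember the failure and fall through to the next attempt.
            last_error = exc
    # Every attempt failed: surface the last error to the caller.
    assert last_error is not None
    raise last_error


# A stand-in service that fails twice before succeeding.
calls = {"n": 0}


def fake_service_call() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient network blip")
    return "ok"


result = run_with_retries(fake_service_call, retries=3)
print(result, calls["n"])  # ok 3
```

Under this assumption, a task configured with `retries=3` that keeps failing is attempted four times in total before the error is reported.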

Explicit Retry Strategy

For more explicit configuration, use the RetryStrategy class from src/flyte/_retry.py. While the class is designed to eventually support backoff parameters, the current implementation in flyte-sdk focuses on the retry count.

from flyte import task, RetryStrategy

@task(retries=RetryStrategy(count=5))
def robust_task():
    ...

Internally, if you provide an integer to the @task decorator, TaskTemplate.__post_init__ in src/flyte/_task.py automatically converts it into a RetryStrategy object:

# From src/flyte/_task.py
if isinstance(self.retries, int):
    self.retries = RetryStrategy(count=self.retries)
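The same normalization pattern can be reproduced with plain dataclasses. The sketch below is a simplified stand-in that assumes only what the excerpt above shows: `RetryStrategySketch` and `TaskTemplateSketch` are hypothetical names, and the real `TaskTemplate` has many more fields.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class RetryStrategySketch:
    """Hypothetical stand-in for flyte's RetryStrategy (count only)."""
    count: int


@dataclass
class TaskTemplateSketch:
    """Hypothetical stand-in showing only the retries normalization."""
    name: str
    retries: Union[int, RetryStrategySketch] = 0

    def __post_init__(self) -> None:
        # Accept a bare int for convenience, but always store a strategy
        # object so downstream code only has to handle one type.
        if isinstance(self.retries, int):
            self.retries = RetryStrategySketch(count=self.retries)


a = TaskTemplateSketch(name="flaky_service_call", retries=3)
b = TaskTemplateSketch(name="robust_task", retries=RetryStrategySketch(count=5))
print(a.retries)  # RetryStrategySketch(count=3)
print(b.retries)  # RetryStrategySketch(count=5)
```

Normalizing once in `__post_init__` means later code never has to branch on whether the user passed an `int` or a strategy object.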

Managing Execution Timeouts

Timeouts prevent tasks from running indefinitely and blocking resources. flyte-sdk allows you to limit both the actual execution time and the time a task spends waiting in a queue.

Setting Execution Limits

You can define a timeout using an integer (representing seconds), a timedelta object, or a Timeout object.

from datetime import timedelta
from flyte import task, Timeout

# Using seconds (int)
@task(timeout=60)
async def quick_task():
    ...

# Using timedelta for readability
@task(timeout=timedelta(minutes=5))
async def standard_task():
    ...

# Using the Timeout class for granular control
@task(timeout=Timeout(max_runtime=timedelta(minutes=10), max_queued_time=600))
async def resource_heavy_task():
    ...

Runtime vs. Queued Time

The Timeout class in src/flyte/_timeout.py distinguishes between two types of limits:

  • max_runtime: The maximum duration a single execution attempt is allowed to run.
  • max_queued_time: The maximum time a task can remain in the "queued" state before it is canceled. This is particularly useful for tasks requiring specific resources that might be unavailable.
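The difference between the two limits can be illustrated with a small classification function. This is a hypothetical sketch, not flyte-sdk code: `TimeoutSketch` and `exceeded_limit` are invented names, and it assumes a limit counts as exceeded only when the elapsed time is strictly greater than it. Actual enforcement happens in Flyte itself and may differ in detail.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


@dataclass
class TimeoutSketch:
    """Hypothetical stand-in for flyte's Timeout with its two limits."""
    max_runtime: timedelta
    max_queued_time: Optional[timedelta] = None


def exceeded_limit(
    t: TimeoutSketch, queued: timedelta, running: timedelta
) -> Optional[str]:
    """Return the name of the limit that was exceeded, or None.

    Hypothetical illustration only; flyte-sdk does not expose this function.
    """
    # The queue limit governs the time spent waiting before execution starts.
    if t.max_queued_time is not None and queued > t.max_queued_time:
        return "max_queued_time"
    # The runtime limit governs a single execution attempt.
    if running > t.max_runtime:
        return "max_runtime"
    return None


limits = TimeoutSketch(
    max_runtime=timedelta(minutes=10), max_queued_time=timedelta(minutes=10)
)
print(exceeded_limit(limits, queued=timedelta(minutes=15), running=timedelta(0)))  # max_queued_time
print(exceeded_limit(limits, queued=timedelta(minutes=1), running=timedelta(minutes=12)))  # max_runtime
print(exceeded_limit(limits, queued=timedelta(minutes=1), running=timedelta(minutes=5)))  # None
```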

When you pass an int or timedelta directly to the decorator, flyte-sdk uses the timeout_from_request helper to map that value to max_runtime:

# From src/flyte/_timeout.py
def timeout_from_request(timeout: TimeoutType) -> Timeout:
    if isinstance(timeout, Timeout):
        return timeout
    else:
        if isinstance(timeout, int):
            timeout = timedelta(seconds=timeout)
        # ...
        return Timeout(max_runtime=timeout)
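A self-contained approximation of that mapping is shown below, assuming the only accepted inputs are an `int`, a `timedelta`, or a `Timeout`-like object. `TimeoutSketch` and `timeout_from_request_sketch` are hypothetical stand-ins; the part of the real helper elided above is not reproduced, and the `TypeError` branch is an addition made for this sketch only.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional, Union


@dataclass
class TimeoutSketch:
    """Hypothetical stand-in for flyte's Timeout."""
    max_runtime: timedelta
    max_queued_time: Optional[timedelta] = None


TimeoutTypeSketch = Union[int, timedelta, TimeoutSketch]


def timeout_from_request_sketch(timeout: TimeoutTypeSketch) -> TimeoutSketch:
    # A fully specified Timeout passes through untouched.
    if isinstance(timeout, TimeoutSketch):
        return timeout
    # Bare ints are interpreted as seconds.
    if isinstance(timeout, int):
        timeout = timedelta(seconds=timeout)
    # Sketch-only guard: reject anything that is still not a timedelta.
    if not isinstance(timeout, timedelta):
        raise TypeError(f"unsupported timeout type: {type(timeout).__name__}")
    # Shorthand values only ever set the runtime limit.
    return TimeoutSketch(max_runtime=timeout)


print(timeout_from_request_sketch(60).max_runtime)  # 0:01:00
print(timeout_from_request_sketch(timedelta(minutes=5)).max_runtime)  # 0:05:00
```

Because the shorthand forms never set `max_queued_time`, a queue limit can only be expressed by passing a full `Timeout` object.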

Dynamic Overrides

You may want to reuse the same task definition but apply different resilience settings depending on the context (e.g., a production workflow vs. a test workflow). The TaskTemplate returned by the @task decorator provides an .override() method for this purpose.

from flyte import task, RetryStrategy

@task(retries=1)
async def my_task(x: int):
    ...

# Create a version of the task with more retries and a specific timeout
resilient_task = my_task.override(
    retries=RetryStrategy(count=5),
    timeout=300,
)

The .override() method in src/flyte/_task.py creates a new instance of the task template with the updated parameters, leaving the original task definition unchanged. This is also supported on TaskDetails objects when interacting with tasks via the Flyte remote API.
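The copy-on-override behavior can be illustrated with a frozen dataclass and `dataclasses.replace`. This is a hypothetical sketch of the pattern described above, assuming override semantics amount to "copy with some fields changed"; `TaskSketch` is not flyte's `TaskTemplate`, and it stores `retries` as a plain `int` for brevity.

```python
from dataclasses import dataclass, replace
from datetime import timedelta
from typing import Optional


@dataclass(frozen=True)
class TaskSketch:
    """Hypothetical stand-in for a task template; not flyte's TaskTemplate."""
    name: str
    retries: int = 0
    timeout: Optional[timedelta] = None

    def override(self, **changes) -> "TaskSketch":
        # dataclasses.replace builds a new instance with the given fields
        # changed; the original object is never mutated.
        return replace(self, **changes)


my_task = TaskSketch(name="my_task", retries=1)
resilient_task = my_task.override(retries=5, timeout=timedelta(seconds=300))

print(my_task.retries, my_task.timeout)  # 1 None
print(resilient_task.retries, resilient_task.timeout)  # 5 0:05:00
```

Making the template frozen means an accidental in-place assignment raises an error instead of silently changing every caller that shares the original definition.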

Best Practices

  • Prefer timedelta: Use datetime.timedelta for timeouts longer than a few seconds to make your code more readable (e.g., timedelta(minutes=30) is clearer than 1800).
  • Set max_queued_time for Heavy Tasks: If your task requires expensive resources (like large GPUs), set a max_queued_time to ensure the task fails fast if the cluster is over-capacity, rather than waiting indefinitely.
  • Default Behavior: retries and timeout both default to 0 in TaskEnvironment.task, which typically means the task is not retried and the SDK enforces no specific timeout limit.