Task Resilience & Reliability
Distributed tasks often hit transient failures, such as network blips or temporarily unavailable resources, and can hang indefinitely while waiting on a slow external service. flyte-sdk provides RetryStrategy and Timeout to handle these cases. Retries let a task recover from transient errors, and timeouts stop it from holding resources longer than necessary.
Retrying Failed Tasks
When a task fails due to a non-system error (e.g., an exception in your code or a transient network error), flyte-sdk can automatically retry it. You can configure retries using either a simple integer or the RetryStrategy class.
Simple Retries
The most common way to enable retries is by passing an integer to the retries parameter of the @task decorator.
from flyte import task

@task(retries=3)
async def flaky_service_call():
    # This task will be retried up to 3 times if it fails
    ...
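Read literally, "retried up to 3 times" means one initial attempt plus at most three retries, so at most four attempts in total. The sketch below reproduces that counting in plain Python using a hypothetical run_with_retries helper. It is a local illustration of the semantics only and makes no claim about how flyte-sdk actually schedules or re-runs failed tasks.

```python
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[], T], retries: int) -> Tuple[T, int]:
    """Hypothetical helper: one initial attempt plus up to `retries` retries.

    Returns the result together with the number of attempts made, and
    re-raises the last exception if every attempt fails.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return fn(), attempts
        except Exception:
            # Give up once the initial attempt and all `retries` retries failed.
            if attempts > retries:
                raise


# A call that fails twice with a transient error, then succeeds.
flaky_calls = {"n": 0}


def flaky() -> str:
    flaky_calls["n"] += 1
    if flaky_calls["n"] < 3:
        raise ConnectionError("transient network blip")
    return "ok"


result, attempts = run_with_retries(flaky, retries=3)

# A call that never succeeds: 1 attempt + 3 retries = 4 calls, then it raises.
failing_calls = {"n": 0}


def always_fails() -> str:
    failing_calls["n"] += 1
    raise RuntimeError("permanent failure")


try:
    run_with_retries(always_fails, retries=3)
except RuntimeError:
    pass  # every attempt failed, so the last error propagated
```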
Explicit Retry Strategy
For more explicit configuration, use the RetryStrategy class from src/flyte/_retry.py. While the class is designed to eventually support backoff parameters, the current implementation in flyte-sdk focuses on the retry count.
from flyte import task, RetryStrategy

@task(retries=RetryStrategy(count=5))
def robust_task():
    ...
Internally, if you provide an integer to the @task decorator, TaskTemplate.__post_init__ in src/flyte/_task.py automatically converts it into a RetryStrategy object:
# From src/flyte/_task.py
if isinstance(self.retries, int):
    self.retries = RetryStrategy(count=self.retries)
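The same normalization can be reproduced outside flyte-sdk with stand-in classes. In the sketch below, RetryStrategyStandIn and TaskTemplateStandIn are hypothetical local dataclasses that mirror only the retries field discussed here. They are not the real flyte-sdk classes, which carry many more fields.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class RetryStrategyStandIn:
    """Hypothetical stand-in for flyte.RetryStrategy: only the retry count."""
    count: int


@dataclass
class TaskTemplateStandIn:
    """Hypothetical stand-in for TaskTemplate, keeping only `retries`."""
    name: str
    retries: Union[int, RetryStrategyStandIn] = 0

    def __post_init__(self) -> None:
        # Mirrors the excerpt above: a bare int becomes a strategy object,
        # so code that reads `retries` later only has to handle one type.
        if isinstance(self.retries, int):
            self.retries = RetryStrategyStandIn(count=self.retries)


from_int = TaskTemplateStandIn(name="flaky_service_call", retries=3)
from_strategy = TaskTemplateStandIn(
    name="robust_task", retries=RetryStrategyStandIn(count=5)
)
```

Normalizing once at construction time means the decorator can accept the convenient int form while everything downstream sees a single representation.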
Managing Execution Timeouts
Timeouts prevent tasks from running indefinitely and blocking resources. flyte-sdk allows you to limit both the actual execution time and the time a task spends waiting in a queue.
Setting Execution Limits
You can define a timeout using an integer (representing seconds), a timedelta object, or a Timeout object.
from datetime import timedelta
from flyte import task, Timeout

# Using seconds (int)
@task(timeout=60)
async def quick_task():
    ...

# Using timedelta for readability
@task(timeout=timedelta(minutes=5))
async def standard_task():
    ...

# Using the Timeout class for granular control
@task(timeout=Timeout(max_runtime=timedelta(minutes=10), max_queued_time=600))
async def resource_heavy_task():
    ...
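As a local analogy for what a runtime limit bounds, the standard library's asyncio.wait_for cancels a coroutine that runs past a deadline. This is only an illustration of the concept and not how flyte-sdk enforces timeouts. The names slow_call and run_with_deadline are hypothetical.

```python
import asyncio
from datetime import timedelta


async def slow_call(duration_s: float) -> str:
    """Hypothetical stand-in for an external service that may hang."""
    await asyncio.sleep(duration_s)
    return "done"


async def run_with_deadline(duration_s: float, max_runtime: timedelta) -> str:
    try:
        # wait_for cancels slow_call if it is still running at the deadline.
        return await asyncio.wait_for(
            slow_call(duration_s), timeout=max_runtime.total_seconds()
        )
    except asyncio.TimeoutError:
        return "timed out"


fast = asyncio.run(run_with_deadline(0.01, timedelta(seconds=1)))
slow = asyncio.run(run_with_deadline(5.0, timedelta(milliseconds=50)))
```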
Runtime vs. Queued Time
The Timeout class in src/flyte/_timeout.py distinguishes between two types of limits:
- max_runtime: The maximum duration a single execution attempt is allowed to run.
- max_queued_time: The maximum time a task can remain in the "queued" state before it is canceled. This is particularly useful for tasks that require specific resources that may be unavailable.
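One way to picture the two limits is as two separate clocks. One clock runs while an attempt waits for resources, and the other runs once the attempt starts. The sketch below classifies a hypothetical attempt against both limits using a local LimitsStandIn dataclass. Three choices here are assumptions of the sketch, not documented flyte-sdk behavior: None means "no limit", both limits are evaluated per attempt, and the queue clock is checked first.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


@dataclass
class LimitsStandIn:
    """Hypothetical stand-in for flyte.Timeout's two limits (None = no limit)."""
    max_runtime: Optional[timedelta] = None
    max_queued_time: Optional[timedelta] = None


def classify(limits: LimitsStandIn, queued: timedelta, running: timedelta) -> str:
    # The queue clock stops when the attempt starts, so it is checked first:
    # an attempt canceled in the queue never accrued any runtime.
    if limits.max_queued_time is not None and queued > limits.max_queued_time:
        return "canceled while queued"
    if limits.max_runtime is not None and running > limits.max_runtime:
        return "exceeded max_runtime"
    return "within limits"


# Same values as resource_heavy_task: 10 minutes of runtime, 600 s in the queue.
limits = LimitsStandIn(
    max_runtime=timedelta(minutes=10), max_queued_time=timedelta(seconds=600)
)
gpu_starved = classify(limits, queued=timedelta(minutes=15), running=timedelta(0))
too_slow = classify(limits, queued=timedelta(seconds=30), running=timedelta(minutes=12))
healthy = classify(limits, queued=timedelta(seconds=30), running=timedelta(minutes=3))
```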
When you pass an int or timedelta directly to the decorator, flyte-sdk uses the timeout_from_request helper to map that value to max_runtime:
# From src/flyte/_timeout.py
def timeout_from_request(timeout: TimeoutType) -> Timeout:
    if isinstance(timeout, Timeout):
        return timeout
    else:
        if isinstance(timeout, int):
            timeout = timedelta(seconds=timeout)
        # ...
        return Timeout(max_runtime=timeout)
Dynamic Overrides
You may want to reuse the same task definition but apply different resilience settings depending on the context (e.g., a production workflow vs. a test workflow). The TaskTemplate returned by the @task decorator provides an .override() method for this purpose.
from flyte import task, RetryStrategy

@task(retries=1)
async def my_task(x: int):
    ...

# Create a version of the task with more retries and a specific timeout
resilient_task = my_task.override(
    retries=RetryStrategy(count=5),
    timeout=300,
)
The .override() method in src/flyte/_task.py creates a new instance of the task template with the updated parameters, leaving the original task definition unchanged. This is also supported on TaskDetails objects when interacting with tasks via the Flyte remote API.
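The copy-on-override behavior can be mimicked with dataclasses.replace, which builds a new instance and re-runs __post_init__. The sketch uses a hypothetical TaskStandIn class rather than flyte-sdk's TaskTemplate. It does not claim that .override() is implemented this way, and it only shows that the original object is left unchanged.

```python
from dataclasses import dataclass, replace
from typing import Union


@dataclass
class RetryStrategyStandIn:
    """Hypothetical stand-in for flyte.RetryStrategy."""
    count: int


@dataclass
class TaskStandIn:
    """Hypothetical stand-in for a task template with resilience settings."""
    name: str
    retries: Union[int, RetryStrategyStandIn] = 0
    timeout: int = 0  # seconds; 0 = no limit in this sketch (the documented default)

    def __post_init__(self) -> None:
        # Normalize a bare int into a strategy object, as TaskTemplate does.
        if isinstance(self.retries, int):
            self.retries = RetryStrategyStandIn(count=self.retries)

    def override(self, **changes) -> "TaskStandIn":
        # replace() constructs a new object; `self` is not modified.
        return replace(self, **changes)


my_task = TaskStandIn(name="my_task", retries=1)
resilient_task = my_task.override(retries=RetryStrategyStandIn(count=5), timeout=300)
```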
Best Practices
- Prefer timedelta: Use datetime.timedelta for timeouts longer than a few seconds to make your code more readable (e.g., timedelta(minutes=30) is clearer than 1800).
- Set max_queued_time for heavy tasks: If your task requires expensive resources (like large GPUs), set a max_queued_time so the task fails fast when the cluster is over capacity instead of waiting indefinitely.
- Default behavior: By default, retries and timeout are set to 0 in TaskEnvironment.task, which typically means no retries and no specific timeout limit is enforced by the SDK.