Caching & Versioning

Caching in flyte-sdk allows tasks to skip execution if they have been run previously with the same inputs and logic. This is managed through the Cache class, which defines how a task's version is calculated and how the cache should behave during execution.

Cache Behaviors

The Cache class in flyte._cache.cache supports three primary behaviors that determine how a task's version is derived:

  • auto: The default behavior. flyte-sdk automatically generates a version by hashing the task's source code. Any change to the function's code invalidates the cache; changes to comments or formatting alone do not.
  • override: You provide a manual version string via version_override. The cache only invalidates when you manually update this string.
  • disable: Caching is completely disabled for the task, and it will execute every time.

You apply these configurations using the cache parameter in the @env.task decorator or via the .override() method on a task object.

from flyte import Cache, TaskEnvironment

env = TaskEnvironment(name="caching-example")

# Automatic versioning based on function body
@env.task(cache=Cache(behavior="auto"))
async def t1_auto(data: str) -> str:
    return f"Processed {data}"

# Manual versioning
@env.task(cache=Cache(behavior="override", version_override="v1"))
async def t2_manual(i: int) -> int:
    return i * i

Automatic Versioning with FunctionBodyPolicy

When behavior="auto" is used, flyte-sdk employs the FunctionBodyPolicy (found in flyte._cache.policy_function_body) to generate a version string. This policy ensures that semantic changes to your code trigger a re-run, while non-functional changes (like comments or reformatting) do not.

The FunctionBodyPolicy.get_version method performs the following steps:

  1. Retrieves the source code using inspect.getsource.
  2. Parses the source into an Abstract Syntax Tree (AST) using ast.parse.
  3. Dumps the AST into a string representation with include_attributes=False. This ignores line numbers and column offsets, ensuring that moving a function within a file or changing whitespace doesn't invalidate the cache.
  4. Combines the AST string with an optional salt and produces a SHA-256 hash.

# Internal implementation detail of FunctionBodyPolicy (body of get_version)
import ast
import hashlib
import inspect
import textwrap

def get_version(self, salt: str, params: "VersionParameters") -> str:
    source = inspect.getsource(params.func)
    dedented_source = textwrap.dedent(source)
    parsed_ast = ast.parse(dedented_source)
    ast_bytes = ast.dump(parsed_ast, include_attributes=False).encode("utf-8")
    return hashlib.sha256(ast_bytes + salt.encode("utf-8")).hexdigest()
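
The effect of hashing the AST rather than the raw text can be checked without flyte-sdk. The following is a minimal sketch using only the standard library. The helper `fingerprint` is hypothetical: it takes a source string instead of calling inspect.getsource, and it leaves out the salt for brevity.

```python
import ast
import hashlib
import textwrap


def fingerprint(source: str) -> str:
    """Hash the AST of a source snippet (hypothetical helper, not flyte-sdk API)."""
    tree = ast.parse(textwrap.dedent(source))
    dumped = ast.dump(tree, include_attributes=False)
    return hashlib.sha256(dumped.encode("utf-8")).hexdigest()


original = """
def square(i):
    return i * i
"""

# Same logic, but with a comment, an extra blank line, and different spacing.
reformatted = """
def square(i):
    # multiply the input by itself

    return i*i
"""

# Different logic: the operator changed.
changed = """
def square(i):
    return i + i
"""

# Docstrings are string constants in the AST, so adding one changes the hash.
documented = '''
def square(i):
    """Return i squared."""
    return i * i
'''

same_hash = fingerprint(original) == fingerprint(reformatted)
logic_differs = fingerprint(original) != fingerprint(changed)
docstring_differs = fingerprint(original) != fingerprint(documented)
```

In this sketch, `same_hash`, `logic_differs`, and `docstring_differs` are all true. Comments never reach the AST, but docstrings do, so editing a docstring should be expected to produce a new version under this kind of hashing.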

Cache Configuration Parameters

The Cache class provides several parameters to fine-tune caching behavior:

Input Exclusion

The ignored_inputs parameter allows you to specify input names that should not be part of the cache key. This is useful for parameters that change frequently but do not affect the output, such as timestamps or request IDs.

@env.task(cache=Cache(behavior="override", version_override="v2", ignored_inputs="request_id"))
async def process_data(request_id: str, value: int) -> int:
    # This task will return a cached result for the same 'value',
    # regardless of what 'request_id' is passed.
    return value + 1
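
To see why excluding an input leads to cache hits, it can help to think of the cache key as a hash over the task version and the remaining inputs. The sketch below illustrates that idea only. The function `cache_key` and its JSON-based key format are assumptions made for the example, not flyte-sdk's actual key derivation.

```python
import hashlib
import json


def cache_key(version: str, inputs: dict, ignored_inputs: tuple = ()) -> str:
    """Build an illustrative cache key (hypothetical, not flyte-sdk's key format)."""
    # Drop every input whose name is listed in ignored_inputs.
    kept = {name: value for name, value in inputs.items() if name not in ignored_inputs}
    payload = json.dumps({"version": version, "inputs": kept}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


ignored = ("request_id",)

# Same value, different request IDs: the keys match.
key_a = cache_key("v2", {"request_id": "req-001", "value": 41}, ignored)
key_b = cache_key("v2", {"request_id": "req-002", "value": 41}, ignored)

# Different value: the key changes.
key_c = cache_key("v2", {"request_id": "req-001", "value": 42}, ignored)

# Without ignoring request_id, the first two calls would no longer share a key.
key_d = cache_key("v2", {"request_id": "req-001", "value": 41})
key_e = cache_key("v2", {"request_id": "req-002", "value": 41})
```

Only inputs that truly have no effect on the output belong in ignored_inputs; otherwise a stale result may be returned for a call that should have produced something different.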

Cache Serialization

When serialize=True is set, flyte-sdk ensures that if multiple concurrent executions of the same task occur with identical inputs, only one execution actually runs. The other executions wait for the first one to complete and then reuse its cached result. This prevents "thundering herd" problems for expensive computations.
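
The idea behind serialization can be illustrated in a single process with asyncio. This is a conceptual sketch, not how flyte-sdk implements serialize=True: the class `SerializedCache` and the helper `run_demo` are hypothetical, and a real deployment coordinates across separate executions rather than coroutines.

```python
import asyncio


class SerializedCache:
    """Illustrative in-process cache; not flyte-sdk's implementation."""

    def __init__(self) -> None:
        self._results: dict = {}
        self._locks: dict = {}

    async def get_or_compute(self, key, compute):
        # One lock per key: the first caller computes, later callers wait.
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:
            if key not in self._results:
                self._results[key] = await compute()
            return self._results[key]


async def _demo():
    cache = SerializedCache()
    calls = 0

    async def expensive() -> int:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)  # stand-in for a costly computation
        return 7 * 7

    # Five concurrent requests for the same key.
    results = await asyncio.gather(
        *(cache.get_or_compute("square:7", expensive) for _ in range(5))
    )
    return results, calls


def run_demo():
    return asyncio.run(_demo())
```

Calling `run_demo()` returns five identical results while `expensive` runs only once, which is the behavior the paragraph above describes for concurrent executions with identical inputs.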

Cache Salting

The salt parameter provides a way to globally invalidate caches or create separate cache namespaces. If you change the salt, the resulting version hash for all tasks using that Cache object will change, effectively clearing the old cache.
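
A small standard-library sketch shows how a salt separates cache namespaces. It mirrors the "hash of content plus salt" step described for FunctionBodyPolicy. The function `salted_version`, the placeholder `body`, and the salt values are hypothetical and chosen only for illustration.

```python
import hashlib


def salted_version(body: str, salt: str = "") -> str:
    """Hash a code fingerprint together with a salt (hypothetical helper)."""
    return hashlib.sha256(body.encode("utf-8") + salt.encode("utf-8")).hexdigest()


body = "ast-dump-of-some-task"  # stand-in for the dumped AST of a task

v_staging = salted_version(body, salt="staging")
v_prod = salted_version(body, salt="prod")

# Re-computing with the same salt is stable, so existing cache entries stay valid.
v_prod_again = salted_version(body, salt="prod")
```

Here `v_staging` and `v_prod` differ even though the code is identical, so the two environments never share cache entries, while `v_prod_again` equals `v_prod`.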

Custom Cache Policies

For advanced use cases, you can implement the CachePolicy protocol defined in flyte._cache.cache. A policy must implement the get_version method, which receives VersionParameters containing the function, image, and code bundle context.

import hashlib
import subprocess
from flyte._cache.cache import VersionParameters, CachePolicy

class GitHashPolicy:
    """
    A custom policy that uses the current Git commit hash as the version.
    """
    def get_version(self, salt: str, params: VersionParameters) -> str:
        git_hash = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode().strip()
        combined = f"{salt}{git_hash}".encode()
        return hashlib.sha256(combined).hexdigest()

# Usage
@env.task(cache=Cache(behavior="auto", policies=[GitHashPolicy()]))
async def git_versioned_task(x: int) -> int:
    return x + 1

The VersionParameters object passed to the policy includes:

  • func: The callable being executed.
  • image: The container image (string or Image object) associated with the task.
  • code_bundle: A CodeBundle object if the code is being deployed as a bundle.

If multiple policies are provided in the policies list, flyte-sdk concatenates the version strings produced by each policy and hashes the result to produce the final task version.
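
The combination step can be sketched as follows. This is an approximation under stated assumptions: the helper `combine_versions`, the stand-in `LabelPolicy`, the use of SHA-256 for the final hash, and plain concatenation without a separator are all illustrative choices, and flyte-sdk's exact procedure may differ.

```python
import hashlib
from typing import Any, Sequence


class LabelPolicy:
    """Stand-in policy that versions a task by a fixed label (hypothetical)."""

    def __init__(self, label: str) -> None:
        self.label = label

    def get_version(self, salt: str, params: Any) -> str:
        return hashlib.sha256(f"{salt}{self.label}".encode("utf-8")).hexdigest()


def combine_versions(policies: Sequence[Any], salt: str, params: Any = None) -> str:
    """Concatenate each policy's version string, then hash the result."""
    joined = "".join(policy.get_version(salt, params) for policy in policies)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


code_policy = LabelPolicy("function-body")
data_policy_v1 = LabelPolicy("dataset-2024-01")
data_policy_v2 = LabelPolicy("dataset-2024-02")

version_a = combine_versions([code_policy, data_policy_v1], salt="")
version_b = combine_versions([code_policy, data_policy_v2], salt="")
```

Because the final version depends on every policy's output, a change reported by any single policy (here, the dataset label) yields a different combined version and therefore a cache miss.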