Data Hashing and Cache Integrity

Flyte uses hashing to ensure cache integrity by generating unique keys for task outputs. When a task is configured with cache=True, Flyte compares the hash of the input values and the task version to determine if a cached result can be reused. For complex data types like DataFrames, flyte-sdk provides specialized mechanisms to compute hashes based on content rather than just metadata, ensuring that changes to the underlying data correctly invalidate the cache.
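
As a rough illustration of the idea above, the sketch below derives a cache key from a task version and its input values. The function name cache_key, the inclusion of the task name, and the JSON-plus-SHA-256 encoding are all assumptions made for this example; Flyte's actual key format is not described in this document.

```python
import hashlib
import json


def cache_key(task_name: str, cache_version: str, inputs: dict) -> str:
    """Hypothetical key: a digest over task name, cache version, and inputs."""
    payload = json.dumps(
        {"task": task_name, "version": cache_version, "inputs": inputs},
        sort_keys=True,  # make the key independent of dict ordering
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


same_a = cache_key("train", "1.0", {"lr": 0.1, "epochs": 3})
same_b = cache_key("train", "1.0", {"epochs": 3, "lr": 0.1})   # same inputs, different order
bumped = cache_key("train", "2.0", {"lr": 0.1, "epochs": 3})   # cache version changed
changed = cache_key("train", "1.0", {"lr": 0.2, "epochs": 3})  # one input value changed

print(same_a == same_b, same_a == bumped, same_a == changed)
```

Identical inputs and version yield the same key, so a cached result could be reused; changing either one yields a different key and forces a recomputation.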

The Hashing Protocol

The foundation of hashing in flyte-sdk is the HashMethod protocol, defined in src/flyte/io/_hashing_io.py. This protocol requires two methods:

  • update(data): Updates the internal state with new data.
  • result(): Returns the final hash string.
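
The two-method shape described above can be sketched with typing.Protocol. HashMethodSketch and Sha256Accumulator are hypothetical names defined only for this example; they mirror the description here and are not the definitions in src/flyte/io/_hashing_io.py, whose exact signatures may differ.

```python
import hashlib
from typing import Protocol, runtime_checkable


@runtime_checkable
class HashMethodSketch(Protocol):
    """Local stand-in mirroring the two methods listed above."""

    def update(self, data: bytes) -> None: ...

    def result(self) -> str: ...


class Sha256Accumulator:
    """Accumulates bytes incrementally and reports a SHA-256 hex digest."""

    def __init__(self) -> None:
        self._h = hashlib.sha256()

    def update(self, data: bytes) -> None:
        self._h.update(data)

    def result(self) -> str:
        return self._h.hexdigest()


acc = Sha256Accumulator()
acc.update(b"example ")
acc.update(b"data")
digest = acc.result()

# Structural typing: no inheritance is needed to satisfy the protocol
print(isinstance(acc, HashMethodSketch), digest)
```

Because the protocol is structural, any object exposing update and result with compatible signatures can act as an accumulator.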

The HashFunction class is a concrete implementation of this protocol that wraps a user-provided callable. This allows developers to define custom hashing logic for any data type.

from flyte.io import HashFunction
import hashlib

def my_custom_hash(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()

hash_fn = HashFunction.from_fn(my_custom_hash)
hash_fn.update(b"example data")
print(hash_fn.result())

Content-Based Hashing for DataFrames

DataFrames often require content-based hashing because their file metadata (like timestamps) might change even if the data remains identical. flyte-sdk supports two primary ways to apply custom hashing to DataFrames.
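
The difference between metadata and content can be shown with the standard library alone. In this sketch, content_hash and metadata_fingerprint are hypothetical helpers, and a small CSV file stands in for a serialized dataframe; it does not use any flyte-sdk API.

```python
import hashlib
import os
import tempfile


def content_hash(path: str) -> str:
    """Digest of the file's bytes only."""
    with open(path, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()


def metadata_fingerprint(path: str) -> str:
    """Fingerprint built from size and modification time only."""
    st = os.stat(path)
    return f"{st.st_size}-{st.st_mtime_ns}"


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "frame.csv")
    with open(path, "wb") as f:
        f.write(b"a,b\n1,4\n2,5\n3,6\n")

    hash_before = content_hash(path)
    meta_before = metadata_fingerprint(path)

    # Change only the timestamps; the bytes on disk stay identical
    os.utime(path, ns=(1_000_000_000, 1_000_000_000))

    hash_after = content_hash(path)
    meta_after = metadata_fingerprint(path)

print(hash_before == hash_after, meta_before == meta_after)
```

A key based on the metadata fingerprint would invalidate the cache here even though nothing in the data changed, which is the situation content-based hashing avoids.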

Using Annotated Types

For raw dataframe types like pandas.DataFrame or polars.DataFrame, you can use the Annotated type hint to associate a HashFunction with the type. The DataFrameTransformerEngine in src/flyte/io/_dataframe/dataframe.py extracts this function during serialization to compute the hash.

import pandas as pd
from typing import Annotated
from flyte.io import HashFunction

def hash_pandas_dataframe(df: pd.DataFrame) -> str:
    # Sum the per-row hashes into a single content-based value
    return str(pd.util.hash_pandas_object(df).sum())

# Define a reusable hashed type
HashedPandasDataFrame = Annotated[
    pd.DataFrame,
    HashFunction.from_fn(hash_pandas_dataframe),
]

# Assumes `task` is a Flyte task decorator already in scope
@task(cache=True, cache_version="1.0")
def produce_dataframe() -> HashedPandasDataFrame:
    return pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

Explicit Hashing with flyte.io.DataFrame

When using the flyte.io.DataFrame abstraction, you can pass a HashFunction or a precomputed string directly to the from_local or from_local_sync methods.

import pandas as pd
from flyte.io import DataFrame, HashFunction

# Reuses hash_pandas_dataframe from the previous example
@task(cache=True, cache_version="1.0")
async def produce_explicit_hash() -> DataFrame:
    df = pd.DataFrame({"data": [1, 2, 3]})
    hash_method = HashFunction.from_fn(hash_pandas_dataframe)

    # The hash is computed immediately and stored in the DataFrame object
    return await DataFrame.from_local(df, hash_method=hash_method)

Cache Integrity and the Internal Hash Cache

A significant challenge in distributed systems is maintaining hash consistency when objects are passed between tasks. If a task receives a raw pd.DataFrame and returns it, the hash information might be lost if the input type wasn't annotated.

To solve this, flyte-sdk implements an internal _dataframe_hash_cache in src/flyte/io/_dataframe/dataframe.py. This cache uses id(python_val) as a key and stores a tuple of a weakref to the object and its computed hash.

# Internal cache structure in src/flyte/io/_dataframe/dataframe.py
import weakref
from typing import Dict, Tuple

_dataframe_hash_cache: Dict[int, Tuple[weakref.ref, str]] = {}

When DataFrameTransformerEngine.to_python_value deserializes a literal into a raw dataframe, it stores the hash in this cache. Later, when to_literal is called on that same object instance, it retrieves the hash from the cache even if the type signature doesn't explicitly request hashing. This ensures that the "identity" of the data is preserved across task boundaries within the same process.
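
The mechanism can be sketched with the standard library: a dictionary keyed by id(obj) that stores a weak reference alongside the hash. Frame, remember_hash, and lookup_hash are hypothetical names for this illustration, and the eviction callback and identity check are assumptions about how such a cache can stay safe against reused ids, not a copy of the flyte-sdk implementation.

```python
import gc
import weakref
from typing import Dict, Optional, Tuple


class Frame:
    """Hypothetical stand-in for a dataframe; plain classes support weakrefs."""

    def __init__(self, rows):
        self.rows = rows


_hash_cache: Dict[int, Tuple[weakref.ref, str]] = {}


def remember_hash(obj: object, hash_value: str) -> None:
    key = id(obj)

    def _evict(ref, key=key):
        # Runs when obj is collected; drop the entry so a reused id
        # cannot return a stale hash.
        entry = _hash_cache.get(key)
        if entry is not None and entry[0] is ref:
            del _hash_cache[key]

    _hash_cache[key] = (weakref.ref(obj, _evict), hash_value)


def lookup_hash(obj: object) -> Optional[str]:
    entry = _hash_cache.get(id(obj))
    if entry is None:
        return None
    ref, hash_value = entry
    # Only trust the entry if it still points at this exact object
    return hash_value if ref() is obj else None


df = Frame([1, 2, 3])
remember_hash(df, "abc123")           # e.g. done while deserializing
hit = lookup_hash(df)                 # same instance: hash is recovered
miss = lookup_hash(Frame([1, 2, 3]))  # equal content, different instance

del df
gc.collect()
entries_after_gc = len(_hash_cache)   # entry removed once the object is gone
```

The lookup is by object identity, not by content, so a copy of the dataframe does not inherit the hash. Holding only a weak reference means the cache never keeps a dataframe alive on its own.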

Constraints and Limitations

  • Weak References: The cache relies on weakref. If a dataframe type does not support weak references (e.g., it uses __slots__ without __weakref__), the hash cannot be cached.
  • Local Execution: Caching is a feature of the Flyte platform. While flyte-sdk computes these hashes locally, cache hits and misses only occur when running on a remote Flyte cluster.
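
The weak-reference constraint is standard Python behavior and can be reproduced without flyte-sdk. SlottedFrame and WeakrefFriendlyFrame are hypothetical classes used only to show the difference.

```python
import weakref


class SlottedFrame:
    __slots__ = ("rows",)  # no "__weakref__" slot, so weakrefs are rejected

    def __init__(self, rows):
        self.rows = rows


class WeakrefFriendlyFrame:
    __slots__ = ("rows", "__weakref__")  # opting back in to weakref support

    def __init__(self, rows):
        self.rows = rows


def try_weakref(obj):
    """Return a weak reference, or None if the type does not support them."""
    try:
        return weakref.ref(obj)
    except TypeError:
        return None


slotted = SlottedFrame([1, 2, 3])
friendly = WeakrefFriendlyFrame([1, 2, 3])

slotted_ref = try_weakref(slotted)    # None: cannot be tracked in a weakref cache
friendly_ref = try_weakref(friendly)  # a live weak reference
```

An object of the first kind can still be hashed; it simply cannot be registered in a cache that depends on weak references.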

Hashing during I/O Operations

For streaming data or large files, flyte-sdk provides HashingWriter and HashingReader in src/flyte/io/_hashing_io.py. These classes wrap standard file-like objects and update a HashMethod accumulator on every read or write operation. This allows Flyte to compute the hash on the fly, in the same pass as the I/O, without reading the data a second time or holding the entire dataset in memory.

from flyte.io._hashing_io import HashingWriter, HashlibAccumulator
import hashlib

# Wrap a file handle to compute a SHA256 hash while writing
with open("data.bin", "wb") as f:
    acc = HashlibAccumulator(hashlib.sha256())
    writer = HashingWriter(f, acc)
    writer.write(b"some large data")

print(f"Hash: {writer.result()}")
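
The read side follows the same pattern. The sketch below is a minimal stand-in written against the standard library; SketchHashingReader is a hypothetical class for illustration and is not the HashingReader from flyte-sdk, whose constructor and methods may differ.

```python
import hashlib
import io


class SketchHashingReader:
    """Wraps a binary file-like object and hashes every chunk that is read."""

    def __init__(self, fh, hasher):
        self._fh = fh
        self._hasher = hasher

    def read(self, size: int = -1) -> bytes:
        chunk = self._fh.read(size)
        if chunk:
            self._hasher.update(chunk)
        return chunk

    def result(self) -> str:
        return self._hasher.hexdigest()


payload = b"some large data" * 1000
reader = SketchHashingReader(io.BytesIO(payload), hashlib.sha256())

total = 0
while chunk := reader.read(4096):  # consume in fixed-size chunks
    total += len(chunk)

streamed_digest = reader.result()
print(total, streamed_digest == hashlib.sha256(payload).hexdigest())
```

Only one chunk is held in memory at a time, and the digest after the final read matches a one-shot hash of the full payload.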