Internal Serialization and Pickling
Flyte provides a robust type system that maps Python types to Flyte's internal IDL (Interface Definition Language). However, when flyte-sdk encounters a Python type that does not have a registered TypeTransformer, it uses a "safety net" mechanism called FlytePickle. This system ensures that arbitrary Python objects can still be passed between tasks, even if they aren't natively supported by Flyte's core types.
The Fallback Mechanism
The TypeEngine in flyte-sdk is responsible for resolving how a Python type should be serialized. In src/flyte/types/_type_engine.py, the get_transformer method implements a recursive search for a suitable transformer. If no transformer is found for a type (and it is not a dataclass), flyte-sdk falls back to the FlytePickleTransformer.
When this fallback occurs, flyte-sdk issues a warning via display_pickle_warning to alert the user that the type is unsupported and pickling is being used as a transport:
# src/flyte/types/_type_engine.py
@lru_cache
def display_pickle_warning(python_type: str):
    # This is a warning that is only displayed once per python type
    logger.warning(
        f"Unsupported Type {python_type} found, Flyte will default to use PickleFile as the transport. "
        f"Pickle can only be used to send objects between the exact same version of Python, "
        f"and we strongly recommend to use python type that flyte support."
    )
This mechanism also applies to the typing.Any type, which flyte-sdk treats as a signal to use the pickling system.
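The lookup order described above can be sketched with a toy registry. This is a minimal illustration, not the flyte-sdk implementation: `_REGISTRY`, `resolve_transformer`, and the string labels are hypothetical, and walking the MRO is only one plausible reading of the "recursive search" mentioned in the text.

```python
import typing
from dataclasses import dataclass, is_dataclass

# Hypothetical registry mapping Python types to transformer labels.
# The real TypeEngine stores transformer objects, not strings.
_REGISTRY = {int: "int", str: "str"}


def resolve_transformer(python_type) -> str:
    """Return the label of the transformer that would handle python_type."""
    # typing.Any is treated as an explicit request for pickling.
    if python_type is typing.Any:
        return "pickle"
    # Exact match against registered types.
    if python_type in _REGISTRY:
        return _REGISTRY[python_type]
    if isinstance(python_type, type):
        # Assumption: base classes are searched so subclasses of a
        # registered type reuse that type's transformer.
        for base in python_type.__mro__[1:]:
            if base in _REGISTRY:
                return _REGISTRY[base]
        # Dataclasses are excluded from the pickle fallback.
        if is_dataclass(python_type):
            return "dataclass"
    # Nothing matched: fall back to pickling.
    return "pickle"


class Custom:
    pass


@dataclass
class Point:
    x: int
    y: int


print(resolve_transformer(int))
print(resolve_transformer(Custom))
print(resolve_transformer(Point))
print(resolve_transformer(typing.Any))
```

In this sketch only `Custom` and `typing.Any` reach the pickle branch; in flyte-sdk that is the point at which display_pickle_warning would be called.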
Serialization with cloudpickle
For serialization, flyte-sdk uses the cloudpickle library rather than the standard Python pickle module. cloudpickle can serialize a wider range of Python objects, including lambdas, functions defined interactively, and classes defined in the __main__ module.
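The difference is easy to see with a lambda. The following sketch assumes cloudpickle is installed (it is imported defensively so the standard-library half still runs without it); `stdlib_can_pickle` and `square` are illustrative names.

```python
import pickle

try:
    import cloudpickle  # third-party; assumed to be installed with flyte-sdk
except ImportError:
    cloudpickle = None


square = lambda x: x * x  # noqa: E731 (a lambda is used deliberately here)


def stdlib_can_pickle(obj) -> bool:
    """Report whether the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# The standard module pickles functions by reference (module + name),
# and a lambda has no importable name, so this prints False.
print(stdlib_can_pickle(square))

if cloudpickle is not None:
    # cloudpickle serializes the function body by value; the result is a
    # regular pickle stream that pickle.loads can read back.
    restored = pickle.loads(cloudpickle.dumps(square))
    print(restored(4))
```

Serializing by value is what makes interactively defined functions portable, but it also ties the payload to the Python version that produced it.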
The FlytePickle class in src/flyte/types/_pickle.py provides the core serialization logic:
# src/flyte/types/_pickle.py
@classmethod
async def to_pickle(cls, python_val: typing.Any) -> str:
    h = hashlib.md5()
    str_bytes = cloudpickle.dumps(python_val)
    h.update(str_bytes)
    uri = storage.get_random_local_path(file_path_or_file_name=h.hexdigest())
    os.makedirs(os.path.dirname(uri), exist_ok=True)
    async with aiofiles.open(uri, "w+b") as outfile:
        await outfile.write(str_bytes)
    return await storage.put(str(uri))
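The same idea (name the file after a hash of the serialized bytes, write it, return its location) can be sketched synchronously with only the standard library. This is an approximation under stated assumptions: pickle stands in for cloudpickle, a temporary directory stands in for Flyte's storage layer, and `write_pickle_locally` and `read_pickle_locally` are hypothetical helpers, not flyte-sdk functions.

```python
import hashlib
import os
import pickle
import tempfile


def write_pickle_locally(value, root: str) -> str:
    """Serialize value and write it under root, named by the MD5 of its bytes."""
    payload = pickle.dumps(value)  # stand-in for cloudpickle.dumps
    # The digest is used only as a file name, not for security.
    digest = hashlib.md5(payload).hexdigest()
    path = os.path.join(root, digest)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "wb") as outfile:
        outfile.write(payload)
    return path


def read_pickle_locally(path: str):
    """Load a value previously written by write_pickle_locally."""
    with open(path, "rb") as infile:
        return pickle.loads(infile.read())


with tempfile.TemporaryDirectory() as root:
    first = write_pickle_locally({"a": 1}, root)
    second = write_pickle_locally({"a": 1}, root)
    same_name = first == second  # identical bytes give an identical file name
    round_trip = read_pickle_locally(first)

print(same_name, round_trip)
```

In the real method the final step is an upload via storage.put, so the returned string is a remote URI rather than a local path.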
Storage Strategy: Inline vs. Offloaded
To keep Flyte literals small, FlytePickleTransformer chooses between two storage strategies based on the size of the object. The threshold is defined by DEFAULT_PICKLE_BYTES_LIMIT, which is set to 10KB (10,240 bytes). Note that the check uses sys.getsizeof on the Python value, which reports the object's own in-memory size rather than the length of its serialized bytes.
- Inline Binary: If the object's size (as determined by sys.getsizeof) is less than or equal to 10KB, it is stored directly within the Flyte Literal as a Binary scalar.
- Offloaded Blob: If the object exceeds 10KB, it is serialized to a file, uploaded to the configured remote storage (e.g., S3, GCS), and the Literal stores a Blob scalar containing the URI.
This logic is implemented in the to_literal method of FlytePickleTransformer:
# src/flyte/types/_pickle.py
async def to_literal(
    self,
    python_val: T,
    python_type: Type[T],
    expected: types_pb2.LiteralType,
) -> literals_pb2.Literal:
    if python_val is None:
        raise AssertionError("Cannot pickle None Value.")
    # ... metadata setup ...
    if sys.getsizeof(python_val) > DEFAULT_PICKLE_BYTES_LIMIT:
        remote_path = await FlytePickle.to_pickle(python_val)
        return literals_pb2.Literal(
            scalar=literals_pb2.Scalar(blob=literals_pb2.Blob(metadata=meta, uri=remote_path))
        )
    else:
        return literals_pb2.Literal(
            scalar=literals_pb2.Scalar(binary=literals_pb2.Binary(value=cloudpickle.dumps(python_val)))
        )
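Because the branch is taken on sys.getsizeof(python_val), the decision depends on the top-level object only. The sketch below mirrors that size check in isolation to show the consequence; `storage_choice` is a hypothetical helper, and pickle stands in for cloudpickle when measuring the serialized length.

```python
import pickle
import sys

DEFAULT_PICKLE_BYTES_LIMIT = 10 * 1024  # 10,240 bytes, as stated in the text


def storage_choice(value) -> str:
    """Mirror the size check above: 'blob' if offloaded, 'binary' if inline."""
    if sys.getsizeof(value) > DEFAULT_PICKLE_BYTES_LIMIT:
        return "blob"
    return "binary"


flat = b"x" * 100_000      # one large bytes object
nested = [b"x" * 100_000]  # a small list that references a large object

flat_choice = storage_choice(flat)
nested_choice = storage_choice(nested)
nested_serialized_len = len(pickle.dumps(nested))

print(flat_choice, nested_choice, nested_serialized_len > DEFAULT_PICKLE_BYTES_LIMIT)
```

sys.getsizeof does not follow references, so the list is measured as a few dozen bytes and takes the inline path even though its serialized form is far larger than 10KB. Containers and custom objects that hold large members can therefore end up inline.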
Trade-offs and Constraints
While FlytePickle provides a convenient way to handle complex Python objects, it introduces several trade-offs that developers should consider:
- Python Version Dependency: Pickled objects are generally not compatible across different Python versions. If a producer task runs on Python 3.9 and a consumer task runs on Python 3.10, deserialization may fail.
- Performance Overhead: Serialization and deserialization via cloudpickle are significantly slower than using native Flyte types like Protobuf-based dataclasses or Parquet-backed DataFrames.
- Interoperability: Pickled objects are opaque to Flyte tasks written in other languages (e.g., Java or C++). Using native types is required for cross-language workflows.
- None Values: FlytePickleTransformer explicitly forbids pickling None values. In Flyte, None should be handled by the Optional type system rather than the pickling system.
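Since the fallback described earlier skips dataclasses, one way to avoid these trade-offs is to model the data as a dataclass with typed fields instead of an arbitrary class. The sketch below is plain Python and does not call flyte-sdk; `CustomRecord` is a hypothetical name, and the assumption is that a dataclass like this would be handled by a structured transformer rather than pickled.

```python
from dataclasses import asdict, dataclass


@dataclass
class CustomRecord:
    """Typed fields make the structure explicit instead of opaque bytes."""
    value: str
    count: int = 0


record = CustomRecord(value="hello", count=3)

# The record maps onto plain, language-neutral data and back again,
# which is the property an opaque pickle payload lacks.
as_plain = asdict(record)
rebuilt = CustomRecord(**as_plain)

print(as_plain)
print(rebuilt == record)
```

A field that may be absent would be declared as Optional[...] with a None default, which keeps None inside the type system rather than sending it through the pickler.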
Usage Example
Developers can use FlytePickle implicitly by using unsupported types or typing.Any, or explicitly by using the FlytePickle type hint. The following example from examples/basics/all_types.py demonstrates both:
from typing import Any, Dict

from flyte.types._pickle import FlytePickle


class CustomObject:
    def __init__(self, value):
        self.value = value


# `env` is the task environment defined elsewhere in the example file (not shown here).
@env.task
async def process_any_pickle_types(my_any: Any, my_pickle: FlytePickle) -> Dict[str, str]:
    """Process Any and FlytePickle types"""
    return {
        "any": f"Any type: {my_any} (actual type: {type(my_any).__name__})",
        "pickle": f"FlytePickle: {my_pickle} (type: {type(my_pickle).__name__})",
    }


# Usage in a workflow (the await must run inside another async task or coroutine)
sample_pickle = CustomObject("Hello from pickle!")
result = await process_any_pickle_types(my_any={"data": 123}, my_pickle=sample_pickle)