
Data & Type System

The flyte-sdk data and type system provides a bridge between native Python types and Flyte's internal representation. It is designed to handle everything from simple scalars to complex structured data and large-scale file/directory transfers, ensuring that data is correctly serialized, offloaded to remote storage when necessary, and reconstructed in downstream tasks.

The Type Engine Architecture

The TypeEngine (defined in src/flyte/types/_type_engine.py) is the central orchestrator for all type-related operations in flyte-sdk. It maintains a registry of TypeTransformer implementations that define how specific Python types map to Flyte's Literal and LiteralType protobuf definitions.

When a task is executed, the TypeEngine uses these transformers to:

  1. Infer Flyte Types: Convert Python type hints into Flyte LiteralType objects.
  2. Serialize Values: Convert Python objects into Flyte Literal objects (e.g., via to_literal).
  3. Deserialize Values: Reconstruct Python objects from Flyte Literal objects (e.g., via to_python_value).
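The three responsibilities above can be made concrete with a small, self-contained sketch of a registry-based engine. Everything in it is hypothetical and is not flyte-sdk's API: ToyTypeEngine, IntTransformer, and the dataclass stand-ins for the LiteralType and Literal protobufs exist only to show how each operation is dispatched to the transformer registered for a Python type. Unlike the real TypeEngine, which falls back to FlytePickle for unregistered types, this toy simply raises TypeError.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Generic, Type, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class LiteralType:
    """Hypothetical stand-in for Flyte's LiteralType protobuf."""
    simple: str


@dataclass(frozen=True)
class Literal:
    """Hypothetical stand-in for Flyte's Literal protobuf."""
    value: Any


class TypeTransformer(ABC, Generic[T]):
    """Toy transformer interface mirroring the three operations above."""
    python_type: Type[Any]

    @abstractmethod
    def get_literal_type(self, t: Type[T]) -> LiteralType: ...

    @abstractmethod
    def to_literal(self, v: T, t: Type[T]) -> Literal: ...

    @abstractmethod
    def to_python_value(self, lv: Literal, t: Type[T]) -> T: ...


class IntTransformer(TypeTransformer[int]):
    python_type = int

    def get_literal_type(self, t: Type[int]) -> LiteralType:
        return LiteralType(simple="INTEGER")

    def to_literal(self, v: int, t: Type[int]) -> Literal:
        return Literal(value=int(v))

    def to_python_value(self, lv: Literal, t: Type[int]) -> int:
        return int(lv.value)


class ToyTypeEngine:
    """Registry that routes each operation to the matching transformer."""
    _registry: Dict[type, TypeTransformer] = {}

    @classmethod
    def register(cls, transformer: TypeTransformer) -> None:
        cls._registry[transformer.python_type] = transformer

    @classmethod
    def get_transformer(cls, t: type) -> TypeTransformer:
        try:
            return cls._registry[t]
        except KeyError:
            # The real engine would fall back to FlytePickle here.
            raise TypeError(f"no transformer registered for {t!r}") from None

    @classmethod
    def to_literal_type(cls, t: type) -> LiteralType:  # 1. infer
        return cls.get_transformer(t).get_literal_type(t)

    @classmethod
    def to_literal(cls, v: Any, t: type) -> Literal:  # 2. serialize
        return cls.get_transformer(t).to_literal(v, t)

    @classmethod
    def to_python_value(cls, lv: Literal, t: type) -> Any:  # 3. deserialize
        return cls.get_transformer(t).to_python_value(lv, t)


ToyTypeEngine.register(IntTransformer())
```

In this design, supporting a new Python type only requires registering one more transformer; the engine itself never changes.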

Type Transformers

The TypeTransformer is an abstract base class that developers can extend to support custom types. flyte-sdk includes built-in transformers for standard types, as well as specialized ones for complex structures. For example, the PydanticTransformer integrates with Pydantic to generate JSON schemas for Flyte's STRUCT type:

```python
# Excerpt from flyte-sdk (imports omitted)
class PydanticTransformer(TypeTransformer[BaseModel]):
    def get_literal_type(self, t: Type[BaseModel]) -> LiteralType:
        # Uses a custom generator to produce a JSON schema for the Flyte UI
        schema = t.model_json_schema(schema_generator=CustomPydanticJsonSchemaGenerator)
        return LiteralType(simple=SimpleType.STRUCT, metadata=schema)
```
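The same idea can be sketched without Pydantic. The hypothetical DataclassStructTransformer below derives a small JSON-schema-style dictionary from a dataclass's fields and returns it next to a STRUCT marker, mirroring LiteralType(simple=SimpleType.STRUCT, metadata=schema) with plain dicts. It is an illustration under those assumptions, not flyte-sdk code, and it only handles the scalar field types listed in _JSON_TYPES.

```python
import json
from dataclasses import asdict, dataclass, fields, is_dataclass
from typing import Any, Dict, Type

# Map Python scalar types to JSON-schema type names (toy subset).
_JSON_TYPES = {int: "integer", float: "number", str: "string", bool: "boolean"}


class DataclassStructTransformer:
    """Hypothetical transformer: dataclass <-> JSON-encoded STRUCT literal."""

    def get_literal_type(self, t: Type[Any]) -> Dict[str, Any]:
        if not is_dataclass(t):
            raise TypeError(f"{t!r} is not a dataclass")
        schema = {
            "title": t.__name__,
            "type": "object",
            "properties": {f.name: {"type": _JSON_TYPES[f.type]} for f in fields(t)},
            "required": [f.name for f in fields(t)],
        }
        # Plain-dict analogue of LiteralType(simple=STRUCT, metadata=schema).
        return {"simple": "STRUCT", "metadata": schema}

    def to_literal(self, v: Any, t: Type[Any]) -> str:
        # Serialize the instance's fields as JSON.
        return json.dumps(asdict(v), sort_keys=True)

    def to_python_value(self, lv: str, t: Type[Any]) -> Any:
        # Rebuild the dataclass from the decoded JSON object.
        return t(**json.loads(lv))


@dataclass
class Point:
    x: float
    y: float
    label: str
```

Because the schema travels with the type, a UI could render the fields of a STRUCT value, which is the benefit the Pydantic integration provides.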

The TypeEngine also handles generic container types such as list[T], dict[str, T], and tuple[T1, T2] by recursively applying the transformer for each element type. Note that flyte-sdk requires typed tuples: an untyped tuple or tuple[Any, ...] is generally not supported for individual value serialization.
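The recursive handling of containers, and the rule that tuples must be fully typed, can be sketched with typing.get_origin and typing.get_args. The function infer_literal_type and its COLLECTION/MAP/TUPLE strings are hypothetical descriptions, not Flyte's actual LiteralType output. This toy also rejects every variadic tuple such as tuple[int, ...], which goes further than the text above states.

```python
from typing import Any, get_args, get_origin

# Toy mapping from Python scalars to Flyte-style simple type names.
SIMPLE_TYPES = {int: "INTEGER", float: "FLOAT", str: "STRING", bool: "BOOLEAN"}


def infer_literal_type(t: Any) -> str:
    """Describe the Flyte-style type of hint `t`, recursing into containers."""
    if t in SIMPLE_TYPES:
        return SIMPLE_TYPES[t]
    origin, args = get_origin(t), get_args(t)
    if origin is list and len(args) == 1:
        return f"COLLECTION<{infer_literal_type(args[0])}>"
    if origin is dict and len(args) == 2:
        key, value = args
        if key is not str:
            raise TypeError("only dict[str, T] is handled in this sketch")
        return f"MAP<{infer_literal_type(value)}>"
    if t is tuple or origin is tuple:
        # Require a fixed, fully typed tuple: reject bare tuple, tuple[X, ...], and Any.
        if not args or Ellipsis in args or Any in args:
            raise TypeError(f"tuple hints must be fully typed, got {t!r}")
        return "TUPLE<" + ", ".join(infer_literal_type(a) for a in args) + ">"
    raise TypeError(f"unsupported type hint: {t!r}")
```

For example, dict[str, list[float]] is described as MAP<COLLECTION<FLOAT>>, because each level delegates to the next.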

Core IO Abstractions: File and Dir

For handling large datasets that should not be stored directly in the Flyte metadata database, flyte-sdk provides File and Dir abstractions (found in src/flyte/io/_file.py and src/flyte/io/_dir.py). These classes manage the lifecycle of data in remote storage (like S3 or GCS) while providing a familiar file-like interface to the user.

Async and Sync Interfaces

Both File and Dir provide dual interfaces for asynchronous and synchronous operations. Methods without a suffix are asynchronous, while synchronous versions are suffixed with _sync.
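One common way to derive a _sync twin from an async method is to drive the coroutine with asyncio.run. The LocalFile class below is a hypothetical stand-in that reads a local path; it is not how flyte-sdk implements File, only a sketch of the async/sync pairing under that assumption.

```python
import asyncio
from pathlib import Path


class LocalFile:
    """Hypothetical stand-in for File: an async method plus a *_sync twin."""

    def __init__(self, path: str) -> None:
        self.path = Path(path)

    async def read(self) -> bytes:
        # Run the blocking disk read in a worker thread so the event loop stays free.
        return await asyncio.to_thread(self.path.read_bytes)

    def read_sync(self) -> bytes:
        # Synchronous twin: run the coroutine to completion on a fresh event loop.
        return asyncio.run(self.read())
```

Because asyncio.run starts a new event loop, a wrapper like read_sync raises RuntimeError when called from code already running inside an event loop; inside async tasks, the un-suffixed async method is the one to use.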

Example of reading a File asynchronously:

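```python
import flyte
from flyte.io import File

env = flyte.TaskEnvironment(name="data_env")  # environment name is illustrative

@env.task
async def read_file(file: File) -> str:
    async with file.open("rb") as f:
        content = bytes(await f.read())
    return content.decode("utf-8")
```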

Example of walking a Dir recursively:

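```python
import flyte
from flyte.io import Dir

env = flyte.TaskEnvironment(name="data_env")  # environment name is illustrative

@env.task
async def process_all_files(d: Dir) -> int:
    file_count = 0
    # Visit every file under d, including nested subdirectories
    async for file in d.walk(recursive=True):
        async with file.open("rb") as f:
            content = await f.read()  # process content here
        file_count += 1
    return file_count
```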

The EmptyDir Sentinel

A common challenge in data pipelines is representing the absence of an output directory. In flyte-sdk, Optional[Dir] does not always round-trip correctly through serialization. To solve this, flyte-sdk uses an EmptyDir sentinel. Tasks can return Dir.empty() to indicate no data was produced, and downstream tasks can check d.is_empty() to branch logic.

```python
# In src/flyte/io/_dir.py (method of the Dir class)
@classmethod
def empty(cls) -> "Dir":
    """Return a sentinel Dir representing 'no directory was produced'."""
    return EmptyDir()
```
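The sentinel pattern itself can be illustrated in a few lines. The Dir and EmptyDir dataclasses here are simplified hypothetical stand-ins (flyte-sdk's classes carry more state, and their serialization is not shown), and produce, consume, and the s3:// path are illustrative only.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Dir:
    """Simplified stand-in for a remote directory reference."""
    path: str

    @classmethod
    def empty(cls) -> "Dir":
        # Return the sentinel instead of None, so the declared type stays Dir.
        return EmptyDir(path="")

    def is_empty(self) -> bool:
        return False


@dataclass(frozen=True)
class EmptyDir(Dir):
    """Sentinel meaning 'no directory was produced'."""

    def is_empty(self) -> bool:
        return True


def produce(has_data: bool) -> Dir:
    # Illustrative upstream task: always returns a Dir, never None.
    return Dir(path="s3://example-bucket/output") if has_data else Dir.empty()


def consume(d: Dir) -> str:
    # Illustrative downstream task: branch on the sentinel.
    if d.is_empty():
        return "skipped: no input directory"
    return f"processing {d.path}"
```

Since EmptyDir is still a Dir, the task signatures never need Optional, which is what sidesteps the round-trip problem described above.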

Serialization Fallbacks

When the TypeEngine encounters a Python type that does not have a registered transformer, it falls back to FlytePickle (implemented in src/flyte/types/_pickle.py).

FlytePickle uses cloudpickle to serialize the object. Depending on the size, the resulting bytes are either stored directly in the Flyte Literal (as binary) or offloaded to remote blob storage. While convenient, FlytePickle has drawbacks:

  • Efficiency: It is less efficient than specialized transformers for large data.
  • Opacity: The Flyte UI cannot inspect the contents of a pickled object, as it is treated as an opaque blob.
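The size-based choice between inline bytes and offloading can be sketched as follows. The standard-library pickle module stands in for cloudpickle, FakeBlobStore stands in for S3 or GCS, and INLINE_LIMIT_BYTES is an invented threshold; flyte-sdk's actual cutoff and literal layout are not described here.

```python
import pickle
from dataclasses import dataclass
from typing import Any, Dict, Optional

INLINE_LIMIT_BYTES = 1024  # hypothetical cutoff, not flyte-sdk's value


@dataclass
class PickledLiteral:
    """Either the pickled bytes themselves or a URI pointing at them."""
    inline: Optional[bytes] = None
    uri: Optional[str] = None


class FakeBlobStore:
    """In-memory stand-in for remote blob storage."""

    def __init__(self) -> None:
        self.objects: Dict[str, bytes] = {}

    def put(self, data: bytes) -> str:
        uri = f"memory://blob/{len(self.objects)}"
        self.objects[uri] = data
        return uri

    def get(self, uri: str) -> bytes:
        return self.objects[uri]


def to_pickled_literal(obj: Any, store: FakeBlobStore) -> PickledLiteral:
    data = pickle.dumps(obj)
    if len(data) <= INLINE_LIMIT_BYTES:
        return PickledLiteral(inline=data)  # small: keep in the literal
    return PickledLiteral(uri=store.put(data))  # large: offload, keep a pointer


def from_pickled_literal(lit: PickledLiteral, store: FakeBlobStore) -> Any:
    data = lit.inline if lit.inline is not None else store.get(lit.uri)
    return pickle.loads(data)
```

Either way, the stored payload is opaque bytes, which is why a UI cannot display what is inside.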

Performance and Configuration

The data system in flyte-sdk is optimized for high-throughput IO and concurrent type processing. Two key environment variables control this behavior:

  • _F_TE_MAX_COROS: (Default: 10) Controls the maximum number of concurrent coroutines the TypeEngine will use for batch operations.
  • FLYTE_IO_BATCH_SIZE: (Default: 32) Determines the batch size for directory operations, such as recursive uploads or listings in src/flyte/storage/_storage.py.
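A sketch of how settings like these typically bound work: read each variable with its default, cap in-flight coroutines with an asyncio.Semaphore, and split long listings into fixed-size batches. Only the variable names and defaults come from the text above; env_int, bounded_gather, and batched are hypothetical helpers, not flyte-sdk functions.

```python
import asyncio
import os
from typing import Awaitable, Callable, Iterator, List, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def env_int(name: str, default: int) -> int:
    """Read an integer setting from the environment, falling back to default."""
    raw = os.environ.get(name)
    return int(raw) if raw else default


# Variable names and defaults as documented; the parsing logic is assumed.
MAX_COROS = env_int("_F_TE_MAX_COROS", 10)
BATCH_SIZE = env_int("FLYTE_IO_BATCH_SIZE", 32)


def batched(items: Sequence[T], size: int = BATCH_SIZE) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


async def bounded_gather(
    fn: Callable[[T], Awaitable[R]], items: Sequence[T], limit: int = MAX_COROS
) -> List[R]:
    """Run fn over items concurrently, with at most `limit` in flight at once."""
    sem = asyncio.Semaphore(limit)

    async def run(item: T) -> R:
        async with sem:
            return await fn(item)

    # gather returns results in input order.
    return list(await asyncio.gather(*(run(item) for item in items)))
```

With a batch size of 32, batched would split 100 entries into four batches (32, 32, 32, and 4).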

When working with File or Dir objects that use "lazy uploaders" (functions that defer the actual upload until serialization), flyte-sdk ensures these are invoked within the correct async context to prevent blocking the event loop.
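The lazy-uploader idea can be sketched as storing an async callable on the object and awaiting it only when the object is serialized, with the blocking transfer pushed to a worker thread via asyncio.to_thread. LazyFile, blocking_upload, serialize_file, and the remote:// scheme are all hypothetical and do not reflect flyte-sdk's internals.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


def blocking_upload(local_path: str) -> str:
    """Hypothetical blocking upload; a real one would talk to S3 or GCS."""
    return f"remote://{local_path.lstrip('/')}"


@dataclass
class LazyFile:
    local_path: str
    remote_uri: Optional[str] = None
    # The uploader is stored, not called, when the object is created.
    lazy_uploader: Optional[Callable[[], Awaitable[str]]] = None

    @classmethod
    def from_local(cls, local_path: str) -> "LazyFile":
        async def upload() -> str:
            # Offload the blocking call so the event loop is not blocked.
            return await asyncio.to_thread(blocking_upload, local_path)

        return cls(local_path=local_path, lazy_uploader=upload)


async def serialize_file(f: LazyFile) -> Optional[str]:
    # Run the deferred upload only now, inside the running event loop.
    if f.remote_uri is None and f.lazy_uploader is not None:
        f.remote_uri = await f.lazy_uploader()
        f.lazy_uploader = None  # ensure the upload happens at most once
    return f.remote_uri
```

Deferring the upload means a file that is created but never returned from a task is never transferred at all.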