Core Type System Architecture

The core type system in flyte-sdk bridges the gap between Python's native type system and Flyte's language-agnostic Interface Definition Language (IDL). When you define a task with type hints, flyte-sdk uses this system to ensure that Python objects are correctly serialized into Flyte Literal types for transport and deserialized back into Python objects during execution.

The TypeEngine

The TypeEngine class, located in src/flyte/types/_type_engine.py, is the central orchestrator and registry for all type conversions. It maintains a mapping of Python types to their corresponding TypeTransformer implementations.

When a task is executed, the TypeEngine is responsible for two primary operations:

  1. to_literal: Converts a Python value into a Flyte Literal (defined in flyteidl2.core.literals_pb2).
  2. to_python_value: Converts a Flyte Literal back into a native Python object.

The TypeEngine also provides to_literal_type, which generates a LiteralType (the IDL representation of a type) from a Python type hint. This is used during task registration to define the task's interface.
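The registry-and-dispatch pattern described above can be sketched in plain Python. This is a toy model under stated assumptions, not flyte-sdk's implementation: ToyTransformer, ToyTypeEngine, and the dict-based "literal" are hypothetical stand-ins, and only the three operation names (to_literal, to_python_value, to_literal_type) come from the text.

```python
import asyncio
from typing import Any, Dict


class ToyTransformer:
    """Hypothetical stand-in for a TypeTransformer (not the flyte-sdk class)."""

    def __init__(self, python_type: type, literal_type: str):
        self.python_type = python_type
        self.literal_type = literal_type

    async def to_literal(self, python_val: Any) -> dict:
        # A plain dict plays the role of a Flyte Literal in this sketch.
        return {"type": self.literal_type, "value": python_val}

    async def to_python_value(self, lv: dict) -> Any:
        return self.python_type(lv["value"])


class ToyTypeEngine:
    """Hypothetical registry mapping Python types to transformers."""

    _registry: Dict[type, ToyTransformer] = {}

    @classmethod
    def register(cls, transformer: ToyTransformer) -> None:
        cls._registry[transformer.python_type] = transformer

    @classmethod
    def get_transformer(cls, python_type: type) -> ToyTransformer:
        if python_type not in cls._registry:
            raise ValueError(f"No transformer registered for {python_type!r}")
        return cls._registry[python_type]

    @classmethod
    def to_literal_type(cls, python_type: type) -> str:
        # Used at registration time: type hint -> interface description.
        return cls.get_transformer(python_type).literal_type

    @classmethod
    async def to_literal(cls, python_val: Any, python_type: type) -> dict:
        return await cls.get_transformer(python_type).to_literal(python_val)

    @classmethod
    async def to_python_value(cls, lv: dict, expected_python_type: type) -> Any:
        return await cls.get_transformer(expected_python_type).to_python_value(lv)


ToyTypeEngine.register(ToyTransformer(int, "INTEGER"))
ToyTypeEngine.register(ToyTransformer(str, "STRING"))


async def round_trip(value: Any, python_type: type) -> Any:
    """Serialize a value and read it back, as happens across a task boundary."""
    literal = await ToyTypeEngine.to_literal(value, python_type)
    return await ToyTypeEngine.to_python_value(literal, python_type)
```

Calling asyncio.run(round_trip(42, int)) sends the value through both conversions. Looking up an unregistered type such as float raises ValueError in this sketch.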

TypeTransformers

Every type supported by flyte-sdk is handled by a subclass of TypeTransformer. This abstract base class (found in src/flyte/types/_type_engine.py) defines the interface for type conversion:

class TypeTransformer(typing.Generic[T]):
    @abstractmethod
    def get_literal_type(self, t: Type[T]) -> LiteralType:
        """Converts the python type to a Flyte LiteralType"""
        ...

    @abstractmethod
    async def to_literal(self, python_val: T, python_type: Type[T], expected: LiteralType) -> Literal:
        """Converts a given python_val to a Flyte Literal"""
        ...

    @abstractmethod
    async def to_python_value(self, lv: Literal, expected_python_type: Type[T]) -> Optional[T]:
        """Converts the given Literal to a Python Type"""
        ...

Simple Transformers

For primitive types like int, str, and bool, flyte-sdk uses the SimpleTransformer helper. It reduces boilerplate by allowing you to define conversions using simple lambda functions. For example, the IntTransformer is defined as:

IntTransformer = SimpleTransformer(
    "int",
    int,
    types_pb2.LiteralType(simple=types_pb2.SimpleType.INTEGER),
    lambda x: Literal(scalar=Scalar(primitive=Primitive(integer=x))),
    _handle_flyte_console_float_input_to_int,
)
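The five arguments above (a name, the Python type, the literal type, and two conversion callables) suggest a helper along the following lines. This is a minimal sketch, not the SimpleTransformer source: LambdaTransformer, int_from_literal, and the dict-based literal are hypothetical. The float-to-int coercion is an assumption inferred only from the name _handle_flyte_console_float_input_to_int.

```python
from typing import Callable, Generic, Type, TypeVar

T = TypeVar("T")


class LambdaTransformer(Generic[T]):
    """Hypothetical helper: wraps two callables instead of requiring a subclass."""

    def __init__(
        self,
        name: str,
        t: Type[T],
        literal_type: str,
        to_literal_fn: Callable[[T], dict],
        from_literal_fn: Callable[[dict], T],
    ):
        self.name = name
        self._type = t
        self._literal_type = literal_type
        self._to_literal_fn = to_literal_fn
        self._from_literal_fn = from_literal_fn

    def get_literal_type(self, t: Type[T]) -> str:
        return self._literal_type

    def to_literal(self, python_val: T) -> dict:
        # Exact type check, so that e.g. bool is not silently accepted as int.
        if type(python_val) is not self._type:
            raise TypeError(
                f"Expected {self._type.__name__}, got {type(python_val).__name__}"
            )
        return self._to_literal_fn(python_val)

    def to_python_value(self, lv: dict) -> T:
        return self._from_literal_fn(lv)


def int_from_literal(lv: dict) -> int:
    """Assumed behaviour: accept an integer, or a float with no fractional part."""
    if "integer" in lv:
        return lv["integer"]
    if "float_value" in lv and float(lv["float_value"]).is_integer():
        return int(lv["float_value"])
    raise TypeError(f"Cannot convert {lv!r} to int")


# Same argument order as the IntTransformer definition, with toy literals.
ToyIntTransformer = LambdaTransformer(
    "int",
    int,
    "INTEGER",
    lambda x: {"integer": x},
    int_from_literal,
)
```

The design choice this illustrates is that primitive types need no class of their own: one generic helper plus a pair of callables covers each case.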

Handling Complex Types

flyte-sdk provides specialized transformers for complex Python structures like Pydantic models and dataclasses. These transformers use a combination of MessagePack for efficient binary serialization and JSON Schema for type metadata.

Pydantic and Dataclass Transformers

The PydanticTransformer and DataclassTransformer (both in src/flyte/types/_type_engine.py) serialize objects into a Binary literal with the msgpack tag.

  • Pydantic: Uses Pydantic's native model_dump_json and model_validate methods, with additional logic to handle enums by name.
  • Dataclasses: Uses the mashumaro library for high-performance serialization to and from MessagePack.

Lazy Uploaders for Nested IO Types

Both transformers rely on the _invoke_lazy_uploaders function. If you nest Flyte IO types like FlyteFile, FlyteDir, or StructuredDataset inside a Pydantic model or dataclass, their upload to remote storage is deferred rather than performed immediately.

Before serialization, the transformer calls _invoke_lazy_uploaders to recursively find these objects and trigger their upload. This ensures that the resulting MessagePack payload contains the correct remote URIs rather than local file paths.

# Internal mechanism in PydanticTransformer.to_literal
await _invoke_lazy_uploaders(python_val)
json_str = python_val.model_dump_json()
# ... serialization continues
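The recursive walk that precedes serialization can be illustrated with a self-contained sketch. It is not the _invoke_lazy_uploaders implementation: ToyFile, its upload method, the remote://example-bucket prefix, and invoke_uploads are all hypothetical names chosen for illustration, and the traversal rules (dataclass fields, dicts, lists, tuples) are an assumption about what "recursively find" means.

```python
import asyncio
from dataclasses import dataclass, field, fields, is_dataclass
from typing import Any, List


@dataclass
class ToyFile:
    """Hypothetical IO type holding a local path until it is uploaded."""

    path: str

    async def upload(self) -> None:
        # Stand-in for a real upload: rewrite the local path as a remote URI.
        if not self.path.startswith("remote://"):
            await asyncio.sleep(0)
            self.path = "remote://example-bucket/" + self.path.lstrip("/")


async def invoke_uploads(obj: Any) -> None:
    """Walk an object graph and upload every ToyFile found in it."""
    if isinstance(obj, ToyFile):
        await obj.upload()
    elif is_dataclass(obj) and not isinstance(obj, type):
        for f in fields(obj):
            await invoke_uploads(getattr(obj, f.name))
    elif isinstance(obj, dict):
        for value in obj.values():
            await invoke_uploads(value)
    elif isinstance(obj, (list, tuple)):
        for item in obj:
            await invoke_uploads(item)
    # Any other value (str, int, None, ...) has nothing to upload.


@dataclass
class Report:
    title: str
    summary: ToyFile
    attachments: List[ToyFile] = field(default_factory=list)
```

After asyncio.run(invoke_uploads(report)), every ToyFile reachable from report holds a remote URI, so a subsequent serialization step would embed remote locations rather than local paths.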

Custom Type Transformers

You can extend the Flyte type system by implementing your own TypeTransformer and registering it with the TypeEngine. This is useful for supporting domain-specific types or third-party library objects.

Implementation Example

The following example (adapted from examples/type_transformers/my_transformer/src/my_transformer/transformer.py) shows a transformer for a custom PositiveInt type:

from flyte.types._type_engine import TypeTransformer, TypeEngine
from flyteidl2.core import types_pb2, literals_pb2
from typing import Type


class PositiveInt:
    def __init__(self, value: int):
        # Zero is not positive, so reject it along with negative values.
        if value <= 0:
            raise ValueError("Value must be positive")
        self.value = value


class PositiveIntTransformer(TypeTransformer[PositiveInt]):
    def __init__(self):
        super().__init__(name="PositiveInt", t=PositiveInt)

    def get_literal_type(self, t: Type[PositiveInt]) -> types_pb2.LiteralType:
        # Stored as a plain INTEGER; the structure tag marks it as PositiveInt.
        return types_pb2.LiteralType(
            simple=types_pb2.SimpleType.INTEGER,
            structure=types_pb2.TypeStructure(tag="PositiveInt"),
        )

    async def to_literal(
        self,
        python_val: PositiveInt,
        python_type: Type[PositiveInt],
        expected: types_pb2.LiteralType,
    ) -> literals_pb2.Literal:
        return literals_pb2.Literal(
            scalar=literals_pb2.Scalar(
                primitive=literals_pb2.Primitive(integer=python_val.value)
            )
        )

    async def to_python_value(
        self, lv: literals_pb2.Literal, expected_python_type: Type[PositiveInt]
    ) -> PositiveInt:
        # Re-running the constructor re-validates the value on deserialization.
        value = lv.scalar.primitive.integer
        return PositiveInt(value)


# Register the transformer so TypeEngine can find it
TypeEngine.register(PositiveIntTransformer())

Configuration

The type engine's performance during batch conversions (e.g., in large LiteralMap objects) can be tuned using the _F_TE_MAX_COROS environment variable. This controls the maximum number of concurrent coroutines used for type transformations, defaulting to 10.
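One common way to cap concurrent coroutines is an asyncio.Semaphore, sketched below. Only the variable name _F_TE_MAX_COROS and the default of 10 come from the text. The semaphore approach and the helper names max_coros and convert_all are assumptions, since how flyte-sdk enforces the limit internally is not shown here.

```python
import asyncio
import os
from typing import Awaitable, Callable, Iterable, List, Optional, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def max_coros(default: int = 10) -> int:
    """Read the concurrency limit from the environment, falling back to the default."""
    return int(os.environ.get("_F_TE_MAX_COROS", str(default)))


async def convert_all(
    values: Iterable[T],
    convert: Callable[[T], Awaitable[R]],
    limit: Optional[int] = None,
) -> List[R]:
    """Run convert over all values with at most `limit` conversions in flight."""
    semaphore = asyncio.Semaphore(limit if limit is not None else max_coros())

    async def bounded(value: T) -> R:
        async with semaphore:
            return await convert(value)

    # gather preserves input order regardless of completion order.
    return await asyncio.gather(*(bounded(v) for v in values))
```

With this pattern, raising the limit allows more conversions to overlap (useful when each one waits on I/O), while lowering it bounds memory and connection usage for very large maps.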