Structured Data: Dataclasses and Pydantic
When you pass complex data structures between tasks, using raw dictionaries often leads to runtime errors because there is no contract for the keys and types. flyte-sdk solves this by supporting Python dataclasses and Pydantic BaseModel as first-class types. This provides type safety, automatic JSON schema generation in the Flyte UI, and validation before your code even runs.
Python Dataclasses
You can use standard Python dataclasses to define the structure of your task inputs and outputs. flyte-sdk automatically handles the serialization to and from JSON.
Basic and Nested Dataclasses
Dataclasses support basic types (str, int, float, bool), date/time types (datetime, timedelta), and collections (List, Dict). You can also nest dataclasses to create complex hierarchies.
As seen in examples/basics/types/dataclass_types.py, you define your structure and use it as a type hint in your @task:
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class NestedData:
    name: str
    value: int

@dataclass
class ComplexData:
    str_field: str
    int_list: List[int]
    nested: NestedData
    optional_str: Optional[str] = None

# `env` is the task environment defined elsewhere in the script.
@env.task
def process_complex_data(data: ComplexData) -> str:
    return f"Received {data.nested.name} with {len(data.int_list)} items"
When you call this task, flyte-sdk ensures that the input matches the ComplexData structure. If you are running via the CLI, you can pass the dataclass as a JSON string:
flyte run my_script.py process_complex_data --data '{"str_field": "test", "int_list": [1, 2], "nested": {"name": "sub", "value": 10}}'
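The JSON string passed on the command line mirrors the dataclass field by field. The sketch below reproduces that mapping with the standard library only. `from_json` is a hypothetical helper written for illustration; it is not how flyte-sdk performs the conversion internally.

```python
import json
from dataclasses import asdict, dataclass
from typing import List, Optional


@dataclass
class NestedData:
    name: str
    value: int


@dataclass
class ComplexData:
    str_field: str
    int_list: List[int]
    nested: NestedData
    optional_str: Optional[str] = None


def from_json(payload: str) -> ComplexData:
    """Hypothetical helper: build ComplexData from a JSON string."""
    raw = json.loads(payload)
    # json.loads only yields plain dicts, so the nested object is rebuilt explicitly.
    raw["nested"] = NestedData(**raw["nested"])
    return ComplexData(**raw)


# Same payload as the CLI example; optional_str is omitted and falls back to None.
payload = '{"str_field": "test", "int_list": [1, 2], "nested": {"name": "sub", "value": 10}}'
data = from_json(payload)

# asdict() recurses into nested dataclasses, so the result is JSON-serializable.
round_tripped = json.dumps(asdict(data))
```

A missing required key such as `str_field` would raise a `TypeError` in this sketch when `ComplexData(**raw)` is called, which is the kind of contract a raw dictionary does not give you.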
Pydantic BaseModels
For more advanced validation, flyte-sdk supports Pydantic BaseModel. This is particularly useful when you need default values, Enum support, or Literal constraints.
Complex Validation and Enums
In examples/basics/pydantic_models.py, Pydantic is used to define a robust configuration object:
from enum import Enum
from typing import Literal, Optional
from pydantic import BaseModel

class BatchMode(str, Enum):
    LINES = "lines"
    BYTES = "bytes"

class RetryPolicy(BaseModel):
    max_retries: int = 3
    backoff_seconds: float = 1.0

class BatchConfig(BaseModel):
    name: str = "default"
    batch_by: BatchMode = BatchMode.LINES
    retry: RetryPolicy = RetryPolicy()
    mode: Literal["fast", "slow"] = "fast"
    tags: list[str] = []

@env.task
def process_data(batch_config: Optional[BatchConfig] = None) -> str:
    cfg = batch_config or BatchConfig()
    return f"Running in {cfg.mode} mode, batching by {cfg.batch_by}"
Flyte uses the Pydantic model to generate a JSON schema. In the Flyte UI, this schema is used to provide a structured input form, ensuring that users provide valid data according to your Enum and Literal definitions.
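To see what such a schema contains, you can ask Pydantic for it directly. The following is a minimal sketch assuming Pydantic v2 (`model_json_schema` and `model_validate` are v2 methods). It uses a trimmed copy of `BatchConfig` and does not involve flyte-sdk; how the Flyte UI consumes the schema is not shown here.

```python
from enum import Enum
from typing import Literal

from pydantic import BaseModel, ValidationError


class BatchMode(str, Enum):
    LINES = "lines"
    BYTES = "bytes"


class BatchConfig(BaseModel):
    name: str = "default"
    batch_by: BatchMode = BatchMode.LINES
    mode: Literal["fast", "slow"] = "fast"


# Pydantic v2 emits a JSON Schema dict; Enum classes are placed under "$defs".
schema = BatchConfig.model_json_schema()
allowed_modes = schema["properties"]["mode"]["enum"]
allowed_batch_modes = schema["$defs"]["BatchMode"]["enum"]

# The same constraints are enforced when a model instance is constructed.
try:
    BatchConfig(mode="turbo")
    rejected = False
except ValidationError:
    rejected = True
```

The `enum` entries in the schema are what allow a form to offer a fixed set of choices instead of a free-text field.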
Combining Structured Data with Heavy IO
A powerful pattern in flyte-sdk is embedding heavy IO types like flyte.io.File or flyte.io.DataFrame inside dataclasses or Pydantic models. This allows you to group metadata (like model parameters or execution IDs) with the actual data files.
The arbitrary_types_allowed Requirement
When using Flyte-specific types like File or DataFrame inside a Pydantic model, you must explicitly allow them in the model's configuration. This is because these types are not standard JSON-serializable types until flyte-sdk processes them.
As demonstrated in examples/basics/dataclass_examples.py and examples/basics/dataframe_nested.py:
from pydantic import BaseModel
from flyte.io import File, DataFrame

class BatchPredictionResults(BaseModel):
    predictions: list[float]
    results_file: File

    # Pydantic v1 style (deprecated in v2): required for Pydantic to accept Flyte IO types
    class Config:
        arbitrary_types_allowed = True

# Pydantic v2 style: set the same option through model_config
class MyModel(BaseModel):
    data: DataFrame

    model_config = {"arbitrary_types_allowed": True}
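The effect of this setting can be observed without flyte-sdk. The sketch below assumes Pydantic v2 and uses `BlobHandle`, a hypothetical plain Python class standing in for any type Pydantic cannot build a schema for; it is not a Flyte type.

```python
from pydantic import BaseModel, ConfigDict, ValidationError
from pydantic.errors import PydanticSchemaGenerationError


class BlobHandle:
    """Hypothetical stand-in for a type Pydantic has no schema for."""

    def __init__(self, uri: str) -> None:
        self.uri = uri


# Without the setting, Pydantic v2 fails while the class is being defined.
try:
    class Strict(BaseModel):
        handle: BlobHandle
    definition_failed = False
except PydanticSchemaGenerationError:
    definition_failed = True


# With the setting, the field is accepted and checked with isinstance().
class Relaxed(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)
    handle: BlobHandle


ok = Relaxed(handle=BlobHandle("s3://bucket/key"))

# A value of the wrong type is still rejected at construction time.
try:
    Relaxed(handle="s3://bucket/key")
    wrong_type_rejected = False
except ValidationError:
    wrong_type_rejected = True
```

Note that `arbitrary_types_allowed` only relaxes schema generation. Pydantic still verifies that the value is an instance of the declared class.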
How it Works Internally
When flyte-sdk encounters a File or DataFrame inside a structured type:
- Offloading: The heavy data (the actual file content or the dataframe rows) is uploaded to the configured blob storage (e.g., S3 or GCS).
- Reference Substitution: The File or DataFrame object in the JSON structure is replaced with a reference (a URI) to the offloaded data.
- Serialization: The remaining dataclass or Pydantic model, now containing only primitives and URIs, is serialized to JSON for the Flyte engine.
- Reconstruction: When the next task receives this input, flyte-sdk automatically downloads the data (or provides a lazy-loading handle) and reconstructs the original Python object.
This mechanism allows you to maintain clean, structured code while efficiently handling gigabytes of data.
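The four steps can be imitated in a few lines of plain Python. This is a conceptual sketch only: `BLOB_STORE`, `offload`, `load`, and the `mem://` URI scheme are hypothetical stand-ins for real blob storage, and none of it reflects the actual flyte-sdk implementation.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict, List

# Hypothetical in-memory stand-in for S3/GCS: maps a URI to the stored bytes.
BLOB_STORE: Dict[str, bytes] = {}


@dataclass
class Results:
    predictions: List[float]
    results_uri: str  # only the reference travels inside the JSON


def offload(content: bytes, key: str) -> str:
    """Step 1: store the heavy payload and return a URI pointing at it."""
    uri = f"mem://bucket/{key}"
    BLOB_STORE[uri] = content
    return uri


def load(uri: str) -> bytes:
    """Step 4: fetch the payload back from its reference."""
    return BLOB_STORE[uri]


# Producer side: offload, substitute the reference, serialize (steps 1 to 3).
heavy = b"id,score\n1,0.9\n2,0.4\n"
uri = offload(heavy, "results.csv")
wire = json.dumps(asdict(Results(predictions=[0.9, 0.4], results_uri=uri)))

# Consumer side: deserialize the small JSON, then resolve the reference (step 4).
received = Results(**json.loads(wire))
restored = load(received.results_uri)
```

The JSON in `wire` stays small no matter how large the payload is, because only the URI is embedded in it.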