DataFrames and Tabular Data
Flyte DataFrames provide a unified interface for handling tabular data across different libraries like Pandas, PyArrow, and Polars. Instead of loading entire datasets into memory, flyte.io.DataFrame acts as a remote reference (proxy) that materializes data only when needed, enabling efficient data transfer and interoperability between tasks using different backend libraries.
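Here is a minimal pure-Python sketch of the proxy idea. The LazyRef class, its materialize() method and the dictionary standing in for S3 are hypothetical illustrations, not part of flyte.io. The sketch shows one thing only: a reference can be created and passed around freely, and the loader runs only when the data is actually requested.

```python
from typing import Any, Callable


class LazyRef:
    """Hypothetical proxy: remembers where data lives, loads it on demand."""

    def __init__(self, uri: str, fmt: str, loader: Callable[[str], Any]):
        self.uri = uri
        self.fmt = fmt
        self._loader = loader
        self.loads = 0  # number of times the data was actually fetched

    def materialize(self) -> Any:
        # The (possibly expensive) fetch-and-parse step happens only here
        self.loads += 1
        return self._loader(self.uri)


# A dict standing in for remote object storage such as S3 or GCS
STORE = {"s3://bucket/t.parquet": [{"a": 1}, {"a": 2}]}

ref = LazyRef("s3://bucket/t.parquet", "parquet", STORE.__getitem__)
print(ref.loads)             # 0: creating the reference fetched nothing
rows = ref.materialize()
print(len(rows), ref.loads)  # 2 1
```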
Interoperability with Pandas and PyArrow
When you define a task that accepts or returns a pd.DataFrame or pa.Table, flyte-sdk automatically handles the conversion to and from the underlying storage format (usually Parquet). This allows you to write tasks using your preferred library without worrying about manual serialization.
```python
import pandas as pd

import flyte

env = flyte.TaskEnvironment(name="dataframes")


@env.task
async def create_data() -> pd.DataFrame:
    # Returns a raw pandas DataFrame; Flyte uploads it (as Parquet by default)
    return pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})


@env.task
async def process_data(df: pd.DataFrame) -> pd.DataFrame:
    # Flyte downloads the stored data and materializes it as a pandas DataFrame
    return df.assign(c=df["a"] + df["b"])
```
Internally, flyte.io.DataFrame manages the URI and format of the data. When a task returns a pd.DataFrame, the DataFrameTransformerEngine in src/flyte/io/_dataframe/dataframe.py selects the appropriate DataFrameEncoder (like PandasToParquetEncodingHandler in src/flyte/io/_dataframe/basic_dfs.py) to upload the data to remote storage.
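You can picture the selection step as a lookup keyed by Python type and format, with a fallback to Parquet when no format is requested. The sketch below assumes a plain dictionary registry. ENCODERS, register, select_encoder and the Table stand-in type are all hypothetical names. The real engine may key on more than this, such as the storage protocol.

```python
from typing import Callable, Dict, Optional, Tuple

# Hypothetical registry: (python type, format) -> encoder function
Encoder = Callable[[object], bytes]
ENCODERS: Dict[Tuple[type, str], Encoder] = {}
DEFAULT_FORMAT = "parquet"


def register(py_type: type, fmt: str, encoder: Encoder) -> None:
    ENCODERS[(py_type, fmt)] = encoder


def select_encoder(py_type: type, fmt: Optional[str] = None) -> Encoder:
    # No explicit format requested: fall back to the default (Parquet)
    key = (py_type, fmt or DEFAULT_FORMAT)
    try:
        return ENCODERS[key]
    except KeyError:
        raise ValueError(f"no encoder registered for {key}") from None


class Table(list):
    """Stand-in for a dataframe type such as pd.DataFrame."""


register(Table, "parquet", lambda t: b"PAR1" + repr(t).encode())
register(Table, "csv", lambda t: "\n".join(map(str, t)).encode())

print(select_encoder(Table)(Table([1, 2])))         # b'PAR1[1, 2]'
print(select_encoder(Table, "csv")(Table([1, 2])))  # b'1\n2'
```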
Controlling Serialization Formats
By default, flyte-sdk uses Parquet for its efficiency and schema preservation. However, you can force a specific format like CSV using Annotated in your type hints.
```python
from typing import Annotated

import pandas as pd

import flyte
import flyte.io

env = flyte.TaskEnvironment(name="dataframes")


@env.task
async def create_csv_data() -> Annotated[flyte.io.DataFrame, "csv"]:
    df = pd.DataFrame({"name": ["Alice", "Bob"], "age": [30, 25]})
    # wrap_df creates a Flyte DataFrame reference from a raw dataframe
    return flyte.io.DataFrame.wrap_df(df)
```
The string "csv" in Annotated tells the DataFrameTransformerEngine to look for a handler registered for the CSV format. In src/flyte/io/_dataframe/basic_dfs.py, the PandasToCSVEncodingHandler implements this by calling df.to_csv().
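The standard typing module is enough to show how the format string travels with the type hint. The requested_format helper and the DataFrameRef stand-in are hypothetical. This is one plausible way to read the metadata; it is not flyte-sdk's own parsing code.

```python
from typing import Annotated, Optional, get_args, get_origin


class DataFrameRef:
    """Stand-in for flyte.io.DataFrame in this sketch."""


def requested_format(hint) -> Optional[str]:
    # Annotated[T, "csv"] keeps "csv" as metadata alongside the base type T
    if get_origin(hint) is Annotated:
        for meta in get_args(hint)[1:]:
            if isinstance(meta, str):
                return meta
    return None  # plain hint: the engine's default format applies


print(requested_format(Annotated[DataFrameRef, "csv"]))  # csv
print(requested_format(DataFrameRef))                    # None
```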
Lazy Evaluation with Polars
For large datasets, materializing the entire table into memory can be expensive. The flyteplugins-polars plugin lets you use pl.LazyFrame, so Polars can build and optimize a query plan inside a task instead of computing each intermediate result eagerly.
```python
import polars as pl

import flyte

env = flyte.TaskEnvironment(name="polars_lazy")


@env.task
async def get_lazy_frame() -> pl.LazyFrame:
    return pl.LazyFrame({"salary": [50000, 80000, 120000]})


@env.task
async def filter_high_earners(lf: pl.LazyFrame) -> pl.LazyFrame:
    # Adds a filter to the query plan; no rows are processed on this line
    return lf.filter(pl.col("salary") > 70000)
```
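Data travels between tasks as Parquet files. A returned pl.LazyFrame's query plan must therefore run before its result can be written to storage, and flyte-sdk then transfers those Parquet files to the next task. Within a task, a pl.LazyFrame is only collected (materialized) when you explicitly call .collect(). A downstream task that declares a concrete pl.DataFrame input receives fully materialized data.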
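The deferral behind pl.LazyFrame can be illustrated without Polars. The toy LazyPlan class below is hypothetical. Its filter() method only appends a step to a plan, and collect() then runs every step in a single pass. The sketch models deferred execution only; it does not model Polars' query optimizer or its Parquet I/O.

```python
from typing import Callable, Iterable, Iterator, List, Optional

Step = Callable[[Iterator[dict]], Iterator[dict]]


class LazyPlan:
    """Hypothetical toy lazy frame: records steps, runs them on collect()."""

    def __init__(self, rows: Iterable[dict], steps: Optional[List[Step]] = None):
        self._rows = rows
        self._steps = steps or []

    def filter(self, pred: Callable[[dict], bool]) -> "LazyPlan":
        # Returns a new plan with one more step; no rows are touched here
        step: Step = lambda rows: (r for r in rows if pred(r))
        return LazyPlan(self._rows, self._steps + [step])

    def collect(self) -> List[dict]:
        # Materialize: stream the rows through every recorded step
        rows: Iterator[dict] = iter(self._rows)
        for step in self._steps:
            rows = step(rows)
        return list(rows)


calls = 0


def high_earner(row: dict) -> bool:
    global calls
    calls += 1
    return row["salary"] > 70000


plan = LazyPlan([{"salary": s} for s in (50000, 80000, 120000)]).filter(high_earner)
print(calls)   # 0: building the plan evaluated nothing
result = plan.collect()
print(result)  # [{'salary': 80000}, {'salary': 120000}]
print(calls)   # 3
```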
Manual Materialization and Remote References
If you want to delay materialization within a task or work with existing remote data, you can use flyte.io.DataFrame directly.
Loading Remote Data
You can create a reference to an existing file on S3 or GCS without downloading it immediately:
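```python
import pandas as pd

import flyte
import flyte.io

env = flyte.TaskEnvironment(name="dataframes")


@env.task
async def process_remote_parquet() -> pd.DataFrame:
    # Create a reference to existing data; nothing is downloaded yet
    fdf = flyte.io.DataFrame.from_existing_remote(
        "s3://my-bucket/data.parquet",
        format="parquet",
    )
    # Download and materialize into a pandas DataFrame
    return await fdf.open(pd.DataFrame).all()
```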
Lazy Access in Tasks
When a task receives a flyte.io.DataFrame, it doesn't download anything until you call .all() or .iter():
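```python
import pandas as pd
import flyte.io

env = flyte.TaskEnvironment(name="dataframes")

@env.task
async def process_lazily(fdf: flyte.io.DataFrame) -> int:
    # Data is downloaded and converted to pandas only at this call
    df = await fdf.open(pd.DataFrame).all()
    return len(df)
```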
The .open(dataframe_type) method sets the target type, and .all() invokes the DataFrameTransformerEngine.to_python_value logic to find the correct DataFrameDecoder for that type and format.
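The open()/all() flow can be sketched in two steps. First, open() records the target type; then all() looks up a decoder for that type and the stored format. DFRef, DECODERS and csv_to_rows are hypothetical names, and the "download" is a hard-coded string. Only the shape of the dispatch is meant to mirror the .open()/.all() flow.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Tuple

# Hypothetical decoder registry: (target type, format) -> async decoder
DECODERS: Dict[Tuple[type, str], Callable[[str], Awaitable[Any]]] = {}


class DFRef:
    """Toy dataframe reference with open()/all()."""

    def __init__(self, uri: str, fmt: str):
        self.uri, self.fmt = uri, fmt
        self._target: type = list

    def open(self, target: type) -> "DFRef":
        # Only records the requested type; nothing is read yet
        self._target = target
        return self

    async def all(self) -> Any:
        # Dispatch on (target type, stored format), then read the data
        decoder = DECODERS[(self._target, self.fmt)]
        return await decoder(self.uri)


async def csv_to_rows(uri: str) -> list:
    text = "a,b\n1,2\n3,4"  # stands in for downloading the file at uri
    header, *lines = text.split("\n")
    cols = header.split(",")
    return [dict(zip(cols, line.split(","))) for line in lines]


DECODERS[(list, "csv")] = csv_to_rows

rows = asyncio.run(DFRef("s3://bucket/t.csv", "csv").open(list).all())
print(rows)  # [{'a': '1', 'b': '2'}, {'a': '3', 'b': '4'}]
```

The decoded values are strings because CSV carries no column types. This is one reason Parquet is the default format.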
Extending DataFrame Support
You can add support for new dataframe libraries or storage formats by implementing custom encoders and decoders and registering them with the DataFrameTransformerEngine.
- Implement DataFrameEncoder: define how to save your dataframe type to storage.
- Implement DataFrameDecoder: define how to load your dataframe type from storage.
- Register with the engine: call DataFrameTransformerEngine.register().
```python
from flyte.io.extend import DataFrameEncoder, DataFrameDecoder, DataFrameTransformerEngine
from flyteidl2.core import literals_pb2, types_pb2


class MyCustomDF:
    """Placeholder for the dataframe type you want to support."""


class MyCustomDFEncoder(DataFrameEncoder):
    def __init__(self):
        super().__init__(MyCustomDF, supported_format="custom_fmt")

    async def encode(self, dataframe, structured_dataset_type: types_pb2.StructuredDatasetType):
        uri = dataframe.uri  # destination for the serialized data
        ...  # write the wrapped MyCustomDF to uri in "custom_fmt"
        return literals_pb2.StructuredDataset(uri=uri, metadata=...)


# Register the encoder with the engine
DataFrameTransformerEngine.register(MyCustomDFEncoder())
```
This registration mechanism, found in src/flyte/io/_dataframe/dataframe.py, allows flyte-sdk to dynamically discover how to handle various tabular data types at runtime.
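An encoder and a decoder registered for the same format must agree: whatever the encoder writes, the decoder can read back. The round-trip sketch below uses a hypothetical Records type, a JSON Lines format and an in-memory dict in place of remote storage. Its methods are synchronous for brevity, whereas flyte-sdk's encode is declared async (as in the encoder example).

```python
import json
from typing import Dict

STORAGE: Dict[str, str] = {}  # in-memory stand-in for remote object storage


class Records(list):
    """Hypothetical custom dataframe type: a list of row dicts."""


class RecordsEncoder:
    supported_format = "jsonl"

    def encode(self, df: Records, uri: str) -> str:
        # Serialize one JSON object per line, store it, return its location
        STORAGE[uri] = "\n".join(json.dumps(row) for row in df)
        return uri


class RecordsDecoder:
    supported_format = "jsonl"  # must match the encoder to read its output

    def decode(self, uri: str) -> Records:
        return Records(json.loads(line) for line in STORAGE[uri].splitlines())


uri = RecordsEncoder().encode(Records([{"a": 1}, {"a": 2}]), "mem://t.jsonl")
back = RecordsDecoder().decode(uri)
print(back == [{"a": 1}, {"a": 2}], type(back).__name__)  # True Records
```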