Polars Parquet Serialization Mechanics
The flyte-sdk provides specialized handlers for Polars DataFrame and LazyFrame types, enabling high-performance data processing within Flyte tasks. These handlers use Parquet as the intermediate storage format, leveraging Polars' native Parquet engine to ensure efficient serialization and deserialization across distributed environments.
Eager vs. Lazy Serialization
The flyte-sdk distinguishes between eager pl.DataFrame and lazy pl.LazyFrame objects, providing optimized handlers for each.
Eager DataFrames
For standard pl.DataFrame objects, the PolarsToParquetEncodingHandler and ParquetToPolarsDecodingHandler manage the transition to and from Parquet. When a task returns a pl.DataFrame, the encoder writes the data immediately to a file named 00000 within the dataset's URI.
# plugins/polars/src/flyteplugins/polars/df_transformer.py
path = os.path.join(uri, f"{0:05}")
df = typing.cast(pl.DataFrame, dataframe.val)
# ...
df.write_parquet(path, storage_options=storage_options or None)
LazyFrames and Streaming
The PolarsLazyFrameToParquetEncodingHandler and ParquetToPolarsLazyFrameDecodingHandler are designed for deferred execution. Instead of loading the entire dataset into memory, the decoder calls pl.scan_parquet and the encoder calls LazyFrame.sink_parquet. Tasks still exchange Parquet files rather than query plans: the encoder runs the upstream plan when it sinks the result to storage, and the decoder hands the downstream task a lazy scan that reads data only when a terminal action such as collect() is called.
Note that LazyFrame handlers use a slightly different file naming convention, appending the .parquet extension:
# plugins/polars/src/flyteplugins/polars/df_transformer.py
path = os.path.join(uri, f"{0:05}.parquet")
lazy_df = typing.cast(pl.LazyFrame, dataframe.val)
# ...
lazy_df.sink_parquet(path, storage_options=storage_options or None)
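The two naming conventions differ only in the extension. The following is a minimal sketch of how the paths are built; the helper names and the bucket URI are made up for illustration and are not part of flyte-sdk.

```python
import os


def eager_parquet_path(uri: str) -> str:
    # Mirrors the eager handler: a zero-padded index with no extension.
    return os.path.join(uri, f"{0:05}")


def lazy_parquet_path(uri: str) -> str:
    # Mirrors the LazyFrame handler: same index, plus ".parquet".
    return os.path.join(uri, f"{0:05}.parquet")


uri = "s3://my-bucket/outputs/df"  # hypothetical dataset URI
eager_path = eager_parquet_path(uri)
lazy_path = lazy_parquet_path(uri)
print(eager_path)
print(lazy_path)
```

The format spec `{0:05}` pads the integer 0 to five digits, which is where the `00000` file name comes from.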
Column Projection and Optimization
One of the primary advantages of using Parquet with Polars in flyte-sdk is the ability to perform efficient column projection. When a task defines a specific schema via typing.Annotated, the decoding handlers extract only the requested columns from the Parquet file.
In ParquetToPolarsDecodingHandler, the logic checks the current_task_metadata for requested columns:
columns = None
if current_task_metadata.structured_dataset_type and current_task_metadata.structured_dataset_type.columns:
    columns = [c.name for c in current_task_metadata.structured_dataset_type.columns]
# ...
return pl.read_parquet(parquet_path, columns=columns, storage_options=storage_options or None)
For LazyFrame decoders, this projection is applied to the scan operation, allowing Polars to optimize the query plan before any data is read:
lf = pl.scan_parquet(parquet_path, storage_options=storage_options or None)
if columns:
    lf = lf.select(*columns)
return lf
Storage Configuration Mapping
Polars requires storage options (for S3, Azure, etc.) to be provided as a flat dictionary of string keys and values. The flyte-sdk includes a utility, get_polars_storage_options, which translates Flyte's internal storage configuration into this format.
The utility supports several protocols:
- S3: Maps access_key_id, secret_access_key, region, and endpoint to Polars-compatible keys like aws_access_key_id.
- ABFS/Azure: Maps Azure credentials to keys like azure_storage_account_name and azure_storage_account_key.
- GCS: Returns an empty dictionary, as Polars typically handles GCS via application default credentials automatically.
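The flattening described above can be sketched as follows. This is not the real get_polars_storage_options: the function name, its `config` argument, the Azure source field names, and the exact target keys for region and endpoint (`aws_region`, `aws_endpoint_url`) are assumptions chosen for illustration.

```python
from typing import Dict, Optional


def flatten_storage_options(
    protocol: str,
    config: Optional[Dict[str, Optional[str]]] = None,
    anonymous: bool = False,
) -> Dict[str, str]:
    """Hypothetical stand-in for get_polars_storage_options."""
    config = config or {}
    if protocol in ("s3", "s3a"):
        key_map = {
            "access_key_id": "aws_access_key_id",
            "secret_access_key": "aws_secret_access_key",
            "region": "aws_region",  # assumed target key
            "endpoint": "aws_endpoint_url",  # assumed target key
        }
    elif protocol in ("abfs", "abfss"):
        key_map = {
            "account_name": "azure_storage_account_name",  # assumed source field
            "account_key": "azure_storage_account_key",  # assumed source field
        }
    else:
        # GCS and unrecognised protocols: nothing to pass through.
        key_map = {}

    # Keep only values that are actually set, and force them to strings.
    options = {
        dst: str(config[src]) for src, dst in key_map.items() if config.get(src)
    }
    if anonymous and protocol in ("s3", "s3a"):
        options["aws_skip_signature"] = "true"
    return options


s3_options = flatten_storage_options(
    "s3", {"access_key_id": "example-id", "secret_access_key": "example-secret"}
)
gcs_options = flatten_storage_options("gs")
print(s3_options, gcs_options or None)
```

An empty result is falsy, which is why the handlers pass `storage_options or None`: when nothing is configured, as in the GCS case, Polars receives `None` instead of an empty dictionary.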
Anonymous Access Fallback
The decoding handlers include a specific recovery mechanism for S3. If a NoCredentialsError occurs during a read operation, the flyte-sdk attempts to re-read the data using anonymous access by setting aws_skip_signature to "true".
except Exception as exc:
    if exc.__class__.__name__ == "NoCredentialsError":
        logger.debug("S3 source detected, attempting anonymous S3 access")
        storage_options = get_polars_storage_options(protocol=filesystem.protocol, anonymous=True)
        return pl.read_parquet(parquet_path, columns=columns, storage_options=storage_options or None)
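The retry pattern in this excerpt can be exercised without S3 by swapping in a fake reader. In the sketch below, `read_with_anonymous_fallback`, `fake_read`, and the local `NoCredentialsError` class are illustrative stand-ins, and re-raising every other exception is an assumption, since the excerpt does not show that branch.

```python
from typing import Callable, Dict, Optional

StorageOptions = Optional[Dict[str, str]]


class NoCredentialsError(Exception):
    """Local stand-in for the credentials error raised by an S3 client."""


def read_with_anonymous_fallback(
    read: Callable[[str, StorageOptions], str],
    path: str,
    storage_options: StorageOptions,
) -> str:
    try:
        return read(path, storage_options)
    except Exception as exc:
        # Match on the class name, as the excerpt does, so the exception
        # type itself never has to be imported.
        if exc.__class__.__name__ != "NoCredentialsError":
            raise  # assumption: unrelated errors propagate unchanged
        anonymous_options = {"aws_skip_signature": "true"}
        return read(path, anonymous_options)


def fake_read(path: str, storage_options: StorageOptions) -> str:
    # Pretend the bucket is public but no credentials are configured.
    if not storage_options or storage_options.get("aws_skip_signature") != "true":
        raise NoCredentialsError("no credentials found")
    return f"rows from {path}"


result = read_with_anonymous_fallback(
    fake_read, "s3://public-bucket/data/00000", None
)
print(result)
```

The first call fails with the credentials error, and the second succeeds because the request is sent unsigned.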
Implementation Constraints
While the Polars integration provides robust Parquet support, there are specific implementation details to note:
- Single File Assumption: Current handlers assume a single Parquet file (named 00000 or 00000.parquet). The codebase contains TODO notes indicating that multi-file partitioning support is a planned future enhancement.
- Flat Storage Options: Because Polars does not use fsspec-style nested dictionaries, the get_polars_storage_options function is strictly required to flatten Flyte's configuration before passing it to Polars' IO methods.
- Automatic Registration: The handlers are registered with the DataFrameTransformerEngine via register_polars_df_transformers(), which is executed at module import time in plugins/polars/src/flyteplugins/polars/df_transformer.py.