
Polars Parquet Serialization Mechanics

The flyte-sdk provides specialized handlers for Polars DataFrame and LazyFrame types, enabling high-performance data processing within Flyte tasks. These handlers use Parquet as the intermediate storage format, leveraging Polars' native Parquet engine to ensure efficient serialization and deserialization across distributed environments.

Eager vs. Lazy Serialization

The flyte-sdk distinguishes between eager pl.DataFrame and lazy pl.LazyFrame objects, providing optimized handlers for each.

Eager DataFrames

For standard pl.DataFrame objects, the PolarsToParquetEncodingHandler and ParquetToPolarsDecodingHandler manage the transition to and from Parquet. When a task returns a pl.DataFrame, the encoder writes the data immediately to a file named 00000 within the dataset's URI.

```python
# plugins/polars/src/flyteplugins/polars/df_transformer.py

path = os.path.join(uri, f"{0:05}")
df = typing.cast(pl.DataFrame, dataframe.val)
# ...
df.write_parquet(path, storage_options=storage_options or None)
```

LazyFrames and Streaming

The PolarsLazyFrameToParquetEncodingHandler and ParquetToPolarsLazyFrameDecodingHandler are designed for deferred execution. Instead of loading the entire dataset into memory, the decoder uses pl.scan_parquet and the encoder uses LazyFrame.sink_parquet. On encode, sink_parquet executes the task's query plan and streams the result to storage, so the result does not have to be collected into an in-memory DataFrame first. On decode, scan_parquet returns a LazyFrame backed by the stored file, so the downstream task builds a new query plan on top of it, and data is only read when a terminal operation such as collect() or another sink runs.

Note that LazyFrame handlers use a slightly different file naming convention, appending the .parquet extension:

```python
# plugins/polars/src/flyteplugins/polars/df_transformer.py

path = os.path.join(uri, f"{0:05}.parquet")
lazy_df = typing.cast(pl.LazyFrame, dataframe.val)
# ...
lazy_df.sink_parquet(path, storage_options=storage_options or None)
```

Column Projection and Optimization

One of the primary advantages of using Parquet with Polars in flyte-sdk is efficient column projection. When a task input is annotated with a column schema via typing.Annotated, the decoding handlers read only the requested columns from the Parquet file.

In ParquetToPolarsDecodingHandler, the logic checks the current_task_metadata for requested columns:

```python
columns = None
if current_task_metadata.structured_dataset_type and current_task_metadata.structured_dataset_type.columns:
    columns = [c.name for c in current_task_metadata.structured_dataset_type.columns]

# ...
return pl.read_parquet(parquet_path, columns=columns, storage_options=storage_options or None)
```

For LazyFrame decoders, the projection is added as a select on top of the lazy scan. Polars' projection pushdown then moves it into the scan itself, so only the requested columns are read when the plan finally executes:

```python
lf = pl.scan_parquet(parquet_path, storage_options=storage_options or None)
if columns:
    lf = lf.select(*columns)
return lf
```

Storage Configuration Mapping

Polars requires storage options (for S3, Azure, etc.) to be provided as a flat dictionary of string keys and values. The flyte-sdk includes a utility, get_polars_storage_options, which translates Flyte's internal storage configuration into this format.

The utility supports several protocols:

  • S3: Maps access_key_id, secret_access_key, region, and endpoint to Polars-compatible keys like aws_access_key_id.
  • ABFS/Azure: Maps Azure credentials to keys like azure_storage_account_name and azure_storage_account_key.
  • GCS: Returns an empty dictionary, leaving Polars to pick up GCS credentials from the environment (for example, application default credentials).
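
The flattening step can be sketched in plain Python. flatten_storage_options, S3Config, and AzureConfig are hypothetical names for illustration, not the flyte-sdk API. The aws_access_key_id, azure_storage_account_name, azure_storage_account_key, and aws_skip_signature keys come from this page; aws_secret_access_key, aws_region, and aws_endpoint_url are assumptions modeled on the object_store S3 configuration keys that Polars accepts, and dropping credentials in anonymous mode is likewise an assumption.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class S3Config:
    """Hypothetical shape of the S3 settings Flyte holds."""
    access_key_id: Optional[str] = None
    secret_access_key: Optional[str] = None
    region: Optional[str] = None
    endpoint: Optional[str] = None


@dataclass
class AzureConfig:
    """Hypothetical shape of the Azure settings Flyte holds."""
    account_name: Optional[str] = None
    account_key: Optional[str] = None


def flatten_storage_options(
    protocol: str,
    s3: Optional[S3Config] = None,
    azure: Optional[AzureConfig] = None,
    anonymous: bool = False,
) -> Dict[str, str]:
    """Hypothetical helper: map provider settings to a flat dict of strings."""
    opts: Dict[str, str] = {}
    if protocol in ("s3", "s3a"):
        cfg = s3 or S3Config()
        pairs = [("aws_region", cfg.region), ("aws_endpoint_url", cfg.endpoint)]
        if anonymous:
            # Unsigned requests: omit credentials and skip request signing.
            opts["aws_skip_signature"] = "true"
        else:
            pairs += [
                ("aws_access_key_id", cfg.access_key_id),
                ("aws_secret_access_key", cfg.secret_access_key),
            ]
        # Only include settings that are actually configured.
        opts.update({key: value for key, value in pairs if value})
    elif protocol in ("abfs", "abfss"):
        cfg = azure or AzureConfig()
        pairs = [
            ("azure_storage_account_name", cfg.account_name),
            ("azure_storage_account_key", cfg.account_key),
        ]
        opts.update({key: value for key, value in pairs if value})
    # "gs"/"gcs" and other protocols fall through with an empty dict,
    # leaving credential discovery to Polars itself.
    return opts
```

The result would be passed as storage_options=opts or None, matching the handler snippets, so an empty dictionary (as for GCS) becomes None and Polars falls back to its own credential discovery.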

Anonymous Access Fallback

The decoding handlers include a specific recovery mechanism for S3. If a NoCredentialsError occurs during a read operation, the flyte-sdk attempts to re-read the data using anonymous access by setting aws_skip_signature to "true".

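```python
try:
    # First attempt uses the configured credentials.
    return pl.read_parquet(parquet_path, columns=columns, storage_options=storage_options or None)
except Exception as exc:
    if exc.__class__.__name__ == "NoCredentialsError":
        logger.debug("S3 source detected, attempting anonymous S3 access")
        storage_options = get_polars_storage_options(protocol=filesystem.protocol, anonymous=True)
        return pl.read_parquet(parquet_path, columns=columns, storage_options=storage_options or None)
    # ...
```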
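
The fallback logic can be exercised without AWS at all. In this sketch, NoCredentialsError is a local stand-in for botocore's exception of the same name, read_with_anonymous_fallback is a hypothetical helper, and fake_read simulates a bucket that only accepts unsigned requests. Re-raising every other error is an assumption about how unrecognized failures are treated.

```python
import logging
from typing import Callable, Dict, List, Optional

logger = logging.getLogger(__name__)


class NoCredentialsError(Exception):
    """Local stand-in for botocore.exceptions.NoCredentialsError."""


def read_with_anonymous_fallback(
    read: Callable[[Optional[Dict[str, str]]], object],
    storage_options: Dict[str, str],
) -> object:
    """Hypothetical helper: retry a read unsigned if no credentials were found."""
    try:
        return read(storage_options or None)
    except Exception as exc:
        # Match on the class name so botocore never has to be imported here.
        if exc.__class__.__name__ == "NoCredentialsError":
            logger.debug("No credentials found, retrying with anonymous S3 access")
            return read({"aws_skip_signature": "true"})
        raise  # assumption: any other error propagates unchanged


# Simulated public bucket: signed reads fail, unsigned reads succeed.
calls: List[Optional[Dict[str, str]]] = []


def fake_read(opts: Optional[Dict[str, str]]) -> str:
    calls.append(opts)
    if not opts or opts.get("aws_skip_signature") != "true":
        raise NoCredentialsError("Unable to locate credentials")
    return "rows read anonymously"


result = read_with_anonymous_fallback(fake_read, {})
```

Matching on the class name, as the handler does, presumably avoids a hard import of botocore just to recognize one exception type.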

Implementation Constraints

The Polars integration has a few implementation constraints worth noting:

  • Single File Assumption: Current handlers assume a single Parquet file (named 00000 or 00000.parquet). The codebase contains TODO notes indicating that multi-file partitioning support is a planned future enhancement.
  • Flat Storage Options: Polars expects a flat dictionary of string options rather than fsspec-style nested dictionaries, so Flyte's storage configuration must be flattened by get_polars_storage_options before it is passed to Polars' IO methods.
  • Automatic Registration: The handlers are registered with the DataFrameTransformerEngine via register_polars_df_transformers(), which is executed at module import time in plugins/polars/src/flyteplugins/polars/df_transformer.py.