Data Validation & Structured IO

flyte-sdk provides specialized tools for ensuring data integrity and handling structured data formats efficiently. This includes deep integration with Pandera for DataFrame validation, a streaming-first JsonlFile type for JSON Lines, and dedicated connectors for external data warehouses like BigQuery and Snowflake.

Data Validation with Pandera

When you pass DataFrames between tasks, you often need to ensure they conform to a specific schema (e.g., correct column names, types, and value ranges). Without validation, a downstream task might fail with a cryptic KeyError or produce incorrect results due to unexpected data.

flyte-sdk integrates with Pandera to automate this validation. By using Pandera's type hints in your task signatures, flyte-sdk automatically validates incoming and outgoing DataFrames and generates detailed HTML validation reports in the Flyte Deck.

Defining and Using Schemas

To validate a DataFrame, define a pa.DataFrameModel and use it within pt.DataFrame[...] in your task signature.

import pandas as pd
import pandera.pandas as pa
import pandera.typing.pandas as pt
import flyte

class EmployeeSchema(pa.DataFrameModel):
    employee_id: int = pa.Field(ge=0)
    name: str

@flyte.task(report=True)
async def process_employees(df: pt.DataFrame[EmployeeSchema]) -> pt.DataFrame[EmployeeSchema]:
    # df is automatically validated against EmployeeSchema before the task runs
    return df

The PanderaDataFrameTransformer (found in flyteplugins.pandera.transformers.base) handles the conversion. When report=True is set on the task, flyte-sdk captures Pandera's validation results and renders them as a tab in the Flyte Deck, allowing you to inspect schema violations visually.

Handling Validation Errors

By default, a validation failure raises an exception and fails the task. To log a warning and let the task continue instead, annotate the parameter with ValidationConfig(on_error="warn").

from typing import Annotated

import flyte
import pandera.typing.pandas as pt
from flyteplugins.pandera import ValidationConfig

# EmployeeSchema is the pa.DataFrameModel defined in the previous example.

@flyte.task(report=True)
async def permissive_task(
    df: Annotated[pt.DataFrame[EmployeeSchema], ValidationConfig(on_error="warn")],
) -> pt.DataFrame[EmployeeSchema]:
    # If validation fails, a warning is logged and the report is generated,
    # but the task continues.
    return df
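
To make the difference between the two policies concrete, here is a minimal plain-Python analogy. It does not use Pandera or flyte-sdk, and check_employees is a hypothetical helper written only for this illustration; it applies two rules modelled on EmployeeSchema and either raises or warns depending on on_error.

```python
import warnings


def check_employees(rows, on_error="raise"):
    """Hypothetical helper: check rows against two EmployeeSchema-like rules."""
    if on_error not in ("raise", "warn"):
        raise ValueError(f"unsupported on_error value: {on_error!r}")

    problems = []
    for index, row in enumerate(rows):
        employee_id = row.get("employee_id")
        if not isinstance(employee_id, int) or employee_id < 0:
            problems.append(f"row {index}: employee_id must be an int >= 0")
        if not isinstance(row.get("name"), str):
            problems.append(f"row {index}: name must be a str")

    if problems and on_error == "raise":
        # Default policy: stop immediately, like a failed task.
        raise ValueError("; ".join(problems))
    if problems:
        # Permissive policy: report the problems and carry on.
        warnings.warn("; ".join(problems))
    return problems


rows = [{"employee_id": 1, "name": "Ada"}, {"employee_id": -5, "name": "Bob"}]

# Record the warning instead of printing it, so it can be inspected.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    problems = check_employees(rows, on_error="warn")
```

With on_error="warn" the call returns normally and the problem is only reported; with the default, the same input raises ValueError.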

Structured IO with JsonlFile

Processing large datasets in JSON format can be memory-intensive if you load the entire file at once. The JsonlFile type in flyte-sdk is designed for streaming JSON Lines data, supporting both asynchronous and synchronous iteration with optional compression.

Streaming Reads and Writes

JsonlFile allows you to process records one by one, which is essential for datasets that exceed available memory. It also supports transparent zstd compression if the file extension is .zst.

import flyte
from flyteplugins.jsonl import JsonlFile

@flyte.task
async def stream_data() -> JsonlFile:
    # The .zst extension enables zstd compression
    f = JsonlFile.new_remote("data.jsonl.zst")
    async with f.writer(compression_level=3) as w:
        for i in range(1000):
            await w.write({"id": i, "value": f"item-{i}"})
    return f

@flyte.task
async def consume_data(f: JsonlFile) -> int:
    count = 0
    async for record in f.iter_records(on_error="skip"):
        # Malformed lines are skipped instead of raising an error
        count += 1
    return count

The iter_records method (and its sync counterpart iter_records_sync) provides robust error handling. You can pass on_error="skip" to ignore malformed lines or provide a custom callback function to log or collect errors without stopping the stream.
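
The error-handling modes are easier to reason about with a small standard-library sketch. This is not JsonlFile's implementation: iter_jsonl is a hypothetical stand-in, and the callback signature (line number, raw text, exception) is an assumption made for this example, since the real callback arguments are not shown here.

```python
import io
import json


def iter_jsonl(stream, on_error="raise"):
    """Hypothetical stand-in: yield one parsed record per non-empty line."""
    for line_number, line in enumerate(stream, start=1):
        text = line.strip()
        if not text:
            continue
        try:
            record = json.loads(text)
        except json.JSONDecodeError as exc:
            if on_error == "skip":
                continue  # drop the malformed line silently
            if callable(on_error):
                on_error(line_number, text, exc)  # assumed callback signature
                continue
            raise  # default: stop the stream on the first bad line
        yield record


data = io.StringIO('{"id": 0}\nnot json\n{"id": 1}\n')

# Mode 1: skip malformed lines.
skipped = list(iter_jsonl(data, on_error="skip"))

# Mode 2: collect the line numbers of malformed lines via a callback.
bad_lines = []
data.seek(0)
collected = list(iter_jsonl(data, on_error=lambda n, text, exc: bad_lines.append(n)))
```

Because the function is a generator, only one line is held in memory at a time, which is the property that makes streaming suitable for files larger than available memory.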

External Data Connectors

flyte-sdk provides high-level task templates for interacting with external data warehouses. These connectors handle connection management, parameter injection, and mapping results back to Flyte types.

Snowflake

The Snowflake task (in flyteplugins.snowflake.task) allows you to execute SQL queries and perform batch inserts. It supports key-pair authentication by referencing Flyte secrets.

from flyteplugins.snowflake import Snowflake, SnowflakeConfig

sf_config = SnowflakeConfig(
    user="MY_USER",
    account="MY_ACCOUNT",
    database="FLYTE",
    schema="PUBLIC",
    warehouse="COMPUTE_WH",
)

# Batch mode expands list inputs into a single VALUES clause
snowflake_insert = Snowflake(
    name="snowflake_batch_insert",
    inputs={"id": list[int], "name": list[str]},
    plugin_config=sf_config,
    query_template="INSERT INTO TEST (ID, NAME) VALUES (%(id)s, %(name)s);",
    snowflake_private_key="my_snowflake_secret",
    batch=True,
)

In batch=True mode, the query_template must use named printf-style placeholders (e.g., %(name)s). flyte-sdk expands the input lists into a single multi-row VALUES statement, so the rows are sent in one INSERT rather than one INSERT per row.
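
The expansion can be pictured with the following sketch. It is an illustration under stated assumptions, not the plugin's code: expand_batch is a hypothetical helper, and the naming of the per-row parameters (id_0, id_1, ...) is invented here to show the idea.

```python
import re


def expand_batch(query_template, inputs):
    """Hypothetical helper: turn a one-row VALUES tuple into one tuple per row."""
    lengths = {len(values) for values in inputs.values()}
    if len(lengths) != 1:
        raise ValueError("all batch inputs must have the same length")
    row_count = lengths.pop()

    # Locate the row tuple that follows VALUES, e.g. "(%(id)s, %(name)s)".
    start = query_template.upper().index("VALUES") + len("VALUES")
    open_paren = query_template.index("(", start)
    close_paren = query_template.rindex(")")
    row_tuple = query_template[open_paren:close_paren + 1]

    rows, params = [], {}
    for i in range(row_count):
        # Suffix each placeholder with the row index: %(id)s -> %(id_0)s
        rows.append(
            re.sub(r"%\((\w+)\)s", lambda m: f"%({m.group(1)}_{i})s", row_tuple)
        )
        for name, values in inputs.items():
            params[f"{name}_{i}"] = values[i]

    query = (
        query_template[:open_paren]
        + ", ".join(rows)
        + query_template[close_paren + 1:]
    )
    return query, params


query, params = expand_batch(
    "INSERT INTO TEST (ID, NAME) VALUES (%(id)s, %(name)s);",
    {"id": [1, 2], "name": ["Ada", "Bob"]},
)
print(query)
```

The values stay in a separate params mapping rather than being spliced into the SQL text, so a database driver can bind them safely when the statement runs.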

BigQuery

The BigQueryTask (in flyteplugins.bigquery.task) executes queries on Google BigQuery. Unlike the Snowflake task, it references inputs with Flyte's Go-template-style placeholders, such as {{ .Inputs.min_id }}.

from flyteplugins.bigquery import BigQueryConfig, BigQueryTask
from flyte.io import DataFrame

bigquery_query = BigQueryTask(
    name="bigquery_select",
    inputs={"min_id": int},
    output_dataframe_type=DataFrame,
    plugin_config=BigQueryConfig(ProjectID="my-gcp-project"),
    query_template="SELECT * FROM my_dataset.my_table WHERE id > {{ .Inputs.min_id }};",
)

When the task executes, the {{ .Inputs.min_id }} placeholder is replaced with the actual input value. The results are automatically mapped to the specified output_dataframe_type, making it easy to pipe data from BigQuery into downstream Pandas or Polars tasks.
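
As a rough mental model of the placeholder replacement, the sketch below performs the substitution with a regular expression. How the BigQuery backend actually renders or binds the value is not shown here, so render_query and its plain-text substitution are assumptions for illustration only.

```python
import re

# Matches {{ .Inputs.<name> }} with optional whitespace inside the braces.
INPUT_PLACEHOLDER = re.compile(r"\{\{\s*\.Inputs\.(\w+)\s*\}\}")


def render_query(query_template, inputs):
    """Hypothetical helper: replace each placeholder with its input value."""
    def substitute(match):
        name = match.group(1)
        if name not in inputs:
            raise KeyError(f"no input named {name!r}")
        return str(inputs[name])

    return INPUT_PLACEHOLDER.sub(substitute, query_template)


rendered = render_query(
    "SELECT * FROM my_dataset.my_table WHERE id > {{ .Inputs.min_id }};",
    {"min_id": 100},
)
print(rendered)
```

This sketch inserts values as plain text and does no quoting or escaping, which is acceptable for the integer input used here but would not be safe for arbitrary strings.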