Introduction to Pandera Validation
Pandera integration in flyte-sdk allows you to enforce schema constraints on DataFrames using type-annotated schemas. By using Pandera's DataFrameModel, you can define expected columns, data types, and custom validation checks that flyte-sdk automatically executes when data enters or leaves a task.
In this tutorial, you will learn how to define schemas, apply them to tasks, and customize validation behavior for Pandas, Polars, and PySpark DataFrames.
Prerequisites
To use Pandera validation, you must install the flyteplugins-pandera plugin along with your preferred DataFrame library:
pip install flyteplugins-pandera pandas # For Pandas
pip install flyteplugins-pandera polars # For Polars
pip install flyteplugins-pandera pyspark # For PySpark
Important: Always import pandera.typing modules before flyteplugins.pandera or other Flyte modules. This ensures that Flyte's TypeEngine correctly identifies the Pandera types during registration.
Step 1: Define a Pandera Schema
First, define your data schema by subclassing pandera.DataFrameModel. This class describes the structure and constraints of your DataFrame.
import pandera

class UserSchema(pandera.DataFrameModel):
    user_id: int
    username: str = pandera.Field(unique=True)
    age: int = pandera.Field(ge=0, le=120)
In this example, UserSchema ensures that the DataFrame has an integer user_id, a unique string username, and an age between 0 and 120.
Step 2: Create a Task with Pandas Validation
To enable validation, use the pandera.typing.pandas.DataFrame type hint in your task signature, parameterized with your schema class.
import pandas as pd
import pandera.typing.pandas as pandera_typing_pandas
from flytekit import task

@task
def process_users(df: pandera_typing_pandas.DataFrame[UserSchema]) -> int:
    # flyte-sdk has already validated 'df' against UserSchema here
    return int(df["age"].mean())
When process_users is called, flyte-sdk uses the PanderaPandasDataFrameTransformer to validate the input. If the data does not match the schema, a pandera.errors.SchemaErrors exception is raised by default.
Step 3: Customize Validation Behavior
By default, validation failures stop execution by raising an error. You can change this behavior to only log a warning using ValidationConfig and Python's Annotated type.
from typing import Annotated
from flyteplugins.pandera import ValidationConfig

@task
def log_invalid_users(
    df: Annotated[
        pandera_typing_pandas.DataFrame[UserSchema],
        ValidationConfig(on_error="warn"),
    ]
) -> int:
    # If validation fails, a warning is logged and the task continues
    return len(df)
The ValidationConfig class (found in flyteplugins.pandera.config) supports on_error="raise" (default) and on_error="warn".
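Annotated attaches metadata to a type hint without changing the type itself, which is how a configuration object can travel with the annotation. The following is a minimal sketch of that mechanism using only the standard library. FakeValidationConfig, find_config, and handle_failure are hypothetical stand-ins written for this illustration; they are not the plugin's classes or its implementation, and the sketch uses warnings.warn where the plugin logs.

```python
import warnings
from dataclasses import dataclass
from typing import Annotated, get_args, get_type_hints


@dataclass(frozen=True)
class FakeValidationConfig:
    """Hypothetical stand-in for ValidationConfig (illustration only)."""
    on_error: str = "raise"


def example(df: Annotated[dict, FakeValidationConfig(on_error="warn")]) -> int:
    return len(df)


def find_config(func, param: str) -> FakeValidationConfig:
    """Return the config attached to a parameter's annotation, or the default."""
    hint = get_type_hints(func, include_extras=True)[param]
    # For Annotated[T, m1, ...], get_args returns (T, m1, ...); skip T.
    for meta in get_args(hint)[1:]:
        if isinstance(meta, FakeValidationConfig):
            return meta
    return FakeValidationConfig()


def handle_failure(config: FakeValidationConfig, message: str) -> None:
    """Raise on 'raise'; emit a warning and continue on 'warn'."""
    if config.on_error == "warn":
        warnings.warn(message)
    else:
        raise ValueError(message)


config = find_config(example, "df")
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    handle_failure(config, "age out of range")  # warns instead of raising
```

Because the metadata lives on the annotation, each parameter can carry its own setting, so one task can be strict about one input and lenient about another.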
Step 4: Use Polars or PySpark
flyte-sdk provides specialized transformers for Polars and PySpark SQL as well. The usage pattern remains the same, but you use the framework-specific typing modules.
Polars
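For Polars, define the schema with pandera.polars.DataFrameModel and annotate the task with pandera.typing.polars.DataFrame or pandera.typing.polars.LazyFrame. These are handled by the PanderaPolarsDataFrameTransformer.
import pandera.polars as pandera_polars
import pandera.typing.polars as pandera_typing_polars

class PolarsUserSchema(pandera_polars.DataFrameModel):
    user_id: int
    age: int = pandera_polars.Field(ge=0, le=120)

@task
def polars_task(df: pandera_typing_polars.DataFrame[PolarsUserSchema]) -> int:
    return df.height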
PySpark SQL
For PySpark, use pandera.pyspark.DataFrameModel and pandera.typing.pyspark_sql.DataFrame. These are handled by the PanderaPySparkSqlDataFrameTransformer.
import pandera.pyspark as pandera_pyspark
import pandera.typing.pyspark_sql as pandera_typing_pyspark_sql

class SparkUserSchema(pandera_pyspark.DataFrameModel):
    user_id: int = pandera_pyspark.Field()

@task
def spark_task(sdf: pandera_typing_pyspark_sql.DataFrame[SparkUserSchema]) -> int:
    return sdf.count()
Automatic Validation Reports
Every time flyte-sdk performs Pandera validation, it automatically generates an HTML report and attaches it to the Flyte Deck.
- Input Validation: Reports appear in a tab named "Pandera report: input".
- Output Validation: Reports appear in a tab named "Pandera report: output".
These reports are generated by framework-specific renderers like PanderaPandasReportRenderer and provide a visual summary of the data and any validation errors encountered.
Complete Working Example
The following script demonstrates a complete Flyte workflow using Pandas validation and custom error handling.
import pandas as pd
import pandera
import pandera.typing.pandas as pandera_typing_pandas
from typing import Annotated
from flytekit import task, workflow
from flyteplugins.pandera import ValidationConfig

class InputSchema(pandera.DataFrameModel):
    value: int = pandera.Field(gt=0)

@task
def get_data() -> pd.DataFrame:
    # This data is valid
    return pd.DataFrame({"value": [1, 2, 3]})

@task
def process_data(df: pandera_typing_pandas.DataFrame[InputSchema]) -> int:
    return int(df["value"].sum())

@task
def warn_on_invalid(
    df: Annotated[
        pandera_typing_pandas.DataFrame[InputSchema],
        ValidationConfig(on_error="warn"),
    ]
) -> int:
    # This will log a warning if 'value' <= 0
    return len(df)

@workflow
def pandera_workflow() -> int:
    df = get_data()
    res = process_data(df=df)
    _ = warn_on_invalid(df=df)
    return res

if __name__ == "__main__":
    print(f"Result: {pandera_workflow()}")
Next Steps
- Explore Pandera's documentation to learn about complex checks like Check.groupby or statistical validation.
- Use Flyte Decks in the Flyte Console to inspect the detailed validation reports generated by PanderaDataFrameTransformer.