Integrating BigQuery and Snowflake
flyte-sdk provides specialized task templates for executing SQL queries against Google BigQuery and Snowflake. These tasks integrate with Flyte's asynchronous connector framework, allowing you to run long-running queries and load the results directly into Flyte-compatible DataFrames (such as pandas or polars) for downstream processing.
Executing BigQuery Queries
To run queries on BigQuery, use the BigQueryTask and BigQueryConfig classes. The task supports parameterized queries using Flyte's Golang templating format ({{ .input_name }}).
```python
import flyte
import pandas as pd
from flyte.io import DataFrame
from flyteplugins.bigquery import BigQueryConfig, BigQueryTask

# 1. Define the configuration
bq_config = BigQueryConfig(
    ProjectID="my-gcp-project",
    Location="US",
)

# 2. Define the BigQuery task
bigquery_task = BigQueryTask(
    name="get_user_events",
    inputs={"user_id": int, "limit": int},
    output_dataframe_type=DataFrame,
    plugin_config=bq_config,
    query_template="""
        SELECT event_name, event_timestamp
        FROM `my-project.my_dataset.events`
        WHERE user_id = {{ .user_id }}
        LIMIT {{ .limit }}
    """,
    # Optional: name of the secret containing GCP credentials
    google_application_credentials="my-gcp-secret",
)

@flyte.task
def process_results(df: pd.DataFrame):
    # Annotating the input as pandas.DataFrame loads the query result into memory
    print(f"Loaded {len(df)} rows")

@flyte.workflow
def my_workflow(user_id: int):
    df = bigquery_task(user_id=user_id, limit=100)
    process_results(df=df)
```
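The sketch below shows the {{ .input_name }} placeholder syntax through plain textual substitution. The helper render_template is hypothetical and is not the plugin's implementation. The connector may bind values differently, and this version inserts values with str() and does no quoting or escaping.

```python
import re

# Matches Go-template-style placeholders such as "{{ .user_id }}" or "{{.limit}}".
_PLACEHOLDER = re.compile(r"\{\{\s*\.(\w+)\s*\}\}")


def render_template(query_template: str, inputs: dict) -> str:
    """Replace each {{ .name }} placeholder with str(inputs[name]).

    Illustrative only: values are not quoted or escaped, so never use this
    with untrusted string inputs.
    """

    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in inputs:
            raise KeyError(f"query_template references unknown input {name!r}")
        return str(inputs[name])

    return _PLACEHOLDER.sub(substitute, query_template)


query = render_template(
    "SELECT event_name FROM events WHERE user_id = {{ .user_id }} LIMIT {{ .limit }}",
    {"user_id": 42, "limit": 100},
)
print(query)
```

Failing on an unknown placeholder, rather than leaving it in place, surfaces a typo in an input name before the SQL reaches the engine.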
Key BigQuery Features
- Whitespace Normalization: BigQueryTask automatically collapses newlines, tabs, and runs of spaces in your query_template into single spaces, so a clean SQL statement is sent to the engine.
- Advanced Configuration: You can pass a google.cloud.bigquery.QueryJobConfig object to BigQueryConfig via the QueryJobConfig attribute to control settings like query caching or destination tables.
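The whitespace normalization described above can be approximated with a single regular expression. This is a minimal sketch, and the function name normalize_query is hypothetical. It assumes the plugin replaces every whitespace run with one space and trims the ends.

```python
import re


def normalize_query(query_template: str) -> str:
    """Collapse every run of whitespace (spaces, tabs, newlines) into one
    space and strip leading and trailing whitespace."""
    return re.sub(r"\s+", " ", query_template).strip()


raw = """
    SELECT event_name,\tevent_timestamp
    FROM   events
    LIMIT 10
"""
print(normalize_query(raw))  # SELECT event_name, event_timestamp FROM events LIMIT 10
```

A blanket rule like this also collapses whitespace inside string literals. If a query compares against a value such as 'a  b', that literal changes too.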
Executing Snowflake Queries
The Snowflake task template allows you to run SQL on Snowflake. Unlike BigQuery, Snowflake tasks use Python's printf-style string formatting (%(param_name)s) for parameters.
```python
import pandas as pd
from flyteplugins.snowflake import Snowflake, SnowflakeConfig

# 1. Define connection metadata
sf_config = SnowflakeConfig(
    account="myorg-myaccount",
    database="ANALYTICS",
    schema="PUBLIC",
    warehouse="COMPUTE_WH",
    user="flyte_user",
    # Additional parameters like role or authenticator
    connection_kwargs={"role": "ANALYST"},
)

# 2. Define the Snowflake task
snowflake_query = Snowflake(
    name="query_snowflake",
    inputs={"min_age": int},
    output_dataframe_type=pd.DataFrame,
    plugin_config=sf_config,
    query_template="SELECT * FROM users WHERE age >= %(min_age)s;",
    snowflake_private_key="my-snowflake-key",
)
```
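To see how %(param_name)s placeholders map to task inputs, the sketch below pulls the placeholder names out of a template and checks them against the declared inputs. The helper placeholder_names is hypothetical. The final line uses Python's % operator only to show where each value lands. The Snowflake connector's own parameter binding quotes and escapes values, and plain %-formatting does not.

```python
import re

# Matches pyformat placeholders such as "%(min_age)s".
_PYFORMAT = re.compile(r"%\((\w+)\)s")


def placeholder_names(query_template: str) -> set[str]:
    """Return the input names referenced by %(name)s placeholders."""
    return set(_PYFORMAT.findall(query_template))


template = "SELECT * FROM users WHERE age >= %(min_age)s;"
inputs = {"min_age": 21}

# Every placeholder must correspond to a declared task input.
assert placeholder_names(template) <= set(inputs)

# Plain %-formatting shows where values land; it does NOT quote or escape them.
print(template % inputs)  # SELECT * FROM users WHERE age >= 21;
```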
Performing Snowflake Batch Inserts
flyte-sdk supports a specialized "batch mode" for Snowflake that expands list inputs into a multi-row VALUES clause. Many rows are then inserted with a single statement instead of one statement per row, which makes it efficient for bulk data loading.
To use batch mode, set batch=True and ensure your query_template contains a single VALUES clause with placeholders matching your input keys.
```python
from flyteplugins.snowflake import Snowflake, SnowflakeConfig

# Reuses sf_config from the previous example
snowflake_batch_insert = Snowflake(
    name="snowflake_batch_insert",
    inputs={
        "ids": list[int],
        "names": list[str],
    },
    plugin_config=sf_config,
    query_template="INSERT INTO TEST_TABLE (ID, NAME) VALUES (%(ids)s, %(names)s);",
    batch=True,
    snowflake_private_key="my-snowflake-key",
)

# When called, all input lists must have the same length, e.g.:
# snowflake_batch_insert(ids=[1, 2], names=["Alice", "Bob"])
```
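The following is a minimal sketch of what the batch-mode expansion amounts to. It is not the plugin's actual code. The function expand_batch_insert and the row-suffixed parameter names (ids_0, ids_1, ...) are hypothetical. The sketch assumes the VALUES tuple contains only %(name)s placeholders and plain text, with no nested parentheses such as function calls.

```python
import re

# Matches "VALUES (...)" whose body is %(name)s placeholders or non-parenthesis text.
_VALUES = re.compile(r"VALUES\s*\(((?:%\(\w+\)s|[^()])*)\)", re.IGNORECASE)
_PLACEHOLDER = re.compile(r"%\((\w+)\)s")


def expand_batch_insert(query_template: str, inputs: dict[str, list]) -> tuple[str, dict]:
    """Expand a single VALUES (...) row into one row per list element.

    Returns the rewritten query and a flat parameter dict whose keys carry
    the row index as a suffix (e.g. ids_0, ids_1).
    """
    matches = _VALUES.findall(query_template)
    if len(matches) != 1:
        raise ValueError("batch mode requires exactly one VALUES (...) clause")

    lengths = {name: len(values) for name, values in inputs.items()}
    if len(set(lengths.values())) > 1:
        raise ValueError(f"input lists must have the same length, got {lengths}")
    n_rows = next(iter(lengths.values()), 0)
    if n_rows == 0:
        raise ValueError("batch mode needs at least one row")

    row_template = matches[0]
    rows, params = [], {}
    for i in range(n_rows):
        # Rename each %(name)s to %(name_i)s and record the i-th value.
        row = _PLACEHOLDER.sub(lambda m: f"%({m.group(1)}_{i})s", row_template)
        rows.append(f"({row})")
        for name, values in inputs.items():
            params[f"{name}_{i}"] = values[i]

    query = _VALUES.sub(lambda m: "VALUES " + ", ".join(rows), query_template, count=1)
    return query, params


query, params = expand_batch_insert(
    "INSERT INTO TEST_TABLE (ID, NAME) VALUES (%(ids)s, %(names)s);",
    {"ids": [1, 2], "names": ["Alice", "Bob"]},
)
print(query)
print(params)
```

Both requirements from the text, a single VALUES clause and equal-length lists, are checked before any SQL is built. A malformed batch therefore fails early instead of inserting a partial set of rows.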
Authentication and Secrets
Both plugins handle authentication by referencing secret keys stored in your Flyte environment.
BigQuery Authentication
Pass the name of the secret containing your Google Application Credentials JSON to the google_application_credentials parameter of BigQueryTask.
Snowflake Authentication
The Snowflake plugin supports key-pair authentication natively through two parameters:
- snowflake_private_key: the name of the secret holding the Snowflake private key.
- snowflake_private_key_passphrase: the name of the secret holding the private key's passphrase (only needed if the key is encrypted).
If you use a secret_group, the environment variable name is auto-generated as {SECRET_GROUP}_{KEY} (uppercased). If omitted, the key name is used directly.
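The naming rule above can be expressed as a small function. This is a sketch with a hypothetical helper name, secret_env_var. It covers only the two cases described: a {SECRET_GROUP}_{KEY} name uppercased in full when a group is set, and the raw key otherwise. How characters such as hyphens are handled is not specified here, so the sketch leaves them unchanged.

```python
import os
from typing import Optional


def secret_env_var(key: str, secret_group: Optional[str] = None) -> str:
    """Return the environment variable name under which a secret is exposed."""
    if secret_group:
        # With a group, the name is {SECRET_GROUP}_{KEY}, uppercased.
        return f"{secret_group}_{key}".upper()
    # Without a group, the key name is used directly.
    return key


# Look up a private key exposed under a hypothetical group named "sf".
private_key_pem = os.environ.get(secret_env_var("snowflake_private_key", secret_group="sf"))
```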
For other authentication methods (like passwords), pass them inside the connection_kwargs dictionary within your SnowflakeConfig.
Troubleshooting
- BigQuery Template Syntax: Ensure you use {{ .input_name }} for BigQuery. Using %(input_name)s will result in SQL syntax errors.
- Snowflake Batch Requirements: In batch mode, the query_template must contain exactly one VALUES (...) clause. If your input lists have mismatched lengths, the task will fail during execution.
- DataFrame Types: While flyte.io.DataFrame is the standard type, you can specify pandas.DataFrame or polars.DataFrame as the output_dataframe_type, provided the plugin you are using (e.g., flyteplugins-snowflake) supports a transformer for that type.
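You can catch the template-syntax mistakes listed above before deployment with a small pre-flight check. The function lint_query_template below is hypothetical and not part of flyte-sdk. It assumes BigQuery templates use only {{ .name }} placeholders and Snowflake templates use only %(name)s, as described in this section.

```python
import re

GO_TEMPLATE = re.compile(r"\{\{\s*\.(\w+)\s*\}\}")  # BigQuery style: {{ .name }}
PYFORMAT = re.compile(r"%\((\w+)\)s")              # Snowflake style: %(name)s


def lint_query_template(query_template: str, engine: str, inputs: set[str]) -> list[str]:
    """Return human-readable problems found in a query template (empty if none)."""
    if engine == "bigquery":
        expected, wrong, expected_style = GO_TEMPLATE, PYFORMAT, "{{ .name }}"
    elif engine == "snowflake":
        expected, wrong, expected_style = PYFORMAT, GO_TEMPLATE, "%(name)s"
    else:
        raise ValueError(f"unknown engine {engine!r}")

    problems = []
    # Placeholders written in the other engine's syntax are never substituted.
    for name in wrong.findall(query_template):
        problems.append(f"{name}: wrong placeholder syntax; {engine} expects {expected_style}")
    # Correctly written placeholders must refer to declared inputs.
    for name in sorted(set(expected.findall(query_template)) - inputs):
        problems.append(f"{name}: referenced in the template but not declared in inputs")
    return problems


print(lint_query_template("SELECT * FROM t WHERE id = %(user_id)s", "bigquery", {"user_id"}))
```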