Integrating BigQuery and Snowflake

flyte-sdk provides specialized task templates for executing SQL queries against Google BigQuery and Snowflake. These tasks integrate with Flyte's asynchronous connector framework, allowing you to run long-running queries and load the results directly into Flyte-compatible DataFrames (such as pandas or polars) for downstream processing.

Executing BigQuery Queries

To run queries on BigQuery, use the BigQueryTask and BigQueryConfig classes. The task supports parameterized queries using Flyte's Golang templating format ({{ .input_name }}).

import flyte
from flyte.io import DataFrame
from flyteplugins.bigquery import BigQueryConfig, BigQueryTask

# 1. Define the configuration
bq_config = BigQueryConfig(
    ProjectID="my-gcp-project",
    Location="US"
)

# 2. Define the BigQuery task
bigquery_task = BigQueryTask(
    name="get_user_events",
    inputs={"user_id": int, "limit": int},
    output_dataframe_type=DataFrame,
    plugin_config=bq_config,
    query_template="""
        SELECT event_name, event_timestamp
        FROM `my-project.my_dataset.events`
        WHERE user_id = {{ .user_id }}
        LIMIT {{ .limit }}
    """,
    # Optional: Name of the secret containing GCP credentials
    google_application_credentials="my-gcp-secret"
)

@flyte.task
def process_results(df: DataFrame):
    # The result is automatically loaded into a DataFrame
    print(f"Loaded {len(df)} rows")

@flyte.workflow
def my_workflow(user_id: int):
    df = bigquery_task(user_id=user_id, limit=100)
    process_results(df=df)

Key BigQuery Features

  • Whitespace Normalization: BigQueryTask automatically strips extra newlines, tabs, and multiple spaces from your query_template to ensure a clean SQL statement is sent to the engine.
  • Advanced Configuration: You can pass a google.cloud.bigquery.QueryJobConfig object to BigQueryConfig via the QueryJobConfig attribute to control settings like query caching or destination tables.
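
To make the whitespace normalization concrete, here is a minimal sketch of the kind of cleanup described above. It is not the plugin's actual code; the helper name normalize_sql_whitespace is hypothetical, and the single-regex approach is an assumption about how such a step could be written.

```python
import re


def normalize_sql_whitespace(query_template: str) -> str:
    """Collapse every run of spaces, tabs, and newlines into a single space.

    Hypothetical helper for illustration only; BigQueryTask's internal
    implementation may differ.
    """
    return re.sub(r"\s+", " ", query_template).strip()


raw_template = """
    SELECT event_name, event_timestamp
    FROM `my-project.my_dataset.events`
    WHERE user_id = {{ .user_id }}
"""

# Prints the query on one line with single spaces between tokens.
print(normalize_sql_whitespace(raw_template))
```

One consequence of this style of cleanup is that whitespace inside SQL string literals is collapsed as well, so a template that depends on literal tabs or newlines in a string value may need to express them another way.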

Executing Snowflake Queries

The Snowflake task template allows you to run SQL on Snowflake. Unlike BigQuery, Snowflake tasks use Python's printf-style string formatting (%(param_name)s) for parameters.

import pandas as pd
from flyteplugins.snowflake import Snowflake, SnowflakeConfig

# 1. Define connection metadata
sf_config = SnowflakeConfig(
    account="myorg-myaccount",
    database="ANALYTICS",
    schema="PUBLIC",
    warehouse="COMPUTE_WH",
    user="flyte_user",
    # Additional parameters like role or authenticator
    connection_kwargs={"role": "ANALYST"}
)

# 2. Define the Snowflake task
snowflake_query = Snowflake(
    name="query_snowflake",
    inputs={"min_age": int},
    output_dataframe_type=pd.DataFrame,
    plugin_config=sf_config,
    query_template="SELECT * FROM users WHERE age >= %(min_age)s;",
    snowflake_private_key="my-snowflake-key"
)
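
Because each %(param_name)s placeholder has to line up with a key in inputs, it can help to check the two against each other before registering a task. The sketch below is a standalone illustration, not part of flyteplugins-snowflake; the function name check_pyformat_inputs and its error message are hypothetical.

```python
import re

# Matches printf-style named placeholders such as %(min_age)s and captures the name.
PYFORMAT_PLACEHOLDER = re.compile(r"%\((\w+)\)s")


def check_pyformat_inputs(query_template: str, inputs: dict[str, type]) -> None:
    """Raise ValueError if placeholders and declared inputs do not line up.

    Hypothetical pre-registration check for illustration only.
    """
    placeholders = set(PYFORMAT_PLACEHOLDER.findall(query_template))
    declared = set(inputs)
    missing = placeholders - declared   # used in SQL but never declared
    unused = declared - placeholders    # declared but never used in SQL
    if missing or unused:
        raise ValueError(
            f"placeholders without an input: {sorted(missing)}; "
            f"inputs without a placeholder: {sorted(unused)}"
        )


# The template and inputs from the example above agree, so this passes silently.
check_pyformat_inputs(
    "SELECT * FROM users WHERE age >= %(min_age)s;",
    {"min_age": int},
)
```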

Performing Snowflake Batch Inserts

flyte-sdk supports a specialized "batch mode" for Snowflake that expands list inputs into a multi-row VALUES clause. All rows are sent in a single INSERT statement rather than one statement per row, which is typically much more efficient for bulk data loading.

To use batch mode, set batch=True and ensure your query_template contains a single VALUES clause with placeholders matching your input keys.

from flyteplugins.snowflake import Snowflake, SnowflakeConfig

snowflake_batch_insert = Snowflake(
    name="snowflake_batch_insert",
    inputs={
        "ids": list[int],
        "names": list[str]
    },
    plugin_config=sf_config,
    query_template="INSERT INTO TEST_TABLE (ID, NAME) VALUES (%(ids)s, %(names)s);",
    batch=True,
    snowflake_private_key="my-snowflake-key"
)

# When called, all input lists must have the same length
# snowflake_batch_insert(ids=[1, 2], names=["Alice", "Bob"])
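
To illustrate what "expanding list inputs into a multi-row VALUES clause" can look like, the following sketch rewrites the template and flattens the parameters. It is an assumption-laden illustration, not the plugin's implementation: the function expand_batch_values, the indexed parameter names (ids_0, ids_1, ...), and the rejection of empty lists are all hypothetical choices.

```python
import re

# A VALUES tuple whose body may contain pyformat placeholders like %(ids)s.
VALUES_TUPLE = re.compile(r"\bVALUES\s*\(((?:[^()]|\(\w+\))*)\)", re.IGNORECASE)
PLACEHOLDER = re.compile(r"%\((\w+)\)s")


def expand_batch_values(query_template: str, inputs: dict[str, list]) -> tuple[str, dict]:
    """Return (query, params) with the VALUES tuple repeated once per row.

    Hypothetical sketch; the real batch mode may name parameters differently.
    """
    lengths = {len(values) for values in inputs.values()}
    if len(lengths) != 1:
        raise ValueError("all input lists must have the same length")
    row_count = lengths.pop()
    if row_count == 0:
        raise ValueError("input lists must not be empty")

    matches = list(VALUES_TUPLE.finditer(query_template))
    if len(matches) != 1:
        raise ValueError("query_template must contain exactly one VALUES (...) clause")
    match = matches[0]
    tuple_body = match.group(1)

    rows, params = [], {}
    for i in range(row_count):
        # Suffix every placeholder with the row index: %(ids)s -> %(ids_0)s
        rows.append(PLACEHOLDER.sub(lambda m, i=i: f"%({m.group(1)}_{i})s", tuple_body))
        for name, values in inputs.items():
            params[f"{name}_{i}"] = values[i]

    values_sql = "VALUES " + ", ".join(f"({row})" for row in rows)
    query = query_template[: match.start()] + values_sql + query_template[match.end():]
    return query, params


query, params = expand_batch_values(
    "INSERT INTO TEST_TABLE (ID, NAME) VALUES (%(ids)s, %(names)s);",
    {"ids": [1, 2], "names": ["Alice", "Bob"]},
)
print(query)
# INSERT INTO TEST_TABLE (ID, NAME) VALUES (%(ids_0)s, %(names_0)s), (%(ids_1)s, %(names_1)s);
print(params)
# {'ids_0': 1, 'names_0': 'Alice', 'ids_1': 2, 'names_1': 'Bob'}
```

Keeping the values as bound parameters, rather than formatting them into the SQL string, leaves quoting and escaping to the database driver.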

Authentication and Secrets

Both plugins handle authentication by referencing secret keys stored in your Flyte environment.

BigQuery Authentication

Pass the name of the secret containing your Google Application Credentials JSON to the google_application_credentials parameter of BigQueryTask.

Snowflake Authentication

Snowflake supports key-pair authentication natively:

  • snowflake_private_key: The name of the secret that holds the Snowflake private key.
  • snowflake_private_key_passphrase: The name of the secret that holds the private key's passphrase (required only if the key is encrypted).

If you use a secret_group, the environment variable name is auto-generated as {SECRET_GROUP}_{KEY} (uppercased). If secret_group is omitted, the key name is used directly as the environment variable name.
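
The naming rule can be written out as a small function. This is a literal reading of the rule stated above, not code taken from the plugin; the helper name secret_env_var is hypothetical, and any further normalization the plugin may apply (for example to hyphens) is not covered here.

```python
from typing import Optional


def secret_env_var(key: str, secret_group: Optional[str] = None) -> str:
    """Derive the environment variable name for a secret.

    Hypothetical helper: with a group, the name is {SECRET_GROUP}_{KEY}
    uppercased; without one, the key is used unchanged.
    """
    if secret_group:
        return f"{secret_group}_{key}".upper()
    return key


print(secret_env_var("private_key", secret_group="snowflake"))  # SNOWFLAKE_PRIVATE_KEY
print(secret_env_var("my-snowflake-key"))                       # my-snowflake-key
```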

For other authentication methods (like passwords), pass them inside the connection_kwargs dictionary within your SnowflakeConfig.

Troubleshooting

  • BigQuery Template Syntax: Ensure you use {{ .input_name }} for BigQuery. Using %(input_name)s will result in SQL syntax errors.
  • Snowflake Batch Requirements: In batch mode, the query_template must contain exactly one VALUES (...) clause. If your input lists have mismatched lengths, the task will fail during execution.
  • DataFrame Types: While flyte.io.DataFrame is the standard type, you can specify pandas.DataFrame or polars.DataFrame as the output_dataframe_type if the corresponding plugin (e.g., flyteplugins-snowflake) supports the transformer.
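
The first two checks in this list can be automated with a small lint step before a task is registered. The sketch below is a hypothetical helper, not part of either plugin: the function lint_template, its engine argument, and the message wording are assumptions made for illustration.

```python
import re

GO_TEMPLATE = re.compile(r"\{\{\s*\.\w+\s*\}\}")            # {{ .input_name }}
PYFORMAT = re.compile(r"%\(\w+\)s")                          # %(input_name)s
VALUES_KEYWORD = re.compile(r"\bVALUES\s*\(", re.IGNORECASE)


def lint_template(query_template: str, engine: str, batch: bool = False) -> list[str]:
    """Return a list of human-readable problems; an empty list means no issues found.

    Hypothetical pre-flight check; `engine` is "bigquery" or "snowflake".
    """
    problems = []
    if engine == "bigquery" and PYFORMAT.search(query_template):
        problems.append("BigQuery templates use {{ .input_name }}, not %(input_name)s")
    if engine == "snowflake" and GO_TEMPLATE.search(query_template):
        problems.append("Snowflake templates use %(input_name)s, not {{ .input_name }}")
    if batch:
        count = len(VALUES_KEYWORD.findall(query_template))
        if count != 1:
            problems.append(
                f"batch mode needs exactly one VALUES (...) clause, found {count}"
            )
    return problems


# A BigQuery template written with Snowflake-style placeholders is flagged.
for problem in lint_template("SELECT * FROM t WHERE id = %(id)s", engine="bigquery"):
    print(problem)
```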