Databricks Integration

The Databricks integration in flyte-sdk lets you execute Spark tasks natively on managed Databricks infrastructure. Through the Databricks configuration class, you specify the cluster settings, the workspace instance, and the name of the secret that holds your API token, and then run distributed PySpark jobs.

Configuring a Databricks Task

To run a task on Databricks, you define a Databricks configuration and apply it to your task using a TaskEnvironment.

from flyteplugins.databricks import Databricks
import flyte

# 1. Define the Databricks configuration
databricks_conf = Databricks(
    spark_conf={
        "spark.driver.memory": "2000M",
        "spark.executor.memory": "1000M",
    },
    # Path to the Python binary in the Databricks runtime
    executor_path="/databricks/python3/bin/python",
    databricks_conf={
        "run_name": "flyte databricks execution",
        "new_cluster": {
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "m6i.large",
            "num_workers": 1,
            "aws_attributes": {
                "availability": "SPOT_WITH_FALLBACK",
                "instance_profile_arn": "arn:aws:iam::123456789012:instance-profile/databricks-role",
            },
        },
        "timeout_seconds": 3600,
    },
    databricks_instance="dbc-xxxx-yyyy.cloud.databricks.com",
    # Name of the Flyte secret containing the API token
    databricks_token="DATABRICKS_API_TOKEN",
)

# 2. Create a TaskEnvironment with the plugin configuration
databricks_env = flyte.TaskEnvironment(
    name="databricks_env",
    plugin_config=databricks_conf,
)

# 3. Decorate your task with the environment
@databricks_env.task
async def process_data_on_databricks(n: int) -> float:
    # Access the Spark session provided by the Flyte context
    spark = flyte.ctx().data["spark_session"]

    df = spark.range(n)
    return float(df.count())

Key Configuration Attributes

The Databricks class (found in plugins/databricks/src/flyteplugins/databricks/task.py) supports several critical parameters:

  • databricks_conf: A dictionary compliant with the Databricks Jobs API v2.1. It must include either new_cluster (to provision a cluster on demand) or existing_cluster_id (to reuse a cluster that is already provisioned).
  • databricks_instance: The domain name of your Databricks deployment (e.g., "myorg.cloud.databricks.com").
  • databricks_token: The name of the Flyte secret that holds your Databricks personal access token. The flyte-sdk does not accept the token string directly for security reasons.
  • executor_path: The path to the Python interpreter on the Databricks cluster. For standard Databricks runtimes, this is typically /databricks/python3/bin/python.
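The constraints on databricks_conf and databricks_instance can be checked before a task is ever submitted. The following is a minimal sketch using only the standard library. cluster_mode and normalize_instance are hypothetical helpers, not part of flyteplugins-databricks, and treating new_cluster and existing_cluster_id as mutually exclusive is an assumption of this sketch rather than documented plugin behavior.

```python
from typing import Any, Mapping


def cluster_mode(databricks_conf: Mapping[str, Any]) -> str:
    """Return "new" or "existing" depending on which cluster key is set.

    Hypothetical helper, not part of flyteplugins-databricks.
    """
    has_new = "new_cluster" in databricks_conf
    has_existing = "existing_cluster_id" in databricks_conf
    if has_new and has_existing:
        # Assumption of this sketch: the two keys are mutually exclusive.
        raise ValueError("set only one of 'new_cluster' or 'existing_cluster_id'")
    if not (has_new or has_existing):
        raise ValueError("databricks_conf needs 'new_cluster' or 'existing_cluster_id'")
    return "new" if has_new else "existing"


def normalize_instance(instance: str) -> str:
    """Strip a URL scheme and trailing slashes so only the domain name remains."""
    domain = instance.strip()
    for scheme in ("https://", "http://"):
        if domain.startswith(scheme):
            domain = domain[len(scheme):]
    return domain.rstrip("/")
```

Calling cluster_mode on the dictionary you plan to pass as databricks_conf, and normalize_instance on a URL copied from the browser, surfaces configuration mistakes locally instead of at job submission time.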

Using an Existing Cluster

If you prefer to run tasks on a pre-provisioned cluster to avoid startup latency, replace the new_cluster block with existing_cluster_id.

databricks_conf = Databricks(
    databricks_conf={
        "existing_cluster_id": "1234-567890-batch123",
    },
    databricks_instance="dbc-xxxx-yyyy.cloud.databricks.com",
    databricks_token="DATABRICKS_API_TOKEN",
)
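If you keep one on-demand job configuration and only sometimes want to point it at a pre-provisioned cluster, the swap can be done on the plain dictionary before it is handed to the Databricks class. This is a sketch under that assumption. use_existing_cluster is a hypothetical helper and does not exist in the plugin.

```python
import copy
from typing import Any, Dict


def use_existing_cluster(conf: Dict[str, Any], cluster_id: str) -> Dict[str, Any]:
    """Return a copy of conf that targets an existing cluster.

    Hypothetical helper: drops "new_cluster" (if present) and sets
    "existing_cluster_id". The input dictionary is left unchanged.
    """
    updated = copy.deepcopy(conf)
    updated.pop("new_cluster", None)
    updated["existing_cluster_id"] = cluster_id
    return updated


on_demand = {
    "run_name": "flyte databricks execution",
    "new_cluster": {"spark_version": "13.3.x-scala2.12", "num_workers": 1},
    "timeout_seconds": 3600,
}
warm = use_existing_cluster(on_demand, "1234-567890-batch123")
```

The resulting dictionary keeps job-level settings such as run_name and timeout_seconds, so only the cluster selection differs between the two variants.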

Image Requirements

When running on Databricks, your Docker image must be compatible with the Databricks Runtime. The recommended approach is to start from a Databricks base image and install flyteplugins-databricks into it.

image = (
    flyte.Image.from_base("databricksruntime/standard:13.3-LTS")
    .clone(name="spark-databricks", registry="ghcr.io/myorg", extendable=True)
    .with_env_vars({"UV_PYTHON": "/databricks/python3/bin/python"})
    .with_pip_packages("flyteplugins-databricks")
)

databricks_env = flyte.TaskEnvironment(
    name="databricks_env",
    plugin_config=databricks_conf,
    image=image,
)

Troubleshooting

Secret Configuration

The databricks_token field in the Databricks class refers to the key in Flyte's secret management system, not the token value itself. If your task fails with authentication errors, ensure that:

  1. A secret with that name exists in your Flyte deployment.
  2. The task has the necessary permissions to access that secret.
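A common cause of these errors is pasting the token value itself into databricks_token instead of the secret name. The sketch below shows one way to catch that early. looks_like_raw_token is a hypothetical helper, and the pattern it uses (the prefix dapi followed by 32 hexadecimal characters and an optional numeric suffix) is an assumption about the usual shape of Databricks personal access tokens, not something the plugin checks.

```python
import re

# Assumed shape of a Databricks personal access token: "dapi" + 32 hex
# characters, optionally followed by "-<digits>". Adjust if yours differ.
_RAW_TOKEN_PATTERN = re.compile(r"^dapi[0-9a-f]{32}(-\d+)?$")


def looks_like_raw_token(value: str) -> bool:
    """Return True if value resembles a token rather than a secret name."""
    return bool(_RAW_TOKEN_PATTERN.match(value.strip()))


secret_name = "DATABRICKS_API_TOKEN"
if looks_like_raw_token(secret_name):
    raise ValueError("databricks_token expects a secret name, not the token value")
```

A check like this is only a heuristic. It reduces the chance of committing a credential to source control, but it does not confirm that the named secret exists or is readable by the task.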

Spark Session Access

Unlike standard PySpark tasks where you might create a session manually, flyte-sdk automatically manages the session for Databricks tasks. Always retrieve it from the context:

spark = flyte.ctx().data["spark_session"]
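If the lookup fails, the bare KeyError gives little indication of the cause. A small wrapper can turn it into a clearer message. This is a sketch that assumes the context data behaves like a mapping, as the indexing above suggests. get_spark_session is a hypothetical helper, not a flyte-sdk function.

```python
from typing import Any, Mapping


def get_spark_session(ctx_data: Mapping[str, Any]) -> Any:
    """Fetch the Spark session from a context data mapping.

    Hypothetical helper: raises a descriptive error when the
    "spark_session" key is missing instead of a bare KeyError.
    """
    try:
        return ctx_data["spark_session"]
    except KeyError:
        raise RuntimeError(
            "no 'spark_session' in the task context; check that the task "
            "uses a TaskEnvironment with a Databricks plugin_config"
        ) from None
```

Inside a task you would call it as spark = get_spark_session(flyte.ctx().data), which keeps the retrieval in one place if the key name ever needs to change.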

Entrypoint and Paths

The DatabricksFunctionTask implementation (in plugins/databricks/src/flyteplugins/databricks/task.py) automatically configures the mainApplicationFile to point to the Flyte entrypoint. If you need to override this, use the applications_path attribute in the Databricks config.