Dask Cluster Setup

To configure a Dask cluster for distributed execution in flyte-sdk, you define a Dask configuration object and apply it to a TaskEnvironment. This allows you to specify the number of workers, resource requirements, and custom images for both the scheduler and worker pods.

from flyteplugins.dask import Dask, Scheduler, WorkerGroup
from flyte import Resources, TaskEnvironment

# 1. Configure the Dask cluster settings
dask_config = Dask(
    scheduler=Scheduler(
        resources=Resources(cpu="1", memory="2Gi"),
        image="custom-dask-image:latest",  # Optional: defaults to task image
    ),
    workers=WorkerGroup(
        number_of_workers=5,
        resources=Resources(cpu="2", memory="4Gi"),
        image="custom-dask-image:latest",
    ),
)

# 2. Assign the configuration to a TaskEnvironment
dask_env = TaskEnvironment(
    name="dask_compute_env",
    plugin_config=dask_config,
)

# 3. Define an async task within this environment
@dask_env.task
async def compute_pi(n: int) -> float:
    from distributed import Client

    def estimate_pi(samples: int) -> float:
        import random

        count = 0
        for _ in range(samples):
            x, y = random.random(), random.random()
            if x * x + y * y <= 1:
                count += 1
        return 4 * count / samples

    # With no address, the client connects to the provisioned cluster.
    # asynchronous=True is required for awaiting client calls.
    async with Client(asynchronous=True) as client:
        # pure=False: estimate_pi is random, so identical arguments must not
        # be deduplicated into a single task.
        futures = client.map(estimate_pi, [n // 5] * 5, pure=False)
        results = await client.gather(futures)
    return sum(results) / len(results)

Configuring Scheduler and Worker Groups

The Dask class in flyteplugins.dask.task acts as the root configuration. It contains two primary components:

  • Scheduler: Configures the Dask scheduler pod. You can specify a custom image and resources (CPU and Memory).
  • WorkerGroup: Configures the pool of Dask workers. Key parameters include number_of_workers, image, and resources.

If image is not provided for either the scheduler or workers, flyte-sdk defaults to using the same image the task was registered with.
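
The fallback rule can be pictured with a small stand-alone sketch. It does not import flyte-sdk: PodSpec, resolve_image, and the image names are hypothetical stand-ins chosen for illustration, and the plugin's actual resolution logic may differ.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PodSpec:
    """Hypothetical stand-in for a Scheduler or WorkerGroup setting."""
    image: Optional[str] = None


def resolve_image(spec: PodSpec, task_image: str) -> str:
    """Return the pod's own image, or fall back to the task image."""
    return spec.image if spec.image is not None else task_image


task_image = "registry.example.com/my-task:abc123"  # hypothetical task image
scheduler = PodSpec()  # no image set, so the task image is used
workers = PodSpec(image="custom-dask-image:latest")

print(resolve_image(scheduler, task_image))
print(resolve_image(workers, task_image))
```

Leaving image unset on both components is usually the simplest way to keep the scheduler, workers, and task on one environment.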

Automatic Code Synchronization

When a DaskTask executes in a cluster, flyte-sdk automatically ensures that your local code and dependencies are available on all nodes. This is handled by the DaskTask.pre method in plugins/dask/src/flyteplugins/dask/task.py, which registers internal Dask plugins:

  1. DownloadCodeBundleSchedulerPlugin: Runs on the scheduler pod to download the code bundle and add it to sys.path.
  2. DownloadCodeBundleWorkerPlugin: Runs on every worker pod as it initializes to perform the same setup.

This mechanism ensures that functions mapped across the cluster can be deserialized and executed correctly without manual environment setup on each node.
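
As a rough illustration of what such a plugin does, the sketch below unpacks an archive and puts it on sys.path using only the standard library. It is not the flyte-sdk implementation: CodeBundlePluginSketch and make_bundle are hypothetical names, the bundle is built in memory rather than downloaded, and the class only mimics the shape of a worker plugin's setup hook.

```python
import importlib
import io
import sys
import tarfile
import tempfile


class CodeBundlePluginSketch:
    """Hypothetical plugin: unpack a code bundle and expose it on sys.path."""

    def __init__(self, bundle_bytes: bytes):
        self.bundle_bytes = bundle_bytes
        self.target_dir = None

    def setup(self, worker=None) -> None:
        # Unpack the bundle into a fresh directory (a real plugin would
        # first download it from remote storage).
        self.target_dir = tempfile.mkdtemp(prefix="code_bundle_")
        with tarfile.open(fileobj=io.BytesIO(self.bundle_bytes), mode="r:gz") as tar:
            tar.extractall(self.target_dir)
        # Make the unpacked modules importable on this node.
        if self.target_dir not in sys.path:
            sys.path.insert(0, self.target_dir)
        importlib.invalidate_caches()


def make_bundle(module_name: str, source: str) -> bytes:
    """Build an in-memory .tar.gz holding a single Python module."""
    data = source.encode()
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
        info = tarfile.TarInfo(name=f"{module_name}.py")
        info.size = len(data)
        tar.addfile(info, io.BytesIO(data))
    return buffer.getvalue()


bundle = make_bundle("bundle_demo_module", "def double(x):\n    return 2 * x\n")
CodeBundlePluginSketch(bundle).setup()
demo = importlib.import_module("bundle_demo_module")
print(demo.double(21))
```

Once the directory is on sys.path, a function pickled by reference to its module can be imported on the receiving node, which is why the same setup has to run on the scheduler and on every worker.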

Customizing Resources for Large Workloads

You can define specific resource limits and requests for your Dask pods using the Resources class. This is useful when workers require significant memory for data processing.

from flyteplugins.dask import Dask, Scheduler, WorkerGroup
from flyte import Resources

# Advanced configuration with specific resource requests and limits
advanced_dask_config = Dask(
    scheduler=Scheduler(
        resources=Resources(
            cpu=("500m", "1"),      # (request, limit)
            memory=("1Gi", "2Gi"),  # (request, limit)
        ),
    ),
    workers=WorkerGroup(
        number_of_workers=20,
        resources=Resources(
            cpu="2",
            memory="8Gi",
        ),
    ),
)
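
Before submitting a configuration like the one above, it can help to estimate the total capacity it asks of the Kubernetes cluster. The following is a minimal stand-alone sketch, not part of flyte-sdk: parse_cpu, parse_memory, and cluster_request are hypothetical helpers, and the memory parser only understands the binary suffixes Ki, Mi, Gi, and Ti.

```python
from decimal import Decimal

_MEMORY_UNITS = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}


def parse_cpu(quantity: str) -> Decimal:
    """Convert a Kubernetes CPU quantity such as '500m' or '2' to cores."""
    if quantity.endswith("m"):
        return Decimal(quantity[:-1]) / 1000
    return Decimal(quantity)


def parse_memory(quantity: str) -> int:
    """Convert a binary-suffixed memory quantity such as '8Gi' to bytes."""
    for suffix, factor in _MEMORY_UNITS.items():
        if quantity.endswith(suffix):
            return int(Decimal(quantity[: -len(suffix)]) * factor)
    return int(quantity)  # plain byte count


def cluster_request(scheduler_cpu, scheduler_mem, worker_cpu, worker_mem, workers):
    """Total (cores, bytes) for one scheduler plus `workers` workers."""
    cores = parse_cpu(scheduler_cpu) + workers * parse_cpu(worker_cpu)
    memory = parse_memory(scheduler_mem) + workers * parse_memory(worker_mem)
    return cores, memory


# Scheduler requests from the example above, plus 20 workers at 2 CPU / 8Gi.
cores, memory = cluster_request("500m", "1Gi", "2", "8Gi", workers=20)
print(f"{cores} cores, {memory / 2**30:.0f} GiB requested")
```

The job runner pod that starts the task needs its own resources on top of this total.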

Troubleshooting and Requirements

Image Compatibility

All images used for the scheduler and workers must have dask[distributed] installed. If you use custom images, ensure the version of distributed they ship is compatible with the one installed in the job runner pod.

Environment Consistency

To avoid serialization errors (such as Pickle or CloudPickle mismatches), the Python version and installed dependencies should be consistent across:

  1. The job runner pod (where the task function starts).
  2. The Dask scheduler pod.
  3. All Dask worker pods.
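
One way to spot such drift is to compare a version fingerprint from each of the three locations. The sketch below uses only the standard library: environment_fingerprint and find_mismatches are hypothetical helpers, and the fingerprints passed in are made-up sample data rather than output from a real cluster. The distributed library also provides Client.get_versions(check=True) for a comparable check against a live cluster.

```python
import platform
from importlib import metadata


def environment_fingerprint(packages=("distributed", "dask", "cloudpickle")):
    """Collect the Python version and the versions of selected packages."""
    versions = {"python": platform.python_version()}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None  # not installed in this environment
    return versions


def find_mismatches(fingerprints):
    """Return {key: {node: value}} for every key whose values differ."""
    keys = set().union(*(fp.keys() for fp in fingerprints.values()))
    mismatches = {}
    for key in sorted(keys):
        values = {node: fp.get(key) for node, fp in fingerprints.items()}
        if len(set(values.values())) > 1:
            mismatches[key] = values
    return mismatches


local = environment_fingerprint()
# Hypothetical fingerprints; on a live cluster these would be gathered
# from the scheduler and each worker.
report = find_mismatches({
    "runner": {"python": "3.11.9", "cloudpickle": "3.0.0"},
    "scheduler": {"python": "3.11.9", "cloudpickle": "3.0.0"},
    "worker-0": {"python": "3.12.3", "cloudpickle": "3.0.0"},
})
print(report)
```

In this sample only the Python version is reported, because worker-0 differs from the other two nodes.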

Async Execution

Because DaskTask inherits from AsyncFunctionTaskTemplate, define your task functions with async def. This lets the task interact with the Dask Client and other asynchronous resources without blocking the event loop. Awaiting client methods such as gather requires a client created with asynchronous=True.
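
To see why blocking matters inside an async def task, the stand-alone asyncio sketch below runs a blocking function next to a second coroutine. It involves neither flyte-sdk nor Dask: blocking_compute and heartbeat are hypothetical stand-ins, and asyncio.to_thread (Python 3.9+) is just one way to keep a synchronous call off the event loop.

```python
import asyncio
import time


def blocking_compute(x: int) -> int:
    """Stand-in for a blocking call, e.g. a synchronous client method."""
    time.sleep(0.1)
    return x * x


async def heartbeat(ticks: list) -> None:
    """Record ticks; it only makes progress while the event loop is free."""
    for i in range(3):
        ticks.append(i)
        await asyncio.sleep(0.02)


async def main():
    ticks = []
    # to_thread moves the blocking call off the event loop, so heartbeat
    # keeps running while the computation is in progress.
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_compute, 7),
        heartbeat(ticks),
    )
    return result, ticks


result, ticks = asyncio.run(main())
print(result, ticks)
```

Calling blocking_compute directly inside main would stall the heartbeat until it returned, and the same applies to any synchronous client call made from an async task.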