Skip to main content

Ray Cluster Architecture

flyte-sdk provides a seamless integration with Ray by leveraging KubeRay to manage transient clusters. This allows you to define the infrastructure for your Ray job—including head and worker node configurations, autoscaling policies, and runtime environments—directly within your Flyte task definition.

Configuring the Ray Cluster

To define a Ray cluster, you use the RayJobConfig class. This configuration specifies the topology of the cluster, including a single head node and one or more worker node groups.

Head and Worker Nodes

You configure the head node using HeadNodeConfig and worker nodes using WorkerNodeConfig. Both classes allow you to specify resource requirements, custom pod templates, and Ray-specific start parameters.

from flyteplugins.ray.task import RayJobConfig, HeadNodeConfig, WorkerNodeConfig
from flyte import Resources

ray_config = RayJobConfig(
head_node_config=HeadNodeConfig(
requests=Resources(cpu="1", memory="2Gi"),
ray_start_params={"log-color": "True"}
),
worker_node_config=[
WorkerNodeConfig(
group_name="cpu-workers",
replicas=2,
requests=Resources(cpu="2", memory="4Gi"),
)
],
shutdown_after_job_finishes=True,
ttl_seconds_after_finished=600,
)

Internally, RayFunctionTask processes these configurations in its custom_config method. If you provide requests or limits directly in the node configs, flyte-sdk automatically generates a PodTemplate using pod_spec_from_resources. This ensures that the Ray containers (named ray-head and ray-worker respectively) have the correct Kubernetes resource specifications.

Autoscaling

You can enable autoscaling by setting enable_autoscaling=True in the RayJobConfig. When enabled, you should also define min_replicas and max_replicas in your WorkerNodeConfig to control the scaling boundaries.

worker_config = WorkerNodeConfig(
group_name="autoscaling-group",
replicas=1,
min_replicas=1,
max_replicas=5,
)

Defining a Ray Task

To execute code on the configured cluster, apply the RayJobConfig to a task using TaskEnvironment. This associates the Ray infrastructure with your Python function.

import flyte
import ray

ray_env = flyte.TaskEnvironment(
name="ray_env",
plugin_config=ray_config,
)

@ray_env.task
async def my_ray_task(n: int) -> int:
# This code runs on the Ray head node
@ray.remote
def f(x):
return x * x

futures = [f.remote(i) for i in range(n)]
return sum(ray.get(futures))

When the task starts, RayFunctionTask.pre is called. This method initializes the Ray client by calling ray.init(). If the task is running in-cluster, flyte-sdk automatically configures the runtime_env with the current working directory and excludes common temporary files (like script_mode.tar.gz and .python_history) to ensure a clean execution environment.

Runtime Environments and Dependencies

The runtime_env field in RayJobConfig allows you to specify dependencies like pip packages or environment variables that must be available across the cluster.

ray_config = RayJobConfig(
worker_node_config=[WorkerNodeConfig(group_name="workers", replicas=1)],
runtime_env={"pip": ["numpy", "pandas"]},
)

flyte-sdk handles the serialization of this environment for different versions of KubeRay. In RayFunctionTask.custom_config, the runtime_env is converted into both a base64-encoded JSON string (for older versions) and a YAML string (runtime_env_yaml), ensuring compatibility with KubeRay >= 1.1.0.

Connecting to Existing Clusters

If you already have a Ray cluster running and want to use it instead of spinning up a transient one, you can provide the cluster address in the RayJobConfig.

existing_cluster_config = RayJobConfig(
worker_node_config=[], # No workers needed for existing cluster
address="ray://existing-cluster-head:10001",
)

When an address is provided, RayFunctionTask.pre passes it directly to ray.init(address=...), bypassing the creation of a new KubeRay RayJob resource.