Distributed Job Run Policies
Distributed jobs, such as multi-node PyTorch training, often involve complex orchestration of multiple Kubernetes pods. Managing the lifecycle of these pods (when they are deleted, how long the job object persists after it finishes, how long the job may run, and how many times a failed job is retried) is critical for both resource efficiency and debugging. In flyte-sdk, this lifecycle management is handled by the RunPolicy class in the PyTorch plugin.
The RunPolicy Class
The RunPolicy class, located in plugins/pytorch/src/flyteplugins/pytorch/task.py, is a configuration object that defines how the underlying Kubeflow operator should manage a PyTorchJob. It provides four primary controls:
- clean_pod_policy: Determines which pods are deleted after the job completes. It accepts "None", "all", or "Running". "all" removes every pod, including failed ones, so no pods are left behind on the cluster; "Running" removes only the pods that are still running when the job finishes, so completed and failed pods remain available for log inspection; "None" leaves all pods in place.
- ttl_seconds_after_finished: Defines a time-to-live (TTL), in seconds, for the job resource itself. Once the job finishes, the controller waits for this duration before deleting the job object.
- active_deadline_seconds: Sets a hard limit, in seconds, on how long the job may remain active. If the job runs longer than this value, it is terminated. This is a safeguard against hanging jobs or infinite loops in training scripts.
- backoff_limit: Specifies the number of retries allowed before the job is marked as failed.
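To make the three clean_pod_policy values concrete, the sketch below models which pods would be removed once a job finishes. It is a simplified, self-contained illustration that assumes Kubeflow's cleanup semantics as described above; the Pod type and the pods_to_delete function are hypothetical and are not part of flyte-sdk. The actual cleanup is performed by the Kubeflow operator.

```python
from dataclasses import dataclass
from typing import List, Literal

# Mirrors the literal values accepted by RunPolicy.clean_pod_policy.
CleanPodPolicy = Literal["None", "all", "Running"]


@dataclass
class Pod:
    """Hypothetical, minimal stand-in for a Kubernetes pod."""
    name: str
    phase: str  # e.g. "Running", "Succeeded", "Failed"


def pods_to_delete(policy: CleanPodPolicy, pods: List[Pod]) -> List[str]:
    """Return the names of pods that would be removed after the job finishes.

    Simplified model: an unset policy (Python None), which defers to the
    operator's default, is not modeled here.
    """
    if policy == "all":
        # Every pod is removed, regardless of its phase.
        return [p.name for p in pods]
    if policy == "Running":
        # Only pods still running are removed; finished pods are kept.
        return [p.name for p in pods if p.phase == "Running"]
    # "None": every pod is kept.
    return []


pods = [
    Pod("master-0", "Failed"),
    Pod("worker-0", "Running"),
    Pod("worker-1", "Succeeded"),
]
print(pods_to_delete("all", pods))      # ['master-0', 'worker-0', 'worker-1']
print(pods_to_delete("Running", pods))  # ['worker-0']
print(pods_to_delete("None", pods))     # []
```

With "Running", the failed master-0 pod survives, which is what makes that setting useful for post-mortem debugging.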
Integration with Distributed Training
The RunPolicy is not used in isolation; it is passed to the Elastic configuration class through its run_policy field. When defining a distributed PyTorch task, you pass a RunPolicy instance to the Elastic config, which is then used by the TorchFunctionTask.
The following example, adapted from plugins/pytorch/tests/test_task.py, demonstrates how to configure these policies:
from flyteplugins.pytorch.task import Elastic, RunPolicy

# Define a configuration with specific lifecycle policies
torch_config = Elastic(
    nnodes=2,
    nproc_per_node=2,
    run_policy=RunPolicy(
        clean_pod_policy="all",
        backoff_limit=4,
        ttl_seconds_after_finished=100,
        active_deadline_seconds=200,
    ),
)
Implementation and Serialization
When a task is serialized for submission, flyte-sdk converts the RunPolicy into the Flyte IDL RunPolicy protobuf message, which the Flyte backend uses to configure the Kubeflow PyTorchJob. This happens in the TorchFunctionTask.custom_config method.
One notable implementation detail is how clean_pod_policy is handled. While the Python class uses simple strings like "all", the underlying Protobuf definition requires specific enum values. TorchFunctionTask builds the enum name by upper-casing the string and prefixing it with CLEANPOD_POLICY_:
# From plugins/pytorch/src/flyteplugins/pytorch/task.py
# (excerpt from TorchFunctionTask.custom_config)
policy = common_pb2.RunPolicy(
    clean_pod_policy=(
        common_pb2.CleanPodPolicy.Value(
            "CLEANPOD_POLICY_" + self.plugin_config.run_policy.clean_pod_policy.upper()
        )
        if self.plugin_config.run_policy.clean_pod_policy
        else None
    ),
    ttl_seconds_after_finished=self.plugin_config.run_policy.ttl_seconds_after_finished,
    active_deadline_seconds=self.plugin_config.run_policy.active_deadline_seconds,
    backoff_limit=self.plugin_config.run_policy.backoff_limit,
)
This transformation translates the human-readable strings "all", "Running", and "None" into the CLEANPOD_POLICY_ALL, CLEANPOD_POLICY_RUNNING, and CLEANPOD_POLICY_NONE values defined in the Flyte IDL; the Flyte backend then applies the corresponding policy to the PyTorchJob it submits to the Kubeflow operator.
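The mapping can be reproduced without protobuf to see how each input behaves. In the sketch below, a Python Enum named CleanPodPolicy stands in for common_pb2.CleanPodPolicy; only the member names come from the text above, and the numeric values are placeholders. As in the excerpt, an unset (falsy) policy maps to None, and an unrecognized string fails at lookup time with a ValueError, mirroring protobuf's enum Value lookup.

```python
from enum import Enum
from typing import Optional


class CleanPodPolicy(Enum):
    """Stand-in for common_pb2.CleanPodPolicy; numeric values are placeholders."""
    CLEANPOD_POLICY_NONE = 0
    CLEANPOD_POLICY_RUNNING = 1
    CLEANPOD_POLICY_ALL = 2


def to_clean_pod_policy(value: Optional[str]) -> Optional[CleanPodPolicy]:
    """Translate a RunPolicy string using the same prefix-and-uppercase rule."""
    if not value:
        # Unset policy: leave the field unset, as the excerpt does.
        return None
    name = "CLEANPOD_POLICY_" + value.upper()
    try:
        return CleanPodPolicy[name]
    except KeyError:
        # Mirror protobuf, whose enum Value() raises ValueError for unknown names.
        raise ValueError(f"unknown clean_pod_policy: {value!r}") from None


for v in ("None", "all", "Running", None):
    print(v, "->", to_clean_pod_policy(v))
```

Because the lookup upper-cases its input, "ALL" or "running" would also serialize successfully; only a static type check against the accepted literal values rejects them.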
Constraints and Behavior
The behavior of these policies is subject to certain Kubernetes and Kubeflow constraints:
- Restart Policy Dependency: The active_deadline_seconds policy is only enforced on pods whose restartPolicy is set to OnFailure or Always. If the pods are not configured to restart, this deadline may not behave as expected.
- Case Sensitivity: Although the serialization logic converts clean_pod_policy to uppercase, the value passed to the RunPolicy constructor should match one of the expected literal values ("None", "all", or "Running") to pass type checking. Literal type annotations are checked by static type checkers, not at runtime, so a string that matches no enum name (for example, "completed") only fails when the task is serialized.
- Defaults: If no RunPolicy is provided to the Elastic configuration, run_policy defaults to None. In this state, flyte-sdk does not apply any lifecycle overrides, leaving the behavior to the defaults of the cluster's Kubeflow operator.
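The constraints above can be checked before a job is submitted. The check_run_policy function below is hypothetical and not part of flyte-sdk; it takes plain values rather than a RunPolicy instance so that it runs standalone. It flags a clean_pod_policy string that would serialize but does not match the literal values, an unrecognized string, and a non-positive active_deadline_seconds (Kubeflow expects a positive integer). The non-negative checks on ttl_seconds_after_finished and backoff_limit are assumptions based on usual Kubernetes conventions. It cannot check the restartPolicy dependency, because restart behavior is not one of the RunPolicy fields.

```python
from typing import List, Optional

ALLOWED_CLEAN_POD_POLICIES = ("None", "all", "Running")


def check_run_policy(
    clean_pod_policy: Optional[str] = None,
    ttl_seconds_after_finished: Optional[int] = None,
    active_deadline_seconds: Optional[int] = None,
    backoff_limit: Optional[int] = None,
) -> List[str]:
    """Return human-readable problems; an empty list means none were found.

    Every field defaults to None, meaning the setting is left to the
    Kubeflow operator's defaults.
    """
    problems: List[str] = []
    # Falsy values are treated as unset, matching the serialization logic.
    if clean_pod_policy and clean_pod_policy not in ALLOWED_CLEAN_POD_POLICIES:
        if clean_pod_policy.upper() in {p.upper() for p in ALLOWED_CLEAN_POD_POLICIES}:
            problems.append(
                f"clean_pod_policy {clean_pod_policy!r} would serialize, "
                "but does not match the literal values"
            )
        else:
            problems.append(f"clean_pod_policy {clean_pod_policy!r} is not recognized")
    if active_deadline_seconds is not None and active_deadline_seconds <= 0:
        problems.append("active_deadline_seconds must be a positive integer")
    # Assumption: negative values are rejected by Kubernetes conventions.
    if ttl_seconds_after_finished is not None and ttl_seconds_after_finished < 0:
        problems.append("ttl_seconds_after_finished should not be negative")
    if backoff_limit is not None and backoff_limit < 0:
        problems.append("backoff_limit should not be negative")
    return problems


print(check_run_policy("all", 100, 200, 4))  # []
print(check_run_policy("ALL", active_deadline_seconds=0))
```

Calling it with no arguments corresponds to the Defaults case: nothing is overridden, so nothing is flagged.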