Remote Orchestration & Management

Remote orchestration and management in flyte-sdk lets you interact with the Flyte backend to monitor executions, manage projects, and handle secrets. These operations go through the flyte.remote module, which provides high-level abstractions for Flyte entities.

Initialization and Authentication

Before interacting with the Flyte backend, you must initialize the flyte-sdk with the appropriate credentials and endpoint.

Initializing with an API Key

The most common way to initialize flyte-sdk is using an encoded API key, which contains the endpoint, client ID, client secret, and organization.

import flyte

# Initialize using an API key (reads from FLYTE_API_KEY env var if not provided)
flyte.init_from_api_key(
    api_key="your_encoded_api_key",
    project="my_project",
    domain="development"
)
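To make the idea of an "encoded" key concrete, the sketch below packs four fields into one opaque string and recovers them again. The layout (base64 over a colon-delimited string) and the names KeyFields, encode_key, and decode_key are assumptions made for illustration only. The format flyte-sdk actually uses may differ, so treat real keys as opaque values.

```python
import base64
from typing import NamedTuple


class KeyFields(NamedTuple):
    """The four pieces of information the text says a key carries."""
    endpoint: str
    client_id: str
    client_secret: str
    org: str


def encode_key(fields: KeyFields) -> str:
    """Pack the fields into one URL-safe string (hypothetical layout)."""
    raw = ":".join(fields)
    return base64.urlsafe_b64encode(raw.encode("utf-8")).decode("ascii")


def decode_key(encoded: str) -> KeyFields:
    """Reverse encode_key; raises ValueError if the field count is wrong."""
    raw = base64.urlsafe_b64decode(encoded.encode("ascii")).decode("utf-8")
    parts = raw.split(":")
    if len(parts) != 4:
        # This simple layout cannot represent fields that contain ':'.
        raise ValueError(f"expected 4 fields, got {len(parts)}")
    return KeyFields(*parts)


example = KeyFields("flyte.example.com", "my-client", "s3cret", "my-org")
round_tripped = decode_key(encode_key(example))
```

Because the client secret is only encoded, not encrypted, a key of this kind should be handled like any other credential, for example by supplying it through the FLYTE_API_KEY environment variable rather than committing it to source control.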

Initializing from a Configuration File

You can also initialize flyte-sdk using a standard Flyte configuration file.

import flyte

# Initialize from a specific config file
flyte.init_from_config(path_or_config="~/.flyte/config.yaml")

Using Passthrough Authentication

For scenarios requiring custom metadata (e.g., passing user-specific tokens), use init_passthrough and the auth_metadata context manager.

import flyte
from flyte.remote import auth_metadata, Run

# Initialize in passthrough mode
flyte.init_passthrough(endpoint="flyte.example.com", insecure=False)

# Use custom metadata for a specific block of code
with auth_metadata(("key1", "value1"), ("key2", "value2")):
    runs = Run.listall(limit=5)
    for run in runs:
        print(run.name)
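The scoping behaviour of a context manager like auth_metadata can be illustrated with a generic pattern built on contextvars. This is a sketch of the general technique, not the flyte-sdk implementation: scoped_metadata, outgoing_metadata, and _current_metadata are hypothetical names introduced here.

```python
import contextvars
from contextlib import contextmanager
from typing import Iterator, Tuple

# Hypothetical stand-in for client state: metadata active in the current context.
_current_metadata = contextvars.ContextVar("current_metadata", default=())


@contextmanager
def scoped_metadata(*pairs: Tuple[str, str]) -> Iterator[None]:
    """Attach key/value pairs for the duration of the with-block only."""
    token = _current_metadata.set(_current_metadata.get() + pairs)
    try:
        yield
    finally:
        # Restore whatever was active before the block, even on error.
        _current_metadata.reset(token)


def outgoing_metadata() -> Tuple[Tuple[str, str], ...]:
    """What a hypothetical client call would attach right now."""
    return _current_metadata.get()


before = outgoing_metadata()
with scoped_metadata(("key1", "value1"), ("key2", "value2")):
    inside = outgoing_metadata()
after = outgoing_metadata()
```

In this sketch the metadata is visible only inside the with-block, and the previous state is restored on exit, which is the property that makes per-request credentials safe to use in shared code.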

Managing Runs

The Run class in flyte.remote provides methods to list, retrieve, and control task executions.

Listing and Aborting Runs

You can list runs with filters and perform bulk actions like aborting long-running executions.

import asyncio
from datetime import datetime, timedelta, timezone

import flyte.remote as remote

# Initialize flyte-sdk first
# ...

async def abort_stale_runs():
    ten_hours_ago = datetime.now(timezone.utc) - timedelta(hours=10)

    # List all runs currently in the 'running' phase
    runs = [r async for r in remote.Run.listall.aio(in_phase=("running",))]

    for r in runs:
        # Access action metadata for start time
        if r.action.start_time < ten_hours_ago:
            print(f"Aborting stale run: {r.name}")
            await r.abort.aio(reason="Stale run detected by management script")

# Run the async function
# asyncio.run(abort_stale_runs())
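The staleness check itself can be tested without a cluster. The sketch below isolates it using a stand-in FakeRun dataclass and a select_stale helper, both hypothetical names that are not part of flyte-sdk. It assumes start times are timezone-aware, matching the datetime.now(timezone.utc) used above.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, List


@dataclass
class FakeRun:
    """Hypothetical stand-in for a run: just a name and a start time."""
    name: str
    start_time: datetime


def select_stale(runs: Iterable[FakeRun], now: datetime, max_age: timedelta) -> List[str]:
    """Return the names of runs that started more than max_age before now."""
    cutoff = now - max_age
    return [r.name for r in runs if r.start_time < cutoff]


# A fixed "now" keeps the example deterministic.
now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
runs = [
    FakeRun("old-run", now - timedelta(hours=11)),
    FakeRun("fresh-run", now - timedelta(hours=2)),
]
stale = select_stale(runs, now, timedelta(hours=10))
```

Passing the current time in as a parameter makes the filter easy to unit test. Note that Python raises TypeError when a timezone-aware datetime is compared with a naive one, so both sides of the comparison need to carry timezone information.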

Monitoring Execution Progress

Use wait() to block until a run reaches a terminal state, or watch() to stream status updates.

from flyte.remote import Run

run = Run.get("my_run_name")

# Wait for the run to finish and display a progress panel
run.wait()

# Or watch the run for granular updates
async def watch_run(run_name: str):
    run = await Run.get.aio(run_name)
    async for details in run.watch():
        print(f"Phase: {details.phase}, Started: {details.start_time}")
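The streaming style of monitoring can be tried out locally with a stub. In the sketch below, fake_watch stands in for a stream of status updates and follow_until_done consumes it until a terminal phase appears. The phase names and both function names are assumptions for illustration and are not taken from flyte-sdk.

```python
import asyncio
from typing import AsyncIterator, List

# Assumed phase names; check the backend for the real set.
TERMINAL_PHASES = {"succeeded", "failed", "aborted"}


async def fake_watch() -> AsyncIterator[str]:
    """Hypothetical update stream that yields one phase at a time."""
    for phase in ("queued", "running", "succeeded"):
        await asyncio.sleep(0)  # yield control, as a network wait would
        yield phase


async def follow_until_done() -> List[str]:
    """Collect phases and stop as soon as a terminal one is seen."""
    seen: List[str] = []
    async for phase in fake_watch():
        seen.append(phase)
        if phase in TERMINAL_PHASES:
            break
    return seen


history = asyncio.run(follow_until_done())
```

Streaming lets a script react to each transition (for example, to send a notification when a run starts), whereas a blocking wait is simpler when only the final result matters.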

Retrieving Logs and Data

You can fetch logs and execution inputs/outputs directly from a Run object.

from flyte.remote import Run

run = Run.get("my_run_name")

# Print logs to console
run.show_logs(max_lines=50)

# Get logs as an iterator
for line in run.get_logs(filter_system=True):
    print(line)

# Access inputs and outputs
inputs = run.inputs()
outputs = run.outputs()
print(f"Inputs: {inputs}")
print(f"Outputs: {outputs}")

Remote Task References

The Task class allows you to reference and execute tasks that are already deployed on a Flyte cluster.

Fetching Remote Task Metadata

Use Task.get to create a LazyEntity and fetch() to retrieve its full details.

from flyte.remote import Task
import flyte.errors

try:
    # Create a reference to the latest version of a task
    lazy_task = Task.get("my_task_name", project="p", domain="d", auto_version="latest")

    # Explicitly fetch to validate existence and get metadata
    task_details = lazy_task.fetch()
    print(f"Task version: {task_details.version}")
    print(f"Required arguments: {task_details.required_args}")

except flyte.errors.RemoteTaskNotFoundError:
    print("The specified task does not exist on the cluster.")

Executing Remote Tasks

Remote tasks can be called like regular functions, but they must be submitted to a remote cluster. Positional arguments are not supported; you must use keyword arguments.

from flyte.remote import Task

# Get the task reference
remote_task = Task.get("math.multiply", project="p", domain="d", auto_version="latest")

# Execute the task on the cluster (must be called within a task context or via a controller)
# result = remote_task(a=5, b=10)
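The keyword-only calling convention can be illustrated with a small wrapper that rejects positional arguments before forwarding the call. This is a sketch of the convention, not of flyte-sdk internals: KeywordOnlyCallable and UsageError are hypothetical names, and multiply is a local function standing in for a deployed task.

```python
from typing import Any, Callable


class UsageError(TypeError):
    """Hypothetical error type standing in for the SDK's own usage error."""


class KeywordOnlyCallable:
    """Wrap a function so it can only be invoked with keyword arguments."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        self._fn = fn

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        if args:
            raise UsageError(
                f"{self._fn.__name__} got {len(args)} positional argument(s); "
                "pass every input by name"
            )
        return self._fn(**kwargs)


def multiply(a: int, b: int) -> int:
    return a * b


wrapped = KeywordOnlyCallable(multiply)
result = wrapped(a=5, b=10)

# A positional call is rejected before the wrapped function runs.
try:
    wrapped(5, 10)
    rejected = False
except UsageError:
    rejected = True
```

Naming every input also keeps call sites readable when the task definition lives in another project and its parameter order is not visible locally.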

Overriding Task Configuration

You can create a new task reference with overridden resources, retries, or timeouts using override().

from datetime import timedelta

import flyte
from flyte.remote import Task

lazy_task = Task.get("heavy_task", project="p", domain="d", auto_version="latest")

# Override resources for this specific reference
overridden_task = lazy_task.override(
    resources=flyte.Resources(cpu=4, memory="16Gi"),
    retries=3,
    timeout=timedelta(minutes=30)
)

Administrative Actions

flyte-sdk provides APIs for managing projects and secrets within the Flyte backend.

Project Management

You can create, update, and archive projects.

from flyte.remote import Project

# Create a new project
Project.create(
    id="new-project",
    name="New Project Display Name",
    description="A project for data processing",
    labels={"team": "data-science"}
)

# List active projects
for p in Project.listall(archived=False):
    print(f"Project: {p.pb2.id}")

# Archive a project
project = Project.get("new-project")
project.archive()

Secret Management

Secrets can be managed for use within tasks. Note that image_pull secrets require that project and domain are NOT set during creation.

from flyte.remote import Secret

# Create a regular secret
Secret.create(name="api-token", value="secret-value-123", type="regular")

# Create an image pull secret (project/domain must be None)
# Ensure flyte.init was called without project/domain or override them
Secret.create(name="my-registry-auth", value='{"auths":...}', type="image_pull")

# List and delete secrets
for s in Secret.listall():
    print(f"Secret: {s.name}, Type: {s.type}")

Secret.delete("api-token")

Troubleshooting and Gotchas

  • Remote Execution: Remote tasks retrieved via Task.get cannot be executed locally. They are intended for cross-project orchestration or remote submission.
  • Positional Arguments: When calling a remote task, always use keyword arguments. Positional arguments will raise a RemoteTaskUsageError.
  • Lazy Entities: Task.get returns a LazyEntity. It does not perform a network request until you call fetch() or attempt to execute the task.
  • Auto-versioning: Using auto_version="current" in Task.get is only valid when the code is running inside an active Flyte task context (e.g., during a remote execution).
  • Image Pull Secrets: When creating a secret of type image_pull, the flyte-sdk initialization must not have a default project or domain set, as these secrets are typically global or handled at a different scope by the backend.
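The lazy-entity behaviour described in the list above follows a common deferred-loading pattern, sketched here with stand-ins. LazyRef and fake_lookup are hypothetical names, and caching the result after the first fetch is a design choice of this sketch rather than documented flyte-sdk behaviour.

```python
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class LazyRef(Generic[T]):
    """Hypothetical lazy reference: stores a loader and runs it on first fetch."""

    def __init__(self, name: str, loader: Callable[[], T]) -> None:
        self.name = name
        self._loader = loader
        self._value: Optional[T] = None
        self._loaded = False

    def fetch(self) -> T:
        # The expensive lookup happens here, not in the constructor.
        if not self._loaded:
            self._value = self._loader()
            self._loaded = True
        return self._value  # type: ignore[return-value]


calls: List[str] = []


def fake_lookup() -> dict:
    """Stands in for a network request; records each invocation."""
    calls.append("lookup")
    return {"version": "v1"}


ref = LazyRef("my_task_name", fake_lookup)
calls_before_fetch = len(calls)  # creating the reference triggers no lookup
details = ref.fetch()
ref.fetch()  # second fetch reuses the cached result in this sketch
calls_after_fetch = len(calls)
```

One practical consequence of this pattern is that a typo in a task name goes unnoticed when the reference is created and only surfaces on the first fetch or execution, so calling fetch() early is a cheap way to validate references.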