Developing Async Connectors

Async connectors (called agents in earlier Flyte releases) in flyte-sdk let you offload long-running work to external services such as BigQuery, Snowflake, or Databricks. Instead of holding Flyte worker resources while a remote job runs, an async connector submits the job and then periodically polls for its status.

This tutorial walks you through building a custom async connector for a hypothetical batch processing service.

Define Job Metadata

The first step is to define a metadata class that tracks the external job. This class must inherit from ResourceMeta and be decorated with @dataclass. It holds whatever the connector needs to find the job again, such as a job ID or a creation timestamp; flyte-sdk passes it back to get and delete when it polls or cancels the job.

from dataclasses import dataclass
from flyte.connectors import ResourceMeta

@dataclass
class BatchJobMetadata(ResourceMeta):
    job_id: str
    created_at: float
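
Because the metadata is handed back to get and delete after create returns, its fields should survive a serialization round trip, so it is safest to keep them to JSON-friendly types (strings, numbers, booleans). The sketch below assumes a JSON encoding; BatchJobMetadataSketch and its encode/decode methods are hypothetical and do not inherit from, or describe, flyte-sdk's ResourceMeta.

```python
import json
import time
from dataclasses import asdict, dataclass


@dataclass
class BatchJobMetadataSketch:
    """Hypothetical stand-in for BatchJobMetadata (not a flyte-sdk ResourceMeta)."""

    job_id: str
    created_at: float

    def encode(self) -> bytes:
        # Assumed encoding: every dataclass field serialized as UTF-8 JSON.
        return json.dumps(asdict(self)).encode("utf-8")

    @classmethod
    def decode(cls, data: bytes) -> "BatchJobMetadataSketch":
        # Rebuild the dataclass from the JSON written by encode().
        return cls(**json.loads(data.decode("utf-8")))


meta = BatchJobMetadataSketch(job_id="a1b2c3d4", created_at=time.time())
restored = BatchJobMetadataSketch.decode(meta.encode())
print(restored == meta)  # True
```

A field holding something like an open client connection would fail this round trip, which is why identifiers, not live objects, belong in the metadata.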

Implement the Async Connector

The core logic of your integration lives in a class that inherits from AsyncConnector. The class declares a few attributes (name, task_type_name, task_type_version, and metadata_type) and implements three methods: create, get, and delete.

import uuid
import time
from typing import Any, Dict, Optional
from flyteidl2.core.execution_pb2 import TaskExecution
from flyte.connectors import AsyncConnector, Resource, ResourceMeta

class BatchJobConnector(AsyncConnector):
    name = "Batch Job Connector"
    task_type_name = "batch_job"
    task_type_version = 0
    metadata_type = BatchJobMetadata

    async def create(
        self,
        task_template,
        output_prefix: str,
        inputs: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> BatchJobMetadata:
        # Simulate submitting a job to an external service
        job_id = str(uuid.uuid4())[:8]
        return BatchJobMetadata(job_id=job_id, created_at=time.time())

    async def get(self, resource_meta: BatchJobMetadata, **kwargs) -> Resource:
        # Simulate polling the external service
        elapsed = time.time() - resource_meta.created_at
        if elapsed < 5:
            return Resource(phase=TaskExecution.RUNNING, message="Job in progress")

        # Return success and outputs once the job is finished
        return Resource(
            phase=TaskExecution.SUCCEEDED,
            message="Job completed successfully",
            outputs={"result": f"output-from-{resource_meta.job_id}"},
        )

    async def delete(self, resource_meta: BatchJobMetadata, **kwargs):
        # Handle job cancellation
        print(f"Cancelled job {resource_meta.job_id}")

  • create: Called once when the task starts. It submits the job and returns a ResourceMeta instance (here, BatchJobMetadata) that identifies the external job.
  • get: Called repeatedly by flyte-sdk to check the job's status. It returns a Resource with the current phase (for example RUNNING, SUCCEEDED, or FAILED), a human-readable message, and, once the job succeeds, its outputs.
  • delete: Called if the execution is aborted. Use it to cancel the external job and release anything it holds.
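
In a real connector, get usually translates the external service's own status vocabulary into phases instead of computing them from elapsed time. Here is a minimal sketch of that translation, assuming the service reports the hypothetical status strings below. Phase is a stand-in enum whose member names mirror the TaskExecution constants used above; inside an actual connector you would return TaskExecution.RUNNING and so on.

```python
from enum import Enum


class Phase(Enum):
    # Hypothetical stand-in; member names mirror flyteidl2's TaskExecution phases.
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    ABORTED = "ABORTED"


# Hypothetical status strings reported by the external batch service.
REMOTE_STATUS_TO_PHASE = {
    "PENDING": Phase.QUEUED,
    "IN_PROGRESS": Phase.RUNNING,
    "DONE": Phase.SUCCEEDED,
    "ERROR": Phase.FAILED,
    "CANCELLED": Phase.ABORTED,
}


def to_phase(remote_status: str) -> Phase:
    """Translate a remote status into a phase.

    Unknown statuses map to FAILED so polling stops instead of waiting forever.
    """
    return REMOTE_STATUS_TO_PHASE.get(remote_status.strip().upper(), Phase.FAILED)


print(to_phase("done"))         # Phase.SUCCEEDED
print(to_phase("in_progress"))  # Phase.RUNNING
```

Treating an unrecognized status as a failure is a design choice: it surfaces API changes on the service side immediately rather than leaving a task stuck in RUNNING.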

Register the Connector

For flyte-sdk to discover your connector, you must register it with the ConnectorRegistry. This is typically done at the module level where the connector is defined.

from flyte.connectors import ConnectorRegistry

ConnectorRegistry.register(BatchJobConnector())
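
Registration matters because flyte-sdk matches a task to its connector by task type. The sketch below illustrates that lookup idea with a hypothetical MiniRegistry keyed by the task_type_name and task_type_version attributes that BatchJobConnector declares. It is not ConnectorRegistry's implementation, and FakeConnector is likewise a stand-in.

```python
from dataclasses import dataclass, field
from typing import Dict, Tuple


@dataclass
class FakeConnector:
    # Hypothetical stand-in exposing only the attributes the lookup needs.
    task_type_name: str
    task_type_version: int = 0


@dataclass
class MiniRegistry:
    """Hypothetical registry keyed by (task type name, version)."""

    connectors: Dict[Tuple[str, int], FakeConnector] = field(default_factory=dict)

    def register(self, connector: FakeConnector) -> None:
        key = (connector.task_type_name, connector.task_type_version)
        if key in self.connectors:
            # Refuse silent overwrites so two connectors cannot claim one task type.
            raise ValueError(f"A connector is already registered for {key}")
        self.connectors[key] = connector

    def get(self, task_type: str, version: int = 0) -> FakeConnector:
        key = (task_type, version)
        if key not in self.connectors:
            raise KeyError(f"No connector registered for {key}")
        return self.connectors[key]


registry = MiniRegistry()
batch = FakeConnector(task_type_name="batch_job")
registry.register(batch)
print(registry.get("batch_job") is batch)  # True
```

A typo in either the connector's task_type_name or the task's task_type would show up here as a failed lookup, which is the most common reason a connector is "not found".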

Create the Flyte Task

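To use the connector, define a task class that inherits from both AsyncConnectorExecutorMixin and TaskTemplate. Its task_type must equal the connector's task_type_name ("batch_job" here), because that is how the task and the connector are matched. AsyncConnectorExecutorMixin makes the task runnable locally by driving the connector's create and get calls in a polling loop.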

from flyte.connectors import AsyncConnectorExecutorMixin
from flyte.extend import TaskTemplate
from flyte.models import NativeInterface

class BatchJobTask(AsyncConnectorExecutorMixin, TaskTemplate):
    def __init__(self, name: str, **kwargs):
        super().__init__(
            name=name,
            interface=NativeInterface(inputs={}, outputs={"result": str}),
            task_type="batch_job",
            image=None,
            **kwargs,
        )
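
Listing AsyncConnectorExecutorMixin before TaskTemplate matters because Python resolves methods left to right along the method resolution order (MRO): assuming both classes define execute, the mixin's version wins only when it comes first. The stand-in classes below are hypothetical and only demonstrate that ordering rule; they do not reproduce what flyte-sdk's classes do internally.

```python
import asyncio


class TaskTemplateStandIn:
    # Hypothetical stand-in for flyte.extend.TaskTemplate.
    async def execute(self, **kwargs):
        return "ran the task body in-process"


class ConnectorMixinStandIn:
    # Hypothetical stand-in for AsyncConnectorExecutorMixin, which overrides execute().
    async def execute(self, **kwargs):
        return "delegated to the registered connector"


class MixinFirst(ConnectorMixinStandIn, TaskTemplateStandIn):
    pass


class MixinLast(TaskTemplateStandIn, ConnectorMixinStandIn):
    pass


print(asyncio.run(MixinFirst().execute()))  # delegated to the registered connector
print(asyncio.run(MixinLast().execute()))   # ran the task body in-process
```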

Handling Secrets

If your connector requires credentials (like API keys), you can use ConnectorSecretsMixin to manage them. Secrets are passed to the create and get methods as keyword arguments.

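from typing import Dict

from flyte.connectors import AsyncConnector, ResourceMeta
from flyte.connectors._connector import ConnectorSecretsMixin

class SecureConnector(AsyncConnector, ConnectorSecretsMixin):
    def __init__(self, secrets: Dict[str, str]):
        super().__init__(secrets=secrets)

    # ... name, task_type_name, metadata_type, get and delete as in BatchJobConnector ...

    async def create(self, task_template, **kwargs) -> ResourceMeta:
        # Secrets arrive as keyword arguments (here, "my_api_key")
        api_key = kwargs.get("my_api_key")
        # ... use api_key to authenticate, submit the job, and return its metadata ...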
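
How flyte-sdk resolves secret values is not covered here. As a minimal sketch, assume the secrets mapping goes from the keyword name that create and get receive (such as "my_api_key") to the name of a secret exposed as an environment variable. Turning that mapping into keyword arguments could then look like this; resolve_secret_kwargs and BATCH_API_KEY are hypothetical.

```python
import os
from typing import Dict, Mapping


def resolve_secret_kwargs(
    secrets: Mapping[str, str],
    env: Mapping[str, str] = os.environ,
) -> Dict[str, str]:
    """Turn a {kwarg name: secret name} mapping into keyword arguments.

    Assumption: each secret is exposed as an environment variable of the same name.
    """
    missing = [name for name in secrets.values() if name not in env]
    if missing:
        # Fail before anything is submitted rather than sending an empty credential.
        raise KeyError(f"Missing secrets: {', '.join(missing)}")
    return {kwarg: env[name] for kwarg, name in secrets.items()}


# Hypothetical usage: create() would then receive my_api_key=... among its kwargs.
secret_kwargs = resolve_secret_kwargs(
    {"my_api_key": "BATCH_API_KEY"},
    env={"BATCH_API_KEY": "example-token"},
)
print(secret_kwargs)  # {'my_api_key': 'example-token'}
```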

Local Execution and Testing

Because BatchJobTask inherits from AsyncConnectorExecutorMixin, you can run it locally by awaiting its execute method. flyte-sdk uses the registered BatchJobConnector to manage the task's lifecycle.

import asyncio

async def run_locally():
    task = BatchJobTask(name="test_task")
    # This triggers the connector's create() and then polls get() until completion
    result = await task.execute()
    print(f"Task result: {result}")

if __name__ == "__main__":
    asyncio.run(run_locally())

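When you run this, flyte-sdk looks up the connector in the ConnectorRegistry whose task_type_name matches the task's task_type ("batch_job"), calls create, and then calls get repeatedly until it reports a terminal phase such as SUCCEEDED or FAILED. With BatchJobConnector, get returns RUNNING for roughly the first five seconds after submission and SUCCEEDED after that.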
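
The create-then-poll loop can be approximated in plain asyncio. This is a sketch under stated assumptions, not flyte-sdk's implementation: FakeBatchConnector, run_connector, the poll interval, and the timeout are hypothetical, phases are plain strings, and the real mixin's interval and timeout behavior are not documented here.

```python
import asyncio
import time

# Phases are plain strings here; a real connector uses TaskExecution constants.
TERMINAL_PHASES = {"SUCCEEDED", "FAILED"}


class FakeBatchConnector:
    """Hypothetical connector whose job finishes `duration` seconds after create()."""

    def __init__(self, duration: float):
        self.duration = duration
        self.deleted = False

    async def create(self) -> dict:
        return {"job_id": "demo", "created_at": time.monotonic()}

    async def get(self, meta: dict):
        if time.monotonic() - meta["created_at"] < self.duration:
            return "RUNNING", None
        return "SUCCEEDED", {"result": f"output-from-{meta['job_id']}"}

    async def delete(self, meta: dict) -> None:
        self.deleted = True


async def run_connector(connector, poll_interval: float = 0.1, timeout: float = 5.0):
    """Call create() once, then poll get() until a terminal phase or the timeout."""
    meta = await connector.create()
    deadline = time.monotonic() + timeout
    try:
        while True:
            phase, outputs = await connector.get(meta)
            if phase in TERMINAL_PHASES:
                if phase == "FAILED":
                    raise RuntimeError(f"Job {meta['job_id']} failed")
                return outputs
            if time.monotonic() >= deadline:
                raise TimeoutError(f"Job {meta['job_id']} not done after {timeout}s")
            await asyncio.sleep(poll_interval)
    except (asyncio.CancelledError, TimeoutError):
        # Cancel the remote job if the caller gives up or the run is cancelled.
        await connector.delete(meta)
        raise


print(asyncio.run(run_connector(FakeBatchConnector(duration=0.3))))
# {'result': 'output-from-demo'}
```

Calling delete on timeout or cancellation mirrors the role of delete described earlier; a job that already reached FAILED is left alone because there is nothing left to cancel.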