Developing Async Connectors
Async connectors (also known as Agents) in flyte-sdk allow you to offload long-running tasks to external services like BigQuery, Snowflake, or Databricks. Instead of consuming Flyte worker resources while waiting for a remote job to finish, an async connector submits the job and then periodically polls for its status.
This tutorial walks you through building a custom async connector for a hypothetical batch processing service.
Define Job Metadata
The first step is to define a metadata class that tracks the external job. This class must inherit from ResourceMeta and be decorated as a dataclass. It stores information such as the job ID and creation timestamp, which flyte-sdk passes back to your connector each time it polls the external service.
```python
from dataclasses import dataclass

from flyte.connectors import ResourceMeta


@dataclass
class BatchJobMetadata(ResourceMeta):
    job_id: str
    created_at: float
```
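Keeping the metadata to simple, JSON-friendly fields is a safe choice if the metadata has to be stored between the create call and later get calls. The sketch below assumes that kind of persistence happens through JSON (an assumption, not something this tutorial confirms about flyte-sdk) and uses a hypothetical stand-in class, BatchJobMetadataSketch, instead of flyte's ResourceMeta so it runs without flyte-sdk installed.

```python
import json
import time
from dataclasses import asdict, dataclass


# Hypothetical stand-in for BatchJobMetadata. It does not inherit from
# flyte's ResourceMeta, so this sketch runs without flyte-sdk installed.
@dataclass
class BatchJobMetadataSketch:
    job_id: str
    created_at: float


def encode(meta: BatchJobMetadataSketch) -> bytes:
    # Simple fields (str, float) map directly onto JSON types.
    return json.dumps(asdict(meta)).encode("utf-8")


def decode(data: bytes) -> BatchJobMetadataSketch:
    # Rebuild the dataclass from the JSON object's keys.
    return BatchJobMetadataSketch(**json.loads(data.decode("utf-8")))


original = BatchJobMetadataSketch(job_id="ab12cd34", created_at=time.time())
restored = decode(encode(original))
print(restored == original)  # True
```

Fields such as open client connections or file handles would not survive this round trip, which is why the metadata should hold identifiers rather than live objects.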
Implement the Async Connector
The core logic of your integration lives in a class inheriting from AsyncConnector. You must implement three primary methods: create, get, and delete.
```python
import uuid
import time
from typing import Any, Dict, Optional

from flyteidl2.core.execution_pb2 import TaskExecution

from flyte.connectors import AsyncConnector, Resource, ResourceMeta


class BatchJobConnector(AsyncConnector):
    name = "Batch Job Connector"
    task_type_name = "batch_job"
    task_type_version = 0
    metadata_type = BatchJobMetadata

    async def create(
        self,
        task_template,
        output_prefix: str,
        inputs: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> BatchJobMetadata:
        # Simulate submitting a job to an external service
        job_id = str(uuid.uuid4())[:8]
        return BatchJobMetadata(job_id=job_id, created_at=time.time())

    async def get(self, resource_meta: BatchJobMetadata, **kwargs) -> Resource:
        # Simulate polling the external service
        elapsed = time.time() - resource_meta.created_at
        if elapsed < 5:
            return Resource(phase=TaskExecution.RUNNING, message="Job in progress")

        # Return success and outputs once the job is finished
        return Resource(
            phase=TaskExecution.SUCCEEDED,
            message="Job completed successfully",
            outputs={"result": f"output-from-{resource_meta.job_id}"},
        )

    async def delete(self, resource_meta: BatchJobMetadata, **kwargs):
        # Handle job cancellation
        print(f"Cancelled job {resource_meta.job_id}")
```
create: Triggered when the task starts. It should return a ResourceMeta object (here, BatchJobMetadata) containing the external job's identity.
get: Called periodically by flyte-sdk to check the job status. It returns a Resource object containing the current phase (e.g., RUNNING, SUCCEEDED, FAILED) and, once the job has finished, any outputs.
delete: Called if the Flyte workflow is aborted. Use it to clean up or cancel the external job.
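In a real connector, get usually has to translate the external service's own status vocabulary into a Flyte phase, and a failure should surface as FAILED with a useful message rather than leaving the task RUNNING indefinitely. The sketch below shows that translation under stated assumptions: the status strings ("QUEUED", "RUNNING", "DONE", "ERROR") are hypothetical, and Phase is a stand-in enum for the TaskExecution phases. Inside a connector you would return Resource(phase=TaskExecution.FAILED, message=...) with the resulting values instead.

```python
from enum import Enum
from typing import Tuple


class Phase(Enum):
    # Stand-in for the TaskExecution phases used in the tutorial.
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"


# Hypothetical status strings reported by the external batch service.
_STATUS_TO_PHASE = {
    "QUEUED": Phase.RUNNING,  # waiting in a queue still counts as in progress
    "RUNNING": Phase.RUNNING,
    "DONE": Phase.SUCCEEDED,
    "ERROR": Phase.FAILED,
}


def to_phase(status: str, error: str = "") -> Tuple[Phase, str]:
    """Translate an external status into a (phase, message) pair."""
    phase = _STATUS_TO_PHASE.get(status.upper())
    if phase is None:
        # Fail loudly instead of polling forever on an unexpected status.
        return Phase.FAILED, f"Unknown job status: {status!r}"
    if phase is Phase.FAILED:
        # Prefer the service's own error text when it provides one.
        return phase, error or "Job failed without an error message"
    return phase, f"Job status: {status}"
```

Mapping unknown statuses to FAILED is a design choice: it trades the risk of failing a job that would have succeeded for the guarantee that a new, unhandled status never leaves the task polling forever.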
Register the Connector
For flyte-sdk to discover your connector, you must register it with the ConnectorRegistry. This is typically done at module level, in the same file that defines the connector, so that registration happens as soon as the module is imported.
```python
from flyte.connectors import ConnectorRegistry

ConnectorRegistry.register(BatchJobConnector())
```
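Conceptually, the registry is a lookup table from a task's type string to the connector instance that handles it, which is why a task's type must match the connector's task_type_name. The sketch below is a hypothetical, simplified registry (SimpleRegistry and FakeConnector are illustrative names) written only to show that lookup; it is not flyte-sdk's ConnectorRegistry, whose internals, including how task_type_version is used, may differ.

```python
from typing import Dict, Tuple


class FakeConnector:
    # Minimal stand-in carrying the same class attributes as BatchJobConnector.
    name = "Batch Job Connector"
    task_type_name = "batch_job"
    task_type_version = 0


class SimpleRegistry:
    """Hypothetical registry keyed by (task type, version)."""

    def __init__(self) -> None:
        self._connectors: Dict[Tuple[str, int], object] = {}

    def register(self, connector) -> None:
        key = (connector.task_type_name, connector.task_type_version)
        if key in self._connectors:
            # Two connectors claiming the same task type would be ambiguous.
            raise ValueError(f"Duplicate connector for {key}")
        self._connectors[key] = connector

    def get(self, task_type: str, version: int = 0):
        try:
            return self._connectors[(task_type, version)]
        except KeyError:
            raise KeyError(f"No connector registered for {task_type!r}") from None


registry = SimpleRegistry()
registry.register(FakeConnector())
print(registry.get("batch_job").name)  # Batch Job Connector
```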
Create the Flyte Task
To use the connector in a workflow, define a task class that inherits from both AsyncConnectorExecutorMixin and TaskTemplate. The AsyncConnectorExecutorMixin enables local execution by running the connector's create/get polling loop in-process, so no Flyte cluster is needed for testing.
```python
from flyte.connectors import AsyncConnectorExecutorMixin
from flyte.extend import TaskTemplate
from flyte.models import NativeInterface


class BatchJobTask(AsyncConnectorExecutorMixin, TaskTemplate):
    def __init__(self, name: str, **kwargs):
        super().__init__(
            name=name,
            # The "result" output matches the key returned by BatchJobConnector.get
            interface=NativeInterface(inputs={}, outputs={"result": str}),
            # Must match BatchJobConnector.task_type_name
            task_type="batch_job",
            image=None,
            **kwargs,
        )
```
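Two values have to line up between BatchJobTask and BatchJobConnector: the task's task_type must equal the connector's task_type_name, and the keys in the outputs dict returned by get should match the outputs declared in the task's NativeInterface (here, "result"). The hypothetical helper below, check_wiring, captures both conditions with plain Python values; it is a debugging sketch, not something flyte-sdk provides.

```python
from typing import Any, Dict, List


def check_wiring(
    task_type: str,
    declared_outputs: Dict[str, type],
    connector_task_type_name: str,
    returned_outputs: Dict[str, Any],
) -> List[str]:
    """Return a list of problems; an empty list means the pieces line up."""
    problems = []
    if task_type != connector_task_type_name:
        problems.append(
            f"task_type {task_type!r} != task_type_name {connector_task_type_name!r}"
        )
    missing = set(declared_outputs) - set(returned_outputs)
    extra = set(returned_outputs) - set(declared_outputs)
    if missing:
        problems.append(f"outputs missing from get(): {sorted(missing)}")
    if extra:
        problems.append(f"undeclared outputs from get(): {sorted(extra)}")
    # For outputs present on both sides, check the declared Python type.
    for key in sorted(set(declared_outputs) & set(returned_outputs)):
        if not isinstance(returned_outputs[key], declared_outputs[key]):
            problems.append(f"output {key!r} is not a {declared_outputs[key].__name__}")
    return problems


# Values copied from BatchJobTask and BatchJobConnector in this tutorial.
print(check_wiring("batch_job", {"result": str}, "batch_job", {"result": "output-from-ab12cd34"}))
# []
```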
Handling Secrets
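If your connector requires credentials (such as API keys), you can use ConnectorSecretsMixin to manage them. Secrets are passed to the create and get methods as keyword arguments. Note that the example imports the mixin from the private flyte.connectors._connector module, so the import path may change between releases.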
```python
from typing import Any, Dict, Optional

from flyte.connectors import AsyncConnector, ResourceMeta
from flyte.connectors._connector import ConnectorSecretsMixin


class SecureConnector(AsyncConnector, ConnectorSecretsMixin):
    def __init__(self, secrets: Dict[str, str]):
        super().__init__(secrets=secrets)

    # ... implementation ...

    async def create(
        self,
        task_template,
        output_prefix: str,
        inputs: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> ResourceMeta:
        # Access the secret value from kwargs
        api_key = kwargs.get("my_api_key")
        # ... use api_key to authenticate ...
```
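Because the secret arrives through **kwargs, kwargs.get returns None when it is missing, and the problem only shows up later as an authentication failure at the external service. A minimal sketch of failing early is shown below. SecureConnectorSketch and require_secret are hypothetical names, the class has no flyte base classes so it runs standalone, and the secret name my_api_key comes from the example above; how flyte-sdk names and injects secrets into kwargs is not covered here and should be checked against the SDK.

```python
import asyncio
from typing import Any, Dict


class MissingSecretError(RuntimeError):
    """Raised when a required secret was not passed to the connector."""


def require_secret(kwargs: Dict[str, Any], name: str) -> str:
    # Fail at create() time with a clear message instead of sending
    # an empty credential to the external service.
    value = kwargs.get(name)
    if not value:
        raise MissingSecretError(f"Secret {name!r} was not provided to the connector")
    return value


class SecureConnectorSketch:
    # Hypothetical stand-in for SecureConnector; no flyte base classes.
    async def create(self, task_template: Any, **kwargs: Any) -> Dict[str, str]:
        api_key = require_secret(kwargs, "my_api_key")
        # A real connector would authenticate with api_key and submit the job.
        # The key itself is never logged or returned.
        assert api_key
        return {"job_id": "hypothetical-job"}


print(asyncio.run(SecureConnectorSketch().create(None, my_api_key="s3cr3t")))
# {'job_id': 'hypothetical-job'}
```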
Local Execution and Testing
Because BatchJobTask inherits from AsyncConnectorExecutorMixin, you can run it locally without a Flyte cluster by awaiting its execute method. flyte-sdk will use the registered BatchJobConnector to manage the task lifecycle.
```python
import asyncio


async def run_locally():
    task = BatchJobTask(name="test_task")
    # This will trigger the connector's create() and poll get() until completion
    result = await task.execute()
    print(f"Task result: {result}")


if __name__ == "__main__":
    asyncio.run(run_locally())
```
When you run this, flyte-sdk looks up the connector registered in the ConnectorRegistry for the task's task_type ("batch_job"), calls create, and then calls get at an interval until the phase reaches a terminal state such as SUCCEEDED. With the example connector, that happens no earlier than five seconds after create.
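The loop described above can be approximated in plain asyncio. The sketch below illustrates the create, poll, and delete lifecycle under stated assumptions and is not the AsyncConnectorExecutorMixin source: FakeBatchConnector is a hypothetical stand-in that returns plain dicts and string phases, the poll interval and timeout are arbitrary, and delete is called if polling times out or is cancelled.

```python
import asyncio
import time
from typing import Any, Dict, List

TERMINAL_PHASES = {"SUCCEEDED", "FAILED"}


class FakeBatchConnector:
    """Hypothetical stand-in connector whose job finishes after `duration` seconds."""

    def __init__(self, duration: float) -> None:
        self.duration = duration
        self.deleted: List[str] = []

    async def create(self) -> Dict[str, Any]:
        return {"job_id": "ab12cd34", "created_at": time.monotonic()}

    async def get(self, meta: Dict[str, Any]) -> Dict[str, Any]:
        if time.monotonic() - meta["created_at"] < self.duration:
            return {"phase": "RUNNING", "outputs": None}
        return {"phase": "SUCCEEDED", "outputs": {"result": f"output-from-{meta['job_id']}"}}

    async def delete(self, meta: Dict[str, Any]) -> None:
        self.deleted.append(meta["job_id"])


async def _poll(connector, meta, poll_interval: float) -> Dict[str, Any]:
    # Call get() until the job reaches a terminal phase.
    while True:
        resource = await connector.get(meta)
        if resource["phase"] in TERMINAL_PHASES:
            return resource
        await asyncio.sleep(poll_interval)


async def run_job(connector, poll_interval: float = 0.01, timeout: float = 1.0):
    meta = await connector.create()
    try:
        resource = await asyncio.wait_for(_poll(connector, meta, poll_interval), timeout)
    except (asyncio.TimeoutError, asyncio.CancelledError):
        # Clean up the external job if we stop waiting for it.
        await connector.delete(meta)
        raise
    if resource["phase"] == "FAILED":
        raise RuntimeError(f"Job {meta['job_id']} failed")
    return resource["outputs"]


fast = FakeBatchConnector(duration=0.05)
print(asyncio.run(run_job(fast)))  # {'result': 'output-from-ab12cd34'}

slow = FakeBatchConnector(duration=5.0)
try:
    asyncio.run(run_job(slow, timeout=0.1))
except asyncio.TimeoutError:
    print("timed out; deleted:", slow.deleted)  # timed out; deleted: ['ab12cd34']
```

Re-raising after delete keeps the caller informed that the job did not finish, while still releasing the remote resources.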