Skip to main content

Local Connector Execution

When you develop tasks that rely on remote agents or connectors (like BigQuery or Snowflake), you can test them in a local environment by using the AsyncConnectorExecutorMixin. This mixin enables flyte-sdk to simulate the remote execution flow locally by looking up a registered connector and polling it until completion.

Enabling Local Execution for a Task

To make a task executable locally via a connector, inherit from AsyncConnectorExecutorMixin and TaskTemplate. The mixin overrides the execute method to route the execution through the ConnectorRegistry.

from flyte.connectors import AsyncConnectorExecutorMixin
from flyte._task import TaskTemplate

class MyCustomTask(AsyncConnectorExecutorMixin, TaskTemplate):
_TASK_TYPE = "my_custom_job"

def __init__(self, name, **kwargs):
super().__init__(
name=name,
interface=..., # Define your task interface
task_type=self._TASK_TYPE,
task_type_version=0,
**kwargs,
)

Implementing and Registering a Local Connector

For the mixin to work, a corresponding AsyncConnector must be registered in the ConnectorRegistry. This connector defines how to create the job and how to get its status.

import uuid
import time
from flyte.connectors import AsyncConnector, ConnectorRegistry, Resource, ResourceMeta
from flyteidl2.core.execution_pb2 import TaskExecution

class MyJobMetadata(ResourceMeta):
job_id: str

class MyJobConnector(AsyncConnector):
name = "My Custom Connector"
task_type_name = "my_custom_job"
task_type_version = 0
metadata_type = MyJobMetadata

async def create(self, task_template, output_prefix, inputs=None, **kwargs):
# Simulate job creation
job_id = str(uuid.uuid4())
return MyJobMetadata(job_id=job_id)

async def get(self, resource_meta, **kwargs):
# Simulate polling logic
return Resource(
phase=TaskExecution.SUCCEEDED,
outputs={"result": "success"}
)

async def delete(self, resource_meta, **kwargs):
pass

# Register the connector so AsyncConnectorExecutorMixin can find it
ConnectorRegistry.register(MyJobConnector())

Handling Secrets Locally

The AsyncConnectorExecutorMixin automatically resolves secrets defined in the task's custom configuration. During local execution, it looks for these secrets in your local environment variables.

If your task template contains a secret mapping like {"api_key": "MY_ENV_VAR"}, the execute method will:

  1. Look up the environment variable MY_ENV_VAR.
  2. Pass the value as a keyword argument to the connector's create and get methods.

If a required environment variable is missing, flyte-sdk raises a ValueError.

Execution Flow

When you call a task that uses AsyncConnectorExecutorMixin locally:

  1. Lookup: It calls ConnectorRegistry.get_connector using the task's type and version.
  2. Creation: It calls connector.create(...) to initiate the job.
  3. Polling: It enters a loop, calling connector.get(...) every 3 seconds (as defined in src/flyte/connectors/_connector.py) until the job reaches a terminal phase (e.g., SUCCEEDED or FAILED).
  4. Logs: If the Resource returned by the connector contains log_links, they are logged to the console and recorded in the internal tracker.
  5. Completion: Once successful, it converts the connector's outputs back to native Python types.

Troubleshooting

FlyteConnectorNotFound

If you encounter flyte.connectors._connector.FlyteConnectorNotFound, ensure that:

  • You have called ConnectorRegistry.register() for your connector before executing the task.
  • The task_type_name and task_type_version in your connector match the task_type and task_type_version in your task.

Missing Secrets

If the execution fails with ValueError: Secret <NAME> not found in environment, ensure you have exported the corresponding environment variable:

export MY_ENV_VAR="your-secret-value"

Task Context Errors

The mixin requires an active task context. If you receive a RuntimeError: Task context is not set, ensure you are running the task within a flyte-sdk execution context (e.g., inside a workflow or using a local entrypoint that initializes the context).