Local Connector Execution
When you develop tasks that rely on remote agents or connectors (like BigQuery or Snowflake), you can test them in a local environment by using the AsyncConnectorExecutorMixin. This mixin enables flyte-sdk to simulate the remote execution flow locally by looking up a registered connector and polling it until completion.
Enabling Local Execution for a Task
To make a task executable locally via a connector, inherit from AsyncConnectorExecutorMixin and TaskTemplate. The mixin overrides the execute method to route the execution through the ConnectorRegistry.
from flyte.connectors import AsyncConnectorExecutorMixin
from flyte._task import TaskTemplate
class MyCustomTask(AsyncConnectorExecutorMixin, TaskTemplate):
_TASK_TYPE = "my_custom_job"
def __init__(self, name, **kwargs):
super().__init__(
name=name,
interface=..., # Define your task interface
task_type=self._TASK_TYPE,
task_type_version=0,
**kwargs,
)
Implementing and Registering a Local Connector
For the mixin to work, a corresponding AsyncConnector must be registered in the ConnectorRegistry. This connector defines how to create the job and how to get its status.
import uuid
import time
from flyte.connectors import AsyncConnector, ConnectorRegistry, Resource, ResourceMeta
from flyteidl2.core.execution_pb2 import TaskExecution
class MyJobMetadata(ResourceMeta):
job_id: str
class MyJobConnector(AsyncConnector):
name = "My Custom Connector"
task_type_name = "my_custom_job"
task_type_version = 0
metadata_type = MyJobMetadata
async def create(self, task_template, output_prefix, inputs=None, **kwargs):
# Simulate job creation
job_id = str(uuid.uuid4())
return MyJobMetadata(job_id=job_id)
async def get(self, resource_meta, **kwargs):
# Simulate polling logic
return Resource(
phase=TaskExecution.SUCCEEDED,
outputs={"result": "success"}
)
async def delete(self, resource_meta, **kwargs):
pass
# Register the connector so AsyncConnectorExecutorMixin can find it
ConnectorRegistry.register(MyJobConnector())
Handling Secrets Locally
The AsyncConnectorExecutorMixin automatically resolves secrets defined in the task's custom configuration. During local execution, it looks for these secrets in your local environment variables.
If your task template contains a secret mapping like {"api_key": "MY_ENV_VAR"}, the execute method will:
- Look up the environment variable
MY_ENV_VAR. - Pass the value as a keyword argument to the connector's
createandgetmethods.
If a required environment variable is missing, flyte-sdk raises a ValueError.
Execution Flow
When you call a task that uses AsyncConnectorExecutorMixin locally:
- Lookup: It calls
ConnectorRegistry.get_connectorusing the task's type and version. - Creation: It calls
connector.create(...)to initiate the job. - Polling: It enters a loop, calling
connector.get(...)every 3 seconds (as defined insrc/flyte/connectors/_connector.py) until the job reaches a terminal phase (e.g.,SUCCEEDEDorFAILED). - Logs: If the
Resourcereturned by the connector containslog_links, they are logged to the console and recorded in the internal tracker. - Completion: Once successful, it converts the connector's outputs back to native Python types.
Troubleshooting
FlyteConnectorNotFound
If you encounter flyte.connectors._connector.FlyteConnectorNotFound, ensure that:
- You have called
ConnectorRegistry.register()for your connector before executing the task. - The
task_type_nameandtask_type_versionin your connector match thetask_typeandtask_type_versionin your task.
Missing Secrets
If the execution fails with ValueError: Secret <NAME> not found in environment, ensure you have exported the corresponding environment variable:
export MY_ENV_VAR="your-secret-value"
Task Context Errors
The mixin requires an active task context. If you receive a RuntimeError: Task context is not set, ensure you are running the task within a flyte-sdk execution context (e.g., inside a workflow or using a local entrypoint that initializes the context).