Platform Extensibility
flyte-sdk provides a multi-layered extensibility model that allows developers to integrate custom data types, specialized task execution logic, and external backend services. These extensions are built using a set of base classes and registries that bridge Python-native code with the Flyte IDL (Interface Definition Language).
Type System Extensibility
The TypeEngine is the core component responsible for translating between Python types and Flyte's internal representation (Literal). To support a new Python type, you implement a TypeTransformer.
TypeTransformer
A TypeTransformer must define how to map a Python class to a Flyte LiteralType and how to convert values back and forth.
In examples/type_transformers/my_transformer/src/my_transformer/transformer.py, a custom transformer for a PositiveInt wrapper is implemented:
class PositiveIntTransformer(TypeTransformer[PositiveInt]):
    def __init__(self):
        super().__init__(name="PositiveInt", t=PositiveInt)

    def get_literal_type(self, t: Type[PositiveInt]) -> types_pb2.LiteralType:
        # Map the custom type to a standard Flyte INTEGER with a tag
        return types_pb2.LiteralType(
            simple=types_pb2.SimpleType.INTEGER,
            structure=types_pb2.TypeStructure(tag="PositiveInt"),
        )

    async def to_literal(self, python_val, python_type, expected):
        # Convert the Python object to a Flyte Literal proto
        return literals_pb2.Literal(
            scalar=literals_pb2.Scalar(
                primitive=literals_pb2.Primitive(integer=python_val.value)
            )
        )

    async def to_python_value(self, lv, expected_python_type):
        # Reconstruct the Python object from a Flyte Literal
        return PositiveInt(lv.scalar.primitive.integer)


# Register the transformer to make it available to the engine
TypeEngine.register(PositiveIntTransformer())
The TypeEngine.register method adds the transformer to a global registry. When flyte-sdk encounters a function signature using PositiveInt, it uses this transformer to handle serialization during task execution.
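The register-then-look-up flow described above can be illustrated without flyte-sdk installed. The following is a minimal sketch, not the real TypeEngine: ToyTypeEngine, ToyTransformer, and the dict-based "literal" are hypothetical stand-ins, and the PositiveInt definition shown here is an assumption, since the original wrapper class is not reproduced in this section.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class PositiveInt:
    """Assumed shape of the wrapper: an int that must be greater than zero."""

    value: int

    def __post_init__(self) -> None:
        if self.value <= 0:
            raise ValueError(f"expected a positive integer, got {self.value}")


class ToyTransformer:
    """Hypothetical transformer that uses a plain dict in place of a Literal proto."""

    def to_literal(self, python_val: PositiveInt) -> Dict[str, Any]:
        return {"integer": python_val.value, "tag": "PositiveInt"}

    def to_python_value(self, literal: Dict[str, Any]) -> PositiveInt:
        return PositiveInt(literal["integer"])


class ToyTypeEngine:
    """Hypothetical global registry keyed by Python type."""

    _registry: Dict[type, ToyTransformer] = {}

    @classmethod
    def register(cls, python_type: type, transformer: ToyTransformer) -> None:
        if python_type in cls._registry:
            raise ValueError(f"{python_type.__name__} is already registered")
        cls._registry[python_type] = transformer

    @classmethod
    def get_transformer(cls, python_type: type) -> ToyTransformer:
        try:
            return cls._registry[python_type]
        except KeyError:
            raise TypeError(
                f"no transformer registered for {python_type.__name__}"
            ) from None


# Registration happens once, typically at import time.
ToyTypeEngine.register(PositiveInt, ToyTransformer())

# Later, the engine resolves the transformer from the annotated type.
transformer = ToyTypeEngine.get_transformer(PositiveInt)
literal = transformer.to_literal(PositiveInt(7))
restored = transformer.to_python_value(literal)
```

Keying the registry on the Python type is what lets a function signature alone select the transformer; the sketch rejects duplicate registrations, which is a choice made here for illustration rather than documented flyte-sdk behavior.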
Task Templates and Plugins
Custom task types are defined by subclassing TaskTemplate or AsyncFunctionTaskTemplate. These classes define how a task is serialized and what configuration it carries to the Flyte backend.
TaskPluginRegistry
The TaskPluginRegistry maps configuration objects to specific task implementations. This allows the @env.task decorator to instantiate the correct task class based on the provided configuration.
For example, the echo plugin in plugins/echo/src/flyteplugins/echo/task.py defines a task that simply routes execution to a backend plugin:
@dataclass
class Echo:
    """Configuration object for the echo plugin."""

    pass


class EchoTask(AsyncFunctionTaskTemplate):
    plugin_config: Echo
    task_type: str = "echo"

    def custom_config(self, sctx: SerializationContext) -> dict[str, Any]:
        # Return configuration that will be passed to the backend plugin
        return {}


# Register the config type with the task implementation
TaskPluginRegistry.register(Echo, EchoTask)
When a user writes @env.task(config=Echo()), flyte-sdk looks up Echo in the TaskPluginRegistry and uses EchoTask to represent that task.
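The lookup from configuration object to task class can be sketched in plain Python. Everything below is a hypothetical stand-in (ToyTask, ToyEchoTask, ToyPluginRegistry, and the task decorator are not flyte-sdk names), and the fallback to a default task class when no config is given is an assumption of this sketch.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional, Type


class ToyTask:
    """Hypothetical default task wrapper, used when no plugin config matches."""

    task_type = "python"

    def __init__(self, func: Callable[..., Any], plugin_config: Any = None) -> None:
        self.func = func
        self.plugin_config = plugin_config


@dataclass
class Echo:
    """Empty configuration object, mirroring the echo plugin's config."""


class ToyEchoTask(ToyTask):
    task_type = "echo"


class ToyPluginRegistry:
    """Hypothetical registry mapping a config type to a task class."""

    _plugins: Dict[type, Type[ToyTask]] = {}

    @classmethod
    def register(cls, config_type: type, task_cls: Type[ToyTask]) -> None:
        cls._plugins[config_type] = task_cls

    @classmethod
    def find(cls, config_type: type) -> Optional[Type[ToyTask]]:
        return cls._plugins.get(config_type)


def task(config: Any = None) -> Callable[[Callable[..., Any]], ToyTask]:
    """Toy decorator: picks the task class from the type of `config`."""

    def decorator(func: Callable[..., Any]) -> ToyTask:
        task_cls: Type[ToyTask] = ToyTask
        if config is not None:
            # Fall back to the default class if the config type is unknown.
            task_cls = ToyPluginRegistry.find(type(config)) or ToyTask
        return task_cls(func, plugin_config=config)

    return decorator


ToyPluginRegistry.register(Echo, ToyEchoTask)


@task(config=Echo())
def say_hello(name: str) -> str:
    return f"hello {name}"


@task()
def plain(x: int) -> int:
    return x + 1
```

In this sketch the user never names the task class: passing an Echo instance is enough for the decorator to pick ToyEchoTask, which is the same indirection the registry provides for plugin authors.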
Custom Connectors (Agents)
For integrating with external services (like BigQuery, Databricks, or custom internal APIs), flyte-sdk provides the AsyncConnector interface. This is often referred to as the "Agent" pattern, where flyte-sdk manages the lifecycle of a job running on an external system.
AsyncConnector Interface
An AsyncConnector implementation handles the creation, polling, and deletion of external resources. In examples/apps/custom_connector_app/my_connector/connector.py, a batch job connector is implemented:
class BatchJobConnector(AsyncConnector):
    name = "Batch Job Connector"
    task_type_name = "batch_job"
    metadata_type = BatchJobMetadata

    async def create(self, task_template, inputs=None, **kwargs):
        # Trigger the external job and return metadata (e.g., a job ID)
        job_id = str(uuid.uuid4())[:8]
        return BatchJobMetadata(job_id=job_id, created_at=time.time())

    async def get(self, resource_meta, **kwargs):
        # Poll the external service for the current status
        # Returns a Resource object with the current phase (RUNNING, SUCCEEDED, etc.)
        return Resource(phase=TaskExecution.SUCCEEDED, outputs={"result": "data"})

    async def delete(self, resource_meta, **kwargs):
        # Clean up or cancel the external job
        pass


ConnectorRegistry.register(BatchJobConnector())
Local Execution with AsyncConnectorExecutorMixin
To enable local testing of connector-based tasks, flyte-sdk provides the AsyncConnectorExecutorMixin. When a task class inherits from this mixin, its execute method is overridden to route execution through the registered AsyncConnector.
As seen in examples/apps/custom_connector_app/my_connector/task.py:
class BatchJobTask(AsyncConnectorExecutorMixin, TaskTemplate):
    _TASK_TYPE = "batch_job"
    # ... implementation ...
The mixin's execute method (found in src/flyte/connectors/_connector.py) implements a polling loop that calls connector.create() and then repeatedly calls connector.get() until a terminal state is reached, simulating the backend's behavior during local development.
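The create-then-poll behavior described above can be sketched with asyncio alone. This is not the code in src/flyte/connectors/_connector.py: Phase, ToyResource, ToyConnector, and run_locally are hypothetical names, and the poll interval, the poll limit, and the decision to call delete() on timeout are assumptions made for the example.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional


class Phase(Enum):
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"


TERMINAL_PHASES = {Phase.SUCCEEDED, Phase.FAILED}


@dataclass
class ToyResource:
    phase: Phase
    outputs: Optional[Dict[str, Any]] = None


class ToyConnector:
    """Hypothetical connector: RUNNING for two polls, then SUCCEEDED."""

    def __init__(self) -> None:
        self.polls = 0
        self.deleted = False

    async def create(self, inputs: Dict[str, Any]) -> str:
        return "job-1"

    async def get(self, job_id: str) -> ToyResource:
        self.polls += 1
        if self.polls < 3:
            return ToyResource(Phase.RUNNING)
        return ToyResource(Phase.SUCCEEDED, outputs={"result": "data"})

    async def delete(self, job_id: str) -> None:
        self.deleted = True


async def run_locally(
    connector: ToyConnector,
    inputs: Dict[str, Any],
    poll_interval: float = 0.01,
    max_polls: int = 100,
) -> Dict[str, Any]:
    """Create the job, then poll until a terminal phase or the poll limit."""
    job_id = await connector.create(inputs)
    for _ in range(max_polls):
        resource = await connector.get(job_id)
        if resource.phase in TERMINAL_PHASES:
            break
        await asyncio.sleep(poll_interval)
    else:
        # Poll limit reached without a terminal phase: cancel the job.
        await connector.delete(job_id)
        raise TimeoutError(f"{job_id} did not finish after {max_polls} polls")

    if resource.phase is Phase.FAILED:
        raise RuntimeError(f"{job_id} failed")
    return resource.outputs or {}


connector = ToyConnector()
outputs = asyncio.run(run_locally(connector, {"x": 1}))
```

Because the loop only depends on create(), get(), and delete(), the same connector object can be exercised locally and by a backend, which is the point of routing local execution through the registered connector.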
Serialization Context
When flyte-sdk transforms Python tasks into Flyte IDL, it uses a SerializationContext. This object carries the metadata required for serialization, such as the project, domain, and version, as well as the CodeBundle, which contains the location of the uploaded source code. Custom tasks can access this context in methods like custom_config or container_args to dynamically adjust their serialized representation.
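As an illustration of a task adjusting its serialized form from the context, here is a minimal sketch. ToySerializationContext and ToyCodeBundle are hypothetical stand-ins; the field names (project, domain, version, code_bundle, destination) are assumptions chosen to match the description above and are not taken from the flyte-sdk source.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class ToyCodeBundle:
    # Hypothetical field: where the uploaded source code lives.
    destination: str


@dataclass(frozen=True)
class ToySerializationContext:
    project: str
    domain: str
    version: str
    code_bundle: Optional[ToyCodeBundle] = None


class ToyBatchTask:
    """Hypothetical task whose custom config depends on the context."""

    def custom_config(self, sctx: ToySerializationContext) -> Dict[str, Any]:
        # Derive a job name from the context so each version serializes distinctly.
        config: Dict[str, Any] = {
            "job_name": f"{sctx.project}-{sctx.domain}-{sctx.version}"
        }
        # Only reference the source location when a bundle was uploaded.
        if sctx.code_bundle is not None:
            config["source"] = sctx.code_bundle.destination
        return config


sctx = ToySerializationContext(
    project="demo",
    domain="development",
    version="v1",
    code_bundle=ToyCodeBundle(destination="s3://bucket/code.tgz"),
)
config = ToyBatchTask().custom_config(sctx)
config_without_bundle = ToyBatchTask().custom_config(
    ToySerializationContext(project="demo", domain="development", version="v1")
)
```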