
Connector Service Infrastructure

The flyte-sdk provides a gRPC-based infrastructure for hosting asynchronous connectors. This service acts as a bridge between Flyte Propeller and external systems such as BigQuery or Snowflake, allowing Propeller to offload the execution and monitoring of long-running tasks to a dedicated service.

Starting the Connector Service

When you need to host connectors in your infrastructure, you can start the service using the ConnectorService class. This class initializes the gRPC server, sets up Prometheus metrics, and registers health checks.

You can start the service programmatically using the run method:

from flyte.connectors import ConnectorService

# Start the service on port 8000 with Prometheus metrics on 9090
ConnectorService.run(
    port=8000,
    prometheus_port=9090,
    worker=10,
    timeout=None,
    modules=["my_custom_connector_module"],
)

Alternatively, flyte-sdk provides a CLI entry point via the flyte-connector command (defined in src/flyte/_bin/connect.py):

flyte-connector c0 --port 8000 --prometheus_port 9090 --worker 10
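The flags in this command appear to correspond to the keyword arguments of ConnectorService.run. As a rough illustration only, the hypothetical parse_connector_args helper below (not the code in src/flyte/_bin/connect.py) shows how such flags could be turned into run() keyword arguments with argparse. It models only the three flags shown and ignores the positional c0 argument.

```python
import argparse
from typing import Any, Optional, Sequence


def parse_connector_args(argv: Optional[Sequence[str]] = None) -> dict[str, Any]:
    """Hypothetical sketch: turn CLI flags into ConnectorService.run() kwargs."""
    parser = argparse.ArgumentParser(prog="flyte-connector")
    # Only the flags shown in the documented command are modelled here.
    parser.add_argument("--port", type=int, default=None)
    parser.add_argument("--prometheus_port", type=int, default=None)
    parser.add_argument("--worker", type=int, default=None)
    namespace = parser.parse_args(argv)
    # Drop unset flags so run() can fall back to its own defaults.
    return {key: value for key, value in vars(namespace).items() if value is not None}


kwargs = parse_connector_args(["--port", "8000", "--prometheus_port", "9090", "--worker", "10"])
# kwargs could then be forwarded as ConnectorService.run(**kwargs)
```

Leaving unset flags out of the dictionary keeps the defaults in one place (run()) instead of duplicating them in the CLI layer.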

Internally, ConnectorService.run calls _start_grpc_server in src/flyte/connectors/utils.py. This function performs several critical setup steps:

  1. Dependency Check: Ensures flyteplugins-connector is installed.
  2. Connector Loading: Loads connectors from flyte.connectors entry points and any modules passed via the modules argument.
  3. Metrics Server: Starts an HTTP server to expose Prometheus metrics on the specified prometheus_port.
  4. Health Checks: Registers a gRPC health check service using grpc_health.v1.
  5. Service Registration: Adds AsyncConnectorService and ConnectorMetadataService to the gRPC server.

Task Lifecycle Management

The AsyncConnectorService class in src/flyte/connectors/_server.py implements the core gRPC interface for managing task lifecycles. It handles requests from Flyte Propeller to create, monitor, and delete tasks.

Creating Tasks

When a CreateTask request arrives, the service identifies the correct connector using the ConnectorRegistry based on the task type and version:

connector = ConnectorRegistry.get_connector(template.type, template.task_type_version)
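A lookup keyed on both the task type and its version can be modelled as a dictionary with a tuple key. ToyConnectorRegistry and ConnectorNotFoundError below are hypothetical stand-ins for ConnectorRegistry, shown only to illustrate why the same task type can map to different connectors across versions.

```python
from dataclasses import dataclass, field


class ConnectorNotFoundError(KeyError):
    """Hypothetical error raised when no connector matches a task type/version."""


@dataclass
class ToyConnectorRegistry:
    """Hypothetical registry keyed by (task_type, version)."""

    _connectors: dict[tuple[str, int], object] = field(default_factory=dict)

    def register(self, task_type: str, version: int, connector: object) -> None:
        self._connectors[(task_type, version)] = connector

    def get_connector(self, task_type: str, version: int) -> object:
        try:
            return self._connectors[(task_type, version)]
        except KeyError:
            raise ConnectorNotFoundError(
                f"no connector registered for {task_type!r} version {version}"
            ) from None


registry = ToyConnectorRegistry()
registry.register("example_task", 0, "connector-v0")
registry.register("example_task", 1, "connector-v1")
```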

The service then converts the Protobuf inputs into native Python types using the TypeEngine and convert_from_inputs_to_native before calling the connector's create method:

python_interface_inputs = {
    entry.key: (TypeEngine.guess_python_type(entry.value.type), inspect.Parameter.empty)
    for entry in template.interface.inputs.variables
}
native_interface = NativeInterface.from_types(inputs=python_interface_inputs, outputs={})
native_inputs = await convert_from_inputs_to_native(native_interface, Inputs(proto_inputs=request.inputs))

resource_meta = await connector.create(
    task_template=request.template,
    inputs=native_inputs,
    # ...
)
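The order of operations in the create path (build an interface, convert inputs, await create) can be sketched with the standard library. Everything below is a hypothetical simplification: SIMPLE_TYPES stands in for TypeEngine.guess_python_type, string-encoded values stand in for Protobuf literals, and EchoConnector and create_task are illustrative names only.

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Any

# Hypothetical type table; the real service asks TypeEngine.guess_python_type.
SIMPLE_TYPES: dict[str, type] = {"INTEGER": int, "FLOAT": float, "STRING": str}


def build_interface(variables: dict[str, str]) -> dict[str, tuple[type, Any]]:
    """Mirror the (python_type, default) pairs built for python_interface_inputs."""
    return {name: (SIMPLE_TYPES[t], inspect.Parameter.empty) for name, t in variables.items()}


def to_native(interface: dict[str, tuple[type, Any]], raw: dict[str, str]) -> dict[str, Any]:
    """Convert string-encoded values using the types recorded in the interface."""
    return {name: py_type(raw[name]) for name, (py_type, _) in interface.items()}


@dataclass
class JobMeta:
    job_id: str


class EchoConnector:
    """Hypothetical connector whose create() derives a job id from its inputs."""

    async def create(self, task_template: Any, inputs: dict[str, Any], **kwargs: Any) -> JobMeta:
        return JobMeta(job_id=f"job-{inputs['n']}")


async def create_task(variables: dict[str, str], raw_inputs: dict[str, str]) -> JobMeta:
    interface = build_interface(variables)
    native_inputs = to_native(interface, raw_inputs)
    return await EchoConnector().create(task_template=None, inputs=native_inputs)


meta = asyncio.run(create_task({"n": "INTEGER", "name": "STRING"}, {"n": "7", "name": "demo"}))
```

The connector only ever sees native Python values, which is why the conversion happens in the service before create is called.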

Monitoring and Deletion

The service also provides methods for checking task status (GetTask), retrieving logs (GetTaskLogs), and cleaning up resources (DeleteTask). These methods use the resource_meta returned during creation to identify the specific job in the external system.

async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) -> GetTaskResponse:
    connector = ConnectorRegistry.get_connector(request.task_category.name, request.task_category.version)
    res = await connector.get(
        resource_meta=connector.metadata_type.decode(request.resource_meta),
        **_get_connection_kwargs(request.connection),
    )
    return GetTaskResponse(resource=await get_resource_proto(res))
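Because GetTask and DeleteTask receive resource_meta as bytes, whatever create returns must survive an encode/decode round trip. The sketch below assumes a JSON encoding; the encoding actually used by metadata_type is not specified here, and ExampleJobMetadata is a hypothetical class.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class ExampleJobMetadata:
    """Hypothetical metadata identifying a job in an external system."""

    job_id: str
    region: str

    def encode(self) -> bytes:
        # Serialize to bytes so the value can travel in the resource_meta field.
        return json.dumps(asdict(self)).encode("utf-8")

    @classmethod
    def decode(cls, data: bytes) -> "ExampleJobMetadata":
        # Rebuild the object from the bytes sent back in GetTask/DeleteTask.
        return cls(**json.loads(data.decode("utf-8")))


meta = ExampleJobMetadata(job_id="abc-123", region="us-east1")
wire = meta.encode()
restored = ExampleJobMetadata.decode(wire)
```

Keeping the metadata small and self-describing means the service itself can stay stateless: every follow-up request carries enough information to find the external job again.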

Connector Discovery

The ConnectorMetadataService allows clients to discover which connectors are currently hosted by the service. This is useful for debugging and for Propeller to verify service capabilities.

  • ListConnectors: Returns a list of all registered connectors and their supported task types.
  • GetConnector: Returns detailed metadata for a specific connector by name.

These methods interact directly with the ConnectorRegistry to retrieve metadata about registered AsyncConnector instances.
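The two discovery calls reduce to "list everything" and "look up one entry by name". ConnectorInfo and DiscoveryRegistry below are hypothetical, in-memory illustrations of that shape, not the metadata types used by ConnectorMetadataService.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ConnectorInfo:
    """Hypothetical metadata record returned by discovery calls."""

    name: str
    task_types: tuple[str, ...]


@dataclass
class DiscoveryRegistry:
    """Hypothetical registry answering ListConnectors/GetConnector-style queries."""

    _by_name: dict[str, ConnectorInfo] = field(default_factory=dict)

    def register(self, info: ConnectorInfo) -> None:
        self._by_name[info.name] = info

    def list_connectors(self) -> list[ConnectorInfo]:
        # Sorted by name so repeated calls return a stable order.
        return sorted(self._by_name.values(), key=lambda info: info.name)

    def get_connector(self, name: str) -> ConnectorInfo:
        if name not in self._by_name:
            raise KeyError(f"connector {name!r} is not registered")
        return self._by_name[name]


registry = DiscoveryRegistry()
registry.register(ConnectorInfo("warehouse", ("sql_query",)))
registry.register(ConnectorInfo("batch", ("batch_job", "batch_job_v2")))
names = [info.name for info in registry.list_connectors()]  # ["batch", "warehouse"]
```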

Monitoring and Observability

The flyte-sdk connector infrastructure exports metrics intended for production monitoring. Connector operations are wrapped in the @record_connector_metrics decorator, which automatically tracks their performance and health.

The following metrics are exported via the Prometheus endpoint:

  • flyte_connector_requests_success_total: Counter of successful requests, labeled by task_type and operation.
  • flyte_connector_requests_failure_total: Counter of failed requests, including an error_code label.
  • flyte_connector_request_latency_seconds: Summary of time spent processing requests.
  • flyte_connector_input_literal_bytes: Summary of the size of input payloads for CreateTask operations.
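The success, failure, and latency metrics above follow a common decorator pattern. The sketch below is hypothetical and is not the real @record_connector_metrics: it uses in-memory Counter objects instead of Prometheus, uses the exception class name as a stand-in for the error_code label, and assumes the task type is passed as the first argument.

```python
import asyncio
import functools
import time
from collections import Counter, defaultdict
from typing import Any, Awaitable, Callable

# In-memory stand-ins for the Prometheus counters and latency summary.
SUCCESS: Counter = Counter()
FAILURE: Counter = Counter()
LATENCY: defaultdict = defaultdict(list)


def record_metrics(operation: str) -> Callable:
    """Hypothetical decorator counting outcomes and timing an async operation."""

    def decorator(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        @functools.wraps(fn)
        async def wrapper(task_type: str, *args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            try:
                result = await fn(task_type, *args, **kwargs)
            except Exception as exc:
                # Label failures with an error code (here: the exception class name).
                FAILURE[(task_type, operation, type(exc).__name__)] += 1
                raise
            else:
                SUCCESS[(task_type, operation)] += 1
                return result
            finally:
                # Latency is recorded for both successful and failed calls.
                LATENCY[(task_type, operation)].append(time.perf_counter() - start)

        return wrapper

    return decorator


@record_metrics("create")
async def create(task_type: str, fail: bool = False) -> str:
    if fail:
        raise RuntimeError("boom")
    return "ok"


result = asyncio.run(create("example_task"))
```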

If a connector operation fails, _handle_exception in src/flyte/connectors/_server.py maps the Python exception to an appropriate gRPC status code (e.g., NOT_FOUND if the connector isn't registered, or INTERNAL for other errors) and increments the failure counter.
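The mapping described for _handle_exception can be sketched as a small function. Everything here is hypothetical apart from the two status codes, which are redeclared with their standard gRPC numeric values so the sketch does not need the grpc package; ConnectorNotFoundError and handle_exception are illustrative names.

```python
from collections import Counter
from enum import IntEnum


class StatusCode(IntEnum):
    """The two gRPC status codes named above, with their standard numeric values."""

    NOT_FOUND = 5
    INTERNAL = 13


class ConnectorNotFoundError(Exception):
    """Hypothetical error raised when no connector is registered for a task type."""


FAILURES: Counter = Counter()


def handle_exception(exc: Exception, task_type: str, operation: str) -> tuple[StatusCode, str]:
    """Sketch: choose a status code, count the failure, return (code, details)."""
    if isinstance(exc, ConnectorNotFoundError):
        code = StatusCode.NOT_FOUND
    else:
        code = StatusCode.INTERNAL
    FAILURES[(task_type, operation, code.name)] += 1
    return code, f"{operation} failed for {task_type}: {exc}"


code, details = handle_exception(ConnectorNotFoundError("x"), "example_task", "get")
```

In a real servicer, the resulting pair would typically be reported through context.set_code() and context.set_details() on the grpc.ServicerContext.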

Health Checks

The service includes a standard gRPC Health Checking Protocol implementation. At startup it marks every service defined in the connector IDL as SERVING, so load balancers and orchestration tools such as Kubernetes can determine whether the service is ready to receive traffic.

# From src/flyte/connectors/utils.py
for service in service_pb2.DESCRIPTOR.services_by_name.values():
    health_servicer.set(service.full_name, health_pb2.HealthCheckResponse.SERVING)
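Conceptually, the health servicer is a table from fully qualified service names to a serving status. The stdlib-only sketch below illustrates that idea; ToyHealthServicer and the service names are hypothetical, while the ServingStatus values mirror those defined in the gRPC health protocol.

```python
from enum import Enum


class ServingStatus(Enum):
    """Mirrors the HealthCheckResponse statuses of the gRPC health protocol."""

    UNKNOWN = 0
    SERVING = 1
    NOT_SERVING = 2
    SERVICE_UNKNOWN = 3


class ToyHealthServicer:
    """Hypothetical in-memory status table; the real one comes from grpc_health.v1."""

    def __init__(self) -> None:
        self._status: dict[str, ServingStatus] = {}

    def set(self, service: str, status: ServingStatus) -> None:
        self._status[service] = status

    def check(self, service: str) -> ServingStatus:
        # The gRPC protocol answers a Check for an unknown service with NOT_FOUND;
        # this sketch raises LookupError instead.
        if service not in self._status:
            raise LookupError(f"unknown service: {service!r}")
        return self._status[service]


# Hypothetical service names standing in for those in the connector IDL.
SERVICE_NAMES = ["example.v1.AsyncConnectorService", "example.v1.ConnectorMetadataService"]

health = ToyHealthServicer()
for name in SERVICE_NAMES:
    health.set(name, ServingStatus.SERVING)
```

A common pattern with such a table is to flip entries to NOT_SERVING before shutdown so that a load balancer can drain traffic first.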