
DynamicBatcher

Collects records submitted by many concurrent producers into batches and runs each batch through a single async processing function. The function handles full batches rather than individual records, which improves resource utilization.
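The core pattern can be sketched with plain `asyncio`. The sketch assumes nothing about the library's internals.

- Producers enqueue `(record, future)` pairs on a bounded queue.
- A consumer collects up to `max_batch_size` records, or whatever arrives within `batch_timeout_s`.
- It calls the processing function once and resolves each future.

`MiniBatcher` is a hypothetical, simplified stand-in, not `DynamicBatcher` itself. It omits cost budgeting, `min_batch_size`, and the separate aggregation and processing loops.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, Generic, TypeVar

R = TypeVar("R")  # record type
T = TypeVar("T")  # result type


class MiniBatcher(Generic[R, T]):
    """Hypothetical, simplified batcher; not the real DynamicBatcher."""

    def __init__(
        self,
        process_fn: Callable[[list[R]], Awaitable[list[T]]],
        max_batch_size: int = 4,
        batch_timeout_s: float = 0.05,
        max_queue_size: int = 100,
    ) -> None:
        self._process_fn = process_fn
        self._max_batch_size = max_batch_size
        self._timeout = batch_timeout_s
        # Bounded queue: put() waits while it is full (backpressure).
        self._queue: asyncio.Queue[tuple[R, asyncio.Future[T]]] = asyncio.Queue(max_queue_size)

    async def submit(self, record: R) -> asyncio.Future[T]:
        fut: asyncio.Future[T] = asyncio.get_running_loop().create_future()
        await self._queue.put((record, fut))
        return fut

    async def run_once(self) -> None:
        """Collect one batch, process it, and resolve its futures."""
        loop = asyncio.get_running_loop()
        items = [await self._queue.get()]  # wait for the first record
        deadline = loop.time() + self._timeout
        # Keep collecting until the batch is full or the timeout expires.
        while len(items) < self._max_batch_size:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                items.append(await asyncio.wait_for(self._queue.get(), remaining))
            except asyncio.TimeoutError:
                break
        results = await self._process_fn([record for record, _ in items])
        # process_fn returns results in input order, so zip pairs each
        # result with the future of the record that produced it.
        for (_, fut), result in zip(items, results):
            fut.set_result(result)


async def main() -> list[int]:
    async def double(batch: list[int]) -> list[int]:
        return [x * 2 for x in batch]

    batcher = MiniBatcher(double, max_batch_size=4)
    futures = [await batcher.submit(i) for i in range(6)]
    await batcher.run_once()  # full batch: records 0-3
    await batcher.run_once()  # partial batch after the timeout: records 4-5
    return [await f for f in futures]


print(asyncio.run(main()))  # [0, 2, 4, 6, 8, 10]
```

Each producer only ever sees its own future. Which batch a record lands in is invisible to the caller.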

Attributes

| Attribute | Type | Description |
|---|---|---|
| `stats` | [BatchStats](batchstats.md?sid=flyte_extras__dynamic_batcher_batchstats) | Current `BatchStats` snapshot. |
| `is_running` | `bool` | Whether the aggregation and processing loops are active. |

Constructor

Signature

```python
DynamicBatcher(
    process_fn: ProcessFn[RecordT, ResultT],
    cost_estimator: CostEstimatorFn[RecordT] | None = None,
    target_batch_cost: int = 32000,
    max_batch_size: int = 256,
    min_batch_size: int = 1,
    batch_timeout_s: float = 0.05,
    max_queue_size: int = 5000,
    prefetch_batches: int = 2,
    default_cost: int = 1,
)
```
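The sketch below shows functions with the shapes the constructor expects. A `process_fn` is an async function that takes a list of records and returns a list of results in the same order.

`count_words` and `char_cost` are hypothetical. `CostEstimatorFn` is not defined on this page, so the estimator is assumed to be a plain synchronous callable returning an `int`.

```python
import asyncio


async def count_words(batch: list[str]) -> list[int]:
    """Example process_fn: one result per input record, in input order."""
    # A real function would make one call to a model or service per batch.
    return [len(text.split()) for text in batch]


def char_cost(record: str) -> int:
    """Hypothetical estimator: character count, with a floor of 1."""
    return max(1, len(record))


batch = ["hello world", "a b c", ""]
results = asyncio.run(count_words(batch))
# The ordering contract implies a 1:1 mapping between inputs and outputs.
assert len(results) == len(batch)
print(results)                         # [2, 3, 0]
print([char_cost(r) for r in batch])   # [11, 5, 1]
```

Under that assumption, they would be passed as `DynamicBatcher(process_fn=count_words, cost_estimator=char_cost)`.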

Parameters

| Name | Type | Description |
|---|---|---|
| `process_fn` | `ProcessFn[RecordT, ResultT]` | Async function of the form `async def f(batch: list[RecordT]) -> list[ResultT]`. Must return results in the same order as the input batch. |
| `cost_estimator` | `CostEstimatorFn[RecordT] \| None = None` | Optional function that estimates the cost of each record, measured against `target_batch_cost`. |
| `target_batch_cost` | `int = 32000` | Cost budget per batch. The aggregator fills batches up to this limit before dispatching. |
| `max_batch_size` | `int = 256` | Hard cap on records per batch, regardless of the cost budget. |
| `min_batch_size` | `int = 1` | Minimum number of records before dispatching. Ignored when the timeout fires or shutdown is in progress. |
| `batch_timeout_s` | `float = 0.05` | Maximum time, in seconds, to wait for a full batch. Lower values shorten how long records wait before dispatch but may produce smaller batches. |
| `max_queue_size` | `int = 5000` | Capacity of the bounded input queue. When the queue is full, `submit` waits until space frees up (backpressure). |
| `prefetch_batches` | `int = 2` | Number of pre-assembled batches to buffer between the aggregation and processing loops. |
| `default_cost` | `int = 1` | Fallback cost when no estimator is available. |
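A pure function can illustrate how `target_batch_cost`, `max_batch_size`, and `default_cost` interact. This is a hypothetical sketch of greedy cost-budget packing, not the library's aggregator. It assumes that a single record costing more than the whole budget is still dispatched, alone, rather than rejected.

```python
from typing import Optional, Sequence


def assemble_batches(
    costs: Sequence[Optional[int]],
    target_batch_cost: int = 32000,
    max_batch_size: int = 256,
    default_cost: int = 1,
) -> list[list[int]]:
    """Greedily pack record indices into batches (hypothetical sketch).

    A batch is closed before adding a record that would push it over
    target_batch_cost, or once it already holds max_batch_size records.
    A record with no cost (None) is charged default_cost.
    """
    batches: list[list[int]] = []
    current: list[int] = []
    current_cost = 0
    for index, cost in enumerate(costs):
        c = default_cost if cost is None else cost
        over_budget = bool(current) and current_cost + c > target_batch_cost
        if over_budget or len(current) >= max_batch_size:
            batches.append(current)
            current, current_cost = [], 0
        # An oversized record still starts a batch and ends up alone in it.
        current.append(index)
        current_cost += c
    if current:
        batches.append(current)
    return batches


# The cost budget closes the first batch; None falls back to default_cost.
print(assemble_batches([400, 300, 500, None, 200], target_batch_cost=1000, max_batch_size=3))
# [[0, 1], [2, 3, 4]]
# The size cap closes batches even though the budget is never reached.
print(assemble_batches([1] * 5, target_batch_cost=1000, max_batch_size=2))
# [[0, 1], [2, 3], [4]]
```

The sketch assumes every record is already available, so it ignores `batch_timeout_s` and `min_batch_size`. Per the descriptions above, the real aggregator also takes both into account.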

Methods


stats()

```python
def stats() -> BatchStats
```

Current BatchStats snapshot.

Returns

| Type | Description |
|---|---|
| [BatchStats](batchstats.md?sid=flyte_extras__dynamic_batcher_batchstats) | Current `BatchStats` snapshot. |

is_running()

```python
def is_running() -> bool
```

Whether the aggregation and processing loops are active.

Returns

| Type | Description |
|---|---|
| `bool` | `True` while the aggregation and processing loops are active. |

start()

```python
def start() -> None
```

Start the aggregation and processing loops. Raises `RuntimeError` if the batcher is already running.

Returns

| Type | Description |
|---|---|
| `None` | |
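The two loops that `start()` launches, and the `prefetch_batches` buffer between them, can be pictured as a two-stage `asyncio` pipeline. This is a hypothetical sketch of that structure; `aggregation_loop` and `processing_loop` are names invented here. A queue bounded at `prefetch_batches` lets the aggregator assemble upcoming batches while the current one is processed, and pauses the aggregator when the buffer is full.

```python
from __future__ import annotations

import asyncio
from typing import Optional


async def aggregation_loop(
    records: list[int], batch_size: int, batches: asyncio.Queue[Optional[list[int]]]
) -> None:
    # Stage 1: group records into batches. put() waits whenever the
    # buffer already holds prefetch_batches batches.
    for i in range(0, len(records), batch_size):
        await batches.put(records[i : i + batch_size])
    await batches.put(None)  # sentinel: no more batches


async def processing_loop(batches: asyncio.Queue[Optional[list[int]]], out: list[int]) -> None:
    # Stage 2: take pre-assembled batches one at a time and process them.
    while (batch := await batches.get()) is not None:
        await asyncio.sleep(0.01)  # stand-in for awaiting process_fn(batch)
        out.extend(x * 10 for x in batch)


async def main(prefetch_batches: int = 2) -> list[int]:
    batches: asyncio.Queue[Optional[list[int]]] = asyncio.Queue(maxsize=prefetch_batches)
    out: list[int] = []
    # Run both stages concurrently, mirroring the two loops described above.
    await asyncio.gather(
        aggregation_loop(list(range(7)), 3, batches),
        processing_loop(batches, out),
    )
    return out


print(asyncio.run(main()))  # [0, 10, 20, 30, 40, 50, 60]
```

A larger buffer hides more aggregation latency but holds more assembled batches in memory.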

stop()

```python
def stop() -> None
```

Perform a graceful shutdown: process all enqueued work, then stop the loops. Blocks until every pending future is resolved.

Returns

| Type | Description |
|---|---|
| `None` | |
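One common way to build a graceful stop is to enqueue a shutdown sentinel behind all pending work. Every record submitted before the stop is then processed before the loop exits. The sketch assumes that approach purely for illustration; it is not necessarily how `DynamicBatcher` does it. `Worker` and `_STOP` are hypothetical names. The sketch also mirrors the documented `RuntimeError` on a second `start()`.

```python
from __future__ import annotations

import asyncio

_STOP = object()  # hypothetical shutdown sentinel


class Worker:
    """Hypothetical single-loop worker illustrating start/stop semantics."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue[object] = asyncio.Queue()
        self._task: asyncio.Task[None] | None = None
        self.processed: list[object] = []

    @property
    def is_running(self) -> bool:
        return self._task is not None and not self._task.done()

    def start(self) -> None:
        if self.is_running:
            raise RuntimeError("worker is already running")
        self._task = asyncio.create_task(self._loop())

    async def submit(self, item: object) -> None:
        await self._queue.put(item)

    async def stop(self) -> None:
        # The queue is FIFO, so the sentinel lands behind all pending items:
        # everything submitted earlier is processed before the loop exits.
        await self._queue.put(_STOP)
        if self._task is not None:
            await self._task

    async def _loop(self) -> None:
        while (item := await self._queue.get()) is not _STOP:
            self.processed.append(item)


async def main() -> list[object]:
    worker = Worker()
    worker.start()
    for i in range(3):
        await worker.submit(i)
    await worker.stop()
    assert not worker.is_running
    return worker.processed


async def double_start_rejected() -> bool:
    worker = Worker()
    worker.start()
    try:
        worker.start()  # second start while running
    except RuntimeError:
        return True
    finally:
        await worker.stop()
    return False


print(asyncio.run(main()))                   # [0, 1, 2]
print(asyncio.run(double_start_rejected()))  # True
```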

submit()

```python
def submit(
    record: RecordT,
    estimated_cost: int | None = None,
) -> asyncio.Future[ResultT]
```

Submit a single record for batched processing. Returns an `asyncio.Future` that resolves once the batch containing this record has been processed.

Parameters

| Name | Type | Description |
|---|---|---|
| `record` | `RecordT` | The input record. |
| `estimated_cost` | `int \| None = None` | Optional explicit cost for this record. |

Returns

| Type | Description |
|---|---|
| `asyncio.Future[ResultT]` | A future whose result is the corresponding entry from the list returned by `process_fn`. |
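Each future's result is the entry at its record's position in the list returned by `process_fn`. That is why the ordering contract matters. The sketch below, built around the hypothetical helper `resolve_futures`, shows that mapping step. It also shows one defensive choice: when the result count does not match the batch size, every future fails instead of receiving a mismatched result. Whether the real library performs such a check is not stated here.

```python
from __future__ import annotations

import asyncio
from typing import Sequence, TypeVar

T = TypeVar("T")


def resolve_futures(futures: Sequence[asyncio.Future[T]], results: Sequence[T]) -> None:
    """Hypothetical helper: hand each batch result to its record's future."""
    if len(results) != len(futures):
        # A length mismatch breaks the one-result-per-record contract, so
        # fail every pending future instead of mis-assigning results.
        error = ValueError(f"expected {len(futures)} results, got {len(results)}")
        for fut in futures:
            if not fut.done():
                fut.set_exception(error)
        return
    for fut, result in zip(futures, results):
        if not fut.done():  # the caller may already have cancelled it
            fut.set_result(result)


async def main() -> list[str]:
    loop = asyncio.get_running_loop()
    futures = [loop.create_future() for _ in range(3)]
    resolve_futures(futures, ["a", "b", "c"])
    return [await fut for fut in futures]


print(asyncio.run(main()))  # ['a', 'b', 'c']
```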

submit_batch()

```python
def submit_batch(
    records: Sequence[RecordT],
    estimated_cost: Sequence[int] | None = None,
) -> list[asyncio.Future[ResultT]]
```

Submit several records for batched processing in one call and return their futures, one per record.

Parameters

| Name | Type | Description |
|---|---|---|
| `records` | `Sequence[RecordT]` | Sequence of input records. |
| `estimated_cost` | `Sequence[int] \| None = None` | Optional costs, one per record. |

Returns

| Type | Description |
|---|---|
| `list[asyncio.Future[ResultT]]` | List of futures, one per record. |
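The returned items are ordinary `asyncio` futures, so the whole set can be awaited together with `asyncio.gather`, which returns results in argument order. In the sketch below, `fake_submit_batch` is a hypothetical stand-in that resolves its futures on the next event-loop iteration. With the real batcher, the futures resolve as their batches finish. The stand-in is a plain function; the `max_queue_size` note suggests submission can wait for queue space, so if the real method is a coroutine, it would be awaited first.

```python
import asyncio
from typing import Sequence


def fake_submit_batch(records: Sequence[int]) -> list[asyncio.Future]:
    """Hypothetical stand-in for submit_batch: returns one future per record."""
    loop = asyncio.get_running_loop()
    futures = []
    for record in records:
        fut = loop.create_future()
        # Resolve on a later event-loop iteration, as a finished batch would.
        loop.call_soon(fut.set_result, record * record)
        futures.append(fut)
    return futures


async def main() -> list[int]:
    futures = fake_submit_batch([1, 2, 3])
    # gather returns results in the order the awaitables were passed.
    return await asyncio.gather(*futures)


print(asyncio.run(main()))  # [1, 4, 9]
```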