DynamicBatcher
Batches records from many concurrent producers and runs them through a single async processing function, maximizing resource utilization.
Attributes
| Attribute | Type | Description |
|---|---|---|
| stats | [BatchStats](batchstats.md?sid=flyte_extras__dynamic_batcher_batchstats) | Current BatchStats snapshot. |
| is_running | bool | Whether the aggregation and processing loops are active. |
Constructor
Signature
DynamicBatcher(
    process_fn: ProcessFn[RecordT, ResultT],
    cost_estimator: CostEstimatorFn[RecordT] | None = None,
    target_batch_cost: int = 32000,
    max_batch_size: int = 256,
    min_batch_size: int = 1,
    batch_timeout_s: float = 0.05,
    max_queue_size: int = 5000,
    prefetch_batches: int = 2,
    default_cost: int = 1,
)
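The process_fn contract is the part of the constructor that callers must get right. The sketch below is a hypothetical process_fn (`score_lengths` is an illustrative name, not part of the library). It receives a whole batch and returns exactly one result per record, in input order. Each record's future is resolved with the entry at the same position, so that order is what lets the batcher match results to records.

```python
import asyncio


async def score_lengths(batch: list[str]) -> list[int]:
    """Hypothetical process_fn: one result per record, in input order."""
    # A real process_fn would make one expensive call per batch here
    # (e.g., a model forward pass); this stand-in just yields control once.
    await asyncio.sleep(0)
    return [len(text) for text in batch]


batch = ["a", "bb", "ccc"]
results = asyncio.run(score_lengths(batch))
# The i-th record's future receives results[i], so the lengths must
# match and the order must be preserved.
assert len(results) == len(batch)
print(results)  # [1, 2, 3]
```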
Parameters
| Name | Type | Description |
|---|---|---|
| process_fn | ProcessFn[RecordT, ResultT] | An `async def f(batch: list[RecordT]) -> list[ResultT]`. Must return exactly one result per record, in the same order as the input batch. |
| cost_estimator | `CostEstimatorFn[RecordT] \| None` = None | Optional function that estimates a record's cost, which counts against target_batch_cost. |
| target_batch_cost | int = 32000 | Cost budget per batch. The aggregator fills batches up to this limit before dispatching. |
| max_batch_size | int = 256 | Hard cap on records per batch regardless of cost budget. |
| min_batch_size | int = 1 | Minimum records before dispatching. Ignored when the timeout fires or shutdown is in progress. |
| batch_timeout_s | float = 0.05 | Maximum seconds to wait for a full batch. Lower values reduce idle time but may produce smaller batches. |
| max_queue_size | int = 5000 | Bounded queue size. When full, submit awaits (backpressure). |
| prefetch_batches | int = 2 | Number of pre-assembled batches to buffer between the aggregation and processing loops. |
| default_cost | int = 1 | Fallback cost when no estimator is available. |
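The way target_batch_cost and max_batch_size bound a batch can be illustrated with a simplified, synchronous greedy packer. This is a hypothetical sketch of the documented limits, not the library's aggregation loop. It ignores batch_timeout_s, min_batch_size, and prefetching. It also assumes that a single record costlier than the whole budget is dispatched in a batch of its own. `pack_batches` is an illustrative name.

```python
from collections.abc import Callable, Iterable, Iterator


def pack_batches(
    records: Iterable[str],
    cost_of: Callable[[str], int],
    target_batch_cost: int = 32000,
    max_batch_size: int = 256,
) -> Iterator[list[str]]:
    """Greedily fill batches up to a cost budget and a hard size cap.

    Hypothetical illustration: a record that would push the current batch
    over budget starts a new batch; a record costlier than the whole budget
    still forms a batch of one.
    """
    batch: list[str] = []
    batch_cost = 0
    for record in records:
        cost = cost_of(record)
        over_budget = bool(batch) and batch_cost + cost > target_batch_cost
        full = len(batch) >= max_batch_size
        if over_budget or full:
            yield batch
            batch, batch_cost = [], 0
        batch.append(record)
        batch_cost += cost
    if batch:
        # Flush the remainder, as a timeout or shutdown would.
        yield batch


# Cost = number of characters; budget of 10, at most 3 records per batch.
docs = ["aaaa", "bbbb", "cc", "d", "e" * 12, "f"]
batches = list(pack_batches(docs, len, target_batch_cost=10, max_batch_size=3))
# 4 batches: three records (cost 10), then "d", the oversize record alone, then "f".
print(batches)
```

Raising target_batch_cost lets more records share a batch. max_batch_size still caps the record count when individual costs are small.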
Methods
stats
@property
def stats(self) -> [BatchStats](batchstats.md?sid=flyte_extras__dynamic_batcher_batchstats)
Current BatchStats snapshot.
Returns
| Type | Description |
|---|---|
| [BatchStats](batchstats.md?sid=flyte_extras__dynamic_batcher_batchstats) | Current BatchStats snapshot. |
is_running
@property
def is_running(self) -> bool
Whether the aggregation and processing loops are active.
Returns
| Type | Description |
|---|---|
| bool | True while the aggregation and processing loops are active. |
start()
def start(self) -> None
Start the aggregation and processing loops. Raises RuntimeError if the batcher is already running.
Returns
| Type | Description |
|---|---|
| None | |
stop()
def stop(self) -> None
Gracefully shut down: process every record already enqueued, then stop the aggregation and processing loops. Blocks until every pending future has been resolved.
Returns
| Type | Description |
|---|---|
| None | |
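The start/stop semantics described above can be sketched with a small hypothetical class built on asyncio. A second start raises RuntimeError while the loop is running. stop first waits for every enqueued item to be handled and only then cancels the idle loop, so no queued work is dropped. `MiniLifecycle` and `enqueue` are illustrative names. The sketch uses one loop, whereas DynamicBatcher runs separate aggregation and processing loops.

```python
from __future__ import annotations

import asyncio


class MiniLifecycle:
    """Hypothetical sketch of start/stop semantics; not the library class."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue[int] = asyncio.Queue()
        self._worker: asyncio.Task[None] | None = None
        self.processed: list[int] = []

    @property
    def is_running(self) -> bool:
        return self._worker is not None and not self._worker.done()

    def enqueue(self, item: int) -> None:
        self._queue.put_nowait(item)

    async def start(self) -> None:
        if self.is_running:
            raise RuntimeError("already running")
        self._worker = asyncio.create_task(self._loop())

    async def _loop(self) -> None:
        while True:
            item = await self._queue.get()
            self.processed.append(item)  # stand-in for real processing
            self._queue.task_done()

    async def stop(self) -> None:
        # Graceful: wait until every enqueued item has been handled...
        await self._queue.join()
        # ...then cancel the now-idle loop.
        if self._worker is not None:
            self._worker.cancel()
            try:
                await self._worker
            except asyncio.CancelledError:
                pass
            self._worker = None


async def main() -> tuple[list[int], bool, bool]:
    lc = MiniLifecycle()
    await lc.start()
    for i in range(5):
        lc.enqueue(i)
    rejected = False
    try:
        await lc.start()  # second start while running
    except RuntimeError:
        rejected = True
    await lc.stop()
    return lc.processed, rejected, lc.is_running


print(asyncio.run(main()))  # ([0, 1, 2, 3, 4], True, False)
```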
submit()
def submit(
    self,
    record: RecordT,
    estimated_cost: int | None = None,
) -> asyncio.Future[ResultT]
Submit a single record for batched processing. Returns an asyncio.Future that resolves once the batch containing this record has been processed.
Parameters
| Name | Type | Description |
|---|---|---|
| record | RecordT | The input record. |
| estimated_cost | `int \| None` = None | Optional precomputed cost for this record, counted against target_batch_cost. |
Returns
| Type | Description |
|---|---|
asyncio.Future[ResultT] | A future whose result is the corresponding entry from the list returned by process_fn. |
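This deliberately small, hypothetical batcher shows how the future returned by a submit call resolves to the matching entry of process_fn's output. A bounded asyncio.Queue provides the backpressure: `put` waits when the queue is full, so `submit` is a coroutine in this sketch. `run_once` collects records until max_batch_size is reached or batch_timeout_s elapses. `TinyBatcher` and `run_once` are illustrative names. The sketch omits cost budgets, prefetching, and error propagation, and it is not the library's implementation.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable


class TinyBatcher:
    """Hypothetical sketch: size/timeout batching with per-record futures."""

    def __init__(
        self,
        process_fn: Callable[[list[str]], Awaitable[list[int]]],
        max_batch_size: int = 4,
        batch_timeout_s: float = 0.05,
        max_queue_size: int = 16,
    ) -> None:
        self._process_fn = process_fn
        self._max = max_batch_size
        self._timeout = batch_timeout_s
        # Bounded queue: put() awaits when full, which is the backpressure.
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
        self.batch_sizes: list[int] = []

    async def submit(self, record: str) -> asyncio.Future:
        fut = asyncio.get_running_loop().create_future()
        await self._queue.put((record, fut))
        return fut

    async def run_once(self) -> None:
        # Wait for a first record, then collect more until the batch is
        # full or the timeout expires.
        items = [await self._queue.get()]
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self._timeout
        while len(items) < self._max:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                items.append(await asyncio.wait_for(self._queue.get(), remaining))
            except asyncio.TimeoutError:
                break
        records = [r for r, _ in items]
        self.batch_sizes.append(len(records))
        results = await self._process_fn(records)
        # Positional mapping: result i resolves the future of record i.
        for (_, fut), result in zip(items, results):
            fut.set_result(result)


async def lengths(batch: list[str]) -> list[int]:
    return [len(s) for s in batch]


async def main() -> tuple[list[int], list[int]]:
    batcher = TinyBatcher(lengths, max_batch_size=4)
    futures = [await batcher.submit(w) for w in ["a", "bb", "ccc", "dddd", "eeeee"]]
    await batcher.run_once()  # first batch: 4 records (size cap)
    await batcher.run_once()  # second batch: 1 record (timeout)
    return list(await asyncio.gather(*futures)), batcher.batch_sizes


print(asyncio.run(main()))  # ([1, 2, 3, 4, 5], [4, 1])
```

A future resolves only after its whole batch has returned. Callers that need every result therefore typically await the futures together, as `main` does with asyncio.gather.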
submit_batch()
def submit_batch(
    self,
    records: Sequence[RecordT],
    estimated_cost: Sequence[int] | None = None,
) -> list[asyncio.Future[ResultT]]
Submit multiple records for batched processing in one call. Returns one future per record, in the same order as records.
Parameters
| Name | Type | Description |
|---|---|---|
| records | Sequence[RecordT] | Sequence of input records. |
| estimated_cost | `Sequence[int] \| None` = None | Optional per-record costs, one entry per record in records. |
Returns
| Type | Description |
|---|---|
list[asyncio.Future[ResultT]] | List of futures, one per record. |
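One way to read submit_batch is as one submission per record, pairing each record with its cost and returning the futures in input order. The sketch below assumes that reading. `submit_batch_sketch` and `fake_submit` are hypothetical: `fake_submit` stands in for a real submit and resolves immediately. The length check on estimated_cost is also an assumption, not documented behavior.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable, Sequence


async def submit_batch_sketch(
    submit: Callable[[str, int | None], Awaitable[asyncio.Future]],
    records: Sequence[str],
    estimated_cost: Sequence[int] | None = None,
) -> list[asyncio.Future]:
    """Hypothetical: one submit per record, futures returned in input order."""
    if estimated_cost is not None and len(estimated_cost) != len(records):
        raise ValueError("estimated_cost must have one entry per record")
    costs = estimated_cost if estimated_cost is not None else [None] * len(records)
    return [await submit(rec, cost) for rec, cost in zip(records, costs)]


async def fake_submit(record: str, cost: int | None) -> asyncio.Future:
    # Stand-in for a real batcher: resolve immediately with (record, cost).
    fut = asyncio.get_running_loop().create_future()
    fut.set_result((record, cost))
    return fut


async def main() -> list[tuple[str, int | None]]:
    futures = await submit_batch_sketch(fake_submit, ["x", "y"], [3, 5])
    # Await all results together; order matches the input records.
    return list(await asyncio.gather(*futures))


print(asyncio.run(main()))  # [('x', 3), ('y', 5)]
```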