
Sharded Data with JSONL Directories

When datasets grow too large for a single file, managing them as a collection of shards becomes necessary for performance and scalability. The flyte-sdk provides the JsonlDir class to handle these sharded datasets transparently, managing file rotation during writes and providing a unified stream during reads.

Shard Naming and Structure

A JsonlDir is a directory containing files that follow a specific naming convention: part-NNNNN.jsonl (or part-NNNNN.jsonl.zst for compressed shards). The indices are zero-padded to 5 digits (e.g., part-00000.jsonl, part-00001.jsonl).

When reading, flyte-sdk automatically discovers these files using _get_sorted_shards() and processes them in alphabetical order of filename. Because the indices are zero-padded, alphabetical order matches numeric shard order, so any ordering your data has across shards is preserved during iteration.
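The naming convention and the sorted discovery step can be sketched with the standard library alone. This is an illustration for a local directory, not the library's code: shard_name, sorted_shards, and SHARD_RE are hypothetical names, and the real _get_sorted_shards() may filter or sort differently.

```python
import re
from pathlib import Path

# Hypothetical pattern for the documented convention:
# part-NNNNN.jsonl or part-NNNNN.jsonl.zst, with a 5-digit index.
SHARD_RE = re.compile(r"^part-(\d{5})\.jsonl(\.zst)?$")


def shard_name(index: int, extension: str = ".jsonl") -> str:
    """Build a shard filename with a zero-padded, 5-digit index."""
    return f"part-{index:05d}{extension}"


def sorted_shards(directory: Path) -> list:
    """Return shard files in filename order, ignoring unrelated files."""
    shards = [p for p in directory.iterdir() if SHARD_RE.match(p.name)]
    # Zero-padding makes plain string sorting agree with numeric order.
    return sorted(shards, key=lambda p: p.name)
```

Without the padding, a name like part-10 would sort before part-2, which is why the fixed width matters for ordered iteration.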

Writing Sharded Data

To create a sharded dataset, use the writer() (async) or writer_sync() (sync) context managers. These writers automatically handle "rotation"—closing the current file and opening a new one—when specific thresholds are met.

You can control rotation using two main parameters:

  • max_records_per_shard: The maximum number of JSON objects per file.
  • max_bytes_per_shard: The maximum uncompressed size of a file (defaults to 256 MB).

from flyteplugins.jsonl import JsonlDir
from flytekit import task

@task
async def create_shards() -> JsonlDir:
    # Create a reference to a remote directory
    d = JsonlDir.new_remote("s3://my-bucket/sharded-data")

    # Use the writer to automatically rotate every 1000 records
    async with d.writer(max_records_per_shard=1000) as w:
        for i in range(5000):
            await w.write({"id": i, "data": "..."})

    return d

Internally, the JsonlDirWriter (and its sync counterpart JsonlDirWriterSync) tracks the current shard index, record count, and byte count. When _should_rotate() returns True, the writer flushes the current JsonlFile and increments the index for the next one.
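The bookkeeping described above can be illustrated with a small local writer. This is a minimal sketch under stated assumptions, not the JsonlDirWriter implementation: the class name RotatingJsonlWriter is hypothetical, it writes plain local files, it treats 256 MB as 256 * 1024 * 1024 bytes, and it checks the thresholds before each write rather than after.

```python
import json
from pathlib import Path
from typing import Optional


class RotatingJsonlWriter:
    """Illustrative local writer; not the flyte-sdk JsonlDirWriter."""

    def __init__(
        self,
        directory: Path,
        max_records_per_shard: Optional[int] = None,
        max_bytes_per_shard: int = 256 * 1024 * 1024,  # assumption: 256 MiB
    ) -> None:
        self.directory = directory
        self.max_records = max_records_per_shard
        self.max_bytes = max_bytes_per_shard
        self.index = 0         # current shard index
        self.record_count = 0  # records written to the current shard
        self.byte_count = 0    # uncompressed bytes written to the current shard
        self._fh = None

    def _should_rotate(self) -> bool:
        """True once either configured threshold has been reached."""
        if self.max_records is not None and self.record_count >= self.max_records:
            return True
        return self.byte_count >= self.max_bytes

    def write(self, record: dict) -> None:
        # Rotate lazily, so a trailing empty shard is never created.
        if self._fh is not None and self._should_rotate():
            self.close()
            self.index += 1
            self.record_count = 0
            self.byte_count = 0
        if self._fh is None:
            path = self.directory / f"part-{self.index:05d}.jsonl"
            self._fh = open(path, "wb")
        line = json.dumps(record).encode("utf-8") + b"\n"
        self._fh.write(line)
        self.record_count += 1
        self.byte_count += len(line)

    def close(self) -> None:
        if self._fh is not None:
            self._fh.close()
            self._fh = None

    def __enter__(self) -> "RotatingJsonlWriter":
        return self

    def __exit__(self, *exc_info) -> None:
        self.close()
```

With max_records_per_shard=1000, writing 5000 records through this sketch produces five files, part-00000.jsonl through part-00004.jsonl, each holding 1000 lines.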

Append Safety

JsonlDir.writer() is designed to be safe for appending to existing directories. Before writing the first record, it calls _next_index(), which scans the directory for existing part-NNNNN files and starts numbering from the highest index found plus one. This prevents accidental overwrites when multiple tasks write to the same location.
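The index scan can be sketched as follows. The function name next_index and the pattern INDEX_RE are hypothetical, and the sketch works on a local directory; the library's _next_index() may handle remote listings and edge cases differently.

```python
import re
from pathlib import Path

# Hypothetical pattern: capture the 5-digit index of any part-NNNNN.* file.
INDEX_RE = re.compile(r"^part-(\d{5})\.")


def next_index(directory: Path) -> int:
    """Return the highest existing shard index plus one, or 0 if none exist."""
    indices = [
        int(match.group(1))
        for path in directory.iterdir()
        if (match := INDEX_RE.match(path.name))
    ]
    return max(indices) + 1 if indices else 0
```

Note that the result depends on the highest index, not on the number of files, so gaps in the sequence are left unfilled. In this sketch, two writers that scan at the same moment would compute the same index; the scan alone does not coordinate them.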

Reading Sharded Data

JsonlDir abstracts the underlying files so you can iterate over the entire dataset as if it were a single stream.

Transparent Iteration

The iter_records() method yields individual records from all shards in order.

@task
async def process_data(d: JsonlDir):
    async for record in d.iter_records():
        # This iterates through part-00000.jsonl, then part-00001.jsonl, etc.
        print(record["id"])

Performance with Prefetching

To minimize I/O wait times, iter_records() implements a prefetching strategy. While you are processing records from the current shard, flyte-sdk kicks off a background task to start reading the next shard into an asyncio.Queue.

This overlaps network I/O with CPU processing. You can control the memory footprint of this prefetch buffer using the queue_size parameter (defaulting to 8192 records).
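The prefetching idea can be sketched with a bounded asyncio.Queue per shard: the reader for the next shard is started before the current shard is drained. This is an illustration under assumptions, not the library's iter_records(): it reads local files, loads each shard's text in a worker thread, and the helper names _fill and _start are hypothetical.

```python
import asyncio
import json
from pathlib import Path
from typing import AsyncIterator, List

_DONE = object()  # sentinel marking the end of one shard


async def _fill(path: Path, queue: asyncio.Queue) -> None:
    """Read one shard and push its parsed records onto the queue."""
    try:
        # Simplification: the blocking read runs in a thread and loads the
        # whole shard; a real reader would stream it.
        text = await asyncio.to_thread(path.read_text, encoding="utf-8")
        for line in text.splitlines():
            if line.strip():
                await queue.put(json.loads(line))  # waits while the queue is full
    finally:
        await queue.put(_DONE)


def _start(path: Path, queue_size: int):
    queue = asyncio.Queue(maxsize=queue_size)
    return queue, asyncio.create_task(_fill(path, queue))


async def iter_records(shards: List[Path], queue_size: int = 8192) -> AsyncIterator[dict]:
    """Yield records shard by shard while the next shard is read in the background."""
    if not shards:
        return
    current = _start(shards[0], queue_size)
    upcoming = None
    try:
        for i in range(len(shards)):
            # Start the next shard's reader before draining the current one.
            upcoming = _start(shards[i + 1], queue_size) if i + 1 < len(shards) else None
            queue, task = current
            while (item := await queue.get()) is not _DONE:
                yield item
            await task  # re-raise any error from the background reader
            current = upcoming
    finally:
        # Cancel readers that are still running if iteration stops early.
        for pending in (current, upcoming):
            if pending is not None:
                pending[1].cancel()
```

The maxsize on each queue is what bounds memory: a background reader that gets ahead of the consumer pauses on put() until space frees up, which is the role the queue_size parameter plays in the description above.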

Batch Processing

For higher throughput, especially when working with data science libraries, JsonlDir supports batch-based iteration.

Record Batches

iter_batches() yields lists of records, which is often more efficient than processing one record at a time.

async for batch in d.iter_batches(batch_size=500):
    # batch is a list[dict]
    process_batch(batch)  # process_batch is your own function
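Batching over a record stream amounts to grouping items into fixed-size lists. The helper below is a hypothetical, library-independent sketch; in particular, yielding a final batch that is shorter than batch_size is an assumption about how iter_batches() treats leftover records.

```python
from typing import AsyncIterator, List


async def batched(records: AsyncIterator[dict], batch_size: int) -> AsyncIterator[List[dict]]:
    """Group an async stream of records into lists of at most batch_size."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    batch: List[dict] = []
    async for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch
            batch = []  # start a fresh list so yielded batches are not mutated
    if batch:
        yield batch  # assumption: leftover records form a final short batch
```

For example, 1200 records with batch_size=500 come out of this sketch as batches of 500, 500, and 200.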

Arrow Integration

If you have pyarrow installed, iter_arrow_batches() is typically the fastest option. It yields pyarrow.RecordBatch objects and delegates parsing to the underlying JsonlFile.iter_arrow_batches() implementation, which bypasses much of the per-record Python overhead.

import pyarrow as pa

async for arrow_batch in d.iter_arrow_batches(batch_size=65536):
    # arrow_batch is a pyarrow.RecordBatch
    table = pa.Table.from_batches([arrow_batch])

Error Handling and Compression

  • Compression: You can write compressed shards by passing shard_extension=".jsonl.zst" to the writer. The reader automatically detects and handles mixed compression (some shards .jsonl, others .jsonl.zst) within the same directory.
  • Error Handling: Both readers and writers support an on_error parameter. You can set it to "raise" (default), "skip", or provide a custom callable to handle malformed JSON lines without stopping the entire iteration.
  • Remote Storage: Because JsonlDir inherits from the standard Flyte Dir class, it natively supports S3, GCS, and local filesystems via the configured Flyte storage provider.
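The three on_error modes can be illustrated with a small line parser. This is a sketch, not the library's reader: parse_lines is a hypothetical function, and the callable's signature (line number, raw line, exception) is an assumption made for illustration, since the signature expected by flyte-sdk is not stated here.

```python
import json
from typing import Callable, Iterable, Iterator, Union

# Assumed handler signature for this sketch: (line_number, raw_line, exception).
ErrorHandler = Callable[[int, str, Exception], None]


def parse_lines(
    lines: Iterable[str],
    on_error: Union[str, ErrorHandler] = "raise",
) -> Iterator[dict]:
    """Parse JSON lines, handling malformed lines according to on_error."""
    if on_error not in ("raise", "skip") and not callable(on_error):
        raise ValueError(f"unsupported on_error: {on_error!r}")
    for number, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # ignore blank lines
        try:
            record = json.loads(line)
        except json.JSONDecodeError as exc:
            if on_error == "raise":
                raise
            if callable(on_error):
                on_error(number, line, exc)  # report, then keep going
            continue  # "skip" and callable modes both move to the next line
        yield record
```

A custom handler is useful when malformed lines should be counted or logged rather than silently dropped, for example by passing a function that appends the line number to a list.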