Streaming IO with JSONL Files

When processing large datasets in flyte-sdk, loading an entire JSON file into memory can cause out-of-memory (OOM) errors. The JsonlFile class in the flyteplugins.jsonl package streams JSON Lines data instead, so you can process records one at a time or in batches without exhausting system resources.
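To make the memory argument concrete, here is a minimal sketch that uses only the Python standard library, not flyteplugins.jsonl. It shows why a line-delimited format can be streamed: every line is a complete JSON document, so a reader holds only one record at a time. The iter_jsonl helper is a hypothetical name used for illustration.

```python
import io
import json
from typing import Any, Iterator, TextIO


def iter_jsonl(stream: TextIO) -> Iterator[Any]:
    """Yield one parsed record per non-empty line of a JSON Lines stream."""
    for line in stream:
        line = line.strip()
        if line:  # tolerate blank lines between records
            yield json.loads(line)


# An in-memory stand-in for a large file on disk or in object storage.
sample = io.StringIO('{"id": 0}\n{"id": 1}\n\n{"id": 2}\n')
ids = [record["id"] for record in iter_jsonl(sample)]
print(ids)
```

Because iter_jsonl is a generator, peak memory depends on the largest single line rather than on the size of the whole file.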

Streaming Records Asynchronously

To write data to a JSONL file asynchronously, use the writer() context manager. It returns a JsonlWriter that buffers records in memory and flushes them to storage once a size threshold is reached. To read the file back, iterate over iter_records(), which yields one parsed record at a time.

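import flyte
from flyteplugins.jsonl import JsonlFile

# Tasks are registered on a TaskEnvironment; the name below is illustrative.
env = flyte.TaskEnvironment(name="jsonl-example")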

@env.task
async def write_data() -> JsonlFile:
    # Create a remote file reference
    f = JsonlFile.new_remote("data.jsonl")

    # Use the async writer context manager
    async with f.writer() as w:
        for i in range(1000):
            await w.write({"id": i, "message": f"record-{i}"})

    return f

@env.task
async def read_data(f: JsonlFile):
    # Stream records one by one
    async for record in f.iter_records():
        print(f"Processing record: {record['id']}")

The JsonlWriter uses orjson for high-performance serialization and maintains an internal _JsonlBuffer (default 1MB) to minimize the number of write operations to the underlying storage.
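The size-triggered flushing described above can be sketched in a few lines. This is not the real JsonlWriter or _JsonlBuffer: BufferedJsonlWriter and its sink callback are hypothetical names, and the standard-library json module stands in for orjson so the example has no dependencies.

```python
import json
from typing import Any, Callable, List


class BufferedJsonlWriter:
    """Hypothetical size-triggered buffer; not the library's JsonlWriter."""

    def __init__(self, sink: Callable[[bytes], None], flush_bytes: int = 1 << 20) -> None:
        self._sink = sink                # receives one bytes blob per flush
        self._flush_bytes = flush_bytes  # threshold that triggers a flush
        self._chunks: List[bytes] = []
        self._size = 0

    def write(self, record: Any) -> None:
        # Serialize compactly, one record per line.
        line = json.dumps(record, separators=(",", ":")).encode("utf-8") + b"\n"
        self._chunks.append(line)
        self._size += len(line)
        if self._size >= self._flush_bytes:
            self.flush()

    def flush(self) -> None:
        if self._chunks:
            self._sink(b"".join(self._chunks))
            self._chunks.clear()
            self._size = 0

    def __enter__(self) -> "BufferedJsonlWriter":
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Flush whatever is left; this is why the context manager matters.
        self.flush()


writes: List[bytes] = []
with BufferedJsonlWriter(writes.append, flush_bytes=64) as w:
    for i in range(10):
        w.write({"id": i})

print(len(writes), [len(blob) for blob in writes])
```

Ten records become two storage writes here: one when the buffer crosses the 64-byte threshold and one for the remainder when the context manager exits.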

Synchronous Streaming

If your task is not asynchronous, flyte-sdk provides synchronous equivalents for both reading and writing.

@env.task
def process_sync() -> JsonlFile:
    f = JsonlFile.new_remote("data_sync.jsonl")

    # Use writer_sync for synchronous buffered writes
    with f.writer_sync() as w:
        w.write_many([{"key": "val1"}, {"key": "val2"}])
        w.write({"key": "val3"})

    # Iterate records synchronously
    for record in f.iter_records_sync():
        print(record)

    return f

Transparent Zstd Compression

JsonlFile automatically handles compression based on the file extension. If the path ends in .jsonl.zst or .jsonl.zstd, flyte-sdk uses zstandard to compress data on the fly during writes and decompress it during reads.

@env.task
async def compressed_io():
    # The .zst extension triggers transparent compression
    f = JsonlFile.new_remote("large_data.jsonl.zst")

    async with f.writer(compression_level=5) as w:
        await w.write({"data": "compressed content"})

    # Reading works exactly the same way
    async for record in f.iter_records():
        assert record["data"] == "compressed content"

When writing compressed files, JsonlFile uses _ZstdJsonlWriter internally, which offloads the compression work to a thread pool to avoid blocking the event loop.
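The general pattern of moving CPU-bound compression off the event loop can be sketched as follows. This is an illustration of the technique, not the internals of _ZstdJsonlWriter: the function names are hypothetical, and the standard-library zlib module stands in for zstandard so the example runs without extra packages.

```python
import asyncio
import zlib


def compress_chunk(chunk: bytes, level: int = 3) -> bytes:
    """Blocking, CPU-bound work; zlib is a stand-in for zstandard here."""
    return zlib.compress(chunk, level)


async def compress_off_loop(chunk: bytes, level: int = 3) -> bytes:
    # Run the blocking call in the default thread pool so that other
    # coroutines keep making progress while the chunk is compressed.
    return await asyncio.to_thread(compress_chunk, chunk, level)


async def main() -> bytes:
    payload = b'{"data": "compressed content"}\n' * 1000
    return await compress_off_loop(payload)


compressed = asyncio.run(main())
print(len(compressed))
```

asyncio.to_thread requires Python 3.9 or later; on older versions, loop.run_in_executor serves the same purpose.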

Streaming to Arrow RecordBatches

For data science workflows, you can stream JSONL data directly into pyarrow.RecordBatch objects. This is highly efficient for converting JSON data into tabular formats for processing with libraries like Pandas or Polars.

@env.task
async def to_arrow(f: JsonlFile):
    import pyarrow as pa

    # Stream in batches of 4096 rows
    async for batch in f.iter_arrow_batches(batch_size=4096):
        # batch is a pyarrow.RecordBatch
        print(f"Received batch with {batch.num_rows} rows")

        # Example: Convert batch to a table
        table = pa.Table.from_batches([batch])
        # ... process table ...

Important: Arrow integration requires the pyarrow package. If it is missing, iter_arrow_batches raises a ModuleNotFoundError.
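To see how batch_size bounds memory, the following sketch groups a record stream into fixed-size batches using only the standard library. It is an illustration of the batching idea, not how iter_arrow_batches is implemented, and iter_batches is a hypothetical helper name.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List


def iter_batches(
    records: Iterable[Dict[str, Any]], batch_size: int
) -> Iterator[List[Dict[str, Any]]]:
    """Yield lists of at most batch_size records; the last one may be shorter."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# A generator stands in for a streamed JSONL file.
rows = ({"id": i} for i in range(10))
sizes = [len(batch) for batch in iter_batches(rows, batch_size=4)]
print(sizes)
```

Only one batch is materialized at a time, so memory grows with batch_size rather than with the file. If pyarrow is installed, each list could be converted with pyarrow.RecordBatch.from_pylist(batch).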

Handling Malformed Data

By default, iter_records raises an exception if it encounters a malformed JSON line. You can change this behavior using the on_error parameter.

Skipping Corrupt Lines

Set on_error="skip" to log a warning and continue processing when a line cannot be parsed.

async for record in f.iter_records(on_error="skip"):
    # Corrupt lines are logged as warnings and skipped
    process(record)

Custom Error Handling

Provide a callable to implement custom logic for handling errors, such as collecting failed lines for later inspection.

def my_handler(line_number: int, raw_line: bytes, exc: Exception):
    print(f"Failed at line {line_number}: {exc}")

async for record in f.iter_records(on_error=my_handler):
    process(record)
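A handler that collects failed lines for later inspection might look like the sketch below. FailedLineCollector follows the (line_number, raw_line, exc) signature shown above. The parse_lines function is a hypothetical stand-in for iter_records so the example runs without flyte-sdk, and its 1-based line numbering is an assumption.

```python
import json
from typing import Any, Callable, Iterable, Iterator, List, Tuple

ErrorHandler = Callable[[int, bytes, Exception], None]


class FailedLineCollector:
    """Callable handler that records every line that failed to parse."""

    def __init__(self) -> None:
        self.failures: List[Tuple[int, bytes, str]] = []

    def __call__(self, line_number: int, raw_line: bytes, exc: Exception) -> None:
        self.failures.append((line_number, raw_line, str(exc)))


def parse_lines(lines: Iterable[bytes], on_error: ErrorHandler) -> Iterator[Any]:
    """Stand-in for iter_records(on_error=...); numbering from 1 is assumed."""
    for line_number, raw_line in enumerate(lines, start=1):
        try:
            yield json.loads(raw_line)
        except json.JSONDecodeError as exc:
            on_error(line_number, raw_line, exc)


collector = FailedLineCollector()
lines = [b'{"id": 1}', b'{"id": ', b'{"id": 3}']
good = list(parse_lines(lines, on_error=collector))
print(good, collector.failures)
```

With the real API, the same collector instance would be passed as on_error=collector, and collector.failures could be inspected or written to a side file after the loop finishes.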

Performance Tuning

You can tune the streaming performance by adjusting the buffer size and compression settings:

  • flush_bytes: Controls how much data is buffered before writing to storage. The default is 1MB (1 << 20). Larger buffers can improve throughput for remote storage but increase memory usage.
  • compression_level: When using .zst files, this controls the Zstd compression level (default is 3). Higher levels result in smaller files but slower write speeds.

# High-throughput write with large buffer and high compression
async with f.writer(flush_bytes=10 * 1024 * 1024, compression_level=9) as w:
    await w.write_many(large_dataset)

Troubleshooting

  • Missing Data: Always use the writer() or writer_sync() context managers. If you don't, the final buffer may not be flushed to storage, and Zstd streams may not be properly closed.
  • Memory Spikes: If you see memory spikes while using iter_arrow_batches, try reducing the batch_size.
  • Extension Detection: Compression is only enabled for paths ending in .jsonl.zst or .jsonl.zstd. Any other extension, such as .jsonl.gz, is treated as uncompressed text, so reading a file that is actually compressed will likely fail with parsing errors.
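A small pre-flight check can catch extension mistakes before any data is written. The sketch below encodes only the suffix rule stated above. check_jsonl_path is a hypothetical helper, not part of flyteplugins.jsonl, and the case-sensitive match is an assumption.

```python
ZSTD_SUFFIXES = (".jsonl.zst", ".jsonl.zstd")


def check_jsonl_path(path: str) -> str:
    """Return "zstd" or "plain", or raise for an unexpected extension."""
    if path.endswith(ZSTD_SUFFIXES):
        return "zstd"
    if path.endswith(".jsonl"):
        return "plain"
    raise ValueError(
        f"{path!r} does not end in .jsonl, .jsonl.zst, or .jsonl.zstd; "
        "it would be read and written as uncompressed text"
    )


print(check_jsonl_path("large_data.jsonl.zst"))
print(check_jsonl_path("data.jsonl"))
```

Calling check_jsonl_path("data.jsonl.gz") raises a ValueError, which surfaces the problem at the call site instead of as a parsing error deep inside a read loop.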