Working with Files and Directories
When you need to handle unstructured data like CSVs, images, or log files in flyte-sdk, the File and Dir classes provide a unified interface for local and remote storage. These classes support both synchronous and asynchronous I/O, allowing you to stream large datasets directly from S3, GCS, or Azure Blob Storage without downloading them entirely to disk.
Working with Files
The File class represents a single unstructured file. In flyte-sdk, you can create file references for existing data, upload local files, or stream data directly to remote storage.
Creating and Uploading Files
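You can create a File object from a local path with File.from_local(). It uploads the file to the configured remote storage and returns a File that references the uploaded copy. Await it in async tasks, and use File.from_local_sync() in synchronous tasks.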
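```python
import flyte
from flyte.io import File

# Task environment used by the examples on this page (the name is illustrative)
env = flyte.TaskEnvironment(name="file_examples")

@env.task
async def upload_data() -> File:
    # Create a local file
    local_path = "/tmp/data.txt"
    with open(local_path, "w") as f:
        f.write("Hello Flyte!")
    # Upload to remote storage asynchronously
    return await File.from_local(local_path)

@env.task
def upload_data_sync() -> File:
    # Synchronous version. Each task runs in its own container,
    # so the local file must be created here as well.
    local_path = "/tmp/data.txt"
    with open(local_path, "w") as f:
        f.write("Hello Flyte!")
    return File.from_local_sync(local_path)
```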
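Conceptually, from_local() turns a path on the task's local disk into a reference to a copy in remote storage. The sketch below models that with a temporary directory standing in for the object store. `FileRef` and `upload_local` are hypothetical names, and keying the copy by a content hash is an assumption, not flyte-sdk's actual storage layout.

```python
import hashlib
import shutil
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class FileRef:
    """Hypothetical reference type: like File, it only carries a path string."""
    path: str


def upload_local(local_path: str, store_root: Path) -> FileRef:
    """Copy a local file into the stand-in store (assumed content-hash layout)."""
    digest = hashlib.sha256(Path(local_path).read_bytes()).hexdigest()[:12]
    dest = store_root / digest / Path(local_path).name
    dest.parent.mkdir(parents=True, exist_ok=True)
    shutil.copyfile(local_path, dest)
    return FileRef(path=dest.as_posix())


with tempfile.TemporaryDirectory() as store, tempfile.TemporaryDirectory() as work:
    src = Path(work) / "data.txt"
    src.write_text("Hello Flyte!")
    ref = upload_local(str(src), Path(store))
    # The returned reference points at the copy, not the original local file
    uploaded_text = Path(ref.path).read_text()
```

The caller only ever receives the reference; the data itself lives wherever the store put it, which is why a File can be passed between tasks cheaply.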
Streaming Remote Files
To avoid high memory usage with large files, use the open() method to stream data. The open() method is asynchronous by default, while open_sync() provides a standard synchronous file interface.
```python
@env.task
async def stream_large_file(f: File) -> int:
    total_bytes = 0
    # Use block_size to control streaming buffer size
    async with f.open("rb", block_size=1024 * 1024) as fh:
        while True:
            chunk = await fh.read(65536)
            if not chunk:
                break
            # Note: fh.read() returns a memoryview in async mode
            total_bytes += len(bytes(chunk))
    return total_bytes
```
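The loop reads a bounded chunk per iteration, so peak memory stays around one chunk plus the read buffer instead of the whole file. The stdlib-only sketch below shows the same pattern; `io.BytesIO` stands in for a remote object and `count_bytes_streaming` is a hypothetical helper, so it illustrates the access pattern rather than flyte-sdk's I/O path.

```python
import io
from typing import BinaryIO


def count_bytes_streaming(fh: BinaryIO, chunk_size: int = 65536) -> tuple[int, int]:
    """Count bytes by reading fixed-size chunks; also report the largest chunk seen."""
    total = 0
    largest_chunk = 0
    while True:
        chunk = fh.read(chunk_size)
        if not chunk:  # an empty read signals end of stream
            break
        total += len(chunk)
        largest_chunk = max(largest_chunk, len(chunk))
    return total, largest_chunk


# Stand-in for a large remote object: 1 MiB plus 10 bytes
data = io.BytesIO(b"x" * (1024 * 1024 + 10))
total, largest = count_bytes_streaming(data)
print(total, largest)  # 1048586 65536
```

No single read ever holds more than `chunk_size` bytes, regardless of how large the source is.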
Deterministic Remote Paths
When you need to ensure that a task produces the same remote path across retries (e.g., for manual caching or external system integration), use named_remote(). This generates a path based on the task context and the provided name.
```python
@env.task
async def create_deterministic_file(content: str) -> File:
    # Produces the same path for the same name within this task execution
    f = File.named_remote("results.csv")
    # Write in binary mode, so encode the string first
    async with f.open("wb") as fh:
        await fh.write(content.encode("utf-8"))
    return f
```
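Conceptually, a deterministic remote path is a pure function of stable inputs: if the same execution context and name always map to the same key, a retry overwrites the same object instead of leaving an orphaned copy. The sketch below uses a hypothetical scheme (prefix, a SHA-256 digest of execution ID and name, then the name); it is not how flyte-sdk actually builds the paths returned by named_remote().

```python
import hashlib


def deterministic_remote_path(prefix: str, execution_id: str, name: str) -> str:
    """Hypothetical: derive a stable object path from an execution ID and a file name."""
    # Hash the identifying inputs so the path is stable but compact
    digest = hashlib.sha256(f"{execution_id}/{name}".encode("utf-8")).hexdigest()[:16]
    return f"{prefix.rstrip('/')}/{digest}/{name}"


first_try = deterministic_remote_path("s3://my-bucket/data", "exec-123", "results.csv")
retry = deterministic_remote_path("s3://my-bucket/data", "exec-123", "results.csv")
other_run = deterministic_remote_path("s3://my-bucket/data", "exec-456", "results.csv")
print(first_try == retry, first_try == other_run)
```

Because nothing random (such as a UUID) enters the path, the retry lands on the same key, while a different execution gets its own key.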
Working with Directories
The Dir class handles collections of files. It supports recursive walking and bulk transfers between local and remote storage.
Walking and Listing Directories
You can iterate through files in a Dir without downloading the entire directory. The walk() method yields File objects for each entry found in the remote storage.
```python
from flyte.io import Dir

@env.task
async def process_directory(d: Dir) -> list[str]:
    file_names = []
    # Recursively walk the directory
    async for file in d.walk(recursive=True):
        file_names.append(file.name)
    return file_names

@env.task
async def list_top_level(d: Dir) -> int:
    # Non-recursive list of files
    files = await d.list_files()
    return len(files)
```
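The difference between walk(recursive=True) and list_files() mirrors recursive versus single-level listing on a local file system. As a local analogy only (pathlib on a temporary directory, not the Dir API), the sketch below builds a small tree and lists it both ways. It assumes, as the examples above suggest, that only files are returned and directories themselves are skipped.

```python
import tempfile
from pathlib import Path


def list_files(root: Path, recursive: bool) -> list[str]:
    """Return file paths relative to root, sorted; directories themselves are skipped."""
    candidates = root.rglob("*") if recursive else root.iterdir()
    return sorted(p.relative_to(root).as_posix() for p in candidates if p.is_file())


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "a.csv").write_text("1")
    (root / "logs").mkdir()
    (root / "logs" / "run.log").write_text("ok")
    top_level = list_files(root, recursive=False)  # ['a.csv']
    everything = list_files(root, recursive=True)  # ['a.csv', 'logs/run.log']
```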
Handling Optional Outputs with EmptyDir
In flyte-sdk, using Optional[Dir] can lead to serialization issues. Instead, use the EmptyDir sentinel to indicate that a task did not produce a directory. The receiver can check the is_empty property.
```python
from flyte.io import Dir, EmptyDir

@env.task
async def conditional_output(should_produce: bool) -> Dir:
    if should_produce:
        return await Dir.from_local("/tmp/my_dir")
    return EmptyDir()

@env.task
def check_output(d: Dir):
    if d.is_empty:
        print("No directory was produced")
    else:
        print(f"Received directory at {d.path}")
```
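The sentinel approach works because the declared return type stays a plain Dir, so the signature never needs Optional and the receiver branches on a property instead of a None check. Below is a minimal sketch of that pattern using hypothetical stand-in classes (`FakeDir`, `FakeEmptyDir`); it is not flyte-sdk's EmptyDir implementation.

```python
from dataclasses import dataclass


@dataclass
class FakeDir:
    """Hypothetical stand-in for a directory reference."""
    path: str

    @property
    def is_empty(self) -> bool:
        return False


class FakeEmptyDir(FakeDir):
    """Sentinel subclass: still a FakeDir, so the return type does not change."""

    def __init__(self) -> None:
        super().__init__(path="")

    @property
    def is_empty(self) -> bool:
        return True


def conditional_output(should_produce: bool) -> FakeDir:
    return FakeDir("s3://bucket/my_dir") if should_produce else FakeEmptyDir()


def describe(d: FakeDir) -> str:
    return "No directory was produced" if d.is_empty else f"Received directory at {d.path}"
```

Every value that crosses the task boundary is a real instance of the declared type, which is what keeps serialization uniform.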
Configuration and Performance
flyte-sdk uses obstore and fsspec for high-performance I/O. You can tune performance and reliability using environment variables:
| Environment Variable | Default | Description |
|---|---|---|
| FLYTE_IO_BATCH_SIZE | 32 | Concurrency limit for bulk uploads in Dir.from_local. |
| FLYTE_STORAGE_RETRIES | 3 | Number of retries for storage operations. |
| FLYTE_STORAGE_BACKOFF_SECONDS | 5 | Initial backoff duration for retries. |
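To make these settings concrete: a batch size caps how many transfers run at once, and the retry settings decide how often a failed operation is reattempted and how long to wait before the first retry. The sketch below models both with `asyncio.Semaphore` and a backoff that doubles on each attempt. The doubling policy, the simulated `upload_one` transfer, and reading FLYTE_STORAGE_RETRIES as "attempts after the first" are assumptions for illustration, not documented flyte-sdk behavior. The demo shrinks the backoff so it runs quickly.

```python
import asyncio
import os

# Read the settings as a process would; defaults mirror the table above
BATCH_SIZE = int(os.environ.get("FLYTE_IO_BATCH_SIZE", "32"))
RETRIES = int(os.environ.get("FLYTE_STORAGE_RETRIES", "3"))
BACKOFF_SECONDS = float(os.environ.get("FLYTE_STORAGE_BACKOFF_SECONDS", "5"))


async def with_retries(op, retries: int, backoff: float):
    """Call op() up to retries + 1 times, sleeping backoff, 2*backoff, ... between tries."""
    for attempt in range(retries + 1):
        try:
            return await op()
        except OSError:
            if attempt == retries:
                raise  # out of retries: surface the last error
            await asyncio.sleep(backoff * 2 ** attempt)


async def upload_all(paths: list[str], batch_size: int, retries: int, backoff: float):
    """Upload every path with at most batch_size transfers in flight; return (refs, peak)."""
    semaphore = asyncio.Semaphore(batch_size)
    in_flight = 0
    peak = 0
    failures_left = {"flaky.txt": 2}  # simulated transient failures per path

    async def upload_one(path: str) -> str:
        # Hypothetical stand-in for a real object-store PUT
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                await asyncio.sleep(0.001)
                if failures_left.get(path, 0) > 0:
                    failures_left[path] -= 1
                    raise OSError(f"transient error uploading {path}")
                return f"remote://{path}"
            finally:
                in_flight -= 1

    refs = await asyncio.gather(
        *(with_retries(lambda p=p: upload_one(p), retries, backoff) for p in paths)
    )
    return refs, peak


paths = [f"file_{i}.txt" for i in range(10)] + ["flaky.txt"]
# A real run would pass BATCH_SIZE and BACKOFF_SECONDS; the demo uses small values
refs, peak = asyncio.run(upload_all(paths, batch_size=4, retries=RETRIES, backoff=0.01))
print(peak, refs[-1])
```

Raising the batch size trades memory and connection count for throughput; raising retries or backoff trades latency on failure for resilience against transient storage errors.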
For cloud-specific authentication (S3, GCS, Azure), flyte-sdk respects standard environment variables like AWS_ACCESS_KEY_ID, GOOGLE_APPLICATION_CREDENTIALS, or AZURE_STORAGE_ACCOUNT_NAME.
Troubleshooting
Async Read Returns memoryview
When using async with file.open("rb") as fh:, calling await fh.read() returns a memoryview object rather than bytes. If your processing logic requires bytes, wrap the result:
```python
content = bytes(await fh.read())
```
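A memoryview is a zero-copy view over a buffer: it supports len() and slicing, but not bytes methods such as decode(). The stdlib-only snippet below (no flyte-sdk involved) shows why the bytes() wrap is needed for text processing and what it costs: the conversion copies the data, so the copy no longer tracks changes to the underlying buffer.

```python
buffer = bytearray(b"col_a,col_b\n1,2\n")
view = memoryview(buffer)  # a view over the buffer, not a copy

print(len(view))                # 16
print(hasattr(view, "decode"))  # False: memoryview has no decode()

data = bytes(view)              # explicit copy into an immutable bytes object
header = data.split(b"\n", 1)[0].decode("utf-8")
print(header)                   # col_a,col_b

buffer[0:5] = b"COL_A"          # mutating the source changes the view...
print(bytes(view[:5]), data[:5])  # b'COL_A' b'col_a'  ...but not the copy
```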
Local vs Remote Paths
File and Dir objects store a path string. If you instantiate one directly with a local path (e.g., File(path="/tmp/data.txt")), flyte-sdk treats it as a local reference, and a worker running on another machine cannot read that path. To make the data available to remote workers, upload it with from_local(), or create the object with new_remote() or named_remote() and write to it directly in remote storage.
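Since the distinction comes down to the path string, one way to catch a File that still points at local disk is to inspect its URL scheme. The helper below is hypothetical (not part of flyte-sdk), and the set of recognized object-store schemes is an assumption you would adapt to your storage backends.

```python
from urllib.parse import urlparse

# Assumed set of object-store schemes; adjust for your backends
REMOTE_SCHEMES = {"s3", "gs", "gcs", "abfs", "abfss", "az"}


def is_remote_path(path: str) -> bool:
    """Hypothetical check: True when the path uses a known object-store scheme."""
    return urlparse(path).scheme.lower() in REMOTE_SCHEMES


for p in ["/tmp/data.txt", "file:///tmp/data.txt", "s3://bucket/data.txt", "gs://bucket/data.txt"]:
    print(p, "->", "remote" if is_remote_path(p) else "local")
```

Plain paths and `file://` URLs are both classified as local, which matches the case described above where a manually constructed File never reaches remote storage.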