A generic directory class representing a directory with files of a specified format. Provides both async and sync interfaces for directory operations. All methods without _sync suffix are async.
Attributes
| Attribute | Type | Description |
|---|---|---|
| path | str | The path to the directory (can be local or remote). |
| name | Optional[str] = None | Optional name for the directory (defaults to basename of path). |
| format | str = "" | The format of the files in the directory. |
| hash | Optional[str] = None | An optional hash value used for cache key computation when this Dir is used as an input to discoverable tasks. |
Constructor
Signature
def Dir(
path: str,
name: Optional[str] = None
) -> None
Parameters
| Name | Type | Description |
|---|---|---|
| path | str | The path to the directory (can be local or remote) |
| name | Optional[str] = None | Optional name for the directory (defaults to basename of path) |
Methods
pre_init()
@classmethod
def pre_init(
data: Any
)
Internal: Pydantic validator to set default name from path. Not intended for direct use.
Parameters
| Name | Type | Description |
|---|---|---|
| data | Any | The data dictionary being validated by Pydantic. |
is_empty()
@classmethod
def is_empty() -> bool
True when this is a sentinel Dir produced by EmptyDir or Dir.empty(), i.e. the task did not actually produce a directory. Use this to branch on whether the upstream task emitted real data without resorting to Optional[Dir], which the type engine cannot round-trip correctly through SerializableType.
Returns
| Type | Description |
|---|---|
| bool | True if the directory is an empty sentinel, False otherwise. |
empty()
@classmethod
def empty() -> [Dir](dir.md?sid=flyte_io__dir_dir)
Return a sentinel Dir representing 'no directory was produced'. Use it as the return value when a task may or may not produce an output directory; the caller can check Dir.is_empty to detect the sentinel. Unlike Optional[Dir], the sentinel round-trips cleanly through Flyte serialization.
Returns
| Type | Description |
|---|---|
| [Dir](dir.md?sid=flyte_io__dir_dir) | A sentinel Dir instance indicating that no directory was produced. |
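The sentinel check described above can be wrapped in a small helper. This is only a sketch: `is_sentinel` and `describe_output` are hypothetical names, `d` is assumed to be a Dir (or anything with the same `path` and `is_empty` members), and because this page shows `is_empty` both as a method signature and as an attribute reference, the helper accepts either shape.

```python
from typing import Any, Optional


def is_sentinel(d: Any) -> bool:
    """Return True if `d` reports itself as the 'no directory produced' sentinel."""
    flag = d.is_empty
    # Assumption: is_empty may be exposed as an attribute/property or as a
    # zero-argument method, so both shapes are handled here.
    return bool(flag() if callable(flag) else flag)


def describe_output(d: Any) -> Optional[str]:
    """Return the directory path, or None when the upstream task produced nothing."""
    if is_sentinel(d):
        return None
    return d.path
```

A downstream task could call `describe_output(upstream_dir)` and skip its work when the result is None.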
lazy_uploader()
@classmethod
def lazy_uploader() -> Callable[[], Coroutine[Any, Any, tuple[str | None, str]]] | None
Retrieves the callable responsible for asynchronously uploading the local directory to remote storage when in remote mode.
Returns
| Type | Description |
|---|---|
| Callable[[], Coroutine[Any, Any, tuple[str \| None, str]]] \| None | The callable that asynchronously uploads the local directory to remote storage, or None. |
lazy_uploader()
@classmethod
def lazy_uploader(
lazy_uploader: Callable[[], Coroutine[Any, Any, tuple[str | None, str]]] | None
)
Sets the callable responsible for asynchronously uploading the local directory to remote storage when in remote mode.
Parameters
| Name | Type | Description |
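|---|---|---|
| lazy_uploader | Callable[[], Coroutine[Any, Any, tuple[str \| None, str]]] \| None | The callable that asynchronously uploads the local directory to remote storage, or None. |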
schema_match()
@classmethod
def schema_match(
incoming: dict
)
Internal: Check if incoming schema matches Dir schema. Not intended for direct use.
Parameters
| Name | Type | Description |
|---|---|---|
| incoming | dict | The incoming schema dictionary to compare against the Dir schema. |
walk()
@classmethod
def walk(
recursive: bool = True,
max_depth: Optional[int] = None
) -> AsyncIterator[[File](../file/file.md?sid=flyte_io__file_file)[T]]
Asynchronously walk through the directory and yield File objects. Use this to iterate through all files in a directory. Each yielded File can be read directly without downloading.
Parameters
| Name | Type | Description |
|---|---|---|
| recursive | bool = True | If True, recursively walk subdirectories. If False, only list files in the top-level directory. |
| max_depth | Optional[int] = None | Maximum depth for recursive walking. If None, walk through all subdirectories. |
Returns
| Type | Description |
|---|---|
| AsyncIterator[[File](../file/file.md?sid=flyte_io__file_file)[T]] | An asynchronous iterator that yields File objects for each file found in the directory. |
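Because walk() returns an asynchronous iterator, it is consumed with `async for`. The following is a minimal sketch, assuming `d` is a Dir instance; `collect_files` is a hypothetical helper name, and only the `recursive` and `max_depth` parameters documented above are used.

```python
import asyncio
from typing import Any, List, Optional


async def collect_files(d: Any, max_depth: Optional[int] = None) -> List[Any]:
    """Gather every File yielded by d.walk() into a list."""
    files = []
    # walk() yields File objects one at a time; nothing is downloaded here.
    async for f in d.walk(recursive=True, max_depth=max_depth):
        files.append(f)
    return files


# Outside an event loop this could be driven with:
#   files = asyncio.run(collect_files(some_dir, max_depth=2))
```

Collecting into a list is convenient for small directories; for large ones, processing each File inside the loop avoids holding every reference in memory.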
walk_sync()
@classmethod
def walk_sync(
recursive: bool = True,
file_pattern: str = "*",
max_depth: Optional[int] = None
) -> Iterator[[File](../file/file.md?sid=flyte_io__file_file)[T]]
Synchronously walk through the directory and yield File objects. Use this in non-async tasks to iterate through all files in a directory.
Parameters
| Name | Type | Description |
|---|---|---|
| recursive | bool = True | If True, recursively walk subdirectories. If False, only list files in the top-level directory. |
| file_pattern | str = "*" | Glob pattern to filter files (e.g., "*.txt", "*.csv"). Default is "*" (all files). |
| max_depth | Optional[int] = None | Maximum depth for recursive walking. If None, walk through all subdirectories. |
Returns
| Type | Description |
|---|---|
| Iterator[[File](../file/file.md?sid=flyte_io__file_file)[T]] | An iterator that yields File objects for each file found in the directory. |
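In a non-async task, the `file_pattern` parameter can narrow the walk to one file type. A minimal sketch, assuming `d` is a Dir instance; `list_matching` is a hypothetical helper name.

```python
from typing import Any, List


def list_matching(d: Any, pattern: str = "*.csv") -> List[Any]:
    """Return the top-level files in `d` whose names match the glob `pattern`."""
    # recursive=False restricts the walk to the top-level directory.
    return list(d.walk_sync(recursive=False, file_pattern=pattern))
```

Note that only walk_sync() documents a `file_pattern` parameter; the async walk() signature shown on this page does not include one.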
list_files()
@classmethod
def list_files() -> List[[File](../file/file.md?sid=flyte_io__file_file)[T]]
Asynchronously get a list of all files in the directory (non-recursive). Use this when you need a list of all files in the top-level directory at once.
Returns
| Type | Description |
|---|---|
| List[[File](../file/file.md?sid=flyte_io__file_file)[T]] | A list of File objects for files in the top-level directory. |
list_files_sync()
@classmethod
def list_files_sync() -> List[[File](../file/file.md?sid=flyte_io__file_file)[T]]
Synchronously get a list of all files in the directory (non-recursive). Use this in non-async tasks when you need a list of all files in the top-level directory at once.
Returns
| Type | Description |
|---|---|
| List[[File](../file/file.md?sid=flyte_io__file_file)[T]] | A list of File objects for files in the top-level directory. |
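When the whole top-level listing is needed at once, list_files() can simply be awaited. A minimal sketch, assuming `d` is a Dir instance; `count_top_level` is a hypothetical helper name.

```python
import asyncio
from typing import Any


async def count_top_level(d: Any) -> int:
    """Return how many files sit directly in the top-level directory."""
    files = await d.list_files()  # non-recursive, returns a list
    return len(files)


# In a non-async task the equivalent would be: len(d.list_files_sync())
```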
download()
@classmethod
def download(
local_path: Optional[Union[str, Path]] = None
) -> str
Asynchronously download the entire directory to a local path. Use this when you need to download all files in a directory to your local filesystem for processing.
Parameters
| Name | Type | Description |
|---|---|---|
| local_path | Optional[Union[str, Path]] = None | The local path to download the directory to. If None, a temporary directory will be used and a path will be generated. |
Returns
| Type | Description |
|---|---|
| str | The absolute path to the downloaded directory. |
download_sync()
@classmethod
def download_sync(
local_path: Optional[Union[str, Path]] = None
) -> str
Synchronously download the entire directory to a local path. Use this in non-async tasks when you need to download all files in a directory to your local filesystem.
Parameters
| Name | Type | Description |
|---|---|---|
| local_path | Optional[Union[str, Path]] = None | The local path to download the directory to. If None, a temporary directory will be used and a path will be generated. |
Returns
| Type | Description |
|---|---|
| str | The absolute path to the downloaded directory. |
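Once a directory has been downloaded, the returned path can be handled with ordinary filesystem tools. The sketch below assumes `d` is a Dir instance and uses only the documented `local_path` parameter; `download_and_list` is a hypothetical helper name.

```python
import os
from typing import Any, List


def download_and_list(d: Any, target: str) -> List[str]:
    """Download `d` into `target` and return the relative paths of its files."""
    local_root = d.download_sync(local_path=target)
    found = []
    # Walk the local copy with the standard library.
    for root, _dirs, names in os.walk(local_root):
        for name in names:
            full = os.path.join(root, name)
            found.append(os.path.relpath(full, local_root))
    return sorted(found)
```

In an async task, `await d.download(local_path=target)` would take the place of the download_sync() call.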
from_local()
@classmethod
def from_local(
local_path: Union[str, Path],
remote_destination: Optional[str] = None,
dir_cache_key: Optional[str] = None,
batch_size: Optional[int] = None
) -> [Dir](dir.md?sid=flyte_io__dir_dir)[T]
Asynchronously create a new Dir by uploading a local directory to remote storage. Use this in async tasks when you have a local directory that needs to be uploaded to remote storage.
Parameters
| Name | Type | Description |
|---|---|---|
| local_path | Union[str, Path] | Path to the local directory. |
| remote_destination | Optional[str] = None | Optional remote path to store the directory. If None, a path will be automatically generated. |
| dir_cache_key | Optional[str] = None | Optional precomputed hash value to use for cache key computation when this Dir is used as an input to discoverable tasks. If not specified, the cache key will be based on directory attributes. |
| batch_size | Optional[int] = None | Optional concurrency limit for uploading files. If not specified, the default value is determined by the FLYTE_IO_BATCH_SIZE environment variable (default: 32). |
Returns
| Type | Description |
|---|---|
| [Dir](dir.md?sid=flyte_io__dir_dir)[T] | A new Dir instance pointing to the uploaded directory. |
from_local_sync()
@classmethod
def from_local_sync(
local_path: Union[str, Path],
remote_destination: Optional[str] = None,
dir_cache_key: Optional[str] = None
) -> [Dir](dir.md?sid=flyte_io__dir_dir)[T]
Synchronously create a new Dir by uploading a local directory to remote storage. Use this in non-async tasks when you have a local directory that needs to be uploaded to remote storage.
Parameters
| Name | Type | Description |
|---|---|---|
| local_path | Union[str, Path] | Path to the local directory. |
| remote_destination | Optional[str] = None | Optional remote path to store the directory. If None, a path will be automatically generated. |
| dir_cache_key | Optional[str] = None | Optional precomputed hash value to use for cache key computation when this Dir is used as an input to discoverable tasks. If not specified, the cache key will be based on directory attributes. |
Returns
| Type | Description |
|---|---|
| [Dir](dir.md?sid=flyte_io__dir_dir)[T] | A new Dir instance pointing to the uploaded directory. |
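An upload from a task might look like the sketch below. It assumes `dir_cls` is the Dir class itself, passed in as an argument so the example does not depend on a particular import path; `upload_dir` is a hypothetical helper, and the `batch_size` value of 8 is an arbitrary illustration.

```python
import asyncio
from pathlib import Path
from typing import Any, Optional, Union


async def upload_dir(
    dir_cls: Any,
    local_path: Union[str, Path],
    cache_key: Optional[str] = None,
) -> Any:
    """Upload a local directory and return the resulting Dir."""
    local_path = Path(local_path)
    # Fail early with a clear error instead of attempting to upload a non-directory.
    if not local_path.is_dir():
        raise NotADirectoryError(str(local_path))
    # remote_destination is omitted, so a remote path is generated automatically.
    return await dir_cls.from_local(
        local_path,
        dir_cache_key=cache_key,
        batch_size=8,  # arbitrary concurrency limit for this sketch
    )
```

from_local_sync() takes the same arguments except `batch_size`, which does not appear in its signature above.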
new_remote()
@classmethod
def new_remote(
dir_name: Optional[str] = None,
hash: Optional[str] = None
) -> [Dir](dir.md?sid=flyte_io__dir_dir)[T]
Create a new Dir reference for a remote directory that will be written to. Use this when you want to create a new directory and write files into it directly without creating a local directory first.
Parameters
| Name | Type | Description |
|---|---|---|
| dir_name | Optional[str] = None | Optional name for the remote directory. If not set, a generated name will be used. |
| hash | Optional[str] = None | Optional precomputed hash value to use for cache key computation when this Dir is used as an input to discoverable tasks. |
Returns
| Type | Description |
|---|---|
| [Dir](dir.md?sid=flyte_io__dir_dir)[T] | A new Dir instance with a generated remote path. |
from_existing_remote()
@classmethod
def from_existing_remote(
remote_path: str,
dir_cache_key: Optional[str] = None
) -> [Dir](dir.md?sid=flyte_io__dir_dir)[T]
Create a Dir reference from an existing remote directory. Use this when you want to reference a directory that already exists in remote storage without uploading it.
Parameters
| Name | Type | Description |
|---|---|---|
| remote_path | str | The remote path to the existing directory. |
| dir_cache_key | Optional[str] = None | Optional hash value to use for cache key computation. If not specified, the cache key will be computed based on the directory's attributes. |
Returns
| Type | Description |
|---|---|
| [Dir](dir.md?sid=flyte_io__dir_dir)[T] | A new Dir instance pointing to the existing remote directory. |
exists()
@classmethod
def exists() -> bool
Asynchronously check if the directory exists.
Returns
| Type | Description |
|---|---|
| bool | True if the directory exists, False otherwise. |
exists_sync()
@classmethod
def exists_sync() -> bool
Synchronously check if the directory exists. Use this in non-async tasks or when you need synchronous directory existence checking.
Returns
| Type | Description |
|---|---|
| bool | True if the directory exists, False otherwise. |
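A reference created with from_existing_remote() can be validated with an existence check before use. This is a sketch under assumptions: `dir_cls` stands for the Dir class (passed in rather than imported), `require_remote_dir` is a hypothetical helper, and from_existing_remote() is treated as a plain synchronous call because its description above does not mark it as asynchronous.

```python
from typing import Any


def require_remote_dir(dir_cls: Any, remote_path: str) -> Any:
    """Return a Dir for `remote_path`, raising if the directory is missing."""
    d = dir_cls.from_existing_remote(remote_path)
    # exists_sync() is the blocking variant; async code would await d.exists().
    if not d.exists_sync():
        raise FileNotFoundError(remote_path)
    return d
```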
get_file()
@classmethod
def get_file(
file_name: str
) -> Optional[[File](../file/file.md?sid=flyte_io__file_file)[T]]
Asynchronously get a specific file from the directory by name. Use this when you know the name of a specific file in the directory you want to access.
Parameters
| Name | Type | Description |
|---|---|---|
| file_name | str | The name of the file to get. |
Returns
| Type | Description |
|---|---|
| Optional[[File](../file/file.md?sid=flyte_io__file_file)[T]] | A File instance if the file exists, None otherwise. |
get_file_sync()
@classmethod
def get_file_sync(
file_name: str
) -> Optional[[File](../file/file.md?sid=flyte_io__file_file)[T]]
Synchronously get a specific file from the directory by name. Use this in non-async tasks when you know the name of a specific file in the directory you want to access.
Parameters
| Name | Type | Description |
|---|---|---|
| file_name | str | The name of the file to get. |
Returns
| Type | Description |
|---|---|
| Optional[[File](../file/file.md?sid=flyte_io__file_file)[T]] | A File instance if the file exists, None otherwise. |
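Since get_file() and get_file_sync() return None for a missing file, callers usually need an explicit check. A minimal sketch, assuming `d` is a Dir instance; `get_required_file` is a hypothetical helper that turns the None case into an exception.

```python
from typing import Any


def get_required_file(d: Any, file_name: str) -> Any:
    """Return the named File from `d`, raising if it is absent."""
    f = d.get_file_sync(file_name)
    if f is None:
        # Include the directory path so the error is easy to trace.
        raise FileNotFoundError(f"{file_name!r} not found in {d.path}")
    return f
```

Whether a missing file should raise or be skipped depends on the task; the None return value leaves that choice to the caller.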