
Dir

A generic directory class representing a directory of files in a specified format. It provides both async and sync interfaces for directory operations. All methods without the _sync suffix are async.

Attributes

| Attribute | Type | Description |
|---|---|---|
| `path` | `str` | The path to the directory (can be local or remote). |
| `name` | `Optional[str] = None` | Optional name for the directory (defaults to basename of path). |
| `format` | `str = ""` | The format of the files in the directory. |
| `hash` | `Optional[str] = None` | An optional hash value used for cache key computation when this Dir is used as an input to discoverable tasks. |

Constructor

Signature

def Dir(
    path: str,
    name: Optional[str] = None
) -> None

Parameters

| Name | Type | Description |
|---|---|---|
| `path` | `str` | The path to the directory (can be local or remote). |
| `name` | `Optional[str] = None` | Optional name for the directory (defaults to basename of path). |

Methods


pre_init()

@classmethod
def pre_init(
    data: Any
)

Internal: Pydantic validator to set default name from path. Not intended for direct use.

Parameters

| Name | Type | Description |
|---|---|---|
| `data` | `Any` | The data dictionary being validated by Pydantic. |

is_empty()

@classmethod
def is_empty() -> bool

Returns True when this is a sentinel Dir produced by EmptyDir or Dir.empty(), meaning the task did not actually produce a directory. Use this to branch on whether the upstream task emitted real data without resorting to Optional[Dir], which the type engine cannot round-trip correctly through SerializableType.

Returns

| Type | Description |
|---|---|
| `bool` | True if the directory is an empty sentinel, False otherwise. |

empty()

@classmethod
def empty() -> [Dir](dir.md?sid=flyte_io__dir_dir)

Return a sentinel Dir representing "no directory was produced". Use it as the return value when a task may or may not produce an output directory; the caller can check Dir.is_empty to detect the sentinel. The sentinel round-trips cleanly through Flyte serialization, unlike Optional[Dir].

Returns

| Type | Description |
|---|---|
| [Dir](dir.md?sid=flyte_io__dir_dir) | A sentinel Dir instance indicating that no directory was produced. |
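
The sentinel pattern behind empty() and is_empty can be illustrated without the library. The following is a minimal sketch using a hypothetical stand-in class, `SketchDir`; it is not the Dir implementation. Marking the sentinel with an empty path and modeling `is_empty` as a property are both assumptions made only for this example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchDir:
    """Hypothetical stand-in for Dir; it only models the sentinel idea."""

    path: str

    @classmethod
    def empty(cls) -> "SketchDir":
        # Assumption: the sentinel is marked by an empty path. The real
        # library may mark it differently.
        return cls(path="")

    @property
    def is_empty(self) -> bool:
        return self.path == ""


def maybe_produce(produce: bool) -> SketchDir:
    # Always returns a SketchDir, never None, so the return type stays simple.
    return SketchDir(path="/tmp/output") if produce else SketchDir.empty()


def describe(d: SketchDir) -> str:
    # The caller branches on the sentinel instead of checking for None.
    return "no directory produced" if d.is_empty else f"directory at {d.path}"
```

The point of the design is that the declared return type is always the directory type itself, so the caller never has to handle an optional value.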

lazy_uploader()

@classmethod
def lazy_uploader() -> Callable[[], Coroutine[Any, Any, tuple[str | None, str]]] | None

Retrieves the callable responsible for asynchronously uploading the local directory to remote storage when in remote mode.

Returns

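| Type | Description |
|---|---|
| `Callable[[], Coroutine[Any, Any, tuple[str \| None, str]]] \| None` | The callable that asynchronously uploads the local directory to remote storage, or None if no uploader is set. |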

lazy_uploader()

@classmethod
def lazy_uploader(
    lazy_uploader: Callable[[], Coroutine[Any, Any, tuple[str | None, str]]] | None
)

Sets the callable responsible for asynchronously uploading the local directory to remote storage when in remote mode.

Parameters

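| Name | Type | Description |
|---|---|---|
| `lazy_uploader` | `Callable[[], Coroutine[Any, Any, tuple[str \| None, str]]] \| None` | The callable that asynchronously uploads the local directory to remote storage, or None. |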

schema_match()

@classmethod
def schema_match(
    incoming: dict
)

Internal: Check if incoming schema matches Dir schema. Not intended for direct use.

Parameters

| Name | Type | Description |
|---|---|---|
| `incoming` | `dict` | The incoming schema dictionary to compare against the Dir schema. |

walk()

@classmethod
def walk(
    recursive: bool = True,
    max_depth: Optional[int] = None
) -> AsyncIterator[[File](../file/file.md?sid=flyte_io__file_file)[T]]

Asynchronously walk through the directory and yield File objects. Use this to iterate through all files in a directory. Each yielded File can be read directly without downloading.

Parameters

| Name | Type | Description |
|---|---|---|
| `recursive` | `bool = True` | If True, recursively walk subdirectories. If False, only list files in the top-level directory. |
| `max_depth` | `Optional[int] = None` | Maximum depth for recursive walking. If None, walk through all subdirectories. |

Returns

| Type | Description |
|---|---|
| AsyncIterator[[File](../file/file.md?sid=flyte_io__file_file)[T]] | An asynchronous iterator that yields File objects for each file found in the directory. |
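
Because walk() returns an AsyncIterator, it has to be consumed with `async for` rather than a plain `for` loop. The sketch below shows that consumption pattern with a hypothetical stand-in, `sketch_walk`, which lists a local directory and yields `pathlib.Path` objects instead of File objects. It does not call the library and makes no claim about how the real walk() is implemented.

```python
import asyncio
import tempfile
from pathlib import Path
from typing import AsyncIterator


async def sketch_walk(root: Path, recursive: bool = True) -> AsyncIterator[Path]:
    """Hypothetical stand-in for Dir.walk(); yields local paths, not File objects."""
    pattern = "**/*" if recursive else "*"
    for p in sorted(root.glob(pattern)):
        if p.is_file():
            # Yield control to the event loop between items, as a remote
            # listing would while waiting on I/O.
            await asyncio.sleep(0)
            yield p


async def collect_names(root: Path, recursive: bool = True) -> list[str]:
    # An async iterator is consumed with `async for`, not a plain `for`.
    return [p.name async for p in sketch_walk(root, recursive)]


def demo() -> tuple[list[str], list[str]]:
    # Build a small tree: a.txt at the top level and sub/b.txt one level down.
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        (root / "a.txt").write_text("a")
        (root / "sub").mkdir()
        (root / "sub" / "b.txt").write_text("b")
        everything = asyncio.run(collect_names(root, recursive=True))
        top_only = asyncio.run(collect_names(root, recursive=False))
    return everything, top_only
```

With the real class, the same shape would presumably apply inside an async task: iterate with `async for` over the result of walk() on a Dir instance.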

walk_sync()

@classmethod
def walk_sync(
    recursive: bool = True,
    file_pattern: str = "*",
    max_depth: Optional[int] = None
) -> Iterator[[File](../file/file.md?sid=flyte_io__file_file)[T]]

Synchronously walk through the directory and yield File objects. Use this in non-async tasks to iterate through all files in a directory.

Parameters

| Name | Type | Description |
|---|---|---|
| `recursive` | `bool = True` | If True, recursively walk subdirectories. If False, only list files in the top-level directory. |
| `file_pattern` | `str = "*"` | Glob pattern to filter files (e.g., `"*.txt"`, `"*.csv"`). Default is `"*"` (all files). |
| `max_depth` | `Optional[int] = None` | Maximum depth for recursive walking. If None, walk through all subdirectories. |

Returns

| Type | Description |
|---|---|
| Iterator[[File](../file/file.md?sid=flyte_io__file_file)[T]] | An iterator that yields File objects for each file found in the directory. |
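
To make the interaction of recursive, file_pattern, and max_depth concrete, here is a local-filesystem sketch built on `os.walk` and `fnmatch`. The function `sketch_walk_sync` is hypothetical and is not the library's implementation. Two details are assumptions, since the reference does not specify them: files directly under the root are treated as depth 0, and the glob is matched against the file name only.

```python
import fnmatch
import os
import tempfile
from pathlib import Path
from typing import Iterator, Optional


def sketch_walk_sync(
    root: str,
    recursive: bool = True,
    file_pattern: str = "*",
    max_depth: Optional[int] = None,
) -> Iterator[str]:
    """Hypothetical local stand-in for Dir.walk_sync(); yields relative paths."""
    for current, dirnames, filenames in os.walk(root):
        dirnames.sort()  # deterministic traversal order
        rel = os.path.relpath(current, root)
        # Assumption: files directly under root are at depth 0.
        depth = 0 if rel == "." else rel.count(os.sep) + 1
        for name in sorted(filenames):
            # Assumption: the glob is matched against the file name only.
            if fnmatch.fnmatch(name, file_pattern):
                joined = os.path.normpath(os.path.join(rel, name))
                yield joined.replace(os.sep, "/")
        # Stop descending when not recursive or when the depth limit is reached.
        if not recursive or (max_depth is not None and depth >= max_depth):
            dirnames.clear()


def demo() -> dict:
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        (root / "a.txt").write_text("a")
        (root / "b.csv").write_text("b")
        (root / "sub" / "deep").mkdir(parents=True)
        (root / "sub" / "c.txt").write_text("c")
        (root / "sub" / "deep" / "d.txt").write_text("d")
        return {
            "all": list(sketch_walk_sync(tmp)),
            "txt": list(sketch_walk_sync(tmp, file_pattern="*.txt")),
            "top": list(sketch_walk_sync(tmp, recursive=False)),
            "depth1": list(sketch_walk_sync(tmp, max_depth=1)),
        }
```

Under these assumptions, `recursive=False` and `max_depth` both work by pruning the list of subdirectories to visit, while `file_pattern` only filters which files are yielded.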

list_files()

@classmethod
def list_files() -> List[[File](../file/file.md?sid=flyte_io__file_file)[T]]

Asynchronously get a list of all files in the directory (non-recursive). Use this when you need a list of all files in the top-level directory at once.

Returns

| Type | Description |
|---|---|
| List[[File](../file/file.md?sid=flyte_io__file_file)[T]] | A list of File objects for files in the top-level directory. |

list_files_sync()

@classmethod
def list_files_sync() -> List[[File](../file/file.md?sid=flyte_io__file_file)[T]]

Synchronously get a list of all files in the directory (non-recursive). Use this in non-async tasks when you need a list of all files in the top-level directory at once.

Returns

| Type | Description |
|---|---|
| List[[File](../file/file.md?sid=flyte_io__file_file)[T]] | A list of File objects for files in the top-level directory. |

download()

@classmethod
def download(
    local_path: Optional[Union[str, Path]] = None
) -> str

Asynchronously download the entire directory to a local path. Use this when you need to download all files in a directory to your local filesystem for processing.

Parameters

| Name | Type | Description |
|---|---|---|
| `local_path` | `Optional[Union[str, Path]] = None` | The local path to download the directory to. If None, a temporary directory will be used and a path will be generated. |

Returns

| Type | Description |
|---|---|
| `str` | The absolute path to the downloaded directory. |

download_sync()

@classmethod
def download_sync(
    local_path: Optional[Union[str, Path]] = None
) -> str

Synchronously download the entire directory to a local path. Use this in non-async tasks when you need to download all files in a directory to your local filesystem.

Parameters

| Name | Type | Description |
|---|---|---|
| `local_path` | `Optional[Union[str, Path]] = None` | The local path to download the directory to. If None, a temporary directory will be used and a path will be generated. |

Returns

| Type | Description |
|---|---|
| `str` | The absolute path to the downloaded directory. |

from_local()

@classmethod
def from_local(
    local_path: Union[str, Path],
    remote_destination: Optional[str] = None,
    dir_cache_key: Optional[str] = None,
    batch_size: Optional[int] = None
) -> [Dir](dir.md?sid=flyte_io__dir_dir)[T]

Asynchronously create a new Dir by uploading a local directory to remote storage. Use this in async tasks when you have a local directory that needs to be uploaded to remote storage.

Parameters

| Name | Type | Description |
|---|---|---|
| `local_path` | `Union[str, Path]` | Path to the local directory. |
| `remote_destination` | `Optional[str] = None` | Optional remote path to store the directory. If None, a path will be automatically generated. |
| `dir_cache_key` | `Optional[str] = None` | Optional precomputed hash value to use for cache key computation when this Dir is used as an input to discoverable tasks. If not specified, the cache key will be based on directory attributes. |
| `batch_size` | `Optional[int] = None` | Optional concurrency limit for uploading files. If not specified, the default value is determined by the FLYTE_IO_BATCH_SIZE environment variable (default: 32). |

Returns

| Type | Description |
|---|---|
| [Dir](dir.md?sid=flyte_io__dir_dir)[T] | A new Dir instance pointing to the uploaded directory. |
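
The batch_size parameter is described as a concurrency limit with an environment-variable fallback. The reference does not say how the library enforces the limit, so the following is only a sketch of one common way to bound concurrent async work, using `asyncio.Semaphore`. The helper names `resolve_batch_size` and `upload_all` are hypothetical, and the upload itself is replaced by a short sleep.

```python
import asyncio
import os
from typing import Optional


def resolve_batch_size(batch_size: Optional[int] = None) -> int:
    # An explicit argument wins; otherwise fall back to the environment
    # variable, then to the documented default of 32.
    if batch_size is not None:
        return batch_size
    return int(os.environ.get("FLYTE_IO_BATCH_SIZE", "32"))


async def upload_all(names: list[str], batch_size: Optional[int] = None) -> int:
    """Pretend to upload every name; return the peak number of concurrent uploads."""
    limit = asyncio.Semaphore(resolve_batch_size(batch_size))
    in_flight = 0
    peak = 0

    async def upload_one(name: str) -> None:
        nonlocal in_flight, peak
        async with limit:  # at most `batch_size` coroutines pass this point at once
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # placeholder for the real network call
            in_flight -= 1

    await asyncio.gather(*(upload_one(n) for n in names))
    return peak
```

A smaller limit reduces pressure on the network and the storage backend at the cost of throughput; a larger one does the opposite.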

from_local_sync()

@classmethod
def from_local_sync(
    local_path: Union[str, Path],
    remote_destination: Optional[str] = None,
    dir_cache_key: Optional[str] = None
) -> [Dir](dir.md?sid=flyte_io__dir_dir)[T]

Synchronously create a new Dir by uploading a local directory to remote storage. Use this in non-async tasks when you have a local directory that needs to be uploaded to remote storage.

Parameters

| Name | Type | Description |
|---|---|---|
| `local_path` | `Union[str, Path]` | Path to the local directory. |
| `remote_destination` | `Optional[str] = None` | Optional remote path to store the directory. If None, a path will be automatically generated. |
| `dir_cache_key` | `Optional[str] = None` | Optional precomputed hash value to use for cache key computation when this Dir is used as an input to discoverable tasks. If not specified, the cache key will be based on directory attributes. |

Returns

| Type | Description |
|---|---|
| [Dir](dir.md?sid=flyte_io__dir_dir)[T] | A new Dir instance pointing to the uploaded directory. |
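
The dir_cache_key parameter accepts a precomputed hash, but the reference does not prescribe how to compute it. One possible approach, sketched below, is a content hash over the relative paths and bytes of every file in the directory. The helper `hash_directory` is hypothetical, and the choice of SHA-256 and a hex-encoded result is an assumption, not a documented requirement.

```python
import hashlib
import tempfile
from pathlib import Path


def hash_directory(local_path: str) -> str:
    """Return a SHA-256 hex digest over relative file paths and file contents."""
    root = Path(local_path)
    digest = hashlib.sha256()
    # Sort so the digest does not depend on filesystem listing order.
    for p in sorted(root.rglob("*")):
        if p.is_file():
            digest.update(p.relative_to(root).as_posix().encode("utf-8"))
            digest.update(b"\0")  # separator between path and content
            digest.update(p.read_bytes())
            digest.update(b"\0")  # separator between files
    return digest.hexdigest()
```

The resulting string could then be passed as dir_cache_key to from_local() or from_local_sync(), so that two uploads of identical content would share a cache key. Reading every file into memory is fine for small directories; very large files would call for chunked reads.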

new_remote()

@classmethod
def new_remote(
    dir_name: Optional[str] = None,
    hash: Optional[str] = None
) -> [Dir](dir.md?sid=flyte_io__dir_dir)[T]

Create a new Dir reference for a remote directory that will be written to. Use this when you want to create a new directory and write files into it directly without creating a local directory first.

Parameters

| Name | Type | Description |
|---|---|---|
| `dir_name` | `Optional[str] = None` | Optional name for the remote directory. If not set, a generated name will be used. |
| `hash` | `Optional[str] = None` | Optional precomputed hash value to use for cache key computation when this Dir is used as an input to discoverable tasks. |

Returns

| Type | Description |
|---|---|
| [Dir](dir.md?sid=flyte_io__dir_dir)[T] | A new Dir instance with a generated remote path. |

from_existing_remote()

@classmethod
def from_existing_remote(
    remote_path: str,
    dir_cache_key: Optional[str] = None
) -> [Dir](dir.md?sid=flyte_io__dir_dir)[T]

Create a Dir reference from an existing remote directory. Use this when you want to reference a directory that already exists in remote storage without uploading it.

Parameters

| Name | Type | Description |
|---|---|---|
| `remote_path` | `str` | The remote path to the existing directory. |
| `dir_cache_key` | `Optional[str] = None` | Optional hash value to use for cache key computation. If not specified, the cache key will be computed based on the directory's attributes. |

Returns

| Type | Description |
|---|---|
| [Dir](dir.md?sid=flyte_io__dir_dir)[T] | A new Dir instance pointing to the existing remote directory. |

exists()

@classmethod
def exists() -> bool

Asynchronously check if the directory exists.

Returns

| Type | Description |
|---|---|
| `bool` | True if the directory exists, False otherwise. |

exists_sync()

@classmethod
def exists_sync() -> bool

Synchronously check if the directory exists. Use this in non-async tasks or when you need synchronous directory existence checking.

Returns

| Type | Description |
|---|---|
| `bool` | True if the directory exists, False otherwise. |

get_file()

@classmethod
def get_file(
    file_name: str
) -> Optional[[File](../file/file.md?sid=flyte_io__file_file)[T]]

Asynchronously get a specific file from the directory by name. Use this when you know the name of a specific file in the directory you want to access.

Parameters

| Name | Type | Description |
|---|---|---|
| `file_name` | `str` | The name of the file to get. |

Returns

| Type | Description |
|---|---|
| Optional[[File](../file/file.md?sid=flyte_io__file_file)[T]] | A File instance if the file exists, None otherwise. |

get_file_sync()

@classmethod
def get_file_sync(
    file_name: str
) -> Optional[[File](../file/file.md?sid=flyte_io__file_file)[T]]

Synchronously get a specific file from the directory by name. Use this in non-async tasks when you know the name of a specific file in the directory you want to access.

Parameters

| Name | Type | Description |
|---|---|---|
| `file_name` | `str` | The name of the file to get. |

Returns

| Type | Description |
|---|---|
| Optional[[File](../file/file.md?sid=flyte_io__file_file)[T]] | A File instance if the file exists, None otherwise. |