File Types: Handling Files with FlyteFile
The FlyteFile type provides a robust mechanism for handling files within Flyte workflows. Its primary purpose is to simplify data passing and persistence for file-based artifacts, abstracting away the complexities of the underlying storage system (e.g., S3, GCS, or the local filesystem). This ensures data lineage, type safety, and consistent execution across different environments.
Core Capabilities
FlyteFile offers several core capabilities that streamline file management in data-intensive workflows:
- Automatic Data Handling: FlyteFile automatically manages the lifecycle of files. When a FlyteFile is passed as an input to a task, Flyte downloads the file from its remote storage location to the task's local execution environment. Conversely, when a task produces a FlyteFile as an output, Flyte uploads the file from the task's local environment to a designated remote storage path.
- Type Safety and Serialization: By using FlyteFile in type hints, developers ensure that file inputs and outputs are correctly handled and validated. This integrates seamlessly with Flyte's type system, providing strong guarantees about data consistency.
- Local Access and Remote Storage Abstraction: Within a task, FlyteFile instances behave like local file paths, allowing standard file operations (read, write, copy). The underlying remote storage details are completely abstracted, enabling tasks to run identically whether files are stored locally, on S3, GCS, or other supported backends.
- Directory Handling with FlyteDirectory: For scenarios involving multiple files or structured data, the FlyteDirectory type extends FlyteFile's capabilities to entire directories. This is particularly useful for datasets, models, or complex outputs that consist of several related files.
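The "behave like local file paths" capability works because FlyteFile implements Python's os.PathLike protocol. The stdlib-only sketch below uses a hypothetical LocalHandle class (not flytekit code) to show why open() can accept such an object directly:

```python
import os
import tempfile

class LocalHandle:
    """Hypothetical stand-in for a FlyteFile-like object: implementing
    __fspath__ lets open(), os.path, and shutil treat it as a path."""

    def __init__(self, path: str):
        self.path = path

    def __fspath__(self) -> str:
        return self.path

# Create a real file to point the handle at.
tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False)
tmp.write("hello")
tmp.close()

# open() resolves the handle through os.fspath(), just as it does
# for any os.PathLike object.
handle = LocalHandle(tmp.name)
with open(handle, "r") as f:
    content = f.read()
```

Because standard library functions already understand os.PathLike, no special file API is needed inside a task body.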
Defining File Inputs and Outputs
To leverage FlyteFile and FlyteDirectory, define them as type hints for task inputs and outputs.
Task Inputs
When a task expects a file, declare the input parameter with the FlyteFile type. Flyte automatically stages the remote file to a local path accessible within the task.
from flytekit import task, workflow
from flytekit.types.file import FlyteFile
from typing import Annotated
@task
def process_data_file(input_file: FlyteFile) -> str:
"""
Reads a file and returns its content.
"""
with open(input_file, "r") as f:
content = f.read()
return f"Processed content: {content[:50]}..."
@workflow
def file_processing_workflow(data_source: Annotated[FlyteFile, "txt"]) -> str:
return process_data_file(input_file=data_source)
In this example, data_source is a FlyteFile that will be downloaded to the process_data_file task's local environment before execution. The Annotated type hint can optionally specify the expected file extension, which can aid in validation or content type inference.
Task Outputs
To produce a file as an output, declare the return type of the task as FlyteFile. Within the task, write the output to a temporary local path, and Flyte automatically uploads it to remote storage.
from flytekit import task, workflow
from flytekit.types.file import FlyteFile
from typing import Annotated
@task
def generate_report(input_text: str) -> Annotated[FlyteFile, "txt"]:
"""
Generates a report file from input text.
"""
output_path = "report.txt" # Flyte will manage this local path
with open(output_path, "w") as f:
f.write(f"Report generated from: {input_text}\n")
f.write("This is a sample report content.")
return FlyteFile(output_path) # Wrap the local path in FlyteFile
@workflow
def report_workflow(text_input: str) -> Annotated[FlyteFile, "txt"]:
return generate_report(input_text=text_input)
When generate_report completes, the report.txt file is uploaded to the workflow's output location.
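Rather than writing into the working directory, a common pattern is to place outputs under a fresh temporary directory and wrap the resulting path. The sketch below shows this in plain Python, without the flytekit decorators; the helper name write_report is illustrative, not a flytekit API:

```python
import os
import tempfile

def write_report(text: str) -> str:
    """Write a report into a fresh temporary directory and return its path.
    In a real task, the returned path would be wrapped: FlyteFile(path)."""
    out_dir = tempfile.mkdtemp(prefix="report_")
    out_path = os.path.join(out_dir, "report.txt")
    with open(out_path, "w") as f:
        f.write(f"Report generated from: {text}\n")
    return out_path

report_path = write_report("quarterly sales")
```

Using a dedicated directory per output avoids name collisions when the same task runs concurrently in local executions.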
Handling Directories
For outputs that are collections of files, use FlyteDirectory.
import os
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
from typing import Annotated
@task
def create_dataset_directory(prefix: str) -> Annotated[FlyteDirectory, "my_dataset"]:
"""
Creates a directory with multiple files.
"""
output_dir = "my_dataset_output"
os.makedirs(output_dir, exist_ok=True)
with open(os.path.join(output_dir, "file1.txt"), "w") as f:
f.write(f"{prefix} - Data for file 1.")
with open(os.path.join(output_dir, "file2.csv"), "w") as f:
f.write(f"{prefix},value1,value2\n{prefix},value3,value4")
return FlyteDirectory(output_dir)
@task
def process_dataset_directory(dataset_dir: FlyteDirectory) -> str:
"""
Reads files from a directory and returns a summary.
"""
summary = []
for root, _, files in os.walk(dataset_dir):
for file_name in files:
file_path = os.path.join(root, file_name)
with open(file_path, "r") as f:
summary.append(f"Read {file_name}: {f.readline().strip()}")
return "\n".join(summary)
@workflow
def dataset_workflow(input_prefix: str) -> str:
dataset = create_dataset_directory(prefix=input_prefix)
return process_dataset_directory(dataset_dir=dataset)
The create_dataset_directory task creates a local directory my_dataset_output and populates it. Returning FlyteDirectory(output_dir) instructs Flyte to upload the entire directory. The process_dataset_directory task then receives this directory, which is downloaded locally, allowing standard filesystem operations like os.walk.
Common Use Cases
- Passing Intermediate Data: FlyteFile is ideal for passing large datasets, model checkpoints, or processed results between sequential tasks in a workflow without explicitly managing storage paths.
- Integrating with External Tools: When a task needs to interact with external command-line tools or libraries that operate on local files, FlyteFile provides the necessary local file path abstraction.
- Data Versioning and Lineage: Because FlyteFile outputs are stored in a versioned manner by Flyte, they contribute to the overall data lineage, making it easy to track which workflow run produced which specific file artifact.
- Machine Learning Pipelines: Storing and retrieving trained models, evaluation metrics, or preprocessed features as FlyteFile or FlyteDirectory objects.
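The model-checkpoint use case reduces to serializing an object to a local path and handing that path to Flyte. A stdlib-only sketch using pickle (the "model" dict and file names are illustrative; a real task would return FlyteFile(checkpoint_path) and accept a FlyteFile input):

```python
import os
import pickle
import tempfile

def save_checkpoint(model: dict) -> str:
    """Serialize a model object to a local file; the returned path is
    what a producing task would wrap in FlyteFile for upload."""
    path = os.path.join(tempfile.mkdtemp(), "model.pkl")
    with open(path, "wb") as f:
        pickle.dump(model, f)
    return path

def load_checkpoint(path: str) -> dict:
    """Consuming-task side: a FlyteFile input behaves as this local path."""
    with open(path, "rb") as f:
        return pickle.load(f)

ckpt = save_checkpoint({"weights": [0.1, 0.2], "epoch": 3})
restored = load_checkpoint(ckpt)
```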
Best Practices and Considerations
- File Size and Performance: While FlyteFile handles files of various sizes, be mindful of extremely large files (e.g., terabytes). Transferring such files between remote storage and task execution environments can introduce significant latency. For very large datasets, consider using distributed file systems or specialized data processing frameworks that integrate directly with remote storage.
- Temporary Files: When creating files within a task that are not intended as outputs, use Python's tempfile module to ensure they are cleaned up automatically. Only wrap files intended for persistence and output in FlyteFile.
- Directory Structure: For FlyteDirectory outputs, maintain a consistent and logical internal directory structure. This makes it easier for downstream tasks to locate and process specific files within the directory.
- File Extension Annotations: Using Annotated[FlyteFile, "extension"] provides useful metadata and can help Flyte's UI or other tools infer the content type, improving user experience and potential optimizations.
- Security: Ensure that sensitive data stored in FlyteFile objects adheres to the appropriate access controls and encryption policies of the underlying remote storage. Flyte itself does not add encryption layers beyond what the configured storage backend provides.
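The temporary-file guidance above can be sketched as follows: intermediate files live inside a TemporaryDirectory context, which is deleted when the context exits, while the single persisted output is written elsewhere and would be wrapped in FlyteFile. All names here are illustrative, not flytekit APIs:

```python
import os
import tempfile

def summarize(records: list) -> str:
    """Use a scratch area for intermediate files; persist only the summary."""
    out_path = os.path.join(tempfile.mkdtemp(prefix="out_"), "summary.txt")
    with tempfile.TemporaryDirectory() as scratch:
        # Intermediate file: removed automatically when the context exits.
        staging = os.path.join(scratch, "staging.txt")
        with open(staging, "w") as f:
            f.write("\n".join(records))
        with open(staging) as f, open(out_path, "w") as out:
            out.write(f"{len(f.readlines())} records\n")
    # In a real task, this would be: return FlyteFile(out_path)
    return out_path

summary_path = summarize(["a", "b", "c"])
```

Keeping scratch data out of the returned path prevents accidental upload of intermediate artifacts alongside the intended output.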