
File Types: Handling Files with FlyteFile

The FlyteFile type provides a robust abstraction for handling files within Flyte workflows. It simplifies data passing and persistence for file-based artifacts by hiding the details of the underlying storage system (e.g., S3, GCS, or the local filesystem), while preserving data lineage and type safety and enabling identical execution across environments.

Core Capabilities

FlyteFile offers several core capabilities that streamline file management in data-intensive workflows:

  • Automatic Data Handling: FlyteFile automatically manages the lifecycle of files. When a FlyteFile is passed as an input to a task, Flyte downloads the file from its remote storage location to the task's local execution environment. Conversely, when a task produces a FlyteFile as an output, Flyte uploads the file from the task's local environment to a designated remote storage path. A minimal sketch of this round trip follows this list.
  • Type Safety and Serialization: By using FlyteFile in type hints, developers ensure that file inputs and outputs are correctly handled and validated. This integrates seamlessly with Flyte's type system, providing strong guarantees about data consistency.
  • Local Access and Remote Storage Abstraction: Within a task, FlyteFile instances behave like local file paths, allowing standard file operations (read, write, copy). The underlying remote storage details are completely abstracted, enabling tasks to run identically whether files are stored locally, on S3, GCS, or other supported backends.
  • Directory Handling with FlyteDirectory: For scenarios involving multiple files or structured data, the FlyteDirectory type applies the same mechanics to entire directories. This is particularly useful for datasets, models, or complex outputs that consist of several related files.
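The following minimal sketch shows both halves of the download/upload round trip in one task; the passthrough task and file name are illustrative, not part of the flytekit API:

import shutil

from flytekit import task
from flytekit.types.file import FlyteFile

@task
def passthrough(src: FlyteFile) -> FlyteFile:
    # Using the FlyteFile as a path (it implements os.PathLike) triggers
    # the download of the remote object to the local filesystem.
    local_copy = "copy_of_input"
    shutil.copy(src, local_copy)
    # Wrapping a local path in FlyteFile marks it for upload on task exit.
    return FlyteFile(local_copy)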

Defining File Inputs and Outputs

To leverage FlyteFile and FlyteDirectory, define them as type hints for task inputs and outputs.

Task Inputs

When a task expects a file, declare the input parameter with the FlyteFile type. Flyte automatically stages the remote file to a local path accessible within the task.

from flytekit import task, workflow
from flytekit.types.file import FlyteFile

@task
def process_data_file(input_file: FlyteFile) -> str:
    """
    Reads a file and returns a preview of its content.
    """
    # Opening the FlyteFile downloads it to a local path on first use.
    with open(input_file, "r") as f:
        content = f.read()
    return f"Processed content: {content[:50]}..."

@workflow
def file_processing_workflow(data_source: FlyteFile["txt"]) -> str:
    return process_data_file(input_file=data_source)

In this example, data_source is a FlyteFile that is downloaded to the process_data_file task's local environment before execution. Parameterizing the type as FlyteFile["txt"] optionally declares the expected file extension, which aids validation and content-type inference.
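As a usage sketch, the workflow can be invoked locally with a plain path string, which flytekit coerces into the declared FlyteFile type, or with an explicit FlyteFile referencing remote storage (the bucket URI below is hypothetical):

if __name__ == "__main__":
    # A local path string is coerced into a FlyteFile for local execution.
    print(file_processing_workflow(data_source="input.txt"))

    # An explicit FlyteFile can point at remote storage directly; any
    # scheme supported by the configured backend works the same way.
    remote = FlyteFile("s3://my-bucket/datasets/input.txt")
    print(file_processing_workflow(data_source=remote))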

Task Outputs

To produce a file as an output, declare the return type of the task as FlyteFile. Within the task, write the output to a temporary local path, and Flyte automatically uploads it to remote storage.

import os

import flytekit
from flytekit import task, workflow
from flytekit.types.file import FlyteFile

@task
def generate_report(input_text: str) -> FlyteFile["txt"]:
    """
    Generates a report file from input text.
    """
    # Write into the task's working directory; any local path works.
    working_dir = flytekit.current_context().working_directory
    output_path = os.path.join(working_dir, "report.txt")
    with open(output_path, "w") as f:
        f.write(f"Report generated from: {input_text}\n")
        f.write("This is a sample report content.")
    # Wrapping the local path in FlyteFile schedules the upload.
    return FlyteFile(output_path)

@workflow
def report_workflow(text_input: str) -> FlyteFile["txt"]:
    return generate_report(input_text=text_input)

When generate_report completes, the report.txt file is uploaded to the workflow's output location.
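File outputs and inputs compose directly. A short sketch that chains generate_report into the process_data_file task from the previous example; the FlyteFile promise flows between the tasks without any manual staging:

@workflow
def report_and_process(text_input: str) -> str:
    report = generate_report(input_text=text_input)
    # Flyte uploads the report when generate_report finishes and downloads
    # it again inside process_data_file; no storage paths appear here.
    return process_data_file(input_file=report)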

Handling Directories

For outputs that are collections of files, use FlyteDirectory.

import os

import flytekit
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory

@task
def create_dataset_directory(prefix: str) -> FlyteDirectory:
    """
    Creates a directory with multiple files.
    """
    working_dir = flytekit.current_context().working_directory
    output_dir = os.path.join(working_dir, "my_dataset_output")
    os.makedirs(output_dir, exist_ok=True)

    with open(os.path.join(output_dir, "file1.txt"), "w") as f:
        f.write(f"{prefix} - Data for file 1.")
    with open(os.path.join(output_dir, "file2.csv"), "w") as f:
        f.write(f"{prefix},value1,value2\n{prefix},value3,value4")

    # Returning FlyteDirectory uploads the entire directory tree.
    return FlyteDirectory(output_dir)

@task
def process_dataset_directory(dataset_dir: FlyteDirectory) -> str:
    """
    Reads files from a directory and returns a summary.
    """
    summary = []
    # os.walk accepts the FlyteDirectory (an os.PathLike); using it as a
    # path triggers the download of the directory contents.
    for root, _, files in os.walk(dataset_dir):
        for file_name in sorted(files):
            file_path = os.path.join(root, file_name)
            with open(file_path, "r") as f:
                summary.append(f"Read {file_name}: {f.readline().strip()}")
    return "\n".join(summary)

@workflow
def dataset_workflow(input_prefix: str) -> str:
    dataset = create_dataset_directory(prefix=input_prefix)
    return process_dataset_directory(dataset_dir=dataset)

The create_dataset_directory task creates a local directory my_dataset_output and populates it. Returning FlyteDirectory(output_dir) instructs Flyte to upload the entire directory. The process_dataset_directory task then receives this directory, which is downloaded locally, allowing standard filesystem operations like os.walk.
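When the internal layout is known, a downstream task can address files directly instead of walking the tree. A brief sketch, assuming the file1.txt layout produced above:

@task
def read_known_file(dataset_dir: FlyteDirectory) -> str:
    # os.path.join accepts the FlyteDirectory (an os.PathLike); resolving
    # it as a path downloads the directory contents locally first.
    path = os.path.join(dataset_dir, "file1.txt")
    with open(path, "r") as f:
        return f.read()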

Common Use Cases

  • Passing Intermediate Data: FlyteFile is ideal for passing large datasets, model checkpoints, or processed results between sequential tasks in a workflow without explicitly managing storage paths.
  • Integrating with External Tools: When a task needs to interact with external command-line tools or libraries that operate on local files, FlyteFile provides the necessary local file path abstraction.
  • Data Versioning and Lineage: Because FlyteFile outputs are stored in a versioned manner by Flyte, they contribute to the overall data lineage, making it easy to track which workflow run produced which specific file artifact.
  • Machine Learning Pipelines: Storing and retrieving trained models, evaluation metrics, or preprocessed features as FlyteFile or FlyteDirectory objects (see the checkpoint sketch after this list).
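For the machine learning case, a minimal checkpoint-saving sketch; the toy model object and file name are illustrative:

import pickle
from typing import List

from flytekit import task
from flytekit.types.file import FlyteFile

@task
def save_model(weights: List[float]) -> FlyteFile:
    # Serialize the (toy) model; the returned FlyteFile is uploaded and
    # versioned with the execution, preserving lineage for the artifact.
    path = "model.pkl"
    with open(path, "wb") as f:
        pickle.dump(weights, f)
    return FlyteFile(path)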

Best Practices and Considerations

  • File Size and Performance: While FlyteFile handles files of various sizes, be mindful of extremely large files (e.g., terabytes). Transferring such files between remote storage and task execution environments can introduce significant latency. For very large datasets, consider using distributed file systems or specialized data processing frameworks that integrate directly with remote storage.
  • Temporary Files: When creating files within a task that are not intended as outputs, use Python's tempfile module so they are cleaned up automatically; only wrap files intended for persistence in FlyteFile (see the sketch at the end of this list).
  • Directory Structure: For FlyteDirectory outputs, maintain a consistent and logical internal directory structure. This makes it easier for downstream tasks to locate and process specific files within the directory.
  • File Extension Annotations: Parameterizing the type, as in FlyteFile["txt"], attaches the expected extension as metadata that Flyte's UI and other tools can use to infer the content type, improving user experience and enabling potential optimizations.
  • Security: Ensure that sensitive data stored in FlyteFile objects adheres to appropriate access controls and encryption policies of the underlying remote storage. Flyte itself does not add additional encryption layers beyond what the configured storage backend provides.
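A sketch of the temporary-file practice; the transform logic is illustrative, and only the final artifact is wrapped in FlyteFile:

import os
import shutil
import tempfile

from flytekit import task
from flytekit.types.file import FlyteFile

@task
def transform(input_file: FlyteFile) -> FlyteFile:
    output_path = "transformed.txt"
    # Scratch files live in a temporary directory that is deleted
    # automatically; they are never returned, so never uploaded.
    with tempfile.TemporaryDirectory() as scratch:
        staging = os.path.join(scratch, "staging.txt")
        with open(input_file, "r") as src, open(staging, "w") as dst:
            dst.write(src.read().upper())
        # Copy the final artifact out before the scratch dir is removed.
        shutil.copy(staging, output_path)
    return FlyteFile(output_path)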