Data Handling and I/O Strategies
Data Handling and I/O Strategies provide robust mechanisms for managing data ingress and egress within task executions. These strategies enable fine-grained control over how inputs are downloaded into a container and how outputs are uploaded from it, optimizing for various data sizes, formats, and performance requirements.
DataLoadingConfig
The DataLoadingConfig component defines the fundamental configuration for data handling. It specifies the paths for input and output data, controls whether automatic data loading is enabled, and dictates the serialization format for structured data.
- Input and Output Paths: The
input_pathandoutput_pathparameters define the locations where data is expected to be downloaded from and uploaded to, respectively. These paths typically point to storage locations like S3, GCS, or other compatible object stores. - Enabled State: The
enabledflag determines if the configured data loading mechanisms are active. When set toFalse, automatic data handling is bypassed, and the task is responsible for managing its own data transfers. - Literal Map Formats:
DataLoadingConfigsupports various formats for serializing and deserializing literal maps, which often represent task parameters, metadata, or small structured datasets.LITERALMAP_FORMAT_PROTO: Protocol Buffer format (default).LITERALMAP_FORMAT_JSON: JSON format.LITERALMAP_FORMAT_YAML: YAML format. Choosing the appropriate format depends on interoperability requirements and the complexity of the data structure.
- I/O Strategy Integration:
DataLoadingConfigintegrates withIOStrategyto provide advanced control over data transfer modes, allowing for customized download and upload behaviors.
Example:
from flytekit.core.task import DataLoadingConfig
from flytekit.core.task import IOStrategy
# Configure basic data loading with JSON format
config = DataLoadingConfig(
input_path="s3://my-bucket/inputs/",
output_path="s3://my-bucket/outputs/",
format=DataLoadingConfig.LITERALMAP_FORMAT_JSON,
enabled=True
)
# Configure with a specific I/O strategy for streaming large data
io_strategy = IOStrategy(
download_mode=IOStrategy.DOWNLOAD_MODE_STREAM,
upload_mode=IOStrategy.UPLOAD_MODE_ON_EXIT
)
config_with_strategy = DataLoadingConfig(
input_path="s3://my-bucket/large-data-inputs/",
output_path="s3://my-bucket/large-data-outputs/",
io_strategy=io_strategy
)
IOStrategy
The IOStrategy component provides granular control over how data is transferred into and out of the execution environment. It defines distinct modes for both downloading input data and uploading output data. This component is only active when DataLoadingConfig is enabled.
- Download Modes: These modes dictate how input data is retrieved into the container before task execution.
DOWNLOAD_MODE_EAGER: Data is downloaded entirely before the task execution begins. This mode is suitable for smaller datasets where immediate and complete availability is required. It simplifies task logic as all data is present locally from the start.DOWNLOAD_MODE_STREAM: Data is streamed into the container, allowing for processing of very large datasets without loading the entire content into memory. This mode is ideal for tasks that can process data incrementally, such as reading line-by-line or chunk-by-chunk.DOWNLOAD_MODE_NO_DOWNLOAD: No automatic download occurs. The task is responsible for fetching its own input data, typically when using direct storage access libraries (e.g., boto3 for S3, google-cloud-storage for GCS) or database connectors. This mode is useful for highly customized data access patterns or when data is already accessible within the execution environment.
- Upload Modes: These modes determine when and how output data is persisted from the container after task execution.
UPLOAD_MODE_EAGER: Output data is uploaded as soon as it becomes available. This can be useful for real-time processing, when intermediate results need to be accessible quickly, or for debugging purposes. Be aware of potential performance overhead due to frequent uploads.UPLOAD_MODE_ON_EXIT: Output data is uploaded only after the task execution completes successfully. This is the default and most common mode, ensuring that only final, complete results are persisted. It minimizes I/O operations during task execution.UPLOAD_MODE_NO_UPLOAD: No automatic upload occurs. The task is responsible for persisting its own output data, similar toDOWNLOAD_MODE_NO_DOWNLOAD. This mode is used when outputs are handled by custom logic, such as writing directly to a database or an external service.
Example:
from flytekit.core.task import IOStrategy
# Default I/O strategy: eager download, upload on exit
default_strategy = IOStrategy()
# Strategy for large inputs, processing incrementally, and uploading results on exit
streaming_strategy = IOStrategy(
download_mode=IOStrategy.DOWNLOAD_MODE_STREAM,
upload_mode=IOStrategy.UPLOAD_MODE_ON_EXIT
)
# Strategy for tasks that handle their own I/O (e.g., direct S3 access)
manual_io_strategy = IOStrategy(
download_mode=IOStrategy.DOWNLOAD_MODE_NO_DOWNLOAD,
upload_mode=IOStrategy.UPLOAD_MODE_NO_UPLOAD
)
Common Use Cases and Best Practices
-
Small to Medium Datasets: For inputs that fit comfortably in memory, use
DOWNLOAD_MODE_EAGERwithUPLOAD_MODE_ON_EXIT. This provides simplicity and ensures all data is ready before processing begins.from flytekit.core.task import DataLoadingConfig, IOStrategy
config = DataLoadingConfig(
input_path="s3://my-bucket/small-data/",
output_path="s3://my-bucket/processed-small-data/",
io_strategy=IOStrategy(
download_mode=IOStrategy.DOWNLOAD_MODE_EAGER,
upload_mode=IOStrategy.UPLOAD_MODE_ON_EXIT
)
) -
Large Datasets Requiring Streaming: When dealing with multi-gigabyte or terabyte inputs, employ
DOWNLOAD_MODE_STREAM. This prevents out-of-memory errors and allows for efficient processing of data chunks. The task logic must be designed to consume data incrementally.from flytekit.core.task import DataLoadingConfig, IOStrategy
config = DataLoadingConfig(
input_path="s3://my-bucket/large-stream-data/",
output_path="s3://my-bucket/processed-stream-data/",
io_strategy=IOStrategy(
download_mode=IOStrategy.DOWNLOAD_MODE_STREAM,
upload_mode=IOStrategy.UPLOAD_MODE_ON_EXIT
)
)
# Within the task, data would be read using a streaming interface. -
Custom I/O Handling: For tasks that manage their own data transfer (e.g., interacting directly with a database, a specialized storage system, or a custom API), disable automatic I/O using
DOWNLOAD_MODE_NO_DOWNLOADandUPLOAD_MODE_NO_UPLOAD. This avoids redundant data transfers and allows for specialized logic within the task.from flytekit.core.task import DataLoadingConfig, IOStrategy
config = DataLoadingConfig(
input_path="s3://my-bucket/metadata/", # Path can still be useful for metadata
output_path="s3://my-bucket/results/",
io_strategy=IOStrategy(
download_mode=IOStrategy.DOWNLOAD_MODE_NO_DOWNLOAD,
upload_mode=IOStrategy.UPLOAD_MODE_NO_UPLOAD
)
)
# Within the task, use client libraries (e.g., pandas.read_sql, boto3)
# to handle data directly. -
Real-time or Intermediate Result Persistence: If partial results need to be available immediately or for debugging purposes, consider
UPLOAD_MODE_EAGER. Be mindful of the potential performance overhead of frequent uploads, especially for large or numerous outputs.from flytekit.core.task import DataLoadingConfig, IOStrategy
config = DataLoadingConfig(
input_path="s3://my-bucket/inputs/",
output_path="s3://my-bucket/intermediate-results/",
io_strategy=IOStrategy(
download_mode=IOStrategy.DOWNLOAD_MODE_EAGER,
upload_mode=IOStrategy.UPLOAD_MODE_EAGER
)
)
Considerations
- Performance:
DOWNLOAD_MODE_EAGERcan introduce significant latency for large inputs due to the full download occurring before task execution.DOWNLOAD_MODE_STREAMis generally more performant for large files but requires the task to implement streaming logic. - Resource Usage:
DOWNLOAD_MODE_EAGERconsumes memory proportional to the input data size, potentially leading to out-of-memory errors for very large datasets.DOWNLOAD_MODE_STREAMis more memory-efficient as it processes data in chunks. - Error Handling: When using
NO_DOWNLOADorNO_UPLOAD, the task assumes full responsibility for error handling during data transfer. This includes retries, connection management, and data integrity checks. - Compatibility: Ensure the chosen
LITERALMAP_FORMATforDataLoadingConfigis compatible with downstream consumers or upstream producers of structured data to avoid serialization/deserialization issues.