Execution & Dataflow
The Execution & Dataflow system provides a robust framework for defining, orchestrating, and executing complex computational graphs. It manages the dependencies between operations and the movement of data through a series of interconnected tasks, ensuring efficient and reliable processing.
Primary Purpose
The primary purpose of Execution & Dataflow is to enable developers to construct and manage sophisticated data processing pipelines and workflows. It abstracts away the complexities of task scheduling, dependency resolution, and data passing, allowing developers to focus on the business logic of individual operations. This system is ideal for scenarios requiring deterministic execution, clear data lineage, and scalable processing of interdependent tasks.
Core Features
The system offers several core features designed to facilitate the creation and management of data-driven workflows:
- Declarative Graph Definition: Define workflows as directed acyclic graphs (DAGs) where nodes represent operations and edges represent data dependencies. This allows for clear visualization and understanding of the entire process.
- Automated Dependency Resolution: The system automatically determines the correct execution order of tasks based on their declared inputs and outputs, ensuring that all prerequisites are met before an operation begins.
- Efficient Data Passing: Manages the transfer of data between connected tasks, optimizing for memory usage and performance. It supports various data types and ensures data integrity across task boundaries.
- Execution Orchestration: Provides an engine to run the defined graphs, handling task lifecycle, resource allocation, and concurrent execution where applicable.
- Error Handling and Resilience: Includes mechanisms for gracefully handling task failures, retrying transient errors, and reporting execution status.
- Extensibility: Allows developers to define custom tasks and integrate external systems or libraries seamlessly into the dataflow.
Defining Execution Graphs
Execution graphs are constructed from Operation nodes and DataChannel connections.
Operations
An Operation represents a single unit of work within the dataflow. Each operation declares its expected inputs and produced outputs.
To define a custom operation, inherit from the Operation base class and implement the execute method:
from execution_dataflow import Operation, InputPort, OutputPort

class DataLoadOperation(Operation):
    def __init__(self, source_path: str):
        super().__init__()
        self.source_path = source_path
        self.add_output_port("loaded_data", OutputPort(data_type=bytes))

    def execute(self, context):
        print(f"Loading data from {self.source_path}...")
        # Simulate data loading
        data = f"Content from {self.source_path}".encode('utf-8')
        self.set_output("loaded_data", data)
        print("Data loaded.")

class DataTransformOperation(Operation):
    def __init__(self):
        super().__init__()
        self.add_input_port("raw_data", InputPort(data_type=bytes))
        self.add_output_port("transformed_data", OutputPort(data_type=str))

    def execute(self, context):
        raw_data = self.get_input("raw_data")
        print(f"Transforming data: {raw_data[:20]}...")
        transformed_data = raw_data.decode('utf-8').upper()
        self.set_output("transformed_data", transformed_data)
        print("Data transformed.")
Data Channels
DataChannel instances represent the flow of data between the output port of one Operation and the input port of another. The system automatically manages the buffering and transfer of data through these channels.
Graph Construction
Graphs are built using a GraphDefinition object, which allows connecting operations via their input and output ports.
from execution_dataflow import GraphDefinition

# Instantiate operations
load_op = DataLoadOperation(source_path="s3://my-bucket/input.csv")
transform_op = DataTransformOperation()

# Define the graph
graph = GraphDefinition()
graph.add_operation(load_op)
graph.add_operation(transform_op)

# Connect operations: output of load_op to input of transform_op
graph.connect(
    source_operation=load_op,
    source_port_name="loaded_data",
    target_operation=transform_op,
    target_port_name="raw_data",
)
Executing Dataflows
The ExecutionEngine is responsible for running the defined GraphDefinition.
Execution Engine
The ExecutionEngine takes a GraphDefinition and an ExecutionContext to orchestrate the execution. It resolves dependencies, schedules tasks, and manages data flow.
from execution_dataflow import ExecutionEngine, ExecutionContext

# Assuming 'graph' is defined as above

# Create an execution context (can hold global parameters, logging, etc.)
context = ExecutionContext(run_id="workflow-123", log_level="INFO")

# Initialize and run the engine
engine = ExecutionEngine()
result = engine.run(graph, context)

if result.is_successful():
    print("Dataflow executed successfully.")
    # Access outputs of the final operation if needed
    final_output = result.get_output(transform_op, "transformed_data")
    print(f"Final transformed data: {final_output}")
else:
    print(f"Dataflow failed: {result.get_error_message()}")
Execution Context
The ExecutionContext provides runtime information and resources to operations, such as configuration parameters, logging facilities, and shared state. Operations receive the context as the argument to their execute method.
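The context's accessor API beyond the constructor shown above is not specified here; the sketch below assumes hypothetical get_parameter and logger attributes purely to illustrate reading configuration and logging through the context instead of hardcoding values.

from execution_dataflow import Operation, InputPort, OutputPort

class ConfigurableFilterOperation(Operation):
    def __init__(self):
        super().__init__()
        self.add_input_port("records", InputPort(data_type=list))
        self.add_output_port("filtered", OutputPort(data_type=list))

    def execute(self, context):
        # get_parameter() and logger are assumed accessors, not confirmed
        # parts of the ExecutionContext API shown above.
        threshold = context.get_parameter("filter_threshold", default=10)
        records = self.get_input("records")
        filtered = [r for r in records if r >= threshold]
        context.logger.info("Kept %d of %d records", len(filtered), len(records))
        self.set_output("filtered", filtered)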
Error Handling and Resilience
The ExecutionEngine captures exceptions raised by Operation instances. The ExecutionResult object provides details on success or failure, including error messages and stack traces. The system supports configurable retry policies for transient failures at the operation level.
To configure retries for an operation:
from execution_dataflow import Operation, RetryPolicy

class ResilientOperation(Operation):
    def __init__(self):
        super().__init__(retry_policy=RetryPolicy(max_attempts=3, delay_seconds=5))
        # ... port definitions ...

    def execute(self, context):
        # ... potentially failing logic ...
        pass
Advanced Capabilities
Parallel and Asynchronous Execution
The ExecutionEngine can leverage multiple threads or processes to execute independent operations concurrently. When operations have no direct data dependencies, the engine automatically schedules them for parallel execution, significantly improving throughput for wide graphs. For I/O-bound tasks, operations can be designed to be asynchronous, allowing the engine to manage non-blocking calls efficiently.
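How concurrency is configured is engine-specific and not documented above; the following sketch assumes a hypothetical max_workers argument on ExecutionEngine, and shows two loaders with no mutual dependency feeding a merge step, which the engine can schedule in parallel.

from execution_dataflow import (ExecutionEngine, ExecutionContext, GraphDefinition,
                                Operation, InputPort, OutputPort)

class MergeOperation(Operation):
    """Illustrative operation that concatenates two upstream byte payloads."""

    def __init__(self):
        super().__init__()
        self.add_input_port("left", InputPort(data_type=bytes))
        self.add_input_port("right", InputPort(data_type=bytes))
        self.add_output_port("merged", OutputPort(data_type=bytes))

    def execute(self, context):
        self.set_output("merged", self.get_input("left") + self.get_input("right"))

# Two loaders with no data dependency on each other; the engine may run them concurrently.
load_a = DataLoadOperation(source_path="s3://my-bucket/part-a.csv")
load_b = DataLoadOperation(source_path="s3://my-bucket/part-b.csv")
merge_op = MergeOperation()

graph = GraphDefinition()
for op in (load_a, load_b, merge_op):
    graph.add_operation(op)
graph.connect(source_operation=load_a, source_port_name="loaded_data",
              target_operation=merge_op, target_port_name="left")
graph.connect(source_operation=load_b, source_port_name="loaded_data",
              target_operation=merge_op, target_port_name="right")

# max_workers is an assumed constructor argument; the actual concurrency knob may differ.
engine = ExecutionEngine(max_workers=4)
result = engine.run(graph, ExecutionContext(run_id="parallel-demo", log_level="INFO"))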
State Management
While DataChannel instances handle transient data flow, the system also supports persistent state management for operations that need to maintain information across multiple runs or for checkpointing. This is typically achieved by integrating with external storage systems (e.g., databases, object storage) within the Operation's execute method, often facilitated by the ExecutionContext.
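As a minimal sketch of this pattern, the operation below checkpoints its result to the local filesystem, keyed by the run identifier. It assumes only that the run_id passed to ExecutionContext is exposed as context.run_id, which is not confirmed by the API shown earlier.

import os
import pickle
from execution_dataflow import Operation, InputPort, OutputPort

class CheckpointedAggregateOperation(Operation):
    def __init__(self, checkpoint_dir: str = "/tmp/dataflow_checkpoints"):
        super().__init__()
        self.checkpoint_dir = checkpoint_dir
        self.add_input_port("values", InputPort(data_type=list))
        self.add_output_port("total", OutputPort(data_type=int))

    def execute(self, context):
        # context.run_id is an assumption (it is passed to ExecutionContext above).
        path = os.path.join(self.checkpoint_dir, f"{context.run_id}-total.pkl")
        if os.path.exists(path):
            # Reuse the checkpointed result from a previous run of this workflow.
            with open(path, "rb") as f:
                total = pickle.load(f)
        else:
            total = sum(self.get_input("values"))
            os.makedirs(self.checkpoint_dir, exist_ok=True)
            with open(path, "wb") as f:
                pickle.dump(total, f)
        self.set_output("total", total)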
Monitoring and Observability
The ExecutionEngine emits events throughout the lifecycle of a graph and its operations (e.g., OperationStarted, OperationCompleted, OperationFailed). Developers can subscribe to these events to integrate with monitoring systems, log progress, or trigger alerts. The ExecutionContext often includes a logging interface that operations should use for consistent output.
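The event subscription API is not shown above; the sketch below assumes a hypothetical engine.subscribe(event_name, handler) method and an event object with operation and error attributes, purely to illustrate forwarding lifecycle events to standard logging.

import logging
from execution_dataflow import ExecutionEngine

logger = logging.getLogger("dataflow.monitoring")

def on_operation_completed(event):
    # The event attributes used here are assumptions for illustration.
    logger.info("Operation completed: %s", event.operation)

def on_operation_failed(event):
    logger.error("Operation failed: %s (%s)", event.operation, event.error)

engine = ExecutionEngine()
# subscribe() is an assumed registration method; the event names match the documented lifecycle.
engine.subscribe("OperationCompleted", on_operation_completed)
engine.subscribe("OperationFailed", on_operation_failed)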
Common Use Cases
- ETL Pipelines: Extracting data from sources, transforming it, and loading it into data warehouses or analytical systems. Each step (extract, clean, transform, load) can be a distinct Operation.
- Machine Learning Workflows: Orchestrating data preprocessing, model training, evaluation, and deployment steps. This ensures reproducibility and proper sequencing of ML tasks.
- Complex Business Process Automation: Automating multi-step business processes where each step depends on the successful completion and output of previous steps.
- Data Validation and Quality Checks: Building pipelines that validate data against various rules, flagging or correcting inconsistencies as data flows through the system.
Integration Patterns
The Execution & Dataflow system is designed for flexible integration:
- External Data Sources/Sinks: Operations commonly interact with databases, message queues, file systems, or cloud storage services (e.g., S3, Azure Blob Storage) to fetch or store data. The ExecutionContext can provide credentials or client instances for these services.
- Microservices Orchestration: An Operation can encapsulate a call to an external microservice, with its inputs forming the request payload and its outputs carrying the parsed response (see the sketch after this list).
- Containerized Environments: The entire ExecutionEngine and its operations can run within Docker containers or Kubernetes pods, enabling scalable and isolated execution environments. Each operation can potentially be a separate container image for complex scenarios.
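As a sketch of the microservice pattern above, the operation below wraps an HTTP call using the standard-library urllib; the endpoint URL and the response shape (a "score" field) are illustrative assumptions, not a real service.

import json
import urllib.request
from execution_dataflow import Operation, InputPort, OutputPort

class ScoringServiceOperation(Operation):
    """Calls an external scoring microservice and exposes its response as an output port."""

    def __init__(self, endpoint: str = "https://scoring.example.internal/v1/score"):
        super().__init__()
        self.endpoint = endpoint  # illustrative endpoint, not a real service
        self.add_input_port("features", InputPort(data_type=dict))
        self.add_output_port("score", OutputPort(data_type=float))

    def execute(self, context):
        # Inputs form the request payload; the parsed response becomes the output.
        payload = json.dumps(self.get_input("features")).encode("utf-8")
        request = urllib.request.Request(
            self.endpoint, data=payload,
            headers={"Content-Type": "application/json"}, method="POST",
        )
        with urllib.request.urlopen(request, timeout=10) as response:
            body = json.load(response)
        self.set_output("score", float(body["score"]))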
Limitations and Considerations
- Cyclic Dependencies: The system strictly enforces directed acyclic graphs (DAGs). Cyclic dependencies between operations are not supported and will result in a graph validation error during GraphDefinition construction or ExecutionEngine.run() invocation.
- Data Volume: While optimized for data passing, extremely large data transfers between in-memory operations might consume significant memory. For petabyte-scale data, consider operations that write intermediate results to persistent storage and pass only references (e.g., file paths, object IDs) through DataChannel instances (see the sketch after this list).
- Distributed Execution: The default ExecutionEngine runs on a single machine. For truly distributed execution across multiple nodes, custom ExecutionEngine implementations or integration with distributed task queues (e.g., Celery, Apache Airflow) would be necessary, where Operation instances become tasks submitted to the distributed system.
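A minimal sketch of the reference-passing approach mentioned under Data Volume: the operation writes its (potentially large) intermediate result to a staging directory and emits only the file path, so the DataChannel carries a short string rather than the payload itself. The staging directory and file contents are illustrative.

import os
from execution_dataflow import Operation, OutputPort

class StageToDiskOperation(Operation):
    """Writes a large intermediate result to disk and emits only its path downstream."""

    def __init__(self, source_path: str, staging_dir: str = "/tmp/dataflow_staging"):
        super().__init__()
        self.source_path = source_path
        self.staging_dir = staging_dir
        self.add_output_port("staged_path", OutputPort(data_type=str))

    def execute(self, context):
        os.makedirs(self.staging_dir, exist_ok=True)
        staged_path = os.path.join(self.staging_dir, os.path.basename(self.source_path))
        # Simulate staging a large file; a real operation would stream from its source.
        with open(staged_path, "wb") as f:
            f.write(f"Staged copy of {self.source_path}".encode("utf-8"))
        # Downstream operations receive only the path, keeping the channel payload small.
        self.set_output("staged_path", staged_path)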
Best Practices
- Granular Operations: Design operations to be small, focused, and single-responsibility. This improves reusability, testability, and makes graphs easier to understand and debug.
- Idempotency: Strive to make operations idempotent where possible. This simplifies retry logic and recovery from failures.
- Clear Port Definitions: Explicitly define input and output ports with clear names and expected data types. This acts as a contract between operations.
- Leverage Execution Context: Use the ExecutionContext for configuration, logging, and shared resources rather than hardcoding values within operations. This makes workflows more flexible and easier to manage.
- Error Handling within Operations: Implement robust error handling within the execute method of each Operation to catch expected issues and provide meaningful error messages (a sketch follows this list).
- Monitoring and Logging: Integrate with monitoring tools and ensure operations log relevant information to track progress and diagnose issues effectively.