Execution & Dataflow

The Execution & Dataflow system provides a robust framework for defining, orchestrating, and executing complex computational graphs. It manages the dependencies between operations and the movement of data through a series of interconnected tasks, ensuring efficient and reliable processing.

Primary Purpose

The primary purpose of Execution & Dataflow is to enable developers to construct and manage sophisticated data processing pipelines and workflows. It abstracts away the complexities of task scheduling, dependency resolution, and data passing, allowing developers to focus on the business logic of individual operations. This system is ideal for scenarios requiring deterministic execution, clear data lineage, and scalable processing of interdependent tasks.

Core Features

The system offers several core features designed to facilitate the creation and management of data-driven workflows:

  • Declarative Graph Definition: Define workflows as directed acyclic graphs (DAGs) where nodes represent operations and edges represent data dependencies. This allows for clear visualization and understanding of the entire process.
  • Automated Dependency Resolution: The system automatically determines the correct execution order of tasks based on their declared inputs and outputs, ensuring that all prerequisites are met before an operation begins.
  • Efficient Data Passing: Manages the transfer of data between connected tasks, optimizing for memory usage and performance. It supports various data types and ensures data integrity across task boundaries.
  • Execution Orchestration: Provides an engine to run the defined graphs, handling task lifecycle, resource allocation, and concurrent execution where applicable.
  • Error Handling and Resilience: Includes mechanisms for gracefully handling task failures, retries, and providing insights into execution status.
  • Extensibility: Allows developers to define custom tasks and integrate external systems or libraries seamlessly into the dataflow.

Defining Execution Graphs

Execution graphs are constructed from Operation nodes and DataChannel connections.

Operations

An Operation represents a single unit of work within the dataflow. Each operation declares the inputs it expects and the outputs it produces.

To define a custom operation, inherit from the Operation base class and implement the execute method:

from execution_dataflow import Operation, InputPort, OutputPort

class DataLoadOperation(Operation):
    def __init__(self, source_path: str):
        super().__init__()
        self.source_path = source_path
        self.add_output_port("loaded_data", OutputPort(data_type=bytes))

    def execute(self, context):
        print(f"Loading data from {self.source_path}...")
        # Simulate data loading
        data = f"Content from {self.source_path}".encode('utf-8')
        self.set_output("loaded_data", data)
        print("Data loaded.")

class DataTransformOperation(Operation):
    def __init__(self):
        super().__init__()
        self.add_input_port("raw_data", InputPort(data_type=bytes))
        self.add_output_port("transformed_data", OutputPort(data_type=str))

    def execute(self, context):
        raw_data = self.get_input("raw_data")
        print(f"Transforming data: {raw_data[:20]}...")
        transformed_data = raw_data.decode('utf-8').upper()
        self.set_output("transformed_data", transformed_data)
        print("Data transformed.")

Data Channels

DataChannel instances represent the flow of data between the output port of one Operation and the input port of another. The system automatically manages the buffering and transfer of data through these channels.

Graph Construction

Graphs are built using a GraphDefinition object, which allows connecting operations via their input and output ports.

from execution_dataflow import GraphDefinition

# Instantiate operations
load_op = DataLoadOperation(source_path="s3://my-bucket/input.csv")
transform_op = DataTransformOperation()

# Define the graph
graph = GraphDefinition()
graph.add_operation(load_op)
graph.add_operation(transform_op)

# Connect operations: output of load_op to input of transform_op
graph.connect(
    source_operation=load_op,
    source_port_name="loaded_data",
    target_operation=transform_op,
    target_port_name="raw_data"
)

Executing Dataflows

The ExecutionEngine is responsible for running the defined GraphDefinition.

Execution Engine

The ExecutionEngine takes a GraphDefinition and an ExecutionContext to orchestrate the execution. It resolves dependencies, schedules tasks, and manages data flow.

from execution_dataflow import ExecutionEngine, ExecutionContext

# Assuming 'graph' is defined as above

# Create an execution context (can hold global parameters, logging, etc.)
context = ExecutionContext(run_id="workflow-123", log_level="INFO")

# Initialize and run the engine
engine = ExecutionEngine()
result = engine.run(graph, context)

if result.is_successful():
    print("Dataflow executed successfully.")
    # Access outputs of the final operation if needed
    final_output = result.get_output(transform_op, "transformed_data")
    print(f"Final transformed data: {final_output}")
else:
    print(f"Dataflow failed: {result.get_error_message()}")

Execution Context

The ExecutionContext provides runtime information and resources to operations. This can include configuration parameters, logging facilities, and shared state. Operations can access this context via their execute method.
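
As a minimal sketch, assuming the context exposes named parameters through a hypothetical get_param accessor (both the accessor name and the threshold parameter are illustrative, not documented API), an operation might read its configuration like this:

from execution_dataflow import Operation, InputPort, OutputPort

class ThresholdFilterOperation(Operation):
    """Filters records using a threshold read from the execution context."""

    def __init__(self):
        super().__init__()
        self.add_input_port("records", InputPort(data_type=list))
        self.add_output_port("filtered", OutputPort(data_type=list))

    def execute(self, context):
        # Hypothetical accessor: assumes the context exposes named parameters.
        threshold = context.get_param("threshold", default=0.5)
        records = self.get_input("records")
        self.set_output("filtered", [r for r in records if r >= threshold])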

Error Handling and Resilience

The ExecutionEngine captures exceptions raised by Operation instances. The ExecutionResult object provides details on success or failure, including error messages and stack traces. The system supports configurable retry policies for transient failures at the operation level.

To configure retries for an operation:

from execution_dataflow import Operation, RetryPolicy

class ResilientOperation(Operation):
    def __init__(self):
        super().__init__(retry_policy=RetryPolicy(max_attempts=3, delay_seconds=5))
        # ... port definitions ...

    def execute(self, context):
        # ... potentially failing logic ...
        pass

Advanced Capabilities

Parallel and Asynchronous Execution

The ExecutionEngine can leverage multiple threads or processes to execute independent operations concurrently. When operations have no direct data dependencies, the engine automatically schedules them for parallel execution, significantly improving throughput for wide graphs. For I/O-bound tasks, operations can be designed to be asynchronous, allowing the engine to manage non-blocking calls efficiently.
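
The concurrency configuration surface is not specified here; the sketch below reuses the DataLoadOperation and DataTransformOperation classes defined earlier to build two independent branches that the engine is free to schedule in parallel, and assumes a hypothetical max_workers argument on ExecutionEngine:

from execution_dataflow import ExecutionEngine, ExecutionContext, GraphDefinition

# Two branches with no shared data dependency; the engine may run them concurrently.
load_a = DataLoadOperation(source_path="s3://my-bucket/a.csv")
load_b = DataLoadOperation(source_path="s3://my-bucket/b.csv")
transform_a = DataTransformOperation()
transform_b = DataTransformOperation()

graph = GraphDefinition()
for op in (load_a, load_b, transform_a, transform_b):
    graph.add_operation(op)

graph.connect(source_operation=load_a, source_port_name="loaded_data",
              target_operation=transform_a, target_port_name="raw_data")
graph.connect(source_operation=load_b, source_port_name="loaded_data",
              target_operation=transform_b, target_port_name="raw_data")

# Hypothetical worker-count knob; check your engine's actual constructor.
engine = ExecutionEngine(max_workers=4)
result = engine.run(graph, ExecutionContext(run_id="parallel-demo", log_level="INFO"))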

State Management

While DataChannel instances handle transient data flow, the system also supports persistent state management for operations that need to maintain information across multiple runs or for checkpointing. This is typically achieved by integrating with external storage systems (e.g., databases, object storage) within the Operation's execute method, often facilitated by the ExecutionContext.
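
A sketch of one way to checkpoint from inside execute, assuming the context exposes the run_id it was constructed with; the on-disk JSON checkpoint format and directory layout are illustrative, not a built-in facility:

import json
from pathlib import Path
from execution_dataflow import Operation, InputPort, OutputPort

class CheckpointingOperation(Operation):
    """Persists its last processed offset so a rerun can resume."""

    def __init__(self, checkpoint_dir: str):
        super().__init__()
        self.checkpoint_dir = Path(checkpoint_dir)
        self.add_input_port("items", InputPort(data_type=list))
        self.add_output_port("processed_count", OutputPort(data_type=int))

    def execute(self, context):
        # Assumes the context exposes the run_id it was constructed with.
        state_file = self.checkpoint_dir / f"{context.run_id}.json"
        offset = json.loads(state_file.read_text())["offset"] if state_file.exists() else 0

        items = self.get_input("items")
        processed = len(items[offset:])  # resume from the stored offset

        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
        state_file.write_text(json.dumps({"offset": offset + processed}))
        self.set_output("processed_count", processed)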

Monitoring and Observability

The ExecutionEngine emits events throughout the lifecycle of a graph and its operations (e.g., OperationStarted, OperationCompleted, OperationFailed). Developers can subscribe to these events to integrate with monitoring systems, log progress, or trigger alerts. The ExecutionContext often includes a logging interface that operations should use for consistent output.
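
The event subscription mechanism is not documented here; the sketch below assumes a hypothetical engine.subscribe(event_name, handler) hook and an event object with operation_name and error attributes, purely to illustrate the pattern:

from execution_dataflow import ExecutionEngine

def on_operation_failed(event):
    # Forward failures to an alerting system; the event's shape is assumed here.
    print(f"ALERT: operation {event.operation_name} failed: {event.error}")

engine = ExecutionEngine()
# Hypothetical subscription hook; the real registration mechanism may differ.
engine.subscribe("OperationFailed", on_operation_failed)
engine.subscribe("OperationCompleted", lambda e: print(f"done: {e.operation_name}"))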

Common Use Cases

  • ETL Pipelines: Extracting data from sources, transforming it, and loading it into data warehouses or analytical systems. Each step (extract, clean, transform, load) can be a distinct Operation.
  • Machine Learning Workflows: Orchestrating data preprocessing, model training, evaluation, and deployment steps. This ensures reproducibility and proper sequencing of ML tasks.
  • Complex Business Process Automation: Automating multi-step business processes where each step depends on the successful completion and output of previous steps.
  • Data Validation and Quality Checks: Building pipelines that validate data against various rules, flagging or correcting inconsistencies as data flows through the system (see the sketch after this list).
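
As one concrete illustration of the validation use case, the sketch below implements a single rule as an Operation with separate outputs for valid and flagged records; the non-negative rule and the port names are illustrative choices:

from execution_dataflow import Operation, InputPort, OutputPort

class NonNegativeCheckOperation(Operation):
    """Splits incoming values into valid records and flagged inconsistencies."""

    def __init__(self):
        super().__init__()
        self.add_input_port("values", InputPort(data_type=list))
        self.add_output_port("valid", OutputPort(data_type=list))
        self.add_output_port("flagged", OutputPort(data_type=list))

    def execute(self, context):
        values = self.get_input("values")
        self.set_output("valid", [v for v in values if v >= 0])
        self.set_output("flagged", [v for v in values if v < 0])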

Integration Patterns

The Execution & Dataflow system is designed for flexible integration:

  • External Data Sources/Sinks: Operations commonly interact with databases, message queues, file systems, or cloud storage services (e.g., S3, Azure Blob Storage) to fetch or store data. The ExecutionContext can provide credentials or client instances for these services.
  • Microservices Orchestration: An Operation can encapsulate a call to an external microservice, with its inputs forming the request payload and its outputs carrying the parsed response (see the sketch after this list).
  • Containerized Environments: The entire ExecutionEngine and its operations can run within Docker containers or Kubernetes pods, enabling scalable and isolated execution environments. Each operation can potentially be a separate container image for complex scenarios.
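
A sketch of the microservice pattern: an operation that wraps an HTTP POST, taking the request payload from an input port and exposing the parsed JSON response on an output port. The service URL, port names, and use of the standard library urllib client are illustrative assumptions:

import json
import urllib.request
from execution_dataflow import Operation, InputPort, OutputPort

class ScoringServiceOperation(Operation):
    """Calls an external scoring microservice and exposes its JSON response."""

    def __init__(self, service_url: str):
        super().__init__()
        self.service_url = service_url  # e.g. an internal REST endpoint
        self.add_input_port("payload", InputPort(data_type=dict))
        self.add_output_port("response", OutputPort(data_type=dict))

    def execute(self, context):
        body = json.dumps(self.get_input("payload")).encode("utf-8")
        request = urllib.request.Request(
            self.service_url, data=body,
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(request, timeout=30) as resp:
            self.set_output("response", json.loads(resp.read().decode("utf-8")))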

Limitations and Considerations

  • Cyclic Dependencies: The system strictly enforces directed acyclic graphs (DAGs). Cyclic dependencies between operations are not supported and will result in a graph validation error during GraphDefinition construction or ExecutionEngine.run() invocation.
  • Data Volume: While optimized for data passing, extremely large data transfers between in-memory operations might consume significant memory. For petabyte-scale data, consider operations that write intermediate results to persistent storage and pass only references (e.g., file paths, object IDs) through DataChannel instances, as in the sketch after this list.
  • Distributed Execution: The default ExecutionEngine runs on a single machine. For truly distributed execution across multiple nodes, custom ExecutionEngine implementations or integration with distributed task queues (e.g., Celery, Apache Airflow) would be necessary, where Operation instances become tasks submitted to the distributed system.
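
A sketch of the reference-passing pattern described in the Data Volume note: the operation persists its result and emits only a path downstream, so the DataChannel carries a small string instead of the payload. Writing to local disk with pickle and reading run_id from the context are illustrative assumptions:

import pickle
from pathlib import Path
from execution_dataflow import Operation, InputPort, OutputPort

class LargeResultOperation(Operation):
    """Writes its result to disk and emits only the file path downstream."""

    def __init__(self, scratch_dir: str):
        super().__init__()
        self.scratch_dir = Path(scratch_dir)
        self.add_input_port("records", InputPort(data_type=list))
        self.add_output_port("result_path", OutputPort(data_type=str))

    def execute(self, context):
        self.scratch_dir.mkdir(parents=True, exist_ok=True)
        # Assumes the context exposes the run_id it was constructed with.
        result_path = self.scratch_dir / f"{context.run_id}-result.pkl"
        with result_path.open("wb") as fh:
            pickle.dump(self.get_input("records"), fh)
        # Only the small string reference travels through the DataChannel.
        self.set_output("result_path", str(result_path))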

Best Practices

  • Granular Operations: Design operations to be small, focused, and single-responsibility. This improves reusability, testability, and makes graphs easier to understand and debug.
  • Idempotency: Strive to make operations idempotent where possible. This simplifies retry logic and recovery from failures.
  • Clear Port Definitions: Explicitly define input and output ports with clear names and expected data types. This acts as a contract between operations.
  • Leverage Execution Context: Use the ExecutionContext for configuration, logging, and shared resources rather than hardcoding values within operations. This makes workflows more flexible and easier to manage.
  • Error Handling within Operations: Implement robust error handling within the execute method of each Operation to catch expected issues and provide meaningful error messages (a sketch follows this list).
  • Monitoring and Logging: Integrate with monitoring tools and ensure operations log relevant information to track progress and diagnose issues effectively.
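
A sketch of the error-handling practice above: catching an expected failure inside execute and re-raising it with a message that identifies the operation and the offending input. The ValueError type and port names are illustrative:

import json
from execution_dataflow import Operation, InputPort, OutputPort

class ParseJsonOperation(Operation):
    """Parses a JSON document and fails with a descriptive, actionable message."""

    def __init__(self):
        super().__init__()
        self.add_input_port("raw_text", InputPort(data_type=str))
        self.add_output_port("document", OutputPort(data_type=dict))

    def execute(self, context):
        raw_text = self.get_input("raw_text")
        try:
            self.set_output("document", json.loads(raw_text))
        except json.JSONDecodeError as exc:
            # Surface a meaningful message; the engine records it in the ExecutionResult.
            raise ValueError(
                f"ParseJsonOperation: input is not valid JSON "
                f"(first 50 chars: {raw_text[:50]!r})"
            ) from exc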