Workflows: Orchestrating Tasks
Workflows provide a powerful mechanism for orchestrating sequences of tasks, enabling the creation of complex, robust, and reproducible computational pipelines. They define the execution order, manage data dependencies between individual operations, and handle various execution scenarios, including failures. By composing smaller, independent tasks into a larger workflow, developers can build scalable and maintainable applications.
Core Concepts and Capabilities
Workflows are built upon a foundational structure that manages inputs, outputs, and the relationships between tasks. The system offers two primary methods for defining workflows: declarative (function-based) and imperative (programmatic).
Workflow Definition
Declarative Workflows
The most common way to define a workflow is declaratively using a Python function decorated with @workflow. This approach leverages Python's native syntax to express the flow of execution, making workflows intuitive to write and read. The PythonFunctionWorkflow class underpins this method, transforming a decorated Python function into an executable workflow graph.
Tasks are invoked within the workflow function, and their outputs can be passed as inputs to subsequent tasks, forming a directed acyclic graph (DAG).
from flytekit import task, workflow

@task
def greet(name: str) -> str:
    return f"Hello, {name}!"

@task
def personalize_message(greeting: str, suffix: str) -> str:
    return f"{greeting} {suffix}"

@workflow
def my_greeting_workflow(user_name: str, message_suffix: str) -> str:
    # Task outputs are automatically passed as inputs to downstream tasks
    greeting_message = greet(name=user_name)
    final_message = personalize_message(greeting=greeting_message, suffix=message_suffix)
    return final_message
Imperative Workflows
For scenarios requiring dynamic workflow construction or programmatic generation, imperative workflows offer fine-grained control. The ImperativeWorkflow class allows developers to build workflows step-by-step by explicitly adding inputs, tasks, and defining output bindings. This method is particularly useful when the workflow structure is not known at design time or needs to be assembled based on external logic.
from flytekit import task, Workflow

@task
def t1(a: str) -> str:
    return a + " world"

@task
def t2():
    print("side effect")

# Create the workflow with a unique name
my_imperative_workflow = Workflow(name="my_imperative_workflow")

# Add top-level inputs to the workflow
input_promise = my_imperative_workflow.add_workflow_input("in1", str)

# Add tasks and connect their inputs/outputs
node1 = my_imperative_workflow.add_entity(t1, a=input_promise)
my_imperative_workflow.add_entity(t2)  # t2 has no inputs, runs independently

# Define workflow outputs
my_imperative_workflow.add_workflow_output("from_t1", node1.outputs["o0"])
Task Orchestration and Data Flow
Workflows orchestrate tasks by managing the flow of data between them. When a task is invoked within a workflow, it returns a Promise object, not the actual computed value. This Promise represents the future output of the task and can be passed as an input to downstream tasks. The system automatically resolves these promises during execution, ensuring that tasks run only when their dependencies are met.
The WorkflowBase class tracks the workflow's inputs, unbound inputs, nodes, and output bindings in order to construct the complete execution graph.
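To make this concrete, the following minimal sketch (the double and add task names are illustrative) shows that, inside the workflow body, a task call yields a Promise rather than a plain value; the promise is resolved only once the upstream task has run.

from flytekit import task, workflow

@task
def double(x: int) -> int:
    return x * 2

@task
def add(a: int, b: int) -> int:
    return a + b

@workflow
def chained(x: int) -> int:
    # At workflow-definition (compile) time, `doubled` is a Promise, not an int;
    # it is materialized only after the `double` task has executed.
    doubled = double(x=x)
    return add(a=doubled, b=doubled)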
Workflow Inputs and Outputs
Workflows define a clear interface for inputs and outputs, similar to a function.
- Declarative Workflows: Inputs are defined as function parameters, and outputs are defined by the function's return type (see the multi-output sketch after this list).
- Imperative Workflows: Inputs are added with add_workflow_input, specifying a name and a Python type. Outputs are defined with add_workflow_output, binding a workflow output name to a Promise produced by an internal node.
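As an illustration of the declarative case, a workflow can expose several named outputs by returning a NamedTuple; the task and workflow names below are hypothetical.

import typing
from flytekit import task, workflow

# A NamedTuple gives the workflow's outputs explicit names.
Stats = typing.NamedTuple("Stats", [("total", int), ("mean", float)])

@task
def summarize(values: typing.List[int]) -> Stats:
    return Stats(sum(values), sum(values) / len(values))

@workflow
def stats_workflow(values: typing.List[int]) -> Stats:
    # The workflow's outputs are defined by its return type.
    return summarize(values=values)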
Failure Handling
Robust error handling is crucial for reliable pipelines. Workflows provide mechanisms to define how failures are managed:
- Failure Policy: The WorkflowFailurePolicy enum dictates the workflow's behavior when a task fails:
  - FAIL_IMMEDIATELY: The default behavior; the entire workflow execution fails as soon as any component node fails.
  - FAIL_AFTER_EXECUTABLE_NODES_COMPLETE: Allows other independent, runnable nodes to complete before the workflow ultimately fails. This is useful for gathering more diagnostic information or for letting non-critical paths finish. This policy is configured via WorkflowMetadata.
- On-Failure Handler: Workflows can specify an on_failure handler, which can be another task or an entire sub-workflow. The handler is invoked when a task within the main workflow fails, allowing custom error recovery, logging, or notification logic. The WorkflowBase class stores this handler in its _on_failure attribute and contains the logic to execute it. A sketch of both mechanisms follows this list.
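The sketch below wires both mechanisms together; the task names are illustrative, and it assumes the on_failure handler accepts the same inputs as the workflow.

from flytekit import task, workflow, WorkflowFailurePolicy

@task
def risky_step(x: int) -> int:
    if x < 0:
        raise ValueError("negative input")
    return x

@task
def notify_on_failure(x: int):
    # Hypothetical recovery/notification logic run when the workflow fails.
    print(f"pipeline failed for input {x}")

@workflow(
    failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE,
    on_failure=notify_on_failure,
)
def pipeline(x: int) -> int:
    return risky_step(x=x)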
Workflow Metadata and Defaults
Workflows can be configured with metadata and default settings that influence their execution and the behavior of their constituent tasks.
- WorkflowMetadata: Contains workflow-specific settings, such as the on_failure policy.
- WorkflowMetadataDefaults: Defines default settings that are inherited by tasks within the workflow, such as interruptible (whether tasks can be interrupted). This allows policies to be applied consistently across all tasks in a workflow without configuring each task individually (see the sketch after this list).
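As a sketch, the interruptible default can be set once at the workflow level and inherited by the workflow's tasks; the train task is illustrative.

from flytekit import task, workflow

@task
def train(epochs: int) -> str:
    return f"trained for {epochs} epochs"

# interruptible=True becomes part of WorkflowMetadataDefaults and is inherited
# by tasks that do not override it themselves.
@workflow(interruptible=True)
def training_workflow(epochs: int) -> str:
    return train(epochs=epochs)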
Compilation and Execution
Before execution, workflows undergo a compilation step (the compile method in WorkflowBase and PythonFunctionWorkflow). This process transforms the Python definition into a portable WorkflowTemplate (part of WorkflowSpec and WorkflowClosure), which is a language-agnostic representation of the workflow's graph, inputs, outputs, and dependencies. This compiled representation is then used for remote execution.
For local development and testing, workflows can be executed directly using the local_execute method. This allows developers to run and debug their workflows in a local Python environment without deploying them to a remote system.
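For example, calling the workflows defined earlier as ordinary Python callables runs them locally; this sketch assumes the declarative and imperative examples above are in scope.

if __name__ == "__main__":
    # Invoking the decorated workflow triggers local_execute under the hood.
    print(my_greeting_workflow(user_name="Ada", message_suffix="Welcome!"))

    # Imperative workflows can be executed locally the same way.
    print(my_imperative_workflow(in1="hello"))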
Common Use Cases
Workflows are versatile and applicable across a wide range of domains:
- Data Pipelines (ETL): Orchestrating data extraction, transformation, and loading processes from various sources to data warehouses or lakes.
- Machine Learning Training and Deployment: Managing the entire ML lifecycle, from data preprocessing and feature engineering to model training, evaluation, and deployment.
- Complex Business Processes: Automating multi-step business logic, such as order fulfillment, financial reporting, or customer onboarding.
- Microservice Orchestration: Coordinating calls between multiple microservices, ensuring correct sequencing and data exchange.
- Scientific Simulations: Chaining together computational steps for complex scientific experiments and analyses.
Best Practices and Considerations
- Modularity and Reusability: Design tasks to be small, focused, and reusable. This promotes clarity, simplifies testing, and allows tasks to be shared across multiple workflows.
- Clear Input/Output Definitions: Explicitly define all workflow and task inputs and outputs with appropriate Python types. This improves readability, enables static analysis, and prevents runtime errors.
- Robust Error Handling: Leverage the on_failure handler and WorkflowFailurePolicy to define clear strategies for handling errors, ensuring workflows can recover gracefully or provide detailed diagnostics.
- Local Testing: Utilize the local_execute capabilities to thoroughly test workflows in a local environment before deploying them; this significantly speeds up development and debugging cycles (see the sketch after this list).
- Naming Conventions: Use descriptive and consistent naming for workflows, tasks, inputs, and outputs to enhance readability and maintainability.
- Documentation: Provide clear documentation for workflows, explaining their purpose, inputs, outputs, and any specific considerations. The docs attribute on WorkflowBase can be used for this.
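For instance, a minimal pytest-style check (a sketch that assumes the my_greeting_workflow example above is importable) exercises the workflow entirely in-process.

def test_my_greeting_workflow():
    # Calling the workflow directly runs it locally, with no remote deployment.
    result = my_greeting_workflow(user_name="Ada", message_suffix="Welcome!")
    assert result == "Hello, Ada! Welcome!"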