Promises and Dynamic Control Flow
Promises and Dynamic Control Flow enable the construction of robust and adaptable workflows by abstracting the execution of tasks and managing their interdependencies. This system allows for the definition of complex logic, including conditional execution and dynamic data manipulation, that can operate seamlessly across both local development and remote, distributed environments.
Promises and Dynamic Control Flow bridge the gap between Python's synchronous execution model and the asynchronous, distributed nature of workflow orchestration. During workflow compilation, the future results of tasks are represented as Promise objects rather than evaluated immediately. This abstraction facilitates:
- Duality of Execution: A task function, when called within a workflow, can either execute locally and return concrete Python values or, during compilation, return Promise objects that represent the outputs of a Node in the workflow graph.
- Node Configuration: It allows for the application of execution-time overrides (e.g., resource requests, retries, timeouts) directly on the result of a task call, even before the task has executed.
- Conditional Logic: It provides a mechanism to define dynamic control flow based on the outcomes of upstream tasks, enabling branching and conditional execution paths within a workflow.
Core Features
The Promise Object
The Promise object is a central component, acting as a placeholder for a value that will be available after a task completes.
- Future Value Representation: A Promise can be in one of two states:
  - Ready: It holds an actual evaluated value (_literals_models.Literal) when executed locally. The is_ready property indicates this state, and the val property provides access to the literal value.
  - Not Ready (Incomplete): It holds a reference (NodeOutput) to the output of an upstream Node in the workflow graph. The ref property provides access to this NodeOutput.
- Local Evaluation: The eval() method retrieves the concrete Python value from a Promise when it is ready (is_ready is true) and holds a primitive type. Calling eval() on an incomplete Promise raises an error.
- Node Overrides: The with_overrides() method configures the underlying Node that produces the Promise. This allows developers to specify runtime parameters such as requests, limits, timeout, retries, or cache settings for the task associated with the Promise.

      # Example: Applying overrides to a task's output (inside a workflow body)
      import datetime

      output_promise = my_task().with_overrides(retries=3, timeout=datetime.timedelta(minutes=5))

- Dependency Management: The __rshift__ operator (>>) establishes explicit execution order dependencies between tasks. If p1 and p2 are Promise objects (or VoidPromise objects) representing task outputs, p1 >> p2 ensures that the task producing p1 completes before the task producing p2 begins.

      # Example: Defining task dependencies
      result_a = task_a()
      result_b = task_b()
      result_a >> result_b  # task_a runs before task_b

- Attribute and Item Access: Promise objects support attribute access (.) and item access ([]) to extract specific fields from complex output types (e.g., dataclasses, typing.Dict). Each access creates a new Promise with an updated attr_path, allowing for dynamic data extraction from structured outputs.

      # Example: Accessing structured outputs
      import typing

      from flytekit import task, workflow  # assumed source of the decorators

      @task
      def get_user_data() -> typing.Dict[str, str]:
          return {"name": "Alice", "email": "alice@example.com"}

      @task
      def process_name(name: str):
          print(name)

      @workflow
      def my_workflow():
          user_data = get_user_data()
          user_name = user_data["name"]  # Accessing a dictionary key
          # Or, if get_user_data returned a dataclass with a 'name' field:
          # user_name = user_data.name
          process_name(name=user_name)
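
To make the two states and the attr_path mechanism more concrete, here is a minimal standalone model. It is a sketch, not the library's Promise: `ToyPromise` and its fields are hypothetical, and unlike the real eval() (which is described above as limited to primitive values), this toy version simply walks the recorded path over a plain Python value.

```python
from typing import Any, Optional, Tuple, Union

PathStep = Union[str, int]


class ToyPromise:
    """Toy placeholder: holds either a concrete value or a reference to a node."""

    def __init__(self, val: Any = None, ref: Optional[str] = None,
                 attr_path: Tuple[PathStep, ...] = ()) -> None:
        self._val = val
        self._ref = ref            # e.g. "n0.o0"; None means the value is known
        self.attr_path = attr_path

    @property
    def is_ready(self) -> bool:
        return self._ref is None

    def __getitem__(self, key: PathStep) -> "ToyPromise":
        # Indexing never touches the value; it returns a new promise
        # whose path is one step longer.
        return ToyPromise(self._val, self._ref, self.attr_path + (key,))

    def eval(self) -> Any:
        if not self.is_ready:
            raise ValueError("cannot eval an incomplete promise")
        value = self._val
        for step in self.attr_path:
            value = value[step]
        return value


ready = ToyPromise(val={"name": "Alice", "tags": ["a", "b"]})
pending = ToyPromise(ref="n0.o0")

name = ready["name"].eval()                      # path resolved locally
deferred_path = pending["tags"][1].attr_path     # path recorded for later
```

Because indexing only extends a path, it works identically on ready and incomplete promises; only eval() needs to know which state it is in.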
The VoidPromise Object
The VoidPromise object is a specialized Promise returned by tasks that do not produce any outputs (i.e., their declared return type is empty).
- No-Output Tasks: It signifies that a task has no return value.
- Restricted Operations: VoidPromise objects explicitly disallow most operations (e.g., comparisons, arithmetic, truth value testing) by raising AssertionError, preventing misuse of non-existent values.
- Dependency and Overrides Support: Like Promise, VoidPromise supports __rshift__ for defining dependencies and with_overrides() for configuring the associated Node.

      # Example: Task with no output
      from flytekit import task, workflow  # assumed source of the decorators

      @task
      def log_message(message: str):
          print(message)

      @workflow
      def my_workflow():
          log_task = log_message(message="Starting workflow")
          # log_task is a VoidPromise.
          # It can be used for dependencies or overrides, but not for its "value".
          log_task.with_overrides(retries=1)
          log_task >> another_task()  # another_task is any other task defined elsewhere
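
One way to get the "ordering allowed, value use forbidden" behavior is to route every value-like operator to a single method that raises. The class below is a hypothetical sketch (`ToyVoidPromise`, `_refuse`, and `runs_before` are names made up here), intended only to show the pattern.

```python
from typing import Any, List


class ToyVoidPromise:
    """Toy marker for 'this task ran but produced nothing'."""

    def __init__(self, task_name: str) -> None:
        self.task_name = task_name
        self.runs_before: List["ToyVoidPromise"] = []

    def _refuse(self, *_args: Any) -> Any:
        raise AssertionError(
            f"task {self.task_name} returns nothing; its result cannot be used as a value"
        )

    # Comparisons, arithmetic, logical operators and truth testing all refuse.
    __eq__ = __ne__ = __lt__ = __gt__ = __le__ = __ge__ = _refuse
    __add__ = __sub__ = __and__ = __or__ = __bool__ = _refuse

    def __rshift__(self, other: "ToyVoidPromise") -> "ToyVoidPromise":
        # Ordering is still meaningful: record the edge and allow chaining.
        self.runs_before.append(other)
        return other


setup = ToyVoidPromise("setup")
main = ToyVoidPromise("main")
chained = setup >> main
```

Raising eagerly at the point of misuse gives an error that names the offending task, instead of a confusing failure later when a downstream task receives nothing.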
Comparison and Conjunction Expressions
These classes enable the construction of dynamic control flow logic based on the values of Promise objects.
- ComparisonExpression: Represents a binary comparison (e.g., ==, !=, >, <, >=, <=) between a Promise and another Promise or a literal value.
  - Promise objects overload standard comparison operators (__eq__, __ne__, etc.) to return ComparisonExpression instances.
  - The is_, is_true, is_false, and is_none methods provide explicit ways to create comparison expressions for boolean and None checks.
  - The eval() method locally evaluates the comparison if both operands are ready.

      # Example: Creating a comparison expression
      from flytekit import task, workflow  # assumed source of the decorators

      @task
      def get_number() -> int:
          return 5

      @workflow
      def conditional_wf():
          num_promise = get_number()
          is_greater_than_zero = num_promise > 0  # This creates a ComparisonExpression
          # This expression can then be used in conditional statements

- ConjunctionExpression: Represents logical AND (&) or OR (|) operations between ComparisonExpression objects or other ConjunctionExpression objects.
  - ComparisonExpression and ConjunctionExpression objects overload the bitwise & and | operators to form complex logical conditions.
  - The eval() method locally evaluates the conjunction.

      # Example: Creating a conjunction expression
      from flytekit import task, workflow  # assumed source of the decorators

      @task
      def get_bool_a() -> bool:
          return True

      @task
      def get_bool_b() -> bool:
          return False

      @workflow
      def complex_conditional_wf():
          bool_a = get_bool_a()
          bool_b = get_bool_b()
          condition = (bool_a.is_true()) & (bool_b.is_false())  # This creates a ConjunctionExpression
          # This expression can be used in conditional statements

- ComparisonOps and ConjunctionOps: Enums that define the available comparison and logical operators, respectively.
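
The operator-overloading pattern behind these expression classes can be sketched in a few lines of plain Python. The classes below (`ToyValue`, `ToyComparison`, `ToyConjunction`, `ToyExpr`) are hypothetical and operate on ordinary values rather than promises; they only illustrate how comparisons build expression objects that are evaluated later.

```python
import operator
from typing import Any, Callable


class ToyExpr:
    """Base for expressions: combinable with & and |, never truth-tested."""

    def __and__(self, other: "ToyExpr") -> "ToyConjunction":
        return ToyConjunction(operator.and_, self, other)

    def __or__(self, other: "ToyExpr") -> "ToyConjunction":
        return ToyConjunction(operator.or_, self, other)

    def __bool__(self) -> bool:
        raise ValueError("expressions cannot be truth-tested; combine them with & or |")


class ToyComparison(ToyExpr):
    def __init__(self, op: Callable[[Any, Any], bool], lhs: Any, rhs: Any) -> None:
        self.op, self.lhs, self.rhs = op, lhs, rhs

    def eval(self) -> bool:
        return self.op(self.lhs, self.rhs)


class ToyConjunction(ToyExpr):
    def __init__(self, op: Callable[[bool, bool], bool], lhs: ToyExpr, rhs: ToyExpr) -> None:
        self.op, self.lhs, self.rhs = op, lhs, rhs

    def eval(self) -> bool:
        return self.op(self.lhs.eval(), self.rhs.eval())


class ToyValue:
    """Wraps a value so that comparisons build expressions instead of booleans."""

    def __init__(self, val: Any) -> None:
        self.val = val

    def __gt__(self, other: Any) -> ToyComparison:
        return ToyComparison(operator.gt, self.val, other)

    def __lt__(self, other: Any) -> ToyComparison:
        return ToyComparison(operator.lt, self.val, other)


n = ToyValue(5)
in_range = (n > 0) & (n < 10)       # nothing is evaluated yet
out_of_range = (n < 0) | (n > 10)
```

The parentheses matter because & and | bind more tightly than comparison operators in Python. The keywords `and` and `or` cannot be overloaded and would call `__bool__`, which is why the bitwise operators are used instead.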
Common Use Cases
- Conditional Workflows: Define branching logic where different tasks or sub-workflows execute based on the output of an upstream task. This is achieved by constructing ComparisonExpression and ConjunctionExpression objects from Promise outputs and passing them to a conditional construct within the workflow definition. A native Python if statement cannot be used for this, because it truth-tests the expression.

      # Example: Conditional execution based on a Promise
      # Assumes flytekit's conditional() construct; a plain `if` would truth-test
      # the expression, which is not supported.
      from flytekit import conditional, task, workflow

      @task
      def check_condition() -> bool:
          # ... logic to determine condition ...
          return True

      @task
      def task_if_true():
          print("Condition was true")

      @task
      def task_if_false():
          print("Condition was false")

      @workflow
      def my_conditional_workflow():
          condition_result = check_condition()
          (
              conditional("branch_on_condition")
              .if_(condition_result.is_true())
              .then(task_if_true())
              .else_()
              .then(task_if_false())
          )

- Dynamic Data Flow: Extract specific pieces of data from complex task outputs (e.g., a field from a dataclass, a value from a dictionary) and pass them as inputs to subsequent tasks. This allows for flexible data transformations and routing within the workflow.
- Workflow Chaining and Ordering: Explicitly define the execution order of tasks, ensuring that certain tasks complete before others begin, even if there are no direct data dependencies. This is crucial for managing side effects or resource contention.

      # Example: Explicit task ordering
      from flytekit import task, workflow  # assumed source of the decorators

      @task
      def setup_environment():
          print("Setting up environment")

      @task
      def run_main_process():
          print("Running main process")

      @workflow
      def ordered_workflow():
          setup_output = setup_environment()
          setup_output >> run_main_process()  # Ensures setup completes before main process starts

- Resource Management and Retries: Configure resource requests, limits, timeouts, and retry policies for individual tasks or specific outputs within a workflow using with_overrides(). This allows for fine-grained control over execution characteristics without modifying the task code itself.
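
As a rough illustration of how explicit ordering interacts with data dependencies, the sketch below merges both kinds of edges into one dependency map and derives a valid run order with the standard library's `graphlib`. The node names and the map itself are hypothetical; this is not how the orchestrator schedules work internally.

```python
from graphlib import TopologicalSorter
from typing import Dict, Set

# Each node maps to the set of nodes that must finish before it starts.
deps: Dict[str, Set[str]] = {
    "process": {"load"},     # data dependency: process consumes load's output
    "report": {"process"},   # data dependency: report consumes process's output
}

# An explicit ordering edge with no data flow, as `setup >> load` would record.
deps.setdefault("load", set()).add("setup")

# Any topological order respects both kinds of edges.
order = list(TopologicalSorter(deps).static_order())
```

Here the graph is a single chain, so the only valid order is setup, load, process, report. Without the explicit edge, setup would have no relationship to load and could run at any point.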
Limitations and Considerations
- Truth Value Testing: Direct Python truth value testing (if my_promise:) on Promise or ComparisonExpression objects is not supported and will raise a ValueError. Instead, use explicit comparison methods like my_promise.is_true() or combine expressions with bitwise & and | operators.
- Iteration: Promise objects are not iterable. Attempting to iterate over a Promise (e.g., for item in my_promise:) will raise a ValueError.
- eval() Restrictions: The eval() method on a Promise can only be invoked when the Promise is ready (is_ready is true, i.e., it holds a concrete value from local execution) and that value is a primitive type. It cannot evaluate complex types or incomplete promises.
- Type Schema for Indexing: When accessing attributes or items on a Promise (e.g., my_promise.field or my_promise["key"]), the underlying type must be schematized (e.g., a typing.Dict or a @dataclass). Attempting to index into an unschematized STRUCT or GENERIC type will raise a ValueError.
- VoidPromise Immutability: VoidPromise objects are designed to represent the absence of a return value. Most operations on them are disallowed to prevent logical errors, reinforcing that they carry no data.
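
The iteration restriction has a plausible Python-level motivation worth spelling out: any object that defines `__getitem__` is implicitly iterable through the legacy sequence protocol, and a placeholder whose indexing never raises IndexError would loop forever. The sketch below uses hypothetical classes (`LazyRecord`, `GuardedRecord`) to show the problem and the guard; whether this is the library's actual reason is an assumption.

```python
from itertools import islice
from typing import Tuple, Union


class LazyRecord:
    """Hypothetical placeholder whose item access only records the key."""

    def __init__(self, path: Tuple[Union[str, int], ...] = ()) -> None:
        self.path = path

    def __getitem__(self, key: Union[str, int]) -> "LazyRecord":
        return type(self)(self.path + (key,))


class GuardedRecord(LazyRecord):
    def __iter__(self):
        # Fail fast instead of silently producing an endless stream.
        raise ValueError("placeholders are not iterable; index them explicitly")


# Unguarded: iteration calls __getitem__(0), __getitem__(1), ... without end,
# so only a bounded slice of the stream is taken here.
first_three = [r.path for r in islice(iter(LazyRecord()), 3)]

try:
    iter(GuardedRecord())
    guard_message = ""
except ValueError as exc:
    guard_message = str(exc)
```

Defining `__iter__` to raise turns an infinite loop at compile time into an immediate, descriptive error.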