Promises and Dynamic Control Flow

Promises and Dynamic Control Flow let a workflow describe tasks and the dependencies between them without running those tasks at definition time. The same workflow definition can express conditional execution and extract fields from task outputs, and it runs both in local development and in remote, distributed environments.

The primary purpose of Promises and Dynamic Control Flow is to bridge the gap between Python's synchronous execution model and the asynchronous, distributed nature of workflow orchestration. They do this by representing the future results of tasks as Promise objects during workflow compilation instead of evaluating them immediately. This abstraction facilitates:

  • Duality of Execution: A task function, when called within a workflow, can either execute locally and return concrete Python values or, during compilation, return Promise objects that represent the outputs of a Node in the workflow graph.
  • Node Configuration: It allows for the application of execution-time overrides (e.g., resource requests, retries, timeouts) directly on the result of a task call, even before the task has executed.
  • Conditional Logic: It provides a mechanism to define dynamic control flow based on the outcomes of upstream tasks, enabling branching and conditional execution paths within a workflow.

Core Features

The Promise Object

The Promise object is a central component, acting as a placeholder for a value that will be available after a task completes.

  • Future Value Representation: A Promise can be in one of two states:
    • Ready: It holds an actual evaluated value (_literals_models.Literal) when executed locally. The is_ready property indicates this state, and the val property provides access to the literal value.
    • Not Ready (Incomplete): It holds a reference (NodeOutput) to the output of an upstream Node in the workflow graph. The ref property provides access to this NodeOutput.
  • Local Evaluation: The eval() method retrieves the concrete Python value from a Promise, provided the Promise is ready (is_ready is true) and holds a primitive type. Calling eval() on an incomplete Promise raises an error.
  • Node Overrides: The with_overrides() method enables the configuration of the underlying Node that produces the Promise. This allows developers to specify runtime parameters like requests, limits, timeout, retries, or cache settings for the task associated with the Promise.
    # Example: Applying overrides to a task's output
    output_promise = my_task().with_overrides(retries=3, timeout=datetime.timedelta(minutes=5))
  • Dependency Management: The __rshift__ operator (>>) establishes explicit execution order dependencies between tasks. If p1 and p2 are Promise objects (or VoidPromise objects) representing task outputs, p1 >> p2 ensures that the task producing p1 completes before the task producing p2 begins.
    # Example: Defining task dependencies
    result_a = task_a()
    result_b = task_b()
    result_a >> result_b # task_a runs before task_b
  • Attribute and Item Access: Promise objects support attribute access (.) and item access ([]) to extract specific fields from complex output types (e.g., dataclasses, typing.Dict). This creates a new Promise with an updated attr_path, allowing for dynamic data extraction from structured outputs.
    # Example: Accessing structured outputs
    @task
    def get_user_data() -> typing.Dict[str, str]:
        return {"name": "Alice", "email": "alice@example.com"}

    @workflow
    def my_workflow():
        user_data = get_user_data()
        user_name = user_data["name"] # Accessing a dictionary key
        # Or if get_user_data returned a dataclass with a 'name' field:
        # user_name = user_data.name
        process_name(name=user_name)
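
The two states and the attr_path behaviour described in this section can be modelled in a few lines. The sketch below is a toy, not the real Promise class: `ToyPromise` and `ToyNodeOutput` are hypothetical names, only item access is implemented (attribute access would extend the path the same way), and the toy `eval()` walks the path itself, which the real implementation may not do.

```python
from dataclasses import dataclass
from typing import Any, Optional, Tuple


@dataclass(frozen=True)
class ToyNodeOutput:
    """Hypothetical reference to an upstream node's output variable."""
    node_id: str
    var: str


@dataclass(frozen=True)
class ToyPromise:
    """Toy placeholder: holds either a concrete value or an upstream reference."""
    var: str
    val: Any = None
    ref: Optional[ToyNodeOutput] = None
    attr_path: Tuple[Any, ...] = ()

    @property
    def is_ready(self) -> bool:
        # Ready means there is no upstream reference, so the value is known.
        return self.ref is None

    def eval(self) -> Any:
        if not self.is_ready:
            raise ValueError("cannot eval() a promise that is not ready")
        value = self.val
        for key in self.attr_path:
            value = value[key]  # follow the recorded path into the value
        return value

    def __getitem__(self, key: Any) -> "ToyPromise":
        # Item access never touches the value; it returns a new promise
        # whose attr_path is one element longer.
        return ToyPromise(self.var, self.val, self.ref, self.attr_path + (key,))


ready = ToyPromise(var="o0", val={"name": "Alice"})
pending = ToyPromise(var="o0", ref=ToyNodeOutput("n0", "o0"))
pending_name = pending["name"]  # still not ready, but remembers the path
```

Because indexing only records a path, it works the same on a ready and a pending promise. The lookup is deferred until a value exists.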

The VoidPromise Object

The VoidPromise object is a specialized Promise returned by tasks that do not produce any outputs (i.e., their declared return type is empty).

  • No-Output Tasks: It signifies that a task has no return value.
  • Restricted Operations: VoidPromise objects explicitly disallow most operations (e.g., comparisons, arithmetic, truth value testing) by raising AssertionError, preventing misuse of non-existent values.
  • Dependency and Overrides Support: Like Promise, VoidPromise supports __rshift__ for defining dependencies and with_overrides() for configuring the associated Node.
    # Example: Task with no output
    @task
    def log_message(message: str):
        print(message)

    @workflow
    def my_workflow():
        log_task = log_message(message="Starting workflow")
        # log_task is a VoidPromise
        # It can be used for dependencies or overrides, but not for its "value"
        log_task.with_overrides(retries=1)
        log_task >> another_task()
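
As a rough illustration of the restrictions listed above, the following toy class rejects value-style operations with AssertionError while still accepting overrides and `>>`. `ToyVoidPromise` and its attributes are hypothetical and only mimic the behaviour this section describes.

```python
from typing import Any, Dict, List


class ToyVoidPromise:
    """Toy stand-in for the result of a task that declares no outputs."""

    def __init__(self, task_name: str) -> None:
        self.task_name = task_name
        self.overrides: Dict[str, Any] = {}
        self.downstream: List[str] = []

    def _no_value(self, *args: Any, **kwargs: Any) -> Any:
        raise AssertionError(
            f"task {self.task_name} returns nothing; there is no value to use"
        )

    # Comparisons, arithmetic, and truth testing all route to the same failure.
    __eq__ = __lt__ = __gt__ = __add__ = __bool__ = _no_value

    def with_overrides(self, **kwargs: Any) -> "ToyVoidPromise":
        # Overrides configure the node, so they do not need an output value.
        self.overrides.update(kwargs)
        return self

    def __rshift__(self, other: "ToyVoidPromise") -> "ToyVoidPromise":
        # self >> other: record that other's task must run after this one.
        self.downstream.append(other.task_name)
        return other


log_task = ToyVoidPromise("log_message").with_overrides(retries=1)
cleanup = ToyVoidPromise("cleanup")
log_task >> cleanup
```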

Comparison and Conjunction Expressions

These classes enable the construction of dynamic control flow logic based on the values of Promise objects.

  • ComparisonExpression: Represents a binary comparison (e.g., ==, !=, >, <, >=, <=) between a Promise and another Promise or a literal value.
    • Promise objects overload standard comparison operators (__eq__, __ne__, etc.) to return ComparisonExpression instances.
    • The is_, is_true, is_false, and is_none methods provide explicit ways to create comparison expressions for boolean and None checks.
    • The eval() method locally evaluates the comparison if both operands are ready.
    # Example: Creating a comparison expression
    @task
    def get_number() -> int: return 5

    @workflow
    def conditional_wf():
        num_promise = get_number()
        is_greater_than_zero = num_promise > 0 # This creates a ComparisonExpression
        # This expression can then be used in conditional statements
  • ConjunctionExpression: Represents logical AND (&) or OR (|) operations between ComparisonExpression objects or other ConjunctionExpression objects.
    • ComparisonExpression and ConjunctionExpression objects overload the bitwise & and | operators to form complex logical conditions.
    • The eval() method locally evaluates the conjunction.
    # Example: Creating a conjunction expression
    @task
    def get_bool_a() -> bool: return True
    @task
    def get_bool_b() -> bool: return False

    @workflow
    def complex_conditional_wf():
        bool_a = get_bool_a()
        bool_b = get_bool_b()
        condition = (bool_a.is_true()) & (bool_b.is_false()) # This creates a ConjunctionExpression
        # This expression can be used in conditional statements
  • ComparisonOps and ConjunctionOps: Enums that define the available comparison and logical operators, respectively.
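
The operator overloading described in this section can be sketched without any workflow SDK. All names below (`ToyValue`, `ToyComparison`, `ToyConjunction`) are hypothetical, and operands are plain Python values rather than promises. The point is only that `>`, `==`, `&`, and `|` build expression objects that are evaluated later by `eval()`.

```python
import operator
from dataclasses import dataclass
from typing import Any, Callable, Dict, Union

_COMPARE: Dict[str, Callable[[Any, Any], bool]] = {
    "==": operator.eq, "!=": operator.ne,
    ">": operator.gt, "<": operator.lt,
    ">=": operator.ge, "<=": operator.le,
}


class _Combinable:
    """Mixin: & and | build a ToyConjunction instead of evaluating anything."""

    def __and__(self, other: "Expr") -> "ToyConjunction":
        return ToyConjunction(self, "and", other)

    def __or__(self, other: "Expr") -> "ToyConjunction":
        return ToyConjunction(self, "or", other)


@dataclass(frozen=True)
class ToyComparison(_Combinable):
    lhs: Any
    op: str
    rhs: Any

    def eval(self) -> bool:
        return _COMPARE[self.op](self.lhs, self.rhs)


@dataclass(frozen=True)
class ToyConjunction(_Combinable):
    lhs: "Expr"
    op: str
    rhs: "Expr"

    def eval(self) -> bool:
        if self.op == "and":
            return self.lhs.eval() and self.rhs.eval()
        return self.lhs.eval() or self.rhs.eval()


Expr = Union[ToyComparison, ToyConjunction]


class ToyValue:
    """Toy stand-in for a ready promise wrapping a plain Python value."""

    def __init__(self, value: Any) -> None:
        self.value = value

    def __gt__(self, other: Any) -> ToyComparison:
        return ToyComparison(self.value, ">", other)

    def __eq__(self, other: Any) -> ToyComparison:  # type: ignore[override]
        return ToyComparison(self.value, "==", other)

    def is_true(self) -> ToyComparison:
        return ToyComparison(self.value, "==", True)

    def is_false(self) -> ToyComparison:
        return ToyComparison(self.value, "==", False)


num = ToyValue(5)
flag = ToyValue(False)
condition = (num > 0) & flag.is_false()  # nothing is evaluated yet
```

The parentheses around `num > 0` matter: `&` binds more tightly than comparison operators in Python, so omitting them changes how the expression is parsed.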

Common Use Cases

  • Conditional Workflows: Define branching logic where different tasks or sub-workflows execute based on the output of an upstream task. This is achieved by constructing ComparisonExpression and ConjunctionExpression objects from Promise outputs and passing them to the workflow's conditional construct. A plain Python if statement cannot be used for this, because it truth-tests the expression while the workflow is still being compiled.
    # Example: Conditional execution based on a Promise
    # Assumes Flytekit's conditional() builder; adjust the import if your SDK differs.
    from flytekit import conditional, task, workflow

    @task
    def check_condition() -> bool:
        # ... logic to determine condition ...
        return True

    @task
    def task_if_true():
        print("Condition was true")

    @task
    def task_if_false():
        print("Condition was false")

    @workflow
    def my_conditional_workflow():
        condition_result = check_condition()
        (
            conditional("check_condition_branch")
            .if_(condition_result.is_true())
            .then(task_if_true())
            .else_()
            .then(task_if_false())
        )
  • Dynamic Data Flow: Extract specific pieces of data from complex task outputs (e.g., a field from a dataclass, a value from a dictionary) and pass them as inputs to subsequent tasks. This allows for flexible data transformations and routing within the workflow.
  • Workflow Chaining and Ordering: Explicitly define the execution order of tasks, ensuring that certain tasks complete before others begin, even if there are no direct data dependencies. This is crucial for managing side effects or resource contention.
    # Example: Explicit task ordering
    @task
    def setup_environment():
        print("Setting up environment")

    @task
    def run_main_process():
        print("Running main process")

    @workflow
    def ordered_workflow():
        setup_output = setup_environment()
        setup_output >> run_main_process() # Ensures setup completes before main process starts
  • Resource Management and Retries: Configure resource requests, limits, timeouts, and retry policies for individual tasks or specific outputs within a workflow using with_overrides(). This allows for fine-grained control over execution characteristics without modifying the task code itself.
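
To make the ordering use case concrete, here is a minimal model of how `>>` could record "runs before" edges that a scheduler later turns into an execution order. `ToyNode` and its registry are hypothetical, and `send_report` is an invented third step. The sketch uses the standard-library `graphlib` module (Python 3.9+) rather than any orchestration engine.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


class ToyNode:
    """Toy workflow node that records which nodes must finish before it."""

    # Maps each node name to the names of its upstream dependencies.
    registry: Dict[str, Set[str]] = {}

    def __init__(self, name: str) -> None:
        self.name = name
        ToyNode.registry.setdefault(name, set())

    def __rshift__(self, other: "ToyNode") -> "ToyNode":
        # self >> other: other gains self as an upstream dependency.
        ToyNode.registry[other.name].add(self.name)
        return other


setup = ToyNode("setup_environment")
main = ToyNode("run_main_process")
report = ToyNode("send_report")

# Chaining works in this toy because >> returns its right-hand operand.
setup >> main >> report

# TopologicalSorter takes {node: predecessors} and yields predecessors first.
execution_order: List[str] = list(
    TopologicalSorter(ToyNode.registry).static_order()
)
```

No data flows between these nodes. The order comes entirely from the edges that `>>` recorded.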

Limitations and Considerations

  • Truth Value Testing: Direct Python truth value testing (if my_promise:) on Promise or ComparisonExpression objects is not supported and will raise a ValueError. Instead, use explicit comparison methods like my_promise.is_true() or combine expressions with bitwise & and | operators.
  • Iteration: Promise objects are not iterable. Attempting to iterate over a Promise (e.g., for item in my_promise:) will raise a ValueError.
  • eval() Restrictions: The eval() method on a Promise can only be invoked when the Promise is ready (is_ready is true, meaning it holds a concrete value from local execution) and that value is a primitive type. It cannot evaluate complex types or incomplete promises.
  • Type Schema for Indexing: When accessing attributes or items on a Promise (e.g., my_promise.field or my_promise["key"]), the underlying type must be schematized (e.g., a typing.Dict or a @dataclass). Attempting to index into an unschematized STRUCT or GENERIC type will raise a ValueError.
  • VoidPromise Restrictions: A VoidPromise represents the absence of a return value. Most operations on it are disallowed so that workflow code cannot mistakenly treat it as data.
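
The truth-testing and iteration limits follow from how Python dispatches these operations: `if x`, `and`, `or`, and `not` all call `__bool__`, which must return a real boolean immediately, whereas `&` and `|` call `__and__` and `__or__`, which may return any object. The sketch below demonstrates this with a hypothetical `ToyExpr` class; it is not the real expression type.

```python
from typing import Callable, Dict


class ToyExpr:
    """Toy deferred expression that refuses truth testing and iteration."""

    def __init__(self, text: str) -> None:
        self.text = text

    def __bool__(self) -> bool:
        raise ValueError(
            f"truth value of deferred expression {self.text!r} is unknown; use & or |"
        )

    def __iter__(self):
        raise ValueError(f"cannot iterate over deferred expression {self.text!r}")

    def __and__(self, other: "ToyExpr") -> "ToyExpr":
        # Bitwise & can return a new deferred expression instead of a bool.
        return ToyExpr(f"({self.text} and {other.text})")


def describe(action: Callable[[], object]) -> str:
    """Run action and report whether it succeeded or raised ValueError."""
    try:
        action()
        return "ok"
    except ValueError:
        return "ValueError"


a, b = ToyExpr("x > 0"), ToyExpr("y < 3")
combined = a & b

outcomes: Dict[str, str] = {
    "bool(a)": describe(lambda: bool(a)),
    "a and b": describe(lambda: a and b),  # `and` truth-tests a first
    "list(a)": describe(lambda: list(a)),
    "a & b": describe(lambda: a & b),
}
```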