Nodes: Connecting Tasks in a Workflow

Nodes represent the fundamental building blocks for defining and connecting tasks within a workflow. Each node encapsulates a specific executable entity, such as a task, a sub-workflow, or a launch plan, and defines its properties, dependencies, and how data flows into and out of it.

The primary purpose of nodes is to construct a directed acyclic graph (DAG) that orchestrates the execution order and data dependencies between various operations in a workflow.

Defining Nodes

A node is instantiated with a unique identifier, metadata, input bindings, and a reference to the underlying executable entity it represents.

The Node object holds:

  • id: A unique, DNS-compliant string identifier for the node within the workflow.
  • flyte_entity: The actual executable component, which can be a task, a workflow, or a launch plan. The run_entity property provides access to the underlying Python function for tasks, handling special cases like map tasks.
  • metadata: Contains execution-related configurations such as timeouts, retry strategies, and caching settings.
  • bindings: Specifies how inputs from upstream nodes or the workflow itself are mapped to the inputs of the flyte_entity.
  • upstream_nodes: A list of Node objects that must complete successfully before this node can begin execution.
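
As a quick illustration, the sketch below inspects these properties on a freshly created node. It uses the conceptual create_node factory that appears in the examples later on this page, and my_task is a placeholder task definition, so treat it as a sketch rather than exact API.

# A minimal sketch, assuming the conceptual create_node factory and a placeholder task my_task
node = create_node(my_task)

print(node.id)              # unique, DNS-compliant identifier within the workflow
print(node.flyte_entity)    # the wrapped task, workflow, or launch plan
print(node.metadata)        # timeouts, retries, and caching configuration
print(node.bindings)        # how inputs are mapped from upstream outputs or workflow inputs
print(node.upstream_nodes)  # nodes that must complete before this one starts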

Connecting Nodes: Defining Dependencies

Nodes establish dependencies to control the execution order of tasks. A node can be explicitly declared as running before another node, forming a sequential dependency.

Use the runs_before method or the >> (right shift) operator to define dependencies:

# Assuming task_1 and task_2 are defined tasks
node_1 = create_node(task_1) # create_node is a conceptual factory function
node_2 = create_node(task_2)

# Explicitly declare node_1 runs before node_2
node_1.runs_before(node_2)

# Alternatively, use the right shift operator for conciseness
node_1 >> node_2

Both node_1.runs_before(node_2) and node_1 >> node_2 achieve the same result: node_2 will not start until node_1 has completed. The upstream_nodes property of node_2 will include node_1.
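
Ordering-only dependencies (no data exchanged) extend naturally to longer chains; a minimal sketch, reusing node_1 and node_2 from above plus a hypothetical node_3:

# node_1 -> node_2 -> node_3, declared explicitly
node_1.runs_before(node_2)
node_2.runs_before(node_3)

# Each downstream node records its predecessors
assert node_1 in node_2.upstream_nodes
assert node_2 in node_3.upstream_nodes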

Passing Data Between Nodes: Node Outputs

Workflows often require the output of one task to serve as the input for another. NodeOutput objects facilitate this data flow.

A NodeOutput represents a specific output variable from a Node. When a node completes, its outputs become available and can be referenced by downstream nodes.

# Assuming task_producer declares an output named 'result'
# and task_consumer accepts an 'input_data' argument
node_producer = create_node(task_producer)
node_consumer = create_node(task_consumer, input_data=node_producer.outputs['result'])

The outputs property of a Node provides access to its declared output variables. Each item accessed from outputs is a NodeOutput object.

For complex outputs (e.g., dictionaries or objects), use the with_attr method on a NodeOutput to access nested attributes:

# If node_producer.outputs['complex_data'] is a dictionary
# and you need to pass its 'nested_key' entry to task_consumer
node_consumer = create_node(
    task_consumer,
    data_part=node_producer.outputs['complex_data'].with_attr('nested_key'),
)

The node_id property of a NodeOutput refers to the ID of the node that produced it, ensuring correct serialization even when node IDs are overridden.
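
As a concrete illustration of these references, the short sketch below continues the conceptual producer/consumer example; node_producer, task_consumer, and create_node are the same placeholders used above.

# A minimal sketch, continuing the conceptual example above
result_ref = node_producer.outputs['result']   # a NodeOutput object
print(result_ref.node_id)                      # id of the node that produced it (node_producer)

# The same reference can be bound as an input to a downstream node
node_consumer = create_node(task_consumer, input_data=result_ref)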

Customizing Node Behavior: Overrides

Nodes provide extensive customization capabilities through the with_overrides method. This allows fine-tuning execution parameters for individual nodes without altering the underlying task definition.

The with_overrides method returns the modified Node object, enabling chaining of overrides.

from flytekit import Resources  # assuming the flytekit SDK's Resources class

node = create_node(my_task).with_overrides(
    node_name="custom-task-name",
    timeout=300,  # 5 minutes
    retries=3,
    requests=Resources(cpu="1", mem="2Gi"),
    limits=Resources(cpu="2", mem="4Gi"),
    cache=True,
    cache_version="v1.0",
    container_image="my-custom-image:latest",
    # ... other overrides
)

Key parameters for with_overrides include:

  • node_name: Overrides the unique identifier of the node. This name must be DNS-compliant.
  • aliases: Provides alternative names for the node's outputs, useful for clarity in complex workflows.
  • resources: Specifies compute resource requirements (CPU, memory, GPU) for the node's execution. Requirements can also be given via requests (guaranteed resources) and limits (maximum allowed resources), but the resources parameter cannot be combined with requests or limits in the same call.
  • timeout: Sets the maximum duration for the node's execution. Can be an integer (seconds) or a datetime.timedelta object.
  • retries: Configures the number of times the node should be retried in case of failure.
  • interruptible: Determines if the node's execution can be interrupted (e.g., for pre-emptible instances).
  • cache: Enables or disables caching for the node's outputs. When cache is True, it's recommended to provide a Cache object with a version and serialize flag for explicit control.
  • container_image: Specifies a custom Docker image to use for executing this specific node, overriding the default image defined for the task.
  • accelerator: Attaches specific hardware accelerators (e.g., GPUs) to the node.
  • shared_memory: Configures shared memory settings for the node's container.
  • pod_template: Provides a Kubernetes PodTemplate to customize the underlying Pod configuration for the node.
  • task_config: Allows overriding specific configuration parameters of the underlying task. This is an advanced feature and requires the new configuration to be of the same type as the original task configuration.

When overriding parameters, ensure that the values provided are concrete (e.g., int, str, Resources object) and not promises (outputs from other nodes), as overrides are applied during workflow definition.
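
As an example, the sketch below tunes a single, heavier step without touching its task definition. It reuses the conceptual create_node factory, a hypothetical heavy_task, and assumes a Cache object exposing the version and serialize fields mentioned above; treat it as a sketch, not exact API.

from datetime import timedelta

heavy_node = create_node(heavy_task).with_overrides(
    node_name="heavy-step",                     # DNS-compliant node name
    timeout=timedelta(minutes=10),              # timedelta accepted as well as integer seconds
    retries=2,                                  # retry transient failures
    resources=Resources(cpu="4", mem="8Gi"),    # use resources OR requests/limits, not both
    cache=Cache(version="v2", serialize=True),  # explicit cache configuration (assumed Cache type)
)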

Common Use Cases

  • Sequential Execution: Define a series of tasks that must run one after another, with data flowing between them.
    # task_a >> task_b >> task_c
  • Parallel Branches: Create independent execution paths that run concurrently.
    # task_a >> [task_b, task_c] >> task_d
    # task_b and task_c run in parallel after task_a, and task_d runs after both complete.
  • Resource Optimization: Allocate more CPU or memory to computationally intensive tasks using with_overrides to prevent bottlenecks.
  • Fault Tolerance: Implement retry strategies for tasks that might experience transient failures.
  • Caching Intermediate Results: Speed up development and re-runs by caching outputs of idempotent tasks.
  • A/B Testing or Experimentation: Easily swap out different task implementations or configurations by overriding the container_image or task_config for specific nodes.
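
For the parallel-branch pattern, the same fan-out/fan-in shape can be spelled out with explicit dependencies; a minimal sketch, assuming nodes node_a through node_d created with the conceptual create_node factory:

# Fan-out: node_b and node_c depend only on node_a, so they run in parallel
node_a.runs_before(node_b)
node_a.runs_before(node_c)

# Fan-in: node_d waits for both branches to complete
node_b.runs_before(node_d)
node_c.runs_before(node_d)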