Error Handling in Eager Workflows

Error handling in eager workflows provides a robust mechanism for managing exceptions that arise during the execution of tasks or subworkflows. This capability ensures that eager workflows can gracefully recover from failures, implement fallback logic, or propagate errors in a controlled manner.

Purpose and Core Features

The primary purpose of error handling in eager workflows is to standardize the way exceptions from individual workflow nodes (tasks or subworkflows) are caught and managed within the workflow's execution context. It allows developers to define explicit error recovery strategies directly within the workflow definition.

Core features include:

  • Standardized Exception Type: All exceptions originating from tasks or subworkflows within an eager workflow are encapsulated and presented as an EagerException. This provides a consistent interface for catching and handling diverse underlying errors.
  • Granular Control: Developers can use standard Python try...except blocks to intercept EagerException instances, enabling specific handling for different parts of an eager workflow.
  • Robustness: Facilitates the creation of more resilient workflows that can continue execution or perform alternative actions even when individual components fail.

The EagerException Class

The EagerException class is the central component for error handling in eager workflows. It inherits from Python's built-in Exception class and is raised by the eager workflow runtime when a task or subworkflow encounters an error. In practice, any exception (for example ValueError, TypeError, or a custom exception) raised inside a task or subworkflow is caught by the runtime and surfaces in the eager workflow as an EagerException, so that is the type a try...except block in the workflow should catch.

Consider the following example demonstrating its usage:

from flytekit import task
from flytekit.exceptions.eager import EagerException
from flytekit.core.task import eager

@task
def add_one(x: int) -> int:
    """
    A task that adds one to x, but raises a ValueError if x is negative.
    """
    if x < 0:
        raise ValueError("x must be non-negative")
    return x + 1

@task
def double(x: int) -> int:
    """
    A simple task that doubles the input.
    """
    return x * 2

@eager
async def eager_workflow(x: int) -> int:
    """
    An eager workflow demonstrating error handling with EagerException.
    """
    try:
        # Attempt to execute add_one. If x is negative, add_one will raise ValueError.
        # The eager workflow runtime catches this ValueError and re-raises it as an EagerException.
        out = await add_one(x=x)
    except EagerException:
        # This block catches the EagerException that encapsulates the original ValueError.
        # In this specific example, the caught EagerException is re-raised,
        # effectively propagating the error upwards from the workflow.
        print(f"Caught an EagerException when processing x={x}. Re-raising.")
        raise
    # If add_one succeeds, this part of the workflow executes.
    return await double(x=out)

# Example usage:
# await eager_workflow(x=5)   # Will succeed, output: 12
# await eager_workflow(x=-1)  # Will raise EagerException

In this example, when eager_workflow is called with x=-1, the add_one task raises a ValueError. The eager workflow runtime intercepts this ValueError and transforms it into an EagerException. The try...except EagerException block within eager_workflow then catches this EagerException. The raise statement inside the except block re-raises the EagerException, allowing the caller of eager_workflow to handle it.
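The wrap-and-re-raise flow described above can be pictured with plain Python. The following is only a conceptual sketch under stated assumptions: it does not import flytekit, the EagerException class is a local stand-in, and run_node is a hypothetical helper that plays the role of the runtime. It is not how flytekit is implemented, and whether the real EagerException keeps the original error as its cause is not stated in this document.

```python
import asyncio


class EagerException(Exception):
    """Local stand-in for flytekit.exceptions.eager.EagerException (assumption)."""


async def run_node(fn, **kwargs):
    """Hypothetical helper: run a node and surface any failure as EagerException."""
    try:
        return fn(**kwargs)
    except Exception as exc:
        # Chain the original error so it stays reachable via __cause__.
        raise EagerException(f"{type(exc).__name__}: {exc}") from exc


def add_one(x: int) -> int:
    if x < 0:
        raise ValueError("x must be non-negative")
    return x + 1


def double(x: int) -> int:
    return x * 2


async def workflow(x: int) -> int:
    try:
        out = await run_node(add_one, x=x)
    except EagerException:
        # Re-raise unchanged so the caller decides how to handle the failure.
        raise
    return await run_node(double, x=out)
```

In this sketch the workflow body only ever sees EagerException, regardless of what the node raised, which mirrors the single-exception-type contract described above.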

Common Use Cases and Best Practices

Error handling with EagerException enables several practical scenarios:

  • Implementing Fallback Logic: Instead of re-raising, an except EagerException block can execute alternative tasks or provide default values, allowing the workflow to continue despite a failure in one branch.

    @eager
    async def eager_workflow_with_fallback(x: int) -> int:
        try:
            out = await add_one(x=x)
        except EagerException:
            print(f"Task 'add_one' failed for x={x}. Using fallback value.")
            out = 0  # Provide a default or fallback value
        return await double(x=out)

  • Conditional Error Handling: While EagerException is a generic wrapper, its attributes (if available) or the exception message can sometimes be inspected to determine the original error type or context, allowing for more specific recovery actions.

  • Logging and Monitoring: Catching EagerException provides an opportunity to log detailed error information, trigger alerts, or update monitoring systems before deciding to re-raise or handle the error. This is crucial for debugging and operational visibility.

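  • Resource Cleanup: Pairing the try block with a finally clause (or using a context manager) ensures that any resources opened or allocated within the try block are closed or released whether or not an error occurs. An except block alone runs only on failure, so cleanup placed there is skipped on the success path.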
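
The logging, conditional handling, and cleanup patterns listed above can be combined in a single try/except/finally. The sketch below is illustrative only and runs without flytekit: EagerException is a local stand-in class, FakeConnection is a hypothetical resource, and flaky_node stands in for an awaited task call. The assumption that the original error type appears in the exception message is made for this example only; matching on message text is brittle and should be checked against the messages actually produced.

```python
import asyncio
import logging

logger = logging.getLogger("eager_example")


class EagerException(Exception):
    """Local stand-in for flytekit.exceptions.eager.EagerException (assumption)."""


class FakeConnection:
    """Hypothetical resource that must always be closed."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


async def flaky_node(x: int) -> int:
    """Stand-in for an awaited task call that fails on negative input."""
    if x < 0:
        raise EagerException("ValueError: x must be non-negative")
    return x + 1


async def process(x: int, conn: FakeConnection) -> int:
    try:
        return await flaky_node(x)
    except EagerException as exc:
        # Log first, then decide whether the failure is recoverable.
        logger.error("node failed for x=%s: %s", x, exc)
        if "ValueError" in str(exc):
            return 0  # fallback for an error treated as recoverable here
        raise
    finally:
        # Runs on success, on fallback, and on re-raise.
        conn.close()
```

The finally clause is what guarantees cleanup on every path; the except clause is reserved for logging and the recover-or-propagate decision.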

Important Considerations

  • Scope: EagerException is specifically designed for error handling within @eager (flytekit.core.task.eager) workflows. It does not apply to errors in standard, non-eager Flyte workflows, which have their own error propagation mechanisms.
  • Granularity: A single try...except EagerException block will catch errors from any task or subworkflow executed within that block. For more fine-grained error handling, consider wrapping individual task calls or smaller groups of tasks in separate try...except blocks.
  • Performance: While error handling adds a slight overhead, its impact on overall workflow performance is generally negligible compared to the benefits of increased robustness and reliability. Focus on clear, maintainable error handling logic rather than micro-optimizations.
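
To make the granularity point concrete, the sketch below wraps each call in its own try...except so that each failure gets its own recovery. It is a plain-Python illustration that runs without flytekit: EagerException is a local stand-in class, the two node functions are hypothetical, and the 100 limit and the -1 sentinel are arbitrary choices for the example.

```python
import asyncio


class EagerException(Exception):
    """Local stand-in for flytekit.exceptions.eager.EagerException (assumption)."""


async def add_one(x: int) -> int:
    """Hypothetical node: fails on negative input."""
    if x < 0:
        raise EagerException("x must be non-negative")
    return x + 1


async def double(x: int) -> int:
    """Hypothetical node: fails when the input exceeds an arbitrary limit."""
    if x > 100:
        raise EagerException("x is too large")
    return x * 2


async def workflow(x: int) -> int:
    # First block: a failure in add_one is recovered with a default value.
    try:
        out = await add_one(x)
    except EagerException:
        out = 0

    # Second block: a failure in double is reported with a sentinel instead.
    try:
        return await double(out)
    except EagerException:
        return -1
```

With a single try block around both calls, the handler could not tell which node failed; splitting the blocks lets each call site choose its own fallback.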