Skip to main content

Reference Tasks

Reference Tasks provide a standardized and efficient mechanism for retrieving specific data or information required by an application. This system abstracts the complexities of data sourcing, allowing developers to define, execute, and manage data lookups consistently across various internal and external systems.

Core Capabilities

The system offers several core capabilities designed to streamline data retrieval:

  • Declarative Task Definition: Define reference tasks using a clear, structured approach, specifying the data source, query parameters, and expected output format. This promotes reusability and maintainability.
  • Unified Execution Interface: Execute diverse reference tasks through a single, consistent interface, regardless of the underlying data source (e.g., databases, REST APIs, configuration files).
  • Result Caching: Implement configurable caching strategies to reduce latency and load on data sources for frequently accessed information.
  • Error Handling and Fallbacks: Define robust error handling mechanisms, including retries, timeouts, and optional fallback values or alternative data sources when a primary lookup fails.
  • Data Transformation: Apply transformations to raw data retrieved from sources, ensuring the output conforms to the expected structure and type for consuming components.
  • Asynchronous Execution: Support asynchronous execution of tasks, enabling non-blocking data retrieval and improving application responsiveness.

Common Use Cases

Reference Tasks are ideal for scenarios requiring reliable and performant data lookups:

  • User Profile Enrichment: Fetching additional user details (e.g., preferences, subscription status) from a user service or database based on a user ID.

    from reference_tasks import TaskRegistry, ReferenceTaskExecutor

    # Assume 'UserProfileTask' is a pre-defined task in the registry
    # that fetches user data from a database.
    user_id = "user_123"
    user_profile_task = TaskRegistry.get_task("UserProfileTask", user_id=user_id)

    executor = ReferenceTaskExecutor()
    user_data = executor.execute(user_profile_task)

    if user_data:
    print(f"User Name: {user_data.get('name')}, Email: {user_data.get('email')}")
    else:
    print(f"User profile for {user_id} not found.")
  • Configuration Retrieval: Dynamically loading application configuration settings from a centralized configuration service or file based on environment or service name.

    # Assume 'ServiceConfigTask' fetches configuration from a remote API.
    service_name = "payment_gateway"
    config_task = TaskRegistry.get_task("ServiceConfigTask", service_name=service_name)

    executor = ReferenceTaskExecutor()
    service_config = executor.execute(config_task)

    if service_config:
    print(f"Payment Gateway URL: {service_config.get('api_url')}")
    else:
    print(f"Configuration for {service_name} not available.")
  • Product Information Lookup: Retrieving product details (e.g., price, description, inventory) from an e-commerce product catalog API using a product SKU.

    # Assume 'ProductDetailsTask' queries an external product API.
    product_sku = "PROD-XYZ-789"
    product_task = TaskRegistry.get_task("ProductDetailsTask", sku=product_sku)

    executor = ReferenceTaskExecutor()
    product_details = executor.execute(product_task)

    if product_details:
    print(f"Product: {product_details.get('name')}, Price: ${product_details.get('price')}")
    else:
    print(f"Product with SKU {product_sku} not found.")
  • Data Validation and Enrichment: Validating incoming data against a reference dataset or enriching it with supplementary information from another source.

Defining Reference Tasks

To define a new reference task, extend the ReferenceTask abstract base class and implement the _execute_source_lookup method. This method encapsulates the specific logic for interacting with the data source.

from abc import ABC, abstractmethod
from typing import Any, Dict, Optional

class ReferenceTask(ABC):
"""Abstract base class for all reference tasks."""
def __init__(self, task_id: str, **kwargs):
self.task_id = task_id
self.params = kwargs

@abstractmethod
def _execute_source_lookup(self) -> Optional[Dict[str, Any]]:
"""
Implement the specific data retrieval logic for the task.
Returns the raw data as a dictionary or None if not found.
"""
pass

def execute(self) -> Optional[Dict[str, Any]]:
"""Executes the task, including pre/post processing and error handling."""
# This method would typically handle caching, retries, transformations, etc.
# For simplicity, we directly call the source lookup here.
try:
result = self._execute_source_lookup()
# Apply transformations here if defined
return result
except Exception as e:
print(f"Error executing task {self.task_id}: {e}")
# Implement fallback logic here
return None

class DatabaseLookupTask(ReferenceTask):
"""A concrete task for looking up data in a database."""
def __init__(self, task_id: str, table_name: str, key_column: str, key_value: Any):
super().__init__(task_id, table_name=table_name, key_column=key_column, key_value=key_value)
self.table_name = table_name
self.key_column = key_column
self.key_value = key_value

def _execute_source_lookup(self) -> Optional[Dict[str, Any]]:
print(f"Simulating DB lookup for {self.table_name} where {self.key_column}={self.key_value}")
# In a real scenario, this would interact with a database client
if self.key_value == "user_123":
return {"id": "user_123", "name": "Alice", "email": "alice@example.com"}
return None

# Registering the task (conceptual)
class TaskRegistry:
_tasks = {}

@staticmethod
def register_task(name: str, task_class: type):
TaskRegistry._tasks[name] = task_class

@staticmethod
def get_task(name: str, **kwargs) -> ReferenceTask:
task_class = TaskRegistry._tasks.get(name)
if not task_class:
raise ValueError(f"Task '{name}' not registered.")
return task_class(task_id=name, **kwargs)

# Example usage with the custom task
TaskRegistry.register_task("UserProfileTask", DatabaseLookupTask)

user_id = "user_123"
user_profile_task = TaskRegistry.get_task("UserProfileTask", table_name="users", key_column="id", key_value=user_id)
user_data = user_profile_task.execute() # Direct execution for demonstration

if user_data:
print(f"Retrieved user data: {user_data}")

Integration Points

Reference Tasks integrate seamlessly into various application architectures:

  • Service Layers: Embed task execution within service methods to encapsulate data retrieval logic, keeping business logic clean.
  • API Endpoints: Use tasks to fetch data required for API responses, ensuring consistent data sourcing.
  • Background Jobs: Leverage tasks in asynchronous workers or cron jobs for data processing, reporting, or synchronization.
  • Event-Driven Architectures: Trigger tasks in response to events to enrich event data or perform lookups based on event payloads.

Performance Considerations

  • Caching Strategy: Configure appropriate caching policies (e.g., time-to-live, cache invalidation) for each task based on data volatility and access patterns. The ReferenceTaskExecutor manages cache interactions.
  • Asynchronous Execution: For tasks that involve network I/O or long-running operations, utilize the asynchronous execution capabilities provided by the task executor to prevent blocking the main application thread.
  • Batching: When possible, design tasks to support batch lookups to minimize round trips to data sources. The _execute_source_lookup method can be extended to accept multiple keys.

Limitations and Considerations

  • Overhead for Simple Lookups: For extremely simple, single-value lookups from local memory or highly optimized caches, the overhead of defining and executing a ReferenceTask might be unnecessary. Direct access might be more suitable in such niche cases.
  • Complexity of Transformations: While the system supports data transformations, overly complex or stateful transformations might be better handled by dedicated data processing pipelines outside the scope of a single reference task.
  • Dependency Management: Ensure that the underlying data source clients (e.g., database drivers, HTTP libraries) are properly configured and available within the execution environment of the tasks.
  • Security: Implement proper authentication and authorization for tasks accessing sensitive data sources. The task definition itself should not store credentials directly; instead, it should reference secure configuration stores.