High-Level Flyte Client API
The High-Level Flyte Client API simplifies programmatic interaction with a Flyte deployment, abstracting away the complexities of gRPC communication and protobuf serialization. It provides a Pythonic interface for managing and interacting with Flyte entities, enabling developers to integrate Flyte's powerful workflow orchestration capabilities into external applications and systems.
Purpose
The primary purpose of the High-Level Flyte Client API is to offer a straightforward, idiomatic Python interface for interacting with the Flyte Admin service. This allows developers to programmatically control and observe Flyte workflows, tasks, and executions without needing to delve into the underlying low-level API details. It facilitates building custom tools, automation scripts, and integrations that leverage Flyte's capabilities.
Core Capabilities
The client API provides a comprehensive set of functionalities for managing Flyte resources and executions.
Workflow and Task Execution Management
The client API enables direct control over the lifecycle of Flyte executions. Developers can initiate new workflow executions, resume suspended ones, or terminate active executions. It supports specifying inputs, overriding launch plan parameters, and attaching labels to executions.
from flyte_client import FlyteClient, WorkflowIdentifier
# Assume client is initialized with appropriate configuration
client = FlyteClient(
    host="flyte.example.com",
    insecure=False,
    # ... other configuration like credentials
)
workflow_id = WorkflowIdentifier(
    project="flyte-project",
    domain="development",
    name="my_workflow",
    version="v1.0.0",
)
# Launch a workflow execution
execution = client.execute_workflow(
    workflow_id=workflow_id,
    inputs={"input_param": "some_value"},
    execution_name="my_custom_run_123",
    labels={"triggered_by": "api"},
)
print(f"Launched execution: {execution.id.name}")
Execution Monitoring and Data Retrieval
Developers can monitor the status of ongoing executions, retrieve detailed information about their progress, and access execution outputs. This includes fetching logs, inspecting node and task execution details, and retrieving structured outputs.
import time
from flyte_client import ExecutionStatus
# Terminal states: once an execution reaches one of these, its status no longer changes.
TERMINAL_STATES = (ExecutionStatus.SUCCEEDED, ExecutionStatus.FAILED, ExecutionStatus.ABORTED)
# Assumes the 'client' and 'execution' objects from the previous example.
# Alternatively, retrieve an existing execution by its ID:
# execution = client.get_execution(execution_id="my_custom_run_123")
# Poll for status updates, pausing between requests so the loop does not hammer Flyte Admin.
while execution.status not in TERMINAL_STATES:
    time.sleep(5)
    execution = client.get_execution(execution_id=execution.id.name)
    print(f"Execution {execution.id.name} status: {execution.status}")
if execution.status == ExecutionStatus.SUCCEEDED:
    outputs = client.get_execution_outputs(execution_id=execution.id.name)
    print(f"Execution outputs: {outputs}")
else:
    print(f"Execution {execution.id.name} failed or was aborted.")
    error_info = client.get_execution_error(execution_id=execution.id.name)
    print(f"Error details: {error_info}")
Resource Administration
The client API provides methods for managing Flyte projects, domains, workflows, tasks, and launch plans. This includes listing available resources, retrieving their definitions, and performing administrative actions like archiving or activating entities.
# Reuses 'client' and 'workflow_id' from the earlier example.
# List all projects
projects = client.list_projects()
print("Available Projects:")
for p in projects:
    print(f"- {p.id.name}")
# Get a specific workflow definition
workflow_definition = client.get_workflow(workflow_id)
print(f"Workflow '{workflow_id.name}' definition: {workflow_definition.metadata}")
Type System Integration
The client API handles the serialization and deserialization of Python native types to and from Flyte's literal types. This ensures seamless data exchange when providing inputs to workflows or consuming outputs from executions, abstracting away the underlying protobuf representations.
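To illustrate what this conversion involves, the toy sketch below maps a few Python native types to nested dictionaries shaped loosely like Flyte's literal messages (scalar primitives, collections, and string-keyed maps) and back again. This is a simplified assumption for illustration only: the real client works with protobuf messages and supports many more types, and the helper names to_literal and from_literal are hypothetical.

```python
from typing import Any


def to_literal(value: Any) -> dict:
    """Convert a Python value into a simplified, dict-based literal (illustrative only)."""
    # bool must be checked before int, because bool is a subclass of int in Python.
    if isinstance(value, bool):
        return {"scalar": {"primitive": {"boolean": value}}}
    if isinstance(value, int):
        return {"scalar": {"primitive": {"integer": value}}}
    if isinstance(value, float):
        return {"scalar": {"primitive": {"float_value": value}}}
    if isinstance(value, str):
        return {"scalar": {"primitive": {"string_value": value}}}
    if isinstance(value, list):
        return {"collection": {"literals": [to_literal(v) for v in value]}}
    if isinstance(value, dict):
        # Maps in this sketch are keyed by strings only.
        if not all(isinstance(k, str) for k in value):
            raise TypeError("map keys must be strings")
        return {"map": {"literals": {k: to_literal(v) for k, v in value.items()}}}
    raise TypeError(f"unsupported type: {type(value).__name__}")


def from_literal(literal: dict) -> Any:
    """Invert to_literal, recovering the original Python value."""
    if "scalar" in literal:
        # Each primitive holds exactly one populated field.
        (primitive_value,) = literal["scalar"]["primitive"].values()
        return primitive_value
    if "collection" in literal:
        return [from_literal(item) for item in literal["collection"]["literals"]]
    if "map" in literal:
        return {k: from_literal(v) for k, v in literal["map"]["literals"].items()}
    raise ValueError("unrecognized literal shape")


inputs = {"name": "World", "retries": 3, "tags": ["a", "b"], "dry_run": False}
literal_inputs = {k: to_literal(v) for k, v in inputs.items()}
print(literal_inputs["retries"])  # {'scalar': {'primitive': {'integer': 3}}}
roundtrip = {k: from_literal(v) for k, v in literal_inputs.items()}
print(roundtrip == inputs)  # True
```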
Common Use Cases
The High-Level Flyte Client API is instrumental in various integration scenarios.
Automated Workflow Orchestration
Triggering Flyte workflows from external systems, such as event-driven architectures, scheduled cron jobs, or CI/CD pipelines. For example, a data ingestion service might trigger a Flyte workflow upon new data arrival, passing the data location as an input.
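A minimal sketch of such an event handler, assuming the execute_workflow call shown earlier in this section. The event shape (bucket and key fields), the s3:// location format, the input name input_path, and the StubClient that lets the example run without a Flyte deployment are all hypothetical; a real caller would pass a WorkflowIdentifier where the example uses a plain string.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class StubClient:
    """Hypothetical stand-in for a Flyte client: records launches instead of calling Flyte Admin."""
    launched: list = field(default_factory=list)

    def execute_workflow(self, workflow_id: Any, inputs: dict,
                         execution_name: Optional[str] = None,
                         labels: Optional[dict] = None) -> dict:
        request = {"workflow_id": workflow_id, "inputs": inputs,
                   "execution_name": execution_name, "labels": labels or {}}
        self.launched.append(request)
        return request


def on_new_data(client, workflow_id, event: dict):
    """Launch the ingestion workflow for a 'new data arrived' event (hypothetical event shape)."""
    # Build the data location from the event and pass it to the workflow as an input.
    data_location = f"s3://{event['bucket']}/{event['key']}"
    return client.execute_workflow(
        workflow_id=workflow_id,
        inputs={"input_path": data_location},
        labels={"triggered_by": "ingestion-service"},
    )


client = StubClient()
on_new_data(client, workflow_id="my_workflow",
            event={"bucket": "raw-data", "key": "2024/01/batch.parquet"})
print(client.launched[0]["inputs"])  # {'input_path': 's3://raw-data/2024/01/batch.parquet'}
```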
Custom Monitoring and Reporting
Building custom dashboards, alerting systems, or reporting tools that aggregate information from multiple Flyte executions. Developers can query execution statuses, retrieve metrics, and generate reports tailored to specific operational needs.
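As a sketch of the aggregation step, the snippet below counts executions by status for each workflow. It assumes the execution records have already been fetched (for example with get_execution) and reduced to plain (workflow_name, status) pairs; the record shape and the status strings are illustrative assumptions.

```python
from collections import Counter, defaultdict


def status_report(records):
    """Count executions per workflow and status, returning {workflow: {status: count}}."""
    report = defaultdict(Counter)
    for workflow_name, status in records:
        report[workflow_name][status] += 1
    return {wf: dict(counts) for wf, counts in report.items()}


records = [
    ("daily_etl", "SUCCEEDED"),
    ("daily_etl", "FAILED"),
    ("daily_etl", "SUCCEEDED"),
    ("train_model", "RUNNING"),
]
for wf, counts in sorted(status_report(records).items()):
    print(wf, counts)
# daily_etl {'SUCCEEDED': 2, 'FAILED': 1}
# train_model {'RUNNING': 1}
```

An alerting rule could then be a simple threshold over these counts, such as flagging any workflow whose FAILED count exceeds a chosen limit.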
CI/CD Integration
Integrating Flyte into continuous integration and continuous deployment pipelines. This allows for automated testing of Flyte workflows, programmatic deployment of new workflow versions, and triggering production workflows as part of a release process.
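One common convention for programmatic deployment is to tie each registered workflow version to the commit that produced it, so every release gets a unique, traceable version string. A minimal sketch, assuming the CI system exposes the commit SHA in an environment variable (GitHub Actions sets GITHUB_SHA; other systems use other names); the helper name and the 12-character default are illustrative choices.

```python
import os
import re
from typing import Mapping, Optional


def workflow_version_from_ci(env: Optional[Mapping[str, str]] = None,
                             sha_var: str = "GITHUB_SHA", length: int = 12) -> str:
    """Derive a workflow version string from the commit SHA exposed by the CI system."""
    env = os.environ if env is None else env
    sha = env.get(sha_var, "")
    # Refuse to guess: a missing or malformed SHA would yield an untraceable version.
    if not re.fullmatch(r"[0-9a-f]{7,40}", sha):
        raise RuntimeError(f"{sha_var} is missing or is not a hex commit SHA: {sha!r}")
    return sha[:length]


print(workflow_version_from_ci({"GITHUB_SHA": "3f2a9c1d8e7b6a5f4e3d2c1b0a9f8e7d6c5b4a39"}))
# 3f2a9c1d8e7b
```

The resulting string would go into the version field of the WorkflowIdentifier used when registering and launching the new workflow version.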
Programmatic Data Analysis
Analyzing historical execution data to understand performance trends, resource utilization, or identify common failure patterns. The client API facilitates querying past executions and extracting their inputs, outputs, and metadata for further analysis.
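For example, once past executions have been fetched and flattened into simple records, per-workflow failure rates and mean durations take only a few lines. The ExecutionRecord type and its fields are hypothetical; a real integration would populate them from the execution metadata the client returns.

```python
from dataclasses import dataclass
from statistics import mean


@dataclass(frozen=True)
class ExecutionRecord:
    """Hypothetical, flattened view of one past execution."""
    workflow: str
    status: str          # e.g. "SUCCEEDED", "FAILED", "ABORTED"
    duration_s: float    # wall-clock duration in seconds


def summarize(records):
    """Return {workflow: {"runs", "failure_rate", "mean_duration_s"}} over the given records."""
    by_workflow = {}
    for r in records:
        by_workflow.setdefault(r.workflow, []).append(r)
    summary = {}
    for wf, runs in by_workflow.items():
        failures = sum(1 for r in runs if r.status == "FAILED")
        summary[wf] = {
            "runs": len(runs),
            "failure_rate": failures / len(runs),
            "mean_duration_s": mean(r.duration_s for r in runs),
        }
    return summary


history = [
    ExecutionRecord("daily_etl", "SUCCEEDED", 120.0),
    ExecutionRecord("daily_etl", "FAILED", 30.0),
    ExecutionRecord("daily_etl", "SUCCEEDED", 150.0),
    ExecutionRecord("daily_etl", "SUCCEEDED", 100.0),
]
print(summarize(history))
# {'daily_etl': {'runs': 4, 'failure_rate': 0.25, 'mean_duration_s': 100.0}}
```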
Getting Started
Client Initialization
Initialize the client by providing the Flyte Admin service endpoint and any necessary authentication details. The client supports various authentication methods, including token-based authentication and client certificates.
from flyte_client import FlyteClient
# Basic initialization for an insecure endpoint (e.g., local development)
client = FlyteClient(host="localhost:30081", insecure=True)
# Initialization with token-based authentication for a secure endpoint
# client = FlyteClient(
#     host="flyte.example.com",
#     insecure=False,
#     token="your_auth_token",  # Or use a token provider
#     # ca_cert_file="path/to/ca.pem",  # If using a custom CA
# )
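Rather than hardcoding the host and token, the connection settings can be read from the environment at startup. A minimal sketch; the variable names FLYTE_HOST, FLYTE_INSECURE, and FLYTE_TOKEN and the ClientSettings dataclass are assumptions chosen for illustration, and the resulting fields are meant to be passed to the FlyteClient constructor shown above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class ClientSettings:
    host: str
    insecure: bool
    token: Optional[str]


def load_settings(env: Optional[Mapping[str, str]] = None) -> ClientSettings:
    """Read client settings from environment variables (variable names are illustrative)."""
    env = os.environ if env is None else env
    host = env.get("FLYTE_HOST")
    if not host:
        raise RuntimeError("FLYTE_HOST must be set")
    # Accept common truthy spellings; default to a secure (TLS) connection.
    insecure = env.get("FLYTE_INSECURE", "false").strip().lower() in {"1", "true", "yes"}
    token = env.get("FLYTE_TOKEN") or None
    return ClientSettings(host=host, insecure=insecure, token=token)


settings = load_settings({"FLYTE_HOST": "localhost:30081", "FLYTE_INSECURE": "true"})
print(settings)
# ClientSettings(host='localhost:30081', insecure=True, token=None)
# client = FlyteClient(host=settings.host, insecure=settings.insecure, token=settings.token)
```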
Launching a Workflow
To launch a workflow, identify it using a WorkflowIdentifier or LaunchPlanIdentifier and provide the necessary inputs as a Python dictionary.
from flyte_client import FlyteClient, WorkflowIdentifier
client = FlyteClient(host="localhost:30081", insecure=True)
workflow_id = WorkflowIdentifier(
    project="flytesnacks",
    domain="development",
    name="core.basic.lp.my_wf",  # Example from flytesnacks
    version="v1",  # Use the correct version for your workflow
)
inputs = {"name": "World"}
execution = client.execute_workflow(
    workflow_id=workflow_id,
    inputs=inputs,
    execution_name="hello-world-api-run",
)
print(f"Started execution: {execution.id.name}")
Monitoring Execution Status
Use the get_execution method to retrieve the current status of an execution. The ExecutionStatus enum provides states like QUEUED, RUNNING, SUCCEEDED, FAILED, and ABORTED.
import time
from flyte_client import FlyteClient, ExecutionStatus
client = FlyteClient(host="localhost:30081", insecure=True)
execution_id_to_monitor = "hello-world-api-run"  # Replace with your execution ID
current_execution = client.get_execution(execution_id=execution_id_to_monitor)
while current_execution.status not in [ExecutionStatus.SUCCEEDED, ExecutionStatus.FAILED, ExecutionStatus.ABORTED]:
    print(f"Execution {current_execution.id.name} is {current_execution.status}...")
    time.sleep(5)
    current_execution = client.get_execution(execution_id=execution_id_to_monitor)
print(f"Execution {current_execution.id.name} finished with status: {current_execution.status}")
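The loop above polls indefinitely at a fixed interval. A variant with an overall timeout and a growing delay between polls could look like the sketch below. It takes the status lookup as a plain callable so it does not depend on any particular client; the helper name, the string status values, and the default intervals are assumptions.

```python
import time
from typing import Callable, Iterable


def wait_for_terminal(get_status: Callable[[], str],
                      terminal: Iterable[str] = ("SUCCEEDED", "FAILED", "ABORTED"),
                      timeout_s: float = 3600.0,
                      initial_delay_s: float = 2.0,
                      max_delay_s: float = 60.0,
                      sleep: Callable[[float], None] = time.sleep,
                      clock: Callable[[], float] = time.monotonic) -> str:
    """Poll get_status() until it returns a terminal state or timeout_s elapses."""
    terminal = set(terminal)
    deadline = clock() + timeout_s
    delay = initial_delay_s
    while True:
        status = get_status()
        if status in terminal:
            return status
        remaining = deadline - clock()
        if remaining <= 0:
            raise TimeoutError(f"execution still {status} after {timeout_s}s")
        # Never sleep past the deadline; then double the delay, capped at max_delay_s.
        sleep(min(delay, remaining))
        delay = min(delay * 2, max_delay_s)


# Simulated status sequence; the no-op sleep keeps the example instant.
statuses = iter(["QUEUED", "RUNNING", "RUNNING", "SUCCEEDED"])
final = wait_for_terminal(lambda: next(statuses), sleep=lambda s: None)
print(final)  # SUCCEEDED
```

With the enum-based API above, the call would pass get_status=lambda: client.get_execution(execution_id=execution_id_to_monitor).status and the three terminal ExecutionStatus members as terminal.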
Retrieving Execution Outputs
Once an execution succeeds, its outputs can be retrieved using get_execution_outputs. The client API automatically converts Flyte literal outputs back into Python native types.
from flyte_client import FlyteClient, ExecutionStatus
client = FlyteClient(host="localhost:30081", insecure=True)
execution_id_to_monitor = "hello-world-api-run"  # Replace with your execution ID
execution_result = client.get_execution(execution_id=execution_id_to_monitor)
if execution_result.status == ExecutionStatus.SUCCEEDED:
    outputs = client.get_execution_outputs(execution_id=execution_id_to_monitor)
    print(f"Outputs: {outputs}")
    # Example: if the workflow returns a 'greeting' string
    # print(f"Greeting: {outputs['greeting']}")
else:
    print(f"Execution did not succeed. Status: {execution_result.status}")
Advanced Considerations
Authentication and Authorization
The client API supports various authentication mechanisms, including static tokens, token providers, and client certificates. For production environments, configure robust authentication to secure interactions with the Flyte Admin service. Ensure the provided credentials carry the permissions required by the operations performed (for example, permission to create executions for execute_workflow and to read execution data for get_execution).
Performance and Scalability
When interacting with the client API, consider the following for performance:
- Pagination and Filtering: When listing many resources (e.g., executions), request results page by page and apply server-side filters so that only the records you need cross the network.
- Rate Limiting: Be mindful of potential API rate limits imposed by the Flyte Admin service. Implement appropriate backoff and retry mechanisms in client applications.
- Efficient Data Retrieval: When retrieving execution outputs or logs, specify only the necessary data to avoid transferring large, unneeded payloads.
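Flyte Admin's list endpoints page results using a per-request limit and a continuation token that comes back empty on the last page. Assuming the client's list methods expose the same scheme, a generator can hide the paging loop. The fetch_page(limit, token) callable is a hypothetical stand-in for whichever list method the client provides, and fake_fetch simulates a backend in memory.

```python
from typing import Callable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")


def iter_all(fetch_page: Callable[[int, str], Tuple[List[T], str]],
             page_size: int = 100) -> Iterator[T]:
    """Yield every item across pages; fetch_page(limit, token) returns (items, next_token)."""
    token = ""
    while True:
        items, token = fetch_page(page_size, token)
        yield from items
        # An empty continuation token means there are no further pages.
        if not token:
            return


data = [f"exec-{i}" for i in range(250)]


def fake_fetch(limit: int, token: str):
    """In-memory stand-in for a paginated list call; the token is simply the next offset."""
    start = int(token or 0)
    end = start + limit
    next_token = str(end) if end < len(data) else ""
    return data[start:end], next_token


names = list(iter_all(fake_fetch, page_size=100))
print(len(names), names[0], names[-1])  # 250 exec-0 exec-249
```

Because the generator is lazy, a caller that stops early (for example, after finding a matching execution) never requests the remaining pages.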
Error Handling and Retries
Client applications should implement comprehensive error handling for network issues, authentication failures, and Flyte-specific errors. The client API raises specific exceptions for different error conditions. Implement exponential backoff and retry logic for transient errors to improve application resilience.
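A minimal sketch of capped exponential backoff with full jitter. Which exceptions count as transient depends on the client; here the hypothetical TransientError class stands in for them (for a gRPC-based client these would typically correspond to status codes such as UNAVAILABLE, DEADLINE_EXCEEDED, or RESOURCE_EXHAUSTED).

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for errors worth retrying (timeouts, temporary unavailability)."""


def call_with_retries(fn: Callable[[], T],
                      retry_on: Tuple[Type[BaseException], ...] = (TransientError,),
                      max_attempts: int = 5,
                      base_delay_s: float = 0.5,
                      max_delay_s: float = 30.0,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn(), retrying retry_on exceptions with capped exponential backoff plus jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 0
    while True:
        try:
            return fn()
        except retry_on:
            attempt += 1
            if attempt >= max_attempts:
                raise  # Out of attempts: surface the last error to the caller.
            # Full jitter: wait a random time up to the capped exponential delay.
            cap = min(max_delay_s, base_delay_s * 2 ** (attempt - 1))
            sleep(random.uniform(0, cap))


attempts = {"n": 0}


def flaky_get_status():
    """Fails twice with a transient error, then succeeds."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TransientError("Flyte Admin temporarily unavailable")
    return "RUNNING"


print(call_with_retries(flaky_get_status, sleep=lambda s: None), attempts["n"])  # RUNNING 3
```

Retries are safest for read-only calls such as get_execution. Retrying execute_workflow without a fixed execution name risks launching duplicate executions.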
Version Compatibility
Ensure the client API version is compatible with the target Flyte Admin service version. Mismatches can lead to unexpected behavior or API incompatibilities. Regularly update the client API to benefit from new features and bug fixes.
Best Practices
- Configuration Management: Externalize client configuration (host, credentials, project, domain) to environment variables or configuration files, avoiding hardcoding sensitive information.
- Resource Identifiers: Always use fully qualified identifiers (WorkflowIdentifier, TaskIdentifier, LaunchPlanIdentifier) including project, domain, name, and version to ensure unambiguous targeting of Flyte entities.
- Asynchronous Operations: For long-running operations like monitoring execution status, consider using asynchronous programming patterns to avoid blocking the main application thread.
- Logging: Integrate client API interactions with your application's logging framework to capture requests, responses, and errors for debugging and auditing.
- Idempotency: When designing systems that trigger workflows, ensure that repeated calls to execute_workflow for the same logical operation are handled idempotently, either by supplying a deterministic execution name or through external de-duplication logic.
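One way to get idempotency from execution naming is to derive the name deterministically from the request that caused it, so a retried or duplicated trigger produces the same name. This assumes the Admin service refuses to create a second execution with a name that already exists in the same project and domain; the helper below is a hypothetical sketch, and the prefix and digest length are illustrative choices.

```python
import hashlib
import json
from typing import Any, Mapping


def execution_name_for(prefix: str, key: Mapping[str, Any], digest_chars: int = 16) -> str:
    """Derive a stable execution name from the logical request that triggered it."""
    # Canonical JSON (sorted keys, fixed separators) makes equal requests hash
    # identically regardless of dict insertion order.
    canonical = json.dumps(key, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:digest_chars]
    return f"{prefix}-{digest}"


a = execution_name_for("ingest", {"bucket": "raw-data", "key": "2024/01/batch.parquet"})
b = execution_name_for("ingest", {"key": "2024/01/batch.parquet", "bucket": "raw-data"})
print(a == b)  # True: same logical request, same execution name
```

Under that assumption, the caller would pass the result as execution_name to execute_workflow and treat an "already exists" error for that name as a duplicate trigger rather than a failure.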