Low-Level Flyte Client API
The Low-Level Flyte Client API provides direct programmatic access to the Flyte Admin service, enabling fine-grained control over Flyte resources and executions. It serves as the foundational interface for interacting with a Flyte cluster, offering capabilities to manage workflows, tasks, launch plans, and executions at a granular level.
Primary Purpose
The primary purpose of the Low-Level Flyte Client API is to facilitate direct interaction with the Flyte control plane (Admin service) using its native gRPC interface. This allows developers to build custom tools, integrations, and advanced automation scripts that require precise control over Flyte entities, bypassing the higher-level abstractions offered by client SDKs. It is ideal for scenarios where a deep understanding of Flyte's internal object model and execution lifecycle is necessary.
Core Features
The low-level client exposes a comprehensive set of operations for managing Flyte resources:
- Resource Management: Register, fetch, and list Flyte entities such as workflows, tasks, and launch plans. Registered versions are immutable, so changes are published as new versions rather than edited or deleted in place; launch plans can additionally be activated or deactivated.
- Execution Control: Initiate, monitor, terminate, and retrieve details of workflow and task executions. This includes fetching execution status, logs, inputs, and outputs.
- Project and Domain Management: Interact with Flyte projects and domains, listing available ones and managing their configurations.
- Artifact Handling: Upload and retrieve artifacts associated with tasks and workflows, often interacting with underlying blob storage.
- Querying and Listing: Efficiently query and list various Flyte resources and executions based on filters, pagination, and sorting criteria.
- Authentication and Authorization: Integrate with Flyte's security mechanisms, typically involving token-based authentication to secure API calls.
Common Use Cases
Developers leverage the low-level client for a variety of advanced scenarios:
- Custom Schedulers and Orchestrators: Building bespoke systems that schedule Flyte workflows based on external events or complex business logic, rather than relying solely on Flyte's built-in scheduling.
- Advanced Monitoring and Alerting: Creating custom dashboards, monitoring tools, or alerting systems that pull detailed execution metrics and status updates directly from the Flyte Admin service.
- Automated Resource Provisioning: Scripting the automated registration and update of a large number of workflows, tasks, or launch plans, especially in CI/CD pipelines or infrastructure-as-code setups.
- Cross-Language Integrations: Integrating Flyte with applications written in languages other than Python, by directly consuming the gRPC API definitions.
- Data Migration and Backup: Developing utilities to export or import Flyte resource definitions and execution metadata for backup, migration, or disaster recovery purposes.
- Debugging and Diagnostics: Performing deep dives into execution states, retrieving raw protobuf messages, or simulating specific API interactions for advanced debugging.
Key Concepts and Components
The low-level client typically interacts with the Flyte Admin service through a FlyteClient object. This object encapsulates the gRPC connection and provides methods corresponding to the various service endpoints.
Client Initialization
To establish a connection, instantiate the client with the Flyte Admin service endpoint and any necessary authentication credentials.
from typing import Optional

# Conceptual sketch only: a real low-level client would open a gRPC channel
# and call the generated Admin service stub directly.


class LowLevelFlyteClient:
    def __init__(self, host: str, insecure: bool = False, token: Optional[str] = None):
        # Initialize the gRPC channel and service stubs here,
        # e.g., self._admin_service_stub = flyteidl.service.admin_pb2_grpc.AdminServiceStub(channel)
        self.host = host
        self.insecure = insecure
        self.token = token

    def create_execution(self, project: str, domain: str, launch_plan_name: str, inputs: dict) -> dict:
        # Construct protobuf messages and make the gRPC call,
        # e.g., request = flyteidl.admin.execution_pb2.ExecutionCreateRequest(...)
        #       return self._admin_service_stub.CreateExecution(request)
        print(f"Creating execution for {launch_plan_name} in {project}/{domain} with inputs: {inputs}")
        return {"id": "exec-123", "status": "QUEUED"}


# Example usage (conceptual)
client = LowLevelFlyteClient(host="flyte.example.com:80", insecure=True)
Managing Executions
The client provides methods to launch new executions and monitor their progress.
# Assuming 'client' is an initialized LowLevelFlyteClient instance

# Launch a workflow execution
execution_details = client.create_execution(
    project="flytesnacks",
    domain="development",
    launch_plan_name="my_workflow_lp",
    inputs={"input_param": "value"},
)
print(f"Execution created: {execution_details['id']}")


# Get execution status (conceptual)
class ExecutionStatus:
    SUCCEEDED = "SUCCEEDED"
    RUNNING = "RUNNING"
    FAILED = "FAILED"


def get_execution(execution_id: str) -> str:
    # Make the gRPC call to get execution details,
    # e.g., request = flyteidl.admin.execution_pb2.WorkflowExecutionGetRequest(id=...)
    #       response = admin_service_stub.GetExecution(request)
    #       return response.closure.phase
    # The placeholder below stands in for that call.
    if execution_id == "exec-123":
        return ExecutionStatus.RUNNING
    return ExecutionStatus.SUCCEEDED


status = get_execution(execution_details["id"])
print(f"Execution {execution_details['id']} status: {status}")
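Monitoring usually means polling until the execution reaches a terminal phase. The following is a minimal sketch of such a loop, not part of any Flyte library: `wait_for_execution`, `fetch_phase`, and the `TERMINAL_PHASES` set are hypothetical names introduced here, and the phase strings should be checked against what your Admin service actually reports. The poll interval doubles up to a cap so that long-running executions do not generate excessive load.

```python
import time
from typing import Callable

# Phases assumed to be terminal (hypothetical set for this sketch).
TERMINAL_PHASES = frozenset({"SUCCEEDED", "FAILED", "ABORTED", "TIMED_OUT"})


def wait_for_execution(
    fetch_phase: Callable[[str], str],
    execution_id: str,
    poll_interval: float = 5.0,
    max_interval: float = 60.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll fetch_phase(execution_id) until a terminal phase is returned.

    `waited` counts the total requested sleep time, not wall-clock time,
    which keeps the loop easy to test with a fake `sleep`.
    """
    waited = 0.0
    interval = poll_interval
    while True:
        phase = fetch_phase(execution_id)
        if phase in TERMINAL_PHASES:
            return phase
        if waited >= timeout:
            raise TimeoutError(f"{execution_id} still {phase} after {waited:.0f}s")
        sleep(interval)
        waited += interval
        # Back off so long executions are polled less often.
        interval = min(interval * 2, max_interval)


# Usage with a fake phase source and a no-op sleep.
_phases = iter(["QUEUED", "RUNNING", "SUCCEEDED"])
final_phase = wait_for_execution(lambda _id: next(_phases), "exec-123", sleep=lambda s: None)
print(final_phase)
```

In a real client, `fetch_phase` would wrap the Admin service call that returns the execution's current phase.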
Registering Resources
Registering new versions of tasks or workflows involves constructing the appropriate protobuf messages and sending them via the client.
# Assuming 'client' is an initialized LowLevelFlyteClient instance

def register_workflow(project: str, domain: str, workflow_name: str, workflow_definition_proto: bytes) -> dict:
    # Construct and send a workflow creation request,
    # e.g., request = flyteidl.admin.workflow_pb2.WorkflowCreateRequest(...)
    #       return admin_service_stub.CreateWorkflow(request)
    print(f"Registering workflow '{workflow_name}' in {project}/{domain}")
    return {"message": "Workflow registered successfully"}


# In a real scenario, workflow_definition_proto would be a serialized Flyte workflow IDL.
# For demonstration, we use a placeholder.
workflow_proto_data = b"some_serialized_workflow_definition"

register_workflow(
    project="flytesnacks",
    domain="development",
    workflow_name="my_new_workflow",
    workflow_definition_proto=workflow_proto_data,
)
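Each registration also needs a version string. One possible convention, shown here only as an illustration, is to derive the version from a hash of the serialized definition, so that identical content always maps to the same version. `content_version` is a hypothetical helper, not a Flyte API.

```python
import hashlib


def content_version(serialized_definition: bytes, length: int = 12) -> str:
    """Return a short, deterministic version string for the given bytes.

    Hypothetical helper: takes the first `length` hex characters of the
    SHA-256 digest of the serialized definition.
    """
    if not 1 <= length <= 64:
        raise ValueError("length must be between 1 and 64")
    return hashlib.sha256(serialized_definition).hexdigest()[:length]


# The placeholder bytes stand in for a serialized workflow definition.
version = content_version(b"some_serialized_workflow_definition")
print(f"Registering with version {version}")
```

With this convention, re-running a registration script on unchanged definitions produces the same versions instead of a new one on every run.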
Integration Patterns
The low-level client is often integrated into:
- Custom CLI Tools: Building command-line interfaces for specific operational tasks or developer workflows.
- Webhooks and Event-Driven Systems: Responding to external events by triggering Flyte workflows or updating Flyte resources.
- Data Pipelines: Orchestrating complex data processing pipelines where Flyte serves as a component, and external systems need to interact with it directly.
- Cloud Functions/Serverless: Deploying functions that interact with Flyte for specific, isolated tasks, such as launching a workflow on demand.
Limitations and Considerations
- Complexity: Direct interaction with the gRPC API requires a deep understanding of Flyte's internal data models (protobuf definitions). This increases development complexity compared to using higher-level SDKs.
- Versioning: The API can evolve. Maintaining compatibility with different Flyte Admin service versions requires careful management of gRPC stubs and protobuf definitions.
- Error Handling: Error responses from the Admin service are typically gRPC status codes and messages, requiring robust error handling logic in the client application.
- Performance: While direct gRPC calls are efficient, improper use (e.g., excessive polling, unoptimized queries) can still lead to performance bottlenecks. Batching requests and using efficient filtering are crucial.
- Security: Managing authentication tokens and ensuring secure communication (TLS) is the responsibility of the client application.
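The error-handling consideration above can be sketched as follows: treat some gRPC status codes as transient and retry them with exponential backoff, and surface everything else immediately. This is a sketch under stated assumptions. `AdminCallError` is a hypothetical stand-in for the exception your gRPC library raises, `call_with_retries` is an illustrative name, and the set of retryable codes is a choice to tune per deployment. Only idempotent calls should be wrapped this way.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Status code names treated as transient in this sketch (an assumption).
RETRYABLE_CODES = frozenset({"UNAVAILABLE", "DEADLINE_EXCEEDED", "RESOURCE_EXHAUSTED"})


class AdminCallError(Exception):
    """Hypothetical stand-in for a gRPC error carrying a status code name."""

    def __init__(self, code: str, details: str = ""):
        super().__init__(f"{code}: {details}")
        self.code = code


def call_with_retries(
    call: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run call(), retrying transient failures with jittered exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except AdminCallError as err:
            # Re-raise permanent errors, and transient ones once attempts run out.
            if err.code not in RETRYABLE_CODES or attempt == max_attempts:
                raise
            ceiling = min(base_delay * 2 ** (attempt - 1), max_delay)
            # Full jitter: sleep a random duration up to the backoff ceiling.
            sleep(random.uniform(0, ceiling))
    raise RuntimeError("unreachable")  # the loop always returns or raises
```

The randomized delay spreads out retries from many clients so they do not hit the Admin service in lockstep after an outage.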
Best Practices
- Use Generated Code: Leverage gRPC tools to generate client stubs and message classes from Flyte's .proto definitions. This ensures type safety and reduces boilerplate.
- Implement Robust Error Handling: Anticipate and handle various gRPC status codes and application-level errors returned by the Admin service.
- Implement Retries with Backoff: Network issues or transient service unavailability can occur. Implement exponential backoff and retry logic for idempotent operations.
- Paginate and Filter Queries: When listing resources or executions, always use pagination and apply filters to limit the data fetched, improving performance and reducing load on the Admin service.
- Cache Static Data: Cache frequently accessed, static information (e.g., project/domain lists, common workflow definitions) to reduce redundant API calls.
- Monitor API Usage: Implement logging and monitoring for your low-level client interactions to track usage patterns, identify potential issues, and debug effectively.
- Prefer Higher-Level SDKs When Possible: Only resort to the low-level client when higher-level SDKs do not provide the necessary functionality or control. Higher-level SDKs abstract away much of the complexity and offer a more Pythonic interface.
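The pagination advice above can be sketched as a generator that follows continuation tokens until the service reports no further pages. The `Page` shape, `iter_all`, and `fake_list_page` are hypothetical names for illustration. A real implementation would build the list request (limit, token, filters) from the generated protobuf classes and read the token from the response.

```python
from dataclasses import dataclass
from typing import Callable, Iterator, List


@dataclass
class Page:
    """Hypothetical page shape: a batch of items plus a continuation token."""

    items: List[dict]
    token: str  # an empty string means there are no more pages


def iter_all(list_page: Callable[[str, int], Page], limit: int = 100) -> Iterator[dict]:
    """Yield every item by repeatedly calling list_page(token, limit)."""
    token = ""
    while True:
        page = list_page(token, limit)
        yield from page.items
        if not page.token:
            return
        token = page.token


# Fake backend for demonstration: five items, with the token used as an offset.
def fake_list_page(token: str, limit: int) -> Page:
    data = [{"name": f"wf-{i}"} for i in range(5)]
    start = int(token or 0)
    end = start + limit
    return Page(items=data[start:end], token=str(end) if end < len(data) else "")


names = [item["name"] for item in iter_all(fake_list_page, limit=2)]
print(names)
```

Because `iter_all` is lazy, a caller that stops iterating early never requests the remaining pages.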