Skip to main content

Plugin System for Customization

The Plugin System for Customization provides a robust mechanism for developers to extend and modify core Flytekit behaviors without altering the main codebase. This system allows for deep integration and tailored configurations, enabling Flytekit to adapt to diverse operational environments and organizational requirements.

At its core, the system defines a contract through the FlytekitPluginProtocol. Any class adhering to this protocol can serve as a custom plugin, overriding default Flytekit functionalities. The FlytekitPlugin class serves as the default implementation, providing standard behaviors that can be selectively replaced by custom plugins.

Customization Capabilities

The plugin system offers several key extension points, each corresponding to a static method defined in the FlytekitPluginProtocol. Implementing these methods in a custom plugin allows for specific aspects of Flytekit's operation to be modified.

Remote Connection Configuration

The get_remote method enables customization of how FlyteRemote objects are instantiated for CLI sessions. This is crucial for environments requiring specific connection parameters, authentication mechanisms, or data upload locations.

@staticmethod
def get_remote(
config: Optional[str], project: str, domain: str, data_upload_location: Optional[str] = None
) -> FlyteRemote:
# Custom logic to create and return a FlyteRemote instance
# This could involve reading custom configuration files,
# interacting with environment-specific credential stores,
# or enforcing particular default settings.
pass

CLI Extension

The configure_pyflyte_cli method allows for extending or modifying the pyflyte command-line interface. Developers can inject custom commands, subcommands, or modify existing ones to streamline workflows specific to their organization.

from click import Group

@staticmethod
def configure_pyflyte_cli(main: Group) -> Group:
# Add custom commands or modify existing ones
# For example, adding a 'pyflyte deploy-env' command
@main.command()
def deploy_env():
"""Deploys the current environment configuration."""
print("Deploying environment...")
return main

Secret Group Requirement

The secret_requires_group method determines whether secrets necessitate a group entry during registration. This provides control over security policies and how secrets are managed within the Flyte environment.

@staticmethod
def secret_requires_group() -> bool:
# Return True to enforce group requirement for secrets, False otherwise
return True

Default Image Specification

The get_default_image method allows for specifying a default Docker image to be used for tasks. This is useful for enforcing consistent base images across an organization or for providing environment-specific defaults. Returning None defers to Flytekit's internal default image configuration.

@staticmethod
def get_default_image() -> Optional[str]:
# Return a specific image string, e.g., "my-org/flyte-base:latest"
return "my-org/flyte-base:v1.2.3"

Authentication Success Page Customization

The get_auth_success_html method provides a way to customize the HTML content displayed upon successful authentication. This can be used to brand the authentication experience or provide specific post-authentication instructions. Returning None uses Flytekit's default success page.

@staticmethod
def get_auth_success_html(endpoint: str) -> Optional[str]:
# Return a custom HTML string for the authentication success page
return f"<h1>Welcome to MyOrg Flyte!</h1><p>You are now authenticated to {endpoint}.</p>"

Default Cache Policies

The get_default_cache_policies method enables the definition of default cache policies for tasks. This allows for consistent caching behavior across tasks unless explicitly overridden at the task level.

from typing import List
from flytekit.models.core.tasks import CachePolicy # Assuming CachePolicy is available

@staticmethod
def get_default_cache_policies() -> List[CachePolicy]:
# Return a list of default CachePolicy objects
# For example, enabling caching by default with a specific duration
return [CachePolicy(enabled=True, ttl_seconds=3600)]

Implementing a Custom Plugin

To implement a custom plugin, define a class that provides static methods matching the signature of FlytekitPluginProtocol. Flytekit will discover and utilize this custom implementation to override its default behaviors.

from typing import Optional, List
from click import Group
# Assuming FlyteRemote, Config, CachePolicy are available from flytekit's context
# from flytekit.remote import FlyteRemote
# from flytekit.configuration import Config
# from flytekit.models.core.tasks import CachePolicy

class MyCustomFlytePlugin:
@staticmethod
def get_remote(
config: Optional[str], project: str, domain: str, data_upload_location: Optional[str] = None
) -> FlyteRemote:
# Custom logic for remote connection
print(f"Using custom remote for project {project}, domain {domain}")
# Example: Always connect to a specific internal endpoint
# cfg_obj = Config.for_endpoint("https://my-internal-flyte.com")
# return FlyteRemote(cfg_obj, default_project=project, default_domain=domain)
# For demonstration, defer to default FlytekitPlugin behavior
return FlytekitPlugin.get_remote(config, project, domain, data_upload_location)

@staticmethod
def configure_pyflyte_cli(main: Group) -> Group:
@main.command()
def custom_command():
"""A custom command added by MyCustomFlytePlugin."""
print("Executing custom plugin command!")
return main

@staticmethod
def get_default_image() -> Optional[str]:
return "my-org/custom-base:1.0.0"

# Other methods can be implemented similarly or omitted to use Flytekit's defaults

Common Use Cases

  • Environment-Specific Configurations: Automatically configure FlyteRemote objects to point to different Flyte clusters (e.g., development, staging, production) based on environment variables or custom configuration files.
  • Organizational Standards Enforcement: Mandate specific default Docker images for all tasks, ensuring consistency and compliance with security policies.
  • Custom CLI Tools: Integrate organization-specific deployment scripts, data management utilities, or workflow orchestration commands directly into the pyflyte CLI.
  • Authentication Integration: Customize the authentication flow's success page to match corporate branding or provide specific next steps for users.
  • Default Caching Strategies: Establish default caching policies for tasks to optimize execution times and resource usage across the platform.
  • Security Policy Management: Control whether secrets require group association, aligning with internal security best practices.