Launch Plans: Deploying and Scheduling Workflows
Launch Plans are a fundamental construct for deploying and scheduling workflows. They serve as a deployable and executable configuration for a workflow, allowing you to define how a workflow should run, including its inputs, schedule, and execution environment.
Purpose and Core Capabilities
The primary purpose of Launch Plans is to decouple workflow definitions from their execution configurations. A single workflow can have multiple Launch Plans, each tailored for different use cases, environments, or schedules. This enables flexible deployment and management of complex data and ML pipelines.
Core capabilities include:
- Workflow Association: Every Launch Plan is intrinsically linked to a specific workflow.
- Input Management:
  - Default Inputs: Define default values for workflow inputs that can be overridden at execution time.
  - Fixed Inputs: Specify input values that are immutable and cannot be changed when the Launch Plan is executed.
- Scheduling: Configure workflows to run automatically on a recurring basis using a Schedule object (such as CronSchedule or FixedRate) or a LaunchPlanTriggerBase trigger.
- Notifications: Set up alerts to be sent based on the execution status transitions of the workflow (e.g., success, failure).
- Resource Configuration: Apply custom Kubernetes labels and annotations to workflow executions, define raw output data configurations (e.g., S3 paths), and control the maximum parallelism for tasks within a workflow.
- Security Context: Specify the security context, including IAM roles or Kubernetes service accounts, under which the workflow execution will run. This ensures proper access control and permissions.
- Caching Control: Override the default caching behavior for workflow executions.
- Auto-activation: Automatically activate a Launch Plan upon registration, so that its schedule or trigger takes effect immediately without a separate activation step.
Common Use Cases
Launch Plans are versatile and support a wide range of scenarios:
- Scheduled Data Pipelines: Create a Launch Plan with a daily or hourly schedule to run a data processing workflow automatically.
- Production Deployments: Define a Launch Plan for a production environment with specific fixed inputs (e.g., the production database or table to read from), a dedicated IAM role, and notifications for failures.
- Ad-hoc Executions with Defaults: Provide a Launch Plan that allows users to run a workflow with sensible default parameters, while still enabling them to override specific inputs as needed.
- A/B Testing or Experimentation: Set up multiple Launch Plans for the same workflow, each with different fixed inputs representing experimental parameters, to easily compare results.
- Environment-Specific Configurations: Use different Launch Plans for development, staging, and production environments, each with distinct resource allocations, security settings, and output locations.
- Referencing Existing Deployments: Create a ReferenceLaunchPlan to point to a Launch Plan already registered on the platform, allowing local code to interact with remote deployments without re-registering.
Creating and Configuring Launch Plans
Launch Plans are created using the LaunchPlan class. The primary methods for creation are get_or_create and create.
Default Launch Plans
Every workflow implicitly has a default Launch Plan. This plan uses the workflow's inherent default input values (if any) and default authentication information, without any custom schedules, notifications, or fixed inputs.
To obtain a default Launch Plan:
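from flytekit import LaunchPlan, task, workflow
@task
def greet(a: int, c: str) -> str:
    # Plain Python runs inside tasks; inside a workflow the inputs are promises, so formatting happens here.
    return f"{c} {a}"
@workflow
def my_wf(a: int = 1, c: str = "hello") -> str:
    return greet(a=a, c=c)
# The default Launch Plan takes its name and default inputs from the workflow itself.
default_lp = LaunchPlan.get_or_create(workflow=my_wf)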
Attempting to add additional properties (like schedule or fixed_inputs) when creating a default Launch Plan (i.e., without providing a name) will result in an error.
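The rule that only named Launch Plans may carry extra properties can be modeled as a small argument check. This is a hypothetical, pure-Python sketch. The function name check_default_lp_args is invented for illustration and is not part of flytekit, and raising ValueError is an assumption of the sketch.

```python
from typing import Any, Dict, Optional


def check_default_lp_args(name: Optional[str], **properties: Any) -> str:
    """Hypothetical guard modeling the rule for default vs. named launch plans.

    Returns "default" when no name is given and nothing else is customized,
    "named" when a name is given, and raises ValueError otherwise.
    """
    # Only properties that were actually supplied (not None) count as customizations.
    supplied: Dict[str, Any] = {k: v for k, v in properties.items() if v is not None}
    if name is None:
        if supplied:
            raise ValueError(
                f"A default launch plan cannot set {sorted(supplied)}; give it a name instead."
            )
        return "default"
    return "named"


# Usage
print(check_default_lp_args(None))                                        # default
print(check_default_lp_args("daily_workflow_run", schedule="0 0 * * *"))  # named
```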
Custom Launch Plans
For more control, create a named Launch Plan. This allows you to specify default inputs, fixed inputs, schedules, notifications, and other execution-time configurations.
from flytekit import CronSchedule, Email, LaunchPlan, task, workflow
from flytekit.models.common import Annotations, Labels, RawOutputDataConfig
from flytekit.models.core.execution import WorkflowExecutionPhase
from flytekit.models.security import Identity, SecurityContext
@task
def greet(a: int, c: str) -> str:
    return f"{c} {a}"
@workflow
def my_wf(a: int, c: str) -> str:
    return greet(a=a, c=c)
# Define a schedule: daily at midnight UTC
daily_schedule = CronSchedule(schedule="0 0 * * *")
# Email the team when an execution succeeds or fails
email_notification = Email(
    phases=[WorkflowExecutionPhase.SUCCEEDED, WorkflowExecutionPhase.FAILED],
    recipients_email=["dev-team@example.com"],
)
# Run executions under a specific IAM role
my_security_context = SecurityContext(
    run_as=Identity(iam_role="arn:aws:iam::123456789012:role/my-flyte-role")
)
# Create a named Launch Plan with custom configurations
scheduled_lp = LaunchPlan.create(
    name="daily_workflow_run",
    workflow=my_wf,
    default_inputs={"a": 10},  # Default value for 'a', can be overridden
    fixed_inputs={"c": "scheduled run"},  # Fixed value for 'c', cannot be overridden
    schedule=daily_schedule,
    notifications=[email_notification],
    labels=Labels(values={"environment": "production"}),
    annotations=Annotations(values={"owner": "data-team"}),
    raw_output_data_config=RawOutputDataConfig(output_location_prefix="s3://my-bucket/flyte-outputs"),
    max_parallelism=5,  # At most 5 task nodes run concurrently
    security_context=my_security_context,
    overwrite_cache=True,  # Always overwrite cache for this LP
    auto_activate=True,  # Activate on registration
)
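The cron expression 0 0 * * * in the example fires once a day at 00:00 UTC. To illustrate what that means for run times, the stdlib-only sketch below computes the next fire time after a given instant. It handles only this one fixed expression, and next_daily_midnight_utc is a hypothetical helper, not a flytekit or cron-library function. A real scheduler parses arbitrary expressions.

```python
from datetime import datetime, timedelta, timezone


def next_daily_midnight_utc(after: datetime) -> datetime:
    """Next fire time of the cron expression "0 0 * * *" strictly after `after`.

    Hypothetical helper that understands only this one expression.
    """
    if after.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime")
    after_utc = after.astimezone(timezone.utc)
    # Today's midnight (UTC) is always <= after_utc, so the next fire is one day later.
    today_midnight = after_utc.replace(hour=0, minute=0, second=0, microsecond=0)
    return today_midnight + timedelta(days=1)


print(next_daily_midnight_utc(datetime(2024, 3, 10, 15, 30, tzinfo=timezone.utc)))
# 2024-03-11 00:00:00+00:00
```

The input is converted to UTC first, so a local evening time can map to a run two calendar days later in UTC terms.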
When using get_or_create with a name, if a Launch Plan with that name already exists in the cache and its properties match the provided arguments, the cached version is returned. If properties differ, an AssertionError is raised to prevent inconsistent definitions.
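The cache check behaves like a name-keyed dictionary that returns an existing entry only when every property matches. The sketch below is a simplified, hypothetical model. LaunchPlanRegistry and PlanConfig are invented names, and flytekit's real cache is the LaunchPlan.CACHE class attribute, which compares real Launch Plan properties rather than this reduced set.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class PlanConfig:
    """Hypothetical, simplified stand-in for a launch plan's properties."""
    workflow: str
    default_inputs: Dict[str, Any] = field(default_factory=dict)
    fixed_inputs: Dict[str, Any] = field(default_factory=dict)
    schedule: Optional[str] = None


class LaunchPlanRegistry:
    """Name-keyed cache modeling the get_or_create consistency check."""

    def __init__(self) -> None:
        self._cache: Dict[str, PlanConfig] = {}

    def get_or_create(self, name: str, config: PlanConfig) -> PlanConfig:
        cached = self._cache.get(name)
        if cached is None:
            # First request for this name: remember it.
            self._cache[name] = config
            return config
        if cached != config:
            # Same name, different properties: refuse the inconsistent definition.
            raise AssertionError(f"{name!r} is already cached with different properties")
        # Same name, same properties: hand back the cached object.
        return cached


registry = LaunchPlanRegistry()
first = registry.get_or_create("daily_workflow_run", PlanConfig("my_wf", schedule="0 0 * * *"))
again = registry.get_or_create("daily_workflow_run", PlanConfig("my_wf", schedule="0 0 * * *"))
print(first is again)  # True
```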
Input Parameters
- default_inputs: A dictionary of Python native values that serve as defaults for workflow inputs. These can be overridden when the Launch Plan is executed.
- fixed_inputs: A dictionary of Python native values that are permanently bound to the Launch Plan. These inputs cannot be changed at execution time. If an input is specified in both default_inputs and fixed_inputs, the fixed_inputs value takes precedence and the input is removed from the overridable parameters.
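These precedence rules fit in a small resolution function. This is a hypothetical plain-Python sketch, not flytekit's implementation. The name resolve_inputs is invented, and the sketch assumes that an execution-time attempt to override a fixed input, or to pass an unknown input, is rejected with a ValueError.

```python
from typing import Any, Dict, List


def resolve_inputs(
    workflow_inputs: List[str],
    default_inputs: Dict[str, Any],
    fixed_inputs: Dict[str, Any],
    overrides: Dict[str, Any],
) -> Dict[str, Any]:
    """Hypothetical model of how one execution's inputs are assembled."""
    unknown = set(overrides) - set(workflow_inputs)
    if unknown:
        raise ValueError(f"Unknown inputs: {sorted(unknown)}")
    blocked = set(overrides) & set(fixed_inputs)
    if blocked:
        # Assumption of this sketch: overriding a fixed input is an error.
        raise ValueError(f"Fixed inputs cannot be overridden: {sorted(blocked)}")
    # A name in both dicts is fixed, so drop it from the overridable defaults.
    overridable = {k: v for k, v in default_inputs.items() if k not in fixed_inputs}
    resolved = {**overridable, **overrides, **fixed_inputs}
    missing = [name for name in workflow_inputs if name not in resolved]
    if missing:
        raise ValueError(f"Missing required inputs: {missing}")
    return resolved


print(resolve_inputs(["a", "c"], {"a": 10}, {"c": "scheduled run"}, {"a": 20}))
# {'a': 20, 'c': 'scheduled run'}
```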
Security Context
The security_context parameter allows specifying the identity under which the workflow execution will run. This is crucial for granting necessary permissions to access external resources (e.g., S3, databases). The older auth_role parameter is deprecated; use security_context instead. Specifying both will raise a ValueError.
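The mutual exclusion between the two parameters amounts to a simple argument check. The sketch below models it with a hypothetical RunAs dataclass and an invented pick_execution_identity function. These are local stand-ins, not flytekit's Identity or SecurityContext classes. The deprecation warning on the legacy path is an assumption of this model.

```python
import warnings
from dataclasses import dataclass
from typing import Optional


@dataclass
class RunAs:
    """Hypothetical stand-in for the identity an execution runs under."""
    iam_role: Optional[str] = None
    k8s_service_account: Optional[str] = None


def pick_execution_identity(
    security_context: Optional[RunAs] = None,
    auth_role: Optional[RunAs] = None,
) -> Optional[RunAs]:
    """Return the identity to use, allowing at most one of the two sources."""
    if security_context is not None and auth_role is not None:
        raise ValueError("Specify security_context or auth_role, not both")
    if auth_role is not None:
        # The deprecated path still resolves in this model, but warns the caller.
        warnings.warn("auth_role is deprecated; use security_context", DeprecationWarning, stacklevel=2)
        return auth_role
    return security_context


role = RunAs(iam_role="arn:aws:iam::123456789012:role/my-flyte-role")
print(pick_execution_identity(security_context=role) is role)  # True
```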
Execution and Compilation
When a Launch Plan is invoked directly (e.g., scheduled_lp(a=20)), it forwards the call to its associated workflow, supplying its saved_inputs (the combined default_inputs and fixed_inputs) together with the arguments you pass. When a Launch Plan is called inside another workflow, compilation adds it to that workflow's graph as a node, so the platform launches it as a separate child execution that uses the Launch Plan's configuration.
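Local invocation works like a thin wrapper that merges the saved inputs with the caller's keyword arguments and calls the workflow. The class below is a hypothetical plain-Python model. MiniLaunchPlan is an invented name, and real flytekit workflows compile into a graph rather than running as ordinary functions. This model accepts keyword arguments only and, as an assumption, rejects attempts to override a fixed input.

```python
from typing import Any, Callable, Dict, Optional


class MiniLaunchPlan:
    """Hypothetical model of a launch plan forwarding a local call to its workflow."""

    def __init__(
        self,
        workflow: Callable[..., Any],
        default_inputs: Optional[Dict[str, Any]] = None,
        fixed_inputs: Optional[Dict[str, Any]] = None,
    ) -> None:
        self.workflow = workflow
        self.default_inputs = dict(default_inputs or {})
        self.fixed_inputs = dict(fixed_inputs or {})

    @property
    def saved_inputs(self) -> Dict[str, Any]:
        # Defaults first, then fixed inputs, so a fixed value wins on a name clash.
        return {**self.default_inputs, **self.fixed_inputs}

    def __call__(self, **kwargs: Any) -> Any:
        # Keyword arguments only; overriding a fixed input is rejected in this model.
        blocked = set(kwargs) & set(self.fixed_inputs)
        if blocked:
            raise ValueError(f"Fixed inputs cannot be overridden: {sorted(blocked)}")
        return self.workflow(**{**self.saved_inputs, **kwargs})


def my_wf(a: int, c: str) -> str:
    # Plain function standing in for a workflow.
    return f"{c} {a}"


scheduled_lp = MiniLaunchPlan(my_wf, default_inputs={"a": 10}, fixed_inputs={"c": "scheduled run"})
print(scheduled_lp())      # scheduled run 10
print(scheduled_lp(a=20))  # scheduled run 20
```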
Referencing Existing Launch Plans
The ReferenceLaunchPlan class allows you to create a local pointer to a Launch Plan that has already been registered on the Flyte platform. This is useful when you want to interact with a remote Launch Plan without needing to define its full configuration locally.
from flytekit.core.launch_plan import ReferenceLaunchPlan
# Reference a Launch Plan that exists on the Flyte platform.
# You must provide the expected input and output types.
remote_lp = ReferenceLaunchPlan(
    project="flytesnacks",
    domain="development",
    name="my_remote_workflow.my_scheduled_lp",
    version="v1",
    inputs={"a": int, "c": str},
    outputs={"o0": str},
)
The ReferenceLaunchPlan does not make a network call during its instantiation. The provided inputs and outputs are used for local type checking and compilation. If these types do not match the actual remote Launch Plan's interface during registration or execution, an error will occur.
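Because no network call is made, the declared inputs and outputs can only be checked locally against the values you pass. The sketch below is a hypothetical model of that kind of local check. LocalInterface and check_call are invented names, not flytekit APIs. The model validates keyword arguments against the declared Python types before any remote execution would be attempted.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Type


@dataclass
class LocalInterface:
    """Hypothetical stand-in for the declared interface of a remote launch plan."""
    project: str
    domain: str
    name: str
    version: str
    inputs: Dict[str, Type[Any]] = field(default_factory=dict)
    outputs: Dict[str, Type[Any]] = field(default_factory=dict)

    def check_call(self, **kwargs: Any) -> None:
        """Raise TypeError if kwargs do not match the declared input types."""
        missing = set(self.inputs) - set(kwargs)
        unexpected = set(kwargs) - set(self.inputs)
        if missing or unexpected:
            raise TypeError(f"missing={sorted(missing)}, unexpected={sorted(unexpected)}")
        for key, value in kwargs.items():
            expected = self.inputs[key]
            if not isinstance(value, expected):
                raise TypeError(f"{key!r} should be {expected.__name__}, got {type(value).__name__}")


remote_lp = LocalInterface(
    project="flytesnacks",
    domain="development",
    name="my_remote_workflow.my_scheduled_lp",
    version="v1",
    inputs={"a": int, "c": str},
    outputs={"o0": str},
)
remote_lp.check_call(a=20, c="hello")  # passes silently
```

A local check like this catches only mismatches between the call and the declared types. It cannot detect a declaration that disagrees with the real remote interface, which is why such errors surface only at registration or execution.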
Internal Structure and State
Under the hood, Launch Plans are represented by several core models:
- LaunchPlanSpec: Defines the static configuration of a Launch Plan, including its associated workflow_id, default_inputs, fixed_inputs, labels, annotations, auth_role (or security_context), raw_output_data_config, max_parallelism, and overwrite_cache. It represents the desired state of the Launch Plan.
- LaunchPlanMetadata: Contains metadata related to the execution of the Launch Plan, primarily the schedule and notifications.
- LaunchPlanClosure: Captures the current state of a Launch Plan on the platform, including its LaunchPlanState (e.g., ACTIVE, INACTIVE) and its expected_inputs and expected_outputs.
- LaunchPlanState: An enumeration defining the activation status of a Launch Plan. Only an ACTIVE Launch Plan runs on its schedule or trigger; an INACTIVE one does not start scheduled runs, although it can still be used to launch individual executions.
These models are used for serialization and communication with the Flyte backend, ensuring that the Launch Plan's configuration is consistently managed across the system.
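To make the relationships concrete, the following is a deliberately simplified, hypothetical mirror of these models as Python dataclasses. Field names follow the descriptions above, but the classes are local stand-ins, not the flytekit model definitions, which use literal and interface types rather than plain dicts and strings. The only behavior modeled is that a schedule fires only for an ACTIVE plan.

```python
import enum
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


class LaunchPlanState(enum.IntEnum):
    # Simplified stand-in for the activation enum.
    INACTIVE = 0
    ACTIVE = 1


@dataclass
class LaunchPlanMetadata:
    schedule: Optional[str] = None  # e.g. a cron expression
    notifications: List[str] = field(default_factory=list)  # e.g. email recipients


@dataclass
class LaunchPlanSpec:
    workflow_id: str
    default_inputs: Dict[str, Any] = field(default_factory=dict)
    fixed_inputs: Dict[str, Any] = field(default_factory=dict)
    labels: Dict[str, str] = field(default_factory=dict)
    annotations: Dict[str, str] = field(default_factory=dict)
    security_context: Optional[str] = None  # e.g. an IAM role ARN
    raw_output_data_config: Optional[str] = None  # e.g. an S3 prefix
    max_parallelism: Optional[int] = None
    overwrite_cache: Optional[bool] = None


@dataclass
class LaunchPlanClosure:
    state: LaunchPlanState
    expected_inputs: Dict[str, type] = field(default_factory=dict)
    expected_outputs: Dict[str, type] = field(default_factory=dict)


def fires_schedule(metadata: LaunchPlanMetadata, closure: LaunchPlanClosure) -> bool:
    """A schedule triggers runs only when one is defined and the plan is ACTIVE."""
    return metadata.schedule is not None and closure.state is LaunchPlanState.ACTIVE


spec = LaunchPlanSpec(workflow_id="my_wf", default_inputs={"a": 10}, fixed_inputs={"c": "scheduled run"})
meta = LaunchPlanMetadata(schedule="0 0 * * *", notifications=["dev-team@example.com"])
closure = LaunchPlanClosure(state=LaunchPlanState.INACTIVE, expected_inputs={"a": int})
print(fires_schedule(meta, closure))  # False
```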
Considerations
- Uniqueness: Named Launch Plans must have unique names within a given project, domain, and version. Calling get_or_create twice with the same name but different properties raises an AssertionError.
- Caching: The LaunchPlan class maintains a cache (LaunchPlan.CACHE) to store created Launch Plans. This prevents redundant object creation and ensures consistency when get_or_create is called multiple times for the same Launch Plan.
- AuthRole vs. SecurityContext: AuthRole is deprecated. Always prefer SecurityContext for defining execution identity.
- max_parallelism: This parameter controls the maximum number of task nodes that can run concurrently within a workflow execution. It helps manage resource consumption and fairness. Note that MapTasks are treated as a single unit, and their internal parallelism is managed separately.
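A parallelism cap can be illustrated with a semaphore that bounds how many simulated task nodes run at once. This is a conceptual sketch using only the standard library, and run_with_cap is a hypothetical helper. Flyte enforces max_parallelism in the platform's execution engine, not through Python threads.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_with_cap(num_nodes: int, max_parallelism: int) -> int:
    """Run `num_nodes` simulated task nodes, at most `max_parallelism` at a time.

    Returns the highest number of nodes observed running concurrently.
    """
    gate = threading.Semaphore(max_parallelism)
    lock = threading.Lock()
    running = 0
    peak = 0

    def node(_: int) -> None:
        nonlocal running, peak
        with gate:  # blocks while max_parallelism nodes are already running
            with lock:
                running += 1
                peak = max(peak, running)
            time.sleep(0.01)  # simulated work
            with lock:
                running -= 1

    # One worker thread per node, so the semaphore (not the pool size) limits concurrency.
    with ThreadPoolExecutor(max_workers=num_nodes) as pool:
        list(pool.map(node, range(num_nodes)))
    return peak


print(run_with_cap(num_nodes=20, max_parallelism=5))  # never more than 5
```

The thread pool is deliberately larger than the cap, which isolates the semaphore as the only limit on concurrency.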