Launch Plans: Deploying and Scheduling Workflows

Launch Plans are a fundamental construct for deploying and scheduling workflows. They serve as a deployable and executable configuration for a workflow, allowing you to define how a workflow should run, including its inputs, schedule, and execution environment.

Purpose and Core Capabilities

The primary purpose of Launch Plans is to decouple workflow definitions from their execution configurations. A single workflow can have multiple Launch Plans, each tailored for different use cases, environments, or schedules. This enables flexible deployment and management of complex data and ML pipelines.

Core capabilities include:

  • Workflow Association: Every Launch Plan is intrinsically linked to a specific workflow.
  • Input Management:
    • Default Inputs: Define default values for workflow inputs that can be overridden at execution time.
    • Fixed Inputs: Specify input values that are immutable and cannot be changed when the Launch Plan is executed.
  • Scheduling: Configure workflows to run automatically at specified intervals using a Schedule object or a LaunchPlanTriggerBase.
  • Notifications: Set up alerts to be sent based on the execution status transitions of the workflow (e.g., success, failure).
  • Resource Configuration: Apply custom Kubernetes labels and annotations to workflow executions, define raw output data configurations (e.g., S3 paths), and control the maximum parallelism for tasks within a workflow.
  • Security Context: Specify the security context, including IAM roles or Kubernetes service accounts, under which the workflow execution will run. This ensures proper access control and permissions.
  • Caching Control: Override the default caching behavior for workflow executions.
  • Auto-activation: Automatically activate a Launch Plan upon registration, making it immediately available for execution or scheduling.

Common Use Cases

Launch Plans are versatile and support a wide range of scenarios:

  • Scheduled Data Pipelines: Create a Launch Plan with a daily or hourly schedule to run a data processing workflow automatically.
  • Production Deployments: Define a Launch Plan for a production environment with specific fixed inputs (e.g., production database credentials), a dedicated IAM role, and notifications for failures.
  • Ad-hoc Executions with Defaults: Provide a Launch Plan that allows users to run a workflow with sensible default parameters, while still enabling them to override specific inputs as needed.
  • A/B Testing or Experimentation: Set up multiple Launch Plans for the same workflow, each with different fixed inputs representing experimental parameters, to easily compare results.
  • Environment-Specific Configurations: Use different Launch Plans for development, staging, and production environments, each with distinct resource allocations, security settings, and output locations.
  • Referencing Existing Deployments: Create a ReferenceLaunchPlan to point to a Launch Plan already registered on the platform, allowing local code to interact with remote deployments without re-registering.

Creating and Configuring Launch Plans

Launch Plans are created using the LaunchPlan class. The primary methods for creation are get_or_create and create.

Default Launch Plans

Every workflow implicitly has a default Launch Plan. This plan uses the workflow's inherent default input values (if any) and default authentication information, without any custom schedules, notifications, or fixed inputs.

To obtain a default Launch Plan:

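from flytekit import LaunchPlan, task, workflow

@task
def greet(a: int, c: str) -> str:
    return f"{c} {a}"

@workflow
def my_wf(a: int = 1, c: str = "hello") -> str:
    # Workflow inputs are promises at this point, so the string
    # formatting is delegated to a task.
    return greet(a=a, c=c)

# No name is given, so this returns the workflow's default Launch Plan.
default_lp = LaunchPlan.get_or_create(workflow=my_wf)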

Attempting to add additional properties (like schedule or fixed_inputs) when creating a default Launch Plan (i.e., without providing a name) will result in an error.

Custom Launch Plans

For more control, create a named Launch Plan. This allows you to specify default inputs, fixed inputs, schedules, notifications, and other execution-time configurations.

from flytekit import CronSchedule, Email, LaunchPlan, task, workflow
from flytekit.models import security
from flytekit.models.common import Annotations, Labels, RawOutputDataConfig
from flytekit.models.core.execution import WorkflowExecutionPhase

@task
def greet(a: int, c: str) -> str:
    return f"{c} {a}"

@workflow
def my_wf(a: int, c: str) -> str:
    # Workflow inputs are promises at this point, so the string
    # formatting is delegated to a task.
    return greet(a=a, c=c)

# Define a schedule: daily at midnight UTC.
# CronSchedule is flytekit's Schedule subclass for cron expressions.
daily_schedule = CronSchedule(schedule="0 0 * * *")

# Define notifications for the execution phases of interest
email_notification = Email(
    phases=[WorkflowExecutionPhase.SUCCEEDED, WorkflowExecutionPhase.FAILED],
    recipients_email=["dev-team@example.com"],
)

# Define security context
my_security_context = security.SecurityContext(
    run_as=security.Identity(iam_role="arn:aws:iam::123456789012:role/my-flyte-role")
)

# Create a named Launch Plan with custom configurations
scheduled_lp = LaunchPlan.create(
    name="daily_workflow_run",
    workflow=my_wf,
    default_inputs={"a": 10},  # Default value for 'a', can be overridden
    fixed_inputs={"c": "scheduled run"},  # Fixed value for 'c', cannot be overridden
    schedule=daily_schedule,
    notifications=[email_notification],
    labels=Labels(values={"environment": "production"}),
    annotations=Annotations(values={"owner": "data-team"}),
    raw_output_data_config=RawOutputDataConfig(output_location_prefix="s3://my-bucket/flyte-outputs"),
    max_parallelism=5,  # Limit concurrent task nodes
    security_context=my_security_context,
    overwrite_cache=True,  # Always overwrite cache for this LP
    auto_activate=True,  # Activate on registration
)

When using get_or_create with a name, if a Launch Plan with that name already exists in the cache and its properties match the provided arguments, the cached version is returned. If properties differ, an AssertionError is raised to prevent inconsistent definitions.
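
The lookup rule described above can be pictured with a small, self-contained model. This is a sketch under stated assumptions, not flytekit's implementation: `ToyLaunchPlan` is a hypothetical stand-in class, keying the default plan by the workflow name is an assumption made for the illustration, and the `ValueError` for an unnamed plan with extra properties is likewise an assumed error type.

```python
from typing import Any, Dict, Optional


class ToyLaunchPlan:
    """Hypothetical stand-in for a launch plan; NOT flytekit's LaunchPlan."""

    # Name-keyed cache shared by all instances, mirroring the idea of LaunchPlan.CACHE.
    CACHE: Dict[str, "ToyLaunchPlan"] = {}

    def __init__(self, name: str, workflow_name: str, properties: Dict[str, Any]):
        self.name = name
        self.workflow_name = workflow_name
        self.properties = properties

    @classmethod
    def get_or_create(
        cls, workflow_name: str, name: Optional[str] = None, **properties: Any
    ) -> "ToyLaunchPlan":
        if name is None:
            # Default launch plan: extra properties are rejected.
            if properties:
                raise ValueError("Only named launch plans may carry extra properties")
            name = workflow_name  # assumption: the default plan is keyed by workflow name

        cached = cls.CACHE.get(name)
        if cached is not None:
            # Same name must mean the same definition; otherwise fail loudly.
            assert (
                cached.workflow_name == workflow_name and cached.properties == properties
            ), f"Launch plan {name!r} already exists with different properties"
            return cached

        launch_plan = cls(name, workflow_name, properties)
        cls.CACHE[name] = launch_plan
        return launch_plan


first = ToyLaunchPlan.get_or_create("my_wf", name="daily", schedule="0 0 * * *")
again = ToyLaunchPlan.get_or_create("my_wf", name="daily", schedule="0 0 * * *")
print(first is again)
```

Calling `get_or_create` a third time with the same name but a different `schedule` would trip the assertion in this model, which is the inconsistency the real check is meant to catch.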

Input Parameters

  • default_inputs: A dictionary of Python native values that serve as defaults for workflow inputs. These can be overridden when the Launch Plan is executed.
  • fixed_inputs: A dictionary of Python native values that are permanently bound to the Launch Plan. These inputs cannot be changed at execution time. If an input is specified in both default_inputs and fixed_inputs, the fixed_inputs value takes precedence and removes it from the overridable parameters.
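
As a rough illustration of the precedence rule above, the following sketch resolves the inputs an execution would see. The function `resolve_inputs` is hypothetical and written only for this example; flytekit performs the equivalent resolution internally, and the `ValueError` used here for an attempted override of a fixed input is an assumption, not the library's documented error.

```python
from typing import Any, Dict


def resolve_inputs(
    default_inputs: Dict[str, Any],
    fixed_inputs: Dict[str, Any],
    overrides: Dict[str, Any],
) -> Dict[str, Any]:
    """Combine launch plan inputs with execution-time overrides (illustrative only)."""
    # A key present in both dictionaries is treated as fixed, so it drops out
    # of the set of overridable defaults.
    overridable = {k: v for k, v in default_inputs.items() if k not in fixed_inputs}

    # Fixed inputs cannot be changed at execution time.
    illegal = set(overrides) & set(fixed_inputs)
    if illegal:
        raise ValueError(f"Cannot override fixed inputs: {sorted(illegal)}")

    # Later dictionaries win: defaults, then caller overrides, then fixed values.
    return {**overridable, **overrides, **fixed_inputs}


resolved = resolve_inputs(
    default_inputs={"a": 10},
    fixed_inputs={"c": "scheduled run"},
    overrides={"a": 20},
)
print(resolved)
```

With the values from the earlier example, overriding `a` is accepted while `c` stays bound to the fixed value.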

Security Context

The security_context parameter allows specifying the identity under which the workflow execution will run. This is crucial for granting necessary permissions to access external resources (e.g., S3, databases). The older auth_role parameter is deprecated; use security_context instead. Specifying both will raise a ValueError.

Execution and Compilation

When a Launch Plan is invoked directly (e.g., scheduled_lp(a=20)), it forwards the call to its associated workflow, incorporating its saved_inputs (which include both default_inputs and fixed_inputs). During compilation, the Launch Plan contributes to the workflow graph, allowing the platform to understand its configuration.

Referencing Existing Launch Plans

The ReferenceLaunchPlan class allows you to create a local pointer to a Launch Plan that has already been registered on the Flyte platform. This is useful when you want to interact with a remote Launch Plan without needing to define its full configuration locally.

from flytekit.core.launch_plan import ReferenceLaunchPlan

# Reference a Launch Plan that exists on the Flyte platform.
# You must provide the expected input and output types.
remote_lp = ReferenceLaunchPlan(
    project="flytesnacks",
    domain="development",
    name="my_remote_workflow.my_scheduled_lp",
    version="v1",
    inputs={"a": int, "c": str},
    outputs={"o0": str},
)

The ReferenceLaunchPlan does not make a network call during its instantiation. The provided inputs and outputs are used for local type checking and compilation. If these types do not match the actual remote Launch Plan's interface during registration or execution, an error will occur.

Internal Structure and State

Under the hood, Launch Plans are represented by several core models:

  • LaunchPlanSpec: This model defines the static configuration of a Launch Plan, including its associated workflow_id, default_inputs, fixed_inputs, labels, annotations, auth_role (or security_context), raw_output_data_config, max_parallelism, and overwrite_cache. It represents the desired state of the Launch Plan.
  • LaunchPlanMetadata: Contains metadata related to the execution of the Launch Plan, primarily the schedule and notifications.
  • LaunchPlanClosure: Captures the current state of a Launch Plan on the platform, including its LaunchPlanState (e.g., ACTIVE, INACTIVE) and its expected_inputs and expected_outputs.
  • LaunchPlanState: An enumeration defining the activation status of a Launch Plan. An ACTIVE Launch Plan can be executed or trigger scheduled runs, while an INACTIVE one cannot.

These models are used for serialization and communication with the Flyte backend, ensuring that the Launch Plan's configuration is consistently managed across the system.
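
To make the division of responsibilities concrete, here is a simplified sketch of how these pieces relate. The `Toy*` classes are hypothetical dataclasses that borrow field names from the descriptions above; they are not the flytekit models, the `metadata` field name is an assumption, and the helper `can_trigger_schedule` is invented for the illustration.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional


class ToyLaunchPlanState(Enum):
    INACTIVE = 0
    ACTIVE = 1


@dataclass
class ToyMetadata:
    """Schedule and notifications, as described for LaunchPlanMetadata."""
    schedule: Optional[str] = None
    notifications: List[str] = field(default_factory=list)


@dataclass
class ToySpec:
    """Static, desired configuration, as described for LaunchPlanSpec."""
    workflow_id: str
    default_inputs: Dict[str, Any] = field(default_factory=dict)
    fixed_inputs: Dict[str, Any] = field(default_factory=dict)
    metadata: ToyMetadata = field(default_factory=ToyMetadata)  # field name is assumed


@dataclass
class ToyClosure:
    """Current platform-side state, as described for LaunchPlanClosure."""
    state: ToyLaunchPlanState = ToyLaunchPlanState.INACTIVE
    expected_inputs: Dict[str, type] = field(default_factory=dict)
    expected_outputs: Dict[str, type] = field(default_factory=dict)


def can_trigger_schedule(spec: ToySpec, closure: ToyClosure) -> bool:
    """Only an ACTIVE plan that actually has a schedule can start scheduled runs."""
    return closure.state is ToyLaunchPlanState.ACTIVE and spec.metadata.schedule is not None


spec = ToySpec(workflow_id="my_wf", metadata=ToyMetadata(schedule="0 0 * * *"))
closure = ToyClosure()
print(can_trigger_schedule(spec, closure))
```

The spec and metadata stay the same across activation changes in this model; only the closure's state flips, which is why a freshly registered plan with a schedule does nothing until it is activated.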

Considerations

  • Uniqueness: Named Launch Plans must have unique names within a given project, domain, and version. Attempting to create two named Launch Plans with the same name but different properties will raise an AssertionError.
  • Caching: The LaunchPlan class maintains a cache (LaunchPlan.CACHE) to store created Launch Plans. This prevents redundant object creation and ensures consistency when get_or_create is called multiple times for the same Launch Plan.
  • AuthRole vs. SecurityContext: AuthRole is deprecated. Always prefer SecurityContext for defining execution identity.
  • max_parallelism: This parameter controls the maximum number of task nodes that can run concurrently within a workflow execution. It helps manage resource consumption and fairness. Note that MapTasks are treated as a single unit, and their internal parallelism is managed separately.
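
The effect of a parallelism cap can be pictured with a small asyncio sketch. It only illustrates the concept: the Flyte backend enforces max_parallelism itself, and the function `run_with_max_parallelism` and its sleep-based "task nodes" are hypothetical stand-ins written for this example.

```python
import asyncio
from typing import List


async def run_with_max_parallelism(durations: List[float], max_parallelism: int) -> int:
    """Run one simulated task node per duration; return the peak number running at once."""
    semaphore = asyncio.Semaphore(max_parallelism)
    running = 0
    peak = 0

    async def task_node(duration: float) -> None:
        nonlocal running, peak
        # A node may start only when one of the max_parallelism slots is free.
        async with semaphore:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(duration)  # stand-in for real work
            running -= 1

    await asyncio.gather(*(task_node(d) for d in durations))
    return peak


# Twelve simulated nodes with a cap of five concurrent nodes.
peak = asyncio.run(run_with_max_parallelism([0.01] * 12, max_parallelism=5))
print(peak)
```

All twelve nodes still complete; the cap changes how many run side by side, not how many run in total.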