Automated Triggers
Automated triggers in flyte-sdk allow you to schedule task executions using cron expressions or fixed intervals. You can define triggers directly in your code using decorators or manage them programmatically via the remote client.
Scheduling Tasks with Decorators
The most common way to automate a task is by passing a Trigger object to the triggers parameter of the @env.task decorator.
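import flyte
from datetime import datetime
# Tasks are registered on a TaskEnvironment; the name used here is illustrative
env = flyte.TaskEnvironment(name="trigger_examples")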
# Define a trigger that runs every hour
hourly_trigger = flyte.Trigger(
    name="hourly-cleanup",
    automation=flyte.Cron("0 * * * *"),
    description="Runs at the start of every hour",
    inputs={"threshold": 0.5},
)
@env.task(triggers=[hourly_trigger])
def cleanup_task(threshold: float):
    print(f"Cleaning up with threshold: {threshold}")
Using TriggerTime Sentinel
If your task needs to know the exact time it was scheduled to run, use the flyte.TriggerTime sentinel in the inputs dictionary. This binds the scheduled execution time to a specific task input.
@env.task(
    triggers=[
        flyte.Trigger(
            name="daily-report",
            automation=flyte.Cron("0 0 * * *"),
            inputs={"report_date": flyte.TriggerTime},
        )
    ]
)
def generate_report(report_date: datetime):
    print(f"Generating report for {report_date.isoformat()}")
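A task that receives the scheduled time often uses it to decide which slice of data to process. The following is a minimal sketch of that idea in plain Python, independent of flyte-sdk. The helper name previous_day_window is hypothetical, and the example assumes the scheduled time arrives as a timezone-aware datetime in UTC, which this page does not confirm.

```python
from datetime import datetime, timedelta, timezone


def previous_day_window(report_date: datetime) -> tuple[datetime, datetime]:
    """Return the [start, end) window covering the day before report_date.

    Hypothetical helper: truncates the scheduled time to midnight and
    steps back one day, so a run scheduled for 00:00 on March 2 covers March 1.
    """
    end = report_date.replace(hour=0, minute=0, second=0, microsecond=0)
    start = end - timedelta(days=1)
    return start, end


# Assumed value of the scheduled time for a midnight run (UTC is an assumption)
scheduled = datetime(2024, 3, 2, 0, 0, tzinfo=timezone.utc)
start, end = previous_day_window(scheduled)
print(f"Reporting on {start.isoformat()} to {end.isoformat()}")
```

Deriving the window from the scheduled time, rather than from the current time inside the task, keeps the result stable if a run starts late or is retried.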
Convenience Methods
flyte-sdk provides several class methods on Trigger for common schedules. These methods optionally accept a trigger_time_input_key to automatically bind the execution time.
# Runs every hour and passes the time to the 'execution_time' argument
@env.task(triggers=[flyte.Trigger.hourly(trigger_time_input_key="execution_time")])
def hourly_task(execution_time: datetime):
    ...
# Runs daily at midnight
@env.task(triggers=[flyte.Trigger.daily()])
def daily_task():
    ...
Available convenience methods include:
- Trigger.minutely()
- Trigger.hourly()
- Trigger.daily()
- Trigger.weekly()
- Trigger.monthly()
Schedule Types
flyte-sdk supports two primary automation types: Cron and FixedRate.
Cron Schedules
Cron uses the standard five-field format and supports timezones.
# Run every weekday at 9:00 AM New York time
nyc_trigger = flyte.Trigger(
    name="weekday-9am-nyc",
    automation=flyte.Cron("0 9 * * 1-5", timezone="America/New_York"),
)
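To see why the timezone matters for a schedule like the one above, the sketch below uses only the Python standard library (zoneinfo) to show how 9:00 AM New York wall-clock time maps to UTC in winter and summer, and how a wall-clock time can fail to exist on a spring-forward day. It does not call flyte-sdk, the helper names are hypothetical, and how the Flyte scheduler itself resolves such times is not covered here.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

NYC = ZoneInfo("America/New_York")


def local_to_utc(naive: datetime, tz: ZoneInfo) -> datetime:
    """Interpret a naive wall-clock time in tz and convert it to UTC."""
    return naive.replace(tzinfo=tz).astimezone(timezone.utc)


def wall_time_exists(naive: datetime, tz: ZoneInfo) -> bool:
    """Return False if the wall-clock time falls in a DST gap in tz.

    A time inside the gap does not survive a round trip through UTC.
    """
    round_trip = local_to_utc(naive, tz).astimezone(tz)
    return round_trip.replace(tzinfo=None) == naive


winter = local_to_utc(datetime(2024, 1, 15, 9, 0), NYC)  # EST, UTC-5
summer = local_to_utc(datetime(2024, 7, 15, 9, 0), NYC)  # EDT, UTC-4
print(winter.strftime("%H:%M"), summer.strftime("%H:%M"))

# 2:30 AM on 2024-03-10 is skipped in New York (clocks jump from 2:00 to 3:00)
print(wall_time_exists(datetime(2024, 3, 10, 2, 30), NYC))
```

A schedule expressed in local time keeps its wall-clock hour across the year, while the same cron expression interpreted in UTC would drift by an hour relative to local time when DST changes.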
FixedRate Schedules
FixedRate runs at a consistent interval (in minutes) regardless of the clock time.
# Run every 45 minutes
interval_trigger = flyte.Trigger(
    name="every-45-mins",
    automation=flyte.FixedRate(interval_minutes=45),
)
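The difference from a cron schedule is easiest to see on a timeline: a 45-minute interval does not line up with the top of the hour. The sketch below computes such a timeline with plain datetime arithmetic. The function fixed_rate_times is hypothetical, and the assumption that the first run happens exactly at the start time is for illustration only; the actual anchoring is decided by the Flyte backend.

```python
from datetime import datetime, timedelta, timezone


def fixed_rate_times(start: datetime, interval_minutes: int, count: int) -> list[datetime]:
    """Return `count` run times spaced `interval_minutes` apart, beginning at `start`.

    Hypothetical helper for illustration; it does not query any scheduler.
    """
    step = timedelta(minutes=interval_minutes)
    return [start + i * step for i in range(count)]


# Assumed anchor time for the first run
start = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
runs = fixed_rate_times(start, interval_minutes=45, count=4)
for run in runs:
    print(run.strftime("%H:%M"))
```

With a 45-minute interval the run times fall at 00:00, 00:45, 01:30, and 02:15, a pattern that a single five-field cron expression cannot express.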
Managing Triggers Remotely
You can use flyte.remote.Trigger to manage triggers on a deployed Flyte cluster. This is useful for programmatically pausing, resuming, or deleting automations without redeploying code.
Deploying a Trigger Programmatically
If a task is already deployed, you can attach a new trigger to it using Trigger.create.
from flyte.remote import Trigger
import flyte
# Define the specification
spec = flyte.Trigger(
    name="dynamic-trigger",
    automation=flyte.FixedRate(120),
)
# Deploy it for an existing task
remote_trigger = Trigger.create(
    trigger=spec,
    task_name="my_project.my_domain.my_task",
    task_version="v1",  # Optional, defaults to latest
)
Pausing and Resuming Triggers
Use Trigger.update to toggle the active state of a trigger.
# Pause a trigger
Trigger.update(name="hourly-cleanup", task_name="my_task", active=False)
# Resume a trigger
Trigger.update(name="hourly-cleanup", task_name="my_task", active=True)
Listing and Deleting Triggers
You can list all triggers for a specific task or delete them by name.
# List all triggers for a task
for trigger in Trigger.listall(task_name="my_task"):
    print(f"Trigger: {trigger.name}, Active: {trigger.is_active}")
# Delete a trigger
Trigger.delete(name="hourly-cleanup", task_name="my_task")
Troubleshooting and Gotchas
- Name Uniqueness: Trigger names must be unique within the context of a project, domain, and task.
- Description Limits: The description field in Trigger is capped at 255 characters. flyte-sdk will automatically truncate longer strings using an internal parser.
- TriggerTime Compatibility: When using flyte.TriggerTime, ensure the corresponding task input is typed as datetime.datetime.
- DST Transitions: When using Cron with a specific timezone, be aware that Daylight Saving Time transitions may cause a run to be skipped or executed twice depending on the schedule.
- Input Validation: If you provide inputs in a Trigger, the keys must match the input parameters defined in the task's interface. If they do not match, Trigger.create will raise a ValueError.
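Input mismatches can be caught before anything is deployed. The sketch below is a local pre-check written with the standard library's inspect module; it is not part of flyte-sdk, and the helper name check_trigger_inputs is hypothetical. It assumes you pass the plain Python function rather than the decorated task object, and treating parameters without defaults as required is an assumption about how the interface is matched.

```python
import inspect


def check_trigger_inputs(func, inputs: dict) -> tuple[list[str], list[str]]:
    """Compare trigger input keys against a function's parameters.

    Returns (unknown, missing):
      unknown: keys in `inputs` that are not parameters of `func`
      missing: parameters of `func` without a default that `inputs` omits
    """
    params = inspect.signature(func).parameters
    unknown = sorted(set(inputs) - set(params))
    missing = sorted(
        name
        for name, param in params.items()
        if param.default is inspect.Parameter.empty and name not in inputs
    )
    return unknown, missing


def cleanup_task(threshold: float, dry_run: bool = False):
    print(f"Cleaning up with threshold: {threshold}")


print(check_trigger_inputs(cleanup_task, {"threshold": 0.5}))
print(check_trigger_inputs(cleanup_task, {"treshold": 0.5}))  # misspelled key
```

The second call reports the misspelled key as unknown and the real parameter as missing, which is the kind of mismatch described in the Input Validation item above.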