Automated Scheduling & Triggers
Automate task execution in flyte-sdk by associating a Trigger with your task via the @env.task decorator. Triggers allow you to run tasks on a recurring schedule using either standard Cron expressions or fixed-rate intervals.
Using Convenience Constructors
The simplest way to schedule a task is to use one of the predefined convenience constructors on the Trigger class, such as minutely(), hourly(), daily(), weekly(), or monthly().
from datetime import datetime
import flyte
env = flyte.TaskEnvironment(name="example_app")
# Schedule a task to run every hour
@env.task(triggers=flyte.Trigger.hourly())
def hourly_cleanup(trigger_time: datetime):
    print(f"Running cleanup for scheduled time: {trigger_time}")
Defining Custom Cron Schedules
For more complex schedules, use the Cron class. It supports the standard five-field cron format and allows you to specify a timezone to handle daylight saving time (DST) transitions.
# Run every weekday at 9:00 AM Eastern Time
morning_trigger = flyte.Trigger(
    name="weekday_morning",
    automation=flyte.Cron("0 9 * * 1-5", timezone="America/New_York"),
    description="Runs every weekday morning at 9 AM ET",
)

@env.task(triggers=morning_trigger)
def morning_report():
    ...
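To make the five-field format concrete, the following sketch labels each field of the expression used above. The `split_cron` helper and the `CRON_FIELDS` tuple are hypothetical names introduced for illustration; they are not part of flyte-sdk, and the helper only splits the expression without validating field values.

```python
# Field names of the standard five-field cron format, in order.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expression: str) -> dict[str, str]:
    """Map each field of a five-field cron expression to its name.

    Illustrative helper only; it is not part of flyte-sdk.
    """
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


fields = split_cron("0 9 * * 1-5")
for name, value in fields.items():
    print(f"{name:>12}: {value}")
```

Read field by field, "0 9 * * 1-5" means minute 0 of hour 9, on any day of the month, in any month, on days of the week 1 through 5 (Monday through Friday).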
Using Fixed-Rate Intervals
If you need a task to run at a consistent interval regardless of the clock time, use the FixedRate class. This is useful for high-frequency tasks or intervals that don't align well with cron boundaries.
# Run every 45 minutes
interval_trigger = flyte.Trigger(
    name="frequent_sync",
    automation=flyte.FixedRate(interval_minutes=45),
    description="Syncs data every 45 minutes",
)

@env.task(triggers=interval_trigger)
def sync_data():
    ...
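The sketch below illustrates why a 45-minute interval does not align with cron boundaries. It only does the date arithmetic with the standard library; `fixed_rate_times` is a hypothetical helper, and the anchor time is an assumption, since the moment from which flyte-sdk counts the interval is not stated here.

```python
from datetime import datetime, timedelta


def fixed_rate_times(start: datetime, interval_minutes: int, count: int) -> list[datetime]:
    """Return the first `count` fire times of a fixed-rate schedule.

    Hypothetical helper for illustration; the anchor time that flyte-sdk
    actually uses for FixedRate is an assumption here.
    """
    step = timedelta(minutes=interval_minutes)
    return [start + i * step for i in range(count)]


# Assumed anchor: midnight on an arbitrary date.
anchor = datetime(2025, 1, 1, 0, 0)
times = fixed_rate_times(anchor, interval_minutes=45, count=5)
print([t.strftime("%H:%M") for t in times])
```

The minute of the hour shifts on every run (00, 45, 30, 15, then back to 00). A cron minute field of `*/45` would instead fire at minutes 0 and 45 of every hour, which gives uneven gaps of 45 and 15 minutes.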
Binding Schedule Time to Task Inputs
You can pass the trigger's scheduled execution time into your task by using the flyte.TriggerTime marker in the inputs dictionary. This is particularly useful for tasks that process data for a specific time window.
custom_trigger = flyte.Trigger(
    name="data_processor",
    automation=flyte.Cron("0 * * * *"),
    inputs={
        "start_time": flyte.TriggerTime,
        "batch_size": 100,
    },
)

@env.task(triggers=custom_trigger)
def process_batch(start_time: datetime, batch_size: int):
    # start_time will be the scheduled time of the trigger
    print(f"Processing batch of {batch_size} starting at {start_time}")
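As a rough illustration of the time-window use case, a task body could derive its processing window from the bound start_time. The sketch below uses only the standard library; `hourly_window` and `in_window` are hypothetical helpers, and the choice of a one-hour window that begins at the scheduled time is an assumption matched to the hourly cron expression above.

```python
from datetime import datetime, timedelta


def hourly_window(start_time: datetime) -> tuple[datetime, datetime]:
    """Return the half-open window [start_time, start_time + 1 hour)."""
    return start_time, start_time + timedelta(hours=1)


def in_window(event_time: datetime, window: tuple[datetime, datetime]) -> bool:
    """Check whether an event falls inside the half-open window."""
    start, end = window
    return start <= event_time < end


# Stand-in for the value the trigger would bind to start_time.
scheduled = datetime(2025, 1, 1, 9, 0)
window = hourly_window(scheduled)

events = [
    datetime(2025, 1, 1, 8, 59),   # before the window
    datetime(2025, 1, 1, 9, 0),    # first instant of the window
    datetime(2025, 1, 1, 9, 30),   # inside the window
    datetime(2025, 1, 1, 10, 0),   # belongs to the next window
]
selected = [e for e in events if in_window(e, window)]
print(selected)
```

Because the window is half-open, an event stamped exactly on the hour is assigned to one run only, so consecutive hourly runs neither overlap nor leave gaps.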
Managing Triggers Remotely
Once a task is deployed, you can manage its triggers (pause, resume, or delete) using the flyte.remote.Trigger class. This allows for operational control without redeploying code.
from flyte.remote import Trigger
# Pause a trigger by setting active=False
Trigger.update(name="data_processor", task_name="process_batch", active=False)
# Resume a trigger
Trigger.update(name="data_processor", task_name="process_batch", active=True)
# Delete a trigger entirely
Trigger.delete(name="data_processor", task_name="process_batch")
Troubleshooting and Constraints
- Unique Names: Every trigger associated with a task must have a unique name.
- Description Limits: The description field in a Trigger is limited to 255 characters. flyte-sdk automatically truncates descriptions that exceed this limit.
- Auto-Activation: By default, auto_activate is set to True. Triggers are activated automatically when the task is deployed via flyte.deploy().
- DST Transitions: When using Cron with a specific timezone, be aware that daylight saving transitions may cause a run to be skipped or duplicated depending on how the local clock shifts.
- Deployment Requirement: Triggers are only registered and activated when the task is deployed to a Flyte cluster; they do not run during local execution.
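The first two constraints can be checked before deployment. The following is a minimal sketch, assuming trigger definitions are represented as plain dictionaries with a "name" and an optional "description"; `check_triggers` is a hypothetical helper, not a flyte-sdk function, and it only reports problems rather than changing anything.

```python
from collections import Counter

MAX_DESCRIPTION_LENGTH = 255  # limit stated in the constraints above


def check_triggers(triggers: list[dict]) -> list[str]:
    """Return human-readable warnings for one task's trigger definitions.

    Hypothetical pre-deployment check; each trigger is a plain dict with a
    "name" key and an optional "description" key.
    """
    warnings = []
    # Trigger names must be unique per task.
    counts = Counter(t["name"] for t in triggers)
    for name, n in counts.items():
        if n > 1:
            warnings.append(f"duplicate trigger name: {name!r} used {n} times")
    # Overlong descriptions would be truncated, so flag them early.
    for t in triggers:
        length = len(t.get("description", ""))
        if length > MAX_DESCRIPTION_LENGTH:
            warnings.append(
                f"description of {t['name']!r} is {length} characters "
                f"and would be truncated to {MAX_DESCRIPTION_LENGTH}"
            )
    return warnings


problems = check_triggers([
    {"name": "data_processor", "description": "Hourly batch"},
    {"name": "data_processor", "description": "x" * 300},
])
for problem in problems:
    print(problem)
```

Running a check like this in CI surfaces a silently truncated description or a name collision before the task reaches the cluster.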