Skip to main content

Automated Scheduling & Triggers

Automate task execution in flyte-sdk by associating a Trigger with your task via the @env.task decorator. Triggers allow you to run tasks on a recurring schedule using either standard Cron expressions or fixed-rate intervals.

Using Convenience Constructors

The simplest way to schedule a task is using predefined convenience methods on the Trigger class, such as hourly(), daily(), weekly(), monthly(), or minutely().

from datetime import datetime
import flyte

env = flyte.TaskEnvironment(name="example_app")

# Schedule a task to run every hour
@env.task(triggers=flyte.Trigger.hourly())
def hourly_cleanup(trigger_time: datetime):
print(f"Running cleanup for scheduled time: {trigger_time}")

Defining Custom Cron Schedules

For more complex schedules, use the Cron class. It supports the standard five-field cron format and allows you to specify a timezone to handle daylight saving time (DST) transitions.

# Run every weekday at 9:00 AM Eastern Time
morning_trigger = flyte.Trigger(
name="weekday_morning",
automation=flyte.Cron("0 9 * * 1-5", timezone="America/New_York"),
description="Runs every weekday morning at 9 AM ET"
)

@env.task(triggers=morning_trigger)
def morning_report():
...

Using Fixed-Rate Intervals

If you need a task to run at a consistent interval regardless of the clock time, use the FixedRate class. This is useful for high-frequency tasks or intervals that don't align well with cron boundaries.

# Run every 45 minutes
interval_trigger = flyte.Trigger(
name="frequent_sync",
automation=flyte.FixedRate(interval_minutes=45),
description="Syncs data every 45 minutes"
)

@env.task(triggers=interval_trigger)
def sync_data():
...

Binding Schedule Time to Task Inputs

You can pass the trigger's scheduled execution time into your task by using the flyte.TriggerTime marker in the inputs dictionary. This is particularly useful for tasks that process data for a specific time window.

custom_trigger = flyte.Trigger(
name="data_processor",
automation=flyte.Cron("0 * * * *"),
inputs={
"start_time": flyte.TriggerTime,
"batch_size": 100
}
)

@env.task(triggers=custom_trigger)
def process_batch(start_time: datetime, batch_size: int):
# start_time will be the scheduled time of the trigger
print(f"Processing batch of {batch_size} starting at {start_time}")

Managing Triggers Remotely

Once a task is deployed, you can manage its triggers (pause, resume, or delete) using the flyte.remote.Trigger class. This allows for operational control without redeploying code.

from flyte.remote import Trigger

# Pause a trigger by setting active=False
Trigger.update(name="data_processor", task_name="process_batch", active=False)

# Resume a trigger
Trigger.update(name="data_processor", task_name="process_batch", active=True)

# Delete a trigger entirely
Trigger.delete(name="data_processor", task_name="process_batch")

Troubleshooting and Constraints

  • Unique Names: Every trigger associated with a task must have a unique name.
  • Description Limits: The description field in a Trigger is limited to 255 characters. flyte-sdk automatically truncates descriptions that exceed this limit.
  • Auto-Activation: By default, auto_activate is set to True. Triggers are activated automatically when the task is deployed via flyte.deploy().
  • DST Transitions: When using Cron with a specific timezone, be aware that daylight saving transitions may cause a run to be skipped or duplicated depending on how the local clock shifts.
  • Deployment Requirement: Triggers are only registered and activated when the task is deployed to a Flyte cluster; they do not run during local execution.