Automated Triggers

Automated triggers in flyte-sdk allow you to schedule task executions using cron expressions or fixed intervals. You can define triggers directly in your code using decorators or manage them programmatically via the remote client.

Scheduling Tasks with Decorators

The most common way to automate a task is by passing a Trigger object to the triggers parameter of the @env.task decorator.

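import flyte
from datetime import datetime

# Assumption: `env` is a task environment created with flyte.TaskEnvironment;
# the environment name below is illustrative.
env = flyte.TaskEnvironment(name="trigger_examples")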

# Define a trigger that runs every hour
hourly_trigger = flyte.Trigger(
    name="hourly-cleanup",
    automation=flyte.Cron("0 * * * *"),
    description="Runs at the start of every hour",
    inputs={"threshold": 0.5},
)

@env.task(triggers=[hourly_trigger])
def cleanup_task(threshold: float):
    print(f"Cleaning up with threshold: {threshold}")
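
The keys of the inputs dictionary are expected to line up with the task's parameter names ("threshold" in the example above). As a minimal local sketch of that check, using only the standard library and a plain, undecorated function, one could compare the keys against the function signature. The helper name unknown_input_keys is hypothetical and is not part of flyte-sdk.

```python
import inspect


def unknown_input_keys(task_fn, inputs):
    """Return the keys in `inputs` that are not parameters of `task_fn`."""
    params = inspect.signature(task_fn).parameters
    return sorted(key for key in inputs if key not in params)


# Plain function standing in for the task body (no flyte decorator here).
def cleanup_task(threshold: float):
    print(f"Cleaning up with threshold: {threshold}")


print(unknown_input_keys(cleanup_task, {"threshold": 0.5}))  # []
print(unknown_input_keys(cleanup_task, {"treshold": 0.5}))   # ['treshold']
```

Running a check like this before deployment catches misspelled keys early. It does not verify value types.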

Using TriggerTime Sentinel

If your task needs to know the exact time it was scheduled to run, use the flyte.TriggerTime sentinel in the inputs dictionary. This binds the scheduled execution time to a specific task input.

@env.task(
    triggers=[
        flyte.Trigger(
            name="daily-report",
            automation=flyte.Cron("0 0 * * *"),
            inputs={"report_date": flyte.TriggerTime},
        )
    ]
)
def generate_report(report_date: datetime):
    print(f"Generating report for {report_date.isoformat()}")
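
To make the idea of binding concrete, here is a conceptual sketch of how a sentinel value in an inputs dictionary could be swapped for the scheduled time when a run is launched. This is not flyte-sdk's implementation: TriggerTimeSentinel, TRIGGER_TIME, resolve_inputs, and the "region" input are all hypothetical names used only for illustration.

```python
from datetime import datetime, timezone


class TriggerTimeSentinel:
    """Hypothetical stand-in for flyte.TriggerTime, used only in this sketch."""


TRIGGER_TIME = TriggerTimeSentinel()


def resolve_inputs(inputs, scheduled_at):
    """Replace every sentinel value with the scheduled time; keep other values."""
    return {
        key: scheduled_at if value is TRIGGER_TIME else value
        for key, value in inputs.items()
    }


scheduled_at = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
resolved = resolve_inputs({"report_date": TRIGGER_TIME, "region": "eu"}, scheduled_at)
print(resolved["report_date"].isoformat())  # 2024-01-01T00:00:00+00:00
print(resolved["region"])                   # eu
```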

Convenience Methods

flyte-sdk provides several class methods on Trigger for common schedules. These methods optionally accept a trigger_time_input_key to automatically bind the execution time.

# Runs every hour and passes the time to the 'execution_time' argument
@env.task(triggers=[flyte.Trigger.hourly(trigger_time_input_key="execution_time")])
def hourly_task(execution_time: datetime):
    ...

# Runs daily at midnight
@env.task(triggers=[flyte.Trigger.daily()])
def daily_task():
    ...

Available convenience methods include:

  • Trigger.minutely()
  • Trigger.hourly()
  • Trigger.daily()
  • Trigger.weekly()
  • Trigger.monthly()

Schedule Types

flyte-sdk supports two primary automation types: Cron and FixedRate.

Cron Schedules

Cron uses the standard five-field format and supports timezones.

# Run every weekday at 9:00 AM New York time
nyc_trigger = flyte.Trigger(
    name="weekday-9am-nyc",
    automation=flyte.Cron("0 9 * * 1-5", timezone="America/New_York"),
)
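
For readers less familiar with the five-field format, the following standard-library sketch labels each field of an expression in the conventional order (minute, hour, day of month, month, day of week). It only splits and names the fields and does not validate their values. The helper label_cron_fields is hypothetical and not part of flyte-sdk.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def label_cron_fields(expression):
    """Split a five-field cron expression and pair each part with its field name."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


for name, value in label_cron_fields("0 9 * * 1-5").items():
    print(f"{name}: {value}")
```

Read this way, "0 9 * * 1-5" means minute 0 of hour 9, on any day of any month, Monday through Friday.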

FixedRate Schedules

FixedRate runs a task at a fixed interval, specified in minutes, independent of wall-clock boundaries such as the top of the hour.

# Run every 45 minutes
interval_trigger = flyte.Trigger(
    name="every-45-mins",
    automation=flyte.FixedRate(interval_minutes=45),
)
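
To illustrate how a fixed interval differs from a clock-aligned cron schedule, this standard-library sketch lists the first few run times of a 45-minute interval. It assumes the first run happens at an arbitrary start time of 09:00; how flyte-sdk actually anchors the first run is not covered here, and fixed_rate_times is a hypothetical helper.

```python
from datetime import datetime, timedelta


def fixed_rate_times(start, interval_minutes, count):
    """Return `count` run times spaced `interval_minutes` apart, beginning at `start`."""
    step = timedelta(minutes=interval_minutes)
    return [start + step * i for i in range(count)]


start = datetime(2024, 1, 1, 9, 0)  # assumed anchor time for the illustration
for run_time in fixed_rate_times(start, interval_minutes=45, count=5):
    print(run_time.strftime("%H:%M"))
# 09:00, 09:45, 10:30, 11:15, 12:00
```

The runs land at different minutes of each hour, which a single five-field cron expression cannot express for a 45-minute period.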

Managing Triggers Remotely

You can use flyte.remote.Trigger to manage triggers on a deployed Flyte cluster. This is useful for programmatically pausing, resuming, or deleting automations without redeploying code.

Deploying a Trigger Programmatically

If a task is already deployed, you can attach a new trigger to it using Trigger.create.

from flyte.remote import Trigger
import flyte

# Define the specification
spec = flyte.Trigger(
    name="dynamic-trigger",
    automation=flyte.FixedRate(120),
)

# Deploy it for an existing task
remote_trigger = Trigger.create(
    trigger=spec,
    task_name="my_project.my_domain.my_task",
    task_version="v1",  # Optional, defaults to latest
)

Pausing and Resuming Triggers

Use Trigger.update to toggle the active state of a trigger.

# Pause a trigger
Trigger.update(name="hourly-cleanup", task_name="my_task", active=False)

# Resume a trigger
Trigger.update(name="hourly-cleanup", task_name="my_task", active=True)

Listing and Deleting Triggers

You can list all triggers for a specific task or delete them by name.

# List all triggers for a task
for trigger in Trigger.listall(task_name="my_task"):
    print(f"Trigger: {trigger.name}, Active: {trigger.is_active}")

# Delete a trigger
Trigger.delete(name="hourly-cleanup", task_name="my_task")

Troubleshooting and Gotchas

  • Name Uniqueness: Trigger names must be unique within the context of a project, domain, and task.
  • Description Limits: The description field in Trigger is capped at 255 characters. flyte-sdk will automatically truncate longer strings using an internal parser.
  • TriggerTime Compatibility: When using flyte.TriggerTime, ensure the corresponding task input is typed as datetime.datetime.
  • DST Transitions: When using Cron with a specific timezone, be aware of Daylight Saving Time transitions. A schedule that falls in the hour skipped when clocks move forward may not run that day, and one that falls in the hour repeated when clocks move back may run twice.
  • Input Validation: If you provide inputs in a Trigger, the keys must match the input parameters defined in the task's interface. If they do not match, Trigger.create will raise a ValueError.
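
The DST caveat above can be checked locally before choosing a schedule. The sketch below uses the standard-library zoneinfo module (which needs time zone data on the system) to test whether a wall-clock time is skipped or repeated on a given date. It says nothing about how the Flyte scheduler itself handles these cases, and the helper names are hypothetical.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

NYC = ZoneInfo("America/New_York")


def wall_time_exists(naive, tz):
    """True if the wall-clock time survives a round trip through UTC."""
    local = naive.replace(tzinfo=tz)
    round_trip = local.astimezone(timezone.utc).astimezone(tz)
    return round_trip.replace(tzinfo=None) == naive


def wall_time_is_ambiguous(naive, tz):
    """True if the time exists and occurs twice (the two folds have different offsets)."""
    first = naive.replace(tzinfo=tz, fold=0).utcoffset()
    second = naive.replace(tzinfo=tz, fold=1).utcoffset()
    return wall_time_exists(naive, tz) and first != second


# 2024-03-10: clocks in New York jump from 02:00 to 03:00, so 02:30 never occurs.
print(wall_time_exists(datetime(2024, 3, 10, 2, 30), NYC))        # False
# 2024-11-03: clocks fall back from 02:00 to 01:00, so 01:30 occurs twice.
print(wall_time_is_ambiguous(datetime(2024, 11, 3, 1, 30), NYC))  # True
```

Schedules set outside the 01:00 to 03:00 local window avoid both cases in this timezone.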