A Dagster schedule submits job runs at a fixed interval.
Name | Description |
---|---|
@schedule | Decorator that defines a schedule that executes according to a given cron schedule. |
ScheduleDefinition | Class for schedules. |
build_schedule_from_partitioned_job | A function that constructs a schedule whose interval matches the partitioning of a partitioned job. |
ScheduleEvaluationContext | The context passed to the schedule definition execution function. |
build_schedule_context | A function that constructs a ScheduleEvaluationContext, typically used for testing. |
A schedule is a definition in Dagster that is used to execute a job at a fixed interval. Each time at which a schedule is evaluated is called a tick. The schedule definition can generate run configuration for the job on each tick.
On each tick, a schedule returns either:
- RunRequest objects. Each run request launches a run.
- A SkipReason, which specifies a message describing why no runs were requested.

Dagster includes a scheduler, which runs as part of the dagster-daemon process. Once you have defined a schedule, see the dagster-daemon page for instructions on how to run the daemon in order to execute your schedules.
You define a schedule by constructing a ScheduleDefinition.
Here's a simple schedule that runs a job every day at midnight. The cron_schedule argument accepts standard cron expressions. It also accepts "@hourly", "@daily", "@weekly", and "@monthly" if your croniter dependency's version is >= 1.0.12.

    from dagster import ScheduleDefinition, job

    @job
    def my_job():
        ...

    basic_schedule = ScheduleDefinition(job=my_job, cron_schedule="0 0 * * *")

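The shorthand strings mentioned above are the conventional cron aliases. As a rough illustration of what each one stands for, here is a small lookup table in plain Python. The names CRON_ALIASES and expand_cron_alias are hypothetical and are not part of Dagster or croniter, which do the real parsing.

```python
# Conventional cron shorthands and the five-field expressions they stand for.
# Illustrative only: this table is not taken from Dagster or croniter.
CRON_ALIASES = {
    "@hourly": "0 * * * *",   # minute 0 of every hour
    "@daily": "0 0 * * *",    # midnight every day
    "@weekly": "0 0 * * 0",   # midnight every Sunday
    "@monthly": "0 0 1 * *",  # midnight on the first day of each month
}


def expand_cron_alias(cron_schedule: str) -> str:
    """Return the five-field form of an alias, or the input unchanged."""
    return CRON_ALIASES.get(cron_schedule, cron_schedule)


print(expand_cron_alias("@daily"))  # 0 0 * * *
```

Under this reading, "@daily" describes the same ticks as the "0 0 * * *" expression used in the schedule above.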
To run schedules for assets, you can build a job that materializes assets and construct a ScheduleDefinition similarly:

    from dagster import AssetSelection, ScheduleDefinition, define_asset_job

    asset_job = define_asset_job("asset_job", AssetSelection.groups("some_asset_group"))

    basic_schedule = ScheduleDefinition(job=asset_job, cron_schedule="0 0 * * *")

If you want to vary the behavior of your job based on the time it's scheduled to run, you can use the @schedule decorator, which decorates a function that returns run config based on a provided ScheduleEvaluationContext.

    from dagster import RunRequest, ScheduleEvaluationContext, job, op, schedule

    @op(config_schema={"scheduled_date": str})
    def configurable_op(context):
        context.log.info(context.op_config["scheduled_date"])

    @job
    def configurable_job():
        configurable_op()

    @schedule(job=configurable_job, cron_schedule="0 0 * * *")
    def configurable_job_schedule(context: ScheduleEvaluationContext):
        # Pass the scheduled date into the op's config and tag the run with it.
        scheduled_date = context.scheduled_execution_time.strftime("%Y-%m-%d")
        return RunRequest(
            run_key=None,
            run_config={
                "ops": {"configurable_op": {"config": {"scheduled_date": scheduled_date}}}
            },
            tags={"date": scheduled_date},
        )

If you don't need access to the context parameter, you can omit it from the decorated function.
When you have a partitioned job that's partitioned by time, you can use the build_schedule_from_partitioned_job function to construct a schedule for it whose interval matches the spacing of partitions in your job.
For example, if you have a daily partitioned job that fills in a date partition of a table each time it runs, you likely want to run that job every day.
The Partitioned Jobs concepts page includes an example of how to define a date-partitioned job. Having defined that job, you can construct a schedule for it using build_schedule_from_partitioned_job. For example:
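
    from dagster import build_schedule_from_partitioned_job, job

    # my_partitioned_config is assumed to be the date-partitioned config
    # from the Partitioned Jobs concepts page.
    @job(config=my_partitioned_config)
    def do_stuff_partitioned():
        ...

    do_stuff_partitioned_schedule = build_schedule_from_partitioned_job(
        do_stuff_partitioned,
    )
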
The Partitioned Jobs concepts page also includes an example of a date-partitioned asset. You can define a schedule similarly using build_schedule_from_partitioned_job:
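
    from dagster import (
        DailyPartitionsDefinition,
        build_schedule_from_partitioned_job,
        define_asset_job,
    )

    # Daily partitions match the date-only start_date and the
    # "date-partitioned" asset described in the text.
    partitioned_asset_job = define_asset_job(
        "partitioned_job",
        selection="*",
        partitions_def=DailyPartitionsDefinition(start_date="2022-05-31"),
    )

    asset_partitioned_schedule = build_schedule_from_partitioned_job(
        partitioned_asset_job,
    )
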
Each schedule tick of a partitioned job fills in the latest partition in the partition set that exists as of the tick time. Note that this implies that when the schedule submits a run on a particular day, it will typically be for the partition whose key corresponds to the previous day. For example, the schedule will fill in the 2020-04-01 partition on 2020-04-02. That's because each partition corresponds to a time window. The key of the partition is the start of the time window, but the partition isn't included in the list until its time window has completed. Waiting until the time window has finished before kicking off a run means the run can process data from within that entire time window.
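The time-window rule can be sketched in plain Python. This is a simplified model of the behavior described above for daily partitions, not Dagster's implementation, and latest_complete_daily_partition is a hypothetical helper name.

```python
from datetime import datetime, timedelta


def latest_complete_daily_partition(tick_time: datetime) -> str:
    """Key of the newest daily partition whose time window has ended.

    A daily partition keyed by day D covers [D 00:00, D+1 00:00), so at any
    moment on day N the newest completed window is the one for day N-1.
    """
    previous_day = tick_time.date() - timedelta(days=1)
    return previous_day.isoformat()


# A tick at midnight on 2020-04-02 fills in the 2020-04-01 partition.
print(latest_complete_daily_partition(datetime(2020, 4, 2, 0, 0)))  # 2020-04-01
```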
However, you can use the end_offset parameter of @daily_partitioned_config to change which partition is the most recent partition that is filled in at each schedule tick. Setting end_offset to 1 will extend the partitions forward so that the schedule tick that runs on day N will fill in day N's partition instead of day N-1's, and setting end_offset to a negative number will cause the schedule to fill in earlier days' partitions. In general, setting end_offset to X will cause the schedule tick that runs on day N to fill in the partition for day N - 1 + X. The same holds true for hourly, weekly, and monthly partitioned jobs, for their respective partition sizes.
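The N - 1 + X rule is easy to check with date arithmetic. The sketch below covers only the daily case and uses a hypothetical helper, partition_for_tick, which is not a Dagster function.

```python
from datetime import date, timedelta


def partition_for_tick(tick_day: date, end_offset: int = 0) -> date:
    """Day whose partition is filled by the tick on tick_day (N - 1 + X)."""
    return tick_day + timedelta(days=end_offset - 1)


tick = date(2020, 4, 2)
for offset in (-1, 0, 1):
    print(offset, partition_for_tick(tick, offset).isoformat())
# -1 2020-03-31
# 0 2020-04-01
# 1 2020-04-02
```

With the default offset of 0, the tick on 2020-04-02 fills in 2020-04-01, matching the earlier example.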
You can use the minute_of_hour, hour_of_day, day_of_week, and day_of_month parameters of build_schedule_from_partitioned_job to control the timing of the schedule. For example, if you have a job that's partitioned by date, and you set minute_of_hour to 30 and hour_of_day to 1, the schedule would submit the run for partition 2020-04-01 at 1:30 AM on 2020-04-02.
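As a rough picture of how these parameters translate into tick times for a daily partitioned job, consider the following sketch. Both helpers, daily_cron and tick_for_partition, are hypothetical and only model the example in the text. They assume no end_offset is set.

```python
from datetime import date, datetime, time, timedelta


def daily_cron(minute_of_hour: int = 0, hour_of_day: int = 0) -> str:
    """Cron expression for a once-a-day tick at the given hour and minute."""
    return f"{minute_of_hour} {hour_of_day} * * *"


def tick_for_partition(partition_day: date, minute_of_hour: int, hour_of_day: int) -> datetime:
    """Tick that submits the run for partition_day (assuming no end_offset)."""
    next_day = partition_day + timedelta(days=1)
    return datetime.combine(next_day, time(hour_of_day, minute_of_hour))


print(daily_cron(minute_of_hour=30, hour_of_day=1))  # 30 1 * * *
print(tick_for_partition(date(2020, 4, 1), 30, 1))   # 2020-04-02 01:30:00
```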
You can also create a schedule for a static partitioned job. The Partitioned Jobs concepts page also includes an example of how to define a static partitioned job. To define a schedule for a static partitioned job, we will construct a schedule from scratch rather than using a helper function like build_schedule_from_partitioned_job; this allows more flexibility in determining which partitions should be run by the schedule.
For example, suppose we have the continents static partitioned job from the Partitioned Jobs concepts page:

    from dagster import job, op, static_partitioned_config

    CONTINENTS = [
        "Africa",
        "Antarctica",
        "Asia",
        "Europe",
        "North America",
        "Oceania",
        "South America",
    ]

    @static_partitioned_config(partition_keys=CONTINENTS)
    def continent_config(partition_key: str):
        return {"ops": {"continent_op": {"config": {"continent_name": partition_key}}}}

    @op(config_schema={"continent_name": str})
    def continent_op(context):
        context.log.info(context.op_config["continent_name"])

    @job(config=continent_config)
    def continent_job():
        continent_op()

We can write a schedule that requests a run for every partition:

    from dagster import schedule

    @schedule(cron_schedule="0 0 * * *", job=continent_job)
    def continent_schedule():
        # Yield one run request per continent partition.
        for c in CONTINENTS:
            request = continent_job.run_request_for_partition(partition_key=c, run_key=c)
            yield request

Or a schedule that runs only a subselection of the partitions:

    @schedule(cron_schedule="0 0 * * *", job=continent_job)
    def antarctica_schedule():
        request = continent_job.run_request_for_partition(
            partition_key="Antarctica", run_key=None
        )
        yield request

You can customize the timezone in which your schedule executes by setting the execution_timezone parameter on your schedule to any tz database timezone. Schedules with no timezone set run in UTC.
For example, the following schedule executes daily at 9AM in US/Pacific time:

    my_timezone_schedule = ScheduleDefinition(
        job=my_job, cron_schedule="0 9 * * *", execution_timezone="US/Pacific"
    )

The @schedule decorator accepts the same argument. Schedules from partitioned jobs execute in the timezone defined on the partitioned config.
Because of Daylight Saving Time transitions, it's possible to specify an execution time that does not exist for every scheduled interval. For example, say you have a daily schedule with an execution time of 2:30 AM in the US/Eastern timezone. On 2019/03/10, the time jumps from 2:00 AM to 3:00 AM when Daylight Saving Time begins, so 2:30 AM did not exist on that day.
If you specify such an execution time, Dagster runs your schedule at the next time that exists. In the previous example, the schedule would run at 3:00 AM.
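To make the skipped-hour case concrete, the sketch below models the 2019/03/10 transition with hand-coded UTC offsets instead of a time zone database. It only illustrates the behavior described above and is not Dagster's implementation. The helpers to_eastern and first_time_at_or_after are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Hand-coded US/Eastern rule for spring 2019 (an assumption, so the sketch
# needs no time zone database): clocks moved from UTC-5 to UTC-4 at
# 07:00 UTC on 2019-03-10, which is 2:00 AM EST.
DST_START_UTC = datetime(2019, 3, 10, 7, 0, tzinfo=timezone.utc)
EST = timezone(timedelta(hours=-5), "EST")
EDT = timezone(timedelta(hours=-4), "EDT")


def to_eastern(utc_time: datetime) -> datetime:
    """Convert a UTC instant to US/Eastern wall-clock time near the transition."""
    return utc_time.astimezone(EDT if utc_time >= DST_START_UTC else EST)


def first_time_at_or_after(day_start_utc: datetime, hour: int, minute: int) -> datetime:
    """First wall-clock time at or after hour:minute, scanning minute by minute."""
    for offset_minutes in range(24 * 60):
        local = to_eastern(day_start_utc + timedelta(minutes=offset_minutes))
        if (local.hour, local.minute) >= (hour, minute):
            return local
    raise ValueError("no matching time found")


# Local midnight on 2019-03-10 is 05:00 UTC because EST is UTC-5.
local_midnight_utc = datetime(2019, 3, 10, 5, 0, tzinfo=timezone.utc)
tick = first_time_at_or_after(local_midnight_utc, hour=2, minute=30)
print(tick.strftime("%Y-%m-%d %H:%M %Z"))  # 2019-03-10 03:00 EDT
```

Because no instant on that day maps to 2:30 AM, the scan lands on 3:00 AM, the next wall-clock time that exists.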
It's also possible to specify an execution time that exists twice on one day every year. For example, on 2019/11/03 in US/Eastern time, the hour from 1:00 AM to 2:00 AM repeats, so a daily schedule running at 1:30 AM has two possible times at which it could execute. In this case, Dagster will execute your schedule at the later of the two possible times.
Hourly schedules are unaffected by Daylight Saving Time transitions - the schedule will continue to run exactly once every hour, even as the timezone changes. In the example above where the hour from 1:00 AM to 2:00 AM repeats, an hourly schedule running at 30 minutes past the hour would run at 12:30 AM, both instances of 1:30 AM, and then proceed normally from 2:30 AM on.
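The repeated hour can be illustrated the same way. The sketch below hand-codes the 2019/11/03 US/Eastern transition as fixed UTC offsets, which is an assumption made to avoid a time zone database, and to_eastern is a hypothetical helper. It models the behavior described above rather than reproducing Dagster's scheduler.

```python
from datetime import datetime, timedelta, timezone

# Hand-coded US/Eastern rule for autumn 2019 (an assumption): clocks moved
# from UTC-4 back to UTC-5 at 06:00 UTC on 2019-11-03, which is 2:00 AM EDT.
DST_END_UTC = datetime(2019, 11, 3, 6, 0, tzinfo=timezone.utc)
EDT = timezone(timedelta(hours=-4), "EDT")
EST = timezone(timedelta(hours=-5), "EST")


def to_eastern(utc_time: datetime) -> datetime:
    """Convert a UTC instant to US/Eastern wall-clock time near the transition."""
    return utc_time.astimezone(EST if utc_time >= DST_END_UTC else EDT)


# An hourly schedule at 30 minutes past the hour ticks once per real hour.
first_tick_utc = datetime(2019, 11, 3, 4, 30, tzinfo=timezone.utc)
hourly_ticks = [to_eastern(first_tick_utc + timedelta(hours=i)) for i in range(4)]
for tick in hourly_ticks:
    print(tick.strftime("%H:%M %Z"))
# 00:30 EDT
# 01:30 EDT
# 01:30 EST
# 02:30 EST

# A daily 1:30 AM schedule has two candidates; the later instant is used.
daily_tick = max(t for t in hourly_ticks if (t.hour, t.minute) == (1, 30))
print(daily_tick.strftime("%H:%M %Z"))  # 01:30 EST
```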
In order for a schedule to run, it must be started. You can start or stop the schedule in Dagit, or with the dagster schedule start and dagster schedule stop commands. You can also start your schedule by setting the default status to DefaultScheduleStatus.RUNNING in code:

    from dagster import DefaultScheduleStatus, ScheduleDefinition

    my_running_schedule = ScheduleDefinition(
        job=my_job, cron_schedule="0 9 * * *", default_status=DefaultScheduleStatus.RUNNING
    )

If you manually start or stop a schedule in Dagit, that will override any default status that is set in code.
Once your schedule is started, if you're running the dagster-daemon process as part of your deployment, the schedule will begin executing immediately, without needing to restart the dagster-daemon process. See the Troubleshooting section below if your schedule has been started but isn't submitting runs.
To test a function decorated by the @schedule decorator, you can invoke the schedule definition like it's a regular Python function. In the test below, the invocation returns the RunRequest produced by the schedule, and its run config can then be validated using the validate_run_config function. Below is a test for the configurable_job_schedule that we defined in an earlier section.
It uses build_schedule_context to construct a ScheduleEvaluationContext to provide for the context parameter.

    import datetime

    from dagster import build_schedule_context, validate_run_config

    def test_configurable_job_schedule():
        # Build a context with a fixed scheduled time so the test is deterministic.
        context = build_schedule_context(
            scheduled_execution_time=datetime.datetime(2020, 1, 1)
        )
        run_request = configurable_job_schedule(context)
        assert validate_run_config(configurable_job, run_request.run_config)

If your @schedule-decorated function doesn't have a context parameter, you don't need to provide one when invoking it.
For more examples of schedules, check out the following in our Hacker News example:
Try these steps if you're trying to run a schedule and are running into problems.
The left-hand navigation bar in Dagit shows all of the schedules for the currently-selected repository, with a green dot next to each schedule that is running. Make sure that your schedule appears in this list with a green dot. To ensure that Dagit has loaded the latest version of your schedule code, you can press the reload button next to your repository name to reload all the code in your repository.
When you click on your schedule name in the left-hand nav in Dagit, you'll be taken to a page where you can view more information about the schedule. If the schedule is running, there should be a "Next tick" row near the top of the page that tells you when the schedule is expected to run next. Make sure that time is what you expect (including the timezone).
It's possible that the dagster-daemon process that submits runs for your schedule is not working correctly. If you haven't set up dagster-daemon yet, check the Deploying Dagster section to find the steps to do so.
First, check that the daemon is running. Click on "Status" in the left nav in Dagit, and examine the "Scheduler" row under the "Daemon statuses" section. The daemon process periodically sends out a heartbeat from the scheduler, so if the scheduler daemon is listed as "Not running", that indicates that there's a problem with your daemon deployment. If the daemon ran into an error that caused it to throw an exception, that error will often appear in this UI as well.
If there isn't a clear error on this page, or if the daemon should be sending heartbeats but isn't, you may need to check the logs from the daemon process. The steps to do this will depend on your deployment - for example, if you're using Kubernetes, you'll need to get the logs from the pod that's running the daemon. You should be able to search those logs for the name of your schedule (or SchedulerDaemon to see all logs associated with the scheduler) to gain an understanding of what's going wrong.
If the daemon output shows errors about not being able to find the schedule, make sure the daemon is using the same workspace.yaml file as Dagit. The daemon does not need to restart in order to pick up changes to the workspace.yaml file.
Finally, it's possible that the daemon is running correctly, but there's a problem with your schedule code. Check the "Latest tick" row on the page for your schedule. If there was an error while trying to submit runs for your schedule, there should be a red "Failure" box next to the time. Clicking on the box should display an error with a stack trace showing you why the schedule couldn't execute. If the schedule is working as expected, it should display a blue box instead with information about any runs that were created by that schedule tick.
If these steps didn't help and your schedule still isn't running, reach out in Slack or file an issue and we'll be happy to help investigate.