Using dbt Cloud with Dagster#

Dagster allows you to run dbt Cloud jobs alongside other technologies. You can schedule them to run on a regular basis, and as part of larger pipelines.

If you're using the dbt CLI, check out Using dbt with Dagster instead!


Running a dbt Cloud job#

To run a dbt Cloud job, you'll need to configure three values:

  • the job_id of the job you want to run
  • the account_id of your dbt Cloud account
  • an auth_token for connecting with the dbt Cloud API

The first two values can be obtained by navigating to the page for your job in the dbt Cloud console, and looking at the URL. For example, in this screenshot, the account_id is 11111, and the job_id is 33333:

Screenshot of the dbt Cloud console on the job page.

The auth_token can also be found or generated in the dbt Cloud console. It's recommended that you use a Service account token for this purpose, and that you store this value in an environment variable, rather than hardcoding its value in your codebase.

Putting it all together, you'll get the following:

from dagster import job
from dagster_dbt import dbt_cloud_resource, dbt_cloud_run_op

# configure an operation to run the specific job
run_dbt_nightly_sync = dbt_cloud_run_op.configured(
    {"job_id": 33333}, name="run_dbt_nightly_sync"
)

# configure a resource to connect to your dbt Cloud instance
my_dbt_cloud_resource = dbt_cloud_resource.configured(
    {"auth_token": {"env": "DBT_CLOUD_AUTH_TOKEN"}, "account_id": 11111}
)

# create a job that uses your op and resource
@job(resource_defs={"dbt_cloud": my_dbt_cloud_resource})
def my_dbt_cloud_job():
    run_dbt_nightly_sync()

Running a dbt Cloud job after another op completes#

The dbt_cloud_run_op has an optional start_after input. If you supply the output of another operation to this input, the dbt Cloud op will not start until that upstream operation successfully completes:

@job(resource_defs={"dbt_cloud": my_dbt_cloud_resource})
def my_two_op_job():
    run_dbt_nightly_sync(start_after=another_op())

Scheduling dbt Cloud jobs#

You can run dbt Cloud jobs on a schedule in the same way as any other Dagster job:

from dagster import ScheduleDefinition, repository

@repository
def my_repo():
    return [
        ScheduleDefinition(
            job=my_dbt_cloud_job,
            cron_schedule="@daily",
        ),
    ]

Refer to the Schedule documentation for more info on running jobs on a schedule.


Conclusion#

If you find a bug or want to add a feature to the dagster-dbt library, we invite you to contribute.

If you have questions on using dbt Cloud with Dagster, we'd love to hear from you:

join-us-on-slack