Using dbt Cloud? Check out the dbt Cloud with Dagster guide!
This reference provides a high-level look at working with dbt models through Dagster's software-defined assets framework using the `dagster-dbt` integration library.
For a step-by-step implementation walkthrough, refer to the Using dbt with Dagster software-defined assets tutorial.
The `dagster-dbt` library offers two methods of loading dbt models from a project into Dagster:

- The `load_assets_from_dbt_project` function, or
- The `load_assets_from_dbt_manifest` function, to load from an existing dbt `manifest.json` file

Check out part two of the dbt + Dagster tutorial to see this concept in context.
For smaller dbt projects where compilation time is not a concern, the simplest way to load your dbt assets into Dagster is the following:
```python
from dagster_dbt import load_assets_from_dbt_project

dbt_assets = load_assets_from_dbt_project(project_dir="path/to/dbt/project")
```
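The `load_assets_from_dbt_project` function compiles your dbt project, parses the metadata that dbt produces, and generates a set of software-defined assets that reflect the models in the project.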
For larger dbt projects, the overhead involved with recompiling the entire project may be a concern. In these cases, you can load dbt models from an existing dbt `manifest.json` file using the `load_assets_from_dbt_manifest` function:
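```python
import json

from dagster_dbt import load_assets_from_dbt_manifest

# json.load expects an open file object, not a path
with open("path/to/dbt/manifest.json", encoding="utf8") as manifest_file:
    manifest_json = json.load(manifest_file)

dbt_assets = load_assets_from_dbt_manifest(manifest_json)
```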
Note: if you make any changes to your dbt project that change the structure of the project (such as changing the dependencies of a model or adding a new one), you'll need to regenerate your manifest file for those changes to be reflected in Dagster.
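Because a stale `manifest.json` silently hides structural changes, it can help to check for edits made after the manifest was written. The sketch below is a hypothetical helper (`files_newer_than_manifest` is not part of `dagster-dbt`) that compares file modification times. It assumes build artifacts and installed packages live in dbt's default `target/` and `dbt_packages/` directories; if it returns anything, regenerate the manifest, for example with `dbt compile`.

```python
from __future__ import annotations

from pathlib import Path

# Directories holding generated or installed files rather than your own edits
# (assumes dbt's default target and packages directories).
SKIPPED_DIRS = {"target", "dbt_packages"}


def files_newer_than_manifest(project_dir: str, manifest_path: str) -> list[str]:
    """Hypothetical helper: list project files modified after manifest.json."""
    root = Path(project_dir)
    manifest_mtime = Path(manifest_path).stat().st_mtime
    stale = []
    # Models are .sql files; sources and other properties live in YAML files.
    for pattern in ("*.sql", "*.yml", "*.yaml"):
        for path in root.rglob(pattern):
            relative = path.relative_to(root)
            if SKIPPED_DIRS.intersection(relative.parts):
                continue
            if path.stat().st_mtime > manifest_mtime:
                stale.append(relative.as_posix())
    return sorted(stale)
```

A modification-time check is only a heuristic: it flags files that *might* have changed the project's structure, and it cannot tell a structural change from an edit to a comment.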
Check out part three of the dbt + Dagster tutorial to see this concept in context.
Assets loaded from dbt require a dbt resource, which is responsible for firing off dbt CLI commands. The `dagster-dbt` integration provides the `dbt_cli_resource` for this purpose. This resource can be configured with CLI flags that are passed into every dbt invocation.

The most important flag to set is the `project_dir` flag, which points Dagster at the directory of your dbt project. For a full list of configuration options, refer to the `dbt_cli_resource` API docs.
You can configure this resource and add it to your dbt assets by doing the following:
```python
from dagster import with_resources
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project

DBT_PROJECT_PATH = "path/to/dbt_project"

dbt_assets = with_resources(
    load_assets_from_dbt_project(DBT_PROJECT_PATH),
    {
        "dbt": dbt_cli_resource.configured(
            {"project_dir": DBT_PROJECT_PATH},
        )
    },
)
```
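To make "flags passed into every dbt invocation" concrete, here is a hypothetical sketch of how a resource could turn its config into command-line arguments. `build_dbt_command` is illustrative only and is not how `dagster-dbt` is implemented; its one assumption is that each config key maps to a kebab-case flag, which matches real dbt flags such as `--project-dir` and `--profiles-dir`.

```python
from typing import Dict, List


def build_dbt_command(subcommand: str, config: Dict[str, object]) -> List[str]:
    """Hypothetical: 'dbt <subcommand>' followed by one flag per config entry."""
    command = ["dbt", subcommand]
    for key, value in config.items():
        # e.g. project_dir -> --project-dir
        command.extend(["--" + key.replace("_", "-"), str(value)])
    return command
```

With the configuration above, `build_dbt_command("run", {"project_dir": "path/to/dbt_project"})` yields the argument list for `dbt run --project-dir path/to/dbt_project`. The same flags would be appended whether the subcommand is `run`, `test`, or anything else.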
Once you have your dbt assets, you can define a job that runs some or all of these assets on a schedule:
```python
from dagster import ScheduleDefinition, define_asset_job, repository

# every asset
run_everything_job = define_asset_job("run_everything", selection="*")

# only my_model and everything downstream of it
run_something_job = define_asset_job("run_something", selection="my_model*")


@repository
def my_repo():
    return [
        dbt_assets,
        ScheduleDefinition(
            job=run_something_job,
            cron_schedule="@daily",
        ),
        ScheduleDefinition(
            job=run_everything_job,
            cron_schedule="@weekly",
        ),
    ]
```
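In Dagster's selection syntax, a trailing `*` (as in `my_model*`) selects an asset together with everything downstream of it, not just its direct children. That traversal can be sketched as a breadth-first search over a hypothetical `children` mapping from each asset to its direct dependents. This is an illustration of the selection semantics, not Dagster's own selection code, and the asset names are made up.

```python
from collections import deque
from typing import Dict, List, Set


def select_with_descendants(children: Dict[str, List[str]], root: str) -> Set[str]:
    """Hypothetical: the root asset plus every asset reachable downstream of it."""
    selected = {root}
    queue = deque([root])
    while queue:
        node = queue.popleft()
        for child in children.get(node, []):
            if child not in selected:  # avoid revisiting shared descendants
                selected.add(child)
                queue.append(child)
    return selected


children = {
    "my_model": ["orders_summary"],
    "orders_summary": ["revenue_report"],
    "unrelated_model": [],
}
```

Here `select_with_descendants(children, "my_model")` covers `my_model`, `orders_summary`, and `revenue_report`, while `unrelated_model` is only picked up by the `"*"` selection.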
Refer to the Schedule documentation for more info on running jobs on a schedule.
In Dagster, each asset has an asset key to identify it. Dagster automatically generates these keys for each dbt node in the project as well as the sources for each node.
For models, seeds, and snapshots, the default asset key will be the configured schema for that node (if any), concatenated with the name of the node.
For example, if you have configured a custom schema for a subdirectory in your `dbt_project.yml` file:

```yaml
models:
  my_project:
    marketing:
      +schema: marketing
```

Then the asset key for a model named `some_model` will be `marketing/some_model`. If you haven't configured a custom schema, then the asset key will be `some_model`.
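The default rule for models, seeds, and snapshots fits in a small function. `default_node_asset_key` is a hypothetical illustration of the behavior described above, not a `dagster-dbt` function. It returns the key as a list of path components, which the text shows joined with `/`.

```python
from typing import List, Optional


def default_node_asset_key(node_name: str, custom_schema: Optional[str] = None) -> List[str]:
    """Hypothetical: [schema, name] when a custom schema is configured, else [name]."""
    if custom_schema:
        return [custom_schema, node_name]
    return [node_name]
```

For the `dbt_project.yml` above, `default_node_asset_key("some_model", "marketing")` gives `["marketing", "some_model"]`, that is, `marketing/some_model`.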
For sources, the default asset key will be the name of the source concatenated with the name of the source table.
For example, the asset key for the source table defined in the following `sources.yml` file will be `jaffle_shop/orders`:

```yaml
sources:
  - name: jaffle_shop
    tables:
      - name: orders
```
A common pattern is to use the prefix of an asset key to indicate what database an asset is stored in. For example, you might want all of your assets stored in Snowflake to start with the prefix `snowflake`.

To add a prefix to the models generated by your dbt project, you can pass in a `key_prefix` argument to either the `load_assets_from_dbt_manifest` or `load_assets_from_dbt_project` function:
```python
dbt_assets = load_assets_from_dbt_project(
    "path/to/dbt_project",
    key_prefix="snowflake",
)
```
Note: The `key_prefix` argument only applies to models. If you want to apply a prefix to the source keys that Dagster generates, pass in a `source_key_prefix` argument:
```python
dbt_assets = load_assets_from_dbt_project(
    "path/to/dbt_project",
    source_key_prefix="snowflake",
)
```
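The split between the two arguments can be sketched as follows. The helper `apply_key_prefixes` and its list-of-components representation of keys are hypothetical. The only behavior it encodes is the one stated above: `key_prefix` is prepended to model keys, and `source_key_prefix` is prepended to source keys.

```python
from typing import List, Optional, Tuple

Key = List[str]


def apply_key_prefixes(
    model_keys: List[Key],
    source_keys: List[Key],
    key_prefix: Optional[str] = None,
    source_key_prefix: Optional[str] = None,
) -> Tuple[List[Key], List[Key]]:
    """Hypothetical: prefix model keys and source keys independently."""

    def prefixed(prefix: Optional[str], key: Key) -> Key:
        # An unset prefix leaves the key unchanged.
        return [prefix, *key] if prefix else list(key)

    return (
        [prefixed(key_prefix, key) for key in model_keys],
        [prefixed(source_key_prefix, key) for key in source_keys],
    )
```

With only `key_prefix="snowflake"`, a model key `marketing/some_model` becomes `snowflake/marketing/some_model` while the source key `jaffle_shop/orders` is left alone. This is why you need both arguments if upstream sources should carry the same prefix.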
Check out parts two and three of the dbt + Dagster tutorial to see this concept in context.
Dagster parses information about assets that are upstream of specific dbt models from the dbt project itself. Whenever a model is downstream of a dbt source, that source will be parsed as an upstream asset.
For example, if you defined a source in your `sources.yml` file like this:

```yaml
sources:
  - name: jaffle_shop
    tables:
      - name: orders
```
and use it in a model:
```sql
select * from {{ source("jaffle_shop", "orders") }} where foo=1
```
Then the asset created for that model will be given an upstream asset key of `jaffle_shop/orders`. In many cases, this upstream asset might also be managed by Dagster.

If you add an asset definition to your repository which produces `jaffle_shop/orders`, then this asset will be upstream of your dbt model:
```python
from dagster import asset


# Produces the asset key jaffle_shop/orders
@asset(key_prefix="jaffle_shop")
def orders():
    return ...
```
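dbt records these relationships in its `manifest.json`. Each model's `depends_on.nodes` list holds the unique IDs of its parents, and source IDs take the form `source.<package>.<source_name>.<table_name>`. The hypothetical helper below maps those IDs to upstream keys such as `jaffle_shop/orders`. It assumes source and table names contain no dots, and it sketches the idea rather than reproducing the parsing `dagster-dbt` performs.

```python
from typing import List


def upstream_source_keys(model_node: dict) -> List[List[str]]:
    """Hypothetical: asset keys of the sources a manifest model node depends on."""
    keys = []
    for unique_id in model_node.get("depends_on", {}).get("nodes", []):
        resource_type, _package, *rest = unique_id.split(".")
        # Only source parents map to source keys; model/seed/snapshot parents are skipped.
        if resource_type == "source" and len(rest) == 2:
            source_name, table_name = rest
            keys.append([source_name, table_name])
    return keys


model_node = {
    "depends_on": {
        "nodes": ["source.my_project.jaffle_shop.orders", "model.my_project.stg_customers"]
    }
}
```

For this node, the helper returns a single key, `["jaffle_shop", "orders"]`, which matches the key that the `orders` asset defined with `key_prefix="jaffle_shop"` produces.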
Check out part four of the dbt + Dagster tutorial to see this concept in context.
Dagster allows you to define assets that are downstream of specific dbt models. One property of dbt-based assets is that the external tool (in this case, dbt) handles storing each model in the database internally, rather than Dagster directly storing the tables that are updated.
This means that there's a range of ways to load a dbt model as input to a Python function. For example, you might want to load the contents as a Pandas dataframe or into a PySpark session. You can specify this loading behavior on each downstream asset.
For example, if you wanted to consume a dbt model with the asset key `my_dbt_model` as a Pandas dataframe, that would look something like the following:

```python
from dagster import AssetIn, asset


@asset(
    ins={"my_dbt_model": AssetIn(input_manager_key="pandas_df_manager")},
)
def my_downstream_asset(my_dbt_model):
    # my_dbt_model is a Pandas dataframe; keep only rows where foo == "bar"
    return my_dbt_model[my_dbt_model["foo"] == "bar"]
```
To materialize your dbt assets, you need to tell Dagster how to handle the assets' inputs and outputs. You can do this using an I/O manager.
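The implementation of your I/O manager depends on the Python object you want downstream assets to receive (such as a Pandas dataframe) and on the database that your dbt models are stored in.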
A simple I/O manager implementation that loads data from a dbt-managed table into a Pandas dataframe would look something like the following:
```python
import pandas as pd
from dagster import IOManager, io_manager


class PandasIOManager(IOManager):
    def __init__(self, con_string: str):
        self._con = con_string

    def handle_output(self, context, obj):
        # dbt handles outputs for us
        pass

    def load_input(self, context) -> pd.DataFrame:
        """Load the contents of a table as a pandas DataFrame."""
        table_name = context.asset_key.path[-1]
        return pd.read_sql(f"SELECT * FROM {table_name}", con=self._con)


@io_manager(config_schema={"con_string": str})
def pandas_io_manager(context):
    return PandasIOManager(context.resource_config["con_string"])
```
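To try the `load_input` logic without a warehouse, the same idea can be sketched with Python's built-in `sqlite3` module. This is a hypothetical stand-in, not part of Dagster. `load_table_for_asset_key` returns rows as dictionaries instead of a Pandas dataframe and, like `PandasIOManager`, uses only the last component of the asset key as the table name. It also quotes that name so it is treated as an identifier rather than as SQL.

```python
import sqlite3
from typing import Dict, List


def load_table_for_asset_key(conn: sqlite3.Connection, asset_key_path: List[str]) -> List[Dict]:
    """Hypothetical: read every row of the table named by the asset key's last component."""
    table_name = asset_key_path[-1]
    # Double-quote the identifier (escaping embedded quotes) instead of pasting raw text.
    quoted = '"' + table_name.replace('"', '""') + '"'
    cursor = conn.execute(f"SELECT * FROM {quoted}")
    columns = [column[0] for column in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]
```

Because only the last component is used, the keys `marketing/some_model` and `snowflake/marketing/some_model` would both read the table `some_model`. If your dbt schemas matter, a real I/O manager would need to build a schema-qualified name instead.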
Once the I/O manager is defined, you can supply it like any other resource when calling `with_resources`:
```python
from dagster import with_resources
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project

dbt_assets = with_resources(
    load_assets_from_dbt_project(...),
    {
        "dbt": dbt_cli_resource.configured(
            {"project_dir": "path/to/dbt_project"},
        ),
        "pandas_df_manager": pandas_io_manager.configured(
            {"con_string": "..."},
        ),
    },
)
```