This page is a whirlwind tour of Dagster's core concepts and how to use them to build and operate your data pipelines.
The most common way to build a data pipeline in Dagster is with software-defined assets. With software-defined assets, you define your data pipeline in terms of the data assets that it produces and the data dependencies between those assets. A data asset is an object in persistent storage, such as a table, file, or persisted machine learning model.
You create a software-defined asset by writing Python code. That code specifies the asset's upstream dependencies and a function that computes the asset's contents.
You can also load asset definitions in bulk from systems like dbt.
Here's an example of some code that defines a set of software-defined assets, and the visualization of those assets in Dagster's web UI:
from dagster import asset
from pandas import DataFrame, read_html, get_dummies
from sklearn.linear_model import LinearRegression as Regression


@asset
def country_stats() -> DataFrame:
    df = read_html("https://tinyurl.com/mry64ebh")[0]
    df.columns = ["country", "continent", "pop_change"]
    df["pop_change"] = df["pop_change"].str.rstrip("%").astype("float")
    return df


@asset
def change_model(country_stats: DataFrame) -> Regression:
    data = country_stats.dropna(subset=["pop_change"])
    dummies = get_dummies(data[["continent"]])
    return Regression().fit(dummies, data["pop_change"])


@asset
def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataFrame:
    result = country_stats.groupby("continent").sum()
    result["pop_change_factor"] = change_model.coef_
    return result
Once you have a graph of assets, you can launch runs that materialize individual assets or sets of assets. Materializing an asset means computing its contents and writing them to persistent storage.
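For example, using the assets defined above, a run that materializes all three can be launched in-process with Dagster's materialize helper (a minimal sketch; in practice you'll often launch runs from the web UI or via schedules and sensors):

from dagster import materialize

# Compute country_stats, change_model, and continent_stats in one run and
# write their contents to storage.
result = materialize([country_stats, change_model, continent_stats])
assert result.success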
How software-defined assets relate to other Dagster concepts:
In some situations, it doesn't make sense to define your pipelines purely in terms of software-defined assets, and you want a lower-level API that gives you more control. For example:
In those cases, you can define ops and connect them together in op graphs. An op is an individual unit of computation, similar to a function, and an op graph hooks the outputs of ops up to the inputs of other ops.
Here's an example of some code that defines a graph of ops, and the visualization of that graph in Dagster's web UI:
from dagster import graph, op
from dagster_snowflake import snowflake_resource
from dagster_slack import slack_resource

from .config import SLACK_CONFIG, SNOWFLAKE_CONFIG


@op(required_resource_keys={"snowflake"})
def find_stale_tables(context) -> list[str]:
    # A table is considered stale if it hasn't been altered in the last 30 days.
    return [
        record[0]
        for record in context.resources.snowflake.execute_query(
            """select table_schema || '.' || table_name
            from information_schema.tables
            where last_altered < dateadd(day, -30, current_timestamp)""",
            fetch_results=True,
        )
    ]


@op(required_resource_keys={"slack"})
def post_stale_tables(context, tables: list[str]) -> None:
    context.resources.slack.chat_postMessage(
        channel="#stale-tables", text="Stale tables:\n" + "\n- ".join(tables)
    )


@graph
def report_stale_tables():
    post_stale_tables(find_stale_tables())
If you're using software-defined assets, you're also using ops, just usually not directly. Each software-defined asset has an op or graph that's responsible for computing its contents.
How ops and graphs relate to other Dagster concepts:
Jobs are units of bulk execution and monitoring.
If you're building pipelines with software-defined assets, you can write jobs that each point to a selection of assets. Launching a job will materialize all the assets it points to.
from dagster import AssetSelection, define_asset_job

my_job = define_asset_job("my_job", selection=AssetSelection.assets(asset1, asset2))
If you're using op graphs, you can write a job that contains a graph of ops. Launching the job will execute all the ops. Think of ops and graphs like functions and jobs like scripts: to run an op or graph, you have to invoke it inside a Python process.
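For example, continuing the op graph above, you could build a job from the report_stale_tables graph by binding concrete resource implementations (a minimal sketch; the SNOWFLAKE_CONFIG and SLACK_CONFIG values are the placeholders imported in that snippet):

from dagster_slack import slack_resource
from dagster_snowflake import snowflake_resource

# Turn the graph into a runnable job by supplying concrete resources.
report_stale_tables_job = report_stale_tables.to_job(
    resource_defs={
        "snowflake": snowflake_resource.configured(SNOWFLAKE_CONFIG),
        "slack": slack_resource.configured(SLACK_CONFIG),
    }
)

In a test or script, calling report_stale_tables_job.execute_in_process() would then run every op in the graph.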
You can run jobs from the command line, from Dagster's web UI, or automatically with schedules or sensors.
In Dagster's web UI, you can see the history of all the runs for each job:
How jobs relate to other Dagster concepts:
A partitioned asset is an asset that's composed of a set of partitions, which can be materialized and tracked independently. For example, the asset might refer to a directory in S3 where each file inside it corresponds to a date. You can launch runs that materialize individual partitions, which would mean creating or overwriting one of those date files.
You can launch backfills over partitioned assets: a backfill is a collection of runs that each target one partition.
Here's an example of a partitioned asset:
import pandas as pd

from dagster import DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def my_daily_partitioned_asset(context) -> pd.DataFrame:
    partition_date_str = context.asset_partition_key_for_output()
    return pd.read_csv(
        f"coolweatherwebsite.com/weather_obs&date={partition_date_str}"
    )
Dagster's web UI (Dagit) also shows which partitions of the asset have been materialized.
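You can also materialize a single partition programmatically, for example in a test, by passing a partition key (a sketch reusing the asset defined above):

from dagster import materialize

# Materialize only the 2022-01-01 partition of the partitioned asset.
result = materialize([my_daily_partitioned_asset], partition_key="2022-01-01")
assert result.success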
Schedules and sensors let you launch runs automatically and repeatedly. Schedules launch runs on fixed intervals. Sensors let you write code that determines whether runs should be launched - usually in response to observing that new data has arrived or something about the world outside Dagster has changed.
Here's an example of how you'd schedule a set of assets to be materialized every day:
from dagster import AssetSelection, ScheduleDefinition, define_asset_job

my_schedule = ScheduleDefinition(
    job=define_asset_job("orders_job", selection=AssetSelection.assets(raw_orders, orders)),
    cron_schedule="0 0 * * *",
)
Here's an example of how you'd materialize a set when new upstream data arrives:
from dagster import AssetSelection, RunRequest, define_asset_job, sensor


@sensor(job=define_asset_job("orders_job", selection=AssetSelection.assets(raw_orders, orders)))
def my_sensor():
    if new_data_has_arrived():
        return RunRequest()
How schedules and sensors relate to other Dagster concepts:
To reduce boilerplate and make pipelines more testable, Dagster encourages separating business logic from I/O.
You can define your assets and ops as "pure" functions that accept data and return data, then separately define I/O managers that know how to write and read that data to and from storage systems like filesystems or data warehouses. The same I/O manager can be shared across many different assets or ops.
Here's an example of using I/O managers to assign a pair of assets to be stored in a data warehouse and a third asset to be stored in an object store:
from dagster import asset, repository, with_resources
from pandas import DataFrame

snowflake_io_manager = ...
s3_io_manager = ...


@asset(io_manager_key="warehouse")
def events() -> DataFrame:
    ...


@asset(io_manager_key="warehouse")
def logins(events) -> DataFrame:
    ...


@asset(io_manager_key="object_store")
def login_bot_classifier(logins) -> DataFrame:
    ...


@repository
def my_repo():
    return with_resources(
        [events, logins, login_bot_classifier],
        {"warehouse": snowflake_io_manager, "object_store": s3_io_manager},
    )
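The snowflake_io_manager and s3_io_manager definitions are elided above. As a rough sketch of what an I/O manager looks like (this local-pickle example is illustrative, not the implementation of either of those), you subclass IOManager, implement handle_output and load_input, and expose it with the io_manager decorator:

import os
import pickle

from dagster import IOManager, io_manager


class LocalPickleIOManager(IOManager):
    """Illustrative I/O manager that pickles each output to a local file."""

    def __init__(self, base_dir: str):
        self._base_dir = base_dir

    def _path(self, context) -> str:
        # Derive a file name from the asset key of the output being handled.
        return os.path.join(self._base_dir, "_".join(context.asset_key.path) + ".pickle")

    def handle_output(self, context, obj) -> None:
        # Called when an asset or op output is produced: write it to storage.
        os.makedirs(self._base_dir, exist_ok=True)
        with open(self._path(context), "wb") as f:
            pickle.dump(obj, f)

    def load_input(self, context):
        # Called when a downstream asset or op needs the value: read it back.
        with open(self._path(context.upstream_output), "rb") as f:
            return pickle.load(f)


@io_manager
def local_pickle_io_manager(_):
    return LocalPickleIOManager(base_dir="/tmp/dagster_assets")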
Dagster doesn't require that you use I/O managers: you can also write assets and ops as "impure" functions that have side effects and don't accept or return any values.
How I/O managers relate to other Dagster concepts:
Resources help you model external systems that your ops or software-defined assets interact with. For example, a resource might be a connection to a data warehouse like Snowflake or a service like Slack.
Resources enable a simple form of dependency injection: you can define software-defined assets and ops without specifying the implementations of the resources that they interact with, which makes it easy to vary those resources across environments. For example, in your production environment you could connect to your production Slack instance, in your staging environment to a staging Slack instance, and in unit tests to a mock Slack instance.
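For example (a sketch: the asset, channel, and token values here are hypothetical), an asset can declare that it needs a resource named "slack" without choosing an implementation, and the real or mock implementation is bound separately:

from unittest import mock

from dagster import asset, materialize, with_resources
from dagster_slack import slack_resource


# Hypothetical asset that needs a Slack client but doesn't care which one.
@asset(required_resource_keys={"slack"})
def daily_report(context) -> None:
    context.resources.slack.chat_postMessage(channel="#reports", text="Report ready")


# Production: bind the real Slack resource (the token value is a placeholder).
production_assets = with_resources(
    [daily_report], {"slack": slack_resource.configured({"token": "..."})}
)


# Unit test: bind a mock instead and materialize the asset in-process.
def test_daily_report():
    slack_mock = mock.MagicMock()
    assert materialize([daily_report], resources={"slack": slack_mock}).success
    slack_mock.chat_postMessage.assert_called_once()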
How resources relate to other Dagster concepts:
Dagster repositories (not to be confused with git repositories) and workspaces are how you tell Dagster how to execute your code.
To do its job, Dagster needs to execute the Python code you write. It does this in two settings: when executing runs, and when loading your definitions so that tools like the web UI and the scheduler daemon know which assets, jobs, schedules, and sensors exist.
In Dagster, a repository is a Python object, defined at module scope, that holds a set of asset, job, schedule, and sensor definitions. When you launch Dagster's web UI or scheduler daemon, you tell it the Python module and attribute name within that module of every Dagster repository that you want it to know about.
Different Dagster repositories typically correspond to different Docker containers. This allows you to avoid having all your pipelines rely on the same set of Python dependencies.
Here's an example of defining a Dagster repository with a set of assets, jobs, and schedules:
from dagster import load_assets_from_current_module, repository


@repository
def repo():
    return [
        *load_assets_from_current_module(),
        job1,
        job2,
        schedule1,
        sensor1,
    ]
A workspace is a collection of all the repositories that are loaded within a single Dagster deployment.