Graph-Backed Assets

Basic software-defined assets are computed using a single op. If generating an asset involves multiple discrete computations, you can use graph-backed assets by separating each computation into an op and building a graph to combine your computations. This allows you to launch re-executions of runs at the op boundaries but doesn't require you to link each intermediate value to an asset in persistent storage.

Graph-backed assets are useful if you have an existing graph that produces and consumes assets. Wrapping your graph inside a software-defined asset gives you all the benefits of software-defined assets, such as cross-job lineage, without requiring you to change the code inside your graph.

Additionally, graph-backed assets allow you to reuse an existing op across other graph-backed assets and jobs, or within the same graph.


Relevant APIs

Name | Description
AssetsDefinition.from_graph | Constructs an asset given a graph definition.

Defining graph-backed assets

To define a graph-backed asset, call the AssetsDefinition.from_graph method with your graph. The value returned from the graph is stored in persistent storage as the asset, while the intermediate values passed between ops are not registered as assets:

import pandas as pd
from dagster import AssetsDefinition, graph, op


@op(required_resource_keys={"slack"})
def fetch_files_from_slack(context) -> pd.DataFrame:
    files = context.resources.slack.files_list(channel="#random")
    return pd.DataFrame(
        [
            {
                "id": file.get("id"),
                "created": file.get("created"),
                "title": file.get("title"),
                "permalink": file.get("permalink"),
            }
            for file in files
        ]
    )


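@op
def store_files(files):
    # create_db_connection is a placeholder for your own function that returns
    # a database connection accepted by DataFrame.to_sql; Dagster does not provide it.
    return files.to_sql(name="slack_files", con=create_db_connection())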


@graph
def store_slack_files_in_sql():
    return store_files(fetch_files_from_slack())


graph_asset = AssetsDefinition.from_graph(store_slack_files_in_sql)

Defining basic dependencies for graph-backed assets

AssetsDefinition.from_graph infers upstream and downstream asset dependencies from the graph definition provided. In the simplest case, when the graph returns a single output, Dagster uses the name of the graph as the asset key.

In the example below, Dagster creates an asset with key middle_asset from the middle_asset graph. Just like assets defined via @asset, each argument to the decorated graph function is an upstream asset name. middle_asset depends on upstream_asset, and downstream_asset depends on middle_asset:

from dagster import AssetsDefinition, asset, graph, op


@op
def add_one(value):
    # Illustrative op used inside the graph
    return value + 1


@asset
def upstream_asset():
    return 1


@graph
def middle_asset(upstream_asset):
    return add_one(upstream_asset)


middle_asset = AssetsDefinition.from_graph(middle_asset)


@asset
def downstream_asset(middle_asset):
    return middle_asset + 1

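When your graph returns multiple outputs, Dagster uses each graph output name as the key of the corresponding asset. In the example below, two_assets_graph accepts upstream_asset and outputs two assets, first_asset and second_asset:

from dagster import AssetsDefinition, GraphOut, Out, graph, op


@op(out={"one": Out(), "two": Out()})
def two_outputs(upstream):
    # Illustrative op; any op with two outputs works here
    return upstream + 1, upstream + 2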


@graph(out={"first_asset": GraphOut(), "second_asset": GraphOut()})
def two_assets_graph(upstream_asset):
    one, two = two_outputs(upstream_asset)
    return {"first_asset": one, "second_asset": two}


two_assets = AssetsDefinition.from_graph(two_assets_graph)

Defining explicit dependencies for graph-backed assets

You can also define dependencies for graph-backed assets explicitly via the keys_by_input_name and keys_by_output_name arguments to from_graph:

# two_outputs is assumed to be an op with two outputs, defined elsewhere
from dagster import AssetKey, AssetsDefinition, GraphOut, graph


@graph(out={"one": GraphOut(), "two": GraphOut()})
def return_one_and_two(zero):
    one, two = two_outputs(zero)
    return {"one": one, "two": two}


explicit_deps_asset = AssetsDefinition.from_graph(
    return_one_and_two,
    keys_by_input_name={"zero": AssetKey("upstream_asset")},
    keys_by_output_name={
        "one": AssetKey("asset_one"),
        "two": AssetKey("asset_two"),
    },
)

Advanced: Subsetting graph-backed assets

By default, when executing a graph-backed asset, every asset produced by the graph must be materialized. This means that attempting to selectively execute a subset of assets defined in the graph-backed asset will result in an error.

If the underlying computation is sufficiently flexible to selectively output a subset of assets, a graph-backed asset can be subsetted. For example, let’s say we wanted to define a graph-backed asset with the structure depicted in the image below. In this case, we want to independently materialize foo_asset and baz_asset.

[Figure: Graph-backed asset]

In order to selectively output an asset from a graph-backed asset, Dagster will run each op that is a dependency of the outputted asset. In the example, if we wanted to selectively materialize foo_asset, Dagster would run foo and bar. If we wanted to selectively materialize baz_asset, Dagster would run all three ops (foo, bar, and baz).
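The rule described above amounts to collecting the op that produces each selected asset together with all of its upstream ops. The plain-Python sketch below illustrates that rule only; it is not Dagster's implementation, and the OP_DEPS and ASSET_PRODUCER tables are a hand-written, hypothetical description of the example graph.

```python
# Hypothetical description of the example graph: each op mapped to the ops
# whose outputs it consumes.
OP_DEPS = {
    "bar": set(),
    "foo": {"bar"},
    "baz": {"foo", "bar"},
}

# Hypothetical mapping from each asset to the op that produces it.
ASSET_PRODUCER = {"foo_asset": "foo", "baz_asset": "baz"}


def ops_to_run(selected_assets):
    """Return the producing ops of the selected assets plus all upstream ops."""
    pending = [ASSET_PRODUCER[name] for name in selected_assets]
    needed = set()
    while pending:
        op_name = pending.pop()
        if op_name in needed:
            continue
        needed.add(op_name)
        # Every upstream op must run so that its outputs are available.
        pending.extend(OP_DEPS[op_name])
    return needed


ops_for_foo = ops_to_run(["foo_asset"])
ops_for_baz = ops_to_run(["baz_asset"])
```

Selecting foo_asset pulls in foo and bar, while selecting baz_asset pulls in all three ops, which is why foo must be able to skip the output that backs foo_asset.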

Because the foo op yields an asset output (foo_asset) and is an upstream dependency of another asset generated from the graph (baz_asset), we need to structure foo to selectively return outputs depending on the asset subset selected for execution. We can do this by defining foo to have optional outputs that are yielded conditionally. Dagster provides a context.selected_output_names object on the op context that will return the outputs necessary to generate the asset subset.

During execution, if we select just baz_asset for materialization, context.selected_output_names in the implementation of foo below will be {"foo_2"}, so foo_1 is never yielded and foo_asset is not materialized.

from dagster import Out, Output, op


@op(out={"foo_1": Out(is_required=False), "foo_2": Out(is_required=False)})
def foo(context, bar_1):
    # Selectively returns outputs based on selected assets
    if "foo_1" in context.selected_output_names:
        yield Output(bar_1 + 1, output_name="foo_1")
    if "foo_2" in context.selected_output_names:
        yield Output(bar_1 + 2, output_name="foo_2")

Because Dagster flattens each graph into a flat input/output mapping between ops under the hood, any op that produces an output of the graph must be structured to yield its outputs optionally, enabling the outputs to be returned independently.

In the example, foo and baz produce outputs of my_graph. Consequently, their outputs need to be yielded optionally. Because foo yields multiple outputs, we must structure our code to conditionally yield its outputs, as in the code snippet above.

However, because baz only yields a single output, Dagster will only run baz when its asset output baz_asset is selected. So, we don’t have to structure baz to return an optional output. Because bar does not yield any outputs that are returned from my_graph, its outputs do not have to be selectively returned.

We could define the asset using the code below. Notice that can_subset must be set to True in the asset definition to signify that the graph-backed asset can be subsetted.

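from dagster import (
    AssetsDefinition,
    GraphOut,
    Out,
    define_asset_job,
    graph,
    op,
    repository,
)


# foo is the op with optional outputs defined in the earlier snippet
@op(out={"bar_1": Out(), "bar_2": Out()})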
def bar():
    return 1, 2


@op
def baz(foo_2, bar_2):
    return foo_2 + bar_2


@graph(out={"foo_asset": GraphOut(), "baz_asset": GraphOut()})
def my_graph():
    bar_1, bar_2 = bar()
    foo_1, foo_2 = foo(bar_1)
    return {"foo_asset": foo_1, "baz_asset": baz(foo_2, bar_2)}


@repository
def my_repo():
    return [
        define_asset_job("graph_asset"),
        AssetsDefinition.from_graph(my_graph, can_subset=True),
    ]

Depending on how outputs are returned from the ops within a graph-backed asset, there could be unexpected materializations. For example, the following foo implementation would unexpectedly materialize foo_asset if baz_asset was the only asset selected for execution.

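from dagster import AssetKey, Out, op


@op(out={"foo_1": Out(), "foo_2": Out()})
def foo(bar_1):
    # Always returns both outputs, regardless of the selected assets
    return bar_1 + 1, bar_1 + 2


# Will unexpectedly materialize foo_asset
my_repo.get_job("graph_asset").execute_in_process(
    asset_selection=[AssetKey("baz_asset")]
)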

This is because the foo op is an upstream dependency of baz_asset, and this implementation of foo returns both the foo_1 and foo_2 outputs. The foo_1 output is returned as the foo_asset output of the graph, causing an unexpected materialization of foo_asset.