Using Airbyte with Dagster#

Dagster can orchestrate your Airbyte connections, making it easy to chain an Airbyte sync with upstream or downstream steps in your workflow.

This guide focuses on how to work with Airbyte connections using Dagster's software-defined asset (SDA) framework.

Airbyte connections and Dagster software-defined assets#

An Airbyte connection defines a series of data streams which are synced between a source and a destination. During a sync, a replica of the data from each data stream is written to the destination, typically as one or more tables. Dagster represents each of the replicas generated in the destination as a software-defined asset. This enables you to easily:

  • Visualize the streams involved in an Airbyte connection and execute a sync from Dagster
  • Define downstream computations which depend on replicas produced by Airbyte
  • Track historical metadata and logs for each data stream
  • Track data lineage through Airbyte and other tools

Loading Airbyte asset definitions into Dagster#

The easiest way to get started using Airbyte with Dagster is to have Dagster automatically generate asset definitions from your Airbyte project. This can be done in one of two ways: from a set of YAML configuration files, or from a live Airbyte instance.

You can also manually build asset definitions on a per-connection basis.

Loading Airbyte asset definitions from YAML config#

To load Airbyte assets into Dagster from a set of YAML configuration files, specify the Octavia project directory, which contains the sources, destinations, and connections subfolders. This is the directory where you first ran octavia init. Here, the YAML files are treated as the source of truth for building Dagster assets.

from dagster_airbyte import load_assets_from_airbyte_project

airbyte_assets = load_assets_from_airbyte_project(
    project_dir="path/to/airbyte/project"
)

The load_assets_from_airbyte_project function parses the YAML metadata, generating a set of software-defined assets which reflect each of the data streams synced by your connections. Each connection has an associated op which triggers a sync of that connection.
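Before pointing Dagster at a project directory, it can help to confirm that the directory has the layout described above. The following is a minimal sketch using only the Python standard library; the helper name missing_octavia_subfolders is hypothetical and is not part of dagster-airbyte.

```python
from pathlib import Path
from typing import List

# Subfolders an Octavia project directory is expected to contain,
# as described in the text above.
REQUIRED_SUBFOLDERS = ("sources", "destinations", "connections")


def missing_octavia_subfolders(project_dir: str) -> List[str]:
    """Return the required subfolders that are absent from project_dir.

    Hypothetical helper for illustration; not part of dagster-airbyte.
    """
    root = Path(project_dir)
    return [name for name in REQUIRED_SUBFOLDERS if not (root / name).is_dir()]
```

An empty list suggests the directory looks like an Octavia project; any names returned are the subfolders that could not be found.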

Adding a resource#

Assets loaded from Airbyte require an airbyte_resource, which defines how to connect and interact with your Airbyte instance.

You can configure this resource and add it to your Airbyte assets by doing the following:

from dagster_airbyte import airbyte_resource, load_assets_from_airbyte_project

from dagster import with_resources

airbyte_assets = with_resources(
    load_assets_from_airbyte_project(project_dir="path/to/airbyte/project"),
    {
        "airbyte": airbyte_resource.configured(
            {
                "host": "localhost",
                "port": "8000",
            }
        )
    },
)
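If the host and port differ between environments, one option is to build the configuration dictionary from environment variables. The following is a minimal sketch under stated assumptions: the variable names AIRBYTE_HOST and AIRBYTE_PORT and the helper airbyte_config_from_env are hypothetical, and the defaults simply mirror the values used above.

```python
import os
from typing import Dict, Mapping, Optional


def airbyte_config_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build a host/port config dict, falling back to the local defaults.

    Hypothetical helper for illustration; not part of dagster-airbyte.
    """
    env = os.environ if env is None else env
    return {
        # AIRBYTE_HOST and AIRBYTE_PORT are assumed variable names.
        "host": env.get("AIRBYTE_HOST", "localhost"),
        # The port stays a string, matching the configuration shown above.
        "port": env.get("AIRBYTE_PORT", "8000"),
    }
```

The resulting dictionary has the same shape as the one passed to airbyte_resource.configured above, so it could be supplied in its place.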

Loading Airbyte asset definitions from an Airbyte instance#

To load Airbyte assets into Dagster from a live Airbyte instance, you will need to configure an Airbyte resource which defines how to connect to that instance. Here, the Airbyte instance is treated as the source of truth.

from dagster_airbyte import airbyte_resource, load_assets_from_airbyte_instance

airbyte_instance = airbyte_resource.configured(
    {
        "host": "localhost",
        "port": "8000",
    }
)
airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance)

The load_assets_from_airbyte_instance function retrieves all of the connections you have defined in the Airbyte interface, creating software-defined assets for each data stream. Each connection has an associated op which triggers a sync of that connection.

You don't need to use with_resources to bind an Airbyte resource to the assets produced by load_assets_from_airbyte_instance; the Airbyte instance you supply to the function is automatically bound to them.

Manually building Airbyte asset definitions#

Instead of having Dagster automatically create the asset definitions for your Airbyte instance, you can build them individually. First, determine the connection ID of each connection you would like to build assets for. The connection ID appears in the URL of the connection page in the Airbyte UI.

Screenshot of the Airbyte UI in a browser, showing the connection ID in the URL.

Then, supply the connection ID and the list of tables which the connection creates in the destination to build_airbyte_assets:

from dagster_airbyte import build_airbyte_assets

airbyte_assets = build_airbyte_assets(
    connection_id="87b7fe85-a22c-420e-8d74-b30e7ede77df",
    destination_tables=["releases", "tags", "teams"],
)
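When handling several connections, the ID can be pulled out of a copied connection page URL rather than retyped. The following is a minimal sketch, assuming the ID is the UUID that directly follows a /connections/ path segment; the helper name connection_id_from_url and the example URL are hypothetical, and the URL layout of your Airbyte version may differ.

```python
import re
from typing import Optional

# Matches "/connections/" followed by a UUID in the 8-4-4-4-12 hex layout.
_CONNECTION_ID_PATTERN = re.compile(
    r"/connections/"
    r"([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
)


def connection_id_from_url(url: str) -> Optional[str]:
    """Return the connection UUID found in url, or None if there is none.

    Hypothetical helper for illustration; not part of dagster-airbyte.
    """
    match = _CONNECTION_ID_PATTERN.search(url)
    return match.group(1).lower() if match else None


# Hypothetical URL for a locally hosted Airbyte instance.
example_url = (
    "http://localhost:8000/workspaces/00000000-0000-0000-0000-000000000000"
    "/connections/87b7fe85-a22c-420e-8d74-b30e7ede77df/status"
)
connection_id = connection_id_from_url(example_url)
```

The returned string can then be passed as the connection_id argument to build_airbyte_assets.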

Adding a resource#

Manually built Airbyte assets require an airbyte_resource, which defines how to connect and interact with your Airbyte instance.

You can configure this resource and add it to your Airbyte assets by doing the following:

from dagster_airbyte import airbyte_resource, build_airbyte_assets

from dagster import with_resources

airbyte_assets = with_resources(
    build_airbyte_assets(
        connection_id="87b7fe85-a22c-420e-8d74-b30e7ede77df",
        destination_tables=["releases", "tags", "teams"],
    ),
    {
        "airbyte": airbyte_resource.configured(
            {
                "host": "localhost",
                "port": "8000",
            }
        )
    },
)

Scheduling Airbyte syncs#

Once you have Airbyte assets, you can define a job that runs some or all of these assets on a schedule, triggering the underlying Airbyte sync:

from dagster import AssetSelection, ScheduleDefinition, define_asset_job, repository

# Materialize every asset in the repository
run_everything_job = define_asset_job("run_everything", selection="*")

# Materialize only the assets in the "my_airbyte_connection" group
run_specific_connection_job = define_asset_job(
    "run_specific_connection",
    selection=AssetSelection.groups("my_airbyte_connection"),
)

@repository
def my_repo():
    return [
        airbyte_assets,
        ScheduleDefinition(
            job=run_specific_connection_job,
            cron_schedule="@daily",
        ),
        ScheduleDefinition(
            job=run_everything_job,
            cron_schedule="@weekly",
        ),
    ]

Refer to the Schedule documentation for more info on running jobs on a schedule.
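The "@daily" and "@weekly" strings used above are standard cron shorthand. As a reference, the sketch below maps each to its equivalent five-field cron expression; the names CRON_PRESETS and expand_cron are hypothetical and are not part of Dagster.

```python
# Five-field cron equivalents of the shorthand strings used in the schedules above.
CRON_PRESETS = {
    "@daily": "0 0 * * *",   # every day at midnight
    "@weekly": "0 0 * * 0",  # every Sunday at midnight
}


def expand_cron(expression: str) -> str:
    """Return the five-field form of a known shorthand, else the input unchanged.

    Hypothetical helper for illustration; not part of Dagster.
    """
    return CRON_PRESETS.get(expression, expression)
```

Writing the five-field form directly makes it easier to move a schedule to a different time, for example "0 6 * * *" for a daily run at 06:00.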


Conclusion#

If you find a bug or want to add a feature to the dagster-airbyte library, we invite you to contribute.

If you have questions on using Airbyte with Dagster, we'd love to hear from you:

Join us on Slack