Data Science with Notebooks#

Fast iteration, the literate combination of arbitrary code with markdown blocks, and inline plotting make notebooks an indispensable tool for data science. The Dagstermill package makes it straightforward to run notebooks using the Dagster tools and to integrate them into data jobs with heterogeneous ops: for instance, Spark jobs, SQL statements run against a data warehouse, or arbitrary Python code.

pip install dagstermill

Dagstermill lets you:

  • Run notebooks as ops in heterogeneous data jobs with minimal changes to notebook code
  • Define data dependencies to flow inputs and outputs between notebooks, and between notebooks and other ops
  • Use Dagster resources, and the Dagster config system, from inside notebooks
  • Aggregate notebook logs with logs from other Dagster ops
  • Yield custom materializations and other Dagster events from your notebook code

Our goal is to make it unnecessary to go through a tedious "productionization" process where code developed in notebooks must be translated into some other (less readable and interpretable) format in order to be integrated into production workflows. Instead, we can use notebooks as ops directly, with minimal, incremental metadata declarations to integrate them into jobs that may also contain arbitrary heterogeneous ops.

You can find the code for this example on GitHub.

Notebooks as ops#

Let's consider the classic Iris dataset (1, 2), collected in 1936 by the American botanist Edgar Anderson and made famous by statistician Ronald Fisher. The Iris dataset is a basic example in machine learning because it contains three classes of observation, one of which is straightforwardly linearly separable from the other two, which in turn can only be distinguished by more sophisticated methods.

Like many notebooks, this example does some fairly sophisticated work, producing diagnostic plots and a (flawed) statistical model -- which are then locked away in the .ipynb format, reproducible only with a complex Jupyter setup, and programmatically accessible only within the notebook context.

We can simply turn a notebook into an op using define_dagstermill_op. Once we create an op, we can start to make its outputs more accessible.

import dagstermill as dm

from dagster import job
from dagster.utils import script_relative_path  # resolves paths relative to this file

k_means_iris = dm.define_dagstermill_op(
    "k_means_iris",
    script_relative_path("iris-kmeans.ipynb"),
    output_notebook_name="iris_kmeans_output",
)


@job(
    resource_defs={
        "output_notebook_io_manager": dm.local_output_notebook_io_manager,
    }
)
def iris_classify():
    k_means_iris()

This is the simplest form of notebook integration -- we don't actually have to make any changes in the notebook itself to run it using the Dagster tooling. Just run:

dagit -f iris_classify.py

What's more, every time we run the notebook from Dagit, a copy of the notebook as executed will be written to disk and the path of this output notebook will be made available in Dagit:

[Screenshot: iris_output_notebook.png -- the output notebook path displayed in Dagit]

The output notebook is both a rich log of notebook computations as they actually occurred, including all inline plots and results, and also an important tool for interactive debugging. When a notebook fails, the output notebook can be used to determine the cause of the failure.

Expressing dependencies#

Notebooks often have implicit dependencies on external state like data warehouses, filesystems, and batch processes. For example, even in our simple Iris example we're making assumptions about data that's available locally, in this case the iris.data file:


iris = pd.read_csv(
    'iris.data',
    ...
)

The ability to express data dependencies between heterogeneous units of computation is core to Dagster, and we can easily make notebooks depend on upstream ops.

We'll illustrate this process by adding a non-notebook op to our job, which will take care of downloading the Iris data from the UCI repository. This is a somewhat contrived example; in practice, your notebook ops are more likely to rely on upstream ops whose outputs might be handles to tables in the data warehouse or to files on S3, and your notebook will likely handle the task of fetching the data.

import dagstermill as dm

from dagster import In, job
from dagster.utils import script_relative_path
from docs_snippets.legacy.data_science.download_file import download_file

k_means_iris = dm.define_dagstermill_op(
    "k_means_iris",
    script_relative_path("iris-kmeans_2.ipynb"),
    output_notebook_name="iris_kmeans_output",
    ins={"path": In(str, description="Local path to the Iris dataset")},
)


@job(
    resource_defs={
        "output_notebook_io_manager": dm.local_output_notebook_io_manager,
    }
)
def iris_classify():
    k_means_iris(download_file())

We'll configure the download_file op with the URL to download the file from, and the local path at which to save it. This op has one output: the path to the downloaded file.

from urllib.request import urlretrieve

from dagster import Field, Out, String, op
from dagster.utils import script_relative_path


@op(
    name="download_file",
    config_schema={
        "url": Field(String, description="The URL from which to download the file"),
        "path": Field(String, description="The path to which to download the file"),
    },
    out={"path": Out(String, description="The path to which the file was downloaded")},
    description=(
        "A simple utility op that downloads a file from a URL to a path using urllib.urlretrieve"
    ),
)
def download_file(context):
    output_path = script_relative_path(context.op_config["path"])
    urlretrieve(context.op_config["url"], output_path)
    return output_path

We'll want to use this path in place of the hardcoded string when we read the CSV in our notebook:


iris = pd.read_csv(
    path,
    ...
)

We need to make one more change to our notebook so that the path parameter is injected by the Dagstermill machinery at runtime.

Dagstermill is built on Papermill (3), which uses Jupyter cell tags to identify the cell into which it should inject parameters at runtime. You will need to be running Jupyter 5.0 or later and may need to turn the display of cell tags on (select View > Cell Toolbar > Tags from the Jupyter menu).

[Screenshot: the Jupyter cell tags toolbar]

Tag the cell you want Dagstermill to replace at runtime with the tag parameters.

In the source notebook, this cell will look like this:

[Screenshot: parameters.png -- the tagged parameters cell in the source notebook]

In the source notebook, we can give our parameters values that are useful for interactive development (say, with a test dataset).
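For instance, the tagged parameters cell might look like the following sketch (the default value here is hypothetical and is used only during interactive development; at runtime Dagstermill replaces it with the upstream op's output):

path = "iris.data"  # interactive default; replaced by the injected-parameters cell at runtime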

Now we are ready to execute a job that flows the output of arbitrary Python code into a notebook:

[Screenshot: iris_classify.png -- the iris_classify job in Dagit]

We'll use the following config:

ops:
  download_file:
    config:
      path: "iris.data"
      url: "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"

When we execute this job with Dagit, the parameters cell in the source notebook will be dynamically replaced in the output notebook by a new injected-parameters cell.

The execution log contains the path to the output notebook so that you can access it after execution to examine and potentially debug the output. This path is also displayed in Dagit.

The notebook context#

You'll notice that the injected-parameters cell in your output notebooks defines a variable called context.

This context object mirrors the execution context object that's available in the body of any other op's compute function.

As with the parameters that dagstermill injects, you can also construct a context object for interactive exploration and development by using the dagstermill.get_context API in the tagged parameters cell of your input notebook. When dagstermill executes your notebook, this development context will be replaced with the injected runtime context.

You can use the development context to access op config and resources, to log messages, and to yield results and other Dagster events just as you would in production. When the runtime context is injected by dagstermill, none of your other code needs to change.
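For example, a log call like the following (the message is illustrative) behaves the same whether the context comes from dagstermill.get_context or from the injected runtime context:

context.log.info("Training the k-means estimator")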

For instance, suppose we want to make the number of clusters (the _k_ in k-means) configurable. We'll change our op definition to include a config field:

import dagstermill as dm

from dagster import Field, In, Int, job
from dagster.utils import script_relative_path
from docs_snippets.legacy.data_science.download_file import download_file

k_means_iris = dm.define_dagstermill_op(
    "k_means_iris",
    script_relative_path("iris-kmeans_2.ipynb"),
    output_notebook_name="iris_kmeans_output",
    ins={"path": In(str, description="Local path to the Iris dataset")},
    config_schema=Field(
        Int,
        default_value=3,
        is_required=False,
        description="The number of clusters to find",
    ),
)


@job(
    resource_defs={
        "output_notebook_io_manager": dm.local_output_notebook_io_manager,
    }
)
def iris_classify():
    k_means_iris(download_file())

In our notebook, we'll stub the context as follows (in the parameters cell):

import dagstermill

context = dagstermill.get_context(op_config=3)

Now we can use our config value in our estimator. In production, this will be replaced by the config value provided to the job:

estimator = sklearn.cluster.KMeans(n_clusters=context.op_config)
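With this schema in place, a run config along the following lines would set the number of clusters (the value 4 is just an example; omitting the k_means_iris entry falls back to the default of 3):

ops:
  download_file:
    config:
      path: "iris.data"
      url: "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
  k_means_iris:
    config: 4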

Results and custom materializations#

If you'd like to yield a result to be consumed downstream of a dagstermill notebook, you can call dagstermill.yield_result with the value of the result and its name. In interactive execution, this is a no-op, so you don't need to change anything when moving from interactive exploration and development to production.
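For example, assuming the op definition declares an output named "cluster_centers" (a hypothetical name used here only for illustration), the notebook could yield the fitted centers like this:

import dagstermill

dagstermill.yield_result(estimator.cluster_centers_, output_name="cluster_centers")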

You can also yield custom AssetMaterialization objects (for instance, to tell Dagit where you've saved a plot) by calling dagstermill.yield_event.
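A minimal sketch, assuming the notebook has saved a cluster plot to a local path (the asset key and description are illustrative):

import dagstermill

from dagster import AssetMaterialization

dagstermill.yield_event(
    AssetMaterialization(
        asset_key="iris_kmeans_plot",
        description="Cluster plot produced by the k-means notebook",
    )
)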

References#

  1. Dua, D. and Graff, C. (2019). Iris Data Set. UCI Machine Learning Repository [https://archive.ics.uci.edu/ml/datasets/iris]. Irvine, CA: University of California, School of Information and Computer Science.
  2. Iris flower data set [https://en.wikipedia.org/wiki/Iris_flower_data_set]
  3. nteract/papermill [https://papermill.readthedocs.io/en/latest/]