Source code for dagster_duckdb.io_manager

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, Generic, Optional, Sequence, Type, TypeVar

import duckdb

from dagster import Field, IOManager, IOManagerDefinition, InputContext, OutputContext
from dagster import _check as check
from dagster import io_manager
from dagster._seven.temp_dir import get_system_temp_directory
from dagster._utils.backoff import backoff

T = TypeVar("T")


class DbTypeHandler(ABC, Generic[T]):
    @abstractmethod
    def handle_output(
        self, context: OutputContext, obj: T, conn: duckdb.DuckDBPyConnection, base_path: str
    ):
        """Stores the given object at the provided filepath."""

    @abstractmethod
    def load_input(self, context: InputContext, conn: duckdb.DuckDBPyConnection) -> T:
        """Loads the result of the query as the correct type."""

    @abstractmethod
    def _get_path(self, context: OutputContext, base_path: str) -> Path:
        """Creates the path where the file should be stored."""

    @property
    def supported_input_types(self) -> Sequence[Type]:
        pass

    @property
    def supported_output_types(self) -> Sequence[Type]:
        pass

    def _schema(self, context, output_context: OutputContext):
        if context.has_asset_key:
            # the schema is the second-to-last component of the asset key, e.g.
            # AssetKey([database, schema, tablename])
            return context.asset_key.path[-2]
        else:
            # for non-asset outputs, the schema can be specified as output metadata and
            # defaults to "public"
            context_metadata = output_context.metadata or {}
            return context_metadata.get("schema", "public")

    def _table(self, context, output_context: OutputContext):
        if context.has_asset_key:
            # the table is the last component of the asset key, e.g.
            # AssetKey([database, schema, tablename])
            return context.asset_key.path[-1]
        else:
            # for non-asset outputs, the table is the output name
            return output_context.name

    def _table_path(self, context, output_context: OutputContext):
        return f"{self._schema(context, output_context)}.{self._table(context, output_context)}"

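
# --- Illustrative sketch (not part of the dagster_duckdb source): one way a concrete
# DbTypeHandler could be written for pandas DataFrames, assuming pandas is installed.
# The class name PandasCsvTypeHandler and the exact SQL/file layout are assumptions made
# for this example; the maintained handler is DuckDBPandasTypeHandler in
# dagster_duckdb_pandas (see build_duckdb_io_manager below).

import pandas as pd


class PandasCsvTypeHandler(DbTypeHandler[pd.DataFrame]):
    def handle_output(
        self, context: OutputContext, obj: pd.DataFrame, conn: duckdb.DuckDBPyConnection, base_path: str
    ):
        # Write the DataFrame to a CSV file, then expose it to DuckDB as a view.
        path = self._get_path(context, base_path)
        path.parent.mkdir(parents=True, exist_ok=True)
        obj.to_csv(path, index=False)
        conn.execute(f"CREATE SCHEMA IF NOT EXISTS {self._schema(context, context)}")
        conn.execute(
            f"CREATE OR REPLACE VIEW {self._table_path(context, context)} AS "
            f"SELECT * FROM read_csv_auto('{path}')"
        )

    def load_input(self, context: InputContext, conn: duckdb.DuckDBPyConnection) -> pd.DataFrame:
        # Read the view created by handle_output back into a DataFrame.
        return conn.execute(
            f"SELECT * FROM {self._table_path(context, context.upstream_output)}"
        ).df()

    def _get_path(self, context: OutputContext, base_path: str) -> Path:
        return Path(base_path) / f"{self._table_path(context, context)}.csv"

    @property
    def supported_input_types(self) -> Sequence[Type]:
        return [pd.DataFrame]

    @property
    def supported_output_types(self) -> Sequence[Type]:
        return [pd.DataFrame]
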
class DuckDBIOManager(IOManager):
    """Stores data in CSV files and creates DuckDB views over those files."""

    def __init__(self, base_path, type_handlers: Sequence[DbTypeHandler]):
        self._base_path = base_path
        self._output_handlers_by_type: Dict[Optional[Type], DbTypeHandler] = {}
        self._input_handlers_by_type: Dict[Optional[Type], DbTypeHandler] = {}
        for type_handler in type_handlers:
            for handled_output_type in type_handler.supported_output_types:
                check.invariant(
                    handled_output_type not in self._output_handlers_by_type,
                    "DuckDBIOManager provided with two handlers for the same type. "
                    f"Type: '{handled_output_type}'. Handler classes: '{type(type_handler)}' and "
                    f"'{type(self._output_handlers_by_type.get(handled_output_type))}'.",
                )
                self._output_handlers_by_type[handled_output_type] = type_handler
            for handled_input_type in type_handler.supported_input_types:
                check.invariant(
                    handled_input_type not in self._input_handlers_by_type,
                    "DuckDBIOManager provided with two handlers for the same type. "
                    f"Type: '{handled_input_type}'. Handler classes: '{type(type_handler)}' and "
                    f"'{type(self._input_handlers_by_type.get(handled_input_type))}'.",
                )
                self._input_handlers_by_type[handled_input_type] = type_handler

    def handle_output(self, context: OutputContext, obj: object):
        if obj is not None:  # if this is a dbt output, then the value will be None
            obj_type = type(obj)
            check.invariant(
                obj_type in self._output_handlers_by_type,
                f"DuckDBIOManager does not have a handler that supports outputs of type '{obj_type}'. "
                "Has handlers for types "
                f"'{', '.join([str(handler_type) for handler_type in self._output_handlers_by_type.keys()])}'",
            )
            conn = self._connect_duckdb(context).cursor()

            # the type handler stores the output in duckdb
            self._output_handlers_by_type[obj_type].handle_output(
                context, obj=obj, conn=conn, base_path=self._base_path
            )
            conn.close()

    def load_input(self, context: InputContext):
        obj_type = context.dagster_type.typing_type
        check.invariant(
            obj_type in self._input_handlers_by_type,
            f"DuckDBIOManager does not have a handler that supports inputs of type '{obj_type}'. "
            "Has handlers for types "
            f"'{', '.join([str(handler_type) for handler_type in self._input_handlers_by_type.keys()])}'",
        )
        check.invariant(
            not context.has_asset_partitions, "DuckDBIOManager can't load partitioned inputs"
        )

        conn = self._connect_duckdb(context).cursor()
        ret = self._input_handlers_by_type[obj_type].load_input(context, conn=conn)
        conn.close()
        return ret

    def _connect_duckdb(self, context):
        return backoff(
            fn=duckdb.connect,
            retry_on=(RuntimeError, duckdb.IOException),
            kwargs={"database": context.resource_config["duckdb_path"], "read_only": False},
        )

def build_duckdb_io_manager(type_handlers: Sequence[DbTypeHandler]) -> IOManagerDefinition:
    """
    Builds an IO manager definition that reads inputs from and writes outputs to DuckDB.

    Note that the DuckDBIOManager cannot load partitioned assets.

    Args:
        type_handlers (Sequence[DbTypeHandler]): Each handler defines how to translate between
            DuckDB tables and an in-memory type - e.g. a Pandas DataFrame.

    Returns:
        IOManagerDefinition

    Examples:

        .. code-block:: python

            from dagster_duckdb import build_duckdb_io_manager
            from dagster_duckdb_pandas import DuckDBPandasTypeHandler

            duckdb_io_manager = build_duckdb_io_manager([DuckDBPandasTypeHandler()])

            @job(resource_defs={'io_manager': duckdb_io_manager})
            def my_job():
                ...

        You may configure the returned IO manager as follows:

        .. code-block:: YAML

            resources:
                io_manager:
                    config:
                        base_path: path/to/store/files  # all data will be stored at this path
                        duckdb_path: path/to/database.duckdb  # path to the duckdb database
    """

    @io_manager(config_schema={"base_path": Field(str, is_required=False), "duckdb_path": str})
    def duckdb_io_manager(init_context):
        """IO manager for storing outputs in a DuckDB database.

        Supports storing and loading Pandas DataFrame objects. Converts the DataFrames to CSV
        files and creates DuckDB views over those files.

        Assets will be stored in the schema and table name specified by their AssetKey.
        Subsequent materializations of an asset will overwrite previous materializations of
        that asset. Op outputs will be stored in the schema specified by output metadata
        (defaults to "public") in a table named after the output.
        """
        return DuckDBIOManager(
            base_path=init_context.resource_config.get("base_path", get_system_temp_directory()),
            type_handlers=type_handlers,
        )

    return duckdb_io_manager

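
# --- Illustrative usage sketch (not part of this module): wiring the IO manager returned
# by build_duckdb_io_manager into software-defined assets, as it might appear in user code.
# The asset name, the "my_schema" key prefix, and the file paths are placeholders, and
# DuckDBPandasTypeHandler is assumed to be importable from dagster_duckdb_pandas.

import pandas as pd

from dagster import asset, with_resources
from dagster_duckdb import build_duckdb_io_manager
from dagster_duckdb_pandas import DuckDBPandasTypeHandler


@asset(key_prefix="my_schema")  # asset key becomes AssetKey(["my_schema", "my_table"])
def my_table() -> pd.DataFrame:
    # Materialized as a CSV file under base_path and exposed as the DuckDB view
    # my_schema.my_table.
    return pd.DataFrame({"a": [1, 2, 3]})


duckdb_io_manager = build_duckdb_io_manager([DuckDBPandasTypeHandler()])

assets_with_io_manager = with_resources(
    [my_table],
    resource_defs={
        "io_manager": duckdb_io_manager.configured(
            {"base_path": "/tmp/duckdb_files", "duckdb_path": "/tmp/example.duckdb"}
        )
    },
)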