Op hooks let you define success and failure handling policies on ops.
| Name | Description |
|---|---|
| @failure_hook | The decorator to define a callback on op failure. |
| @success_hook | The decorator to define a callback on op success. |
| HookContext | The context object available to a hook function. |
| build_hook_context | A function for building a HookContext outside of execution, intended to be used when testing a hook. |
A function decorated with @success_hook or @failure_hook is called an op hook. Op hooks are general-purpose: a hook can do anything you would like to happen at a per-op level.

    from dagster import HookContext, failure_hook, success_hook


    @success_hook(required_resource_keys={"slack"})
    def slack_message_on_success(context: HookContext):
        message = f"Op {context.op.name} finished successfully"
        context.resources.slack.chat_postMessage(channel="#foo", text=message)


    @failure_hook(required_resource_keys={"slack"})
    def slack_message_on_failure(context: HookContext):
        message = f"Op {context.op.name} failed"
        context.resources.slack.chat_postMessage(channel="#foo", text=message)

As you may have noticed, the hook function takes one argument, which is an instance of HookContext. The available properties on this context are:
- context.job_name: the name of the job where the hook is triggered.
- context.log: loggers.
- context.hook_def: the hook that the context object belongs to.
- context.op: the op associated with the hook.
- context.op_config: the config specific to the associated op.
- context.op_exception: the exception thrown in the associated failed op.
- context.op_output_values: the computed output values of the associated op.
- context.step_key: the key for the step where the hook is triggered.
- context.resources: the resources the hook can use.
- context.required_resource_keys: the resources required by this hook.

Dagster provides different ways to trigger op hooks.
For example, suppose you want to send a Slack message to a channel when any op in a job fails. In this case, you apply a hook to the job, which applies the hook to every op instance within that job.
The @job decorator accepts hooks as a parameter. Likewise, when creating a job from a graph, the GraphDefinition.to_job method accepts hooks as a parameter. In the example below, we pass the slack_message_on_failure hook defined above, in a set, as a parameter to @job. Slack messages will then be sent when any op in the job fails.

    @job(resource_defs={"slack": slack_resource}, hooks={slack_message_on_failure})
    def notif_all():
        # the hook "slack_message_on_failure" is applied on every op instance within this graph
        a()
        b()

When you run this job, you can provide configuration to the slack resource in the run config:

    resources:
      slack:
        config:
          token: "xoxp-1234123412341234-12341234-1234" # replace with your slack token

or by using the configured API:

    @job(
        resource_defs={
            "slack": slack_resource.configured(
                {"token": "xoxp-1234123412341234-12341234-1234"}
            )
        },
        hooks={slack_message_on_failure},
    )
    def notif_all_configured():
        # the hook "slack_message_on_failure" is applied on every op instance within this graph
        a()
        b()

Sometimes a job is a shared responsibility, or you only want to be alerted on high-priority op executions. For these cases, you can also set up hooks on individual op instances, which lets you apply policies on a per-op basis.

    @job(resource_defs={"slack": slack_resource})
    def selective_notif():
        # only op "a" triggers hooks: a slack message will be sent when it fails or succeeds
        a.with_hooks({slack_message_on_failure, slack_message_on_success})()
        # op "b" won't trigger any hooks
        b()

In this case, op "b" won't trigger any hooks, while op "a" sends a Slack message whenever it fails or succeeds.
You can test the functionality of a hook by invoking the hook definition. This will run the underlying decorated function. You can construct a context to provide to the invocation using the build_hook_context function.

    from unittest import mock

    from dagster import build_hook_context, success_hook


    @success_hook(required_resource_keys={"my_conn"})
    def my_success_hook(context):
        context.resources.my_conn.send("foo")


    def test_my_success_hook():
        my_conn = mock.MagicMock()
        # construct HookContext with mocked ``my_conn`` resource.
        context = build_hook_context(resources={"my_conn": my_conn})
        my_success_hook(context)
        assert my_conn.send.call_count == 1

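One way to keep hooks easy to test, sketched here under assumptions that are not part of the original page, is to put the hook's logic in a plain function that receives the resource as an argument. The logic can then be exercised without building a HookContext at all. The helper name `post_op_status` and its `succeeded` flag are hypothetical; the channel and message wording mirror the Slack hooks defined earlier.

```python
from unittest import mock


def post_op_status(slack_client, op_name: str, succeeded: bool) -> str:
    """Post a status message for an op and return the text that was sent.

    Hypothetical helper: a hook body could delegate to it with
    ``context.resources.slack`` and ``context.op.name``.
    """
    status = "finished successfully" if succeeded else "failed"
    message = f"Op {op_name} {status}"
    slack_client.chat_postMessage(channel="#foo", text=message)
    return message


def test_post_op_status():
    # A MagicMock stands in for the Slack client, so nothing is sent.
    slack_client = mock.MagicMock()
    post_op_status(slack_client, "a", succeeded=False)
    slack_client.chat_postMessage.assert_called_once_with(
        channel="#foo", text="Op a failed"
    )
```

With this split, the decorated hook stays a thin wrapper, and the test for the wrapper itself can still use build_hook_context as shown above.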
In many cases, you might want to know details about an op failure. You can get the exception object thrown in the failed op via the op_exception property on HookContext:

    import traceback

    from dagster import HookContext, failure_hook


    @failure_hook
    def my_failure_hook(context: HookContext):
        op_exception: BaseException = context.op_exception
        # print stack trace of exception
        traceback.print_tb(op_exception.__traceback__)

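If the stack trace should end up in a log entry or a notification rather than on standard output, the standard library can render the exception as a string. The following is a minimal sketch; the helper name `format_op_failure` is hypothetical, and the exception passed in is assumed to be whatever `context.op_exception` holds.

```python
import traceback


def format_op_failure(op_name: str, exc: BaseException) -> str:
    """Render an op failure as text: a summary line followed by the traceback."""
    # format_exception returns a list of strings that already end in newlines.
    lines = traceback.format_exception(type(exc), exc, exc.__traceback__)
    return f"Op {op_name} failed: {exc!r}\n" + "".join(lines)
```

Inside a failure hook, this could be called as `format_op_failure(context.op.name, context.op_exception)` and the result passed to a logger or used as the text of a Slack message.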
Hooks use resource keys to access resources. After including the resource key in its set of required_resource_keys, the body of the hook can access the corresponding resource via the resources attribute of its context object.
This also lets you swap resource values between jobs so that, for example, you send Slack messages only while executing a production job and mock the slack resource while testing.
Because a production job and a testing job share the same core business logic, we can build both from a shared graph. The GraphDefinition.to_job method, which builds a job from a graph, lets you specify environment-specific hooks and resources.
In this case, we can mock the slack_resource using the helper function ResourceDefinition.hardcoded_resource(), so it won't send Slack messages during development.

    @graph
    def slack_notif_all():
        a()
        b()


    notif_all_dev = slack_notif_all.to_job(
        name="notif_all_dev",
        resource_defs={
            "slack": ResourceDefinition.hardcoded_resource(
                slack_resource_mock, "do not send messages in dev"
            )
        },
        hooks={slack_message_on_failure},
    )

    notif_all_prod = slack_notif_all.to_job(
        name="notif_all_prod",
        resource_defs={"slack": slack_resource},
        hooks={slack_message_on_failure},
    )

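The snippet above does not show how slack_resource_mock is defined. As one possibility (an assumption, not the original definition), a small recording stand-in that exposes the same chat_postMessage method the hooks call would let a dev job run without contacting Slack. The class name `RecordingSlackClient` is hypothetical, and the bare namespace below only imitates the two attributes of HookContext that the hook bodies use.

```python
from types import SimpleNamespace


class RecordingSlackClient:
    """Hypothetical stand-in for a Slack client: records messages instead of sending them."""

    def __init__(self):
        self.sent = []

    def chat_postMessage(self, channel: str, text: str) -> dict:
        # Keep the call so a test or a dev run can inspect it later.
        self.sent.append({"channel": channel, "text": text})
        return {"ok": True}


# Assumed definition of the name used in the dev job above.
slack_resource_mock = RecordingSlackClient()

# Simulate what the failure hook body does, with a bare namespace in place of HookContext.
fake_context = SimpleNamespace(
    op=SimpleNamespace(name="a"),
    resources=SimpleNamespace(slack=slack_resource_mock),
)
fake_context.resources.slack.chat_postMessage(
    channel="#foo", text=f"Op {fake_context.op.name} failed"
)
print(slack_resource_mock.sent)
```

Compared with a generic mock object, a recording class like this fails loudly if a hook calls a method the stand-in does not define, which can catch typos in method names during development.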
When we switch to production, we can provide the real Slack token in the run_config and thereby enable sending messages to a Slack channel when a hook is triggered.

    resources:
      slack:
        config:
          token: "xoxp-1234123412341234-12341234-1234" # replace with your slack token

Then, we can execute the job with this config through the Python API, the CLI, or the Dagit UI. Here's an example using the Python API.

    import yaml

    from dagster import file_relative_path

    # notif_all_prod is the production job built from the shared graph above
    if __name__ == "__main__":
        prod_op_hooks_run_config_yaml = file_relative_path(__file__, "prod_op_hooks.yaml")
        with open(prod_op_hooks_run_config_yaml, "r", encoding="utf8") as fd:
            run_config = yaml.safe_load(fd.read())

        notif_all_prod.execute_in_process(run_config=run_config, raise_on_error=False)

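The same run config can also be built as a Python dictionary instead of being loaded from a YAML file. The sketch below mirrors the structure of the YAML shown above and reads the token from an environment variable, which keeps the secret out of files under version control. The helper names `build_slack_run_config` and `slack_run_config_from_env`, and the variable name `SLACK_TOKEN`, are assumptions made for illustration.

```python
import os


def build_slack_run_config(token: str) -> dict:
    """Return a run config dict shaped like the YAML: resources -> slack -> config -> token."""
    return {"resources": {"slack": {"config": {"token": token}}}}


def slack_run_config_from_env(var_name: str = "SLACK_TOKEN") -> dict:
    """Build the run config from an environment variable (the default name is an assumption)."""
    token = os.environ.get(var_name)
    if not token:
        # Fail early instead of launching a run with an empty token.
        raise RuntimeError(f"environment variable {var_name} is not set")
    return build_slack_run_config(token)
```

The resulting dictionary could be passed as run_config to notif_all_prod.execute_in_process, in place of the value loaded with yaml.safe_load.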
When you add a hook to a job, the hook will be added to every op in the job individually. The hook does not track job-scoped events and only tracks op-level success or failure events.
You may find the need to set up job-level policies. For example, you may want to run some code for every job failure.
Dagster provides a way to create a sensor that reacts to job failure events. You can find details at Run failure sensor on the Sensors page.