The Run Coordinator is used to control the policy that Dagster uses to manage the set of active runs on your deployment.
The Run Coordinator is invoked when runs are submitted on the instance (e.g. via the GraphQL API), and as a result it can be used to dynamically attach tags to submitted runs.
In this example, we'll perform run attribution, which means that we'll attach a user's email as a tag to submitted runs.
To accomplish this, we'll use a custom Run Coordinator to read Flask HTTP headers (from Dagster's GraphQL server) and parse the headers to get an email which we'll attach as a tag.
In this use case, we'd like to add a hook to customize submitted runs while still using a queue to submit runs to the Dagster Daemon. To accomplish this, we can use the Queued Run Coordinator in the example below. The context object available in submit_run has a get_request_header method we can use to read HTTP headers:
    from dagster._core.run_coordinator import QueuedRunCoordinator, SubmitRunContext
    from dagster._core.storage.pipeline_run import PipelineRun


    class CustomRunCoordinator(QueuedRunCoordinator):
        def submit_run(self, context: SubmitRunContext) -> PipelineRun:
            desired_header = context.get_request_header(CUSTOM_HEADER_NAME)
Then we can parse the relevant header (in this case, called the jwt_claims_header) with any custom hook. In the following example, we're decoding a JWT header which contains the user's email.
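    # Module-level imports used by this hook
    from base64 import b64decode
    from json import JSONDecodeError, loads
    from typing import Optional


    def get_email(self, jwt_claims_header: Optional[str]) -> Optional[str]:
        if not jwt_claims_header:
            return None

        # A JWT has the form header.payload.signature; the claims are in the payload
        split_header_tokens = jwt_claims_header.split(".")
        if len(split_header_tokens) < 2:
            return None

        decoded_claims_json_str = b64decode(split_header_tokens[1])
        try:
            claims_json = loads(decoded_claims_json_str)
            return claims_json.get("email")
        except JSONDecodeError:
            return None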
The above is just an example - you can write any hook which would be useful to you.
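One detail worth checking in any hook of this kind: JWT segments are base64url-encoded and frequently arrive with their "=" padding stripped, which a plain b64decode call may reject. The following is a minimal standalone sketch, not part of the Dagster example, that restores the padding before decoding. The names decode_email_claim and make_unsigned_token are hypothetical, and the sketch assumes the token's signature has already been verified elsewhere (for example, by the load balancer).

```python
import base64
import binascii
import json
from typing import Optional


def decode_email_claim(jwt_claims_header: Optional[str]) -> Optional[str]:
    """Return the "email" claim from a JWT, or None if it cannot be read.

    Assumption: signature verification happens elsewhere; this only parses.
    """
    if not jwt_claims_header:
        return None

    segments = jwt_claims_header.split(".")
    if len(segments) < 2:
        return None

    payload = segments[1]
    # Restore any stripped "=" padding so the length is a multiple of 4.
    payload += "=" * (-len(payload) % 4)
    try:
        claims = json.loads(base64.urlsafe_b64decode(payload))
    except (binascii.Error, ValueError):
        # Covers malformed base64, undecodable bytes, and invalid JSON.
        return None

    if not isinstance(claims, dict):
        return None
    return claims.get("email")


def make_unsigned_token(claims: dict) -> str:
    """Build a header.payload.signature string for local testing only."""

    def encode(obj: dict) -> str:
        raw = json.dumps(obj).encode("utf-8")
        return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")

    return ".".join([encode({"alg": "none"}), encode(claims), ""])
```

A token built with make_unsigned_token({"email": "user@example.com"}) can be passed to decode_email_claim to exercise the hook locally without a real identity provider.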
Putting this all together, we can use these hooks to dynamically attach tags to submitted pipeline runs. In the following example, we'd read the user's email from the X-Amzn-Oidc-Data header by using the get_email hook defined above, and then attach the email as a tag to the pipeline run.
    # Module-level import used below
    import warnings


    def submit_run(self, context: SubmitRunContext) -> PipelineRun:
        pipeline_run = context.pipeline_run
        jwt_claims_header = context.get_request_header("X-Amzn-Oidc-Data")
        email = self.get_email(jwt_claims_header)
        if email:
            # Attach the submitting user's email to the run as a "user" tag
            self._instance.add_run_tags(pipeline_run.run_id, {"user": email})
        else:
            warnings.warn(f"Couldn't decode JWT header {jwt_claims_header}")
        # Hand the run to the queued coordinator as usual
        return super().submit_run(context)
To specify the custom Run Coordinator to be used on the instance, add the following snippet to an instance's dagster.yaml:
    run_coordinator:
      module: run_attribution_example
      class: CustomRunCoordinator
If you're using Helm to deploy instead, you can specify the custom run coordinator in the Helm chart's values.yaml:
    dagsterDaemon:
      runCoordinator:
        enabled: true
        type: CustomRunCoordinator
        config:
          customRunCoordinator:
            module: run_attribution_example
            class: CustomRunCoordinator
            config: {}
Note that the flexibility of specifying module and class allows for any custom Run Coordinator to be used, as long as the relevant module is installed in the image that the Dagster instance is running on.
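Because the module must be importable in the running image, a quick check from a Python shell inside that image can catch packaging mistakes early. The sketch below is one way to do this with the standard library only. The helper name resolve_class is hypothetical, and this is not a description of how Dagster itself loads the class; its own loading logic may differ.

```python
import importlib
from typing import Optional


def resolve_class(module_name: str, class_name: str) -> Optional[type]:
    """Return the named class if it can be imported, otherwise None."""
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        # The module is not installed (or not on the path) in this environment.
        return None
    candidate = getattr(module, class_name, None)
    return candidate if isinstance(candidate, type) else None


if __name__ == "__main__":
    # Module and class names taken from the configuration above.
    found = resolve_class("run_attribution_example", "CustomRunCoordinator")
    print("importable" if found else "not importable in this environment")
```

If the check reports that the class is not importable, the usual causes are that the package was not installed into the image or that the module or class name in the configuration is misspelled.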