
Update Sensors, Jobs, Graphs, and Ops

This guide will walk through the operation of the underlying Dagster orchestrator used by the Runtime, using a particular Sensor as the entry point.

A Dagster Repository is a collection of code that defines how orchestration is to be done. The nmdc_runtime.site.repository module exposes three such repositories via the @repository decorator. The creatively named repo repository is the main one. The translation and test_translation repositories are used for GOLD database translation jobs.

Why multiple repositories? A given repository may require resources that a particular Dagster deployment does not provide, so it is useful for each deployment to opt in to serving only the repositories whose functionality it can support.

A Dagster Workspace loads one or more repositories for a given deployment.
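The opt-in relationship between a deployment and its repositories can be pictured with a small model. The sketch below uses only the standard library and is not Dagster's API: `ToyRepository`, `load_workspace`, and the registry are hypothetical, and apart from `mongo` (which this guide's ops require) the resource key `gold_source` is invented for illustration. A deployment lists the repositories it wants, and loading fails loudly if one of them needs a resource the deployment cannot supply.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ToyRepository:
    """Hypothetical stand-in for a repository: a name plus the resource keys its jobs need."""
    name: str
    required_resources: frozenset = field(default_factory=frozenset)


# Hypothetical registry; only "mongo" comes from this guide, "gold_source" is invented.
REGISTRY = {
    "repo": ToyRepository("repo", frozenset({"mongo"})),
    "translation": ToyRepository("translation", frozenset({"mongo", "gold_source"})),
}


def load_workspace(opted_in, registry, available_resources):
    """Load the repositories a deployment has explicitly opted in to.

    Raises if an opted-in repository needs a resource the deployment lacks.
    """
    available = set(available_resources)
    loaded = []
    for name in opted_in:
        repo = registry[name]
        missing = repo.required_resources - available
        if missing:
            raise RuntimeError(f"{name} needs unavailable resources: {sorted(missing)}")
        loaded.append(repo)
    return loaded


# A deployment with only MongoDB access opts in to "repo" alone.
print([r.name for r in load_workspace(["repo"], REGISTRY, {"mongo"})])
```

In real Dagster the opt-in is declared in the workspace configuration rather than computed; the toy only illustrates why a deployment would choose not to load a repository it cannot serve.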

A Dagster Sensor defines a process that is evaluated repeatedly at a regular, relatively short interval, such as every 30 seconds. The claim_and_run_apply_changesheet_jobs sensor, defined in nmdc_runtime.site.repository via the @sensor decorator, is the example we'll explore here.

A sensor decides, via the code in its body, whether to yield one or more RunRequests, i.e. requests to run a particular job with a particular configuration. Alternatively, a sensor may yield a SkipReason to log why no run requests were yielded on a particular evaluation (tick) of the sensor.
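The logic of a single tick can be modeled without Dagster. The sketch below is a stdlib-only toy, not Dagster's API: `ToyRunRequest` and `ToySkipReason` merely mirror the roles of Dagster's `RunRequest` and `SkipReason`, `pending_ids` is a hypothetical stand-in for whatever the real sensor queries, and the nested `ops` layout of the run config is an assumption modeled on Dagster's run-config structure.

```python
from dataclasses import dataclass, field
from typing import Iterable, Iterator, Union


@dataclass(frozen=True)
class ToyRunRequest:
    """Toy analogue of a run request: a key identifying the run plus per-run configuration."""
    run_key: str
    run_config: dict = field(default_factory=dict)


@dataclass(frozen=True)
class ToySkipReason:
    """Toy analogue of a skip reason: why this tick requested no runs."""
    message: str


def toy_sensor_tick(pending_ids: Iterable[str]) -> Iterator[Union[ToyRunRequest, ToySkipReason]]:
    """Evaluate one tick: one run request per pending item, otherwise a single skip reason."""
    pending = list(pending_ids)
    if not pending:
        yield ToySkipReason("no pending changesheet jobs")
        return
    for object_id in pending:
        yield ToyRunRequest(
            # Dagster dedupes runs by run_key; this toy only records the key.
            run_key=object_id,
            run_config={"ops": {"get_changesheet_in": {"config": {"object_id": object_id}}}},
        )


print(list(toy_sensor_tick([])))
print(list(toy_sensor_tick(["a1", "b2"])))
```

In a deployment, the Dagster daemon calls the sensor on every tick and launches runs for the requests it yields; here the caller simply consumes the generator.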

What is a Dagster "job"? A Dagster Job is an executable (directed acyclic) graph of operations. Breaking this down, a Dagster Graph will define abstract dependencies such as "I need a resource called 'mongo' for my operations". A Dagster Op, or Operation, is a Python function that performs work and that depends on resources, e.g. "mongo", that are made accessible to it at runtime. Finally, a Job is a configuration of a Graph that makes resources more concrete, e.g. "by 'mongo', I mean instantiate this Python class, passing it these parameters with values fetched from these environment variables."
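The split between abstract resource keys and concrete resource objects can be pictured with a toy model. In the stdlib-only sketch below (not Dagster's API), ops only name the resource keys they need, and a job-like step maps each key to a factory that builds a concrete object, here from an environment variable. The op names come from this guide; `ToyOp`, `FakeMongo`, `mongo_from_env`, `bind_resources`, and the `MONGO_URI` variable are hypothetical.

```python
import os
from dataclasses import dataclass
from typing import Callable, Dict, Iterable


@dataclass(frozen=True)
class ToyOp:
    """An op reduced to a name and the resource keys it declares."""
    name: str
    required_resource_keys: frozenset


class FakeMongo:
    """Hypothetical resource object; a real one would wrap a MongoDB client."""
    def __init__(self, uri: str):
        self.uri = uri


def mongo_from_env() -> FakeMongo:
    # MONGO_URI is a hypothetical variable name chosen for this sketch.
    return FakeMongo(os.environ.get("MONGO_URI", "mongodb://localhost:27017"))


def bind_resources(ops: Iterable[ToyOp],
                   factories: Dict[str, Callable[[], object]]) -> Dict[str, object]:
    """Play the job's role: map each abstract resource key to a concrete object."""
    needed = set()
    for op in ops:
        needed |= op.required_resource_keys
    missing = needed - set(factories)
    if missing:
        raise KeyError(f"no definition for resources: {sorted(missing)}")
    return {key: factories[key]() for key in sorted(needed)}


OPS = [
    ToyOp("get_changesheet_in", frozenset({"mongo"})),
    ToyOp("perform_changesheet_updates", frozenset({"mongo"})),
]
resources = bind_resources(OPS, {"mongo": mongo_from_env})
print(type(resources["mongo"]).__name__, resources["mongo"].uri)
```

Keeping the ops ignorant of how "mongo" is built is what lets the same graph be configured into different jobs, for example one pointing at a test database and one at production.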

In the case of the claim_and_run_apply_changesheet_jobs sensor, the job for which it yields RunRequests is passed as an argument to the @sensor decorator:

@sensor(job=apply_changesheet.to_job(**preset_normal))
def claim_and_run_apply_changesheet_jobs(_context):
    ...

where apply_changesheet is a Graph definition and preset_normal is a Python dictionary, unpacked into keyword arguments of to_job, that supplies resource definitions (a mapping of resource names to Python functions decorated with @resource) as well as configuration for those resources (including which environment variables to read values from).
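Because preset_normal is spread with `**`, its keys must match keyword arguments of `to_job`. Dagster's `to_job` does accept `resource_defs` and `config`, but the exact contents of preset_normal are not shown in this guide, so the stdlib-only toy below mirrors only the general shape. `toy_to_job`, `resolve_env`, `mongo_resource`, the `dbname` field, and the `MONGO_DBNAME` variable are all hypothetical; the `{"env": NAME}` form is modeled on how Dagster config can point at an environment variable.

```python
import os
from typing import Any, Callable, Dict, Optional


def toy_to_job(graph_name: str,
               resource_defs: Optional[Dict[str, Callable[..., Any]]] = None,
               config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Toy analogue of configuring a graph into a job: just record the bindings."""
    return {
        "graph": graph_name,
        "resource_defs": dict(resource_defs or {}),
        "config": dict(config or {}),
    }


def resolve_env(value: Any) -> Any:
    """Recursively replace {"env": NAME} leaves with the value of os.environ[NAME]."""
    if isinstance(value, dict):
        if set(value) == {"env"}:
            return os.environ[value["env"]]
        return {key: resolve_env(item) for key, item in value.items()}
    return value


def mongo_resource(dbname: str) -> str:
    # Hypothetical resource factory; a real one would return a database handle.
    return f"<mongo db {dbname}>"


# Hypothetical preset: its keys match toy_to_job's keyword arguments.
preset_toy = {
    "resource_defs": {"mongo": mongo_resource},
    "config": {"resources": {"mongo": {"config": {"dbname": {"env": "MONGO_DBNAME"}}}}},
}

# The dict is spread into keyword arguments, just like to_job(**preset_normal).
job = toy_to_job("apply_changesheet", **preset_toy)
```

Environment variables are resolved only when the config is used, so the same preset can serve several deployments that set `MONGO_DBNAME` differently.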

The apply_changesheet Graph is defined in nmdc_runtime.site.graphs as follows:

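from dagster import graph

from nmdc_runtime.site.ops import get_changesheet_in, perform_changesheet_updates


@graph
def apply_changesheet():
    # Calling an op here wires its output to the next op's input; it does not run the op.
    sheet_in = get_changesheet_in()
    perform_changesheet_updates(sheet_in)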

which is rendered in Dagster's Dagit UI as a graph:


Dagit UI rendering of `apply_changesheet` job

Dagster builds this graph by calling the @graph-decorated function once, when the graph is defined: each op call in the body returns a placeholder for that op's output rather than running the op, which lets Dagster record the op nodes and their input/output dependencies. You can explore the Dagit rendering of this job at /workspace/repo@nmdc_runtime.site.repository:repo/jobs/apply_changesheet/ on your local instance (at http://localhost:3000 by default) or on the read-only deployed version.
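Definition-time tracing of this kind can be imitated in a few lines. The stdlib-only toy below is not Dagster's implementation: `toy_op`, `toy_graph`, `Output`, and `GraphRecorder` are hypothetical names. Calling a decorated op inside a graph body returns a placeholder and records an edge instead of doing any work, and the graph body is called exactly once to collect nodes and edges.

```python
import functools
from dataclasses import dataclass, field
from typing import Callable, List, Tuple


@dataclass(frozen=True)
class Output:
    """Placeholder returned by an op call while a graph is being defined."""
    node: str


@dataclass
class GraphRecorder:
    nodes: List[str] = field(default_factory=list)
    edges: List[Tuple[str, str]] = field(default_factory=list)


_ACTIVE: List[GraphRecorder] = []  # stack of graphs currently being defined


def toy_op(fn: Callable) -> Callable:
    """Inside a toy graph body, calling the op records a node; fn's body never runs."""
    @functools.wraps(fn)
    def invoke(*inputs: Output) -> Output:
        recorder = _ACTIVE[-1]  # this toy only supports calls inside a graph body
        recorder.nodes.append(fn.__name__)
        for upstream in inputs:
            recorder.edges.append((upstream.node, fn.__name__))
        return Output(fn.__name__)
    return invoke


def toy_graph(body: Callable[[], None]) -> GraphRecorder:
    """Call the composition function once, at definition time, to trace it."""
    recorder = GraphRecorder()
    _ACTIVE.append(recorder)
    try:
        body()
    finally:
        _ACTIVE.pop()
    return recorder


@toy_op
def get_changesheet_in():
    """Would load a changesheet; never executed while tracing."""


@toy_op
def perform_changesheet_updates(sheet_in):
    """Would apply the changesheet; never executed while tracing."""


@toy_graph
def apply_changesheet():
    sheet_in = get_changesheet_in()
    perform_changesheet_updates(sheet_in)


print(apply_changesheet.nodes, apply_changesheet.edges)
```

Real Dagster also handles aliasing when the same op is used twice, multiple outputs, and graph inputs, none of which this toy attempts.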

The operations comprising apply_changesheet are @op-decorated functions in nmdc_runtime.site.ops:

@op(required_resource_keys={"mongo"})
def get_changesheet_in(context) -> ChangesheetIn:
    mdb: MongoDatabase = context.resources.mongo.db
    object_id = context.solid_config.get("object_id")
    ...

and

@op(required_resource_keys={"mongo"})
def perform_changesheet_updates(context, sheet_in: ChangesheetIn):
    ...

Here you can see that Dagster operations (Ops) declare which resources they need when they are run. Resources are injected via the context argument supplied to the function call. You can also see that other values that configure a particular job run, and are thus usable as input, can be passed to the specific solid being run (solid is Dagster's deprecated term for op; context objects now have an equivalent op_config attribute).
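The injection pattern can be exercised without Dagster or MongoDB. In the stdlib-only sketch below, `ToyOpContext` is a hypothetical stand-in for Dagster's op context, `FakeMongoResource` replaces the real resource, and the `objects` collection name is invented; only the `context.resources.mongo.db` and `object_id` lookups mirror the op shown above, with `op_config` used in place of the deprecated `solid_config`.

```python
from dataclasses import dataclass, field
from types import SimpleNamespace
from typing import Any, Dict


@dataclass
class ToyOpContext:
    """Hypothetical stand-in for the context object passed to an op."""
    resources: SimpleNamespace
    op_config: Dict[str, Any] = field(default_factory=dict)


class FakeMongoResource:
    """Hypothetical resource whose .db is a dict of collections keyed by name."""
    def __init__(self, collections: Dict[str, Dict[str, Any]]):
        self.db = collections


def get_changesheet_in(context: ToyOpContext) -> Any:
    mdb = context.resources.mongo.db  # injected, not constructed by the op
    object_id = context.op_config.get("object_id")  # per-run configuration
    return mdb["objects"][object_id]


ctx = ToyOpContext(
    resources=SimpleNamespace(mongo=FakeMongoResource({"objects": {"abc": "changesheet text"}})),
    op_config={"object_id": "abc"},
)
print(get_changesheet_in(ctx))
```

Because the op receives everything through its context, the same body can run against a fake in tests or a real database in production; Dagster itself provides `build_op_context` for constructing a real context in tests.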

This extra level of indirection allows different schemes for executing job steps (i.e. ops). For example, the default Dagster Executor runs each step in its own spawned process on the host machine (multiprocessing). One can instead execute each step within its own Docker container, or even within an ephemeral Kubernetes pod. There is similar flexibility at the Run Launcher level for each Job (configured graph of Ops) run: one can launch job runs in new processes (the default), in new Docker containers, etc.
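Separating the execution plan from the execution mechanism can be sketched with the standard library. In the toy below, `run_plan` fixes the order in which steps run (via `graphlib.TopologicalSorter`, Python 3.9+) and hands each step to whatever `concurrent.futures.Executor` it is given. The step names and functions are hypothetical, and a thread pool stands in for the per-process scheme that Dagster's default executor uses.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from graphlib import TopologicalSorter
from typing import Callable, Dict, Set


def run_plan(steps: Dict[str, Callable[[dict], object]],
             deps: Dict[str, Set[str]],
             executor: Executor) -> Dict[str, object]:
    """Run each step once its upstream steps finish, on the given executor.

    The plan (what depends on what) is fixed here; where each step runs
    is decided entirely by the executor object passed in.
    """
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    results: Dict[str, object] = {}
    while sorter.is_active():
        # Submit every step whose upstream steps are done, then wait for that batch.
        futures = {
            name: executor.submit(steps[name], {d: results[d] for d in deps.get(name, ())})
            for name in sorter.get_ready()
        }
        for name, future in futures.items():
            results[name] = future.result()
            sorter.done(name)
    return results


def load(upstream: dict) -> str:
    return "sheet"


def apply(upstream: dict) -> str:
    return f"applied {upstream['load']}"


STEPS = {"load": load, "apply": apply}
DEPS = {"apply": {"load"}}

with ThreadPoolExecutor(max_workers=2) as pool:
    print(run_plan(STEPS, DEPS, pool))
```

Swapping in `ProcessPoolExecutor` moves each step into a separate process without touching `run_plan`, provided the step functions are defined at module level (so they can be pickled) and the call sits under an `if __name__ == "__main__":` guard.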

You can read more about options for Dagster Deployment and see how the NMDC Runtime deployment is configured via its dagster.yaml file.

In summary, there are many "touch points" for a given job: how it is scheduled (e.g. via a Sensor, or via a Schedule, which is more akin to cron), how it is configured to source particular Resources from a Graph template, and how the work of the Graph is split into a collection of Ops that declare their dependencies so that everything can run in a suitable order. All of these touch points are code-based, so any modification to a given job should be made on a git branch and submitted as a Pull Request.