Queue and Trigger Data and Metadata Jobs¶
This notebook explains, with worked examples, how to:
- understand and manage requests to the central queueing system for NMDC job orchestration
- understand and trigger jobs that can move and analyze data, and update metadata, based on precursor activities
Dependencies¶
The following modules, constants, and helper functions are used by one or more of the use-case cells below, so run this cell first, after making sure that `envfile_path` is the correct relative path to your environment-variables file:
In [1]:
Copied!
from datetime import datetime, timezone
import json
import os
from pprint import pprint
import secrets
import time
from dotenv import load_dotenv
import requests
# relative path to file with format
# ```
# NMDC_RUNTIME_HOST=fixme
# NMDC_RUNTIME_USER=fixme
# NMDC_RUNTIME_PASS=fixme
# NMDC_RUNTIME_SITE_ID=fixme # Okay if you don't have yet
# NMDC_RUNTIME_SITE_CLIENT_ID=fixme # Okay if you don't have yet
# NMDC_RUNTIME_SITE_CLIENT_SECRET=fixme # Okay if you don't have yet
# ```
envfile_path = "../../.env.client"
load_dotenv(envfile_path)
# Keep only the NMDC_RUNTIME_* variables; later cells also write generated
# site/client values back into this dict.
ENV = {
    k: v for k, v in os.environ.items()
    if k.startswith("NMDC_RUNTIME_")
}
# Fail with an actionable message (instead of a bare KeyError or a
# message-less AssertionError) when the host is missing or unexpected.
assert (
    ENV.get("NMDC_RUNTIME_HOST") ==
    "https://api.microbiomedata.org"
), (
    "NMDC_RUNTIME_HOST must be 'https://api.microbiomedata.org' "
    f"(got {ENV.get('NMDC_RUNTIME_HOST')!r}); check {envfile_path!r}"
)
HOST = ENV["NMDC_RUNTIME_HOST"]
def request_and_return_json(method, path, host=HOST, **kwargs):
    """Send an HTTP request to `host + path` and return the decoded JSON body.

    Args:
        method: HTTP method name passed to `requests.request` (e.g. "GET").
        path: URL path appended to `host`.
        host: Base URL; defaults to the module-level HOST.
        **kwargs: Forwarded to `requests.request` (headers, params, json, ...).
            A `timeout` may be supplied to override the default below.

    Raises:
        requests.HTTPError: via `raise_for_status()` on an error status code.
    """
    # requests waits forever unless a timeout is given; use a generous default
    # so a stalled connection cannot hang the notebook. Callers may override it.
    kwargs.setdefault("timeout", 60)
    r = requests.request(method, host + path, **kwargs)
    r.raise_for_status()
    return r.json()
def get_json(path, host=HOST, **kwargs):
    """Send a GET request to `host + path` and return the parsed JSON response.

    Extra keyword arguments are passed through to `request_and_return_json`.
    """
    http_method = "GET"
    return request_and_return_json(http_method, path, host=host, **kwargs)
def post_and_return_json(path, host=HOST, **kwargs):
    """Send a POST request to `host + path` and return the parsed JSON response.

    Extra keyword arguments are passed through to `request_and_return_json`.
    """
    http_method = "POST"
    return request_and_return_json(http_method, path, host=host, **kwargs)
def patch_and_return_json(path, host=HOST, **kwargs):
    """Send a PATCH request to `host + path` and return the parsed JSON response.

    Extra keyword arguments are passed through to `request_and_return_json`.
    """
    http_method = "PATCH"
    return request_and_return_json(http_method, path, host=host, **kwargs)
def put_and_return_json(path, host=HOST, **kwargs):
    """Send a PUT request to `host + path` and return the parsed JSON response.

    Extra keyword arguments are passed through to `request_and_return_json`.
    """
    http_method = "PUT"
    return request_and_return_json(http_method, path, host=host, **kwargs)
def auth_header(bearer_token):
    """Build the `Authorization` header dict for the given bearer token."""
    header_value = "Bearer {}".format(bearer_token)
    return {"Authorization": header_value}
def get_token_for_user():
    """Request a bearer token using the username/password stored in ENV.

    POSTs a "password" grant to /token, prints how many minutes the token
    stays valid, and returns the response's `access_token` value.
    """
    credentials = {
        "grant_type": "password",
        "username": ENV["NMDC_RUNTIME_USER"],
        "password": ENV["NMDC_RUNTIME_PASS"],
    }
    token_info = post_and_return_json("/token", data=credentials)
    minutes_left = token_info['expires']['minutes']
    print(f"Bearer token expires in {minutes_left} minutes")
    return token_info["access_token"]
def get_token_for_site_client():
    """Request a bearer token using the site client ID/secret stored in ENV.

    POSTs a "client_credentials" grant to /token, prints how many minutes the
    token stays valid, and returns the response's `access_token` value.
    """
    credentials = {
        "grant_type": "client_credentials",
        "client_id": ENV["NMDC_RUNTIME_SITE_CLIENT_ID"],
        "client_secret": ENV["NMDC_RUNTIME_SITE_CLIENT_SECRET"],
    }
    token_info = post_and_return_json("/token", data=credentials)
    minutes_left = token_info['expires']['minutes']
    print(f"Bearer token expires in {minutes_left} minutes")
    return token_info["access_token"]
def now(as_str=False):
    """Return the current UTC time.

    Returns a timezone-aware `datetime`, or its ISO 8601 string form when
    `as_str` is truthy.
    """
    current = datetime.now(tz=timezone.utc)
    if as_str:
        return current.isoformat()
    return current
# User bearer token used by the cells below.
TOKEN_U = get_token_for_user()
from datetime import datetime, timezone
import json
import os
from pprint import pprint
import secrets
import time
from dotenv import load_dotenv
import requests
# relative path to file with format
# ```
# NMDC_RUNTIME_HOST=fixme
# NMDC_RUNTIME_USER=fixme
# NMDC_RUNTIME_PASS=fixme
# NMDC_RUNTIME_SITE_ID=fixme # Okay if you don't have yet
# NMDC_RUNTIME_SITE_CLIENT_ID=fixme # Okay if you don't have yet
# NMDC_RUNTIME_SITE_CLIENT_SECRET=fixme # Okay if you don't have yet
# ```
envfile_path = "../../.env.client"
load_dotenv(envfile_path)
# Keep only the NMDC_RUNTIME_* variables; later cells also write generated
# site/client values back into this dict.
ENV = {
    k: v for k, v in os.environ.items()
    if k.startswith("NMDC_RUNTIME_")
}
# Fail with an actionable message (instead of a bare KeyError or a
# message-less AssertionError) when the host is missing or unexpected.
assert (
    ENV.get("NMDC_RUNTIME_HOST") ==
    "https://api.microbiomedata.org"
), (
    "NMDC_RUNTIME_HOST must be 'https://api.microbiomedata.org' "
    f"(got {ENV.get('NMDC_RUNTIME_HOST')!r}); check {envfile_path!r}"
)
HOST = ENV["NMDC_RUNTIME_HOST"]
def request_and_return_json(method, path, host=HOST, **kwargs):
    """Send an HTTP request to `host + path` and return the decoded JSON body.

    Args:
        method: HTTP method name passed to `requests.request` (e.g. "GET").
        path: URL path appended to `host`.
        host: Base URL; defaults to the module-level HOST.
        **kwargs: Forwarded to `requests.request` (headers, params, json, ...).
            A `timeout` may be supplied to override the default below.

    Raises:
        requests.HTTPError: via `raise_for_status()` on an error status code.
    """
    # requests waits forever unless a timeout is given; use a generous default
    # so a stalled connection cannot hang the notebook. Callers may override it.
    kwargs.setdefault("timeout", 60)
    r = requests.request(method, host + path, **kwargs)
    r.raise_for_status()
    return r.json()
def get_json(path, host=HOST, **kwargs):
    """Send a GET request to `host + path` and return the parsed JSON response.

    Extra keyword arguments are passed through to `request_and_return_json`.
    """
    http_method = "GET"
    return request_and_return_json(http_method, path, host=host, **kwargs)
def post_and_return_json(path, host=HOST, **kwargs):
    """Send a POST request to `host + path` and return the parsed JSON response.

    Extra keyword arguments are passed through to `request_and_return_json`.
    """
    http_method = "POST"
    return request_and_return_json(http_method, path, host=host, **kwargs)
def patch_and_return_json(path, host=HOST, **kwargs):
    """Send a PATCH request to `host + path` and return the parsed JSON response.

    Extra keyword arguments are passed through to `request_and_return_json`.
    """
    http_method = "PATCH"
    return request_and_return_json(http_method, path, host=host, **kwargs)
def put_and_return_json(path, host=HOST, **kwargs):
    """Send a PUT request to `host + path` and return the parsed JSON response.

    Extra keyword arguments are passed through to `request_and_return_json`.
    """
    http_method = "PUT"
    return request_and_return_json(http_method, path, host=host, **kwargs)
def auth_header(bearer_token):
    """Build the `Authorization` header dict for the given bearer token."""
    header_value = "Bearer {}".format(bearer_token)
    return {"Authorization": header_value}
def get_token_for_user():
    """Request a bearer token using the username/password stored in ENV.

    POSTs a "password" grant to /token, prints how many minutes the token
    stays valid, and returns the response's `access_token` value.
    """
    credentials = {
        "grant_type": "password",
        "username": ENV["NMDC_RUNTIME_USER"],
        "password": ENV["NMDC_RUNTIME_PASS"],
    }
    token_info = post_and_return_json("/token", data=credentials)
    minutes_left = token_info['expires']['minutes']
    print(f"Bearer token expires in {minutes_left} minutes")
    return token_info["access_token"]
def get_token_for_site_client():
    """Request a bearer token using the site client ID/secret stored in ENV.

    POSTs a "client_credentials" grant to /token, prints how many minutes the
    token stays valid, and returns the response's `access_token` value.
    """
    credentials = {
        "grant_type": "client_credentials",
        "client_id": ENV["NMDC_RUNTIME_SITE_CLIENT_ID"],
        "client_secret": ENV["NMDC_RUNTIME_SITE_CLIENT_SECRET"],
    }
    token_info = post_and_return_json("/token", data=credentials)
    minutes_left = token_info['expires']['minutes']
    print(f"Bearer token expires in {minutes_left} minutes")
    return token_info["access_token"]
def now(as_str=False):
    """Return the current UTC time.

    Returns a timezone-aware `datetime`, or its ISO 8601 string form when
    `as_str` is truthy.
    """
    current = datetime.now(tz=timezone.utc)
    if as_str:
        return current.isoformat()
    return current
# User bearer token used by the cells below.
TOKEN_U = get_token_for_user()
Bearer token expires in 30 minutes
Understand and Manage Queued Jobs¶
Use case: create a new logical "site" to associate with job executions¶
In [2]:
Copied!
# Both requests in this cell authenticate as the user.
user_headers = auth_header(TOKEN_U)
user_info = get_json("/users/me/", headers=user_headers)

# Site ID = username plus a random URL-safe suffix.
id_newsite = "-".join([ENV["NMDC_RUNTIME_USER"], secrets.token_urlsafe()])
post_and_return_json("/sites", json={"id": id_newsite}, headers=user_headers)

# Remember the new site ID for the cells that follow.
ENV["NMDC_RUNTIME_SITE_ID"] = id_newsite
print(id_newsite)
# Both requests in this cell authenticate as the user.
user_headers = auth_header(TOKEN_U)
user_info = get_json("/users/me/", headers=user_headers)

# Site ID = username plus a random URL-safe suffix.
id_newsite = "-".join([ENV["NMDC_RUNTIME_USER"], secrets.token_urlsafe()])
post_and_return_json("/sites", json={"id": id_newsite}, headers=user_headers)

# Remember the new site ID for the cells that follow.
ENV["NMDC_RUNTIME_SITE_ID"] = id_newsite
print(id_newsite)
dwinston-_SIO50Lu0Whd45uyX9xKm3t7VmVV1KVxk6HrTm27brE
Use case: create client credentials for a site you administer¶
In [3]:
Copied!
site_id = ENV["NMDC_RUNTIME_SITE_ID"]
print(f"New client ID for site {site_id}:")
response = post_and_return_json(
    f"/sites/{site_id}:generateCredentials",
    headers=auth_header(TOKEN_U),
)
# The stray bare `response` expression was removed: mid-cell it displayed
# nothing, and the response carries the client secret, which should not be shown.
ENV["NMDC_RUNTIME_SITE_CLIENT_ID"] = response["client_id"]
ENV["NMDC_RUNTIME_SITE_CLIENT_SECRET"] = response["client_secret"]
print(ENV["NMDC_RUNTIME_SITE_CLIENT_ID"])
site_id = ENV["NMDC_RUNTIME_SITE_ID"]
print(f"New client ID for site {site_id}:")
response = post_and_return_json(
    f"/sites/{site_id}:generateCredentials",
    headers=auth_header(TOKEN_U),
)
# The stray bare `response` expression was removed: mid-cell it displayed
# nothing, and the response carries the client secret, which should not be shown.
ENV["NMDC_RUNTIME_SITE_CLIENT_ID"] = response["client_id"]
ENV["NMDC_RUNTIME_SITE_CLIENT_SECRET"] = response["client_secret"]
print(ENV["NMDC_RUNTIME_SITE_CLIENT_ID"])
New client ID for site dwinston-_SIO50Lu0Whd45uyX9xKm3t7VmVV1KVxk6HrTm27brE: sys0rgj0z957
Use case: filter relevant jobs your site can execute¶
In [4]:
Copied!
TOKEN_S = get_token_for_site_client()


def filter_jobs(filter_):
    """Return the /jobs/ listing restricted by `filter_`.

    `filter_` is serialized with `json.dumps` and sent as the `filter`
    query parameter.
    """
    # NOTE(review): the listing is requested with the user token (TOKEN_U),
    # not the site token fetched above -- confirm this is intended.
    return get_json(
        "/jobs/",  # plain literal: there is nothing to interpolate
        headers=auth_header(TOKEN_U),
        params={"filter": json.dumps(filter_)})


response = filter_jobs({"workflow.id": "test"})
pprint(response)
# Keep the first matching job's ID for the claim step that follows.
job_id = response['resources'][0]['id']
print(job_id)
TOKEN_S = get_token_for_site_client()


def filter_jobs(filter_):
    """Return the /jobs/ listing restricted by `filter_`.

    `filter_` is serialized with `json.dumps` and sent as the `filter`
    query parameter.
    """
    # NOTE(review): the listing is requested with the user token (TOKEN_U),
    # not the site token fetched above -- confirm this is intended.
    return get_json(
        "/jobs/",  # plain literal: there is nothing to interpolate
        headers=auth_header(TOKEN_U),
        params={"filter": json.dumps(filter_)})


response = filter_jobs({"workflow.id": "test"})
pprint(response)
# Keep the first matching job's ID for the claim step that follows.
job_id = response['resources'][0]['id']
print(job_id)
Bearer token expires in 30 minutes {'resources': [{'claims': [{'op_id': 'nmdc:sys08wb3p548', 'site_id': 'dwinston-J4TsenGGwEf0WXGNE5GKDOvZ15tpxPU2DXSsrytEZl8'}], 'config': {}, 'id': 'nmdc:fk0jb83', 'workflow': {'id': 'test'}}, {'config': {'object_id': '1bte-2c60-26'}, 'created_at': '2021-09-15T21:21:33.565000', 'id': 'nmdc:sys0d9st65', 'workflow': {'id': 'test'}}, {'config': {'object_id': 'px81-r1xd-77'}, 'created_at': '2021-09-27T21:17:03.606000', 'id': 'nmdc:sys09zw052', 'workflow': {'id': 'test'}}, {'config': {'object_id': 'sys04b34c032'}, 'created_at': '2022-08-16T20:13:43.339047+00:00', 'id': 'nmdc:sys0m8808k69', 'workflow': {'id': 'test'}}, {'config': {'object_id': 'sys0gprg5t78'}, 'created_at': '2022-08-16T20:18:19.335866+00:00', 'id': 'nmdc:sys088x72f03', 'workflow': {'id': 'test'}}, {'config': {'object_id': 'sys091bcr845'}, 'created_at': '2022-08-16T20:22:01.353465+00:00', 'id': 'nmdc:sys0grg8vd94', 'workflow': {'id': 'test'}}]} nmdc:fk0jb83
Use case: claim a job execution to keep folks in the loop¶
In [5]:
Copied!
# Fetch a fresh site-client token and claim the job with it.
TOKEN_S = get_token_for_site_client()
site_headers = auth_header(TOKEN_S)
response = post_and_return_json(f"/jobs/{job_id}:claim", headers=site_headers)
pprint(response)

# The claim response's ID identifies the operation used in the next step.
operation_id = response["id"]
print("Operation ID is ", operation_id)
# Fetch a fresh site-client token and claim the job with it.
TOKEN_S = get_token_for_site_client()
site_headers = auth_header(TOKEN_S)
response = post_and_return_json(f"/jobs/{job_id}:claim", headers=site_headers)
pprint(response)

# The claim response's ID identifies the operation used in the next step.
operation_id = response["id"]
print("Operation ID is ", operation_id)
Bearer token expires in 30 minutes {'done': False, 'expire_time': '2022-09-15T20:22:52.487625+00:00', 'id': 'nmdc:sys05me5jk63', 'metadata': {'job': {'config': {}, 'id': 'nmdc:fk0jb83', 'workflow': {'id': 'test'}}, 'model': 'nmdc_runtime.api.models.job.JobOperationMetadata', 'site_id': 'dwinston-_SIO50Lu0Whd45uyX9xKm3t7VmVV1KVxk6HrTm27brE'}, 'result': None} Operation ID is nmdc:sys05me5jk63
Use case: update your job-execution operation to keep folks in the loop¶
In [6]:
Copied!
TOKEN_S = get_token_for_site_client()
# Show the operation as it currently stands (this GET sends no auth header).
print("Operation summary:")
pprint(get_json(f"/operations/{operation_id}"))
# Plain string literal: the message has nothing to interpolate.
print("Mark operation as done:")
response = patch_and_return_json(
    f"/operations/{operation_id}",
    json={"done": True, "result": "code green", "metadata": {"a": 3}},
    headers=auth_header(TOKEN_S)
)
pprint(response)
TOKEN_S = get_token_for_site_client()
# Show the operation as it currently stands (this GET sends no auth header).
print("Operation summary:")
pprint(get_json(f"/operations/{operation_id}"))
# Plain string literal: the message has nothing to interpolate.
print("Mark operation as done:")
response = patch_and_return_json(
    f"/operations/{operation_id}",
    json={"done": True, "result": "code green", "metadata": {"a": 3}},
    headers=auth_header(TOKEN_S)
)
pprint(response)
Bearer token expires in 30 minutes Operation summary: {'done': False, 'expire_time': '2022-09-15T20:22:52.487000', 'id': 'nmdc:sys05me5jk63', 'metadata': {'job': {'claims': [], 'config': {}, 'created_at': None, 'description': None, 'id': 'nmdc:fk0jb83', 'name': None, 'workflow': {'capability_ids': None, 'created_at': None, 'description': None, 'id': 'test', 'name': None}}, 'model': 'nmdc_runtime.api.models.job.JobOperationMetadata', 'site_id': 'dwinston-_SIO50Lu0Whd45uyX9xKm3t7VmVV1KVxk6HrTm27brE'}, 'result': None} Mark operation as done: {'done': True, 'expire_time': '2022-09-15T20:22:52.487000', 'id': 'nmdc:sys05me5jk63', 'metadata': {'a': 3, 'job': {'claims': [], 'config': {}, 'created_at': None, 'description': None, 'id': 'nmdc:fk0jb83', 'name': None, 'workflow': {'capability_ids': None, 'created_at': None, 'description': None, 'id': 'test', 'name': None}}, 'model': 'nmdc_runtime.api.models.job.JobOperationMetadata', 'site_id': 'dwinston-_SIO50Lu0Whd45uyX9xKm3t7VmVV1KVxk6HrTm27brE'}, 'result': 'code green'}
Understand and Manage Triggered Jobs¶
Use case: register an object that may trigger a job via a type annotation¶
In [7]:
Copied!
# Placeholder object description: random checksum, current UTC timestamp,
# and an example access URL.
fake_object = {
    "description": "a very fake object",
    "checksums": [{"type": "sha256", "checksum": secrets.token_hex()}],
    "created_time": now(as_str=True),
    "size": 1,
    "access_methods": [
        {"access_url": {"url": "http://example.com/path/to/thing"}},
    ],
}
response = post_and_return_json(
    "/objects", json=fake_object, headers=auth_header(TOKEN_S)
)
pprint(response)

# Look up the types attached to the newly registered object.
object_id = response["id"]
print(f"Types associated with Object ID {object_id}:")
object_types = get_json(f"/objects/{object_id}/types")
pprint(object_types)
# Placeholder object description: random checksum, current UTC timestamp,
# and an example access URL.
fake_object = {
    "description": "a very fake object",
    "checksums": [{"type": "sha256", "checksum": secrets.token_hex()}],
    "created_time": now(as_str=True),
    "size": 1,
    "access_methods": [
        {"access_url": {"url": "http://example.com/path/to/thing"}},
    ],
}
response = post_and_return_json(
    "/objects", json=fake_object, headers=auth_header(TOKEN_S)
)
pprint(response)

# Look up the types attached to the newly registered object.
object_id = response["id"]
print(f"Types associated with Object ID {object_id}:")
object_types = get_json(f"/objects/{object_id}/types")
pprint(object_types)
{'access_methods': [{'access_id': None, 'access_url': {'headers': None, 'url': 'http://example.com/path/to/thing'}, 'region': None, 'type': 'https'}], 'aliases': None, 'checksums': [{'checksum': '0d5bc352bd9e947bcd1d88e9513ef4e8e0d4c81d7ae0f274a601ce36463e3f82', 'type': 'sha256'}], 'contents': None, 'created_time': '2022-08-16T20:22:58.963427+00:00', 'description': 'a very fake object', 'id': 'sys0n94fnf55', 'mime_type': None, 'name': None, 'self_uri': 'drs://drs.microbiomedata.org/sys0n94fnf55', 'size': 1, 'updated_time': None, 'version': None} Types associated with Object ID sys0n94fnf55: []
Use case: annotate a known object with a type that will trigger a workflow¶
In [8]:
Copied!
# The same endpoint is used to set the object's types and to read them back.
types_path = f"/objects/{object_id}/types"
response = put_and_return_json(
    types_path, json=["test"], headers=auth_header(TOKEN_S)
)
pprint(get_json(types_path))
# The same endpoint is used to set the object's types and to read them back.
types_path = f"/objects/{object_id}/types"
response = put_and_return_json(
    types_path, json=["test"], headers=auth_header(TOKEN_S)
)
pprint(get_json(types_path))
[{'created_at': '2021-09-07T00:00:00', 'description': 'For use in unit and integration tests', 'id': 'test', 'name': 'A test object type'}]
Wait for the job to be created (this may take up to about a minute), then list the claimable job:
In [9]:
Copied!
def filter_jobs(filter_):
    """Return the /jobs/ listing restricted by `filter_`.

    `filter_` is serialized with `json.dumps` and sent as the `filter`
    query parameter.
    """
    return get_json(
        "/jobs/",  # plain literal: there is nothing to interpolate
        headers=auth_header(TOKEN_U),
        params={"filter": json.dumps(filter_)})


# List the "test" workflow jobs whose config references the annotated object.
pprint(filter_jobs({"workflow.id": "test", "config.object_id": object_id}))
def filter_jobs(filter_):
    """Return the /jobs/ listing restricted by `filter_`.

    `filter_` is serialized with `json.dumps` and sent as the `filter`
    query parameter.
    """
    return get_json(
        "/jobs/",  # plain literal: there is nothing to interpolate
        headers=auth_header(TOKEN_U),
        params={"filter": json.dumps(filter_)})


# List the "test" workflow jobs whose config references the annotated object.
pprint(filter_jobs({"workflow.id": "test", "config.object_id": object_id}))
{'resources': [{'config': {'object_id': 'sys0n94fnf55'}, 'created_at': '2022-08-16T20:23:51.372491+00:00', 'id': 'nmdc:sys0s5mpmq50', 'workflow': {'id': 'test'}}]}