Source code for nmdc_client.data_staging

# -*- coding: utf-8 -*-
import json
import logging
from typing import Any

import requests

from nmdc_client.api_client import NMDCAPIClient
from nmdc_client.auth import NMDCAuth
from nmdc_client.config import API_BASE_URL
from nmdc_client.decorators import has_deprecated_parameter, requires_auth

logger = logging.getLogger(__name__)


@has_deprecated_parameter("env", reason="Use ``api_base_url`` instead.")
class JGISequencingProjectAPI(NMDCAPIClient):
    """
    Class to interact with the NMDC API to create and retrieve JGI sequencing projects.

    Parameters
    ----------
    auth
        The NMDCAuth instance containing the credentials and API base URL for authentication.
    api_base_url
        The base URL of an instance of the NMDC Runtime API.
        By default, this is the base URL of the production instance.
    env
        Deprecated. Use ``api_base_url`` instead.
        Previously used to specify the API environment (e.g., "prod", "dev").

    Raises
    ------
    ValueError
        If ``auth`` has no credentials, or if the ``api_base_url`` of ``auth``
        differs from the one this client uses.
    """

    def __init__(
        self,
        auth: NMDCAuth,
        api_base_url: str = API_BASE_URL,
        env: str = "",
    ):
        self.auth = auth
        if not self.auth.has_credentials():
            raise ValueError("credentials must be provided")
        super().__init__(
            api_base_url=api_base_url,
            env=env,
        )
        # make sure the `api_base_url` is the same
        # TODO: Use a global "settings" object to store the `api_base_url` in a single place.
        #       Example: https://github.com/pydantic/pydantic-settings
        if self.auth.api_base_url != self.api_base_url:
            raise ValueError(
                "`api_base_url` must be the same for NMDCAuth and JGISequencingProjectAPI"
            )

    @requires_auth
    def create_jgi_sequencing_project(
        self,
        jgi_sequencing_project: dict,
    ) -> dict:
        """
        Create a new JGI sequencing project in the NMDC database.

        For more information on available keys, visit the NMDC API Docs
        https://api.microbiomedata.org/docs#/Workflow%20management/create_sequencing_record_wf_file_staging_jgi_sequencing_projects_post

        Parameters
        ----------
        jgi_sequencing_project
            The JGI sequencing project data to be created. Sent as the JSON request body.

        Returns
        -------
        dict
            The decoded JSON body of the API response.

        Raises
        ------
        RuntimeError
            If the request fails or the API responds with an error status.
        """
        url = f"{self.api_base_url}/wf_file_staging/jgi_sequencing_projects"
        headers = self._build_http_request_headers(
            access_token=self.auth.get_token(),
            accept="application/json",
            content_type="application/json",
        )
        try:
            response = requests.post(url, headers=headers, json=jgi_sequencing_project)
            response.raise_for_status()
        except requests.RequestException as e:
            logger.error("Request failed: %s", e)
            raise RuntimeError("Failed to add new JGI sequencing project") from e

        # Decode the body once and reuse it for both the log line and the return value.
        payload = response.json()
        logger.debug(
            "API request response: %s\n API Status Code: %s",
            payload,
            response.status_code,
        )
        return payload

    @requires_auth
    def list_jgi_sequencing_projects(
        self,
        filter: str | None = None,
        max_page_size: int = 20,
        fields: str = "",
        all_pages: bool = False,
    ) -> list[dict[str, Any]]:
        """
        List JGI sequencing projects from the NMDC database.

        Parameters
        ----------
        filter
            Filter to apply to the API call. It is serialized with ``json.dumps``
            before being sent. A missing or empty filter is sent as an empty
            JSON object (``{}``).
        max_page_size
            The maximum number of items to return per page.
        fields
            The fields to return. Sent as the ``projection`` query parameter.
        all_pages
            True to return all pages. False to return the first page.

        Returns
        -------
        list
            The ``resources`` entry of the API response.

        Raises
        ------
        RuntimeError
            If the request fails or the API responds with an error status.
        """
        url = f"{self.api_base_url}/wf_file_staging/jgi_sequencing_projects"
        headers = self._build_http_request_headers(
            access_token=self.auth.get_token(),
            accept="application/json",
            content_type="application/json",
        )
        # Send `{}` rather than `null` when no filter is given, consistent with
        # how `JGISampleSearchAPI.list_jgi_samples` builds its query.
        query = filter if filter else {}
        # Note: The `dict` is here to appease mypy, which, for some reason, doesn't infer that
        #       the dictionary being assigned here is sufficient to pass to `requests.get`.
        query_params: dict = {
            "filter": json.dumps(query),
            "max_page_size": max_page_size,
            "projection": fields,
        }
        try:
            response = requests.get(url, headers=headers, params=query_params)
            response.raise_for_status()
        except requests.RequestException as e:
            logger.error("Request failed: %s", e)
            raise RuntimeError("Failed to retrieve JGI sequencing projects") from e

        # Decode the body once and reuse it for both the log line and the return value.
        payload = response.json()
        logger.debug(
            "API request response: %s\n API Status Code: %s",
            payload,
            response.status_code,
        )
        if all_pages:
            return self._get_all_pages(
                response,
                url,
                filter or "",
                max_page_size,
                fields,
                access_token=self.auth.get_token(),
            )["resources"]
        return payload["resources"]

    @requires_auth
    def get_jgi_sequencing_project_by_name(self, project_name: str) -> dict:
        """
        Get a specific JGI sequencing project by name.

        Parameters
        ----------
        project_name
            The name of the JGI sequencing project to retrieve. It is appended
            to the request URL as the final path segment.

        Returns
        -------
        dict
            The decoded JSON body of the API response.

        Raises
        ------
        RuntimeError
            If the request fails or the API responds with an error status.
        """
        url = f"{self.api_base_url}/wf_file_staging/jgi_sequencing_projects/{project_name}"
        headers = self._build_http_request_headers(
            access_token=self.auth.get_token(),
            accept="application/json",
            content_type="application/json",
        )
        try:
            response = requests.get(url, headers=headers)
            response.raise_for_status()
        except requests.RequestException as e:
            logger.error("Request failed: %s", e)
            raise RuntimeError("Failed to retrieve JGI sequencing project") from e

        # Decode the body once and reuse it for both the log line and the return value.
        payload = response.json()
        logger.debug(
            "API request response: %s\n API Status Code: %s",
            payload,
            response.status_code,
        )
        return payload
@has_deprecated_parameter("env", reason="Use ``api_base_url`` instead.")
class JGISampleSearchAPI(NMDCAPIClient):
    """
    Class to interact with the NMDC API to list, insert, and update JGI samples.

    Parameters
    ----------
    auth
        The NMDCAuth instance containing the credentials and API base URL for authentication.
    api_base_url
        The base URL of an instance of the NMDC Runtime API.
        By default, this is the base URL of the production instance.
    env
        Deprecated. Use ``api_base_url`` instead.
        Previously used to specify the API environment (e.g., "prod", "dev").

    Raises
    ------
    ValueError
        If ``auth`` has no credentials, or if the ``api_base_url`` of ``auth``
        differs from the one this client uses.
    """

    def __init__(
        self,
        auth: NMDCAuth,
        api_base_url: str = API_BASE_URL,
        env: str = "",
    ):
        self.auth = auth
        if not self.auth.has_credentials():
            raise ValueError("credentials must be provided")
        super().__init__(
            api_base_url=api_base_url,
            env=env,
        )
        # make sure the `api_base_url` is the same
        # TODO: Use a global "settings" object to store the `api_base_url` in a single place.
        #       Example: https://github.com/pydantic/pydantic-settings
        if self.auth.api_base_url != self.api_base_url:
            raise ValueError(
                "`api_base_url` must be the same for NMDCAuth and JGISampleSearchAPI"
            )

    @requires_auth
    def list_jgi_samples(
        self,
        filter: str | None = None,
        max_page_size: int = 20,
        fields: str = "",
        all_pages: bool = False,
    ) -> list[dict]:
        """
        List JGI samples from the NMDC database.

        Parameters
        ----------
        filter
            Filter to apply to the API call. It is serialized with ``json.dumps``
            before being sent. A missing or empty filter is sent as an empty
            JSON object (``{}``).
        max_page_size
            The maximum number of items to return per page.
        fields
            The fields to return. Sent as the ``projection`` query parameter.
        all_pages
            True to return all pages. False to return the first page.

        Returns
        -------
        list[dict]
            The ``resources`` entry of the API response.

        Raises
        ------
        RuntimeError
            If the request fails or the API responds with an error status.
        """
        url = f"{self.api_base_url}/wf_file_staging/jgi_samples"
        try:
            # Send `{}` rather than `null` when no filter is given.
            query = filter if filter else {}
            query_params: dict[str, str | int] = {
                "filter": json.dumps(query),
                "max_page_size": max_page_size,
                "projection": fields,
            }
            # The headers (and therefore the token lookup) are built inside the
            # `try` so that a `requests` failure there is reported the same way
            # as a failure of the GET itself.
            response = requests.get(
                url,
                headers=self._build_http_request_headers(
                    access_token=self.auth.get_token(),
                    accept="application/json",
                    content_type="application/json",
                ),
                params=query_params,
            )
            response.raise_for_status()
        except requests.RequestException as e:
            logger.error("Request failed: %s", e)
            raise RuntimeError("Failed to retrieve JGI samples") from e

        # Decode the body once and reuse it for both the log line and the return value.
        payload = response.json()
        logger.debug(
            "API request response: %s\n API Status Code: %s",
            payload,
            response.status_code,
        )
        if all_pages:
            return self._get_all_pages(
                response,
                url,
                filter or "",
                max_page_size,
                fields,
                self.auth.get_token(),
            )["resources"]
        return payload["resources"]

    @requires_auth
    def insert_jgi_sample(
        self,
        jgi_sample: dict,
    ) -> dict:
        """
        Insert a JGI sample into the NMDC database.

        For more information on keys, visit the NMDC API Docs
        https://api.microbiomedata.org/docs#/Workflow%20management/create_jgi_sample_wf_file_staging_jgi_samples_post

        Parameters
        ----------
        jgi_sample
            The JGI sample data to be inserted. Sent as the JSON request body.

        Returns
        -------
        dict
            The decoded JSON body of the API response.

        Raises
        ------
        RuntimeError
            If the request fails or the API responds with an error status.
        """
        url = f"{self.api_base_url}/wf_file_staging/jgi_samples"
        headers = self._build_http_request_headers(
            access_token=self.auth.get_token(),
            accept="application/json",
            content_type="application/json",
        )
        try:
            response = requests.post(url, headers=headers, json=jgi_sample)
            response.raise_for_status()
        except requests.RequestException as e:
            logger.error("Request failed: %s", e)
            raise RuntimeError("Failed to insert JGI samples") from e

        # Decode the body once and reuse it for both the log line and the return value.
        payload = response.json()
        logger.debug(
            "API request response: %s\n API Status Code: %s",
            payload,
            response.status_code,
        )
        return payload

    @requires_auth
    def update_jgi_sample(
        self,
        jgi_file_id: str,
        jgi_sample: dict,
    ) -> dict:
        """
        Update a JGI sample in the NMDC database.

        For more information on available keys, visit the NMDC API Docs
        https://api.microbiomedata.org/docs#/Workflow%20management/update_jgi_samples_wf_file_staging_jgi_samples__jdp_file_id__patch

        Parameters
        ----------
        jgi_file_id
            The JGI file ID of the sample to be updated. It is appended to the
            request URL as the final path segment.
        jgi_sample
            The updated JGI sample data. Sent as the JSON body of a PATCH request.

        Returns
        -------
        dict
            The decoded JSON body of the API response.

        Raises
        ------
        RuntimeError
            If the request fails or the API responds with an error status.
        """
        url = f"{self.api_base_url}/wf_file_staging/jgi_samples/{jgi_file_id}"
        headers = self._build_http_request_headers(
            access_token=self.auth.get_token(),
            accept="application/json",
            content_type="application/json",
        )
        try:
            response = requests.patch(url, headers=headers, json=jgi_sample)
            response.raise_for_status()
        except requests.RequestException as e:
            logger.error("Request failed: %s", e)
            raise RuntimeError("Failed to update JGI samples") from e

        # Decode the body once and reuse it for both the log line and the return value.
        payload = response.json()
        logger.debug(
            "API request response: %s\n API Status Code: %s",
            payload,
            response.status_code,
        )
        return payload
@has_deprecated_parameter("env", reason="Use ``api_base_url`` instead.")
class GlobusTaskAPI(NMDCAPIClient):
    """
    Class to interact with the NMDC API for Globus tasks.

    Parameters
    ----------
    auth
        The NMDCAuth instance containing the credentials and API base URL for authentication.
    api_base_url
        The base URL of an instance of the NMDC Runtime API.
        By default, this is the base URL of the production instance.
    env
        Deprecated. Use ``api_base_url`` instead.
        Previously used to specify the API environment (e.g., "prod", "dev").

    Raises
    ------
    ValueError
        If ``auth`` has no credentials, or if the ``api_base_url`` of ``auth``
        differs from the one this client uses.
    """

    def __init__(
        self,
        auth: NMDCAuth,
        api_base_url: str = API_BASE_URL,
        env: str = "",
    ):
        self.auth = auth
        if not self.auth.has_credentials():
            raise ValueError("credentials must be provided")
        super().__init__(
            api_base_url=api_base_url,
            env=env,
        )
        # make sure the `api_base_url` is the same
        # TODO: Use a global "settings" object to store the `api_base_url` in a single place.
        #       Example: https://github.com/pydantic/pydantic-settings
        if self.auth.api_base_url != self.api_base_url:
            raise ValueError(
                "`api_base_url` must be the same for NMDCAuth and GlobusTaskAPI"
            )

    @requires_auth
    def list_globus_tasks(
        self,
        filter: str | None = None,
        max_page_size: int = 20,
        fields: str = "",
        all_pages: bool = False,
    ) -> list[dict]:
        """
        Get Globus tasks from the NMDC database.

        Parameters
        ----------
        filter
            Filter to apply to the API call. It is serialized with ``json.dumps``
            before being sent. A missing or empty filter is sent as an empty
            JSON object (``{}``).
        max_page_size
            The maximum number of items to return per page.
        fields
            The fields to return. Sent as the ``projection`` query parameter.
        all_pages
            True to return all pages. False to return the first page.

        Returns
        -------
        list[dict]
            The ``resources`` entry of the API response.

        Raises
        ------
        RuntimeError
            If the request fails or the API responds with an error status.
        """
        url = f"{self.api_base_url}/wf_file_staging/globus_tasks"
        headers = self._build_http_request_headers(
            access_token=self.auth.get_token(),
            accept="application/json",
            content_type="application/json",
        )
        # Send `{}` rather than `null` when no filter is given, consistent with
        # how `JGISampleSearchAPI.list_jgi_samples` builds its query.
        query = filter if filter else {}
        query_params: dict[str, str | int] = {
            "filter": json.dumps(query),
            "max_page_size": max_page_size,
            "projection": fields,
        }
        try:
            response = requests.get(url, headers=headers, params=query_params)
            response.raise_for_status()
        except requests.RequestException as e:
            logger.error("Request failed: %s", e)
            raise RuntimeError("Failed to retrieve Globus tasks") from e

        # Decode the body once and reuse it for both the log line and the return value.
        payload = response.json()
        logger.debug(
            "API request response: %s\n API Status Code: %s",
            payload,
            response.status_code,
        )
        if all_pages:
            return self._get_all_pages(
                response,
                url,
                filter or "",
                max_page_size,
                fields,
                self.auth.get_token(),
            )["resources"]
        return payload["resources"]

    @requires_auth
    def create_globus_task(
        self,
        globus_task: dict,
    ) -> dict:
        """
        Create a new Globus task in the NMDC database.

        For more information on available keys, visit the NMDC API Docs
        https://api.microbiomedata.org/docs#/Workflow%20management/create_globus_tasks_wf_file_staging_globus_tasks_post

        Parameters
        ----------
        globus_task
            The Globus task data to be created. Sent as the JSON request body.

        Returns
        -------
        dict
            The decoded JSON body of the API response.

        Raises
        ------
        RuntimeError
            If the request fails or the API responds with an error status.
        """
        url = f"{self.api_base_url}/wf_file_staging/globus_tasks"
        headers = self._build_http_request_headers(
            access_token=self.auth.get_token(),
            accept="application/json",
            content_type="application/json",
        )
        try:
            response = requests.post(url, headers=headers, json=globus_task)
            response.raise_for_status()
        except requests.RequestException as e:
            logger.error("Request failed: %s", e)
            raise RuntimeError("Failed to add new Globus task") from e

        # Decode the body once and reuse it for both the log line and the return value.
        payload = response.json()
        logger.debug(
            "API request response: %s\n API Status Code: %s",
            payload,
            response.status_code,
        )
        return payload

    @requires_auth
    def update_globus_task(
        self,
        globus_task_id: str,
        globus_task: dict,
    ) -> dict:
        """
        Update a Globus task in the NMDC database.

        For more information on available keys, visit the NMDC API Docs
        https://api.microbiomedata.org/docs#/Workflow%20management/update_globus_tasks_wf_file_staging_globus_tasks__task_id__patch

        Parameters
        ----------
        globus_task_id
            The ID of the Globus task to be updated. It is appended to the
            request URL as the final path segment.
        globus_task
            The Globus task data to be updated. Sent as the JSON body of a PATCH request.

        Returns
        -------
        dict
            The decoded JSON body of the API response.

        Raises
        ------
        RuntimeError
            If the request fails or the API responds with an error status.
        """
        url = f"{self.api_base_url}/wf_file_staging/globus_tasks/{globus_task_id}"
        headers = self._build_http_request_headers(
            access_token=self.auth.get_token(),
            accept="application/json",
            content_type="application/json",
        )
        try:
            response = requests.patch(url, headers=headers, json=globus_task)
            response.raise_for_status()
        except requests.RequestException as e:
            logger.error("Request failed: %s", e)
            raise RuntimeError("Failed to update Globus task") from e

        # Decode the body once and reuse it for both the log line and the return value.
        payload = response.json()
        logger.debug(
            "API request response: %s\n API Status Code: %s",
            payload,
            response.status_code,
        )
        return payload