Source code for nmdc_ms_metadata_gen.metadata_generator

import ast
import hashlib

# json validate imports
import importlib.resources
import json
import logging
import os
import pkgutil
import re
import time
from abc import ABC
from copy import deepcopy
from datetime import datetime, timezone
from io import BytesIO
from pathlib import Path
from typing import Dict, List

import nmdc_schema.nmdc as nmdc
import numpy as np
import pandas as pd
import requests
import toml
from linkml.validator import Validator
from linkml.validator.plugins import JsonschemaValidationPlugin
from linkml_runtime import SchemaView
from linkml_runtime.dumpers import json_dumper
from nmdc_api_utilities.auth import NMDCAuth
from nmdc_api_utilities.biosample_search import BiosampleSearch
from nmdc_api_utilities.configuration_search import ConfigurationSearch
from nmdc_api_utilities.data_object_search import DataObjectSearch
from nmdc_api_utilities.instrument_search import InstrumentSearch
from nmdc_api_utilities.metadata import Metadata
from nmdc_api_utilities.nmdc_search import NMDCSearch
from nmdc_api_utilities.processed_sample_search import ProcessedSampleSearch
from nmdc_api_utilities.study_search import StudySearch
from nmdc_schema import NmdcSchemaValidationPlugin
from nmdc_schema.nmdc import Database as NMDCDatabase
from tqdm import tqdm

import nmdc_ms_metadata_gen
from nmdc_ms_metadata_gen.data_classes import NmdcTypes, ProcessGeneratorMap
from nmdc_ms_metadata_gen.id_pool import IDPool
from nmdc_ms_metadata_gen.schema_bridge import get_curie_for_class

ENV = os.getenv("NMDC_ENV", "prod")

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)


class NMDCMetadataGenerator:
    """
    Generic base class for generating and validating NMDC metadata.

    Parameters
    ----------
    id_pool_size : int
        The size of the ID pool to maintain for minting NMDC IDs. Default is 100.
    id_refill_threshold : int
        The threshold at which to refill the ID pool. Default is 10.
    test : bool
        Flag for test runs; passed through to the IDPool. Default is False.

    Attributes
    ----------
    id_pool : IDPool
        An instance of the IDPool class for managing NMDC ID minting.
    provenance_metadata : nmdc.ProvenanceMetadata
        An instance of the ProvenanceMetadata associated with this metadata
        generation process.
    """

    def __init__(
        self, id_pool_size: int = 100, id_refill_threshold: int = 10, test: bool = False
    ):
        # Initialize ID pool
        self.id_pool = IDPool(
            pool_size=id_pool_size, refill_threshold=id_refill_threshold, test=test
        )
        # Add provenance metadata
        self.provenance_metadata = self._generate_provenance_metadata()

    def _generate_provenance_metadata(self) -> nmdc.ProvenanceMetadata:
        """
        Generate ProvenanceMetadata associated with this metadata generation process.

        This method creates a ProvenanceMetadata instance that captures information
        about the metadata generation process and is subsequently associated with
        generated NMDC instances as appropriate.

        Returns
        -------
        nmdc.ProvenanceMetadata
            The generated ProvenanceMetadata instance.
        """
        type_str = NmdcTypes.get("ProvenanceMetadata")
        git_url = "https://github.com/microbiomedata/nmdc_mass_spectrometry_metadata_generation"
        version = nmdc_ms_metadata_gen.__version__
        # Warn if using development version (package not installed properly)
        if version == "0.0.0-dev":
            logging.warning(
                "Using development version '0.0.0-dev'. Install the package with "
                "'pip install -e .' to use the actual version from pyproject.toml"
            )
        source_system_of_record = "custom"
        add_date = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        provenance_metadata = nmdc.ProvenanceMetadata(
            type=type_str,
            git_url=git_url,
            version=version,
            source_system_of_record=source_system_of_record,
            add_date=add_date,
        )
        return provenance_metadata
    def load_credentials(self, config_file: str = None) -> tuple:
        """
        Load the client ID and secret from the environment or a configuration file.

        Parameters
        ----------
        config_file : str, optional
            The path to the configuration file.

        Returns
        -------
        tuple
            A tuple containing the client ID and client secret.

        Raises
        ------
        ValueError
            If the credentials cannot be found in the environment variables or
            the configuration file.
        """
        client_id = os.getenv("CLIENT_ID")
        client_secret = os.getenv("CLIENT_SECRET")
        if not client_id or not client_secret:
            if config_file:
                config_file = Path(config_file)
                try:
                    config = toml.load(config_file)
                    client_id = config.get("CLIENT_ID")
                    client_secret = config.get("CLIENT_SECRET")
                except FileNotFoundError:
                    raise ValueError(f"Config file {config_file} not found.")
                except toml.TomlDecodeError:
                    raise ValueError("Error decoding TOML from the config file.")
                except KeyError:
                    raise ValueError(
                        "Config file must contain CLIENT_ID and CLIENT_SECRET."
                    )
            if not client_id or not client_secret:
                raise ValueError(
                    "CLIENT_ID and CLIENT_SECRET must be set either in environment "
                    "variables or passed in the config file.\n"
                    "They must be named CLIENT_ID and CLIENT_SECRET respectively."
                )
        return client_id, client_secret
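    # Example (hypothetical usage sketch; the config path and credential values
    # are placeholders, but CLIENT_ID/CLIENT_SECRET are the names this method
    # expects):
    #
    #     # config.toml
    #     # CLIENT_ID = "my-client-id"
    #     # CLIENT_SECRET = "my-client-secret"
    #
    #     gen = NMDCMetadataGenerator(test=True)
    #     client_id, client_secret = gen.load_credentials(config_file="config.toml")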
    def start_nmdc_database(self) -> nmdc.Database:
        """
        Initialize and return a new NMDC Database instance.

        Returns
        -------
        nmdc.Database
            A new instance of an NMDC Database.

        Notes
        -----
        This method simply creates and returns a new instance of the NMDC
        Database. It does not perform any additional initialization or
        configuration.
        """
        return nmdc.Database()
    def find_associated_ids(self, ids: list[str]):
        """
        Given a list of sample ids, find the associated study ids.

        Parameters
        ----------
        ids : list[str]
            The ids to search for.

        Returns
        -------
        dict
            A mapping of each input sample id to its associated study ids.
        """
        search_obj = NMDCSearch(env=ENV)
        resp = search_obj.get_linked_instances_and_associate_ids(
            ids=ids, types="nmdc:Study"
        )
        return resp
    def clean_dict(self, data: Dict) -> Dict:
        """
        Clean a dictionary by removing keys with empty or None values.

        Parameters
        ----------
        data : Dict
            The dictionary to be cleaned.

        Returns
        -------
        Dict
            A new dictionary with keys removed where the values are None, an
            empty string, a whitespace-only string, or NaN.
        """
        return {
            k: v
            for k, v in data.items()
            if v is not None
            and not (isinstance(v, str) and not v.strip())
            and not (isinstance(v, float) and np.isnan(v))
        }
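    # Example (illustrative values):
    #
    #     gen.clean_dict(
    #         {"id": "nmdc:dobj-00-000001", "url": "", "note": None, "size": float("nan")}
    #     )
    #     # -> {"id": "nmdc:dobj-00-000001"}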
    def generate_data_object(
        self,
        file_path: Path,
        data_category: str,
        data_object_type: str,
        description: str,
        base_url: str,
        CLIENT_ID: str,
        CLIENT_SECRET: str,
        was_generated_by: str = None,
        alternative_id: str = None,
        in_manifest=None,
        url: str = None,
    ) -> nmdc.DataObject:
        """
        Create an NMDC DataObject with metadata from the specified file and details.

        This method generates an NMDC DataObject and assigns it a unique NMDC ID.
        The DataObject is populated with metadata derived from the provided file
        and input parameters.

        Parameters
        ----------
        file_path : Path
            Path to the file representing the data object. The file's name is
            used as the `name` attribute.
        data_category : str
            Category of the data object (e.g., 'instrument_data').
        data_object_type : str
            Type of the data object (e.g., 'LC-DDA-MS/MS Raw Data').
        description : str
            Description of the data object.
        base_url : str
            Base URL for accessing the data object, to which the file name is
            appended to form the complete URL. Ignored if `url` is provided.
        CLIENT_ID : str
            The client ID for the NMDC API.
        CLIENT_SECRET : str
            The client secret for the NMDC API.
        was_generated_by : str, optional
            ID of the process or entity that generated the data object (e.g.,
            the DataGeneration id or the MetabolomicsAnalysis id).
        alternative_id : str, optional
            An optional alternative identifier for the data object.
        in_manifest : optional
            Manifest id(s) the data object belongs to, if any.
        url : str, optional
            The complete URL for the data object. If provided, this takes
            precedence over constructing the URL from base_url + file name.

        Returns
        -------
        nmdc.DataObject
            An NMDC DataObject instance with the specified metadata.

        Notes
        -----
        This method calculates the MD5 checksum of the file, which may be
        time-consuming for large files.
        """
        nmdc_id = self.id_pool.get_id(
            nmdc_type=NmdcTypes.get("DataObject"),
            client_id=CLIENT_ID,
            client_secret=CLIENT_SECRET,
        )
        data_dict = {
            "id": nmdc_id,
            "data_category": data_category,
            "data_object_type": data_object_type,
            "name": file_path.name,
            "description": description,
            "file_size_bytes": file_path.stat().st_size,
            # read_bytes() closes the file handle after reading
            "md5_checksum": hashlib.md5(file_path.read_bytes()).hexdigest(),
            "url": url if url is not None else base_url + str(file_path.name),
            "type": NmdcTypes.get("DataObject"),
            "was_generated_by": was_generated_by,
            "alternative_identifiers": alternative_id,
            "in_manifest": in_manifest,
        }
        # If any of the data_dict values are None or empty strings, remove them
        data_dict = self.clean_dict(data_dict)
        data_object = nmdc.DataObject(**data_dict)
        return data_object
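    # Example (hypothetical usage sketch; the file path, URL, and credentials
    # below are placeholders):
    #
    #     data_obj = gen.generate_data_object(
    #         file_path=Path("sample_a.raw"),
    #         data_category="instrument_data",
    #         data_object_type="LC-DDA-MS/MS Raw Data",
    #         description="Raw LC-MS/MS data for sample A",
    #         base_url="https://example.org/data/",
    #         CLIENT_ID=client_id,
    #         CLIENT_SECRET=client_secret,
    #     )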
    def generate_mass_spectrometry_configuration(
        self,
        mass_spectrometry_acquisition_strategy: str,
        resolution_categories: List[str],
        mass_analyzers: List[str],
        ionization_source: str,
        mass_spectrum_collection_modes: List[str],
        polarity_mode: str,
        name: str,
        description: str,
        CLIENT_ID: str,
        CLIENT_SECRET: str,
        protocol_link: nmdc.Protocol | None = None,
    ) -> nmdc.MassSpectrometryConfiguration:
        """
        Create an NMDC MassSpectrometryConfiguration object with the provided metadata.

        This method generates an NMDC MassSpectrometryConfiguration object,
        populated with the specified metadata.

        Parameters
        ----------
        mass_spectrometry_acquisition_strategy : str
            The acquisition strategy used for mass spectrometry.
        resolution_categories : List[str]
            The resolution categories applicable to the mass spectrometry data.
        mass_analyzers : List[str]
            The mass analyzers used in the experiment.
        ionization_source : str
            The ionization source employed for the mass spectrometry analysis.
        mass_spectrum_collection_modes : List[str]
            The collection modes used for acquiring mass spectra.
        polarity_mode : str
            The polarity mode used in the mass spectrometry analysis.
        name : str
            The name of the mass spectrometry configuration.
        description : str
            A description of the mass spectrometry configuration.
        CLIENT_ID : str
            The client ID for the NMDC API.
        CLIENT_SECRET : str
            The client secret for the NMDC API.
        protocol_link : nmdc.Protocol, optional
            A link to the protocol used for the mass spectrometry configuration.

        Returns
        -------
        nmdc.MassSpectrometryConfiguration
            An NMDC MassSpectrometryConfiguration object with the specified metadata.
        """
        nmdc_id = self.id_pool.get_id(
            nmdc_type=NmdcTypes.get("MassSpectrometryConfiguration"),
            client_id=CLIENT_ID,
            client_secret=CLIENT_SECRET,
        )
        data_dict = {
            "id": nmdc_id,
            "name": name,
            "description": description,
            "mass_spectrometry_acquisition_strategy": mass_spectrometry_acquisition_strategy,
            "resolution_categories": resolution_categories,
            "mass_analyzers": mass_analyzers,
            "ionization_source": ionization_source,
            "mass_spectrum_collection_modes": mass_spectrum_collection_modes,
            "polarity_mode": polarity_mode,
            "type": NmdcTypes.get("MassSpectrometryConfiguration"),
            "protocol_link": protocol_link,
        }
        mass_spectrometry_config = nmdc.MassSpectrometryConfiguration(**data_dict)
        return mass_spectrometry_config
    def generate_portion_of_substance(
        self,
        substance_name: str,
        volume_value: float | None = None,
        volume_unit: str | None = None,
        final_concentration_value: float | None = None,
        source_concentration_value: float | None = None,
        concentration_unit: str | None = None,
        mass_value: float | None = None,
        mass_unit: str | None = None,
        substance_role: str | None = None,
    ) -> nmdc.PortionOfSubstance:
        """
        Create an NMDC PortionOfSubstance object with the provided metadata.

        This method generates an NMDC PortionOfSubstance object, populated with
        the specified metadata.

        Parameters
        ----------
        substance_name : str
            The name of the substance.
        volume_value : float, optional
            The volume of the substance.
        volume_unit : str, optional
            The unit of measurement for the volume.
        final_concentration_value : float, optional
            The final concentration of the substance.
        source_concentration_value : float, optional
            The source concentration of the substance.
        concentration_unit : str, optional
            The unit of measurement for the concentrations (both final and source).
        mass_value : float, optional
            The mass of the substance.
        mass_unit : str, optional
            The unit of measurement for the mass.
        substance_role : str, optional
            The role of the substance in the experiment.

        Returns
        -------
        nmdc.PortionOfSubstance
            An NMDC PortionOfSubstance object with the specified metadata.
        """
        data_dict = {
            "known_as": substance_name,
            "type": NmdcTypes.get("PortionOfSubstance"),
        }
        if volume_value and volume_unit:
            data_dict["volume"] = nmdc.QuantityValue(
                type=NmdcTypes.get("QuantityValue"),
                has_numeric_value=volume_value,
                has_unit=volume_unit,
            )
        if final_concentration_value and concentration_unit:
            data_dict["final_concentration"] = nmdc.QuantityValue(
                type=NmdcTypes.get("QuantityValue"),
                has_numeric_value=final_concentration_value,
                has_unit=concentration_unit,
            )
        if source_concentration_value and concentration_unit:
            data_dict["source_concentration"] = nmdc.QuantityValue(
                type=NmdcTypes.get("QuantityValue"),
                has_numeric_value=source_concentration_value,
                has_unit=concentration_unit,
            )
        if mass_value and mass_unit:
            data_dict["mass"] = nmdc.QuantityValue(
                type=NmdcTypes.get("QuantityValue"),
                has_numeric_value=mass_value,
                has_unit=mass_unit,
            )
        if substance_role:
            data_dict["substance_role"] = substance_role
        portion_of_substance = nmdc.PortionOfSubstance(**data_dict)
        return portion_of_substance
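    # Example (illustrative values; the unit and role strings are placeholders,
    # not values validated against the schema here):
    #
    #     solvent = gen.generate_portion_of_substance(
    #         substance_name="methanol",
    #         volume_value=200.0,
    #         volume_unit="uL",
    #         substance_role="solvent",
    #     )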
    def generate_mobile_phase_segment(
        self,
        duration_value: float | None = None,
        duration_unit: str | None = None,
        substances_used: List[nmdc.PortionOfSubstance] | None = None,
    ) -> nmdc.MobilePhaseSegment:
        """
        Generate an NMDC MobilePhaseSegment object with the provided metadata.

        This method creates an NMDC MobilePhaseSegment object, populated with
        the specified metadata.

        Parameters
        ----------
        duration_value : float, optional
            The duration of the mobile phase segment.
        duration_unit : str, optional
            The unit of measurement for the duration.
        substances_used : List[nmdc.PortionOfSubstance], optional
            A list of PortionOfSubstance objects used in the mobile phase segment.

        Returns
        -------
        nmdc.MobilePhaseSegment
            The generated NMDC MobilePhaseSegment object.
        """
        mobile_phase_segment = nmdc.MobilePhaseSegment(
            type=NmdcTypes.get("MobilePhaseSegment")
        )
        if duration_value and duration_unit:
            mobile_phase_segment.duration = nmdc.QuantityValue(
                type=NmdcTypes.get("QuantityValue"),
                has_numeric_value=duration_value,
                has_unit=duration_unit,
            )
        if substances_used:
            mobile_phase_segment.substances_used = substances_used
        return mobile_phase_segment
    def generate_protocol(self, name: str = None, url: str = None) -> nmdc.Protocol:
        """
        Create an NMDC Protocol object with the provided metadata.

        This method generates an NMDC Protocol object, populated with the
        specified metadata.

        Parameters
        ----------
        name : str, optional
            The name of the protocol.
        url : str, optional
            The URL of the protocol.

        Returns
        -------
        nmdc.Protocol
            An NMDC Protocol object with the specified metadata.
        """
        data_dict = {
            "type": NmdcTypes.get("Protocol"),
            "name": name,
            "url": url,
        }
        return nmdc.Protocol(**data_dict)
    def generate_chromatography_configuration(
        self,
        name: str,
        description: str,
        chromatographic_category: str,
        stationary_phase: str,
        CLIENT_ID: str,
        CLIENT_SECRET: str,
        ordered_mobile_phases: List[nmdc.MobilePhaseSegment] | None = None,
        temperature_value: float | None = None,
        temperature_unit: str | None = None,
        protocol_link: nmdc.Protocol | None = None,
    ) -> nmdc.ChromatographyConfiguration:
        """
        Create an NMDC ChromatographyConfiguration object with the provided metadata.

        This method generates an NMDC ChromatographyConfiguration object,
        populated with the specified metadata.

        Parameters
        ----------
        name : str
            The name of the chromatography configuration.
        description : str
            A description of the chromatography configuration.
        chromatographic_category : str
            The category of chromatography (e.g., 'liquid_chromatography').
        stationary_phase : str
            The stationary phase used in the chromatography (e.g., 'C18').
        CLIENT_ID : str
            The client ID for the NMDC API.
        CLIENT_SECRET : str
            The client secret for the NMDC API.
        ordered_mobile_phases : List[nmdc.MobilePhaseSegment], optional
            A list of mobile phase segments used in the chromatography.
        temperature_value : float, optional
            The temperature at which the chromatography is performed.
        temperature_unit : str, optional
            The unit of measurement for the temperature (e.g., 'Cel').
        protocol_link : nmdc.Protocol, optional
            A link to the protocol used for the chromatography configuration.

        Returns
        -------
        nmdc.ChromatographyConfiguration
            An NMDC ChromatographyConfiguration object with the specified metadata.
        """
        nmdc_id = self.id_pool.get_id(
            nmdc_type=NmdcTypes.get("ChromatographyConfiguration"),
            client_id=CLIENT_ID,
            client_secret=CLIENT_SECRET,
        )
        data_dict = {
            "id": nmdc_id,
            "name": name,
            "description": description,
            "chromatographic_category": chromatographic_category,
            "ordered_mobile_phases": ordered_mobile_phases,
            "stationary_phase": stationary_phase,
            "type": NmdcTypes.get("ChromatographyConfiguration"),
            "protocol_link": protocol_link,
        }
        if temperature_value and temperature_unit:
            data_dict["temperature"] = nmdc.QuantityValue(
                type=NmdcTypes.get("QuantityValue"),
                has_numeric_value=temperature_value,
                has_unit=temperature_unit,
            )
        chromatography_config = nmdc.ChromatographyConfiguration(**data_dict)
        return chromatography_config
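    # Example (hypothetical usage sketch chaining the two methods above; names
    # and category values are illustrative placeholders):
    #
    #     segment = gen.generate_mobile_phase_segment(
    #         duration_value=30.0,
    #         duration_unit="min",
    #         substances_used=[solvent],
    #     )
    #     lc_config = gen.generate_chromatography_configuration(
    #         name="C18 reversed-phase gradient",
    #         description="Standard C18 gradient method",
    #         chromatographic_category="liquid_chromatography",
    #         stationary_phase="C18",
    #         CLIENT_ID=client_id,
    #         CLIENT_SECRET=client_secret,
    #         ordered_mobile_phases=[segment],
    #     )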
    def generate_processed_sample(
        self,
        name: str,
        description: str,
        CLIENT_ID: str,
        CLIENT_SECRET: str,
        sampled_portion: list = None,
    ) -> nmdc.ProcessedSample:
        """
        Generate a processed sample object from the provided data.

        Parameters
        ----------
        name : str
            Name of the processed sample.
        description : str
            Description of the processed sample.
        CLIENT_ID : str
            The client ID for the NMDC API.
        CLIENT_SECRET : str
            The client secret for the NMDC API.
        sampled_portion : list, optional
            The portion of the sample that is taken for downstream activity.

        Returns
        -------
        nmdc.ProcessedSample
            An NMDC ProcessedSample object with the provided metadata.
        """
        nmdc_id = self.id_pool.get_id(
            nmdc_type=NmdcTypes.get("ProcessedSample"),
            client_id=CLIENT_ID,
            client_secret=CLIENT_SECRET,
        )
        data_dict = {
            "id": nmdc_id,
            "type": NmdcTypes.get("ProcessedSample"),
            "name": name,
            "description": description,
            "sampled_portion": sampled_portion,
        }
        return nmdc.ProcessedSample(**data_dict)
    def generate_material_processing(
        self,
        data: dict,
        type: str,
        has_input: list,
        has_output: list,
        CLIENT_ID: str,
        CLIENT_SECRET: str,
    ):
        """
        Generate a MaterialProcessing object from the provided data.

        Parameters
        ----------
        data : dict
            A dictionary containing the metadata for the material processing.
        type : str
            The type of material processing (e.g., 'PoolingProcess').
        has_input : list
            A list of input sample IDs for the process.
        has_output : list
            A list of output sample IDs for the process.
        CLIENT_ID : str
            The client ID for the NMDC API.
        CLIENT_SECRET : str
            The client secret for the NMDC API.

        Returns
        -------
        nmdc MaterialProcessing object
            An NMDC MaterialProcessing object with the provided metadata.
        """
        type_curie = get_curie_for_class(type)
        nmdc_id = self.id_pool.get_id(
            nmdc_type=type_curie,
            client_id=CLIENT_ID,
            client_secret=CLIENT_SECRET,
        )
        # Drop None values; a dict comprehension avoids mutating the dict while
        # iterating over it (which raises a RuntimeError)
        data = {key: value for key, value in data.items() if value is not None}
        data.update(
            {
                "id": nmdc_id,
                "has_input": has_input,
                "has_output": has_output,
                "type": type_curie,
            }
        )
        return ProcessGeneratorMap.get(type)(**data)
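    # Example (hypothetical usage sketch; 'Extraction' is an illustrative key
    # assumed to exist in ProcessGeneratorMap, and the IDs are placeholders):
    #
    #     process = gen.generate_material_processing(
    #         data={"name": "Sample extraction", "description": None},
    #         type="Extraction",
    #         has_input=["nmdc:bsm-00-000001"],
    #         has_output=["nmdc:procsm-00-000001"],
    #         CLIENT_ID=client_id,
    #         CLIENT_SECRET=client_secret,
    #     )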
    def generate_metabolomics_analysis(
        self,
        cluster_name: str,
        raw_data_name: str,
        raw_data_id: str,
        data_gen_id_list: List[str],
        processed_data_id: str,
        parameter_data_id: str,
        processing_institution: str,
        CLIENT_ID: str,
        CLIENT_SECRET: str,
        calibration_ids: list[str] = None,
        incremented_id: str = None,
        metabolite_identifications: List[nmdc.MetaboliteIdentification] = None,
        type: str = NmdcTypes.get("MetabolomicsAnalysis"),
        peak_count: int = None,
        peak_assignment_count: int = None,
        c13_isotopologue_count: int = None,
        qc_status: str = None,
        qc_comment: str = None,
    ) -> nmdc.MetabolomicsAnalysis:
        """
        Create an NMDC MetabolomicsAnalysis object with metadata for a workflow analysis.

        This method generates an NMDC MetabolomicsAnalysis object, including
        details about the analysis, the processing institution, and relevant
        workflow information.

        Parameters
        ----------
        cluster_name : str
            Name of the cluster or computing resource used for the analysis.
        raw_data_name : str
            Name of the raw data file that was analyzed.
        raw_data_id : str
            ID of the raw data object that was analyzed.
        data_gen_id_list : List[str]
            IDs of the DataGeneration objects that generated the raw data.
        processed_data_id : str
            ID of the processed data resulting from the analysis.
        parameter_data_id : str
            ID of the parameter data object used for the analysis.
        processing_institution : str
            Name of the institution where the analysis was performed.
        CLIENT_ID : str
            The client ID for the NMDC API.
        CLIENT_SECRET : str
            The client secret for the NMDC API.
        calibration_ids : list[str], optional
            IDs of the calibration information used for the analysis.
            Default is None, indicating no calibration information.
        incremented_id : str, optional
            An optional incremented ID for the MetabolomicsAnalysis object.
            If not provided, a new NMDC ID will be minted.
        metabolite_identifications : List[nmdc.MetaboliteIdentification], optional
            List of MetaboliteIdentification objects associated with the analysis.
            Default is None, which indicates no metabolite identifications.
        type : str, optional
            The type of the analysis. Default resolves to the schema type for
            MetabolomicsAnalysis.
        peak_count : int, optional
            The number of peaks detected in the analysis.
        peak_assignment_count : int, optional
            The number of peak assignments made in the analysis.
        c13_isotopologue_count : int, optional
            The number of C13 isotopologues detected in the analysis.
        qc_status : str, optional
            The quality control status for the analysis.
        qc_comment : str, optional
            The quality control comment for the analysis.

        Returns
        -------
        nmdc.MetabolomicsAnalysis
            An NMDC MetabolomicsAnalysis instance with the provided metadata.

        Notes
        -----
        The 'started_at_time' and 'ended_at_time' fields are initialized with
        placeholder values and should be updated with actual timestamps later
        when the processed files are iterated over in the run method.
        """
        if incremented_id is None:
            # If no incremented id is provided, mint a new one
            incremented_id = (
                self.id_pool.get_id(
                    nmdc_type=NmdcTypes.get("MetabolomicsAnalysis"),
                    client_id=CLIENT_ID,
                    client_secret=CLIENT_SECRET,
                )
                + ".1"
            )
        data_dict = {
            "id": incremented_id,
            "name": f"{self.workflow_analysis_name} for {raw_data_name}",
            "description": self.workflow_description,
            "processing_institution": processing_institution,
            "execution_resource": cluster_name,
            "git_url": self.workflow_git_url,
            "version": self.workflow_version,
            "was_informed_by": data_gen_id_list,
            "has_input": [raw_data_id, parameter_data_id],
            "has_output": [processed_data_id],
            "started_at_time": "placeholder",
            "ended_at_time": "placeholder",
            "type": type,
            "metabolomics_analysis_category": self.workflow_category,
            "qc_status": qc_status,
            "qc_comment": qc_comment,
        }
        data_dict["uses_calibration"] = calibration_ids
        data_dict["has_metabolite_identifications"] = metabolite_identifications
        data_dict["peak_count"] = peak_count
        data_dict["peak_assignment_count"] = peak_assignment_count
        data_dict["c13_isotopologue_count"] = c13_isotopologue_count
        data_dict = self.clean_dict(data_dict)
        metab_analysis = nmdc.MetabolomicsAnalysis(**data_dict)
        return metab_analysis
    def generate_instrument(
        self,
        name: str,
        description: str,
        CLIENT_ID: str,
        CLIENT_SECRET: str,
        vendor: str = None,
        model: str = None,
    ) -> nmdc.Instrument:
        """
        Create an NMDC Instrument object with the provided metadata.

        This method generates an NMDC Instrument object, populated with the
        specified metadata.

        Parameters
        ----------
        name : str
            The name of the instrument.
        description : str
            A description of the instrument.
        CLIENT_ID : str
            The client ID for the NMDC API.
        CLIENT_SECRET : str
            The client secret for the NMDC API.
        vendor : str, optional
            The vendor/manufacturer of the instrument.
        model : str, optional
            The model of the instrument.

        Returns
        -------
        nmdc.Instrument
            An NMDC Instrument object with the specified metadata.
        """
        nmdc_id = self.id_pool.get_id(
            nmdc_type=NmdcTypes.get("Instrument"),
            client_id=CLIENT_ID,
            client_secret=CLIENT_SECRET,
        )
        data_dict = {
            "id": nmdc_id,
            "name": name,
            "description": description,
            "type": NmdcTypes.get("Instrument"),
        }
        if vendor:
            data_dict["vendor"] = vendor
        if model:
            data_dict["model"] = model
        # Clean the dictionary to remove any None values
        data_dict = self.clean_dict(data_dict)
        instrument = nmdc.Instrument(**data_dict)
        return instrument
    def dump_nmdc_database(self, nmdc_database: nmdc.Database, json_path: Path) -> None:
        """
        Dump the NMDC database to a JSON file.

        This method serializes the NMDC Database instance to a JSON file at the
        specified path.

        Parameters
        ----------
        nmdc_database : nmdc.Database
            The NMDC Database instance to dump.
        json_path : Path
            The file path where the JSON dump will be saved.

        Returns
        -------
        None

        Side Effects
        ------------
        Writes the database content to the file specified by `json_path`.
        """
        json_dumper.dump(nmdc_database, json_path)
        logging.info("Database successfully dumped in %s", json_path)
    def validate_nmdc_database(self, json: dict | str, use_api: bool = True) -> dict:
        """
        Validate the NMDC database JSON object against the NMDC schema.

        This method checks if the provided JSON conforms to the NMDC schema.
        If the validation fails, it returns the errors.

        Parameters
        ----------
        json : dict | str
            The JSON object or path to the JSON file to validate.
        use_api : bool
            Whether to use the NMDC API for validation. If False, uses local
            validation.

        Returns
        -------
        dict
            A dictionary with the validation result. If valid, returns
            {"result": "All okay!"}. If errors, returns
            {"result": "errors", "detail": [list of errors]}.

        Raises
        ------
        ValueError
            If the JSON does not conform to the NMDC schema.
        """
        # alias the stdlib module locally because the `json` parameter shadows it
        import json as json_lib

        if isinstance(json, str):
            with open(json) as f:
                json = json_lib.load(f)
        if use_api:
            api_metadata = Metadata(env=ENV)
            api_metadata.validate_json(json)
            return {"result": "All okay!"}
        else:
            validation_result = self._validate_json_no_api(metadata=json)
            return validation_result
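    # Example (hypothetical usage sketch; the path and database contents are
    # placeholders, and `data_obj` is assumed to come from generate_data_object):
    #
    #     db = gen.start_nmdc_database()
    #     db.data_object_set.append(data_obj)
    #     gen.dump_nmdc_database(db, Path("nmdc_dump.json"))
    #     result = gen.validate_nmdc_database("nmdc_dump.json", use_api=False)
    #     assert result["result"] == "All Okay!"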
    def json_submit(self, json: dict | str, CLIENT_ID: str, CLIENT_SECRET: str):
        """
        Submit the generated JSON metadata to the NMDC API.

        Parameters
        ----------
        json : dict | str
            The JSON metadata to submit. Can be a dict or a path to a JSON file.
        CLIENT_ID : str
            The client ID for the NMDC API.
        CLIENT_SECRET : str
            The client secret for the NMDC API.

        Returns
        -------
        None
        """
        # alias the stdlib module locally because the `json` parameter shadows it
        import json as json_lib

        if isinstance(json, str):
            with open(json) as f:
                json = json_lib.load(f)
        auth = NMDCAuth(client_id=CLIENT_ID, client_secret=CLIENT_SECRET)
        md = Metadata(env=ENV, auth=auth)
        success = md.submit_json(json)
        if success != 200:
            logging.error("Failed to submit JSON metadata: %s", json)
            raise ValueError("Failed to submit JSON metadata")
    @staticmethod
    def get_start_end_times(file) -> tuple:
        """
        Get the start and end times for a given file based on its filesystem
        metadata.

        This method retrieves the earliest and latest timestamps associated
        with the file, considering creation, modification, and, if available,
        birth times.

        Parameters
        ----------
        file : Path
            A pathlib.Path object representing the file.

        Returns
        -------
        tuple
            A tuple containing the start time and end time as formatted strings
            ("%Y-%m-%d %H:%M:%S").
        """
        stat_info = file.stat()
        timestamps = [stat_info.st_mtime, stat_info.st_ctime]
        if hasattr(stat_info, "st_birthtime"):
            timestamps.append(stat_info.st_birthtime)
        earliest_time = min(timestamps)
        start_time = datetime.fromtimestamp(earliest_time).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        end_time = datetime.fromtimestamp(stat_info.st_mtime).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        assert start_time <= end_time, "Start time must not be after end time."
        return start_time, end_time
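    # Example (illustrative; the file path and returned timestamps are
    # placeholders):
    #
    #     start, end = NMDCMetadataGenerator.get_start_end_times(Path("sample_a.raw"))
    #     # e.g. ("2024-01-15 09:30:00", "2024-01-15 11:02:17")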
    @staticmethod
    def _validate_json_no_api(metadata: dict) -> dict:
        """
        Check whether the input dictionary represents a valid instance of the
        nmdc `Database` class defined in the NMDC Schema. This code was adapted
        from nmdc-runtime and modified to be a static method here.

        Parameters
        ----------
        metadata : dict
            The dictionary you want to validate. For example:

            {
                "biosample_set": [
                    {"id": "nmdc:bsm-00-000001", ...},
                    {"id": "nmdc:bsm-00-000002", ...}
                ],
                "study_set": [
                    {"id": "nmdc:sty-00-000001", ...},
                    {"id": "nmdc:sty-00-000002", ...}
                ]
            }

        Returns
        -------
        dict
            A dictionary with a `result` key and, on failure, a `detail` key.
            If valid, returns {"result": "All Okay!"}. If invalid, returns
            {"result": "errors", "detail": <validation error messages>}.
        """

        def get_nmdc_yaml_view() -> SchemaView:
            nmdc_schema_bytes = pkgutil.get_data(
                "nmdc_schema", "nmdc_materialized_patterns.yaml"
            )
            nmdc_schema_yaml_string = str(nmdc_schema_bytes, "utf-8")
            nmdc_view = SchemaView(nmdc_schema_yaml_string)
            return nmdc_view

        def nmdc_database_collection_names():
            """
            Return a list of the names of all Database slots that represent
            database collections.
            """
            names = []
            view = get_nmdc_yaml_view()
            all_classes = set(view.all_classes())
            for slot in view.class_slots("Database"):
                rng = getattr(view.get_slot(slot), "range", None)
                if rng in all_classes:
                    names.append(slot)
            return names

        def get_nmdc_jsonschema_path() -> Path:
            """Get path to NMDC JSON Schema file."""
            with importlib.resources.path(
                "nmdc_schema", "nmdc_materialized_patterns.schema.json"
            ) as p:
                return p

        def get_nmdc_schema_validator() -> Validator:
            schema_view = get_nmdc_yaml_view()
            return Validator(
                schema_view.schema,
                validation_plugins=[
                    JsonschemaValidationPlugin(
                        closed=True,
                        # Since the `nmdc-schema` package exports a pre-built JSON
                        # Schema file, use that instead of relying on the plugin
                        # to generate one on the fly.
                        json_schema_path=get_nmdc_jsonschema_path(),
                    ),
                    NmdcSchemaValidationPlugin(),
                ],
            )

        validator = get_nmdc_schema_validator()
        docs = deepcopy(metadata)
        validation_errors = {}
        known_coll_names = set(nmdc_database_collection_names())
        for coll_name, coll_docs in docs.items():
            if coll_name not in known_coll_names:
                # We expect each key in `metadata` to be a known schema collection
                # name. However, `@type` is a special key for JSON-LD, used for
                # JSON serialization of e.g. LinkML objects. That is, the value of
                # `@type` lets a client know that the JSON object (a dict in
                # Python) should be interpreted as a <https://w3id.org/nmdc/Database>.
                # If `@type` is present as a key, and its value indicates that
                # `metadata` is indeed a nmdc:Database, that's fine, and we don't
                # want to raise an exception.
                #
                # prompted by: https://github.com/microbiomedata/nmdc-runtime/discussions/858
                if coll_name == "@type" and coll_docs in ("Database", "nmdc:Database"):
                    continue
                else:
                    validation_errors[coll_name] = [
                        f"'{coll_name}' is not a known schema collection name"
                    ]
                    continue
            errors = list(
                validator.iter_results({coll_name: coll_docs}, target_class="Database")
            )
            validation_errors[coll_name] = [e.message for e in errors]
            if coll_docs:
                if not isinstance(coll_docs, list):
                    validation_errors[coll_name].append("value must be a list")
                elif not all(isinstance(d, dict) for d in coll_docs):
                    validation_errors[coll_name].append(
                        "all elements of list must be dicts"
                    )
        if all(len(v) == 0 for v in validation_errors.values()):
            # Second pass: try instantiating the linkml-sourced dataclass
            metadata.pop("@type", None)
            try:
                NMDCDatabase(**metadata)
            except Exception as e:
                return {"result": "errors", "detail": str(e)}
            return {"result": "All Okay!"}
        else:
            return {"result": "errors", "detail": validation_errors}
    def nmdc_db_to_dict(self, nmdc_db: nmdc.Database) -> dict:
        """
        Convert an NMDC Database object to a dictionary.

        This method serializes the NMDC Database instance to a dictionary.

        Parameters
        ----------
        nmdc_db : nmdc.Database
            The NMDC Database instance to convert.

        Returns
        -------
        dict
            A dictionary representation of the NMDC Database.
        """
        return json_dumper.to_dict(nmdc_db)
class NMDCWorkflowMetadataGenerator(NMDCMetadataGenerator, ABC):
    """
    Abstract class for generating NMDC metadata objects using provided metadata
    files and configuration.

    Parameters
    ----------
    metadata_file : str
        Path to the input CSV metadata file.
    database_dump_json_path : str
        Path where the output database dump JSON file will be saved.
    raw_data_url : str
        Base URL for the raw data files.
    process_data_url : str
        Base URL for the processed data files.
    test : bool, optional
        Flag for test runs; skips database lookups and uses placeholder study
        associations. Default is False.
    skip_sample_id_check : bool, optional
        Flag to skip sample ID checking in MongoDB. If True, will skip biosample
        and processed sample ID checks even in production mode. Default is False.
    """

    def __init__(
        self,
        metadata_file: str,
        database_dump_json_path: str,
        raw_data_url: str,
        process_data_url: str,
        test: bool = False,
        skip_sample_id_check: bool = False,
    ):
        super().__init__(test=test)
        # stored here because load_metadata branches on self.test
        self.test = test
        self.metadata_file = metadata_file
        self.database_dump_json_path = database_dump_json_path
        self.raw_data_url = raw_data_url
        self.process_data_url = process_data_url
        self.raw_data_category = "instrument_data"
        self.skip_sample_id_check = skip_sample_id_check

    def _get_qc_fields(self, row: pd.Series) -> tuple:
        """
        Extract qc_status and qc_comment from a row, converting NaN to None.

        Parameters
        ----------
        row : pd.Series
            A row from the metadata DataFrame.

        Returns
        -------
        tuple
            A tuple of (qc_status, qc_comment) with NaN values converted to None.
        """
        qc_status = row.get("qc_status")
        qc_status = None if pd.isna(qc_status) else qc_status
        qc_comment = row.get("qc_comment")
        qc_comment = None if pd.isna(qc_comment) else qc_comment
        return qc_status, qc_comment
    def load_metadata(self) -> pd.core.frame.DataFrame:
        """
        Load and group workflow metadata from a CSV file.

        This method reads the metadata CSV file, checks for uniqueness in
        specified columns, checks that samples exist, and groups the data by
        sample ID.

        Returns
        -------
        pd.core.frame.DataFrame
            A DataFrame containing the loaded and grouped metadata.

        Raises
        ------
        FileNotFoundError
            If the `metadata_file` does not exist.
        ValueError
            If values in the columns listed in `self.unique_columns` (e.g., the
            raw data file and processed data directory columns) are not unique.

        Notes
        -----
        See example_metadata_file.csv in this directory for an example of the
        expected input file format.
        """
        try:
            metadata_df = pd.read_csv(self.metadata_file).replace(np.nan, None)
        except FileNotFoundError:
            raise FileNotFoundError(f"Metadata file not found: {self.metadata_file}")
        # Check for uniqueness in specified columns
        columns_to_check = self.unique_columns
        for column in columns_to_check:
            if not metadata_df[column].is_unique:
                raise ValueError(f"Duplicate values found in column '{column}'.")
        # Check if associated_studies is already in the input CSV and validate format
        if (
            "associated_studies" in metadata_df.columns
            and not metadata_df["associated_studies"].isna().all()
        ):
            # Use the associated_studies from the CSV if provided.
            # Validate that each non-null value is properly formatted as a list string
            for idx, row in metadata_df.iterrows():
                if pd.notna(row.get("associated_studies")):
                    # Ensure the value is a string representation of a list
                    study_val = row["associated_studies"]
                    if not (isinstance(study_val, str) and study_val.startswith("[")):
                        raise ValueError(
                            f"associated_studies at row {idx} must be a string "
                            f"representation of a list, "
                            f"e.g., \"['nmdc:sty-11-abc123']\". Got: {study_val}"
                        )
        # if the run is not a test, check that samples exist and find associated studies
        elif not self.test:
            # Check that all samples exist in the db (unless skip_sample_id_check is True)
            sample_ids = metadata_df["sample_id"].unique()
            # determine sample type
            if not pd.isna(sample_ids[0]):
                sample_type = "biosample" if "bsm" in sample_ids[0] else "processed"
            else:
                sample_type = ""
            if sample_type == "biosample":
                sample_client = BiosampleSearch(env=ENV)
            else:
                sample_client = ProcessedSampleSearch(env=ENV)
            # Only check if IDs exist if skip_sample_id_check is False
            if not self.skip_sample_id_check:
                if not pd.isna(sample_ids[0]):
                    if not sample_client.check_ids_exist(list(sample_ids)):
                        raise ValueError("IDs do not exist in the collection.")
            # make a call to find_associated_ids to get the associated studies;
            # build the ID list from the input samples
            sample_ids = metadata_df["sample_id"].unique().tolist()
            associations = self.find_associated_ids(ids=sample_ids)
            # map the ids back to the df before returning; associations is a
            # dictionary mapping sample ids to their study ids
            for sample_id, studies in associations.items():
                study_str = str(studies)
                metadata_df.loc[
                    metadata_df["sample_id"] == sample_id,
                    "associated_studies",
                ] = study_str
        # if it is a test and no associated_studies provided, plug in a placeholder
        else:
            metadata_df["associated_studies"] = "['nmdc:sty-00-000001']"
        return metadata_df
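    # Example input (illustrative CSV; the required columns beyond `sample_id`
    # are defined by each subclass via `unique_columns`, so the column names
    # below are placeholders):
    #
    #     sample_id,raw_data_file,processed_data_directory
    #     nmdc:bsm-00-000001,sample_a.raw,processed/sample_a
    #     nmdc:bsm-00-000002,sample_b.raw,processed/sample_b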
    def generate_mass_spectrometry(
        self,
        file_path: Path,
        instrument_id: str,
        sample_id: str,
        raw_data_id: str,
        study_id: str,
        processing_institution: str,
        mass_spec_configuration_id: str,
        start_date: str,
        end_date: str,
        CLIENT_ID: str,
        CLIENT_SECRET: str,
        instrument_instance_specifier: str = None,
        lc_config_id: str = None,
        calibration_ids: str = None,
    ) -> nmdc.DataGeneration:
        """
        Create an NMDC DataGeneration object for mass spectrometry and mint an
        NMDC ID.

        Parameters
        ----------
        file_path : Path
            File path of the mass spectrometry data.
        instrument_id : str
            ID of the instrument used for data generation.
        sample_id : str
            ID of the input sample associated with the data generation.
        raw_data_id : str
            ID of the raw data object associated with the data generation.
        study_id : str
            ID of the study associated with the data generation.
        processing_institution : str
            Name of the processing institution.
        mass_spec_configuration_id : str
            ID of the mass spectrometry configuration.
        start_date : str
            Start date of the data generation.
        end_date : str
            End date of the data generation.
        CLIENT_ID : str
            The client ID for the NMDC API.
        CLIENT_SECRET : str
            The client secret for the NMDC API.
        instrument_instance_specifier : str, optional
            Specifier for the instrument instance used in the data generation.
        lc_config_id : str, optional
            ID of the liquid chromatography configuration.
        calibration_ids : str, optional
            ID of the calibration information generated with the data.
            Default is None, indicating no calibration information.

        Returns
        -------
        nmdc.DataGeneration
            An NMDC DataGeneration object with the provided metadata.

        Notes
        -----
        The instrument and configuration IDs are expected to have been resolved
        beforehand (see `generate_mass_spec_fields`). This method mints a new
        NMDC ID for the DataGeneration object.
        """
        nmdc_id = self.id_pool.get_id(
            nmdc_type=NmdcTypes.get("MassSpectrometry"),
            client_id=CLIENT_ID,
            client_secret=CLIENT_SECRET,
        )
        data_dict = {
            "id": nmdc_id,
            "name": file_path.stem,
            "description": self.mass_spec_desc,
            "eluent_introduction_category": self.mass_spec_eluent_intro,
            "has_mass_spectrometry_configuration": mass_spec_configuration_id,
            "has_chromatography_configuration": lc_config_id,
            "analyte_category": self.analyte_category,
            "instrument_used": instrument_id,
            "has_input": [sample_id],
            "has_output": [raw_data_id],
            "associated_studies": study_id,
            "processing_institution": processing_institution,
            "start_date": start_date,
            "end_date": end_date,
            "instrument_instance_specifier": instrument_instance_specifier,
            "type": NmdcTypes.get("MassSpectrometry"),
        }
        if calibration_ids is not None:
            data_dict["generates_calibration"] = calibration_ids
        data_dict["provenance_metadata"] = self.provenance_metadata
        data_dict = self.clean_dict(data_dict)
        mass_spectrometry = nmdc.DataGeneration(**data_dict)
        return mass_spectrometry
    def check_doj_urls(self, metadata_df: pd.DataFrame, url_columns: List) -> None:
        """
        Check that the URLs referenced by the metadata DataFrame are accessible
        and do not already exist in the database.

        Parameters
        ----------
        metadata_df : pd.DataFrame
            The DataFrame containing the metadata information.
        url_columns : List
            The list of columns in the DataFrame that contain URLs to check.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If any URL in the metadata DataFrame is invalid, inaccessible, or
            already exists in the database.
        FileNotFoundError
            If no files are found in the specified directory columns.
        """

        def check_urls_accessible(urls_to_check: List) -> None:
            # Check up to 5 URLs, or fewer if the list is shorter
            for url in urls_to_check[:5]:
                try:
                    response = requests.head(url)
                    if response.status_code != 200:
                        raise ValueError(f"URL {url} is not accessible.")
                    time.sleep(2)  # Wait 2 seconds between URL checks
                except requests.RequestException as e:
                    raise ValueError(f"URL {url} is not accessible. Error: {e}")

        urls = []
        for col in url_columns:
            # Check if this is a raw data URL column (only handle raw data URLs)
            if col == "raw_data_url":
                # For raw data URL column, use the URLs directly
                column_urls = metadata_df[col].dropna().tolist()
                urls.extend(column_urls)
                # Check if the urls are valid
                check_urls_accessible(column_urls)
            elif "directory" in col:
                # if it's a directory, we need to gather all the files in the directory
                file_data_paths = [
                    list(Path(x).glob("**/*")) for x in metadata_df[col].to_list()
                ]
                # Add a check that the processed data directory is not empty
                if not any(file_data_paths):
                    raise FileNotFoundError(
                        f"No files found in {col}: {metadata_df[col]}"
                    )
                # flatten the list of lists
                file_data_paths = [
                    file for sublist in file_data_paths for file in sublist
                ]
                # grab the parent directory name from the metadata file path
                if "process" in col:
                    urls += [
                        self.process_data_url + str(x.parent.name) + "/" + str(x.name)
                        for x in file_data_paths
                    ]
                elif "raw" in col:
                    urls += [
                        self.raw_data_url + str(x.parent.name) + "/" + str(x.name)
                        for x in file_data_paths
                    ]
                # check if the urls are valid
                check_urls_accessible(urls)
            else:
                # if it's a file, we need to gather the file paths
                file_data_paths = [Path(x) for x in metadata_df[col].to_list()]
                if "process" in col:
                    urls += [
                        self.process_data_url + str(x.name) for x in file_data_paths
                    ]
                elif "raw" in col:
                    urls += [self.raw_data_url + str(x.name) for x in file_data_paths]
                # check if the urls are valid
                check_urls_accessible(urls)
        doj_client = DataObjectSearch(env=ENV)
        resp = doj_client.get_batch_records(
            id_list=urls, search_field="url", fields="id", chunk_size=10
        )
        if resp:
            raise ValueError(
                f"The following URLs already exist in the database: {', '.join(resp)}"
            )
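    # Example (hypothetical usage sketch; the column names are placeholders):
    #
    #     self.check_doj_urls(
    #         metadata_df,
    #         url_columns=["raw_data_url", "processed_data_directory"],
    #     )
    #     # Raises ValueError if any sampled URL is inaccessible or already
    #     # registered as a DataObject url in the database.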
    def check_manifest(
        self,
        metadata_df: pd.DataFrame,
        nmdc_database_inst: nmdc.Database,
        CLIENT_ID: str,
        CLIENT_SECRET: str,
    ) -> None:
        """
        Check if the manifest id is passed in the metadata DataFrame. If not,
        generate a new manifest id and add it to the NMDC database instance.

        Parameters
        ----------
        metadata_df : pd.DataFrame
            The DataFrame containing the metadata information.
        nmdc_database_inst : nmdc.Database
            The NMDC Database instance to add the manifest to if one needs to
            be generated.
        CLIENT_ID : str
            The client ID for the NMDC API. Used to mint a manifest id if one
            does not exist.
        CLIENT_SECRET : str
            The client secret for the NMDC API. Used to mint a manifest id if
            one does not exist.
        """
        if (
            "manifest_id" not in metadata_df.columns
            or metadata_df["manifest_id"].isnull().any()
        ):
            self.generate_manifest(
                metadata_df=metadata_df,
                nmdc_database_inst=nmdc_database_inst,
                CLIENT_ID=CLIENT_ID,
                CLIENT_SECRET=CLIENT_SECRET,
            )
    def update_outputs(
        self,
        analysis_obj: object,
        raw_data_obj_id: str,
        parameter_data_id: str,
        processed_data_id_list: list,
        mass_spec_obj: object = None,
        rerun: bool = False,
    ) -> None:
        """
        Update output references for Mass Spectrometry and Workflow Analysis objects.

        This method assigns the output references for a Mass Spectrometry object
        and a Workflow Execution Analysis object. It sets `mass_spec_obj.has_output`
        to the ID of the raw data object and `analysis_obj.has_output` to a list
        of processed data IDs.

        Parameters
        ----------
        analysis_obj : object
            The Workflow Execution Analysis object to update
            (e.g., MetabolomicsAnalysis).
        raw_data_obj_id : str
            ID of the raw data object associated with the Mass Spectrometry.
        parameter_data_id : str
            ID of the data object representing the parameter data used for the
            analysis.
        processed_data_id_list : list
            List of IDs representing processed data objects associated with the
            Workflow Execution.
        mass_spec_obj : object, optional
            The Mass Spectrometry object to update. Optional for rerun cases.
        rerun : bool, optional
            If True, this indicates the run is a rerun, and the method will not
            set `mass_spec_obj.has_output` because there is not one. Default is
            False.

        Returns
        -------
        None

        Notes
        -----
        - Sets `mass_spec_obj.has_output` to [raw_data_obj_id].
        - Sets `analysis_obj.has_input` to `parameter_data_id`.
        - Sets `analysis_obj.has_output` to `processed_data_id_list`.
        """
        if not rerun:
            # if it is not a rerun, set the mass spec object; otherwise there
            # will not be a mass spec object
            mass_spec_obj.has_output = [raw_data_obj_id]
        analysis_obj.has_input = parameter_data_id
        # Update has_output with the actual processed data IDs
        # Note: has_output is initially set with a placeholder in the generate
        # functions. For failed QC, has_output will be removed in the calling code.
        if processed_data_id_list:
            analysis_obj.has_output = processed_data_id_list
    def generate_mass_spec_fields(
        self,
        metadata_df: pd.DataFrame,
    ) -> None:
        """
        Look up the instrument_id, lc_config_id, and mass_spec_configuration_id
        for each row in the metadata DataFrame and add these fields back to the
        DataFrame.

        Parameters
        ----------
        metadata_df : pd.DataFrame
            The metadata DataFrame.

        Returns
        -------
        None
        """
        is_client = InstrumentSearch(env=ENV)
        cs_client = ConfigurationSearch(env=ENV)
        instrument_names = metadata_df["instrument_used"].unique()
        if "chromat_configuration_name" in metadata_df.columns:
            # If the column exists, get unique LC configuration names
            lc_config_names = metadata_df["chromat_configuration_name"].unique()
        else:
            # If the column does not exist, set it to an empty array
            lc_config_names = []
        mass_spec_config_names = metadata_df["mass_spec_configuration_name"].unique()
        # loop through each variable, get the id for each name,
        # and add it to the metadata_df
        for var in [instrument_names, lc_config_names, mass_spec_config_names]:
            for name in var:
                if var is instrument_names:
                    # Get instrument id
                    instrument_id = is_client.get_record_by_attribute(
                        attribute_name="name",
                        attribute_value=name,
                        fields="id",
                        exact_match=True,
                    )[0]["id"]
                    metadata_df.loc[
                        metadata_df["instrument_used"] == name, "instrument_id"
                    ] = instrument_id
                elif var is lc_config_names:
                    # Get LC configuration id
                    try:
                        lc_config_id = cs_client.get_record_by_attribute(
                            attribute_name="name",
                            attribute_value=name,
                            fields="id",
                            exact_match=True,
                        )[0]["id"]
                    except IndexError:
                        raise ValueError(
                            f"Configuration '{name}' not found in the database."
                        )
                    metadata_df.loc[
                        metadata_df["chromat_configuration_name"] == name,
                        "lc_config_id",
                    ] = lc_config_id
                elif var is mass_spec_config_names:
                    # Get mass spec configuration id
                    mass_spec_id = cs_client.get_record_by_attribute(
                        attribute_name="name",
                        attribute_value=name,
                        fields="id",
                        exact_match=True,
                    )[0]["id"]
                    metadata_df.loc[
                        metadata_df["mass_spec_configuration_name"] == name,
                        "mass_spec_configuration_id",
                    ] = mass_spec_id
    def generate_manifest(
        self,
        metadata_df: pd.DataFrame,
        nmdc_database_inst: nmdc.Database,
        CLIENT_ID: str,
        CLIENT_SECRET: str,
    ) -> None:
        """
        Generate manifest information and data objects for each manifest.
        Add the manifest id to the metadata DataFrame.

        Parameters
        ----------
        metadata_df : pd.DataFrame
            The metadata DataFrame.
        nmdc_database_inst : nmdc.Database
            The NMDC Database instance.
        CLIENT_ID : str
            The client ID for the NMDC API.
        CLIENT_SECRET : str
            The client secret for the NMDC API.

        Returns
        -------
        None
        """
        # check if manifest_name exists and has non-null values. If not, return
        if (
            "manifest_name" not in metadata_df.columns
            or metadata_df["manifest_name"].isnull().all()
        ):
            logging.info("No manifests will be added.")
            return
        # Get unique combinations of manifest_name and (if existing) manifest_id
        # and create the initial mapping
        if "manifest_id" not in metadata_df.columns:
            metadata_df["manifest_id"] = None
        manifest_id_mapping = {}
        manifest_names = metadata_df[["manifest_name", "manifest_id"]].drop_duplicates()
        for _, row in manifest_names.iterrows():
            manifest_id_mapping[row["manifest_name"]] = row["manifest_id"]
        for manifest_name in tqdm(
            manifest_id_mapping.keys(),
            total=len(manifest_names),
            desc="Generating manifest information and data objects",
        ):
            # If there is already a manifest_id associated with a manifest_name
            if manifest_id_mapping[manifest_name] is not None:
                continue
            # mint id
            manifest_id = self.id_pool.get_id(
                nmdc_type=NmdcTypes.get("Manifest"),
                client_id=CLIENT_ID,
                client_secret=CLIENT_SECRET,
            )
            data_dict = {
                "id": manifest_id,
                "name": manifest_name,
                "type": NmdcTypes.get("Manifest"),
                "manifest_category": "instrument_run",
            }
            manifest = nmdc.Manifest(**data_dict)
            nmdc_database_inst.manifest_set.append(manifest)
            # Add manifest_id to the mapping dictionary
            manifest_id_mapping[manifest_name] = manifest.id
        # Update the metadata_df in a single operation
        metadata_df["manifest_id"] = metadata_df["manifest_name"].map(
            manifest_id_mapping
        )
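    # Example (illustrative): given rows with manifest_name values
    # ["batch1", "batch1", "batch2"], this fills manifest_id as
    # ["nmdc:manif-...-1", "nmdc:manif-...-1", "nmdc:manif-...-2"] and appends
    # one nmdc.Manifest per newly named manifest to
    # nmdc_database_inst.manifest_set.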
    def get_workflow_version(self, workflow_version_git_url: str) -> str:
        """
        Get the version of the workflow from the git repository.

        Parameters
        ----------
        workflow_version_git_url : str
            The URL of the file in the git repository containing the workflow
            version.

        Returns
        -------
        str
            The version of the workflow, or None if it could not be determined.
        """
        resp = requests.get(workflow_version_git_url)
        if resp.status_code == 200:
            # Regular expression to find the current_version
            match = re.search(r"current_version\s*=\s*([\d.]+)", resp.text)
            if match:
                current_version = match.group(1)
                return current_version
        logging.error(
            f"Failed to fetch the workflow version from the Git repository "
            f"{workflow_version_git_url}"
        )
        return None
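    # Example (hypothetical usage sketch; the URL is a placeholder for a raw
    # file containing a line like `current_version = 1.2.3`):
    #
    #     version = self.get_workflow_version(
    #         "https://raw.githubusercontent.com/org/repo/main/.bumpversion.cfg"
    #     )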