Source code for src.metadata_generator

# -*- coding: utf-8 -*-
import logging
from pathlib import Path
from datetime import datetime
import re
from typing import List, Dict
from abc import ABC
import pandas as pd
import hashlib
import nmdc_schema.nmdc as nmdc
from linkml_runtime.dumpers import json_dumper
from src.metadata_parser import MetadataParser
from src.data_classes import NmdcTypes
from nmdc_api_utilities.instrument_search import InstrumentSearch
from nmdc_api_utilities.configuration_search import ConfigurationSearch
from nmdc_api_utilities.biosample_search import BiosampleSearch
from nmdc_api_utilities.study_search import StudySearch
from nmdc_api_utilities.data_object_search import DataObjectSearch
from nmdc_api_utilities.minter import Minter
import ast
import numpy as np
import toml
import os
import requests

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# TODO: Update script to for Sample Processing - has_input for MassSpectrometry will have to be changed to be a processed sample id - not biosample id


[docs] class NMDCMetadataGenerator(ABC): """ Abstract class for generating NMDC metadata objects using provided metadata files and configuration. Parameters ---------- metadata_file : str Path to the input CSV metadata file. database_dump_json_path : str Path where the output database dump JSON file will be saved. raw_data_url : str Base URL for the raw data files. process_data_url : str Base URL for the processed data files. """ def __init__( self, metadata_file: str, database_dump_json_path: str, raw_data_url: str, process_data_url: str, ): self.metadata_file = metadata_file self.database_dump_json_path = database_dump_json_path self.raw_data_url = raw_data_url self.process_data_url = process_data_url self.raw_data_category = "instrument_data"
[docs] def load_credentials(self, config_file: str = None) -> tuple: """ Load the client ID and secret from the environment or a configuration file. Parameters ---------- config_file: str The path to the configuration file. Returns ------- tuple A tuple containing the client ID and client secret. """ client_id = os.getenv("CLIENT_ID") client_secret = os.getenv("CLIENT_SECRET") if not client_id or not client_secret: if config_file: config_file = Path(config_file) try: config = toml.load(config_file) client_id = config.get("CLIENT_ID") client_secret = config.get("CLIENT_SECRET") except FileNotFoundError: raise ValueError(f"Config file {config_file} not found.") except toml.TomlDecodeError: raise ValueError("Error decoding TOML from the config file.") except KeyError: raise ValueError( "Config file must contain CLIENT_ID and CLIENT_SECRET. If generating biosample ids BIO_API_KEY is required." ) if not client_id or not client_secret: raise ValueError( "CLIENT_ID and CLIENT_SECRET must be set either in environment variables or passed in the config file.\nThey must be named CLIENT_ID and CLIENT_SECRET respectively." ) return client_id, client_secret
[docs] def load_bio_credentials(self, config_file: str = None) -> str: """ Load bio ontology API key from the environment or a configuration file. Parameters ---------- config_file: str The path to the configuration file. Returns ------- str The bio ontology API key. Raises ------ FileNotFoundError If the configuration file is not found, and the API key is not set in the environment. ValueError If the configuration file is not valid or does not contain the API key. """ BIO_API_KEY = os.getenv("BIO_API_KEY") if not BIO_API_KEY: if config_file: config_file = Path(config_file) try: config = toml.load(config_file) BIO_API_KEY = config.get("BIO_API_KEY") except FileNotFoundError: raise FileNotFoundError(f"Config file {config_file} not found.") except toml.TomlDecodeError: raise ValueError("Error decoding TOML from the config file.") except KeyError: raise ValueError( "Config file must contain BIO_API_KEY to generate biosample ids." ) if not BIO_API_KEY: raise ValueError( "BIO_API_KEY must be set either in environment variable or passed in the config file. It must be named BIO_API_KEY." ) return BIO_API_KEY
[docs] def start_nmdc_database(self) -> nmdc.Database: """ Initialize and return a new NMDC Database instance. Returns ------- nmdc.Database A new instance of an NMDC Database. Notes ----- This method simply creates and returns a new instance of the NMDC Database. It does not perform any additional initialization or configuration. """ return nmdc.Database()
[docs] def load_metadata(self) -> pd.core.frame.DataFrame: """ Load and group workflow metadata from a CSV file. This method reads the metadata CSV file, checks for uniqueness in specified columns, checks that biosamples exist, and groups the data by biosample ID. Returns ------- pd.core.frame.DataFrame A DataFrame containing the loaded and grouped metadata. Raises ------ FileNotFoundError If the `metadata_file` does not exist. ValueError If values in columns 'Raw Data File', and 'Processed Data Directory' are not unique. Notes ----- See example_metadata_file.csv in this directory for an example of the expected input file format. """ try: metadata_df = pd.read_csv(self.metadata_file) except FileNotFoundError: raise FileNotFoundError(f"Metadata file not found: {self.metadata_file}") # Check for uniqueness in specified columns columns_to_check = self.unique_columns for column in columns_to_check: if not metadata_df[column].is_unique: raise ValueError(f"Duplicate values found in column '{column}'.") # Check that all biosamples exist biosample_ids = metadata_df["biosample_id"].unique() bs_client = BiosampleSearch() if pd.isna(biosample_ids)[0] == np.False_: if not bs_client.check_ids_exist(list(biosample_ids)): raise ValueError("Biosample IDs do not exist in the collection.") # Check that all studies exist if "biosample.associated_studies" in metadata_df.columns: # Convert string to list, make sure the values are unique, conmvert try: study_ids = ast.literal_eval( metadata_df["biosample.associated_studies"].iloc[0] ) except SyntaxError: study_ids = [metadata_df["biosample.associated_studies"].iloc[0]] ss_client = StudySearch() if not ss_client.check_ids_exist(study_ids): raise ValueError("Study IDs do not exist in the collection.") return metadata_df
[docs] def clean_dict(self, dict: Dict) -> Dict: """ Clean the dictionary by removing keys with empty or None values. Parameters ---------- dict : Dict The dictionary to be cleaned. Returns ------- Dict A new dictionary with keys removed where the values are None, an empty string, or a string with only whitespace. """ return {k: v for k, v in dict.items() if v not in [None, "", ""]}
[docs] def generate_mass_spectrometry( self, file_path: Path, instrument_name: str, sample_id: str, raw_data_id: str, study_id: str, processing_institution: str, mass_spec_config_name: str, start_date: str, end_date: str, CLIENT_ID: str, CLIENT_SECRET: str, lc_config_name: str = None, calibration_id: str = None, ) -> nmdc.DataGeneration: """ Create an NMDC DataGeneration object for mass spectrometry and mint an NMDC ID. Parameters ---------- file_path : Path File path of the mass spectrometry data. instrument_name : str Name of the instrument used for data generation. sample_id : str ID of the input sample associated with the data generation. raw_data_id : str ID of the raw data object associated with the data generation. study_id : str ID of the study associated with the data generation. processing_institution : str Name of the processing institution. mass_spec_config_name : str Name of the mass spectrometry configuration. start_date : str Start date of the data generation. end_date : str End date of the data generation. CLIENT_ID : str The client ID for the NMDC API. CLIENT_SECRET : str The client secret for the NMDC API. lc_config_name : str Name of the liquid chromatography configuration. calibration_id : str, optional ID of the calibration information generated with the data. Default is None, indicating no calibration information. Returns ------- nmdc.DataGeneration An NMDC DataGeneration object with the provided metadata. Notes ----- This method uses the nmdc_api_utilities package to fetch IDs for the instrument and configurations. It also mints a new NMDC ID for the DataGeneration object. """ is_client = InstrumentSearch() cs_client = ConfigurationSearch() minter = Minter() nmdc_id = minter.mint( nmdc_type=NmdcTypes.MassSpectrometry, client_id=CLIENT_ID, client_secret=CLIENT_SECRET, ) instrument_id = is_client.get_record_by_attribute( attribute_name="name", attribute_value=instrument_name, fields="id", exact_match=True, )[0]["id"] if lc_config_name: lc_config_id = cs_client.get_record_by_attribute( attribute_name="name", attribute_value=lc_config_name, fields="id", exact_match=True, )[0]["id"] else: lc_config_id = "" mass_spec_id = cs_client.get_record_by_attribute( attribute_name="name", attribute_value=mass_spec_config_name, fields="id", exact_match=True, )[0]["id"] data_dict = { "id": nmdc_id, "name": file_path.stem, "description": self.mass_spec_desc, "add_date": datetime.now().strftime("%Y-%m-%d"), "eluent_introduction_category": self.mass_spec_eluent_intro, "has_mass_spectrometry_configuration": mass_spec_id, "has_chromatography_configuration": lc_config_id, "analyte_category": self.analyte_category, "instrument_used": instrument_id, "has_input": [sample_id], "has_output": [raw_data_id], "associated_studies": study_id, "processing_institution": processing_institution, "start_date": start_date, "end_date": end_date, "type": NmdcTypes.MassSpectrometry, } if calibration_id is not None: data_dict["generates_calibration"] = calibration_id data_dict = self.clean_dict(data_dict) mass_spectrometry = nmdc.DataGeneration(**data_dict) return mass_spectrometry
[docs] def generate_data_object( self, file_path: Path, data_category: str, data_object_type: str, description: str, base_url: str, CLIENT_ID: str, CLIENT_SECRET: str, was_generated_by: str = None, alternative_id: str = None, ) -> nmdc.DataObject: """ Create an NMDC DataObject with metadata from the specified file and details. This method generates an NMDC DataObject and assigns it a unique NMDC ID. The DataObject is populated with metadata derived from the provided file and input parameters. Parameters ---------- file_path : Path Path to the file representing the data object. The file's name is used as the `name` attribute. data_category : str Category of the data object (e.g., 'instrument_data'). data_object_type : str Type of the data object (e.g., 'LC-DDA-MS/MS Raw Data'). description : str Description of the data object. base_url : str Base URL for accessing the data object, to which the file name is appended to form the complete URL. CLIENT_ID : str The client ID for the NMDC API. CLIENT_SECRET : str The client secret for the NMDC API. was_generated_by : str, optional ID of the process or entity that generated the data object (e.g., the DataGeneration id or the MetabolomicsAnalysis id). alternative_id : str, optional An optional alternative identifier for the data object. Returns ------- nmdc.DataObject An NMDC DataObject instance with the specified metadata. Notes ----- This method calculates the MD5 checksum of the file, which may be time-consuming for large files. """ mint = Minter() nmdc_id = mint.mint( nmdc_type=NmdcTypes.DataObject, client_id=CLIENT_ID, client_secret=CLIENT_SECRET, ) data_dict = { "id": nmdc_id, "data_category": data_category, "data_object_type": data_object_type, "name": file_path.name, "description": description, "file_size_bytes": file_path.stat().st_size, "md5_checksum": hashlib.md5(file_path.open("rb").read()).hexdigest(), "url": base_url + str(file_path.name), "type": NmdcTypes.DataObject, "was_generated_by": was_generated_by, "alternative_identifiers": alternative_id, } # If any of the data_dict values are None or empty strings, remove them data_dict = self.clean_dict(data_dict) data_object = nmdc.DataObject(**data_dict) return data_object
[docs] def generate_metabolomics_analysis( self, cluster_name: str, raw_data_name: str, raw_data_id: str, data_gen_id: str, processed_data_id: str, parameter_data_id: str, processing_institution: str, CLIENT_ID: str, CLIENT_SECRET: str, calibration_id: str = None, incremeneted_id: str = None, metabolite_identifications: List[nmdc.MetaboliteIdentification] = None, type: str = NmdcTypes.MetabolomicsAnalysis, ) -> nmdc.MetabolomicsAnalysis: """ Create an NMDC MetabolomicsAnalysis object with metadata for a workflow analysis. This method generates an NMDC MetabolomicsAnalysis object, including details about the analysis, the processing institution, and relevant workflow information. Parameters ---------- cluster_name : str Name of the cluster or computing resource used for the analysis. raw_data_name : str Name of the raw data file that was analyzed. raw_data_id : str ID of the raw data object that was analyzed. data_gen_id : str ID of the DataGeneration object that generated the raw data. processed_data_id : str ID of the processed data resulting from the analysis. parameter_data_id : str ID of the parameter data object used for the analysis. processing_institution : str Name of the institution where the analysis was performed. CLIENT_ID : str The client ID for the NMDC API. CLIENT_SECRET : str The client secret for the NMDC API. calibration_id : str, optional ID of the calibration information used for the analysis. Default is None, indicating no calibration information. incremeneted_id : str, optional An optional incremented ID for the MetabolomicsAnalysis object. If not provided, a new NMDC ID will be minted. metabolite_identifications : List[nmdc.MetaboliteIdentification], optional List of MetaboliteIdentification objects associated with the analysis. Default is None, which indicates no metabolite identifications. type : str, optional The type of the analysis. Default is NmdcTypes.MetabolomicsAnalysis. Returns ------- nmdc.MetabolomicsAnalysis An NMDC MetabolomicsAnalysis instance with the provided metadata. Notes ----- The 'started_at_time' and 'ended_at_time' fields are initialized with placeholder values and should be updated with actual timestamps later when the processed files are iterated over in the run method. """ if incremeneted_id is None: # If no incremented id is provided, mint a new one mint = Minter() nmdc_id = ( mint.mint( nmdc_type=NmdcTypes.MetabolomicsAnalysis, client_id=CLIENT_ID, client_secret=CLIENT_SECRET, ) + ".1" ) data_dict = { "id": incremeneted_id if incremeneted_id is not None else nmdc_id, "name": f"{self.workflow_analysis_name} for {raw_data_name}", "description": self.workflow_description, "processing_institution": processing_institution, "execution_resource": cluster_name, "git_url": self.workflow_git_url, "version": self.workflow_version, "was_informed_by": data_gen_id, "has_input": [raw_data_id, parameter_data_id], "has_output": [processed_data_id], "started_at_time": "placeholder", "ended_at_time": "placeholder", "type": type, "metabolomics_analysis_category": self.workflow_category, } if calibration_id is not None: data_dict["uses_calibration"] = calibration_id if metabolite_identifications is not None: data_dict["has_metabolite_identifications"] = metabolite_identifications data_dict = self.clean_dict(data_dict) metab_analysis = nmdc.MetabolomicsAnalysis(**data_dict) return metab_analysis
[docs] def update_outputs( self, analysis_obj: object, raw_data_obj_id: str, parameter_data_id: str, processed_data_id_list: list, mass_spec_obj: object = None, rerun: bool = False, ) -> None: """ Update output references for Mass Spectrometry and Workflow Analysis objects. This method assigns the output references for a Mass Spectrometry object and a Workflow Execution Analysis object. It sets `mass_spec_obj.has_output` to the ID of `raw_data_obj` and `analysis_obj.has_output` to a list of processed data IDs. Parameters ---------- analysis_obj : object The Workflow Execution Analysis object to update (e.g., MetabolomicsAnalysis). raw_data_obj_id : str The Raw Data Object associated with the Mass Spectrometry. parameter_data_id : str ID of the data object representing the parameter data used for the analysis. processed_data_id_list : list List of IDs representing processed data objects associated with the Workflow Execution. mass_spec_obj : object , optional The Mass Spectrometry object to update. Optional for rerun cases. rerun : bool, optional If True, this indicates the run is a rerun, and the method will not set `mass_spec_obj.has_output` because there is not one. Default is False. Returns ------- None Notes ------ - Sets `mass_spec_obj.has_output` to [raw_data_obj.id]. - Sets `analysis_obj.has_output` to `processed_data_id_list`. """ if not rerun: # if it is not a rerun, set the mass spec object, otherwise there will not be a mass spec object mass_spec_obj.has_output = [raw_data_obj_id] analysis_obj.has_input = parameter_data_id analysis_obj.has_output = processed_data_id_list
[docs] def dump_nmdc_database(self, nmdc_database: nmdc.Database) -> None: """ Dump the NMDC database to a JSON file. This method serializes the NMDC Database instance to a JSON file at the specified path. Parameters ---------- nmdc_database : nmdc.Database The NMDC Database instance to dump. Returns ------- None Side Effects ------------ Writes the database content to the file specified by `self.database_dump_json_path`. """ json_dumper.dump(nmdc_database, self.database_dump_json_path) logging.info("Database successfully dumped in %s", self.database_dump_json_path)
[docs] def handle_biosample(self, row: pd.Series) -> tuple: """ Process biosample information from metadata row. Checks if a biosample ID exists in the row. If it does, returns the existing biosample information. If not, generates a new biosample. Parameters ---------- row : pd.Series A row from the metadata DataFrame containing biosample information Returns ------- tuple A tuple containing: - emsl_metadata : Dict Parsed metadata from input csv row - biosample_id : str The ID of the biosample (existing or newly generated) """ parser = MetadataParser() emsl_metadata = parser.parse_biosample_metadata(row) biosample_id = emsl_metadata["biosample_id"] return emsl_metadata, biosample_id
[docs] def check_for_biosamples( self, metadata_df: pd.DataFrame, nmdc_database_inst: nmdc.Database, CLIENT_ID: str, CLIENT_SECRET: str, ) -> None: """ This method verifies the presence of the 'biosample_id' in the provided metadata DataFrame. It will loop over each row to verify the presence of the 'biosample_id', giving the option for some rows to need generation and some to already exist. If the 'biosample_id' is missing, it checks for the presence of required columns to generate a new biosample_id using the NMDC API. If they are all there, the function calls the dynam_parse_biosample_metadata method from the MetadataParser class to create the JSON for the biosample. If the required columns are missing and there is no biosample_id - it raises a ValueError. After the biosample_id is generated,it updates the DataFrame row with the newly minted biosample_id and the NMDC database instance with the new biosample JSON. Parameters ---------- metadata_df : pd.DataFrame the dataframe containing the metadata information. nmdc_database_inst : nmdc.Database The NMDC Database instance to add the biosample to if one needs to be generated. CLIENT_ID : str The client ID for the NMDC API. Used to mint a biosmaple id if one does not exist. CLIENT_SECRET : str The client secret for the NMDC API. Used to mint a biosmaple id if one does not exist. Returns ------- None Raises ------ ValueError If the 'biosample.name' column is missing and 'biosample_id' is empty. If any required columns for biosample generation are missing. """ parser = MetadataParser() metadata_df["biosample_id"] = metadata_df["biosample_id"].astype("object") if "biosample.name" not in metadata_df.columns: # if biosample.name does not exists check if biosample_id is empty. biosample_id should not be empty if biosample.name does not exist if len(metadata_df["biosample_id"]) == len( metadata_df.dropna(subset=["biosample_id"]) ): return else: raise ValueError( "biosample.name column is missing from the metadata file. Please provide biosample.name or biosample_id for each row. biosample.name is required to generate new biosample_id." ) rows = metadata_df.groupby("biosample.name") for _, group in rows: row = group.iloc[0] if pd.isnull(row.get("biosample_id")): required_columns = [ "biosample.name", "biosample.associated_studies", "biosample.env_broad_scale", "biosample.env_local_scale", "biosample.env_medium", ] # Check for the existence of all required columns missing_columns = [ col for col in required_columns if col not in metadata_df.columns ] if missing_columns: raise ValueError( f"The following required columns are missing from the DataFrame: {', '.join(missing_columns)}" ) bio_api_key = self.load_bio_credentials( config_file=self.minting_config_creds ) # Generate biosamples if no biosample_id in spreadsheet biosample_metadata = parser.dynam_parse_biosample_metadata( row=row, bio_api_key=bio_api_key ) biosample = self.generate_biosample( biosamp_metadata=biosample_metadata, CLIENT_ID=CLIENT_ID, CLIENT_SECRET=CLIENT_SECRET, ) biosample_id = biosample.id metadata_df.loc[ metadata_df["biosample.name"] == row["biosample.name"], "biosample_id", ] = biosample_id nmdc_database_inst.biosample_set.append(biosample)
[docs] def check_doj_urls(self, metadata_df: pd.DataFrame, url_columns: List) -> None: """ Check if the URLs in the input list already exist in the database. Parameters ---------- metadata_df : pd.DataFrame The DataFrame containing the metadata information. url_columns : List The list of columns in the DataFrame that contain URLs to check. Returns ------- None Raises ------ ValueError If any URL in the metadata DataFrame is invalid or inaccessible. FileNotFoundError If no files are found in the specified directory columns. """ urls = [] for col in url_columns: if "directory" in col: # if its a directory, we need to gather all the files in the directory file_data_paths = [ list(Path(x).glob("**/*")) for x in metadata_df[col].to_list() ] # Add a check that the processed data directory is not empty if not any(file_data_paths): raise FileNotFoundError( f"No files found in {col}: " f"{metadata_df[col]}" ) file_data_paths = [ file for sublist in file_data_paths for file in sublist ] if "process" in col: urls += [ self.process_data_url + str(x.relative_to(Path(x).parents[1])) for x in file_data_paths ] elif "raw" in col: urls += [self.raw_data_url + str(x.name) for x in file_data_paths] # check if the urls are valid for url in urls[ :5 ]: # Check up to 5 URLs, or fewer if the list is shorter try: response = requests.head(url) if response.status_code != 200: raise ValueError(f"URL {url} is not accessible.") except requests.RequestException as e: raise ValueError(f"URL {url} is not accessible. Error: {e}") else: # if its a file, we need to gather the file paths file_data_paths = [Path(x) for x in metadata_df[col].to_list()] if "process" in col: urls += [ self.process_data_url + str(x.name) for x in file_data_paths ] elif "raw" in col: urls += [self.raw_data_url + str(x.name) for x in file_data_paths] # check if the urls are valid for url in urls[ :5 ]: # Check up to 5 URLs, or fewer if the list is shorter try: response = requests.head(url) if response.status_code != 200: raise ValueError(f"URL {url} is not accessible.") except requests.RequestException as e: raise ValueError(f"URL {url} is not accessible. Error: {e}") doj_client = DataObjectSearch() resp = doj_client.get_batch_records( id_list=urls, search_field="url", fields="id" ) if resp: raise ValueError( f"The following URLs already exist in the database: {', '.join(resp)}" )
[docs] def generate_biosample( self, biosamp_metadata: dict, CLIENT_ID: str, CLIENT_SECRET: str ) -> nmdc.Biosample: """ Mint a biosample id from the given metadata and create a biosample instance. Parameters ---------- biosamp_metadata : dict The metadata object containing biosample information. CLIENT_ID : str The client ID for the NMDC API. CLIENT_SECRET : str The client secret for the NMDC API. Returns ------- nmdc.Biosample The generated biosample instance. """ mint = Minter() # If no biosample id in spreadsheet, mint biosample ids if biosamp_metadata["id"] is None: biosamp_metadata["id"] = mint.mint( nmdc_type=NmdcTypes.Biosample, client_id=CLIENT_ID, client_secret=CLIENT_SECRET, ) # Filter dictionary to remove any key/value pairs with None as the value biosamp_dict = self.clean_dict(biosamp_metadata) biosample_object = nmdc.Biosample(**biosamp_dict) return biosample_object
[docs] def get_workflow_version(self, workflow_version_git_url: str) -> str: """ Get the version of the workflow from the git repository. Parameters ---------- repo_link : str The URL of the git repository containing the workflow version. Returns ------- str The version of the workflow. """ resp = requests.get(workflow_version_git_url) if resp.status_code == 200: # Regular expression to find the current_version match = re.search(r"current_version\s*=\s*([\d.]+)", resp.text) if match: current_version = match.group(1) return current_version else: logging.warning( f"Failed to fetch the workflow version from the Git repository {workflow_version_git_url}" ) return None