Source code for src.gcms_metab_metadata_generator

# -*- coding: utf-8 -*-
from src.metadata_generator import NMDCMetadataGenerator
from tqdm import tqdm
from pathlib import Path
from datetime import datetime
import logging
from nmdc_api_utilities.metadata import Metadata
import ast
import pandas as pd
from nmdc_api_utilities.data_object_search import DataObjectSearch
from nmdc_api_utilities.workflow_execution_search import WorkflowExecutionSearch
import nmdc_schema.nmdc as nmdc
from nmdc_api_utilities.minter import Minter
from typing import List
from src.data_classes import NmdcTypes, GCMSMetabWorkflowMetadata
import re


class GCMSMetabolomicsMetadataGenerator(NMDCMetadataGenerator):
    """
    A class for generating NMDC metadata objects related to GC/MS metabolomics data.

    This class processes input metadata files, generates various NMDC objects,
    and produces a database dump in JSON format.

    Parameters
    ----------
    metadata_file : str
        Path to the metadata CSV file.
    database_dump_json_path : str
        Path to the output JSON file for the NMDC database dump.
    raw_data_url : str
        Base URL for the raw data files.
    process_data_url : str
        Base URL for the processed data files.
    minting_config_creds : str, optional
        Path to the minting configuration credentials file.
    workflow_version : str, optional
        Version of the workflow. If not provided, it is fetched from the
        metaMS Git repository.
    calibration_standard : str, optional
        Calibration standard used for the data. Default is "fames".
    configuration_file_name : str, optional
        Name of the configuration file. Default is "emsl_gcms_corems_params.toml".

    Attributes
    ----------
    unique_columns : List[str]
        List of columns used to check for uniqueness in the metadata before processing.
    mass_spec_desc : str
        Description of the mass spectrometry analysis.
    mass_spec_eluent_intro : str
        Eluent introduction category for mass spectrometry.
    analyte_category : str
        Category of the analyte.
    raw_data_obj_type : str
        Type of the raw data object.
    raw_data_obj_desc : str
        Description of the raw data object.
    workflow_analysis_name : str
        Name of the workflow analysis.
    workflow_description : str
        Description of the workflow.
    workflow_git_url : str
        URL of the workflow's Git repository.
    workflow_version : str
        Version of the workflow.
    workflow_category : str
        Category of the workflow.
    processed_data_category : str
        Category of the processed data.
    processed_data_object_type : str
        Type of the processed data object.
    processed_data_object_description : str
        Description of the processed data object.
    """

    # Metadata attributes
    unique_columns: List[str] = ["raw_data_file", "processed_data_file"]

    # Data generation attributes
    mass_spec_desc: str = (
        "Generation of mass spectrometry data by GC/MS for the analysis of metabolites."
    )
    mass_spec_eluent_intro: str = "gas_chromatography"
    analyte_category: str = "metabolome"
    raw_data_obj_type: str = "GC-MS Raw Data"
    raw_data_obj_desc: str = (
        "GC/MS low resolution raw data for metabolomics data acquisition."
    )

    # Workflow metadata
    workflow_analysis_name: str = "GC/MS Metabolomics analysis"
    workflow_description: str = (
        "Analysis of raw mass spectrometry data for the annotation of metabolites."
    )
    workflow_git_url: str = (
        "https://github.com/microbiomedata/metaMS/wdl/metaMS_gcms.wdl"
    )
    workflow_version: str
    workflow_category: str = "gc_ms_metabolomics"

    # Processed data attributes
    processed_data_category: str = "processed_data"
    processed_data_object_type: str = "GC-MS Metabolomics Results"
    processed_data_object_description: str = (
        "Metabolomics annotations as a result of a GC/MS metabolomics workflow activity."
    )

    def __init__(
        self,
        metadata_file: str,
        database_dump_json_path: str,
        raw_data_url: str,
        process_data_url: str,
        minting_config_creds: str = None,
        workflow_version: str = None,
        calibration_standard: str = "fames",
        configuration_file_name: str = "emsl_gcms_corems_params.toml",
    ):
        super().__init__(
            metadata_file=metadata_file,
            database_dump_json_path=database_dump_json_path,
            raw_data_url=raw_data_url,
            process_data_url=process_data_url,
        )
        # Set the workflow version, prioritizing user input and falling back
        # to the version fetched from the Git URL.
        self.workflow_version = workflow_version or self.get_workflow_version(
            workflow_version_git_url="https://github.com/microbiomedata/metaMS/blob/master/.bumpversion.cfg"
        )
        self.minting_config_creds = minting_config_creds

        # Calibration attributes
        self.calibration_standard = calibration_standard

        # Workflow configuration attributes
        self.configuration_file_name = configuration_file_name
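    # Usage sketch (not executed as part of this module): a minimal
    # construction of the generator. The paths and URLs below are hypothetical
    # placeholders, assuming a metadata CSV and minting credentials exist
    # locally:
    #
    #   generator = GCMSMetabolomicsMetadataGenerator(
    #       metadata_file="gcms_metadata.csv",
    #       database_dump_json_path="nmdc_database_dump.json",
    #       raw_data_url="https://example.org/raw/",
    #       process_data_url="https://example.org/processed/",
    #       minting_config_creds="minting_config.toml",
    #   )
    #   generator.run()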
    def rerun(self) -> None:
        """
        Execute a re-run of the metadata generation process for GC/MS metabolomics data.

        This method performs the following steps:
        1. Initialize an NMDC Database instance.
        2. Load and process metadata to create NMDC objects.
        3. Generate Metabolomics Analysis and Processed Data objects.
        4. Update outputs for the Metabolomics Analysis object.
        5. Append generated objects to the NMDC Database.
        6. Dump the NMDC Database to a JSON file.
        7. Validate the JSON file using the NMDC API.

        Returns
        -------
        None

        Raises
        ------
        FileNotFoundError
            If the metadata file is not found.

        Notes
        -----
        This method uses tqdm to display a progress bar while processing the
        mass spectrometry metadata.
        """
        wf_client = WorkflowExecutionSearch()
        client_id, client_secret = self.load_credentials(
            config_file=self.minting_config_creds
        )

        # Start NMDC database and make metadata dataframe
        nmdc_database_inst = self.start_nmdc_database()
        try:
            df = pd.read_csv(self.metadata_file)
        except FileNotFoundError:
            raise FileNotFoundError(f"Metadata file not found: {self.metadata_file}")
        metadata_df = df.apply(lambda x: x.reset_index(drop=True))

        # Check only the processed data URLs
        self.check_doj_urls(
            metadata_df=metadata_df, url_columns=["processed_data_file"]
        )

        # Get the configuration file data object id
        do_client = DataObjectSearch()
        config_do_id = do_client.get_record_by_attribute(
            attribute_name="name",
            attribute_value=self.configuration_file_name,
            fields="id",
            exact_match=True,
        )[0]["id"]

        # Process workflow metadata
        for _, data in tqdm(
            metadata_df.iterrows(),
            total=metadata_df.shape[0],
            desc="Processing Remaining Metadata",
        ):
            raw_data_object_id = do_client.get_record_by_attribute(
                attribute_name="url",
                attribute_value=self.raw_data_url + Path(data["raw_data_file"]).name,
                fields="id",
                exact_match=True,
            )[0]["id"]

            # Find the existing (old) MetabolomicsAnalysis objects
            prev_metab_analysis = wf_client.get_record_by_filter(
                filter=f'{{"has_input":"{raw_data_object_id}","type":"{NmdcTypes.MetabolomicsAnalysis}"}}',
                fields="id,uses_calibration,execution_resource,processing_institution,was_informed_by",
                all_pages=True,
            )
            # Find the most recent MetabolomicsAnalysis object by the max id
            prev_metab_analysis = max(prev_metab_analysis, key=lambda x: x["id"])

            # Increment the analysis id: find the last digit group with a regex
            regex = r"(\d+)$"
            metab_analysis_id = re.sub(
                regex,
                lambda x: str(int(x.group(1)) + 1),
                prev_metab_analysis["id"],
            )

            # Generate processed data object
            processed_data_object = self.generate_data_object(
                file_path=Path(data["processed_data_file"]),
                data_category=self.processed_data_category,
                data_object_type=self.processed_data_object_type,
                description=self.processed_data_object_description,
                base_url=self.process_data_url,
                CLIENT_ID=client_id,
                CLIENT_SECRET=client_secret,
                was_generated_by=metab_analysis_id,
            )

            # Generate a new MetabolomicsAnalysis object with the newly
            # incremented id
            metab_analysis = self.generate_metabolomics_analysis(
                cluster_name=prev_metab_analysis["execution_resource"],
                raw_data_name=Path(data["raw_data_file"]).name,
                raw_data_id=raw_data_object_id,
                data_gen_id=prev_metab_analysis["was_informed_by"],
                processed_data_id=processed_data_object.id,
                parameter_data_id=config_do_id,
                processing_institution=prev_metab_analysis["processing_institution"],
                incremeneted_id=metab_analysis_id,
                CLIENT_ID=client_id,
                CLIENT_SECRET=client_secret,
                calibration_id=prev_metab_analysis["uses_calibration"],
            )

            # Update MetabolomicsAnalysis times based on the processed data file
            processed_file = Path(data["processed_data_file"])
            metab_analysis.started_at_time = datetime.fromtimestamp(
                processed_file.stat().st_ctime
            ).strftime("%Y-%m-%d %H:%M:%S")
            metab_analysis.ended_at_time = datetime.fromtimestamp(
                processed_file.stat().st_mtime
            ).strftime("%Y-%m-%d %H:%M:%S")

            has_inputs = [config_do_id, raw_data_object_id]
            self.update_outputs(
                analysis_obj=metab_analysis,
                raw_data_obj_id=raw_data_object_id,
                parameter_data_id=has_inputs,
                processed_data_id_list=[processed_data_object.id],
                rerun=True,
            )

            nmdc_database_inst.data_object_set.append(processed_data_object)
            nmdc_database_inst.workflow_execution_set.append(metab_analysis)

        self.dump_nmdc_database(nmdc_database=nmdc_database_inst)
        api_metadata = Metadata()
        api_metadata.validate_json(self.database_dump_json_path)
        logging.info("Metadata processing re-run completed.")
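    # Illustrative sketch of the trailing-digit increment used in rerun()
    # above. The analysis id shown is hypothetical; re.sub() replaces only the
    # final run of digits:
    #
    #   >>> import re
    #   >>> re.sub(r"(\d+)$", lambda m: str(int(m.group(1)) + 1), "nmdc:wfmb-11-abc123.1")
    #   'nmdc:wfmb-11-abc123.2'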
    def run(self) -> None:
        """
        Execute the metadata generation process for GC/MS metabolomics data.

        This method performs the following steps:
        1. Initialize an NMDC Database instance.
        2. Generate calibration information and data objects for each calibration file.
        3. Load and process metadata to create NMDC objects.
        4. Generate Mass Spectrometry (including metabolite identifications),
           Raw Data, Metabolomics Analysis, and Processed Data objects.
        5. Update outputs for Mass Spectrometry and Metabolomics Analysis objects.
        6. Append generated objects to the NMDC Database.
        7. Dump the NMDC Database to a JSON file.
        8. Validate the JSON file using the NMDC API.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the calibration standard is not supported.

        Notes
        -----
        This method uses tqdm to display progress bars for the processing of
        calibration information and mass spectrometry metadata.
        """
        client_id, client_secret = self.load_credentials(
            config_file=self.minting_config_creds
        )
        if self.calibration_standard != "fames":
            raise ValueError("Only FAMES calibration is supported at this time.")

        # Start NMDC database and make metadata dataframe
        nmdc_database_inst = self.start_nmdc_database()
        df = self.load_metadata()
        metadata_df = df.apply(lambda x: x.reset_index(drop=True))
        self.check_for_biosamples(
            metadata_df=metadata_df,
            nmdc_database_inst=nmdc_database_inst,
            CLIENT_ID=client_id,
            CLIENT_SECRET=client_secret,
        )
        self.check_doj_urls(metadata_df=metadata_df, url_columns=self.unique_columns)

        # Get the configuration file data object id
        do_client = DataObjectSearch()
        config_do_id = do_client.get_record_by_attribute(
            attribute_name="name",
            attribute_value=self.configuration_file_name,
            fields="id",
            exact_match=True,
        )[0]["id"]

        # Check if there is an existing calibration_id in the metadata.
        # If not, generate calibration ids.
        if (
            "calibration_id" not in metadata_df.columns
            or metadata_df["calibration_id"].isnull().all()
        ):
            self.generate_calibration_id(
                metadata_df=metadata_df,
                nmdc_database_inst=nmdc_database_inst,
                CLIENT_ID=client_id,
                CLIENT_SECRET=client_secret,
            )

        # Process workflow metadata
        for _, data in tqdm(
            metadata_df.iterrows(),
            total=metadata_df.shape[0],
            desc="Processing Remaining Metadata",
        ):
            workflow_metadata_obj = self.create_workflow_metadata(data)

            # Generate data generation / mass spectrometry object
            mass_spec = self.generate_mass_spectrometry(
                file_path=Path(workflow_metadata_obj.raw_data_file),
                instrument_name=workflow_metadata_obj.instrument_used,
                sample_id=workflow_metadata_obj.biosample_id,
                raw_data_id="nmdc:placeholder",
                study_id=workflow_metadata_obj.nmdc_study,
                processing_institution=workflow_metadata_obj.processing_institution,
                mass_spec_config_name=workflow_metadata_obj.mass_spec_config_name,
                lc_config_name=workflow_metadata_obj.chromat_config_name,
                start_date=workflow_metadata_obj.instrument_analysis_start_date,
                end_date=workflow_metadata_obj.instrument_analysis_end_date,
                CLIENT_ID=client_id,
                CLIENT_SECRET=client_secret,
                calibration_id=workflow_metadata_obj.calibration_id,
            )

            # Generate raw data object
            raw_data_object = self.generate_data_object(
                file_path=Path(workflow_metadata_obj.raw_data_file),
                data_category=self.raw_data_category,
                data_object_type=self.raw_data_obj_type,
                description=self.raw_data_obj_desc,
                base_url=self.raw_data_url,
                CLIENT_ID=client_id,
                CLIENT_SECRET=client_secret,
                was_generated_by=mass_spec.id,
            )
            raw_data_object_id = raw_data_object.id

            # Generate metabolite identifications
            metabolite_identifications = self.generate_metab_identifications(
                processed_data_file=workflow_metadata_obj.processed_data_file
            )

            # Generate metabolomics analysis object with metabolite identifications
            metab_analysis = self.generate_metabolomics_analysis(
                cluster_name=workflow_metadata_obj.execution_resource,
                raw_data_name=Path(workflow_metadata_obj.raw_data_file).name,
                raw_data_id=raw_data_object_id,
                data_gen_id=mass_spec.id,
                processed_data_id="nmdc:placeholder",
                parameter_data_id=config_do_id,
                processing_institution=workflow_metadata_obj.processing_institution,
                CLIENT_ID=client_id,
                CLIENT_SECRET=client_secret,
                calibration_id=workflow_metadata_obj.calibration_id,
                metabolite_identifications=metabolite_identifications,
            )

            # Generate processed data object
            processed_data_object = self.generate_data_object(
                file_path=Path(workflow_metadata_obj.processed_data_file),
                data_category=self.processed_data_category,
                data_object_type=self.processed_data_object_type,
                description=self.processed_data_object_description,
                base_url=self.process_data_url,
                CLIENT_ID=client_id,
                CLIENT_SECRET=client_secret,
                was_generated_by=metab_analysis.id,
            )

            # Update MetabolomicsAnalysis times based on the processed data file
            processed_file = Path(workflow_metadata_obj.processed_data_file)
            metab_analysis.started_at_time = datetime.fromtimestamp(
                processed_file.stat().st_ctime
            ).strftime("%Y-%m-%d %H:%M:%S")
            metab_analysis.ended_at_time = datetime.fromtimestamp(
                processed_file.stat().st_mtime
            ).strftime("%Y-%m-%d %H:%M:%S")

            has_inputs = [config_do_id, raw_data_object_id]
            self.update_outputs(
                mass_spec_obj=mass_spec,
                analysis_obj=metab_analysis,
                raw_data_obj_id=raw_data_object_id,
                parameter_data_id=has_inputs,
                processed_data_id_list=[processed_data_object.id],
            )

            nmdc_database_inst.data_generation_set.append(mass_spec)
            nmdc_database_inst.data_object_set.append(raw_data_object)
            nmdc_database_inst.data_object_set.append(processed_data_object)
            nmdc_database_inst.workflow_execution_set.append(metab_analysis)

        self.dump_nmdc_database(nmdc_database=nmdc_database_inst)
        api_metadata = Metadata()
        api_metadata.validate_json(self.database_dump_json_path)
        logging.info("Metadata processing completed.")
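    # Illustrative sketch of the calibration_id check in run() above, on a toy
    # DataFrame (column values are hypothetical). generate_calibration_id() is
    # only invoked when the column is absent or entirely null:
    #
    #   >>> import pandas as pd
    #   >>> df = pd.DataFrame({"calibration_file": ["cal.cdf"], "calibration_id": [None]})
    #   >>> "calibration_id" not in df.columns or df["calibration_id"].isnull().all()
    #   True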
    def generate_calibration_id(
        self,
        metadata_df: pd.DataFrame,
        nmdc_database_inst: nmdc.Database,
        CLIENT_ID: str,
        CLIENT_SECRET: str,
    ) -> None:
        """
        Generate calibration information and data objects for each calibration file.

        Parameters
        ----------
        metadata_df : pd.DataFrame
            The metadata DataFrame.
        nmdc_database_inst : nmdc.Database
            The NMDC Database instance.
        CLIENT_ID : str
            The client ID for the NMDC API.
        CLIENT_SECRET : str
            The client secret for the NMDC API.

        Returns
        -------
        None
        """
        # Get the unique calibration files, create a data object and
        # CalibrationInformation object for each, and attach the associated
        # ids to metadata_df
        calibration_files = metadata_df["calibration_file"].unique()
        for calibration_file in tqdm(
            calibration_files,
            total=len(calibration_files),
            desc="Generating calibration information and data objects",
        ):
            calibration_data_object = self.generate_data_object(
                file_path=Path(calibration_file),
                data_category=self.raw_data_category,
                data_object_type=self.raw_data_obj_type,
                description=self.raw_data_obj_desc,
                base_url=self.raw_data_url,
                CLIENT_ID=CLIENT_ID,
                CLIENT_SECRET=CLIENT_SECRET,
            )
            nmdc_database_inst.data_object_set.append(calibration_data_object)
            calibration = self.generate_calibration(
                calibration_object=calibration_data_object,
                CLIENT_ID=CLIENT_ID,
                CLIENT_SECRET=CLIENT_SECRET,
                fames=self.calibration_standard,
                internal=False,
            )
            nmdc_database_inst.calibration_set.append(calibration)
            # Add the calibration information id to metadata_df
            metadata_df.loc[
                metadata_df["calibration_file"] == calibration_file, "calibration_id"
            ] = calibration.id
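    # Illustrative sketch of how the method above maps a newly minted
    # calibration id back onto every row that shares a calibration file (toy
    # data; the id is hypothetical). Rows with other files get NaN:
    #
    #   >>> import pandas as pd
    #   >>> df = pd.DataFrame({"calibration_file": ["a.cdf", "b.cdf", "a.cdf"]})
    #   >>> df.loc[df["calibration_file"] == "a.cdf", "calibration_id"] = "nmdc:calib-11-x"
    #   >>> df["calibration_id"].tolist()
    #   ['nmdc:calib-11-x', nan, 'nmdc:calib-11-x']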
    def generate_calibration(
        self,
        calibration_object: dict,
        CLIENT_ID: str,
        CLIENT_SECRET: str,
        fames: bool = True,
        internal: bool = False,
    ) -> nmdc.CalibrationInformation:
        """
        Generate a CalibrationInformation object for the NMDC Database.

        Parameters
        ----------
        calibration_object : dict
            The calibration data object.
        CLIENT_ID : str
            The client ID for the NMDC API.
        CLIENT_SECRET : str
            The client secret for the NMDC API.
        fames : bool, optional
            Whether the calibration is for FAMES. Default is True.
        internal : bool, optional
            Whether the calibration is internal. Default is False.

        Returns
        -------
        nmdc.CalibrationInformation
            A CalibrationInformation object for the NMDC Database.

        Raises
        ------
        ValueError
            If the calibration type is not supported.

        Notes
        -----
        This method generates a CalibrationInformation object based on the
        calibration data object and the calibration type.
        """
        mint = Minter()
        if fames and not internal:
            nmdc_id = mint.mint(
                nmdc_type=NmdcTypes.CalibrationInformation,
                client_id=CLIENT_ID,
                client_secret=CLIENT_SECRET,
            )
            data_dict = {
                "id": nmdc_id,
                "type": NmdcTypes.CalibrationInformation,
                "name": f"GC/MS FAMES calibration ({calibration_object.name})",
                "description": f"Full scan GC/MS FAMES calibration run ({calibration_object.name}).",
                "internal_calibration": False,
                "calibration_target": "retention_index",
                "calibration_standard": "fames",
                "calibration_object": calibration_object.id,
            }
            calibration_information = nmdc.CalibrationInformation(**data_dict)
            return calibration_information
        else:
            raise ValueError(
                "Calibration type not implemented; only external FAMES calibration is currently supported."
            )
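    # Sketch of the payload the method above assembles for a hypothetical
    # calibration data object named "cal_fames.cdf" with id "nmdc:dobj-11-x"
    # (values shown are assumptions, not real records):
    #
    #   {
    #       "id": "<minted id>",
    #       "type": NmdcTypes.CalibrationInformation,
    #       "name": "GC/MS FAMES calibration (cal_fames.cdf)",
    #       "description": "Full scan GC/MS FAMES calibration run (cal_fames.cdf).",
    #       "internal_calibration": False,
    #       "calibration_target": "retention_index",
    #       "calibration_standard": "fames",
    #       "calibration_object": "nmdc:dobj-11-x",
    #   }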
    def create_workflow_metadata(
        self, row: dict[str, str]
    ) -> GCMSMetabWorkflowMetadata:
        """
        Create a GCMSMetabWorkflowMetadata object from a dictionary of workflow metadata.

        Parameters
        ----------
        row : dict[str, str]
            Dictionary containing metadata for a workflow. This is typically
            a row from the input metadata CSV file.

        Returns
        -------
        GCMSMetabWorkflowMetadata
            A GCMSMetabWorkflowMetadata object populated with data from the
            input dictionary.

        Notes
        -----
        The input dictionary is expected to contain the following keys:
        'biosample_id', 'biosample.associated_studies', 'processing_institution',
        'processed_data_file', 'raw_data_file', 'mass_spec_configuration_name',
        'chromat_configuration_name', 'instrument_used',
        'instrument_analysis_start_date', 'instrument_analysis_end_date',
        'execution_resource', and 'calibration_id'.
        """
        return GCMSMetabWorkflowMetadata(
            biosample_id=row["biosample_id"],
            nmdc_study=ast.literal_eval(row["biosample.associated_studies"]),
            processing_institution=row["processing_institution"],
            processed_data_file=row["processed_data_file"],
            raw_data_file=row["raw_data_file"],
            mass_spec_config_name=row["mass_spec_configuration_name"],
            chromat_config_name=row["chromat_configuration_name"],
            instrument_used=row["instrument_used"],
            instrument_analysis_start_date=row["instrument_analysis_start_date"],
            instrument_analysis_end_date=row["instrument_analysis_end_date"],
            execution_resource=row["execution_resource"],
            calibration_id=row["calibration_id"],
        )
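    # Illustrative sketch of the ast.literal_eval() parsing used above: the
    # 'biosample.associated_studies' CSV cell holds a Python-style list
    # literal (the study id shown is hypothetical):
    #
    #   >>> import ast
    #   >>> ast.literal_eval("['nmdc:sty-11-abc123']")
    #   ['nmdc:sty-11-abc123']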
    def generate_metab_identifications(
        self, processed_data_file: str
    ) -> List[nmdc.MetaboliteIdentification]:
        """
        Generate MetaboliteIdentification objects from a processed data file.

        Parameters
        ----------
        processed_data_file : str
            Path to the processed data file.

        Returns
        -------
        List[nmdc.MetaboliteIdentification]
            List of MetaboliteIdentification objects generated from the
            processed data file.

        Notes
        -----
        This method reads in the processed data file and generates
        MetaboliteIdentification objects, pulling out the best hit for each
        peak based on the highest "Similarity Score".
        """
        # Read in the processed data as a pandas dataframe
        processed_data = pd.read_csv(processed_data_file)

        # Drop any rows with missing similarity scores
        processed_data = processed_data.dropna(subset=["Similarity Score"])

        # Group by "Peak Index" and find the best hit for each peak based on
        # the highest "Similarity Score"
        best_hits = processed_data.groupby("Peak Index").apply(
            lambda x: x.loc[x["Similarity Score"].idxmax()]
        )

        metabolite_identifications = []
        for index, best_hit in best_hits.iterrows():
            # Skip best hits without a ChEBI ID; a MetaboliteIdentification
            # object is only created when one is present
            if pd.isna(best_hit["Chebi ID"]):
                continue
            chebi_id = "chebi:" + str(int(best_hit["Chebi ID"]))

            # Prepare KEGG Compound IDs as alternative identifiers
            alt_ids = []
            if not pd.isna(best_hit["Kegg Compound ID"]):
                # Split on | in case of multiple KEGG Compound IDs
                if "|" in best_hit["Kegg Compound ID"]:
                    alt_ids.extend(
                        [
                            "kegg:" + x.strip()
                            for x in best_hit["Kegg Compound ID"].split("|")
                        ]
                    )
                else:
                    alt_ids.append("kegg:" + best_hit["Kegg Compound ID"])
            alt_ids = list(set(alt_ids))

            data_dict = {
                "metabolite_identified": chebi_id,
                "alternative_identifiers": alt_ids,
                "type": NmdcTypes.MetaboliteIdentification,
                "highest_similarity_score": best_hit["Similarity Score"],
            }
            metabolite_identification = nmdc.MetaboliteIdentification(**data_dict)
            metabolite_identifications.append(metabolite_identification)

        return metabolite_identifications
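# Illustrative sketch of the best-hit selection in generate_metab_identifications()
# on a toy results table (all values are hypothetical). For each "Peak Index"
# group, the row with the highest "Similarity Score" is kept:
#
#   >>> import pandas as pd
#   >>> df = pd.DataFrame({
#   ...     "Peak Index": [1, 1, 2],
#   ...     "Similarity Score": [0.7, 0.9, 0.8],
#   ...     "Chebi ID": [17234.0, 17234.0, None],
#   ...     "Kegg Compound ID": ["C00031", "C00031|C00221", None],
#   ... })
#   >>> best = df.dropna(subset=["Similarity Score"]).groupby("Peak Index").apply(
#   ...     lambda x: x.loc[x["Similarity Score"].idxmax()]
#   ... )
#   >>> best["Similarity Score"].tolist()
#   [0.9, 0.8]
#
# Peak 2 would then be skipped (no ChEBI ID), while peak 1 would yield
# "chebi:17234" with the pipe-separated KEGG ids split into alternatives.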