Source code for src.lcms_metadata_generator

# -*- coding: utf-8 -*-
import ast
import logging
import re
from datetime import datetime
from pathlib import Path

import pandas as pd
from tqdm import tqdm
from nmdc_api_utilities.data_object_search import DataObjectSearch
from nmdc_api_utilities.metadata import Metadata
from nmdc_api_utilities.workflow_execution_search import WorkflowExecutionSearch

from src.data_classes import LCMSLipidWorkflowMetadata, NmdcTypes
from src.metadata_generator import NMDCMetadataGenerator


[docs] class LCMSMetadataGenerator(NMDCMetadataGenerator):
    """
    A class for generating NMDC metadata objects using provided metadata files
    and configuration for LC-MS data.

    This class processes input metadata files, generates various NMDC objects,
    and produces a database dump in JSON format.
    """

    def __init__(
        self,
        metadata_file: str,
        database_dump_json_path: str,
        raw_data_url: str,
        process_data_url: str,
    ):
        super().__init__(
            metadata_file=metadata_file,
            database_dump_json_path=database_dump_json_path,
            raw_data_url=raw_data_url,
            process_data_url=process_data_url,
        )
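
    # Note: the constructor only forwards its arguments to NMDCMetadataGenerator.
    # Attributes referenced in run()/rerun() below (e.g. raw_data_category,
    # raw_data_obj_type, unique_columns, minting_config_creds) are assumed to be
    # defined by the parent class or a workflow-specific subclass, not here.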

[docs]    def run(self) -> None:
        """
        Execute the metadata generation process for lipidomics data.

        This method performs the following steps:
        1. Initialize an NMDC Database instance.
        2. Load and process metadata to create NMDC objects.
        3. Generate Mass Spectrometry, Raw Data, Metabolomics Analysis, and
           Processed Data objects.
        4. Update outputs for Mass Spectrometry and Metabolomics Analysis objects.
        5. Append generated objects to the NMDC Database.
        6. Dump the NMDC Database to a JSON file.
        7. Validate the JSON file using the NMDC API.

        Returns
        -------
        None

        Raises
        ------
        FileNotFoundError
            If the processed data directory is empty or not found.
        ValueError
            If the number of files in the processed data directory is not as expected.

        Notes
        -----
        This method uses tqdm to display progress bars for the processing of
        biosamples and mass spectrometry metadata.
        """
        client_id, client_secret = self.load_credentials(
            config_file=self.minting_config_creds
        )
        nmdc_database_inst = self.start_nmdc_database()
        metadata_df = self.load_metadata()
        self.check_for_biosamples(
            metadata_df=metadata_df,
            nmdc_database_inst=nmdc_database_inst,
            CLIENT_ID=client_id,
            CLIENT_SECRET=client_secret,
        )
        # Check for duplicate data object URLs in the database
        self.check_doj_urls(metadata_df=metadata_df, url_columns=self.unique_columns)

        for _, data in tqdm(
            metadata_df.iterrows(),
            total=metadata_df.shape[0],
            desc="Processing LCMS biosamples",
        ):
            workflow_metadata = self.create_workflow_metadata(data)
            mass_spec = self.generate_mass_spectrometry(
                file_path=Path(workflow_metadata.raw_data_file),
                instrument_name=workflow_metadata.instrument_used,
                sample_id=data["biosample_id"],
                raw_data_id="nmdc:placeholder",
                study_id=ast.literal_eval(data["biosample.associated_studies"]),
                processing_institution=data["processing_institution"],
                mass_spec_config_name=workflow_metadata.mass_spec_config_name,
                lc_config_name=workflow_metadata.lc_config_name,
                start_date=workflow_metadata.instrument_analysis_start_date,
                end_date=workflow_metadata.instrument_analysis_end_date,
                CLIENT_ID=client_id,
                CLIENT_SECRET=client_secret,
            )
            raw_data_object = self.generate_data_object(
                file_path=Path(workflow_metadata.raw_data_file),
                data_category=self.raw_data_category,
                data_object_type=self.raw_data_obj_type,
                description=self.raw_data_obj_desc,
                base_url=self.raw_data_url,
                CLIENT_ID=client_id,
                CLIENT_SECRET=client_secret,
                was_generated_by=mass_spec.id,
            )
            metab_analysis = self.generate_metabolomics_analysis(
                cluster_name=workflow_metadata.execution_resource,
                raw_data_name=Path(workflow_metadata.raw_data_file).name,
                raw_data_id=raw_data_object.id,
                data_gen_id=mass_spec.id,
                processed_data_id="nmdc:placeholder",
                parameter_data_id="nmdc:placeholder",
                processing_institution=data["processing_institution"],
                CLIENT_ID=client_id,
                CLIENT_SECRET=client_secret,
            )

            # List all paths in the processed data directory
            processed_data_paths = list(
                Path(workflow_metadata.processed_data_dir).glob("**/*")
            )
            # Check that the processed data directory is not empty
            if not any(processed_data_paths):
                raise FileNotFoundError(
                    f"No files found in processed data directory: "
                    f"{workflow_metadata.processed_data_dir}"
                )
            # Check that there is a .csv, .hdf5, and .toml file in the
            # processed data directory and no other files
            processed_data_paths = [x for x in processed_data_paths if x.is_file()]
            if len(processed_data_paths) != 3:
                raise ValueError(
                    f"Expected 3 files in the processed data directory "
                    f"{processed_data_paths}, found {len(processed_data_paths)}."
                )

            # Track the generated data objects; all three must exist by the
            # end of the loop below
            processed_data_object_config = None
            processed_data_object_annot = None
            processed_data_object = None
            processed_data = []
            for file in processed_data_paths:
                file_type = file.suffixes
                if file_type:
                    file_type = file_type[0].lstrip(".")
                if file_type == "toml":
                    # Generate a data object for the parameter data
                    processed_data_object_config = self.generate_data_object(
                        file_path=file,
                        data_category=self.wf_config_process_data_category,
                        data_object_type=self.wf_config_process_data_obj_type,
                        description=self.wf_config_process_data_description,
                        base_url=self.process_data_url
                        + Path(workflow_metadata.processed_data_dir).name
                        + "/",
                        CLIENT_ID=client_id,
                        CLIENT_SECRET=client_secret,
                        was_generated_by=metab_analysis.id,
                    )
                    nmdc_database_inst.data_object_set.append(
                        processed_data_object_config
                    )
                    parameter_data_id = processed_data_object_config.id
                elif file_type == "csv":
                    # Generate a data object for the annotated data
                    processed_data_object_annot = self.generate_data_object(
                        file_path=file,
                        data_category=self.no_config_process_data_category,
                        data_object_type=self.no_config_process_data_obj_type,
                        description=self.csv_process_data_description,
                        base_url=self.process_data_url
                        + Path(workflow_metadata.processed_data_dir).name
                        + "/",
                        CLIENT_ID=client_id,
                        CLIENT_SECRET=client_secret,
                        was_generated_by=metab_analysis.id,
                    )
                    nmdc_database_inst.data_object_set.append(
                        processed_data_object_annot
                    )
                    processed_data.append(processed_data_object_annot.id)
                elif file_type == "hdf5":
                    # Generate a data object for the HDF5 processed data
                    processed_data_object = self.generate_data_object(
                        file_path=file,
                        data_category=self.no_config_process_data_category,
                        data_object_type=self.hdf5_process_data_obj_type,
                        description=self.hdf5_process_data_description,
                        base_url=self.process_data_url
                        + Path(workflow_metadata.processed_data_dir).name
                        + "/",
                        CLIENT_ID=client_id,
                        CLIENT_SECRET=client_secret,
                        was_generated_by=metab_analysis.id,
                    )
                    nmdc_database_inst.data_object_set.append(processed_data_object)
                    processed_data.append(processed_data_object.id)

                    # Update MetabolomicsAnalysis times based on the HDF5 file
                    metab_analysis.started_at_time = datetime.fromtimestamp(
                        file.stat().st_ctime
                    ).strftime("%Y-%m-%d %H:%M:%S")
                    metab_analysis.ended_at_time = datetime.fromtimestamp(
                        file.stat().st_mtime
                    ).strftime("%Y-%m-%d %H:%M:%S")
                else:
                    raise ValueError(f"Unexpected file type found for file {file}.")

            # Check that all processed data objects were created
            if (
                processed_data_object_config is None
                or processed_data_object_annot is None
                or processed_data_object is None
            ):
                raise ValueError(
                    f"Not all processed data objects were created for "
                    f"{workflow_metadata.processed_data_dir}."
                )

            has_input = [parameter_data_id, raw_data_object.id]
            self.update_outputs(
                mass_spec_obj=mass_spec,
                analysis_obj=metab_analysis,
                raw_data_obj_id=raw_data_object.id,
                parameter_data_id=has_input,
                processed_data_id_list=processed_data,
                rerun=False,
            )
            nmdc_database_inst.data_generation_set.append(mass_spec)
            nmdc_database_inst.data_object_set.append(raw_data_object)
            nmdc_database_inst.workflow_execution_set.append(metab_analysis)

        self.dump_nmdc_database(nmdc_database=nmdc_database_inst)
        api_metadata = Metadata()
        api_metadata.validate_json(self.database_dump_json_path)
        logging.info("Metadata processing completed.")
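
    # For reference, run() and rerun() both expect each per-sample processed
    # data directory to contain exactly three files; the names below are
    # hypothetical examples:
    #
    #   <processed_data_directory>/
    #       lipid_params.toml       -> workflow parameter data object
    #       lipid_annotations.csv   -> annotated results data object
    #       lipid_results.hdf5      -> HDF5 results data object (its ctime/mtime
    #                                  set the analysis start/end times)
    #
    # An empty directory raises FileNotFoundError; a different file count or an
    # unexpected file type raises ValueError.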

[docs]    def rerun(self) -> None:
        """
        Execute a rerun of the metadata generation process for metabolomics data.

        This method performs the following steps:
        1. Initialize an NMDC Database instance.
        2. Load and process metadata to create NMDC objects.
        3. Generate Metabolomics Analysis and Processed Data objects.
        4. Update outputs for the Metabolomics Analysis object.
        5. Append generated objects to the NMDC Database.
        6. Dump the NMDC Database to a JSON file.
        7. Validate the JSON file using the NMDC API.

        Returns
        -------
        None

        Raises
        ------
        FileNotFoundError
            If the processed data directory is empty or not found.
        ValueError
            If the number of files in the processed data directory is not as expected.

        Notes
        -----
        This method uses tqdm to display progress bars for the processing of
        biosamples and mass spectrometry metadata.
        """
        client_id, client_secret = self.load_credentials(
            config_file=self.minting_config_creds
        )
        wf_client = WorkflowExecutionSearch()
        do_client = DataObjectSearch()
        nmdc_database_inst = self.start_nmdc_database()
        try:
            df = pd.read_csv(self.metadata_file)
        except FileNotFoundError:
            raise FileNotFoundError(f"Metadata file not found: {self.metadata_file}")
        metadata_df = df.apply(lambda x: x.reset_index(drop=True))

        # Check for duplicate data object URLs in the database
        self.check_doj_urls(
            metadata_df=metadata_df, url_columns=["processed_data_directory"]
        )

        for _, data in tqdm(
            metadata_df.iterrows(),
            total=metadata_df.shape[0],
            desc="Processing LCMS biosamples",
        ):
            raw_data_object_id = do_client.get_record_by_attribute(
                attribute_name="url",
                attribute_value=self.raw_data_url + Path(data["raw_data_file"]).name,
                fields="id",
                exact_match=True,
            )[0]["id"]
            # Find the existing MetabolomicsAnalysis objects for this raw data
            prev_metab_analysis = wf_client.get_record_by_filter(
                filter=f'{{"has_input":"{raw_data_object_id}","type":"{NmdcTypes.MetabolomicsAnalysis}"}}',
                fields="id,uses_calibration,execution_resource,processing_institution,was_informed_by",
                all_pages=True,
            )
            # Find the most recent metabolomics analysis object by the max id
            prev_metab_analysis = max(prev_metab_analysis, key=lambda x: x["id"])
            # Increment the analysis id by finding the last digit group with a regex
            regex = r"(\d+)$"
            metab_analysis_id = re.sub(
                regex,
                lambda x: str(int(x.group(1)) + 1),
                prev_metab_analysis["id"],
            )
            metab_analysis = self.generate_metabolomics_analysis(
                cluster_name=prev_metab_analysis["execution_resource"],
                raw_data_name=Path(data["raw_data_file"]).name,
                raw_data_id=raw_data_object_id,
                data_gen_id=prev_metab_analysis["was_informed_by"],
                processed_data_id="nmdc:placeholder",
                parameter_data_id="nmdc:placeholder",
                processing_institution=prev_metab_analysis["processing_institution"],
                CLIENT_ID=client_id,
                CLIENT_SECRET=client_secret,
                incremeneted_id=metab_analysis_id,
            )

            # List all paths in the processed data directory
            processed_data_paths = list(
                Path(data["processed_data_directory"]).glob("**/*")
            )
            # Check that the processed data directory is not empty
            if not any(processed_data_paths):
                raise FileNotFoundError(
                    f"No files found in processed data directory: "
                    f"{data['processed_data_directory']}"
                )
            # Check that there is a .csv, .hdf5, and .toml file in the
            # processed data directory and no other files
            processed_data_paths = [x for x in processed_data_paths if x.is_file()]
            if len(processed_data_paths) != 3:
                raise ValueError(
                    f"Expected 3 files in the processed data directory "
                    f"{processed_data_paths}, found {len(processed_data_paths)}."
                )

            # Track the generated data objects; all three must exist by the
            # end of the loop below
            processed_data_object_config = None
            processed_data_object_annot = None
            processed_data_object = None
            processed_data = []
            for file in processed_data_paths:
                file_type = file.suffixes
                if file_type:
                    file_type = file_type[0].lstrip(".")
                if file_type == "toml":
                    # Generate a data object for the parameter data
                    processed_data_object_config = self.generate_data_object(
                        file_path=file,
                        data_category=self.wf_config_process_data_category,
                        data_object_type=self.wf_config_process_data_obj_type,
                        description=self.wf_config_process_data_description,
                        base_url=self.process_data_url
                        + Path(data["processed_data_directory"]).name
                        + "/",
                        CLIENT_ID=client_id,
                        CLIENT_SECRET=client_secret,
                        was_generated_by=metab_analysis.id,
                    )
                    nmdc_database_inst.data_object_set.append(
                        processed_data_object_config
                    )
                    parameter_data_id = processed_data_object_config.id
                elif file_type == "csv":
                    # Generate a data object for the annotated data
                    processed_data_object_annot = self.generate_data_object(
                        file_path=file,
                        data_category=self.no_config_process_data_category,
                        data_object_type=self.no_config_process_data_obj_type,
                        description=self.csv_process_data_description,
                        base_url=self.process_data_url
                        + Path(data["processed_data_directory"]).name
                        + "/",
                        CLIENT_ID=client_id,
                        CLIENT_SECRET=client_secret,
                        was_generated_by=metab_analysis.id,
                    )
                    nmdc_database_inst.data_object_set.append(
                        processed_data_object_annot
                    )
                    processed_data.append(processed_data_object_annot.id)
                elif file_type == "hdf5":
                    # Generate a data object for the HDF5 processed data
                    processed_data_object = self.generate_data_object(
                        file_path=file,
                        data_category=self.no_config_process_data_category,
                        data_object_type=self.hdf5_process_data_obj_type,
                        description=self.hdf5_process_data_description,
                        base_url=self.process_data_url
                        + Path(data["processed_data_directory"]).name
                        + "/",
                        CLIENT_ID=client_id,
                        CLIENT_SECRET=client_secret,
                        was_generated_by=metab_analysis.id,
                    )
                    nmdc_database_inst.data_object_set.append(processed_data_object)
                    processed_data.append(processed_data_object.id)

                    # Update MetabolomicsAnalysis times based on the HDF5 file
                    metab_analysis.started_at_time = datetime.fromtimestamp(
                        file.stat().st_ctime
                    ).strftime("%Y-%m-%d %H:%M:%S")
                    metab_analysis.ended_at_time = datetime.fromtimestamp(
                        file.stat().st_mtime
                    ).strftime("%Y-%m-%d %H:%M:%S")
                else:
                    raise ValueError(f"Unexpected file type found for file {file}.")

            # Check that all processed data objects were created
            if (
                processed_data_object_config is None
                or processed_data_object_annot is None
                or processed_data_object is None
            ):
                raise ValueError(
                    f"Not all processed data objects were created for "
                    f"{data['processed_data_directory']}."
                )

            has_input = [parameter_data_id, raw_data_object_id]
            self.update_outputs(
                analysis_obj=metab_analysis,
                raw_data_obj_id=raw_data_object_id,
                parameter_data_id=has_input,
                processed_data_id_list=processed_data,
                rerun=True,
            )
            nmdc_database_inst.workflow_execution_set.append(metab_analysis)

        self.dump_nmdc_database(nmdc_database=nmdc_database_inst)
        api_metadata = Metadata()
        api_metadata.validate_json(self.database_dump_json_path)
        logging.info("Metadata processing completed.")
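
    # The rerun analysis ID is derived from the previous MetabolomicsAnalysis
    # ID by incrementing its trailing digit group, e.g. for a hypothetical ID:
    #
    #   re.sub(r"(\d+)$", lambda m: str(int(m.group(1)) + 1), "nmdc:wfmb-11-abc123.1")
    #   # -> "nmdc:wfmb-11-abc123.2"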

[docs]    def create_workflow_metadata(
        self, row: dict[str, str]
    ) -> LCMSLipidWorkflowMetadata:
        """
        Create an LCMSLipidWorkflowMetadata object from a dictionary of
        workflow metadata.

        Parameters
        ----------
        row : dict[str, str]
            Dictionary containing metadata for a workflow. This is typically a
            row from the input metadata CSV file.

        Returns
        -------
        LCMSLipidWorkflowMetadata
            An LCMSLipidWorkflowMetadata object populated with data from the
            input dictionary.

        Notes
        -----
        The input dictionary is expected to contain the following keys:
        'processed_data_directory', 'raw_data_file',
        'mass_spec_configuration_name', 'chromat_configuration_name',
        'instrument_used', 'instrument_analysis_start_date',
        'instrument_analysis_end_date', 'execution_resource'.
        """
        return LCMSLipidWorkflowMetadata(
            processed_data_dir=row["processed_data_directory"],
            raw_data_file=row["raw_data_file"],
            mass_spec_config_name=row["mass_spec_configuration_name"],
            lc_config_name=row["chromat_configuration_name"],
            instrument_used=row["instrument_used"],
            instrument_analysis_start_date=row["instrument_analysis_start_date"],
            instrument_analysis_end_date=row["instrument_analysis_end_date"],
            execution_resource=row["execution_resource"],
        )
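

# Minimal usage sketch. This block is illustrative only and not part of the
# module: the CSV path, output path, and base URLs are hypothetical
# placeholders, and the production entry point may differ.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    generator = LCMSMetadataGenerator(
        metadata_file="lcms_lipidomics_metadata.csv",  # hypothetical input CSV
        database_dump_json_path="nmdc_lipidomics_database.json",  # output JSON dump
        raw_data_url="https://example.org/data/raw/",  # hypothetical base URL
        process_data_url="https://example.org/data/processed/",  # hypothetical base URL
    )

    # run() mints new DataGeneration, DataObject, and WorkflowExecution records
    # and validates the resulting JSON dump; rerun() instead builds a new
    # MetabolomicsAnalysis for raw data that is already registered.
    generator.run()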