Source code for src.lcms_metadata_generator
# -*- coding: utf-8 -*-
from src.metadata_generator import NMDCMetadataGenerator
from tqdm import tqdm
from pathlib import Path
from datetime import datetime
import logging
from nmdc_api_utilities.metadata import Metadata
import ast
import pandas as pd
from src.data_classes import LCMSLipidWorkflowMetadata, NmdcTypes
from nmdc_api_utilities.workflow_execution_search import WorkflowExecutionSearch
from nmdc_api_utilities.data_object_search import DataObjectSearch
import re
class LCMSMetadataGenerator(NMDCMetadataGenerator):
"""
A class for generating NMDC metadata objects using provided metadata files and configuration
for LC-MS data.
This class processes input metadata files, generates various NMDC objects, and produces
a database dump in JSON format.
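Examples
--------
A minimal usage sketch, assuming a generator configured for the target
workflow (workflow-specific subclasses may supply additional attributes);
all paths and URLs below are hypothetical placeholders:

>>> generator = LCMSMetadataGenerator(
...     metadata_file="metadata/lcms_metadata.csv",
...     database_dump_json_path="output/nmdc_database_dump.json",
...     raw_data_url="https://example.org/raw/",
...     process_data_url="https://example.org/processed/",
... )
>>> generator.run()  # doctest: +SKIP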
"""
def __init__(
self,
metadata_file: str,
database_dump_json_path: str,
raw_data_url: str,
process_data_url: str,
):
super().__init__(
metadata_file=metadata_file,
database_dump_json_path=database_dump_json_path,
raw_data_url=raw_data_url,
process_data_url=process_data_url,
)
def run(self) -> None:
"""
Execute the metadata generation process for lipidomics data.
This method performs the following steps:
1. Initialize an NMDC Database instance.
2. Load and process metadata to create NMDC objects.
3. Generate Mass Spectrometry, Raw Data, Metabolomics Analysis, and
Processed Data objects.
4. Update outputs for Mass Spectrometry and Metabolomics Analysis objects.
5. Append generated objects to the NMDC Database.
6. Dump the NMDC Database to a JSON file.
7. Validate the JSON file using the NMDC API.
Returns
-------
None
Raises
------
FileNotFoundError
If the processed data directory is empty or not found.
ValueError
If the number of files in the processed data directory is not as expected.
Notes
-----
This method uses tqdm to display progress bars for the processing of
biosamples and mass spectrometry metadata.
"""
# Load the minting credentials for the NMDC API once per run
client_id, client_secret = self.load_credentials(
config_file=self.minting_config_creds
)
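# Initialize the NMDC Database instance, load the input metadata, and check for biosamples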
nmdc_database_inst = self.start_nmdc_database()
metadata_df = self.load_metadata()
self.check_for_biosamples(
metadata_df=metadata_df,
nmdc_database_inst=nmdc_database_inst,
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
)
# Check for duplicate data object URLs in the database
self.check_doj_urls(metadata_df=metadata_df, url_columns=self.unique_columns)
for _, data in tqdm(
metadata_df.iterrows(),
total=metadata_df.shape[0],
desc="Processing LCMS biosamples",
):
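# Build the Mass Spectrometry, raw data object, and Metabolomics Analysis records for this metadata row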
workflow_metadata = self.create_workflow_metadata(data)
mass_spec = self.generate_mass_spectrometry(
file_path=Path(workflow_metadata.raw_data_file),
instrument_name=workflow_metadata.instrument_used,
sample_id=data["biosample_id"],
raw_data_id="nmdc:placeholder",
study_id=ast.literal_eval(data["biosample.associated_studies"]),
processing_institution=data["processing_institution"],
mass_spec_config_name=workflow_metadata.mass_spec_config_name,
lc_config_name=workflow_metadata.lc_config_name,
start_date=workflow_metadata.instrument_analysis_start_date,
end_date=workflow_metadata.instrument_analysis_end_date,
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
)
raw_data_object = self.generate_data_object(
file_path=Path(workflow_metadata.raw_data_file),
data_category=self.raw_data_category,
data_object_type=self.raw_data_obj_type,
description=self.raw_data_obj_desc,
base_url=self.raw_data_url,
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
was_generated_by=mass_spec.id,
)
metab_analysis = self.generate_metabolomics_analysis(
cluster_name=workflow_metadata.execution_resource,
raw_data_name=Path(workflow_metadata.raw_data_file).name,
raw_data_id=raw_data_object.id,
data_gen_id=mass_spec.id,
processed_data_id="nmdc:placeholder",
parameter_data_id="nmdc:placeholder",
processing_institution=data["processing_institution"],
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
)
# list all paths in the processed data directory
processed_data_paths = list(
Path(workflow_metadata.processed_data_dir).glob("**/*")
)
# Add a check that the processed data directory is not empty
if not any(processed_data_paths):
raise FileNotFoundError(
f"No files found in processed data directory: "
f"{workflow_metadata.processed_data_dir}"
)
# Check that there is a .csv, .hdf5, and .toml file in the processed data directory and no other files
processed_data_paths = [x for x in processed_data_paths if x.is_file()]
if len(processed_data_paths) != 3:
raise ValueError(
f"Expected exactly 3 files (.csv, .hdf5, and .toml) in processed data directory "
f"{workflow_metadata.processed_data_dir}, found {len(processed_data_paths)}: {processed_data_paths}."
)
# Initialize the per-file data object references so the completeness check below cannot hit an undefined name
processed_data_object_config = None
processed_data_object_annot = None
processed_data_object = None
processed_data = []
for file in processed_data_paths:
file_type = file.suffixes
if file_type:
file_type = file_type[0].lstrip(".")
if file_type == "toml":
# Generate a data object for the parameter data
processed_data_object_config = self.generate_data_object(
file_path=file,
data_category=self.wf_config_process_data_category,
data_object_type=self.wf_config_process_data_obj_type,
description=self.wf_config_process_data_description,
base_url=self.process_data_url
+ Path(workflow_metadata.processed_data_dir).name
+ "/",
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
was_generated_by=metab_analysis.id,
)
nmdc_database_inst.data_object_set.append(
processed_data_object_config
)
parameter_data_id = processed_data_object_config.id
elif file_type == "csv":
# Generate a data object for the annotated data
processed_data_object_annot = self.generate_data_object(
file_path=file,
data_category=self.no_config_process_data_category,
data_object_type=self.no_config_process_data_obj_type,
description=self.csv_process_data_description,
base_url=self.process_data_url
+ Path(workflow_metadata.processed_data_dir).name
+ "/",
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
was_generated_by=metab_analysis.id,
)
nmdc_database_inst.data_object_set.append(
processed_data_object_annot
)
processed_data.append(processed_data_object_annot.id)
elif file_type == "hdf5":
# Generate a data object for the HDF5 processed data
processed_data_object = self.generate_data_object(
file_path=file,
data_category=self.no_config_process_data_category,
data_object_type=self.hdf5_process_data_obj_type,
description=self.hdf5_process_data_description,
base_url=self.process_data_url
+ Path(workflow_metadata.processed_data_dir).name
+ "/",
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
was_generated_by=metab_analysis.id,
)
nmdc_database_inst.data_object_set.append(processed_data_object)
processed_data.append(processed_data_object.id)
# Update MetabolomicsAnalysis times based on HDF5 file
metab_analysis.started_at_time = datetime.fromtimestamp(
file.stat().st_ctime
).strftime("%Y-%m-%d %H:%M:%S")
metab_analysis.ended_at_time = datetime.fromtimestamp(
file.stat().st_mtime
).strftime("%Y-%m-%d %H:%M:%S")
else:
raise ValueError(f"Unexpected file type found for file {file}.")
# Check that all processed data objects were created
if (
processed_data_object_config is None
or processed_data_object_annot is None
or processed_data_object is None
):
raise ValueError(
f"Not all processed data objects were created for {workflow_metadata.processed_data_dir}."
)
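# The Metabolomics Analysis takes both the parameter (.toml) data object and the raw data object as inputs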
has_input = [parameter_data_id, raw_data_object.id]
self.update_outputs(
mass_spec_obj=mass_spec,
analysis_obj=metab_analysis,
raw_data_obj_id=raw_data_object.id,
parameter_data_id=has_input,
processed_data_id_list=processed_data,
rerun=False,
)
nmdc_database_inst.data_generation_set.append(mass_spec)
nmdc_database_inst.data_object_set.append(raw_data_object)
nmdc_database_inst.workflow_execution_set.append(metab_analysis)
# Set processed data objects to none for next iteration
(
processed_data_object_config,
processed_data_object_annot,
processed_data_object,
) = (
None,
None,
None,
)
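# Dump the accumulated database to JSON and validate it with the NMDC API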
self.dump_nmdc_database(nmdc_database=nmdc_database_inst)
api_metadata = Metadata()
api_metadata.validate_json(self.database_dump_json_path)
logging.info("Metadata processing completed.")
def rerun(self) -> None:
"""
Execute a rerun of the metadata generation process for metabolomics data.
This method performs the following steps:
1. Initialize an NMDC Database instance.
2. Load and process metadata to create NMDC objects.
3. Generate Metabolomics Analysis and Processed Data objects.
4. Update outputs for the Metabolomics Analysis object.
5. Append generated objects to the NMDC Database.
6. Dump the NMDC Database to a JSON file.
7. Validate the JSON file using the NMDC API.
Returns
-------
None
Raises
------
FileNotFoundError
If the processed data directory is empty or not found.
ValueError
If the number of files in the processed data directory is not as expected.
Notes
-----
This method uses tqdm to display a progress bar while processing the
mass spectrometry metadata rows.
"""
# Load the minting credentials and API search clients once per rerun
client_id, client_secret = self.load_credentials(
config_file=self.minting_config_creds
)
wf_client = WorkflowExecutionSearch()
do_client = DataObjectSearch()
nmdc_database_inst = self.start_nmdc_database()
try:
df = pd.read_csv(self.metadata_file)
except FileNotFoundError:
raise FileNotFoundError(f"Metadata file not found: {self.metadata_file}")
metadata_df = df.apply(lambda x: x.reset_index(drop=True))
# Check for duplicate data object URLs in the database
self.check_doj_urls(
metadata_df=metadata_df, url_columns=["processed_data_directory"]
)
for _, data in tqdm(
metadata_df.iterrows(),
total=metadata_df.shape[0],
desc="Processing LCMS biosamples",
):
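# Look up the existing raw data object for this row by its URL so the rerun can reuse it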
# workflow_metadata = self.create_workflow_metadata(data)
raw_data_object_id = do_client.get_record_by_attribute(
attribute_name="url",
attribute_value=self.raw_data_url + Path(data["raw_data_file"]).name,
fields="id",
exact_match=True,
)[0]["id"]
# find the MetabolomicsAnalysis object - this is the old one
prev_metab_analysis = wf_client.get_record_by_filter(
filter=f'{{"has_input":"{raw_data_object_id}","type":"{NmdcTypes.MetabolomicsAnalysis}"}}',
fields="id,uses_calibration,execution_resource,processing_institution,was_informed_by",
all_pages=True,
)
# find the most recent metabolomics analysis object by the max id
prev_metab_analysis = max(prev_metab_analysis, key=lambda x: x["id"])
# Increment the metabolomics analysis id by bumping the trailing digit group with a regex
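# e.g. a hypothetical previous id ending in ".1" would be incremented to end in ".2"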
regex = r"(\d+)$"
metab_analysis_id = re.sub(
regex,
lambda x: str(int(x.group(1)) + 1),
prev_metab_analysis["id"],
)
metab_analysis = self.generate_metabolomics_analysis(
cluster_name=prev_metab_analysis["execution_resource"],
raw_data_name=Path(data["raw_data_file"]).name,
raw_data_id=raw_data_object_id,
data_gen_id=prev_metab_analysis["was_informed_by"],
processed_data_id="nmdc:placeholder",
parameter_data_id="nmdc:placeholder",
processing_institution=prev_metab_analysis["processing_institution"],
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
incremeneted_id=metab_analysis_id,
)
# list all paths in the processed data directory
processed_data_paths = list(
Path(data["processed_data_directory"]).glob("**/*")
)
# Add a check that the processed data directory is not empty
if not any(processed_data_paths):
raise FileNotFoundError(
f"No files found in processed data directory: "
f"{data['processed_data_directory']}"
)
# Check that there is a .csv, .hdf5, and .toml file in the processed data directory and no other files
processed_data_paths = [x for x in processed_data_paths if x.is_file()]
if len(processed_data_paths) != 3:
raise ValueError(
f"Expected exactly 3 files (.csv, .hdf5, and .toml) in processed data directory "
f"{data['processed_data_directory']}, found {len(processed_data_paths)}: {processed_data_paths}."
)
# Initialize the per-file data object references so the completeness check below cannot hit an undefined name
processed_data_object_config = None
processed_data_object_annot = None
processed_data_object = None
processed_data = []
for file in processed_data_paths:
file_type = file.suffixes
if file_type:
file_type = file_type[0].lstrip(".")
if file_type == "toml":
# Generate a data object for the parameter data
processed_data_object_config = self.generate_data_object(
file_path=file,
data_category=self.wf_config_process_data_category,
data_object_type=self.wf_config_process_data_obj_type,
description=self.wf_config_process_data_description,
base_url=self.process_data_url
+ Path(data["processed_data_directory"]).name
+ "/",
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
was_generated_by=metab_analysis.id,
)
nmdc_database_inst.data_object_set.append(
processed_data_object_config
)
parameter_data_id = processed_data_object_config.id
elif file_type == "csv":
# Generate a data object for the annotated data
processed_data_object_annot = self.generate_data_object(
file_path=file,
data_category=self.no_config_process_data_category,
data_object_type=self.no_config_process_data_obj_type,
description=self.csv_process_data_description,
base_url=self.process_data_url
+ Path(data["processed_data_directory"]).name
+ "/",
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
was_generated_by=metab_analysis.id,
)
nmdc_database_inst.data_object_set.append(
processed_data_object_annot
)
processed_data.append(processed_data_object_annot.id)
elif file_type == "hdf5":
# Generate a data object for the HDF5 processed data
processed_data_object = self.generate_data_object(
file_path=file,
data_category=self.no_config_process_data_category,
data_object_type=self.hdf5_process_data_obj_type,
description=self.hdf5_process_data_description,
base_url=self.process_data_url
+ Path(data["processed_data_directory"]).name
+ "/",
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
was_generated_by=metab_analysis.id,
)
nmdc_database_inst.data_object_set.append(processed_data_object)
processed_data.append(processed_data_object.id)
# Update MetabolomicsAnalysis times based on HDF5 file
metab_analysis.started_at_time = datetime.fromtimestamp(
file.stat().st_ctime
).strftime("%Y-%m-%d %H:%M:%S")
metab_analysis.ended_at_time = datetime.fromtimestamp(
file.stat().st_mtime
).strftime("%Y-%m-%d %H:%M:%S")
else:
raise ValueError(f"Unexpected file type found for file {file}.")
# Check that all processed data objects were created
if (
processed_data_object_config is None
or processed_data_object_annot is None
or processed_data_object is None
):
raise ValueError(
f"Not all processed data objects were created for {data['processed_data_directory']}."
)
has_input = [parameter_data_id, raw_data_object_id]
self.update_outputs(
analysis_obj=metab_analysis,
raw_data_obj_id=raw_data_object_id,
parameter_data_id=has_input,
processed_data_id_list=processed_data,
rerun=True,
)
nmdc_database_inst.workflow_execution_set.append(metab_analysis)
# Set processed data objects to none for next iteration
(
processed_data_object_config,
processed_data_object_annot,
processed_data_object,
) = (
None,
None,
None,
)
self.dump_nmdc_database(nmdc_database=nmdc_database_inst)
api_metadata = Metadata()
api_metadata.validate_json(self.database_dump_json_path)
logging.info("Metadata processing completed.")
def create_workflow_metadata(
self, row: dict[str, str]
) -> LCMSLipidWorkflowMetadata:
"""
Create an LCMSLipidWorkflowMetadata object from a dictionary of workflow metadata.
Parameters
----------
row : dict[str, str]
Dictionary containing metadata for a workflow. This is typically
a row from the input metadata CSV file.
Returns
-------
LCMSLipidWorkflowMetadata
An LCMSLipidWorkflowMetadata object populated with data from the input dictionary.
Notes
-----
The input dictionary is expected to contain the following keys:
'processed_data_directory', 'raw_data_file', 'mass_spec_configuration_name',
'chromat_configuration_name', 'instrument_used',
'instrument_analysis_start_date', 'instrument_analysis_end_date',
'execution_resource'.
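Examples
--------
A minimal sketch with hypothetical values; in practice the row comes from
the loaded metadata DataFrame:

>>> row = {
...     "processed_data_directory": "processed/sample_01",
...     "raw_data_file": "raw/sample_01.raw",
...     "mass_spec_configuration_name": "example MS configuration",
...     "chromat_configuration_name": "example LC configuration",
...     "instrument_used": "example instrument",
...     "instrument_analysis_start_date": "2024-01-01",
...     "instrument_analysis_end_date": "2024-01-02",
...     "execution_resource": "example execution resource",
... }
>>> workflow_metadata = generator.create_workflow_metadata(row)  # doctest: +SKIP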
"""
return LCMSLipidWorkflowMetadata(
processed_data_dir=row["processed_data_directory"],
raw_data_file=row["raw_data_file"],
mass_spec_config_name=row["mass_spec_configuration_name"],
lc_config_name=row["chromat_configuration_name"],
instrument_used=row["instrument_used"],
instrument_analysis_start_date=row["instrument_analysis_start_date"],
instrument_analysis_end_date=row["instrument_analysis_end_date"],
execution_resource=row["execution_resource"],
)