Source code for src.nom_metadata_generator
# -*- coding: utf-8 -*-
from src.metadata_generator import NMDCMetadataGenerator
from src.metadata_parser import NmdcTypes
from tqdm import tqdm
from pathlib import Path
from nmdc_api_utilities.data_object_search import DataObjectSearch
from nmdc_api_utilities.calibration_search import CalibrationSearch
from nmdc_api_utilities.workflow_execution_search import WorkflowExecutionSearch
from nmdc_api_utilities.minter import Minter
from nmdc_api_utilities.metadata import Metadata
import nmdc_schema.nmdc as nmdc
import hashlib
import pandas as pd
import re
from datetime import datetime
from dotenv import load_dotenv
import os
load_dotenv()
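# Select the NMDC API environment; nmdc_api_utilities typically expects "prod" or "dev".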
ENV = os.getenv("NMDC_ENV", "prod")
class NOMMetadataGenerator(NMDCMetadataGenerator):
"""
A class for generating NMDC metadata objects using provided metadata files and configuration
for Natural Organic Matter (NOM) data.
Parameters
----------
metadata_file : str
Path to the input CSV metadata file.
database_dump_json_path : str
Path where the output database dump JSON file will be saved.
raw_data_url : str
Base URL for the raw data files.
process_data_url : str
Base URL for the processed data files.
    minting_config_creds : str, optional
        Path to the configuration file containing the client ID and client secret for minting NMDC IDs.
        It can also include the bio ontology API key when biosample IDs need to be generated.
        If not provided, the CLIENT_ID, CLIENT_SECRET, and BIO_API_KEY environment variables are used.
Attributes
----------
raw_data_object_type : str
The type of the raw data object.
processed_data_object_type : str
The type of the processed data object.
processed_data_category : str
The category of the processed data.
execution_resource : str
The execution resource for the workflow.
analyte_category : str
The category of the analyte.
workflow_analysis_name : str
The name of the workflow analysis.
workflow_description : str
The description of the workflow.
workflow_param_data_category : str
The category of the workflow parameter data.
workflow_param_data_object_type : str
The type of the workflow parameter data object.
    unique_columns : list[str]
        Columns in the metadata file whose URLs must be unique across the database.
mass_spec_desc : str
The description of the mass spectrometry data.
    mass_spec_eluent_intro : str
        The eluent introduction method used for the mass spectrometry run.
processing_institution : str
The institution responsible for processing the data.
workflow_git_url : str
The URL of the workflow Git repository.
workflow_version : str
The version of the workflow.
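
    Examples
    --------
    A minimal usage sketch; the paths and URLs below are illustrative placeholders::

        generator = NOMMetadataGenerator(
            metadata_file="metadata.csv",
            database_dump_json_path="nom_database.json",
            raw_data_url="https://example.org/raw/",
            process_data_url="https://example.org/processed/",
        )
        generator.run()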
"""
raw_data_object_type: str = "Direct Infusion FT ICR-MS Raw Data"
processed_data_object_type: str = "Direct Infusion FT-ICR MS Analysis Results"
processed_data_category: str = "processed_data"
execution_resource: str = "EMSL-RZR"
analyte_category: str = "nom"
workflow_analysis_name: str = "NOM Analysis"
workflow_description: str = (
"Natural Organic Matter analysis of raw mass spectrometry data."
)
workflow_param_data_category: str = "workflow_parameter_data"
workflow_param_data_object_type: str = "Analysis Tool Parameter File"
unique_columns: list[str] = ["raw_data_file", "processed_data_directory"]
mass_spec_desc: str = "ultra high resolution mass spectrum"
mass_spec_eluent_intro: str = "direct_infusion_autosampler"
processing_institution: str = "EMSL"
workflow_git_url: str = "https://github.com/microbiomedata/enviroMS"
workflow_version: str
def __init__(
self,
metadata_file: str,
database_dump_json_path: str,
raw_data_url: str,
process_data_url: str,
minting_config_creds: str = None,
workflow_version: str = None,
):
super().__init__(
metadata_file=metadata_file,
database_dump_json_path=database_dump_json_path,
raw_data_url=raw_data_url,
process_data_url=process_data_url,
)
self.minting_config_creds = minting_config_creds
# Set the workflow version, prioritizing user input, then fetching from the Git URL, and finally using a default.
self.workflow_version = workflow_version or self.get_workflow_version(
workflow_version_git_url="https://github.com/microbiomedata/enviroMS/blob/master/.bumpversion.cfg"
)
def rerun(self):
"""
Execute a rerun of the metadata generation process.
This method processes the metadata file, generates biosamples (if needed)
and metadata, and manages the workflow for generating NOM analysis data.
        Assumes the raw NOM data files are on MinIO and that the raw data object URL field is populated.
"""
do_client = DataObjectSearch(env=ENV)
wf_client = WorkflowExecutionSearch(env=ENV)
client_id, client_secret = self.load_credentials(
config_file=self.minting_config_creds
)
nmdc_database_inst = self.start_nmdc_database()
try:
df = pd.read_csv(self.metadata_file)
except FileNotFoundError:
raise FileNotFoundError(f"Metadata file not found: {self.metadata_file}")
metadata_df = df.apply(lambda x: x.reset_index(drop=True))
tqdm.write("\033[92mStarting metadata processing...\033[0m")
        # check for duplicate data object URLs in the database
self.check_doj_urls(
metadata_df=metadata_df, url_columns=["processed_data_directory"]
)
# Iterate through each row in df to generate metadata
for _, row in tqdm(
metadata_df.iterrows(),
total=metadata_df.shape[0],
desc="Processing NOM rows",
):
try:
raw_data_object_id = do_client.get_record_by_attribute(
attribute_name="url",
attribute_value=self.raw_data_url + Path(row["raw_data_file"]).name,
fields="id",
exact_match=True,
)[0]["id"]
except Exception as e:
raise ValueError(
f"Raw data object not found for URL: {self.raw_data_url + Path(row['raw_data_file']).name}"
) from e
try:
                # find the existing NomAnalysis records for this raw data object (the previous run)
prev_nom_analysis = wf_client.get_record_by_filter(
filter=f'{{"has_input":"{raw_data_object_id}","type":"{NmdcTypes.NomAnalysis}"}}',
fields="id,uses_calibration,execution_resource,processing_institution,was_informed_by",
all_pages=True,
)
                # select the most recent NomAnalysis record by the max id
                prev_nom_analysis = max(prev_nom_analysis, key=lambda x: x["id"])
                # increment the analysis id by bumping its trailing digit group with a regex
                regex = r"(\d+)$"
                nom_analysis_id = re.sub(
                    regex,
                    lambda x: str(int(x.group(1)) + 1),
                    prev_nom_analysis["id"],
                )
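                # e.g. "nmdc:wfnom-11-abc123.1" -> "nmdc:wfnom-11-abc123.2" (illustrative id)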
            except Exception as e:
                raise IndexError(
                    f"NomAnalysis object not found for raw data object ID: {raw_data_object_id}"
                ) from e
processed_data = []
            # grab the calibration_id from the previous NOM analysis
            # Generate the NomAnalysis instance (workflow_execution_set); it takes the raw data zip file as input
nom_analysis = self.generate_nom_analysis(
file_path=Path(row["raw_data_file"]),
raw_data_id=raw_data_object_id,
data_gen_id=prev_nom_analysis["was_informed_by"],
processed_data_id="nmdc:placeholder",
calibration_id=prev_nom_analysis["uses_calibration"],
                incremented_id=nom_analysis_id,
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
)
processed_data_paths = list(
Path(row["processed_data_directory"]).glob("**/*")
)
# Add a check that the processed data directory is not empty
if not any(processed_data_paths):
raise FileNotFoundError(
f"No files found in processed data directory: "
f"{row['processed_data_directory']}"
)
processed_data_paths = [x for x in processed_data_paths if x.is_file()]
            # Processed data objects exist only AFTER the workflow has run; this mirrors how the lipidomics and GC-MS generators work.
for file in processed_data_paths:
if file.suffix == ".csv":
# this is the .csv file of the processed data
processed_data_object_desc = "EnviroMS natural organic matter workflow molecular formula assignment output details"
processed_data_object = self.generate_data_object(
file_path=file,
data_category=self.processed_data_category,
data_object_type=self.processed_data_object_type,
description=processed_data_object_desc,
base_url=self.process_data_url
+ Path(row["processed_data_directory"]).name
+ "/",
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
was_generated_by=nom_analysis.id,
alternative_id=None,
)
processed_data.append(processed_data_object.id)
# Update NomAnalysis times based on csv file
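                    # note: st_ctime is the inode change time on Unix and the creation time on Windows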
nom_analysis.started_at_time = datetime.fromtimestamp(
file.stat().st_ctime
).strftime("%Y-%m-%d %H:%M:%S")
nom_analysis.ended_at_time = datetime.fromtimestamp(
file.stat().st_mtime
).strftime("%Y-%m-%d %H:%M:%S")
if file.suffix == ".json":
# Generate workflow parameter data object
# this is the .json file of processed data
workflow_param_data_object_desc = f"CoreMS processing parameters for natural organic matter analysis used to generate {nom_analysis.id}"
workflow_data_object = self.generate_data_object(
file_path=file,
data_category=self.workflow_param_data_category,
data_object_type=self.workflow_param_data_object_type,
description=workflow_param_data_object_desc,
base_url=self.process_data_url
+ Path(row["processed_data_directory"]).name
+ "/",
was_generated_by=nom_analysis.id,
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
alternative_id=None,
)
has_input = [workflow_data_object.id, raw_data_object_id]
# Update the outputs for mass_spectrometry and nom_analysis
self.update_outputs(
analysis_obj=nom_analysis,
raw_data_obj_id=raw_data_object_id,
parameter_data_id=has_input,
processed_data_id_list=processed_data,
rerun=True,
)
nmdc_database_inst.data_object_set.append(processed_data_object)
nmdc_database_inst.data_object_set.append(workflow_data_object)
nmdc_database_inst.workflow_execution_set.append(nom_analysis)
processed_data = []
self.dump_nmdc_database(nmdc_database=nmdc_database_inst)
api_metadata = Metadata(env=ENV)
api_metadata.validate_json(self.database_dump_json_path)
def run(self):
"""
Execute the metadata generation process.
This method processes the metadata file, generates biosamples (if needed)
and metadata, and manages the workflow for generating NOM analysis data.
"""
client_id, client_secret = self.load_credentials(
config_file=self.minting_config_creds
)
nmdc_database_inst = self.start_nmdc_database()
metadata_df = self.load_metadata()
tqdm.write("\033[92mStarting metadata processing...\033[0m")
processed_data = []
self.check_for_biosamples(
metadata_df=metadata_df,
nmdc_database_inst=nmdc_database_inst,
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
)
        # check for duplicate data object URLs in the database
self.check_doj_urls(metadata_df=metadata_df, url_columns=self.unique_columns)
print(f"ENV: {ENV}")
# Iterate through each row in df to generate metadata
for _, row in tqdm(
metadata_df.iterrows(),
total=metadata_df.shape[0],
desc="Processing NOM rows",
):
emsl_metadata, biosample_id = self.handle_biosample(row)
# Generate MassSpectrometry record
mass_spec = self.generate_mass_spectrometry(
file_path=Path(emsl_metadata["data_path"]),
instrument_name=emsl_metadata["instrument_used"],
sample_id=biosample_id,
raw_data_id="nmdc:placeholder",
study_id=emsl_metadata["associated_studies"],
processing_institution=self.processing_institution,
mass_spec_config_name=emsl_metadata["mass_spec_config"],
start_date=row["instrument_analysis_start_date"],
end_date=row["instrument_analysis_end_date"],
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
)
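            # e.g. "direct_infusion_autosampler" -> "direct infusion autosampler"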
eluent_intro_pretty = self.mass_spec_eluent_intro.replace("_", " ")
# raw is the zipped .d directory
raw_data_object_desc = (
f"Raw {emsl_metadata['instrument_used']} {eluent_intro_pretty} data."
)
raw_data_object = self.generate_data_object(
file_path=Path(row["raw_data_file"]),
data_category=self.raw_data_category,
data_object_type=self.raw_data_object_type,
description=raw_data_object_desc,
base_url=self.raw_data_url,
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
was_generated_by=mass_spec.id,
)
            # Generate the NomAnalysis instance (workflow_execution_set); it takes the raw data zip file as input
calibration_id = self.get_calibration_id(
calibration_path=Path(row["ref_calibration_path"])
)
nom_analysis = self.generate_nom_analysis(
file_path=Path(row["raw_data_file"]),
calibration_id=calibration_id,
raw_data_id=raw_data_object.id,
data_gen_id=mass_spec.id,
processed_data_id="nmdc:placeholder",
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
)
processed_data_paths = list(
Path(row["processed_data_directory"]).glob("**/*")
)
# Add a check that the processed data directory is not empty
if not any(processed_data_paths):
raise FileNotFoundError(
f"No files found in processed data directory: "
f"{row['processed_data_directory']}"
)
processed_data_paths = [x for x in processed_data_paths if x.is_file()]
            # Processed data objects exist only AFTER the workflow has run; this mirrors how the lipidomics and GC-MS generators work.
for file in processed_data_paths:
if file.suffix == ".csv":
# this is the .csv file of the processed data
processed_data_object_desc = (
f"EnviroMS {emsl_metadata['instrument_used']} "
"natural organic matter workflow molecular formula assignment output details"
)
processed_data_object = self.generate_data_object(
file_path=file,
data_category=self.processed_data_category,
data_object_type=self.processed_data_object_type,
description=processed_data_object_desc,
base_url=self.process_data_url
+ Path(row["processed_data_directory"]).name
+ "/",
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
was_generated_by=nom_analysis.id,
alternative_id=None,
)
processed_data.append(processed_data_object.id)
# Update NomAnalysis times based on csv file
nom_analysis.started_at_time = datetime.fromtimestamp(
file.stat().st_ctime
).strftime("%Y-%m-%d %H:%M:%S")
nom_analysis.ended_at_time = datetime.fromtimestamp(
file.stat().st_mtime
).strftime("%Y-%m-%d %H:%M:%S")
if file.suffix == ".json":
# Generate workflow parameter data object
# this is the .json file of processed data
workflow_param_data_object_desc = f"CoreMS processing parameters for natural organic matter analysis used to generate {nom_analysis.id}"
workflow_data_object = self.generate_data_object(
file_path=file,
data_category=self.workflow_param_data_category,
data_object_type=self.workflow_param_data_object_type,
description=workflow_param_data_object_desc,
base_url=self.process_data_url
+ Path(row["processed_data_directory"]).name
+ "/",
was_generated_by=nom_analysis.id,
CLIENT_ID=client_id,
CLIENT_SECRET=client_secret,
alternative_id=None,
)
has_input = [workflow_data_object.id, raw_data_object.id]
# Update the outputs for mass_spectrometry and nom_analysis
self.update_outputs(
mass_spec_obj=mass_spec,
analysis_obj=nom_analysis,
raw_data_obj_id=raw_data_object.id,
parameter_data_id=has_input,
processed_data_id_list=processed_data,
rerun=False,
)
nmdc_database_inst.data_generation_set.append(mass_spec)
nmdc_database_inst.data_object_set.append(raw_data_object)
nmdc_database_inst.data_object_set.append(processed_data_object)
nmdc_database_inst.data_object_set.append(workflow_data_object)
nmdc_database_inst.workflow_execution_set.append(nom_analysis)
processed_data = []
self.dump_nmdc_database(nmdc_database=nmdc_database_inst)
api_metadata = Metadata(env=ENV)
api_metadata.validate_json(self.database_dump_json_path)
def get_calibration_id(
self,
        calibration_path: Path,
) -> str:
"""
Get the calibration ID from the NMDC API using the md5 checksum of the calibration file.
Parameters
----------
        calibration_path : Path
The file path of the calibration file.
Returns
-------
str
The calibration ID if found, otherwise None.
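
        Examples
        --------
        A hypothetical lookup; the returned id value is illustrative::

            calib_id = generator.get_calibration_id(Path("calibration.ref"))
            # e.g. "nmdc:calib-11-abc123" if a matching calibration record exists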
"""
        # Look up the calibration id by the md5 checksum of the calibration file
        with calibration_path.open("rb") as f:
            calib_md5 = hashlib.md5(f.read()).hexdigest()
do_client = DataObjectSearch(env=ENV)
cs_client = CalibrationSearch(env=ENV)
try:
calib_do_id = do_client.get_record_by_attribute(
attribute_name="md5_checksum",
attribute_value=calib_md5,
fields="id",
exact_match=True,
)[0]["id"]
calibration_id = cs_client.get_record_by_attribute(
attribute_name="calibration_object",
attribute_value=calib_do_id,
fields="id",
exact_match=True,
)[0]["id"]
except ValueError as e:
print(f"Calibration object does not exist: {e}")
calibration_id = None
except IndexError as e:
print(f"Calibration object not found: {e}")
calibration_id = None
        except Exception as e:
            print(f"An error occurred: {e}")
            calibration_id = None
        return calibration_id
def generate_nom_analysis(
self,
file_path: Path,
raw_data_id: str,
data_gen_id: str,
processed_data_id: str,
CLIENT_ID: str,
CLIENT_SECRET: str,
calibration_id: str = None,
incremented_id: str = None,
) -> nmdc.NomAnalysis:
"""
        Generate a NOM analysis metadata object from the provided file information.
Parameters
----------
file_path : Path
            The file path of the raw data file associated with the analysis.
raw_data_id : str
The ID of the raw data associated with the analysis.
data_gen_id : str
The ID of the data generation process that informed this analysis.
processed_data_id : str
The ID of the processed data resulting from this analysis.
CLIENT_ID : str
The client ID for the NMDC API.
CLIENT_SECRET : str
The client secret for the NMDC API.
calibration_id : str, optional
The ID of the calibration object used in the analysis. If None, no calibration is used.
incremented_id : str, optional
            The incremented ID for the NOM analysis. If None, a new ID will be minted.
Returns
-------
nmdc.NomAnalysis
            The generated NOM analysis object.
"""
if incremented_id is None:
mint = Minter(env=ENV)
nmdc_id = mint.mint(
nmdc_type=NmdcTypes.NomAnalysis,
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
)
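            # a newly minted analysis id gets a ".1" version suffix (e.g. "nmdc:wfnom-11-abc123.1", illustrative)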
incremented_id = nmdc_id + ".1"
data_dict = {
"id": incremented_id,
"name": f"{self.workflow_analysis_name} for {file_path.name}",
"description": self.workflow_description,
"uses_calibration": calibration_id,
"processing_institution": self.processing_institution,
"execution_resource": self.execution_resource,
"git_url": self.workflow_git_url,
"version": self.workflow_version,
"was_informed_by": data_gen_id,
"has_input": [raw_data_id],
"has_output": [processed_data_id],
"started_at_time": "placeholder",
"ended_at_time": "placeholder",
"type": NmdcTypes.NomAnalysis,
}
self.clean_dict(data_dict)
        nom_analysis = nmdc.NomAnalysis(**data_dict)
        return nom_analysis