# Source code for src.metadata_parser

# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np

from dataclasses import dataclass, is_dataclass
from typing import Union, Dict, List, get_origin, get_args
import typing_inspect
from pathlib import Path
from src.bio_ontology_api import BioOntologyInfoRetriever
from nmdc_schema.nmdc import (
    Biosample,
    ControlledIdentifiedTermValue,
    TextValue,
    QuantityValue,
    GeolocationValue,
    TimestampValue,
)
import ast
from src.data_classes import NmdcTypes


class MetadataParser:
    """Parses metadata from an input metadata spreadsheet."""

    def __init__(self):
        # Stateless: every method operates solely on its arguments.
        pass
[docs] def get_value(self, row: pd.Series, key: str, default: str = None) -> str: """ Retrieve a value from a row, handling missing or NaN values. Parameters ---------- row : pd.Series A row from the DataFrame. key : str The key to retrieve the value for. default : str, optional Default value to return if the key does not exist or is NaN. Returns ------- str The value associated with the key, or default if not found. """ type = None # if the value passed in is a Biosample field, we need to add the biosample prefix for field, data in Biosample.__dataclass_fields__.items(): if field == key: key = "biosample." + key type = data.type break if self.is_type(type, QuantityValue): # if the value is a quantity value, we need to extract all columns that could be associated with it # and create a dict with the values value = { "has_numeric_value": row.get(key + ".has_numeric_value", default), "has_minimum_numeric_value": row.get( key + ".has_minimum_numeric_value", default ), "has_maximum_numeric_value": row.get( key + ".has_maximum_numeric_value", default ), "has_unit": row.get(key + ".has_unit", default), "has_raw_value": row.get(key + ".has_raw_value", default), } # remove any keys with None values value = { k: float(v) if isinstance(v, np.int_) else v for k, v in value.items() if v is not None } # if the value is empty, return the default value if not value: return default return value value = row.get(key, default) if isinstance(value, float) and np.isnan(value): return default return value
[docs] def parse_biosample_metadata(self, row: pd.Series) -> Dict: """ Parse the metadata row to get non-biosample class information. Parameters ---------- row : pd.Series A row from the DataFrame containing metadata. Returns ------- Dict """ # Initialize the metadata dictionary metadata_dict = { "raw_data_file": Path(self.get_value(row, "raw_data_file")), "processed_data_directory": Path( self.get_value(row, "processed_data_directory") ), "data_path": Path(self.get_value(row, "LC-MS filename")), "dms_dataset_id": self.get_value(row, "DMS Dataset ID"), "myemsl_link": self.get_value(row, "MyEMSL link"), "associated_studies": ast.literal_eval( self.get_value(row, "associated_studies") ) if self.get_value(row, "associated_studies") else None, "biosample_id": self.get_value(row, "biosample_id") if self.get_value(row, "biosample_id") or self.get_value(row, "id") else None, "instrument_used": self.get_value(row, "instrument_used") if self.get_value(row, "instrument_used") else None, "mass_spec_config": self.get_value(row, "mass_spec_config") if self.get_value(row, "mass_spec_config") else None, } # Create and return the EmslMetadata instance metadata = metadata_dict return metadata
    def is_type(self, type_hint, type_to_search_for) -> bool:
        """Recursively check if a type hint is or contains input type."""
        # A falsy search target (e.g. the None field_type produced by
        # get_value for non-Biosample keys) can never match.
        if not type_to_search_for:
            return False
        # Check if the type_to_search_for is a dataclass and compare directly
        if is_dataclass(type_to_search_for):
            if is_dataclass(type_hint) and type_hint == type_to_search_for:
                return True
        # Check if the origin of the type hint is the type_to_search_for
        # (e.g. origin `list` for List[...]), or — when the hint is a
        # Union — whether any member's origin matches.
        if get_origin(type_hint) == type_to_search_for or (
            typing_inspect.is_union_type(type_hint)
            and any(get_origin(tp) == type_to_search_for for tp in get_args(type_hint))
        ):
            return True
        # If the type is a Union, check the arguments recursively,
        # skipping NoneType (introduced by Optional[...]).
        if typing_inspect.is_union_type(type_hint):
            return any(
                self.is_type(arg, type_to_search_for)
                for arg in get_args(type_hint)
                if arg is not type(None)
            )
        return False
    def dynam_parse_biosample_metadata(self, row: pd.Series, bio_api_key: str) -> dict:
        """
        Function to parse the metadata row if it includes biosample information.
        This pulls the most recent version of the ontology terms from the API
        and compares them to the values in the given row. Different parsing is
        done on different types of fields, such as lists, controlled identified
        terms, and text values to ensure the correct format is used.

        Parameters
        ----------
        row: pd.Series
            A row from the DataFrame containing metadata.
        bio_api_key: str
            The API key to access the Bio Ontology API

        Returns
        -------
        metadata: dict
            The metadata dictionary.
        """
        envo_retriever = BioOntologyInfoRetriever(bio_api_key)
        metadata = {}
        for field_name, field_data in Biosample.__dataclass_fields__.items():
            # The "type" field is always the fixed NMDC class tag.
            if field_name == "type":
                metadata[field_name] = "nmdc:Biosample"
            # check if the field is a list of dataclasses
            # NOTE(review): is_type compares get_origin(...) against the full
            # subscripted generic List[Union[dict, dataclass]]; it is unclear
            # this ever matches a real annotation — verify this branch is
            # reachable.
            elif self.is_type(field_data.type, List[Union[dict, dataclass]]):
                # check if a value exists before we begin complex parsing,
                # saves time doing this at the beginning
                if self.get_value(row, field_name):
                    # we need to make a dict for each item in the list
                    metadata[field_name] = []
                    # get the list of dicts from the csv row
                    list_of_dicts = ast.literal_eval(self.get_value(row, field_name))
                    # iterate through the list of dicts and format them
                    for item in list_of_dicts:
                        if self.is_type(field_data.type, TextValue):
                            metadata[field_name].append(
                                self.create_text_value(item, field_name)
                            )
                        elif self.is_type(
                            field_data.type, ControlledIdentifiedTermValue
                        ):
                            metadata[field_name].append(
                                self.create_controlled_identified_term_value(
                                    item,
                                    {item: item},
                                )
                            )
                        elif self.is_type(field_data.type, QuantityValue):
                            metadata[field_name].append(
                                self.create_quantity_value(value_dict=item)
                            )
                        else:
                            metadata[field_name].append(item)
            # check if the field is a list type, we will need to convert the
            # csv row to a list instead of treating it as a string
            elif self.is_type(field_data.type, list):
                metadata[field_name] = (
                    ast.literal_eval(self.get_value(row, field_name))
                    if self.get_value(row, field_name)
                    else None
                )
            # format GeolocationValue dict
            elif self.is_type(field_data.type, GeolocationValue):
                metadata[field_name] = (
                    self.create_geo_loc_value(
                        self.get_value(row, field_name),
                    )
                    if self.get_value(row, field_name)
                    else None
                )
            # format QuantityValue dict
            elif self.is_type(field_data.type, QuantityValue):
                metadata[field_name] = (
                    self.create_quantity_value(
                        value_dict=self.get_value(row, field_name)
                    )
                    if self.get_value(row, field_name)
                    else None
                )
            # format TimestampValue dict
            elif self.is_type(field_data.type, TimestampValue):
                metadata[field_name] = (
                    self.create_timestamp_value(self.get_value(row, field_name))
                    if self.get_value(row, field_name)
                    else None
                )
            # format ControlledIdentifiedTermValue dict (the env_* fields are
            # handled separately below via the ontology API)
            elif self.is_type(
                field_data.type, ControlledIdentifiedTermValue
            ) and field_name not in [
                "env_broad_scale",
                "env_local_scale",
                "env_medium",
            ]:
                metadata[field_name] = (
                    self.create_controlled_identified_term_value(
                        self.get_value(row, field_name),
                        {
                            self.get_value(row, field_name): self.get_value(
                                row, field_name
                            )
                        },
                    )
                    if self.get_value(row, field_name)
                    else None
                )
            # format TextValue dict
            elif self.is_type(field_data.type, TextValue):
                metadata[field_name] = (
                    self.create_text_value(
                        self.get_value(row, field_name), field_name == "env_package"
                    )
                    if self.get_value(row, field_name)
                    else None
                )
            # format and create envo term for env_broad_scale,
            # env_local_scale, env_medium using the Bio Ontology API
            elif field_name in ["env_broad_scale", "env_local_scale", "env_medium"]:
                # create envo term for env_broad_scale, env_local_scale, env_medium
                metadata[field_name] = (
                    self.create_controlled_identified_term_value(
                        self.get_value(row, field_name),
                        envo_retriever.get_envo_terms(self.get_value(row, field_name)),
                    )
                    if self.get_value(row, field_name)
                    else None
                )
            # catch all for normal case - strings, ints, etc
            else:
                metadata[field_name] = (
                    self.get_value(row, field_name)
                    if self.get_value(row, field_name)
                    else None
                )
        return metadata
[docs] def create_timestamp_value(self, raw_value: str) -> dict: """ Create a timestamp value representation. Parameters ---------- raw_value : str The raw value to convert to a timestamp. Returns ------- dict A dictionary representing the timestamp value. """ nmdc_timestamp_value = { "has_raw_value": raw_value, "type": NmdcTypes.TimeStampValue, } return nmdc_timestamp_value
[docs] def create_quantity_value( self, value_dict: dict = None, ) -> dict: """ Create a quantity value representation. Since a dictionary is passed in, we need to check if any of the values are None and remove them if so. Also adds the Quantity value type. Parameters ---------- value_dict : dict A dictionary containing the raw value and other attributes gathered from the metadata. This is a dict of the form: { "has_numeric_value": float, "has_minimum_numeric_value": float, "has_maximum_numeric_value": float, "has_unit": str, "has_raw_value": str } The keys in the dictionary are the attributes of the QuantityValue class. They may be passed in as None if they are not present in the metadata. Returns ------- dict A dictionary representing the quantity value. """ if value_dict: value_dict = {k: v for k, v in value_dict.items() if v is not None} value_dict["type"] = NmdcTypes.QuantityValue return value_dict
[docs] def create_geo_loc_value(self, raw_value: str) -> dict: """ Create a geolocation value representation. Parameters ---------- raw_value : str The raw value associated with geolocation. Returns ------- dict A dictionary representing the geolocation value. """ lat_value, long_value = raw_value.split(" ", 1) nmdc_geo_loc_value = { "has_raw_value": raw_value, "latitude": lat_value, "longitude": long_value, "type": NmdcTypes.GeolocationValue, } return nmdc_geo_loc_value
[docs] def create_text_value(self, row_value: str, is_list: bool) -> dict: """ Create a text value representation. Parameters ---------- row_value : str The raw value to convert. is_list : bool Whether to treat the value as a list. Returns ------- dict A dictionary representing the text value. """ nmdc_text_value = {"has_raw_value": row_value, "type": NmdcTypes.TextValue} return nmdc_text_value
[docs] def create_controlled_identified_term_value( self, row_value: str, slot_enum_dict: dict ) -> dict: """ Create a controlled identified term value. Parameters ---------- row_value : str The raw value to be converted. slot_enum_dict : dict A dictionary mapping the raw value to its corresponding term. Returns ------- dict A dictionary representing the controlled identified term. """ nmdc_controlled_term_slot = { "has_raw_value": row_value, "term": { "id": row_value, "name": slot_enum_dict.get(row_value), "type": NmdcTypes.OntologyClass, }, "type": NmdcTypes.ControlledIdentifiedTermValue, } return nmdc_controlled_term_slot
[docs] def generate_example_biosample_csv( self, file_path: str = "example_biosample_metadata.csv" ): """ Function to generate an example csv file from available NMDCSchema Biosample fields. Saves the file to the given path. Parameters ---------- file_path : str The path to save the example CSV file. Default is "example_biosample_metadata.csv". Returns ------- None """ # Get all fields from the Biosample dataclass biosample_fields = Biosample.__dataclass_fields__.keys() biosample_fields = [ "biosample." + field for field in biosample_fields if field != "_inherited_slots" ] # Create a DataFrame with the fields as columns and an empty row df = pd.DataFrame(columns=biosample_fields) # Add data based on the type of column for field in biosample_fields: if self.is_type( Biosample.__dataclass_fields__[field.split(".")[-1]].type, ControlledIdentifiedTermValue, ): df[field] = ["ENVO:00000000"] elif self.is_type( Biosample.__dataclass_fields__[field.split(".")[-1]].type, TextValue ): df[field] = "textValue" elif self.is_type( Biosample.__dataclass_fields__[field.split(".")[-1]].type, QuantityValue ): # create new columns for each of the needed fields quantity_df = pd.DataFrame( { field + ".has_maximum_numeric_value": ["85"], field + ".has_minimum_numeric_value": ["85"], field + ".has_numeric_value": ["85"], field + ".has_unit": ["celcius"], field + ".has_raw_value": ["85"], } ) df = pd.concat([df, quantity_df], axis=1) df.drop(field, axis=1, inplace=True) elif self.is_type( Biosample.__dataclass_fields__[field.split(".")[-1]].type, GeolocationValue, ): df[field] = ["46.37228379 -119.2717467"] elif self.is_type( Biosample.__dataclass_fields__[field.split(".")[-1]].type, TimestampValue, ): df[field] = ["2014-11-25"] # Otherwise, set it to blank else: df[field] = "" # Save the DataFrame to a CSV file df.to_csv(file_path, index=False)