Source code for messes.convert.mwtab_functions

# -*- coding: utf-8 -*-
"""
Functions For mwtab Format
--------------------------
"""

import sys
import operator


def create_sample_lineages(input_json: dict, entity_table_name: str="entity", parent_key: str="parent_id") -> dict:
    """Determine all the ancestors, parents, and siblings for each entity in the entity table.

    The returned dictionary is of the form:
        {entity_id:{"ancestors":[ancestor0, ancestor1, ...],
                    "parents":[parent0, parent1, ...],
                    "siblings":[sibling0, sibling1, ...]}
         ...
        }

    parents are the immediate ancestors an entity comes from. They are also
    included in the ancestors list. Ancestors are collected one generation at a
    time starting from the immediate parents, and the collected list is then
    reversed, so the most distant generation comes first. siblings are the other
    entities that share at least one parent with the entity, listed in entity
    table order.

    Every ancestor is expanded only the first time it is reached, so an ancestor
    shared by several branches is listed once and cyclic parent references
    cannot make the search run forever.

    Args:
        input_json: the dictionary where the entity table is.
        entity_table_name: the name of the entity table in input_json.
        parent_key: the field name for the field that points to the entity's parent.
            Its value may be a single id or a list of ids.

    Returns:
        a dictionary where the keys are the entity ids and the values are a
        dictionary of its ancestors, parents, and siblings. The lists are new
        objects, never the lists stored in input_json.

    Raises:
        SystemExit: if a parent id is not a record in the entity table. An error
            message is printed to stderr first.
    """
    entity_table = input_json[entity_table_name]

    def parent_ids(attributes: dict) -> list:
        """Return the parent ids of a record as a new list, empty if it has none."""
        parents = attributes.get(parent_key)
        if not parents:
            return []
        return list(parents) if isinstance(parents, list) else [parents]

    lineages = {}
    for entity_name, entity_attributes in entity_table.items():
        immediate_parents = parent_ids(entity_attributes)
        ancestors = []
        expanded = set()
        generation = immediate_parents
        while generation:
            next_generation = []
            for parent_name in generation:
                # Skipping ancestors that were already expanded keeps the result
                # unchanged for acyclic data and guarantees termination otherwise.
                if parent_name in expanded:
                    continue
                expanded.add(parent_name)
                ancestors.append(parent_name)

                if parent_name not in entity_table:
                    print(f'Error: The parent entity, "{parent_name}", pulled from the entity "{entity_name}" '
                          f'in the "{entity_table_name}" table is not in the "{entity_table_name}" table. '
                          'Parent entities must be in the table with thier children.',
                          file=sys.stderr)
                    sys.exit()

                next_generation.extend(parent_ids(entity_table[parent_name]))
            generation = next_generation

        ancestors.reverse()
        lineages[entity_name] = {"ancestors": ancestors, "parents": immediate_parents}

    # Index the children of every parent once instead of rescanning the whole
    # table for each entity.
    table_position = {name: position for position, name in enumerate(entity_table)}
    children_by_parent = {}
    for entity_name, entity_attributes in entity_table.items():
        for parent_name in parent_ids(entity_attributes):
            children_by_parent.setdefault(parent_name, set()).add(entity_name)

    for entity_name, lineage in lineages.items():
        related = set()
        for parent_name in lineage["parents"]:
            related.update(children_by_parent.get(parent_name, ()))
        related.discard(entity_name)
        lineage["siblings"] = sorted(related, key=table_position.__getitem__)

    return lineages
def _record_factor(factors: dict, factor_fields: dict, field: str, field_value) -> None:
    """Store the factor value carried by one entity field, unless that factor is already set.

    A string value is used when it is one of the factor's allowed values. For a
    list value the elements that are allowed values are kept: a single match is
    stored as is, several matches are stored as the string form of the list.
    """
    factor = factor_fields.get(field)
    if factor is None or factor["name"] in factors:
        return

    allowed_values = factor["allowed_values"]
    if isinstance(field_value, str):
        if field_value in allowed_values:
            factors[factor["name"]] = field_value
    elif isinstance(field_value, list):
        matches = [value for value in field_value if value in allowed_values]
        if matches:
            factors[factor["name"]] = matches[0] if len(matches) == 1 else str(matches)


def create_subject_sample_factors(input_json: dict,
                                  measurement_table_name: str="measurement",
                                  sibling_match_field: str="protocol.id",
                                  sibling_match_value: str="protein_extraction",
                                  sample_id_key: str="entity.id",
                                  entity_table_name: str="entity",
                                  entity_type_key: str="type",
                                  subject_type_value: str="subject",
                                  parent_key: str="parent_id",
                                  factor_table_name: str="factor",
                                  factor_field_key: str="field",
                                  factor_allowed_values_key: str="allowed_values",
                                  protocol_table_name: str="protocol",
                                  protocol_field: str="protocol.id",
                                  protocol_type_field: str="type",
                                  measurement_type_value: str="measurement",
                                  data_files_key: str="data_files",
                                  data_files_attribute_key: str="data_files%entity_id",
                                  lineage_field_exclusion_list: list[str]|tuple[str] =("study.id", "project.id", "parent_id")) -> list[dict]:
    """Create the SUBJECT_SAMPLE_FACTORS section of the mwTab JSON.

    For every sample referenced by a measurement record, the sample's ancestors
    (most distant first) and its matching siblings are flattened into
    "lineage<N>_<field>" entries of the additional sample data. Factors are taken
    from the first place they are found, searching the ancestors in that order
    and then the sample itself. The subject id is the first ancestor in that
    order whose type field equals subject_type_value, or "" if there is none.
    Raw file names come from the first measurement type protocol found that has
    both data file fields filled in.

    Args:
        input_json: the data to build from.
        measurement_table_name: the name of the table in input_json where the measurements are.
        sibling_match_field: the field to use to determine if a sibling should be added to the SSF.
        sibling_match_value: the value to use to determine if a sibling should be added to the SSF.
        sample_id_key: the field in the measurement that has the sample id associated with it.
        entity_table_name: the name of the table in input_json where the entities are.
        entity_type_key: the field in entity records where the type is located.
        subject_type_value: the value in the type key that means the entity is a subject.
        parent_key: the field that points to the parent of the record.
        factor_table_name: the name of the table in input_json where the factors are.
        factor_field_key: the field in factor records that tells what the factor field is in other records.
        factor_allowed_values_key: the field in factor records where the allowed values for that factor are.
        protocol_table_name: the name of the table in input_json where the protocols are.
        protocol_field: the field in records that contains the protocol(s) of the record.
        protocol_type_field: the field in protocol records where the type is located.
        measurement_type_value: the value in the type key that means the protocol is a measurement type.
        data_files_key: the field in a measurement type protocol record where the file names are located.
        data_files_attribute_key: the field in a measurement type protocol record where the corresponding entity_id to raw file names are located.
        lineage_field_exclusion_list: the fields in entity records that should not be added as additional data.

    Returns:
        a list of SUBJECT_SAMPLE_FACTORS, sorted by subject id and then sample id.

    Raises:
        SystemExit: if a sample from the measurement table is not in the entity
            table, or (from create_sample_lineages) if a parent entity is missing.
            An error message is printed to stderr first.
    """
    samples = set()
    protocols = set()
    for measurement_attributes in input_json[measurement_table_name].values():
        if sample_id := measurement_attributes.get(sample_id_key):
            samples.add(sample_id)
        if protocol := measurement_attributes.get(protocol_field):
            if isinstance(protocol, list):
                protocols.update(protocol)
            else:
                protocols.add(protocol)

    ## Determine the measurement protocol to look for raw_files.
    raw_file_dict = {}
    for protocol in protocols:
        if (
            (protocol_attributes := input_json[protocol_table_name].get(protocol))
            and (protocol_type := protocol_attributes.get(protocol_type_field))
            and protocol_type == measurement_type_value
            and (data_files := protocol_attributes.get(data_files_key))
            and (data_files_attribute := protocol_attributes.get(data_files_attribute_key))
        ):
            if len(data_files) != len(data_files_attribute):
                print(f'Warning: The protocol, "{protocol}", has a {data_files_key} field that is not the same '
                      f'length as its {data_files_attribute_key} field. '
                      'The raw files for the subject-sample-factors may be incorrect.',
                      file=sys.stderr)
            # zip stops at the shorter of the two, so extra entries on either side are ignored.
            raw_file_dict = dict(zip(data_files_attribute, data_files))

            if samples != set(data_files_attribute):
                print('Warning: The entities found in the measurement records and those found in the '
                      f'{data_files_attribute_key} field of the {protocol} protocol are not the same.',
                      file=sys.stderr)
            ## Only the first usable measurement protocol is considered.
            break

    lineages = create_sample_lineages(input_json, entity_table_name=entity_table_name, parent_key=parent_key)
    entity_table = input_json[entity_table_name]
    factor_fields = {factor_attributes[factor_field_key]: {"name": factor,
                                                           "allowed_values": factor_attributes[factor_allowed_values_key]}
                     for factor, factor_attributes in input_json[factor_table_name].items()}

    ss_factors = []
    for sample in samples:
        if sample not in lineages:
            print(f'Error: The sample, "{sample}", pulled from the "{measurement_table_name}" table is not in the '
                  f'"{entity_table_name}" table. Thus the subject-sample-factors cannot be determined.',
                  file=sys.stderr)
            sys.exit()

        additional_sample_data = {}
        factors = {}
        subject_id = ""
        lineage_count = 0

        ## Add every ancestor to the additional data, and look for the factors and the subject along the way.
        for ancestor in lineages[sample]["ancestors"]:
            prefix = f"lineage{lineage_count}_"
            for field, field_value in entity_table[ancestor].items():
                if field in lineage_field_exclusion_list:
                    continue
                additional_sample_data[prefix + field] = str(field_value)
                _record_factor(factors, factor_fields, field, field_value)
                if not subject_id and field == entity_type_key and field_value == subject_type_value:
                    subject_id = ancestor
            lineage_count += 1

        ## Look for siblings to add to additional data if sibling_match_field is given.
        if sibling_match_field and sibling_match_value:
            for sibling in lineages[sample]["siblings"]:
                sibling_attributes = entity_table[sibling]
                ## A sibling without the match field yields None here and is simply not a match.
                match_field_value = sibling_attributes.get(sibling_match_field)
                is_match = (
                    (isinstance(match_field_value, str) and sibling_match_value == match_field_value)
                    or (isinstance(match_field_value, list) and sibling_match_value in match_field_value)
                )
                if not is_match:
                    continue

                prefix = f"lineage{lineage_count}_"
                for field, field_value in sibling_attributes.items():
                    if field in lineage_field_exclusion_list:
                        continue
                    additional_sample_data[prefix + field] = str(field_value)
                lineage_count += 1

        ## Look for factors on the sample itself.
        for field, field_value in entity_table[sample].items():
            _record_factor(factors, factor_fields, field, field_value)

        ## Add raw files as a key to additional sample data.
        if raw_file := raw_file_dict.get(sample):
            additional_sample_data["RAW_FILE_NAME"] = raw_file

        ss_factors.append({"Subject ID": subject_id,
                           "Sample ID": sample,
                           "Factors": factors,
                           "Additional sample data": additional_sample_data})

    ## Sort ss_factors. Done before the checks so the warnings list samples in a stable order.
    ss_factors.sort(key=operator.itemgetter("Subject ID", "Sample ID"))

    ## Run some error checking on factors found.
    found_factors = {factor for ss_factor in ss_factors for factor in ss_factor["Factors"]}
    missing_factors = set(input_json[factor_table_name]) - found_factors
    if missing_factors:
        missing_factor_names = ", ".join(sorted(missing_factors))
        print(f'Warning: There are factors in the "{factor_table_name}" table that were not found when '
              f'determining the subject-sample-factors. These factors are: {missing_factor_names}',
              file=sys.stderr)

    samples_without_all_factors = [ss_factor["Sample ID"] for ss_factor in ss_factors
                                   if found_factors - set(ss_factor["Factors"])]
    if samples_without_all_factors:
        print("Warning: The following samples do not have the full set of factors: \n" +
              "\n".join(map(str, samples_without_all_factors)),
              file=sys.stderr)

    return ss_factors