Source code for

"""
Constructing Mappings From KEGG "link" And "conv" Operations
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|Functionality| for converting the output from the KEGG "link" or "conv" REST operations into mappings of the entry IDs from one database to the IDs of related entries.
"""
import typing as t
import json
import copy as cp
import logging as log
from . import rest as r
from . import kegg_url as ku
from . import _utils as u

# Type alias for the mappings built by this module: each key is an entry ID and its value is the set of related entry IDs.
KEGGmapping = dict[str, set[str]]

def _to_dict(kegg_rest: r.KEGGrest | None, KEGGurl: type[ku.AbstractKEGGurl], **kwargs) -> KEGGmapping:
    """ Converts output from the KEGG "link" or "conv" operation into a dictionary.

    :param kegg_rest: The KEGGrest object to perform the operation. If None, one is created with the default parameters.
    :param KEGGurl: The class extending AbstractKEGGurl used to form the URL for the operation.
    :param kwargs: The keyword arguments for constructing the URL.
    :return: The dictionary mapping each ID in the first column of the response to the set of IDs paired with it in the second column.
    :raises RuntimeError: Raised if the request to the KEGG REST API fails or times out.
    """
    kegg_response = r.request_and_check_error(kegg_rest=kegg_rest, KEGGurl=KEGGurl, **kwargs)
    text_body = kegg_response.text_body.strip()
    mapped_ids: KEGGmapping = {}
    # Only a non-empty response contributes entries; otherwise the empty mapping is returned as-is
    if text_body:
        for one_to_one in text_body.split('\n'):
            # Each line is a tab-separated pair; any other column count raises ValueError on unpacking
            map_from_id, map_to_id = one_to_one.strip().split('\t')
            _add_to_dict(dictionary=mapped_ids, key=map_from_id, values={map_to_id})
    return mapped_ids

def _add_to_dict(dictionary: KEGGmapping, key: str, values: set[str]) -> None:
    """ Adds a set of values to a set mapped from a given key in a dictionary.

    If the key is already present, the values are merged into its existing set. Otherwise, the key is inserted with its own
    copy of the values. The dictionary is modified in place.

    :param dictionary: The dictionary mapping to sets, one of which the values will be added to.
    :param key: The key in the dictionary mapping to the set that the values will be added to.
    :param values: The values to add to the set mapped to by the key.
    """
    if key in dictionary:
        dictionary[key].update(values)
    else:
        # In case "values" is referenced elsewhere, store a copy so later merges don't mutate the caller's set
        dictionary[key] = cp.deepcopy(values)

def _deduplicate_pathway_ids(mapping: KEGGmapping, deduplicate: bool, source_database: str, target_database: str) -> KEGGmapping:
    """ If requested, removes entry IDs corresponding to duplicate pathway map entries (different ID, same entry).

    Only pathway IDs beginning with "path:map" are kept; every other pathway ID is dropped from the mapping.

    :param mapping: The mapping to deduplicate.
    :param deduplicate: Whether or not to deduplicate.
    :param source_database: The name of the source database of the mapping to validate.
    :param target_database: The name of the target database of the mapping to validate.
    :return: The deduplicated mapping, or the mapping unchanged if deduplicate is False.
    :raises ValueError: Raised if deduplicate is True but neither source_database nor target_database is "pathway".
    """
    if deduplicate:
        if source_database != 'pathway' and target_database != 'pathway':
            raise ValueError(
                f'Cannot deduplicate path:map entry ids when neither the source database nor the target database is set to '
                f'"pathway". Databases specified: {source_database}, {target_database}.')

        # noinspection PyShadowingNames
        def deduplicate_pathway_ids(mapping: KEGGmapping, **_) -> KEGGmapping:
            # Iterate over a snapshot of the keys since entries are deleted from the dictionary during the loop
            for pathway_id in list(mapping.keys()):
                if not pathway_id.startswith('path:map'):
                    del mapping[pathway_id]
            return mapping

        # _process_mapping ensures the pathway IDs are the keys when the filter above runs
        mapping = _process_mapping(
            mapping=mapping, func=deduplicate_pathway_ids, source_database=source_database,
            target_database=target_database, relevant_database='pathway')
    return mapping

def _process_mapping(
        mapping: KEGGmapping, func: t.Callable[..., KEGGmapping], source_database: str, target_database: str, relevant_database: str) -> KEGGmapping:
    """ Performs additional processing on a mapping according to a provided function.

    :param mapping: The mapping to process.
    :param func: The function that processes the mapping. Called with the keyword arguments "mapping" and "target_database".
    :param source_database: The name of the source database of the mapping.
    :param target_database: The name of the target database of the mapping.
    :param relevant_database: The name of the database (expected to be either the source or the target) to which the processing is relevant.
    :return: The processed mapping.
    """
    # When the relevant database is the target, flip the mapping so the relevant IDs become the keys for "func",
    # then flip it back afterwards to restore the original direction.
    double_reverse = target_database == relevant_database
    if double_reverse:
        mapping = _reverse(mapping=mapping)
        # After the flip, the original source database is what the (flipped) mapping points to
        target_database = source_database
    mapping = func(mapping=mapping, target_database=target_database)
    if double_reverse:
        mapping = _reverse(mapping=mapping)
    return mapping

def _add_glycans_or_drugs(
        mapping: KEGGmapping, source_database: str, target_database: str, add_glycans: bool, add_drugs: bool,
        kegg_rest: r.KEGGrest | None = None) -> KEGGmapping:
    """ If requested, adds the corresponding compound IDs of equivalent glycan and/or drug entries to a mapping (assuming mapping from "compound" to some target database).

    :param mapping: The mapping to add the IDs of compound-equivalents which cross-reference the target database.
    :param source_database: Logs a warning if not equal to "compound" and if the target database name is also not equal to "compound".
    :param target_database: The database with IDs to which compound IDs are mapped.
    :param add_glycans: Whether to add the corresponding compound IDs of KEGG glycan entries.
    :param add_drugs: Whether to add the corresponding compound IDs of KEGG drug entries.
    :param kegg_rest: The KEGGrest object to perform the "link" operation(s). If None, one is created with the default parameters.
    :return: The dictionary.
    :raises RuntimeError: Raised if the request to the KEGG REST API fails or times out.
    """
    if add_glycans or add_drugs:
        if source_database != 'compound' and target_database != 'compound':
            log.warning(
                f'Adding compound IDs (corresponding to equivalent glycan and/or drug entries) to a mapping where neither the source'
                f' database nor the target database are "compound". Databases specified: {source_database}, '
                f'{target_database}.')

        # noinspection PyShadowingNames
        def add_glycans_or_drugs(mapping: KEGGmapping, target_database: str) -> KEGGmapping:
            if add_glycans:
                glycan_to_database = indirect_link(
                    source_database='compound', intermediate_database='glycan', target_database=target_database,
                    kegg_rest=kegg_rest)
                mapping = combine_mappings(mapping1=mapping, mapping2=glycan_to_database)
            if add_drugs:
                drug_to_database = indirect_link(
                    source_database='compound', intermediate_database='drug', target_database=target_database,
                    kegg_rest=kegg_rest)
                mapping = combine_mappings(mapping1=mapping, mapping2=drug_to_database)
            return mapping

        # _process_mapping ensures the compound IDs are the keys when the extra mappings are combined in
        mapping = _process_mapping(
            mapping=mapping, func=add_glycans_or_drugs, source_database=source_database,
            target_database=target_database, relevant_database='compound')
    return mapping

# noinspection PyShadowingNames
def database_conv(
        kegg_database: str, outside_database: str, reverse: bool = False, kegg_rest: r.KEGGrest | None = None) -> KEGGmapping:
    """ Converts the output of the KEGG "conv" operation (of the form that maps the entry IDs of one database to the entry IDs of another) into a dictionary.

    :param kegg_database: The name of the KEGG database with entry IDs mapped to the outside database.
    :param outside_database: The name of the outside database with entry IDs mapped from the KEGG database.
    :param reverse: Reverses the mapping with the target becoming the source and the source becoming the target. Equivalent to calling the reverse() function of this module.
    :param kegg_rest: The KEGGrest object to perform the "conv" operation. If None, one is created with the default parameters.
    :return: The dictionary.
    :raises RuntimeError: Raised if the request to the KEGG REST API fails or times out.
    """
    return _map_and_reverse(
        reverse=reverse, kegg_rest=kegg_rest, KEGGurl=ku.DatabaseConvKEGGurl, kegg_database=kegg_database,
        outside_database=outside_database)
# noinspection PyShadowingNames
def _map_and_reverse(reverse: bool, **kwargs) -> KEGGmapping:
    """ Helper function for general mapping functionality ("link" or "conv") that creates a mapping with the option to reverse.

    :param reverse: Reverses the mapping with the target becoming the source and the source becoming the target.
    :param kwargs: The arguments for the _to_dict helper method.
    :return: The mapping, reversed if requested.
    """
    mapping = _to_dict(**kwargs)
    if reverse:
        # The "reverse" parameter shadows the module-level function here, hence the "_reverse" alias
        mapping = _reverse(mapping=mapping)
    return mapping


# noinspection PyShadowingNames
def entries_conv(
        entry_ids: list[str], target_database: str, reverse: bool = False, kegg_rest: r.KEGGrest | None = None) -> KEGGmapping:
    """ Converts the output of the KEGG "conv" operation (of the form that maps specific provided entry IDs to the IDs of a target database) to a dictionary.

    :param entry_ids: The IDs of the entries to map to entries in the target database.
    :param target_database: The name of the database with entry IDs mapped to from the provided entry IDs.
    :param reverse: Reverses the mapping with the target becoming the source and the source becoming the target. Equivalent to calling the reverse() function of this module.
    :param kegg_rest: The KEGGrest object to perform the "conv" operation. If None, one is created with the default parameters.
    :return: The dictionary.
    :raises RuntimeError: Raised if the request to the KEGG REST API fails or times out.
    """
    return _map_and_reverse(
        reverse=reverse, kegg_rest=kegg_rest, KEGGurl=ku.EntriesConvKEGGurl, target_database=target_database,
        entry_ids=entry_ids)
def combine_mappings(mapping1: KEGGmapping, mapping2: KEGGmapping) -> KEGGmapping:
    """ Combines two mappings together. If a key in mapping 2 is already in mapping 1, their values are merged in the combined mapping
    e.g. X -> {A,B} and X -> {B,C} becomes X -> {A,B,C}.

    Neither input mapping is modified; the combined mapping holds its own sets.

    :param mapping1: The first mapping to combine.
    :param mapping2: The second mapping to combine.
    :return: The combined mapping.
    """
    combined: KEGGmapping = {}
    for mapping in (mapping1, mapping2):
        for key, values in mapping.items():
            # setdefault creates a fresh set per key, so the sets of the inputs are never aliased or mutated
            combined.setdefault(key, set()).update(values)
    return combined
def reverse(mapping: KEGGmapping) -> KEGGmapping:
    """ Reverses the dictionary (mapping entry IDs of one database to IDs of related entries) turning keys into values and values into keys.

    :param mapping: The dictionary (of entry IDs (strings) to sets of entry IDs) to reverse.
    :return: The reversed mapping.
    """
    reversed_mapping: KEGGmapping = {}
    for key, values in mapping.items():
        for value in values:
            # Several keys may share a value, so each value collects the set of every key that referenced it
            reversed_mapping.setdefault(value, set()).add(key)
    return reversed_mapping
_reverse = reverse  # So functions can have a "reverse" boolean parameter without overriding the module-level "reverse" function.

# JSON schema for a serialized mapping: an object whose non-empty keys each map to a non-empty array of non-empty strings.
_mapping_schema = {
    'type': 'object',
    'additionalProperties': False,
    'patternProperties': {
        '^.+$': {
            'type': 'array',
            'minItems': 1,
            'items': {
                'type': 'string',
                'minLength': 1
            }
        }
    }
}

_validation_error_message = 'The mapping must be a dictionary of entry IDs (strings) mapped to a set of entry IDs'
def to_json_string(mapping: KEGGmapping) -> str:
    """ Converts a mapping of entry IDs (dictionary created with this map module) to a JSON string.

    :param mapping: The dictionary to convert.
    :return: The JSON string.
    :raises ValidationError: Raised if the mapping does not follow the correct JSON schema. Should follow the correct schema if the dictionary was created with this map module.
    """
    # Sets are not JSON serializable; sorted lists also make the output deterministic
    mapping_to_convert: dict[str, list[str]] = {entry_id: sorted(entry_ids) for entry_id, entry_ids in mapping.items()}
    u.validate_json_object(
        json_object=mapping_to_convert, json_schema=_mapping_schema, validation_error_message=_validation_error_message)
    return json.dumps(mapping_to_convert, indent=2)
def save_to_json(mapping: KEGGmapping, file_path: str) -> None:
    """ Saves a mapping of entry IDs (dictionary created with this map module) to a JSON file, either in a regular directory or ZIP archive.

    :param mapping: The mapping to save.
    :param file_path: The path to the JSON file. If in a ZIP archive, the file path must follow the archive path format
        accepted by the save_output helper of the _utils module (presumably /path/to/zip-archive.zip:/path/to/file - TODO
        confirm, the format was truncated in the available documentation).
    :raises ValidationError: Raised if the mapping does not follow the correct JSON schema. Should follow the correct schema if the dictionary was created with this map module.
    """
    mapping_str: str = to_json_string(mapping=mapping)
    u.save_output(output_target=file_path, output_content=mapping_str)
def load_from_json(file_path: str) -> KEGGmapping:
    """ Loads a mapping of entry IDs (dictionary created with this map module) from a JSON file, either in a regular directory or ZIP archive.

    :param file_path: The path to the JSON file. If in a ZIP archive, the file path must follow the archive path format
        accepted by the load_json_file helper of the _utils module (presumably /path/to/zip-archive.zip:/path/to/file - TODO
        confirm, the format was truncated in the available documentation).
    :return: The mapping.
    :raises ValidationError: Raised if the mapping does not follow the correct JSON schema. Should follow the correct schema if the dictionary was created with this map module.
    """
    loaded: dict[str, list[str]] = u.load_json_file(
        file_path=file_path, json_schema=_mapping_schema, validation_error_message=_validation_error_message)
    # The schema stores each value as an array, so convert the values back to the sets used by KEGGmapping
    return {entry_id: set(entry_ids) for entry_id, entry_ids in loaded.items()}