# Source code for kgx.graph_operations.clique_merge

import copy
from typing import Tuple, Optional, Dict, List, Any, Set, Union

import networkx as nx
from ordered_set import OrderedSet

from kgx.config import get_logger
from kgx.graph.base_graph import BaseGraph
from kgx.utils.kgx_utils import (
    get_prefix_prioritization_map,
    get_biolink_element,
    get_biolink_ancestors,
    current_time_in_millis,
    format_biolink_category,
    generate_edge_key,
    get_toolkit,
)

# Module-level logger.
log = get_logger()
# Module-level Biolink Model toolkit handle obtained from kgx_utils.get_toolkit().
toolkit = get_toolkit()
# Predicate CURIE of equivalence edges; build_cliques collects these into cliques.
SAME_AS = "biolink:same_as"
# Predicate CURIE of subclass edges; consolidate_edges drops self-loops with
# this predicate that appear after an edge is re-pointed at a clique leader.
SUBCLASS_OF = "biolink:subclass_of"
# Default node property that marks a node as the leader of its clique.
LEADER_ANNOTATION = "clique_leader"
# Edge properties recording the subject/object an edge had before it was
# moved onto a clique leader.
ORIGINAL_SUBJECT_PROPERTY = "_original_subject"
ORIGINAL_OBJECT_PROPERTY = "_original_object"


def clique_merge(
    target_graph: BaseGraph,
    leader_annotation: str = None,
    prefix_prioritization_map: Optional[Dict[str, List[str]]] = None,
    category_mapping: Optional[Dict[str, str]] = None,
    strict: bool = True,
) -> Tuple[BaseGraph, nx.MultiDiGraph]:
    """
    Collapse every clique of ``same_as``-connected nodes in ``target_graph``
    onto a single leader node.

    The merge runs in three timed phases: build the clique graph, elect a
    leader for each clique, and move every clique edge onto its leader.

    Parameters
    ----------
    target_graph: kgx.graph.base_graph.BaseGraph
        The original graph
    leader_annotation: str
        The node property that marks a node as the leader of a clique;
        ``LEADER_ANNOTATION`` is used when this is empty or None
    prefix_prioritization_map: Optional[Dict[str, List[str]]]
        Per-category prefix priorities; entries here override the defaults
        returned by ``get_prefix_prioritization_map()``
    category_mapping: Optional[Dict[str, str]]
        Mapping for non-Biolink Model categories to Biolink Model categories
    strict: bool
        Whether or not to merge nodes in a clique that have conflicting node categories

    Returns
    -------
    Tuple[kgx.graph.base_graph.BaseGraph, networkx.MultiDiGraph]
        The updated target graph and the clique graph
    """
    # Caller-supplied priorities win over the Biolink-derived defaults.
    priorities = get_prefix_prioritization_map()
    if prefix_prioritization_map:
        priorities.update(prefix_prioritization_map)
    annotation = leader_annotation or LEADER_ANNOTATION

    started = current_time_in_millis()
    clique_graph = build_cliques(target_graph)
    log.info(f"Total time taken to build cliques: {current_time_in_millis() - started} ms")

    started = current_time_in_millis()
    elect_leader(
        target_graph,
        clique_graph,
        annotation,
        priorities,
        category_mapping,
        strict,
    )
    log.info(
        f"Total time taken to elect leaders for all cliques: {current_time_in_millis() - started} ms"
    )

    started = current_time_in_millis()
    merged = consolidate_edges(target_graph, clique_graph, annotation)
    log.info(
        f"Total time taken to consolidate edges in target graph: {current_time_in_millis() - started} ms"
    )
    return merged, clique_graph
def build_cliques(target_graph: BaseGraph) -> nx.MultiDiGraph:
    """
    Builds a clique graph from ``same_as`` edges in ``target_graph``.

    Equivalences come from two places:

    * a node's ``same_as`` property, which yields an edge pair between the
      node and every listed identifier;
    * explicit ``biolink:same_as`` edges in ``target_graph``, which are
      copied and mirrored.

    Every equivalence is stored in both directions, so each clique forms a
    strongly connected component of the returned graph.

    Parameters
    ----------
    target_graph: kgx.graph.base_graph.BaseGraph
        An instance of BaseGraph that contains nodes and edges

    Returns
    -------
    networkx.MultiDiGraph
        The clique graph with only ``same_as`` edges
    """
    clique_graph = nx.MultiDiGraph()

    # 1. Equivalences declared via the node-level ``same_as`` property.
    for n, data in target_graph.nodes(data=True):
        if "same_as" not in data:
            continue
        new_data = copy.deepcopy(data)
        del new_data["same_as"]
        clique_graph.add_node(n, **new_data)
        for s in data["same_as"]:
            for subject, obj in ((n, s), (s, n)):
                edge_data = {"subject": subject, "predicate": SAME_AS, "object": obj}
                if "provided_by" in data:
                    edge_data["provided_by"] = data["provided_by"]
                clique_graph.add_edge(subject, obj, **edge_data)

    # 2. Equivalences declared as explicit biolink:same_as edges.
    for u, v, data in target_graph.edges(data=True):
        if data.get("predicate") != SAME_AS:
            continue
        clique_graph.add_node(u, **target_graph.nodes()[u])
        clique_graph.add_node(v, **target_graph.nodes()[v])
        clique_graph.add_edge(u, v, **data)
        # The mirrored edge runs v -> u, so its object must be u (it was
        # previously recorded as v, producing a bogus self-referencing record).
        reverse_data = {"subject": v, "predicate": data["predicate"], "object": u}
        # 'relation' is optional on edges; only copy it when present instead
        # of failing with a KeyError.
        if "relation" in data:
            reverse_data["relation"] = data["relation"]
        clique_graph.add_edge(v, u, **reverse_data)
    return clique_graph
def elect_leader(
    target_graph: BaseGraph,
    clique_graph: nx.MultiDiGraph,
    leader_annotation: str,
    prefix_prioritization_map: Optional[Dict[str, List[str]]],
    category_mapping: Optional[Dict[str, str]],
    strict: bool = True,
) -> BaseGraph:
    """
    Elect leader for each clique in a graph.

    For each strongly connected component of ``clique_graph``:

    1. Node categories are validated and normalized.
    2. The clique category is determined.
    3. Nodes that are excluded, or whose category is incompatible with the
       clique category, are removed from ``clique_graph``.
    4. A leader is elected from the remaining nodes, in priority order: by
       leader annotation, then by prefix priority, then by alphabetical
       sort.

    The elected leader gets ``leader_annotation: True`` and an
    ``election_strategy`` attribute in both graphs.

    Parameters
    ----------
    target_graph: kgx.graph.base_graph.BaseGraph
        The original graph
    clique_graph: networkx.Graph
        The clique graph
    leader_annotation: str
        The field on a node that signifies that the node is the leader of a clique
    prefix_prioritization_map: Optional[Dict[str, List[str]]]
        A map that gives a prefix priority for one or more categories
    category_mapping: Optional[Dict[str, str]]
        Mapping for non-Biolink Model categories to Biolink Model categories
    strict: bool
        Whether or not to merge nodes in a clique that have conflicting node categories

    Returns
    -------
    kgx.graph.base_graph.BaseGraph
        The updated target graph
    """
    cliques = list(nx.strongly_connected_components(clique_graph))
    log.info(f"Total cliques in clique graph: {len(cliques)}")
    count = 0
    update_dict = {}
    for clique in cliques:
        log.info(
            f"Processing clique: {clique} with {[clique_graph.nodes()[x].get('category') for x in clique]}"
        )
        update_node_categories(
            target_graph, clique_graph, clique, category_mapping, strict
        )
        clique_category, clique_category_ancestors = get_clique_category(
            clique_graph, clique
        )
        log.debug(f"Clique category: {clique_category}")
        filtered_clique = _remove_invalid_clique_members(
            clique_graph, clique, clique_category_ancestors
        )
        if not filtered_clique or not clique_category:
            continue
        leader, election_strategy = _elect_clique_leader(
            target_graph,
            clique_graph,
            filtered_clique,
            clique_category,
            leader_annotation,
            prefix_prioritization_map,
        )
        if leader is None:
            log.debug(f"Could not elect a leader for clique {filtered_clique}")
            continue
        log.debug(
            f"Elected {leader} as leader via {election_strategy} for clique {filtered_clique}"
        )
        # Use the caller's annotation key: consolidate_edges looks leaders up
        # by ``leader_annotation``, so a hard-coded key would hide leaders
        # elected by prefix/sort whenever a custom annotation is used.
        update_dict[leader] = {
            leader_annotation: True,
            "election_strategy": election_strategy,
        }
        count += 1
    nx.set_node_attributes(clique_graph, update_dict)
    target_graph.set_node_attributes(target_graph, update_dict)
    log.info(f"Total merged cliques: {count}")
    return target_graph


def _remove_invalid_clique_members(
    clique_graph: nx.MultiDiGraph, clique, clique_category_ancestors: List
) -> List:
    """Remove excluded/category-incompatible nodes from ``clique_graph``; return the rest in order."""
    filtered_clique = []
    for n in clique:
        data = clique_graph.nodes()[n]
        if data.get("_excluded_from_clique"):
            log.info(
                f"Removing invalid node {n} from clique graph; node marked to be excluded"
            )
            clique_graph.remove_node(n)
            # Already removed: checking the category too would call
            # remove_node a second time and raise.
            continue
        categories = data.get("category") or []
        if not categories:
            log.info(
                f"Removing invalid node {n} from the clique graph; node has no valid category"
            )
            clique_graph.remove_node(n)
            continue
        if categories[0] not in clique_category_ancestors:
            log.info(
                f"Removing invalid node {n} from the clique graph; node category {categories[0]} not in CCA: {clique_category_ancestors}"
            )
            clique_graph.remove_node(n)
            continue
        filtered_clique.append(n)
    return filtered_clique


def _elect_clique_leader(
    target_graph: BaseGraph,
    clique_graph: nx.MultiDiGraph,
    clique: List,
    clique_category: str,
    leader_annotation: str,
    prefix_prioritization_map: Optional[Dict[str, List[str]]],
) -> Tuple[Optional[str], Optional[str]]:
    """Try annotation, then prefix priority, then alphabetical sort; return (leader, strategy)."""
    leader, election_strategy = get_leader_by_annotation(
        target_graph, clique_graph, clique, leader_annotation
    )
    if leader:
        return leader, election_strategy
    log.debug(
        "Could not elect clique leader by looking for LEADER_ANNOTATION property; "
        "Using prefix prioritization instead"
    )
    if prefix_prioritization_map and clique_category in prefix_prioritization_map:
        leader, election_strategy = get_leader_by_prefix_priority(
            target_graph,
            clique_graph,
            clique,
            prefix_prioritization_map[clique_category],
        )
    else:
        log.debug(
            f"No prefix order found for category '{clique_category}' in PREFIX_PRIORITIZATION_MAP"
        )
    if leader:
        return leader, election_strategy
    log.debug(
        "Could not elect clique leader by PREFIX_PRIORITIZATION; Using alphabetical sort on prefixes"
    )
    return get_leader_by_sort(target_graph, clique_graph, clique)
def consolidate_edges(
    target_graph: BaseGraph, clique_graph: nx.MultiDiGraph, leader_annotation: str
) -> BaseGraph:
    """
    Move all edges from nodes in a clique to the clique leader.

    Original subject and object of an edge are preserved via
    ``ORIGINAL_SUBJECT_PROPERTY`` and ``ORIGINAL_OBJECT_PROPERTY``. They are
    recorded only on the first move, so an edge that is re-pointed twice
    (e.g. between two non-leader members) keeps its true original endpoints.

    ``same_as`` edges touching a clique member are removed. Every non-leader
    clique member is then listed in the leader's ``same_as`` property (sorted)
    and removed from ``target_graph`` if present. Nodes that were dropped from
    the clique graph during election are left untouched.

    Parameters
    ----------
    target_graph: kgx.graph.base_graph.BaseGraph
        The original graph
    clique_graph: networkx.MultiDiGraph
        The clique graph
    leader_annotation: str
        The field on a node that signifies that the node is the leader of a clique

    Returns
    -------
    kgx.graph.base_graph.BaseGraph
        The target graph where all edges from nodes in a clique are moved to clique leader
    """
    cliques = list(nx.strongly_connected_components(clique_graph))
    log.info(f"Consolidating edges in {len(cliques)} cliques")
    for clique in cliques:
        log.debug(f"Processing clique: {clique}")
        leaders: List = [
            x for x in clique if clique_graph.nodes()[x].get(leader_annotation)
        ]
        if not leaders:
            log.debug("No leader elected for clique {}; skipping".format(clique))
            continue
        leader: str = leaders[0]
        leader_data = clique_graph.nodes()[leader]

        # update nodes in target graph
        target_graph.set_node_attributes(
            target_graph,
            {
                leader: {
                    leader_annotation: leader_data.get(leader_annotation),
                    "election_strategy": leader_data.get("election_strategy"),
                }
            },
        )

        for node in clique:
            if node != leader:
                _move_member_edges_to_leader(target_graph, node, leader)

        # Every non-leader member has had its edges moved onto the leader, so
        # all of them are equivalents. Identifiers reached only through
        # target-graph same_as edges but not in the clique were excluded
        # during election and must survive.
        leader_equivalent_identifiers = set(clique_graph.neighbors(leader))
        leader_equivalent_identifiers.update(clique)
        leader_equivalent_identifiers.discard(leader)

        log.debug(
            f"setting same_as property to leader node with {leader_equivalent_identifiers}"
        )
        target_graph.set_node_attributes(
            target_graph,
            {leader: {"same_as": sorted(leader_equivalent_identifiers)}},
        )
        log.debug(
            f"removing equivalent nodes of leader: {leader_equivalent_identifiers}"
        )
        for n in leader_equivalent_identifiers:
            # same_as properties may list identifiers that are not nodes of
            # target_graph; removing those would raise.
            if target_graph.has_node(n):
                target_graph.remove_node(n)
    return target_graph


def _move_member_edges_to_leader(target_graph: BaseGraph, node: str, leader: str) -> None:
    """Re-point ``node``'s non-same_as edges at ``leader`` and delete its same_as edges."""
    log.debug(f"Looking for in_edges for {node}")
    in_edges = list(target_graph.in_edges(node, keys=False, data=True))
    equiv_in_edges = [x for x in in_edges if x[2]["predicate"] == SAME_AS]
    log.debug(f"Moving {len(in_edges)} in-edges from {node} to {leader}")
    _reattach_edges(
        target_graph,
        [x for x in in_edges if x[2]["predicate"] != SAME_AS],
        leader,
        incoming=True,
    )

    log.debug(f"Looking for out_edges for {node}")
    out_edges = list(target_graph.out_edges(node, keys=False, data=True))
    equiv_out_edges = [x for x in out_edges if x[2]["predicate"] == SAME_AS]
    log.debug(f"Moving {len(out_edges)} out-edges from {node} to {leader}")
    _reattach_edges(
        target_graph,
        [x for x in out_edges if x[2]["predicate"] != SAME_AS],
        leader,
        incoming=False,
    )

    log.debug(f"equiv in edges: {equiv_in_edges}")
    _remove_same_as_edges(target_graph, equiv_in_edges)
    log.debug(f"equiv out edges: {equiv_out_edges}")
    _remove_same_as_edges(target_graph, equiv_out_edges)


def _reattach_edges(target_graph: BaseGraph, edges: List, leader: str, incoming: bool) -> None:
    """Replace each (u, v, data) edge by one whose object (incoming) or subject (outgoing) is ``leader``."""
    for u, v, edge_data in edges:
        target_graph.remove_edge(
            u, v, edge_key=generate_edge_key(u, edge_data["predicate"], v)
        )
        # Only record the originals the first time the edge is moved.
        edge_data.setdefault(ORIGINAL_SUBJECT_PROPERTY, edge_data["subject"])
        edge_data.setdefault(ORIGINAL_OBJECT_PROPERTY, edge_data["object"])
        if incoming:
            edge_data["object"] = leader
            key = generate_edge_key(u, edge_data["predicate"], leader)
        else:
            edge_data["subject"] = leader
            key = generate_edge_key(leader, edge_data["predicate"], v)
        # A member subclass_of leader edge would become a meaningless self-loop.
        if (
            edge_data["subject"] == edge_data["object"]
            and edge_data["predicate"] == SUBCLASS_OF
        ):
            continue
        target_graph.add_edge(
            edge_data["subject"], edge_data["object"], key, **edge_data
        )


def _remove_same_as_edges(target_graph: BaseGraph, edges: List) -> None:
    """Delete the given same_as edges from ``target_graph``."""
    for u, v, _ in edges:
        target_graph.remove_edge(u, v, edge_key=generate_edge_key(u, SAME_AS, v))
def update_node_categories(
    target_graph: BaseGraph,
    clique_graph: nx.MultiDiGraph,
    clique: List,
    category_mapping: Optional[Dict[str, str]],
    strict: bool = True,
) -> List:
    """
    For a given clique, get category for each node in clique and validate against
    Biolink Model, mapping to Biolink Model category where needed.

    For example, If a node has ``biolink:Gene`` as its category, then this method
    replaces it with the ancestor list of ``biolink:Gene``. When a node has several
    valid categories, the longest ancestor list among them is used.

    A node with no ``category`` borrows one from a ``same_as`` neighbour
    (see ``get_category_from_equivalence``).

    Parameters
    ----------
    target_graph: kgx.graph.base_graph.BaseGraph
        The original graph
    clique_graph: networkx.Graph
        The clique graph
    clique: List
        A list of nodes from a clique
    category_mapping: Optional[Dict[str, str]]
        Mapping for non-Biolink Model categories to Biolink Model categories.
        NOTE: currently not forwarded to ``check_all_categories`` and therefore unused.
    strict: bool
        Whether or not to merge nodes in a clique that have conflicting node categories;
        when True, nodes with invalid Biolink categories are marked ``_excluded_from_clique``

    Returns
    -------
    List
        The clique
    """
    updated_clique_graph_properties = {}
    updated_target_graph_properties = {}
    for node in clique:
        # For each node in a clique, get its category property
        data = clique_graph.nodes()[node]
        if "category" in data:
            categories = data["category"]
        else:
            categories = get_category_from_equivalence(
                target_graph, clique_graph, node, data
            )

        # differentiate between valid and invalid categories
        (
            valid_biolink_categories,
            invalid_biolink_categories,
            invalid_categories,
        ) = check_all_categories(categories)
        log.debug(
            f"valid biolink categories: {valid_biolink_categories} invalid biolink categories: {invalid_biolink_categories} invalid_categories: {invalid_categories}"
        )

        # Keep the single longest ancestor list (first one wins on ties).
        # The previous ``extend`` could concatenate several ancestor lists
        # and produce duplicates.
        extended_categories: List = max(
            (get_biolink_ancestors(x) for x in valid_biolink_categories),
            key=len,
            default=[],
        )
        log.debug(f"Extended categories: {extended_categories}")

        clique_graph_update_dict: Dict = {"category": list(extended_categories)}
        target_graph_update_dict: Dict = {}

        if invalid_biolink_categories:
            if strict:
                clique_graph_update_dict["_excluded_from_clique"] = True
                target_graph_update_dict["_excluded_from_clique"] = True
            clique_graph_update_dict[
                "invalid_biolink_category"
            ] = invalid_biolink_categories
            target_graph_update_dict[
                "invalid_biolink_category"
            ] = invalid_biolink_categories

        if invalid_categories:
            clique_graph_update_dict["_invalid_category"] = invalid_categories
            target_graph_update_dict["_invalid_category"] = invalid_categories

        updated_clique_graph_properties[node] = clique_graph_update_dict
        updated_target_graph_properties[node] = target_graph_update_dict

    nx.set_node_attributes(clique_graph, updated_clique_graph_properties)
    target_graph.set_node_attributes(target_graph, updated_target_graph_properties)
    return clique
def get_clique_category(
    clique_graph: nx.MultiDiGraph, clique: List
) -> Tuple[Optional[str], List]:
    """
    Given a clique, identify the category of the clique.

    The categories of all clique members are unioned (first-seen order).
    The most specific one, i.e. the one with the most ancestors, becomes the
    clique category.

    Parameters
    ----------
    clique_graph: nx.MultiDiGraph
        Clique graph
    clique: List
        A list of nodes in clique

    Returns
    -------
    Tuple[Optional[str], list]
        A tuple of clique category and its ancestors; ``(None, [])`` when no
        member of the clique has any category
    """
    union = OrderedSet()
    for x in clique:
        # Members without a category contribute nothing instead of raising KeyError.
        union.update(clique_graph.nodes()[x].get("category") or [])
    if not union:
        log.debug(f"No categories found for clique: {clique}")
        return None, []
    uo = sort_categories(union)
    log.debug(f"outcome of union (sorted): {uo}")
    clique_category = uo[0]
    clique_category_ancestors = get_biolink_ancestors(clique_category)
    return clique_category, clique_category_ancestors
def check_categories(
    categories: List, closure: List, category_mapping: Optional[Dict[str, str]] = None
) -> Tuple[List, List, List]:
    """
    Check categories to ensure whether values in ``categories`` are valid biolink categories.

    Valid biolink categories are classes that descend from 'NamedThing'.
    Mixins, while valid ancestors, are not valid categories.

    Each category ends up in exactly one of the three returned lists:

    * mixins and categories unknown to the Biolink Model are invalid categories;
    * Biolink categories whose formatted name is in ``closure`` are valid;
    * Biolink categories outside ``closure`` are valid only if
      ``category_mapping`` maps them to a category in ``closure``. In that
      case the mapped category is reported. Otherwise they are invalid
      biolink categories.

    Parameters
    ----------
    categories: List
        A list of categories to check
    closure: List
        A list of categories (the ancestors of a reference category) that a
        valid category must belong to
    category_mapping: Optional[Dict[str, str]]
        A map that provides mapping from a non-biolink category to a biolink category

    Returns
    -------
    Tuple[List, List, List]
        A tuple consisting of valid biolink categories, invalid biolink categories,
        and invalid categories
    """
    valid_biolink_categories = []
    invalid_biolink_categories = []
    invalid_categories = []
    tk = get_toolkit()
    for x in categories:
        # use the toolkit to check if the declared category is actually a mixin.
        if tk.is_mixin(x):
            invalid_categories.append(x)
            continue
        # get biolink element corresponding to category
        element = get_biolink_element(x)
        if not element:
            log.warning(f"category '{x}' is not in Biolink Model")
            invalid_categories.append(x)
            continue
        mapped_category = format_biolink_category(element["name"])
        if mapped_category in closure:
            valid_biolink_categories.append(x)
            continue
        log.warning(f"category '{mapped_category}' not in closure: {closure}")
        if category_mapping:
            mapped = category_mapping.get(x, x)
            if mapped in closure:
                # Previously this case fell through without being recorded
                # anywhere, silently dropping the category.
                valid_biolink_categories.append(mapped)
                continue
            log.warning(
                f"mapped category '{mapped}' for '{x}' is not in closure: {closure}"
            )
        invalid_biolink_categories.append(x)
    return valid_biolink_categories, invalid_biolink_categories, invalid_categories
def check_all_categories(categories) -> Tuple[List, List, List]:
    """
    Check all categories in ``categories``.

    Categories are processed from most to least specific. The first valid
    category is checked against its own ancestors. Every later category must
    lie within the ancestors of the most recently accepted valid category;
    otherwise it is an invalid biolink category.

    Parameters
    ----------
    categories: List
        A list of categories

    Returns
    -------
    Tuple[List, List, List]
        A tuple consisting of valid biolink categories, invalid biolink categories,
        and invalid categories

    Note: the sort_categories method will re-arrange the passed in category list
    according to the distance of each list member from the top of their hierarchy.
    Each category's hierarchy is made up of its 'is_a' and mixin ancestors.
    """
    previous: List = []
    valid_biolink_categories: List = []
    invalid_biolink_categories: List = []
    invalid_categories: List = []
    for c in sort_categories(categories):
        reference = previous[0] if previous else c
        vbc, ibc, ic = check_categories([c], get_biolink_ancestors(reference), None)
        valid_biolink_categories.extend(vbc)
        invalid_categories.extend(ic)
        invalid_biolink_categories.extend(ibc)
        # Only move the reference point on an accepted category. Resetting it
        # after a mixin/unknown category (old behaviour) let a conflicting
        # category later in the list pass as valid.
        if vbc:
            previous = vbc
    return valid_biolink_categories, invalid_biolink_categories, invalid_categories
def sort_categories(categories: Union[List, Set, OrderedSet]) -> List:
    """
    Order categories from the most specific to the most generic.

    Specificity is measured by the number of Biolink ancestors of each
    category. Categories with equal ancestor counts keep their input order.

    Parameters
    ----------
    categories: Union[List, Set, OrderedSet]
        A list of categories

    Returns
    -------
    List
        A new list whose first element has the largest number of parents in
        the class hierarchy.
    """
    # sorted() stays stable with reverse=True, so ties preserve input order.
    return sorted(
        categories,
        key=lambda category: len(get_biolink_ancestors(category)),
        reverse=True,
    )
def get_category_from_equivalence(
    target_graph: BaseGraph, clique_graph: nx.MultiDiGraph, node: str, attributes: Dict
) -> List:
    """
    Borrow a category for ``node`` from one of its ``same_as`` neighbours.

    The edges of ``node`` in ``clique_graph`` are scanned. The ``category`` of
    the first ``same_as`` neighbour that has one is adopted. The result (an
    empty list if nothing is found) is also written back to ``node`` in
    ``clique_graph``.

    Parameters
    ----------
    target_graph: kgx.graph.base_graph.BaseGraph
        The original graph (not used)
    clique_graph: networkx.MultiDiGraph
        The clique graph
    node: str
        Node identifier
    attributes: Dict
        Node's attributes (not used)

    Returns
    -------
    List
        Category for the node
    """
    found: List = []
    for source, target, edge_data in clique_graph.edges(node, data=True):
        if edge_data["predicate"] != SAME_AS:
            continue
        if source == node:
            other = target
        elif target == node:
            other = source
        else:
            continue
        other_data = clique_graph.nodes()[other]
        if "category" in other_data:
            found = other_data["category"]
            break
    nx.set_node_attributes(clique_graph, {node: {"category": found}})
    return found
def get_leader_by_annotation(
    target_graph: BaseGraph,
    clique_graph: nx.MultiDiGraph,
    clique: List,
    leader_annotation: str,
) -> Tuple[Optional[str], Optional[str]]:
    """
    Get leader by searching for leader annotation property in any of the nodes
    in a given clique.

    A node counts as annotated when its ``leader_annotation`` value is
    ``True``, the string ``"true"``/``"True"``, or a list whose first element
    is one of those. If several nodes are annotated, the last one in
    iteration order is chosen.

    Parameters
    ----------
    target_graph: kgx.graph.base_graph.BaseGraph
        The original graph
    clique_graph: networkx.MultiDiGraph
        The clique graph
    clique: List
        A list of nodes from a clique
    leader_annotation: str
        The field on a node that signifies that the node is the leader of a clique

    Returns
    -------
    Tuple[Optional[str], Optional[str]]
        A tuple containing the node that has been elected as the leader and the election strategy
    """
    leader = None
    election_strategy = None
    for node in clique:
        attributes = clique_graph.nodes()[node]
        if leader_annotation in attributes and _is_leader_value(
            attributes[leader_annotation]
        ):
            # No break: preserves the "last annotated node wins" behaviour.
            leader = node
    if leader:
        election_strategy = "LEADER_ANNOTATION"
        log.debug(f"Elected leader '{leader}' via LEADER_ANNOTATION")
    return leader, election_strategy


def _is_leader_value(value: Any) -> bool:
    """Return True for True, "true"/"True", or a list whose first element is one of those."""
    if isinstance(value, list):
        # An empty list previously raised IndexError.
        if not value:
            return False
        value = value[0]
    if isinstance(value, bool):
        # Replaces eval(str(value)), which was an unnecessary use of eval.
        return value
    if isinstance(value, str):
        return value in ("true", "True")
    return False
def get_leader_by_prefix_priority(
    target_graph: BaseGraph,
    clique_graph: nx.MultiDiGraph,
    clique: List,
    prefix_priority_list: List,
) -> Tuple[Optional[str], Optional[str]]:
    """
    Get leader from clique based on a given prefix priority.

    A node matches a prefix only when its CURIE prefix (the text before the
    first ``:``) equals it exactly. Substring matching would let e.g.
    ``OMIM`` match ``OMIM.PS:...``. The first node (in clique order) matching
    the highest-priority prefix is elected.

    Parameters
    ----------
    target_graph: kgx.graph.base_graph.BaseGraph
        The original graph
    clique_graph: networkx.MultiDiGraph
        The clique graph
    clique: List
        A list of nodes that correspond to a clique
    prefix_priority_list: List
        A list of prefixes in descending priority

    Returns
    -------
    Tuple[Optional[str], Optional[str]]
        A tuple containing the node that has been elected as the leader and the election strategy
    """
    leader = None
    election_strategy = None
    for prefix in prefix_priority_list:
        log.debug(f"Checking for prefix {prefix} in {clique}")
        leader = next((x for x in clique if x.split(":", 1)[0] == prefix), None)
        if leader:
            election_strategy = "PREFIX_PRIORITIZATION"
            log.debug(f"Elected leader '{leader}' via {election_strategy}")
            break
    return leader, election_strategy
def get_leader_by_sort(
    target_graph: BaseGraph, clique_graph: nx.MultiDiGraph, clique: List
) -> Tuple[Optional[str], Optional[str]]:
    """
    Get leader from clique based on the first selection from an alphabetical sort
    of the node id prefixes.

    The node whose CURIE prefix sorts first is elected. Ties between nodes
    sharing that prefix are broken by the full identifier, so the result
    does not depend on clique iteration order.

    Parameters
    ----------
    target_graph: kgx.graph.base_graph.BaseGraph
        The original graph
    clique_graph: networkx.MultiDiGraph
        The clique graph
    clique: List
        A list of nodes that correspond to a clique

    Returns
    -------
    Tuple[Optional[str], Optional[str]]
        A tuple containing the node that has been elected as the leader and the election strategy;
        ``(None, None)`` for an empty clique
    """
    if not clique:
        return None, None
    election_strategy = "ALPHABETICAL_SORT"
    # Compare exact prefixes. A substring test could pick a node whose prefix
    # merely contains the winning prefix (e.g. 'OMIM.PS' for 'OMIM').
    leader = min(clique, key=lambda x: (x.split(":", 1)[0], x))
    leader_prefix = leader.split(":", 1)[0]
    log.debug(f"clique: {clique} leader_prefix: {leader_prefix}")
    log.debug(f"Elected leader '{leader}' via {election_strategy}")
    return leader, election_strategy