Source code for kgx.graph_operations

from typing import Dict, Optional

from kgx.config import get_logger
from kgx.graph.base_graph import BaseGraph
from kgx.utils.kgx_utils import (
    CORE_NODE_PROPERTIES,
    CORE_EDGE_PROPERTIES,
    generate_edge_key,
    current_time_in_millis,
)

log = get_logger()


[docs]def remap_node_identifier( graph: BaseGraph, category: str, alternative_property: str, prefix=None ) -> BaseGraph: """ Remap a node's 'id' attribute with value from a node's ``alternative_property`` attribute. Parameters ---------- graph: kgx.graph.base_graph.BaseGraph The graph category: string category referring to nodes whose 'id' needs to be remapped alternative_property: string property name from which the new value is pulled from prefix: string signifies that the value for ``alternative_property`` is a list and the ``prefix`` indicates which value to pick from the list Returns ------- kgx.graph.base_graph.BaseGraph The modified graph """ mapping: Dict = {} for nid, data in graph.nodes(data=True): node_data = data.copy() if "category" in node_data and category not in node_data["category"]: continue if alternative_property in node_data: alternative_values = node_data[alternative_property] if isinstance(alternative_values, (list, set, tuple)): if prefix: for v in alternative_values: if prefix in v: # take the first occurring value that contains the given prefix mapping[nid] = {"id": v} break else: # no prefix defined; pick the 1st one from list mapping[nid] = {"id": next(iter(alternative_values))} elif isinstance(alternative_values, str): if prefix: if alternative_values.startswith(prefix): mapping[nid] = {"id": alternative_values} else: # no prefix defined mapping[nid] = {"id": alternative_values} else: log.error( f"Cannot use {alternative_values} from alternative_property {alternative_property}" ) graph.set_node_attributes(graph, attributes=mapping) graph.relabel_nodes(graph, {k: list(v.values())[0] for k, v in mapping.items()}) # update 'subject' of all outgoing edges update_edge_keys = {} updated_subject_values = {} updated_object_values = {} for u, v, k, edge_data in graph.edges(data=True, keys=True): if u is not edge_data["subject"]: updated_subject_values[(u, v, k)] = {"subject": u} update_edge_keys[(u, v, k)] = { "edge_key": generate_edge_key(u, edge_data["predicate"], v) } if v is not edge_data["object"]: updated_object_values[(u, v, k)] = {"object": v} update_edge_keys[(u, v, k)] = { "edge_key": generate_edge_key(u, edge_data["predicate"], v) } graph.set_edge_attributes(graph, attributes=updated_subject_values) graph.set_edge_attributes(graph, attributes=updated_object_values) graph.set_edge_attributes(graph, attributes=update_edge_keys) return graph
[docs]def remap_node_property( graph: BaseGraph, category: str, old_property: str, new_property: str ) -> None: """ Remap the value in node ``old_property`` attribute with value from node ``new_property`` attribute. Parameters ---------- graph: kgx.graph.base_graph.BaseGraph The graph category: string Category referring to nodes whose property needs to be remapped old_property: string old property name whose value needs to be replaced new_property: string new property name from which the value is pulled from """ mapping = {} if old_property in CORE_NODE_PROPERTIES: raise AttributeError( f"node property {old_property} cannot be modified as it is a core property." ) for nid, data in graph.nodes(data=True): node_data = data.copy() if category in node_data and category not in node_data["category"]: continue if new_property in node_data: mapping[nid] = {old_property: node_data[new_property]} graph.set_node_attributes(graph, attributes=mapping)
[docs]def remap_edge_property( graph: BaseGraph, edge_predicate: str, old_property: str, new_property: str ) -> None: """ Remap the value in an edge ``old_property`` attribute with value from edge ``new_property`` attribute. Parameters ---------- graph: kgx.graph.base_graph.BaseGraph The graph edge_predicate: string edge_predicate referring to edges whose property needs to be remapped old_property: string Old property name whose value needs to be replaced new_property: string New property name from which the value is pulled from """ mapping = {} if old_property in CORE_EDGE_PROPERTIES: raise AttributeError( f"edge property {old_property} cannot be modified as it is a core property." ) for u, v, k, data in graph.edges(data=True, keys=True): edge_data = data.copy() if edge_predicate is not edge_data["predicate"]: continue if new_property in edge_data: mapping[(u, v, k)] = {old_property: edge_data[new_property]} graph.set_edge_attributes(graph, attributes=mapping)
[docs]def fold_predicate( graph: BaseGraph, predicate: str, remove_prefix: bool = False ) -> None: """ Fold predicate as node property where every edge with ``predicate`` will be folded as a node property. Parameters ---------- graph: kgx.graph.base_graph.BaseGraph The graph predicate: str The predicate to fold remove_prefix: bool Whether or not to remove prefix from the predicate (``False``, by default) """ node_cache = [] edge_cache = [] start = current_time_in_millis() p = predicate.split(":", 1)[1] if remove_prefix else predicate for u, v, k, data in graph.edges(keys=True, data=True): if data["predicate"] == predicate: node_cache.append((u, p, v)) edge_cache.append((u, v, k)) while node_cache: n = node_cache.pop() graph.add_node_attribute(*n) while edge_cache: e = edge_cache.pop() graph.remove_edge(*e) end = current_time_in_millis() log.info(f"Time taken: {end - start} ms")
[docs]def unfold_node_property( graph: BaseGraph, node_property: str, prefix: Optional[str] = None ) -> None: """ Unfold node property as a predicate where every node with ``node_property`` will be unfolded as an edge. Parameters ---------- graph: kgx.graph.base_graph.BaseGraph The graph node_property: str The node property to unfold prefix: Optional[str] The prefix to use """ node_cache = [] edge_cache = [] start = current_time_in_millis() p = f"{prefix}:{node_property}" if prefix else node_property for n, data in graph.nodes(data=True): sub = n if node_property in data: obj = data[node_property] edge_cache.append((sub, obj, p)) node_cache.append((n, node_property)) while edge_cache: e = edge_cache.pop() graph.add_edge( *e, **{"subject": e[0], "object": e[1], "predicate": e[2], "relation": e[2]} ) while node_cache: n = node_cache.pop() del graph.nodes()[n[0]][n[1]] end = current_time_in_millis() log.info(f"Time taken: {end - start} ms")
[docs]def remove_singleton_nodes(graph: BaseGraph) -> None: """ Remove singleton nodes (nodes that have a degree of 0) from the graph. Parameters ---------- graph: kgx.graph.base_graph.BaseGraph The graph """ start = current_time_in_millis() singleton = [] for n, d in graph.degree(): if d == 0: singleton.append(n) while singleton: n = singleton.pop() log.debug(f"Removing singleton node {n}") graph.remove_node(n) end = current_time_in_millis() log.info(f"Time taken: {end - start} ms")