Source code for kgx.graph_operations.graph_merge

import copy
from typing import List

from kgx.config import get_logger
from kgx.graph.base_graph import BaseGraph
from kgx.utils.kgx_utils import prepare_data_dict


log = get_logger()


def merge_all_graphs(graphs: List[BaseGraph], preserve: bool = True) -> BaseGraph:
    """
    Merge one or more graphs.

    .. note::
        This method will first pick the largest graph in ``graphs`` and use that
        as the target to merge the remaining graphs. This is to reduce the memory
        footprint for this operation. The criteria for largest graph is the graph
        with the largest number of edges (the first such graph wins on a tie).

        The caveat is that the merge operation has a side effect where the largest
        graph is altered. The ``graphs`` list itself is left untouched.

        If you would like to ensure that all incoming graphs remain as-is, then
        look at ``merge_graphs``.

    The outcome of the merge on node and edge properties depend on the ``preserve``
    parameter.
    If preserve is ``True`` then,
    - core properties will not be overwritten
    - other properties will be concatenated to a list

    If preserve is ``False`` then,
    - core properties will not be overwritten
    - other properties will be replaced

    Parameters
    ----------
    graphs: List[kgx.graph.base_graph.BaseGraph]
        A list of instances of BaseGraph to merge
    preserve: bool
        Whether or not to preserve conflicting properties

    Returns
    -------
    kgx.graph.base_graph.BaseGraph
        The merged graph (the largest input graph, modified in place)

    Raises
    ------
    ValueError
        If ``graphs`` is empty

    """
    if not graphs:
        # Fail with an explicit message instead of the opaque error from max([]).
        raise ValueError("merge_all_graphs requires at least one graph to merge")

    edge_counts = [len(x.edges()) for x in graphs]
    largest_index = edge_counts.index(max(edge_counts))
    largest = graphs[largest_index]
    # Build a new list of the other graphs rather than popping from the
    # caller's list, so the argument is not mutated as a hidden side effect.
    remaining = [x for i, x in enumerate(graphs) if i != largest_index]

    log.debug(
        f"Largest graph {largest.name} has {len(largest.nodes())} nodes and {len(largest.edges())} edges"
    )
    return merge_graphs(largest, remaining, preserve)
def merge_graphs(
    graph: BaseGraph, graphs: List[BaseGraph], preserve: bool = True
) -> BaseGraph:
    """
    Fold every graph in ``graphs`` into ``graph``.

    For each source graph, its nodes are added first and then its edges; the
    number of merged nodes and merged edges is logged after each step.

    Parameters
    ----------
    graph: kgx.graph.base_graph.BaseGraph
        The target graph that receives the nodes and edges
    graphs: List[kgx.graph.base_graph.BaseGraph]
        The source graphs to merge into ``graph``
    preserve: bool
        Whether or not to preserve conflicting properties

    Returns
    -------
    kgx.graph.base_graph.BaseGraph
        The same ``graph`` object that was passed in, after merging

    """
    for source in graphs:
        # Nodes go in before edges, mirroring the order of the two helpers.
        merged_nodes = add_all_nodes(graph, source, preserve)
        log.info(
            f"Number of nodes merged between {graph.name} and {source.name}: {merged_nodes}"
        )

        merged_edges = add_all_edges(graph, source, preserve)
        log.info(
            f"Number of edges merged between {graph.name} and {source.name}: {merged_edges}"
        )

    return graph
def add_all_nodes(g1: BaseGraph, g2: BaseGraph, preserve: bool = True) -> int:
    """
    Copy every node of the source graph (``g2``) into the target graph (``g1``).

    A node whose id is already present in ``g1`` is merged via ``merge_node``;
    any other node is added to ``g1`` with its properties as-is.

    Parameters
    ----------
    g1: kgx.graph.base_graph.BaseGraph
        Target graph
    g2: kgx.graph.base_graph.BaseGraph
        Source graph
    preserve: bool
        Whether or not to preserve conflicting properties

    Returns
    -------
    int
        Number of nodes that already existed in ``g1`` and were merged

    """
    log.info(f"Adding {g2.number_of_nodes()} nodes from {g2.name} to {g1.name}")

    merged = 0
    for node_id, node_data in g2.nodes(data=True):
        if node_id not in g1.nodes():
            # Brand-new node: nothing to reconcile, just insert it.
            g1.add_node(node_id, **node_data)
            continue
        merge_node(g1, node_id, node_data, preserve)
        merged += 1
    return merged
def merge_node(g: BaseGraph, n: str, data: dict, preserve: bool = True) -> dict:
    """
    Merge the properties in ``data`` into node ``n`` of graph ``g``.

    The node's current properties and ``data`` are deep-copied, combined with
    ``prepare_data_dict``, and the result is written back with ``g.add_node``.

    Parameters
    ----------
    g: kgx.graph.base_graph.BaseGraph
        The target graph
    n: str
        Node id
    data: dict
        Node properties
    preserve: bool
        Whether or not to preserve conflicting properties

    Returns
    -------
    dict
        The node record obtained from ``g.nodes()[n]`` before the write-back.
        NOTE(review): this only reflects the merged properties if the graph
        implementation updates that record in place - confirm for the
        BaseGraph implementation in use.

    """
    current = g.nodes()[n]

    # Work on copies so neither the stored record nor the caller's dict is
    # touched by the property-combining step.
    current_copy = copy.deepcopy(current)
    incoming_copy = copy.deepcopy(data)
    merged_props = prepare_data_dict(current_copy, incoming_copy, preserve)

    g.add_node(n, **merged_props)
    return current
def add_all_edges(g1: BaseGraph, g2: BaseGraph, preserve: bool = True) -> int:
    """
    Add all edges from source graph (``g2``) to target graph (``g1``).

    An edge whose ``(subject, object, key)`` triple already exists in ``g1`` is
    merged via ``merge_edge``; any other edge is added to ``g1`` as-is.

    Parameters
    ----------
    g1: kgx.graph.base_graph.BaseGraph
        Target graph
    g2: kgx.graph.base_graph.BaseGraph
        Source graph
    preserve: bool
        Whether or not to preserve conflicting properties

    Returns
    -------
    int
        Number of edges that already existed in ``g1`` and were merged

    """
    # Log the graph names (not the graph objects) so the message matches the
    # one emitted by ``add_all_nodes`` and stays readable.
    log.info(f"Adding {g2.number_of_edges()} edges from {g2.name} to {g1.name}")

    merge_count = 0
    for u, v, key, data in g2.edges(keys=True, data=True):
        if g1.has_edge(u, v, key):
            merge_edge(g1, u, v, key, data, preserve)
            merge_count += 1
        else:
            g1.add_edge(u, v, edge_key=key, **data)
    return merge_count
def merge_edge(
    g: BaseGraph, u: str, v: str, key: str, data: dict, preserve: bool = True
) -> dict:
    """
    Merge the properties in ``data`` into edge ``u`` -> ``v`` of graph ``g``.

    The edge's current properties and ``data`` are deep-copied, combined with
    ``prepare_data_dict``, and the result is written back with ``g.add_edge``
    under the same edge key.

    Parameters
    ----------
    g: kgx.graph.base_graph.BaseGraph
        The target graph
    u: str
        Subject node id
    v: str
        Object node id
    key: str
        Edge key
    data: dict
        Edge properties
    preserve: bool
        Whether or not to preserve conflicting properties

    Returns
    -------
    dict
        The edge record obtained from ``g.get_edge(u, v, key)`` before the
        write-back. NOTE(review): this only reflects the merged properties if
        the graph implementation updates that record in place - confirm for
        the BaseGraph implementation in use.

    """
    current = g.get_edge(u, v, key)

    # Work on copies so neither the stored record nor the caller's dict is
    # touched by the property-combining step.
    current_copy = copy.deepcopy(current)
    incoming_copy = copy.deepcopy(data)
    merged_props = prepare_data_dict(current_copy, incoming_copy, preserve)

    g.add_edge(u, v, edge_key=key, **merged_props)
    return current