Source code for kgx.sink.graph_sink

from typing import Dict

from kgx.graph.base_graph import BaseGraph

from kgx.config import get_graph_store_class
from kgx.sink.sink import Sink
from kgx.utils.kgx_utils import generate_edge_key


[docs]class GraphSink(Sink): """ GraphSink is responsible for writing data as records to an in memory graph representation. The underlying store is determined by the graph store class defined in config (``kgx.graph.nx_graph.NxGraph``, by default). Parameters ---------- graph: kgx.graph.base_graph.BaseGraph An instance of BaseGraph to read from """ def __init__(self, graph: BaseGraph = None): super().__init__() if graph: self.graph = graph else: self.graph = get_graph_store_class()()
[docs] def write_node(self, record: Dict) -> None: """ Write a node record to graph. Parameters ---------- record: Dict A node record """ self.graph.add_node(record["id"], **record)
[docs] def write_edge(self, record: Dict) -> None: """ Write an edge record to graph. Parameters ---------- record: Dict An edge record """ key = ( record["key"] if "key" in record else generate_edge_key( record["subject"], record["predicate"], record["object"] ) ) self.graph.add_edge(record["subject"], record["object"], key, **record)
[docs] def finalize(self) -> None: """ Perform any operations after writing nodes and edges to graph. """ pass