from itertools import chain
from typing import Generator, Any, Dict, Optional
from kgx.config import get_graph_store_class
from kgx.graph.base_graph import BaseGraph
from kgx.source.source import Source
from kgx.utils.kgx_utils import validate_node, validate_edge, sanitize_import
[docs]class GraphSource(Source):
"""
GraphSource is responsible for reading data as records
from an in memory graph representation.
The underlying store must be an instance of ``kgx.graph.base_graph.BaseGraph``
"""
def __init__(self):
super().__init__()
self.graph = get_graph_store_class()()
[docs] def parse(self, graph: BaseGraph, **kwargs: Any) -> Generator:
"""
This method reads from a graph and yields records.
Parameters
----------
graph: kgx.graph.base_graph.BaseGraph
The graph to read from
kwargs: Any
Any additional arguments
Returns
-------
Generator
A generator for node and edge records read from the graph
"""
self.graph = graph
self.set_provenance_map(kwargs)
nodes = self.read_nodes()
edges = self.read_edges()
yield from chain(nodes, edges)
[docs] def read_nodes(self) -> Generator:
"""
Read nodes as records from the graph.
Returns
-------
Generator
A generator for nodes
"""
for n, data in self.graph.nodes(data=True):
if "id" not in data:
data["id"] = n
node_data = validate_node(data)
node_data = sanitize_import(node_data.copy())
self.set_node_provenance(node_data)
if self.check_node_filter(node_data):
self.node_properties.update(node_data.keys())
yield n, node_data
[docs] def read_edges(self) -> Generator:
"""
Read edges as records from the graph.
Returns
-------
Generator
A generator for edges
"""
for u, v, k, data in self.graph.edges(keys=True, data=True):
edge_data = validate_edge(data)
edge_data = sanitize_import(edge_data.copy())
self.set_edge_provenance(edge_data)
if self.check_edge_filter(edge_data):
self.node_properties.update(edge_data.keys())
yield u, v, k, edge_data