"""Source code for kgx.source.tsv_source."""

import csv
import re
import tarfile
from typing import Dict, Tuple, Any, Generator, Optional, List

import pandas as pd

from kgx.config import get_logger
from kgx.source.source import Source
from kgx.utils.kgx_utils import (
    generate_uuid,
    generate_edge_key,
    extension_types,
    archive_read_mode,
    sanitize_import,
    validate_edge,
    validate_node,
)

log = get_logger()


class TsvSource(Source):
    """
    TsvSource is responsible for reading data as records from a TSV/CSV.
    """

    def __init__(self):
        super().__init__()

    def set_prefix_map(self, m: Dict) -> None:
        """
        Add or override default prefix to IRI map.

        Parameters
        ----------
        m: Dict
            Prefix to IRI map

        """
        self.prefix_manager.set_prefix_map(m)

    def set_reverse_prefix_map(self, m: Dict) -> None:
        """
        Add or override default IRI to prefix map.

        Parameters
        ----------
        m: Dict
            IRI to prefix map

        """
        self.prefix_manager.set_reverse_prefix_map(m)

    def parse(
        self,
        filename: str,
        format: str,
        compression: Optional[str] = None,
        **kwargs: Any,
    ) -> Generator:
        """
        This method reads from a TSV/CSV and yields records.

        A plain file is classified as a nodes or edges file from its name
        (``nodes.<format>`` / ``edges.<format>``); a file matching neither is
        skipped with a warning and never opened. For a tar archive, all node
        members are read before any edge member.

        Parameters
        ----------
        filename: str
            The filename to parse
        format: str
            The format (``tsv``, ``csv``)
        compression: Optional[str]
            The compression type (``tar``, ``tar.gz``)
        kwargs: Any
            Any additional arguments; after being passed to
            ``set_provenance_map`` they are forwarded to ``pandas.read_csv``

        Returns
        -------
        Generator
            A generator for node and edge records

        """
        if "delimiter" not in kwargs:
            # infer delimiter from file format
            kwargs["delimiter"] = extension_types[format]
        if "lineterminator" not in kwargs:
            # set '\n' to be the default line terminator to prevent
            # truncation of lines due to hidden/escaped carriage returns
            kwargs["lineterminator"] = "\n"

        mode = (
            archive_read_mode[compression]
            if compression in archive_read_mode
            else None
        )
        self.set_provenance_map(kwargs)

        if format == "tsv":
            # TSV cells are taken verbatim: quote characters have no special meaning.
            kwargs["quoting"] = csv.QUOTE_NONE

        if mode:
            yield from self._parse_archive(filename, format, mode, kwargs)
            return

        kind = self._kgx_file_kind(filename, format)
        if kind is None:
            # This used to throw an exception but perhaps we should simply ignore it.
            log.warning(
                f"Parse function cannot resolve the KGX file type in name {filename}. Skipped..."
            )
            return
        yield from self._read_records(filename, kind, kwargs)

    @staticmethod
    def _kgx_file_kind(name: str, format: str) -> Optional[str]:
        """Return ``"nodes"`` / ``"edges"`` when `name` looks like a KGX nodes/edges file, else None."""
        # NOTE(review): '.' in these patterns is a regex wildcard (so e.g.
        # 'nodes_tsv' matches too); kept unchanged for backward compatibility.
        if re.search(f"nodes.{format}", name):
            return "nodes"
        if re.search(f"edges.{format}", name):
            return "edges"
        return None

    def _parse_archive(
        self, filename: str, format: str, mode: str, read_kwargs: Dict
    ) -> Generator:
        """Yield records from every node member, then every edge member, of a tar archive."""
        with tarfile.open(filename, mode=mode) as tar:
            # Alas, the order that tar file members is important in some streaming operations
            # (e.g. graph-summary and validation) in that generally, all the node files need to be
            # loaded first, followed by the associated edges files can be loaded and analysed.
            # Start by partitioning files of each type into separate lists
            node_files: List[str] = []
            edge_files: List[str] = []
            for name in tar.getnames():
                kind = self._kgx_file_kind(name, format)
                if kind == "nodes":
                    node_files.append(name)
                elif kind == "edges":
                    edge_files.append(name)
                else:
                    # This used to throw an exception but perhaps we should simply ignore it.
                    log.warning(
                        f"Tar archive contains an unrecognized file: {name}. Skipped..."
                    )

            # Nodes first, then edges (see ordering note above).
            for kind, label, names in (
                ("nodes", "Node", node_files),
                ("edges", "Edge", edge_files),
            ):
                for name in names:
                    try:
                        stream = tar.extractfile(tar.getmember(name))
                    except KeyError:
                        stream = None
                    if stream is None:
                        # Missing member, or a member without file content
                        # (extractfile returns None e.g. for directories).
                        log.warning(
                            f"{label} file {name} member in archive {filename} could not be accessed? Skipped?"
                        )
                        continue
                    with stream:
                        # TODO: can this somehow be streamed here?
                        yield from self._read_records(stream, kind, read_kwargs)

    def _read_records(self, source: Any, kind: str, read_kwargs: Dict) -> Generator:
        """Read `source` with pandas in 10000-row chunks and yield node or edge records."""
        # dtype=str + keep_default_na=False: keep cells as strings and disable
        # pandas' default NA-string detection.
        reader = pd.read_csv(
            source,
            dtype=str,
            chunksize=10000,
            low_memory=False,
            keep_default_na=False,
            **read_kwargs,
        )
        try:
            for chunk in reader:
                if kind == "nodes":
                    self.node_properties.update(chunk.columns)
                    yield from self.read_nodes(chunk)
                else:
                    self.edge_properties.update(chunk.columns)
                    yield from self.read_edges(chunk)
        finally:
            # Release the reader even if the consumer stops iterating early.
            reader.close()

    def read_nodes(self, df: pd.DataFrame) -> Generator:
        """
        Read records from pandas.DataFrame and yield records.

        Parameters
        ----------
        df: pandas.DataFrame
            Dataframe containing records that represent nodes

        Returns
        -------
        Generator
            A generator for node records (``None`` for rows that
            ``read_node`` rejects)

        """
        for obj in df.to_dict("records"):
            yield self.read_node(obj)

    def read_node(self, node: Dict) -> Optional[Tuple[str, Dict]]:
        """
        Prepare a node.

        Parameters
        ----------
        node: Dict
            A node

        Returns
        -------
        Optional[Tuple[str, Dict]]
            A tuple that contains node id and node data, or None when the
            node has no ``id`` or is rejected by the node filter

        """
        node = validate_node(node)
        node_data = sanitize_import(node.copy())
        if "id" not in node_data:
            log.info(f"Ignoring node with no 'id': {node}")
            return None
        n = node_data["id"]
        self.set_node_provenance(node_data)
        self.node_properties.update(list(node_data.keys()))
        if self.check_node_filter(node_data):
            return n, node_data
        return None

    def read_edges(self, df: pd.DataFrame) -> Generator:
        """
        Read edge records from a pandas.DataFrame and yield them.

        Parameters
        ----------
        df: pandas.DataFrame
            Dataframe containing records that represent edges

        Returns
        -------
        Generator
            A generator for edge records (``None`` for rows that
            ``read_edge`` rejects)

        """
        for obj in df.to_dict("records"):
            yield self.read_edge(obj)

    def read_edge(self, edge: Dict) -> Optional[Tuple]:
        """
        Prepare an edge.

        A UUID is assigned as ``id`` when the edge has none.

        Parameters
        ----------
        edge: Dict
            An edge

        Returns
        -------
        Optional[Tuple]
            A tuple that contains subject id, object id, edge key, and edge
            data, or None when the edge is rejected by the edge filter

        """
        edge = validate_edge(edge)
        edge_data = sanitize_import(edge.copy())
        if "id" not in edge_data:
            edge_data["id"] = generate_uuid()
        s = edge_data["subject"]
        o = edge_data["object"]

        self.set_edge_provenance(edge_data)

        key = generate_edge_key(s, edge_data["predicate"], o)
        # Edge property names belong only in edge_properties; they must not
        # leak into node_properties.
        self.edge_properties.update(list(edge_data.keys()))
        if self.check_edge_filter(edge_data):
            return s, o, key, edge_data
        return None