Source code for kgx.sink.tsv_sink

import os
import tarfile
from typing import Optional, Dict, Set, Any, List
from ordered_set import OrderedSet

from kgx.sink.sink import Sink
from kgx.utils.kgx_utils import (
    extension_types,
    archive_write_mode,
    archive_format,
    remove_null,
    _sanitize_export,
)


# Node columns TsvSink exports when no ``node_properties`` keyword is given.
DEFAULT_NODE_COLUMNS = {
    "category",
    "description",
    "id",
    "name",
    "provided_by",
}
# Edge columns TsvSink exports when no ``edge_properties`` keyword is given.
DEFAULT_EDGE_COLUMNS = {
    "category",
    "id",
    "object",
    "predicate",
    "provided_by",
    "relation",
    "subject",
}


class TsvSink(Sink):
    """
    TsvSink is responsible for writing data as records to a TSV/CSV.

    Node records are written to ``<basename>_nodes.<ext>`` and edge records
    to ``<basename>_edges.<ext>`` in the directory of ``filename``. If
    ``compression`` names a known archive mode, both files are bundled into
    a tar archive by :meth:`finalize` and the loose files are removed.

    Column headers are written lazily (just before the first record of each
    kind, or at :meth:`finalize`), so ``set_node_properties`` /
    ``set_edge_properties`` calls made before writing are reflected in the
    header.

    Parameters
    ----------
    filename: str
        The filename to write to
    format: str
        The file format (``tsv``, ``csv``)
    compression: str
        The compression type (``tar``, ``tar.gz``)
    kwargs: Any
        Any additional arguments; ``node_properties`` / ``edge_properties``
        replace the default column sets

    """

    # Columns emitted first, in this order, when present. All other columns
    # follow alphabetically, with ``_``-prefixed internal columns last.
    _CORE_NODE_COLUMNS = (
        "id",
        "category",
        "name",
        "description",
        "xref",
        "provided_by",
        "synonym",
    )
    _CORE_EDGE_COLUMNS = (
        "id",
        "subject",
        "predicate",
        "object",
        "category",
        "relation",
        "provided_by",
    )

    def __init__(
        self,
        filename: str,
        format: str,
        compression: Optional[str] = None,
        **kwargs: Any,
    ):
        super().__init__()
        if format not in extension_types:
            # ValueError subclasses Exception, so existing handlers still match.
            raise ValueError(f"Unsupported format: {format}")
        self.delimiter = extension_types[format]
        # abspath() never returns "", so dirname can be used directly below.
        self.dirname = os.path.abspath(os.path.dirname(filename))
        self.basename = os.path.basename(filename)
        self.extension = format.split(":")[0]
        # None when no (or an unknown) compression was requested.
        self.mode = archive_write_mode.get(compression)
        self.nodes_file_basename = f"{self.basename}_nodes.{self.extension}"
        self.edges_file_basename = f"{self.basename}_edges.{self.extension}"
        os.makedirs(self.dirname, exist_ok=True)

        self.node_properties.update(
            set(kwargs.get("node_properties", DEFAULT_NODE_COLUMNS))
        )
        self.edge_properties.update(
            set(kwargs.get("edge_properties", DEFAULT_EDGE_COLUMNS))
        )
        self.ordered_node_columns = TsvSink._order_node_columns(self.node_properties)
        self.ordered_edge_columns = TsvSink._order_edge_columns(self.edge_properties)

        self.nodes_file_name = os.path.join(self.dirname, self.nodes_file_basename)
        self.edges_file_name = os.path.join(self.dirname, self.edges_file_basename)
        # Explicit UTF-8 so non-ASCII values do not depend on the platform locale.
        self.NFH = open(self.nodes_file_name, "w", encoding="utf-8")
        try:
            self.EFH = open(self.edges_file_name, "w", encoding="utf-8")
        except OSError:
            # Do not leak the nodes handle if the edges file cannot be opened.
            self.NFH.close()
            raise
        # Headers are deferred so later set_*_properties calls are honoured.
        self._node_header_written = False
        self._edge_header_written = False

    def _write_node_header(self) -> None:
        """Write the nodes header line once, using the current column order."""
        if not self._node_header_written:
            self.NFH.write(self.delimiter.join(self.ordered_node_columns) + "\n")
            self._node_header_written = True

    def _write_edge_header(self) -> None:
        """Write the edges header line once, using the current column order."""
        if not self._edge_header_written:
            self.EFH.write(self.delimiter.join(self.ordered_edge_columns) + "\n")
            self._edge_header_written = True

    def _format_row(self, row: Dict, columns: OrderedSet) -> str:
        """Render ``row`` as one delimited line; absent columns become empty fields."""
        # NOTE(review): values are not quoted/escaped here; a value containing
        # the delimiter or a newline would corrupt the row unless
        # _sanitize_export already guards against it — confirm.
        return self.delimiter.join(str(row.get(c, "")) for c in columns) + "\n"

    def write_node(self, record: Dict) -> None:
        """
        Write a node record to the underlying store.

        Parameters
        ----------
        record: Dict
            A node record (must contain ``id``)

        """
        row = self._build_export_row(record)
        # Always export the raw id, bypassing the null/falsy filtering above.
        row["id"] = record["id"]
        self._write_node_header()
        self.NFH.write(self._format_row(row, self.ordered_node_columns))

    def write_edge(self, record: Dict) -> None:
        """
        Write an edge record to the underlying store.

        Parameters
        ----------
        record: Dict
            An edge record

        """
        row = self._build_export_row(record)
        self._write_edge_header()
        self.EFH.write(self._format_row(row, self.ordered_edge_columns))

    def finalize(self) -> None:
        """
        Close file handles and create an archive if compression mode is defined.

        Headers are written first if no records were written, so every output
        file has a header line.
        """
        self._write_node_header()
        self._write_edge_header()
        self.NFH.close()
        self.EFH.close()
        if self.mode:
            archive_basename = f"{self.basename}.{archive_format[self.mode]}"
            archive_name = os.path.join(self.dirname, archive_basename)
            with tarfile.open(name=archive_name, mode=self.mode) as tar:
                tar.add(self.nodes_file_name, arcname=self.nodes_file_basename)
                tar.add(self.edges_file_name, arcname=self.edges_file_basename)
            # The loose files are now inside the archive; remove them.
            for path in (self.nodes_file_name, self.edges_file_name):
                if os.path.isfile(path):
                    os.remove(path)

    @staticmethod
    def _build_export_row(data: Dict) -> Dict:
        """
        Prepare a record for export.

        Each value is passed through ``remove_null``. Falsy results (including
        ``0``, ``False`` and empty strings/lists) are dropped. The rest are
        converted with ``_sanitize_export``, which is expected to cast them to
        primitive types such as str or bool and to join lists with ``|``.

        Parameters
        ----------
        data: Dict
            A dictionary containing key-value pairs

        Returns
        -------
        Dict
            A dictionary containing processed key-value pairs

        """
        tidy_data = {}
        for key, value in data.items():
            new_value = remove_null(value)
            # NOTE(review): truthiness test also drops 0/False — confirm intended.
            if new_value:
                tidy_data[key] = _sanitize_export(key, new_value)
        return tidy_data

    @staticmethod
    def _order_columns(cols: Set, core_columns: tuple) -> OrderedSet:
        """
        Order ``cols`` as follows: the ``core_columns`` that are present, in the
        given order; then the other public columns, sorted; then the
        ``_``-prefixed internal columns, sorted.
        """
        present = set(cols)
        ordered_columns = OrderedSet([c for c in core_columns if c in present])
        extra = present.difference(core_columns)
        ordered_columns.update(sorted(c for c in extra if not c.startswith("_")))
        ordered_columns.update(sorted(c for c in extra if c.startswith("_")))
        return ordered_columns

    @staticmethod
    def _order_node_columns(cols: Set) -> OrderedSet:
        """
        Arrange node columns in a defined order.

        Parameters
        ----------
        cols: Set
            A set with elements in any order

        Returns
        -------
        OrderedSet
            A set with elements in a defined order

        """
        return TsvSink._order_columns(cols, TsvSink._CORE_NODE_COLUMNS)

    @staticmethod
    def _order_edge_columns(cols: Set) -> OrderedSet:
        """
        Arrange edge columns in a defined order.

        Parameters
        ----------
        cols: Set
            A set with elements in any order

        Returns
        -------
        OrderedSet
            A set with elements in a defined order

        """
        return TsvSink._order_columns(cols, TsvSink._CORE_EDGE_COLUMNS)

    def set_node_properties(self, node_properties: List) -> None:
        """
        Update node properties index with a given list.

        Call before the first ``write_node``. After the header has been
        written, rows would no longer line up with it.

        Parameters
        ----------
        node_properties: List
            A list of node properties

        """
        self.node_properties.update(node_properties)
        self.ordered_node_columns = TsvSink._order_node_columns(self.node_properties)

    def set_edge_properties(self, edge_properties: List) -> None:
        """
        Update edge properties index with a given list.

        Call before the first ``write_edge``. After the header has been
        written, rows would no longer line up with it.

        Parameters
        ----------
        edge_properties: List
            A list of edge properties

        """
        self.edge_properties.update(edge_properties)
        self.ordered_edge_columns = TsvSink._order_edge_columns(self.edge_properties)