Source code for kgx.sink.jsonl_sink

import gzip
import os
from typing import Optional, Dict, Any

import jsonlines

from kgx.sink.sink import Sink


class JsonlSink(Sink):
    """
    JsonlSink is responsible for writing data as records
    to JSON lines.

    Node records and edge records are written to two separate files,
    ``{filename}_nodes.{format}`` and ``{filename}_edges.{format}``,
    created in the directory part of ``filename`` (the directory is
    created if it does not exist). When ``compression`` is ``gz`` the
    files are gzip-compressed and ``.gz`` is appended to both names.

    Parameters
    ----------
    filename: str
        The filename to write to
    format: str
        The file format (``jsonl``)
    compression: Optional[str]
        The compression type (``gz``)
    kwargs: Any
        Any additional arguments

    """

    def __init__(
        self,
        filename: str,
        format: str = "jsonl",
        compression: Optional[str] = None,
        **kwargs: Any,
    ):
        super().__init__()
        # os.path.abspath never returns an empty string (an empty dirname
        # resolves to the current working directory), so ``dirname`` can be
        # used unconditionally below.
        dirname = os.path.abspath(os.path.dirname(filename))
        basename = os.path.basename(filename)
        nodes_filename = os.path.join(dirname, f"{basename}_nodes.{format}")
        edges_filename = os.path.join(dirname, f"{basename}_edges.{format}")
        os.makedirs(dirname, exist_ok=True)

        # File objects that this sink opened itself and handed to a
        # jsonlines.Writer. A Writer built around an existing file object
        # does not close that object, so finalize() has to do it.
        self._raw_handles = []

        if compression == "gz":
            nodes_filename += f".{compression}"
            edges_filename += f".{compression}"
            nodes_raw = gzip.open(nodes_filename, "wb")
            self._raw_handles.append(nodes_raw)
            self.NFH = jsonlines.Writer(nodes_raw)
            edges_raw = gzip.open(edges_filename, "wb")
            self._raw_handles.append(edges_raw)
            self.EFH = jsonlines.Writer(edges_raw)
        else:
            # jsonlines.open owns the underlying file, so closing the
            # writer is sufficient in this branch.
            self.NFH = jsonlines.open(nodes_filename, "w")
            self.EFH = jsonlines.open(edges_filename, "w")

    def write_node(self, record: Dict) -> None:
        """
        Write a node record to JSON.

        Parameters
        ----------
        record: Dict
            A node record

        """
        self.NFH.write(record)

    def write_edge(self, record: Dict) -> None:
        """
        Write an edge record to JSON.

        Parameters
        ----------
        record: Dict
            An edge record

        """
        self.EFH.write(record)

    def finalize(self) -> None:
        """
        Perform any operations after writing the file.

        Closes the node and edge writers and then any gzip streams opened
        by the constructor, so that compressed output is fully flushed to
        disk. Every handle is attempted even if closing an earlier one
        raises.
        """
        try:
            self.NFH.close()
        finally:
            try:
                self.EFH.close()
            finally:
                # getattr keeps this safe for instances that were built
                # without running this class's __init__.
                for handle in getattr(self, "_raw_handles", ()):
                    handle.close()