import gzip
from typing import Any, Optional, Dict
import jsonstreams
from kgx.config import get_logger
from kgx.sink import Sink
# Module-level logger obtained via kgx.config.get_logger (not referenced in the code shown here).
log = get_logger()
class JsonSink(Sink):
    """
    JsonSink is responsible for writing data as records
    to a JSON file.

    The output is a single JSON object with a ``nodes`` array and an
    ``edges`` array, streamed to disk with ``jsonstreams`` so that the
    whole graph never has to be held in memory.

    Parameters
    ----------
    filename: str
        The filename to write to
    format: str
        The file format (``json``); accepted for interface compatibility
        but not used by this class
    compression: Optional[str]
        The compression type (``gz``). Any truthy value causes a
        gzip-compressed copy, ``<filename>.gz``, to be written on
        ``finalize``; the uncompressed file is left in place.
    kwargs: Any
        Any additional arguments (accepted but not used by this class)

    """

    def __init__(
        self,
        filename: str,
        format: str = "json",
        compression: Optional[str] = None,
        **kwargs: Any,
    ):
        super().__init__()
        self.filename = filename
        # Normalize falsy values (e.g. "" or None) to None.
        self.compression = compression or None
        # Top-level JSON object stream; node/edge arrays are opened lazily.
        self.FH = jsonstreams.Stream(
            jsonstreams.Type.object, filename=filename, pretty=True, indent=4
        )
        # Currently open "nodes" subarray handle, if any.
        self.NH = None
        # Currently open "edges" subarray handle, if any.
        self.EH = None

    def write_node(self, record: Dict) -> None:
        """
        Write a node record to JSON.

        Any open ``edges`` array is closed first. If node writes resume
        after edge writes, a new ``nodes`` key is opened, so callers should
        write all nodes before all edges to avoid repeated keys.

        Parameters
        ----------
        record: Dict
            A node record

        """
        if self.EH:
            self.EH.close()
            self.EH = None
        if not self.NH:
            self.NH = self.FH.subarray("nodes")
        self.NH.write(record)

    def write_edge(self, record: Dict) -> None:
        """
        Write an edge record to JSON.

        Any open ``nodes`` array is closed first. If edge writes resume
        after node writes, a new ``edges`` key is opened, so callers should
        write all nodes before all edges to avoid repeated keys.

        Parameters
        ----------
        record: Dict
            An edge record

        """
        if self.NH:
            self.NH.close()
            self.NH = None
        if not self.EH:
            self.EH = self.FH.subarray("edges")
        self.EH.write(record)

    def finalize(self) -> None:
        """
        Close any open arrays and the output stream, then create a
        gzip-compressed copy at ``<filename>.gz`` if compression was
        requested.

        Handles are reset to ``None`` after closing so a repeated call does
        not try to close them twice.
        """
        if self.NH:
            self.NH.close()
            self.NH = None
        if self.EH:
            self.EH.close()
            self.EH = None
        if self.FH:
            self.FH.close()
            self.FH = None
        if self.compression:
            # Both handles are context-managed: GzipFile only writes its
            # final block and trailer on close(), so an unclosed handle can
            # leave a truncated .gz file.
            with open(self.filename, "rb") as src, gzip.open(
                f"{self.filename}.gz", "wb"
            ) as dst:
                for chunk in src:
                    dst.write(chunk)