Transformer

The Transformer class connects a source to a sink: records are read from the source and written to the sink.

The Transformer supports two modes:

  • No streaming

  • Streaming

No streaming

In this mode, records are read from a source and written to an intermediate graph. This intermediate graph can then be used as a substrate for various graph operations.

from kgx.transformer import Transformer

input_args = {'filename': ['graph_nodes.tsv', 'graph_edges.tsv'], 'format': 'tsv'}
output_args = {'filename': 'graph.json', 'format': 'json'}

t = Transformer(stream=False)

# read from TSV
t.transform(input_args=input_args)

# The intermediate graph store can be accessed via t.store.graph

# write to JSON
t.save(output_args=output_args)

Streaming

In this mode, records are read from a source and written to a sink on the fly, without building an intermediate graph.

Note

Streaming disables certain functionality, such as the ability to apply graph operations.

from kgx.transformer import Transformer

input_args = {'filename': ['graph_nodes.tsv', 'graph_edges.tsv'], 'format': 'tsv'}
output_args = {'filename': 'graph.json', 'format': 'json'}

t = Transformer(stream=True)

# read from TSV and write to JSON
t.transform(input_args=input_args, output_args=output_args)
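The difference between the two modes can be illustrated without KGX. The following is a minimal sketch in plain Python, not KGX's implementation: read_records, ListSink, and the (id, properties) record shape are hypothetical stand-ins for a source, a sink, and parsed records.

```python
from typing import Dict, Iterator, List, Tuple

Record = Tuple[str, Dict[str, str]]  # hypothetical (id, properties) record


def read_records() -> Iterator[Record]:
    """Hypothetical source: yields records one at a time."""
    yield ("A", {"name": "node A"})
    yield ("B", {"name": "node B"})


class ListSink:
    """Hypothetical sink that collects whatever is written to it."""

    def __init__(self) -> None:
        self.written: List[Record] = []

    def write(self, record: Record) -> None:
        self.written.append(record)


def run_non_streaming(sink: ListSink) -> Dict[str, Dict[str, str]]:
    # Load every record into an intermediate store first...
    store = dict(read_records())
    # ...so the whole data set can be inspected or modified before saving.
    store["A"]["checked"] = "yes"
    for record in store.items():
        sink.write(record)
    return store


def run_streaming(sink: ListSink) -> None:
    # Each record goes straight to the sink; nothing is held in between.
    for record in read_records():
        sink.write(record)
```

In the non-streaming function the intermediate store is available for whole-data-set operations before anything is written; in the streaming function only one record is in hand at a time, which is why operations over the full graph are not possible there.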

kgx.transformer

class kgx.transformer.Transformer(stream: bool = False)

Bases: object

The Transformer class is responsible for transforming data from one form to another.

Parameters

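stream (bool) – Whether to stream records from source to sink on the fly instead of loading them into an intermediate graph. Defaults to False.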

get_sink(**kwargs: Dict) → kgx.sink.sink.Sink

Get an instance of Sink that corresponds to a given format.

Parameters

kwargs (Dict) – Arguments required for initializing an instance of Sink

Returns

An instance of kgx.sink.Sink

Return type

Sink

get_source(format: str) → kgx.source.source.Source

Get an instance of Source that corresponds to a given format.

Parameters

format (str) – The input store format

Returns

An instance of kgx.source.Source

Return type

Source
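Looking up a class by format name is commonly done with a small registry. The sketch below shows that general pattern only; it is not KGX's code, and TsvSource, JsonSource, SOURCE_REGISTRY, and the standalone get_source function are all hypothetical names chosen for illustration.

```python
from typing import Dict, Type


class TsvSource:
    """Hypothetical source for tab-separated input."""


class JsonSource:
    """Hypothetical source for JSON input."""


# Hypothetical registry mapping a format name to a source class.
SOURCE_REGISTRY: Dict[str, Type] = {
    "tsv": TsvSource,
    "json": JsonSource,
}


def get_source(format: str) -> object:
    """Return a new source instance for the given format name."""
    try:
        source_class = SOURCE_REGISTRY[format]
    except KeyError:
        # Fail loudly on an unknown format instead of returning None.
        raise ValueError(f"Unsupported format: {format!r}") from None
    return source_class()
```

Keeping the mapping in one dictionary means a new format can be supported by adding a single entry, without touching the lookup logic.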

process(source: Generator, sink: kgx.sink.sink.Sink) → None

Read records from source and write them to sink, calling the relevant methods based on the incoming data.

Note

The streamed data must not be mutated.

Parameters
  • source (Generator) – A generator from a Source

  • sink (kgx.sink.sink.Sink) – An instance of Sink
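A loop of this kind can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: the record shapes (a 2-tuple for a node, a 4-tuple for an edge), the CollectingSink class, and its write_node and write_edge methods are hypothetical and chosen only to show dispatch on the incoming data.

```python
from typing import Iterable, List, Tuple


class CollectingSink:
    """Hypothetical sink that keeps node and edge records in separate lists."""

    def __init__(self) -> None:
        self.nodes: List[Tuple] = []
        self.edges: List[Tuple] = []

    def write_node(self, record: Tuple) -> None:
        self.nodes.append(record)

    def write_edge(self, record: Tuple) -> None:
        self.edges.append(record)


def process(source: Iterable[Tuple], sink: CollectingSink) -> None:
    """Route each record to the matching sink method without modifying it."""
    for record in source:
        # Assumed convention: 4-tuples are edges, anything else is a node.
        if len(record) == 4:
            sink.write_edge(record)
        else:
            sink.write_node(record)


def example_source() -> Iterable[Tuple]:
    """Hypothetical generator standing in for a parsed input file."""
    yield ("A", {"name": "node A"})
    yield ("B", {"name": "node B"})
    yield ("A", "B", "A-related_to-B", {"predicate": "related_to"})
```

The loop passes each record through unchanged, in line with the note that streamed data must not be mutated.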

save(output_args: Dict) → None

Save data from the in-memory store to a desired sink.

Parameters

output_args (Dict) – Arguments relevant to your output sink

transform(input_args: Dict, output_args: Dict = None) → None

Transform an input source and write to an output sink.

If output_args is not provided, the data is persisted to an in-memory graph.

Parameters
  • input_args (Dict) – Arguments relevant to your input source

  • output_args (Dict) – Arguments relevant to your output sink