import re
from enum import Enum
from typing import List, TextIO, Optional, Dict, Set, Callable
import click
import validators
from bmt import Toolkit
from kgx.config import get_jsonld_context, get_logger
from kgx.graph.base_graph import BaseGraph
from kgx.utils.kgx_utils import (
get_toolkit,
snakecase_to_sentencecase,
sentencecase_to_snakecase,
camelcase_to_sentencecase,
GraphEntityType,
)
from kgx.prefix_manager import PrefixManager
logger = get_logger()
[docs]class ErrorType(Enum):
"""
Validation error types
"""
MISSING_NODE_PROPERTY = 1
MISSING_EDGE_PROPERTY = 2
INVALID_NODE_PROPERTY_VALUE_TYPE = 3
INVALID_NODE_PROPERTY_VALUE = 4
INVALID_EDGE_PROPERTY_VALUE_TYPE = 5
INVALID_EDGE_PROPERTY_VALUE = 6
NO_CATEGORY = 7
INVALID_CATEGORY = 8
NO_EDGE_PREDICATE = 9
INVALID_EDGE_PREDICATE = 10
[docs]class MessageLevel(Enum):
"""
Message level for validation reports
"""
# Recommendations
INFO = 1
# Message to convey 'should'
WARNING = 2
# Message to convey 'must'
ERROR = 3
[docs]class ValidationError(object):
"""
ValidationError class that represents an error.
Parameters
----------
entity: str
The node or edge entity that is failing validation
error_type: kgx.validator.ErrorType
The nature of the error
message: str
The error message
message_level: kgx.validator.MessageLevel
The message level
"""
def __init__(
self,
entity: str,
error_type: ErrorType,
message: str,
message_level: MessageLevel,
):
self.entity = entity
self.error_type = error_type
self.message = message
self.message_level = message_level
def __str__(self):
return f"[{self.message_level.name}][{self.error_type.name}] {self.entity} - {self.message}"
def as_dict(self):
return {
"entity": self.entity,
"error_type": self.error_type.name,
"message": self.message,
"message_level": self.message_level.name,
}
[docs]class Validator(object):
"""
Class for validating a property graph.
The optional 'progress_monitor' for the validator should be a lightweight Callable
which is injected into the class 'inspector' Callable, designed to intercepts
node and edge records streaming through the Validator (inside a Transformer.process() call.
The first (GraphEntityType) argument of the Callable tags the record as a NODE or an EDGE.
The second argument given to the Callable is the current record itself.
This Callable is strictly meant to be procedural and should *not* mutate the record.
The intent of this Callable is to provide a hook to KGX applications wanting the
namesake function of passively monitoring the graph data stream. As such, the Callable
could simply tally up the number of times it is called with a NODE or an EDGE, then
provide a suitable (quick!) report of that count back to the KGX application. The
Callable (function/callable class) should not modify the record and should be of low
complexity, so as not to introduce a large computational overhead to validation!
Parameters
----------
verbose: bool
Whether the generated report should be verbose or not (default: ``False``)
progress_monitor: Optional[Callable[[GraphEntityType, List], None]]
Function given a peek at the current record being processed by the class wrapped Callable.
schema: Optional[str]
URL to (Biolink) Model Schema to be used for validated (default: None, use default Biolink Model Toolkit schema)
"""
def __init__(
self,
verbose: bool = False,
progress_monitor: Optional[Callable[[GraphEntityType, List], None]] = None,
schema: Optional[str] = None,
):
# formal arguments
self.verbose: bool = verbose
self.progress_monitor: Optional[
Callable[[GraphEntityType, List], None]
] = progress_monitor
# internal attributes
# associated currently active _currently_active_toolkit with this Validator instance
self.validating_toolkit = self.get_toolkit()
self.prefix_manager = PrefixManager()
self.jsonld = get_jsonld_context()
self.prefixes = Validator.get_all_prefixes(self.jsonld)
self.required_node_properties = Validator.get_required_node_properties()
self.required_edge_properties = Validator.get_required_edge_properties()
self.errors: List[ValidationError] = list()
[docs] def __call__(self, entity_type: GraphEntityType, rec: List):
"""
Transformer 'inspector' Callable
"""
if self.progress_monitor:
self.progress_monitor(entity_type, rec)
if entity_type == GraphEntityType.EDGE:
self.errors += self.analyse_edge(*rec)
elif entity_type == GraphEntityType.NODE:
self.errors += self.analyse_node(*rec)
else:
raise RuntimeError("Unexpected GraphEntityType: " + str(entity_type))
def get_validating_toolkit(self):
return self.validating_toolkit
def get_validation_model_version(self):
return self.validating_toolkit.get_model_version()
def get_errors(self):
return self.errors
_currently_active_toolkit: Optional[Toolkit] = None
@classmethod
def set_biolink_model(cls, version: Optional[str]):
cls._currently_active_toolkit = get_toolkit(biolink_release=version)
@classmethod
def get_toolkit(cls) -> Toolkit:
if not cls._currently_active_toolkit:
cls._currently_active_toolkit = get_toolkit()
return cls._currently_active_toolkit
_default_model_version = None
@classmethod
def get_default_model_version(cls):
if not cls._default_model_version:
# get default Biolink version from BMT
cls._default_model_version = get_toolkit().get_model_version()
return cls._default_model_version
def analyse_node(self, n, data):
e1 = Validator.validate_node_properties(n, data, self.required_node_properties)
e2 = Validator.validate_node_property_types(
n, data, toolkit=self.validating_toolkit
)
e3 = Validator.validate_node_property_values(n, data)
e4 = Validator.validate_categories(n, data, toolkit=self.validating_toolkit)
return e1 + e2 + e3 + e4
def analyse_edge(self, u, v, k, data):
e1 = Validator.validate_edge_properties(
u, v, data, self.required_edge_properties
)
e2 = Validator.validate_edge_property_types(
u, v, data, toolkit=self.validating_toolkit
)
e3 = Validator.validate_edge_property_values(u, v, data)
e4 = Validator.validate_edge_predicate(
u, v, data, toolkit=self.validating_toolkit
)
return e1 + e2 + e3 + e4
[docs] @staticmethod
def get_all_prefixes(jsonld: Optional[Dict] = None) -> set:
"""
Get all prefixes from Biolink Model JSON-LD context.
It also sets ``self.prefixes`` for subsequent access.
Parameters
---------
jsonld: Optional[Dict]
The JSON-LD context
Returns
-------
Optional[Dict]
A set of prefixes
"""
if not jsonld:
jsonld = get_jsonld_context()
prefixes: Set = set(
k
for k, v in jsonld.items()
if isinstance(v, str)
or (isinstance(v, dict) and v.setdefault("@prefix", False))
) # @type: ignored
if "biolink" not in prefixes:
prefixes.add("biolink")
return prefixes
[docs] @staticmethod
def get_required_node_properties(toolkit: Optional[Toolkit] = None) -> list:
"""
Get all properties for a node that are required, as defined by Biolink Model.
Parameters
----------
toolkit: Optional[Toolkit]
Optional externally provided toolkit (default: use Validator class defined toolkit)
Returns
-------
list
A list of required node properties
"""
if not toolkit:
toolkit = Validator.get_toolkit()
node_properties = toolkit.get_all_node_properties()
required_properties = []
for p in node_properties:
element = toolkit.get_element(p)
if element and element.deprecated is None:
if hasattr(element, "required") and element.required:
formatted_name = sentencecase_to_snakecase(element.name)
required_properties.append(formatted_name)
elif element.name == "category":
formatted_name = sentencecase_to_snakecase(element.name)
required_properties.append(formatted_name)
return required_properties
[docs] @staticmethod
def get_required_edge_properties(toolkit: Optional[Toolkit] = None) -> list:
"""
Get all properties for an edge that are required, as defined by Biolink Model.
Parameters
----------
toolkit: Optional[Toolkit]
Optional externally provided toolkit (default: use Validator class defined toolkit)
Returns
-------
list
A list of required edge properties
"""
if not toolkit:
toolkit = Validator.get_toolkit()
edge_properties = toolkit.get_all_edge_properties()
required_properties = []
for p in edge_properties:
element = toolkit.get_element(p)
if element and element.deprecated is None:
if hasattr(element, "required") and element.required:
formatted_name = sentencecase_to_snakecase(element.name)
required_properties.append(formatted_name)
return required_properties
[docs] def validate(self, graph: BaseGraph) -> list:
"""
Validate nodes and edges in a graph.
TODO: Support strict mode
Parameters
----------
graph: kgx.graph.base_graph.BaseGraph
The graph to validate
Returns
-------
list
A list of errors for a given graph
"""
node_errors = self.validate_nodes(graph)
edge_errors = self.validate_edges(graph)
self.errors = node_errors + edge_errors
return self.errors
[docs] def validate_nodes(self, graph: BaseGraph) -> list:
"""
Validate all the nodes in a graph.
This method validates for the following,
- Node properties
- Node property type
- Node property value type
- Node categories
Parameters
----------
graph: kgx.graph.base_graph.BaseGraph
The graph to validate
Returns
-------
list
A list of errors for a given graph
"""
errors = []
with click.progressbar(
graph.nodes(data=True), label="Validating nodes in graph"
) as bar:
for n, data in bar:
errors += self.analyse_node(n, data)
return errors
[docs] def validate_edges(self, graph: BaseGraph) -> list:
"""
Validate all the edges in a graph.
This method validates for the following,
- Edge properties
- Edge property type
- Edge property value type
- Edge predicate
Parameters
----------
graph: kgx.graph.base_graph.BaseGraph
The graph to validate
Returns
-------
list
A list of errors for a given graph
"""
errors = []
with click.progressbar(
graph.edges(data=True), label="Validate edges in graph"
) as bar:
for u, v, data in bar:
errors += self.analyse_edge(u, v, None, data)
return errors
[docs] @staticmethod
def validate_node_properties(
node: str, data: dict, required_properties: list
) -> list:
"""
Checks if all the required node properties exist for a given node.
Parameters
----------
node: str
Node identifier
data: dict
Node properties
required_properties: list
Required node properties
Returns
-------
list
A list of errors for a given node
"""
errors = []
for p in required_properties:
if p not in data:
error_type = ErrorType.MISSING_NODE_PROPERTY
message = f"Required node property '{p}' missing"
errors.append(
ValidationError(node, error_type, message, MessageLevel.ERROR)
)
return errors
[docs] @staticmethod
def validate_edge_properties(
subject: str, object: str, data: dict, required_properties: list
) -> list:
"""
Checks if all the required edge properties exist for a given edge.
Parameters
----------
subject: str
Subject identifier
object: str
Object identifier
data: dict
Edge properties
required_properties: list
Required edge properties
Returns
-------
list
A list of errors for a given edge
"""
errors = []
for p in required_properties:
if p not in data:
if p == "association_id":
# check for 'id' property instead
if "id" not in data:
error_type = ErrorType.MISSING_EDGE_PROPERTY
message = f"Required edge property '{p}' missing"
errors.append(
ValidationError(
f"{subject}-{object}",
error_type,
message,
MessageLevel.ERROR,
)
)
else:
error_type = ErrorType.MISSING_EDGE_PROPERTY
message = f"Required edge property '{p}' missing"
errors.append(
ValidationError(
f"{subject}-{object}",
error_type,
message,
MessageLevel.ERROR,
)
)
return errors
[docs] @staticmethod
def validate_node_property_types(
node: str, data: dict, toolkit: Optional[Toolkit] = None
) -> list:
"""
Checks if node properties have the expected value type.
Parameters
----------
node: str
Node identifier
data: dict
Node properties
toolkit: Optional[Toolkit]
Optional externally provided toolkit (default: use Validator class defined toolkit)
Returns
-------
list
A list of errors for a given node
"""
if not toolkit:
toolkit = Validator.get_toolkit()
errors = []
error_type = ErrorType.INVALID_NODE_PROPERTY_VALUE_TYPE
if not isinstance(node, str):
message = "Node property 'id' expected to be of type 'string'"
errors.append(
ValidationError(node, error_type, message, MessageLevel.ERROR)
)
for key, value in data.items():
element = toolkit.get_element(key)
if element:
if hasattr(element, "typeof"):
if element.typeof == "string" and not isinstance(value, str):
message = f"Node property '{key}' expected to be of type '{element.typeof}'"
errors.append(
ValidationError(
node, error_type, message, MessageLevel.ERROR
)
)
elif (
element.typeof == "uriorcurie"
and not isinstance(value, str)
and not validators.url(value)
):
message = f"Node property '{key}' expected to be of type 'uri' or 'CURIE'"
errors.append(
ValidationError(
node, error_type, message, MessageLevel.ERROR
)
)
elif element.typeof == "double" and not isinstance(
value, (int, float)
):
message = f"Node property '{key}' expected to be of type '{element.typeof}'"
errors.append(
ValidationError(
node, error_type, message, MessageLevel.ERROR
)
)
else:
logger.warning(
"Skipping validation for Node property '{}'. Expected type '{}' vs Actual type '{}'".format(
key, element.typeof, type(value)
)
)
if hasattr(element, "multivalued"):
if element.multivalued:
if not isinstance(value, list):
message = f"Multi-valued node property '{key}' expected to be of type '{list}'"
errors.append(
ValidationError(
node, error_type, message, MessageLevel.ERROR
)
)
else:
if isinstance(value, (list, set, tuple)):
message = f"Single-valued node property '{key}' expected to be of type '{str}'"
errors.append(
ValidationError(
node, error_type, message, MessageLevel.ERROR
)
)
return errors
[docs] @staticmethod
def validate_edge_property_types(
subject: str, object: str, data: dict, toolkit: Optional[Toolkit] = None
) -> list:
"""
Checks if edge properties have the expected value type.
Parameters
----------
subject: str
Subject identifier
object: str
Object identifier
data: dict
Edge properties
toolkit: Optional[Toolkit]
Optional externally provided toolkit (default: use Validator class defined toolkit)
Returns
-------
list
A list of errors for a given edge
"""
if not toolkit:
toolkit = Validator.get_toolkit()
errors = []
error_type = ErrorType.INVALID_EDGE_PROPERTY_VALUE_TYPE
if not isinstance(subject, str):
message = "'subject' of an edge expected to be of type 'string'"
errors.append(
ValidationError(
f"{subject}-{object}", error_type, message, MessageLevel.ERROR
)
)
if not isinstance(object, str):
message = "'object' of an edge expected to be of type 'string'"
errors.append(
ValidationError(
f"{subject}-{object}", error_type, message, MessageLevel.ERROR
)
)
for key, value in data.items():
element = toolkit.get_element(key)
if element:
if hasattr(element, "typeof"):
if element.typeof == "string" and not isinstance(value, str):
message = (
f"Edge property '{key}' expected to be of type 'string'"
)
errors.append(
ValidationError(
f"{subject}-{object}",
error_type,
message,
MessageLevel.ERROR,
)
)
elif (
element.typeof == "uriorcurie"
and not isinstance(value, str)
and not validators.url(value)
):
message = f"Edge property '{key}' expected to be of type 'uri' or 'CURIE'"
errors.append(
ValidationError(
f"{subject}-{object}",
error_type,
message,
MessageLevel.ERROR,
)
)
elif element.typeof == "double" and not isinstance(
value, (int, float)
):
message = (
f"Edge property '{key}' expected to be of type 'double'"
)
errors.append(
ValidationError(
f"{subject}-{object}",
error_type,
message,
MessageLevel.ERROR,
)
)
else:
logger.warning(
"Skipping validation for Edge property '{}'. Expected type '{}' vs Actual type '{}'".format(
key, element.typeof, type(value)
)
)
if hasattr(element, "multivalued"):
if element.multivalued:
if not isinstance(value, list):
message = f"Multi-valued edge property '{key}' expected to be of type 'list'"
errors.append(
ValidationError(
f"{subject}-{object}",
error_type,
message,
MessageLevel.ERROR,
)
)
else:
if isinstance(value, (list, set, tuple)):
message = f"Single-valued edge property '{key}' expected to be of type 'str'"
errors.append(
ValidationError(
f"{subject}-{object}",
error_type,
message,
MessageLevel.ERROR,
)
)
return errors
[docs] @staticmethod
def validate_node_property_values(node: str, data: dict) -> list:
"""
Validate a node property's value.
Parameters
----------
node: str
Node identifier
data: dict
Node properties
Returns
-------
list
A list of errors for a given node
"""
errors = []
error_type = ErrorType.INVALID_NODE_PROPERTY_VALUE
if not PrefixManager.is_curie(node):
message = f"Node property 'id' expected to be of type 'CURIE'"
errors.append(
ValidationError(node, error_type, message, MessageLevel.ERROR)
)
else:
prefix = PrefixManager.get_prefix(node)
if prefix and prefix not in Validator.get_all_prefixes():
message = f"Node property 'id' has a value '{node}' with a CURIE prefix '{prefix}' is not represented in Biolink Model JSON-LD context"
errors.append(
ValidationError(node, error_type, message, MessageLevel.ERROR)
)
return errors
[docs] @staticmethod
def validate_edge_property_values(subject: str, object: str, data: dict) -> list:
"""
Validate an edge property's value.
Parameters
----------
subject: str
Subject identifier
object: str
Object identifier
data: dict
Edge properties
Returns
-------
list
A list of errors for a given edge
"""
errors = []
error_type = ErrorType.INVALID_EDGE_PROPERTY_VALUE
prefixes = Validator.get_all_prefixes()
if PrefixManager.is_curie(subject):
prefix = PrefixManager.get_prefix(subject)
if prefix and prefix not in prefixes:
message = f"Edge property 'subject' has a value '{subject}' with a CURIE prefix '{prefix}' that is not represented in Biolink Model JSON-LD context"
errors.append(
ValidationError(
f"{subject}-{object}", error_type, message, MessageLevel.ERROR
)
)
else:
message = f"Edge property 'subject' has a value '{subject}' which is not a proper CURIE"
errors.append(
ValidationError(
f"{subject}-{object}", error_type, message, MessageLevel.ERROR
)
)
if PrefixManager.is_curie(object):
prefix = PrefixManager.get_prefix(object)
if prefix not in prefixes:
message = f"Edge property 'object' has a value '{object}' with a CURIE prefix '{prefix}' that is not represented in Biolink Model JSON-LD context"
errors.append(
ValidationError(
f"{subject}-{object}", error_type, message, MessageLevel.ERROR
)
)
else:
message = f"Edge property 'object' has a value '{object}' which is not a proper CURIE"
errors.append(
ValidationError(
f"{subject}-{object}", error_type, message, MessageLevel.ERROR
)
)
if "relation" in data:
if PrefixManager.is_curie(data["relation"]):
prefix = PrefixManager.get_prefix(data["relation"])
if prefix not in prefixes:
message = f"Edge property 'relation' has a value '{data['relation']}' with a CURIE prefix '{prefix}' that is not represented in Biolink Model JSON-LD context"
errors.append(
ValidationError(
f"{subject}-{object}",
error_type,
message,
MessageLevel.ERROR,
)
)
else:
message = f"Edge property 'relation' has a value '{data['relation']}' which is not a proper CURIE"
errors.append(
ValidationError(
f"{subject}-{object}", error_type, message, MessageLevel.ERROR
)
)
return errors
[docs] @staticmethod
def validate_categories(
node: str, data: dict, toolkit: Optional[Toolkit] = None
) -> list:
"""
Validate ``category`` field of a given node.
Parameters
----------
node: str
Node identifier
data: dict
Node properties
toolkit: Optional[Toolkit]
Optional externally provided toolkit (default: use Validator class defined toolkit)
Returns
-------
list
A list of errors for a given node
"""
if not toolkit:
toolkit = Validator.get_toolkit()
error_type = ErrorType.INVALID_CATEGORY
errors = []
categories = data.get("category")
if categories is None:
message = "Node does not have a 'category' property"
errors.append(
ValidationError(node, error_type, message, MessageLevel.ERROR)
)
elif not isinstance(categories, list):
message = f"Node property 'category' expected to be of type {list}"
errors.append(
ValidationError(node, error_type, message, MessageLevel.ERROR)
)
else:
for category in categories:
if PrefixManager.is_curie(category):
category = PrefixManager.get_reference(category)
m = re.match(r"^([A-Z][a-z\d]+)+$", category)
if not m:
# category is not CamelCase
error_type = ErrorType.INVALID_CATEGORY
message = f"Category '{category}' is not in CamelCase form"
errors.append(
ValidationError(node, error_type, message, MessageLevel.ERROR)
)
formatted_category = camelcase_to_sentencecase(category)
if toolkit.is_mixin(formatted_category):
message = f"Category '{category}' is a mixin in the Biolink Model"
errors.append(
ValidationError(node, error_type, message, MessageLevel.ERROR)
)
elif not toolkit.is_category(formatted_category):
message = (
f"Category '{category}' unknown in the current Biolink Model"
)
errors.append(
ValidationError(node, error_type, message, MessageLevel.ERROR)
)
else:
c = toolkit.get_element(formatted_category.lower())
if c:
if category != c.name and category in c.aliases:
message = f"Category {category} is actually an alias for {c.name}; Should replace '{category}' with '{c.name}'"
errors.append(
ValidationError(
node, error_type, message, MessageLevel.ERROR
)
)
return errors
[docs] @staticmethod
def validate_edge_predicate(
subject: str, object: str, data: dict, toolkit: Optional[Toolkit] = None
) -> list:
"""
Validate ``edge_predicate`` field of a given edge.
Parameters
----------
subject: str
Subject identifier
object: str
Object identifier
data: dict
Edge properties
toolkit: Optional[Toolkit]
Optional externally provided toolkit (default: use Validator class defined toolkit)
Returns
-------
list
A list of errors for a given edge
"""
if not toolkit:
toolkit = Validator.get_toolkit()
error_type = ErrorType.INVALID_EDGE_PREDICATE
errors = []
edge_predicate = data.get("predicate")
if edge_predicate is None:
message = "Edge does not have an 'predicate' property"
errors.append(
ValidationError(
f"{subject}-{object}", error_type, message, MessageLevel.ERROR
)
)
elif not isinstance(edge_predicate, str):
message = f"Edge property 'edge_predicate' expected to be of type 'string'"
errors.append(
ValidationError(
f"{subject}-{object}", error_type, message, MessageLevel.ERROR
)
)
else:
if PrefixManager.is_curie(edge_predicate):
edge_predicate = PrefixManager.get_reference(edge_predicate)
m = re.match(r"^([a-z_][^A-Z\s]+_?[a-z_][^A-Z\s]+)+$", edge_predicate)
if m:
p = toolkit.get_element(snakecase_to_sentencecase(edge_predicate))
if p is None:
message = f"Edge predicate '{edge_predicate}' not in Biolink Model"
errors.append(
ValidationError(
f"{subject}-{object}",
error_type,
message,
MessageLevel.ERROR,
)
)
elif edge_predicate != p.name and edge_predicate in p.aliases:
message = f"Edge predicate '{edge_predicate}' is actually an alias for {p.name}; Should replace {edge_predicate} with {p.name}"
errors.append(
ValidationError(
f"{subject}-{object}",
error_type,
message,
MessageLevel.ERROR,
)
)
else:
message = f"Edge predicate '{edge_predicate}' is not in snake_case form"
errors.append(
ValidationError(
f"{subject}-{object}", error_type, message, MessageLevel.ERROR
)
)
return errors
[docs] @staticmethod
def report(errors: List[ValidationError]) -> List:
"""
Prepare error report.
Parameters
----------
errors: List[ValidationError]
List of kgx.validator.ValidationError
Returns
-------
List
A list of formatted errors
"""
return [str(x) for x in errors]
[docs] def get_error_messages(self):
"""
A direct Validator "instance" method version of report()
that directly accesses the internal Validator self.errors list.
Returns
-------
List
A list of formatted error messages.
"""
return Validator.report(self.errors)
[docs] def write_report(self, outstream: TextIO) -> None:
"""
Write error report to a file
Parameters
----------
outstream: TextIO
The stream to write to
"""
for x in Validator.report(self.errors):
outstream.write(f"{x}\n")