307 lines
12 KiB
Python
Executable File
307 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
#
|
|
# Copyright (C) 2022 Intel Corporation.
|
|
#
|
|
# SPDX-License-Identifier: BSD-3-Clause
|
|
#
|
|
|
|
import sys, os
|
|
import argparse
|
|
import logging
|
|
from copy import copy
|
|
from collections import namedtuple
|
|
import re
|
|
|
|
try:
|
|
import elementpath
|
|
import elementpath_overlay
|
|
from elementpath.xpath_context import XPathContext
|
|
import xmlschema
|
|
except ImportError:
|
|
logging.error("Python package `xmlschema` is not installed.\n" +
|
|
"The scenario XML file will NOT be validated against the schema, which may cause build-time or runtime errors.\n" +
|
|
"To enable the validation, install the python package by executing: pip3 install xmlschema.")
|
|
sys.exit(0)
|
|
|
|
from pipeline import PipelineObject, PipelineStage, PipelineEngine
|
|
from schema_slicer import SlicingSchemaByVMTypeStage
|
|
from default_populator import DefaultValuePopulatingStage
|
|
|
|
def existing_file_type(parser):
|
|
def aux(arg):
|
|
if not os.path.exists(arg):
|
|
parser.error(f"can't open {arg}: No such file or directory")
|
|
elif not os.path.isfile(arg):
|
|
parser.error(f"can't open {arg}: Is not a file")
|
|
else:
|
|
return arg
|
|
return aux
|
|
|
|
def log_level_type(parser):
|
|
def aux(arg):
|
|
arg = arg.lower()
|
|
if arg in ["critical", "error", "warning", "info", "debug"]:
|
|
return arg
|
|
else:
|
|
parser.error(f"{arg} is not a valid log level")
|
|
return aux
|
|
|
|
class ValidationError(dict):
|
|
logging_fns = {
|
|
"critical": logging.critical,
|
|
"error": logging.error,
|
|
"warning": logging.warning,
|
|
"info": logging.info,
|
|
"debug": logging.debug,
|
|
}
|
|
|
|
def __init__(self, paths, message, severity):
|
|
super().__init__(paths = paths, message = message, severity = severity)
|
|
|
|
def __str__(self):
|
|
return f"{', '.join(self['paths'])}: {self['message']}"
|
|
|
|
def log(self):
|
|
try:
|
|
self.logging_fns[self['severity']](self)
|
|
except KeyError:
|
|
logging.debug(self)
|
|
|
|
class ScenarioValidator:
|
|
def __init__(self, schema_etree, datachecks_etree):
|
|
"""Initialize the validator with preprocessed schemas in ElementTree."""
|
|
self.schema = xmlschema.XMLSchema11(schema_etree)
|
|
self.datachecks = xmlschema.XMLSchema11(datachecks_etree)
|
|
|
|
def check_syntax(self, scenario_etree):
|
|
errors = []
|
|
|
|
it = self.schema.iter_errors(scenario_etree)
|
|
for error in it:
|
|
# Syntactic errors are always critical.
|
|
e = ValidationError([error.path], error.reason, "critical")
|
|
e.log()
|
|
errors.append(e)
|
|
|
|
return errors
|
|
|
|
def check_semantics(self, board_etree, scenario_etree):
|
|
errors = []
|
|
|
|
unified_node = copy(scenario_etree.getroot())
|
|
parent_map = {c : p for p in unified_node.iter() for c in p}
|
|
unified_node.extend(board_etree.getroot())
|
|
it = self.datachecks.iter_errors(unified_node)
|
|
for error in it:
|
|
e = self.format_error(unified_node, parent_map, error)
|
|
e.log()
|
|
errors.append(e)
|
|
|
|
return errors
|
|
|
|
@staticmethod
|
|
def format_paths(unified_node, parent_map, report_on, variables):
|
|
elems = elementpath.select(unified_node, report_on, variables = variables)
|
|
paths = []
|
|
for elem in elems:
|
|
path = []
|
|
while elem is not None:
|
|
path_segment = elem.tag
|
|
parent = parent_map.get(elem, None)
|
|
if parent is not None:
|
|
children = parent.findall(elem.tag)
|
|
if len(children) > 1:
|
|
path_segment += f"[{children.index(elem) + 1}]"
|
|
path.insert(0, path_segment)
|
|
elem = parent
|
|
paths.append(f"/{'/'.join(path)}")
|
|
return paths
|
|
|
|
@staticmethod
|
|
def get_counter_example(error):
|
|
assertion = error.validator
|
|
if not isinstance(assertion, xmlschema.validators.assertions.XsdAssert):
|
|
return {}
|
|
|
|
elem = error.obj
|
|
context = XPathContext(elem, variables={'value': None})
|
|
context.counter_example = {}
|
|
result = assertion.token.evaluate(context)
|
|
|
|
if result == False:
|
|
return context.counter_example
|
|
else:
|
|
return {}
|
|
|
|
@staticmethod
|
|
def format_error(unified_node, parent_map, error):
|
|
def format_node(n):
|
|
if isinstance(n, str):
|
|
return n
|
|
elif isinstance(n, (int, float)):
|
|
return str(n)
|
|
elif isinstance(n, object) and n.__class__.__name__.endswith("Element"):
|
|
return n.text
|
|
else:
|
|
return str(n)
|
|
|
|
anno = error.validator.annotation
|
|
counter_example = ScenarioValidator.get_counter_example(error)
|
|
variables = {k.obj.source.strip("$"): v for k,v in counter_example.items()}
|
|
|
|
paths = ScenarioValidator.format_paths(unified_node, parent_map, anno.elem.get("{https://projectacrn.org}report-on"), variables)
|
|
description = anno.elem.find("{http://www.w3.org/2001/XMLSchema}documentation").text
|
|
severity = anno.elem.get("{https://projectacrn.org}severity")
|
|
|
|
expr_regex = re.compile("{[^{}]*}")
|
|
exprs = set(expr_regex.findall(description))
|
|
for expr in exprs:
|
|
result = elementpath.select(unified_node, expr.strip("{}"), variables = variables)
|
|
if isinstance(result, list):
|
|
if len(result) == 1:
|
|
value = format_node(result[0])
|
|
elif len(result) > 1:
|
|
s = ', '.join(map(format_node, result))
|
|
value = f"[{s}]"
|
|
else:
|
|
value = "{unknown}"
|
|
else:
|
|
value = str(result)
|
|
description = description.replace(expr, value)
|
|
|
|
return ValidationError(paths, description, severity)
|
|
|
|
class ValidatorConstructionStage(PipelineStage):
|
|
# The schema etree may still useful for schema-based transformation. Do not consume it.
|
|
uses = {"schema_etree"}
|
|
consumes = {"datachecks_etree"}
|
|
provides = {"validator"}
|
|
|
|
def run(self, obj):
|
|
validator = ScenarioValidator(obj.get("schema_etree"), obj.get("datachecks_etree"))
|
|
obj.set("validator", validator)
|
|
|
|
class ValidatorConstructionByFileStage(PipelineStage):
|
|
uses = {"schema_path", "datachecks_path"}
|
|
provides = {"validator"}
|
|
|
|
def run(self, obj):
|
|
validator = ScenarioValidator(obj.get("schema_path"), obj.get("datachecks_path"))
|
|
obj.set("validator", validator)
|
|
|
|
class SyntacticValidationStage(PipelineStage):
|
|
uses = {"validator", "scenario_etree"}
|
|
provides = {"syntactic_errors"}
|
|
|
|
def run(self, obj):
|
|
errors = obj.get("validator").check_syntax(obj.get("scenario_etree"))
|
|
obj.set("syntactic_errors", errors)
|
|
|
|
class SemanticValidationStage(PipelineStage):
|
|
uses = {"validator", "board_etree", "scenario_etree"}
|
|
provides = {"semantic_errors"}
|
|
|
|
def run(self, obj):
|
|
errors = obj.get("validator").check_semantics(obj.get("board_etree"), obj.get("scenario_etree"))
|
|
obj.set("semantic_errors", errors)
|
|
|
|
class ReportValidationResultStage(PipelineStage):
|
|
consumes = {"board_etree", "scenario_etree", "syntactic_errors", "semantic_errors"}
|
|
provides = {"nr_all_errors"}
|
|
|
|
def run(self, obj):
|
|
board_name = obj.get("board_etree").getroot().get("board")
|
|
scenario_name = obj.get("scenario_etree").getroot().get("scenario")
|
|
|
|
nr_critical = len(obj.get("syntactic_errors"))
|
|
nr_error = len(list(filter(lambda e: e["severity"] == "error", obj.get("semantic_errors"))))
|
|
nr_warning = len(list(filter(lambda e: e["severity"] == "warning", obj.get("semantic_errors"))))
|
|
|
|
if nr_critical > 0 or nr_error > 0:
|
|
logging.error(f"Board {board_name} and scenario {scenario_name} are inconsistent: {nr_critical} syntax errors, {nr_error} data errors, {nr_warning} warnings.")
|
|
elif nr_warning > 0:
|
|
logging.warning(f"Board {board_name} and scenario {scenario_name} are potentially inconsistent: {nr_warning} warnings.")
|
|
else:
|
|
logging.info(f"Board {board_name} and scenario {scenario_name} are valid and consistent.")
|
|
|
|
obj.set("nr_all_errors", nr_critical + nr_error + nr_warning)
|
|
|
|
def validate_one(validation_pipeline, pipeline_obj, board_xml, scenario_xml):
|
|
pipeline_obj.set("board_path", board_xml)
|
|
pipeline_obj.set("scenario_path", scenario_xml)
|
|
validation_pipeline.run(pipeline_obj)
|
|
return pipeline_obj.consume("nr_all_errors")
|
|
|
|
def validate_board(validation_pipeline, pipeline_obj, board_xml):
|
|
board_dir = os.path.dirname(board_xml)
|
|
nr_all_errors = 0
|
|
|
|
for f in os.listdir(board_dir):
|
|
if not f.endswith(".xml"):
|
|
continue
|
|
if f == os.path.basename(board_xml) or "launch" in f:
|
|
continue
|
|
nr_all_errors += validate_one(validation_pipeline, pipeline_obj, board_xml, os.path.join(board_dir, f))
|
|
|
|
return nr_all_errors
|
|
|
|
def validate_all(validation_pipeline, pipeline_obj, data_dir):
|
|
nr_all_errors = 0
|
|
|
|
for f in os.listdir(data_dir):
|
|
board_xml = os.path.join(data_dir, f, f"{f}.xml")
|
|
if os.path.isfile(board_xml):
|
|
nr_all_errors += validate_board(validation_pipeline, pipeline_obj, board_xml)
|
|
else:
|
|
logging.warning(f"Cannot find a board XML under {os.path.join(data_dir, f)}")
|
|
|
|
return nr_all_errors
|
|
|
|
def main(args):
|
|
from xml_loader import XMLLoadStage
|
|
from lxml_loader import LXMLLoadStage
|
|
|
|
validator_construction_pipeline = PipelineEngine(["schema_path", "datachecks_path"])
|
|
validator_construction_pipeline.add_stages([
|
|
LXMLLoadStage("schema"),
|
|
LXMLLoadStage("datachecks"),
|
|
SlicingSchemaByVMTypeStage(),
|
|
ValidatorConstructionStage(),
|
|
])
|
|
|
|
validation_pipeline = PipelineEngine(["board_path", "scenario_path", "schema_etree", "validator"])
|
|
validation_pipeline.add_stages([
|
|
XMLLoadStage("board"),
|
|
XMLLoadStage("scenario"),
|
|
DefaultValuePopulatingStage(),
|
|
SyntacticValidationStage(),
|
|
SemanticValidationStage(),
|
|
ReportValidationResultStage(),
|
|
])
|
|
|
|
obj = PipelineObject(schema_path = args.schema, datachecks_path = args.datachecks)
|
|
validator_construction_pipeline.run(obj)
|
|
if args.board and args.scenario:
|
|
nr_all_errors = validate_one(validation_pipeline, obj, args.board, args.scenario)
|
|
elif args.board:
|
|
nr_all_errors = validate_board(validation_pipeline, obj, args.board)
|
|
else:
|
|
nr_all_errors = validate_all(validation_pipeline, obj, os.path.join(config_tools_dir, "data"))
|
|
|
|
sys.exit(1 if nr_all_errors > 0 else 0)
|
|
|
|
if __name__ == "__main__":
|
|
config_tools_dir = os.path.join(os.path.dirname(__file__), "..")
|
|
schema_dir = os.path.join(config_tools_dir, "schema")
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("board", nargs="?", type=existing_file_type(parser), help="the board XML file to be validated")
|
|
parser.add_argument("scenario", nargs="?", type=existing_file_type(parser), help="the scenario XML file to be validated")
|
|
parser.add_argument("--loglevel", default="warning", type=log_level_type(parser), help="choose log level, e.g. debug, info, warning or error")
|
|
parser.add_argument("--schema", default=os.path.join(schema_dir, "config.xsd"), help="the XML schema that defines the syntax of scenario XMLs")
|
|
parser.add_argument("--datachecks", default=os.path.join(schema_dir, "datachecks.xsd"), help="the XML schema that defines the semantic rules against board and scenario data")
|
|
args = parser.parse_args()
|
|
|
|
logging.basicConfig(level=args.loglevel.upper())
|
|
main(args)
|