config_tools: abstract schema walk as a separate class

Today the script default_populator.py fills in default values by visiting
an XML schema and an XML tree simultaneously and add a node to the latter
whenever one satisfying the schema does not exist. This visiting logic is
not only useful for filling in default values, but also for upgrading
XMLs to new schemas.

This patch abstracts the flow of the visiting above as a separate class,
just like tree visitors or transformers. The current default value
populator is then refactored to extend that class by inheritance.

Tracked-On: #6690
Signed-off-by: Junjie Mao <junjie.mao@intel.com>
This commit is contained in:
Junjie Mao 2022-02-07 22:38:25 +08:00 committed by acrnsi-robot
parent 74071d1bd4
commit 9f13bbc349
3 changed files with 101 additions and 76 deletions

View File

@ -1,88 +1,37 @@
#!/usr/bin/env python3
#
# Copyright (C) 2021 Intel Corporation. All rights reserved.
# Copyright (C) 2022 Intel Corporation.
#
# SPDX-License-Identifier: BSD-3-Clause
#
import argparse
import lxml.etree as etree
from scenario_transformer import ScenarioTransformer
xpath_ns = {
"xs": "http://www.w3.org/2001/XMLSchema",
}
def get_node(element, xpath):
return next(iter(element.xpath(xpath, namespaces=xpath_ns)), None)
def populate_sequence(xsd_etree, xsd_sequence_node, xml_node):
children = list(xml_node)
for element_node in xsd_sequence_node.findall("xs:element", namespaces=xpath_ns):
element_name = element_node.get("name")
element_min_occurs = element_node.get("minOccurs")
if len(children) == 0:
if element_min_occurs != "0":
xml_child_node = etree.Element(element_name)
xml_node.append(xml_child_node)
populate(xsd_etree, element_node, xml_child_node, True)
elif children[0].tag != element_name:
if element_min_occurs != "0":
xml_child_node = etree.Element(element_name)
children[0].addprevious(xml_child_node)
populate(xsd_etree, element_node, xml_child_node, True)
else:
while len(children) > 0 and children[0].tag == element_name:
child_node = children.pop(0)
populate(xsd_etree, element_node, child_node, False)
def populate_all(xsd_etree, xsd_all_node, xml_node):
for element_node in xsd_all_node.findall("xs:element", namespaces=xpath_ns):
element_name = element_node.get("name")
if not element_name:
continue
xml_child_nodes = xml_node.findall(element_name)
if len(xml_child_nodes) == 0:
element_min_occurs = element_node.get("minOccurs")
if element_min_occurs == "0":
continue
xml_child_node = etree.Element(element_name)
xml_node.append(xml_child_node)
populate(xsd_etree, element_node, xml_child_node, True)
else:
for xml_child_node in xml_child_nodes:
populate(xsd_etree, element_node, xml_child_node, False)
def populate(xsd_etree, xsd_element_node, xml_node, is_new_node):
complex_type_node = xsd_element_node.find("xs:complexType", namespaces=xpath_ns)
if not complex_type_node:
type_name = xsd_element_node.get("type")
if type_name:
complex_type_node = get_node(xsd_etree, f"//xs:complexType[@name='{type_name}']")
if complex_type_node is not None:
sequence_node = complex_type_node.find("xs:sequence", namespaces=xpath_ns)
if sequence_node is not None:
populate_sequence(xsd_etree, sequence_node, xml_node)
all_node = complex_type_node.find("xs:all", namespaces=xpath_ns)
if all_node is not None:
populate_all(xsd_etree, all_node, xml_node)
elif is_new_node:
class DefaultValuePopulator(ScenarioTransformer):
def add_missing_nodes(self, xsd_element_node, xml_parent_node, new_node_index):
element_name = xsd_element_node.get("name")
default_value = xsd_element_node.get("default")
xml_node.text = default_value
new_node = etree.Element(element_name)
if default_value is not None:
new_node.text = default_value
if new_node_index is not None:
xml_parent_node.insert(new_node_index, new_node)
else:
xml_parent_node.append(new_node)
return [new_node]
def main(xsd_file, xml_file, out_file):
xml_etree = etree.parse(xml_file, etree.XMLParser(remove_blank_text=True))
xsd_etree = etree.parse(xsd_file)
xsd_etree.xinclude()
populator = DefaultValuePopulator(xsd_etree)
xml_root = xml_etree.getroot()
xsd_root = get_node(xsd_etree, f"/xs:schema/xs:element[@name='{xml_root.tag}']")
if xsd_root is not None:
populate(xsd_etree, xsd_root, xml_root, False)
xml_etree = etree.parse(xml_file, etree.XMLParser(remove_blank_text=True))
populator.transform(xml_etree)
xml_etree.write(out_file, pretty_print=True)
@ -90,7 +39,7 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Populate a given scenario XML with default values of nonexistent nodes")
parser.add_argument("xsd", help="Path to the schema of scenario XMLs")
parser.add_argument("xml", help="Path to the scenario XML file from users")
parser.add_argument("out", default="out.xml", help="Path where the output is placed")
parser.add_argument("out", nargs="?", default="out.xml", help="Path where the output is placed")
args = parser.parse_args()
main(args.xsd, args.xml, args.out)

View File

@ -0,0 +1,78 @@
#!/usr/bin/env python3
#
# Copyright (C) 2022 Intel Corporation.
#
# SPDX-License-Identifier: BSD-3-Clause
#
class ScenarioTransformer:
xpath_ns = {
"xs": "http://www.w3.org/2001/XMLSchema",
}
@classmethod
def get_node(cls, element, xpath):
return next(iter(element.xpath(xpath, namespaces=cls.xpath_ns)), None)
def __init__(self, xsd_etree, visit_optional_node=False):
self.xsd_etree = xsd_etree
self._visit_optional_node = visit_optional_node
def transform_node(self, xsd_element_node, xml_node):
complex_type_node = xsd_element_node.find("xs:complexType", namespaces=self.xpath_ns)
if not complex_type_node:
type_name = xsd_element_node.get("type")
if type_name:
complex_type_node = self.get_node(self.xsd_etree, f"//xs:complexType[@name='{type_name}']")
if complex_type_node is not None:
xsd_sequence_node = complex_type_node.find("xs:sequence", namespaces=self.xpath_ns)
if xsd_sequence_node is not None:
self.transform_sequence(xsd_sequence_node, xml_node)
xsd_all_node = complex_type_node.find("xs:all", namespaces=self.xpath_ns)
if xsd_all_node is not None:
self.transform_all(xsd_all_node, xml_node)
def transform_sequence(self, xsd_sequence_node, xml_node):
children = list(enumerate(list(xml_node)))
for xsd_element_node in xsd_sequence_node.findall("xs:element", namespaces=self.xpath_ns):
element_name = xsd_element_node.get("name")
element_min_occurs = xsd_element_node.get("minOccurs")
if len(children) == 0 or children[0][1].tag != element_name:
if self._visit_optional_node or element_min_occurs != "0":
index = children[0][0] if len(children) > 0 else None
self.add_and_transform_missing_node(xsd_element_node, xml_node, new_node_index=index)
else:
while len(children) > 0 and children[0][1].tag == element_name:
self.transform_node(xsd_element_node, children.pop(0)[0])
def transform_all(self, xsd_all_node, xml_node):
for xsd_element_node in xsd_all_node.findall("xs:element", namespaces=self.xpath_ns):
element_name = xsd_element_node.get("name")
if not element_name:
continue
xml_children = xml_node.findall(element_name)
if len(xml_children) == 0:
element_min_occurs = xsd_element_node.get("minOccurs")
if self._visit_optional_node or element_min_occurs != "0":
self.add_and_transform_missing_node(xsd_element_node, xml_node)
else:
for xml_child_node in xml_children:
self.transform_node(xsd_element_node, xml_child_node)
def add_and_transform_missing_node(self, xsd_element_node, xml_parent_node, new_node_index=None):
for new_node in self.add_missing_nodes(xsd_element_node, xml_parent_node, new_node_index):
self.transform_node(xsd_element_node, new_node)
def add_missing_nodes(self, xsd_element_node, xml_parent_node, new_node_index):
return []
def transform(self, xml_etree):
xml_root_node = xml_etree.getroot()
xsd_root_node = self.get_node(self.xsd_etree, f"/xs:schema/xs:element[@name='{xml_root_node.tag}']")
if xsd_root_node is not None:
self.transform_node(xsd_root_node, xml_root_node)

View File

@ -18,7 +18,7 @@ except ImportError:
"To enable the validation, install the python package by executing: pip3 install xmlschema.")
sys.exit(0)
from default_populator import get_node, populate
from default_populator import DefaultValuePopulator
def existing_file_type(parser):
def aux(arg):
@ -40,12 +40,11 @@ def log_level_type(parser):
return aux
def load_schema(xsd_xml, datachecks_xml):
global schema, schema_etree, schema_root, datachecks
global schema, schema_etree, datachecks
schema_etree = etree.parse(xsd_xml)
schema_etree.xinclude()
schema = xmlschema.XMLSchema11(etree.tostring(schema_etree, encoding="unicode"))
schema_root = get_node(schema_etree, f"/xs:schema/xs:element")
datachecks_etree = etree.parse(datachecks_xml)
datachecks_etree.xinclude()
@ -55,7 +54,6 @@ config_tools_dir = os.path.join(os.path.dirname(__file__), "..")
schema_dir = os.path.join(config_tools_dir, "schema")
schema = None
schema_etree = None
schema_root = None
datachecks = None
load_schema(os.path.join(schema_dir, "config.xsd"), os.path.join(schema_dir, "datachecks.xsd"))
@ -67,7 +65,7 @@ def validate_one(board_xml, scenario_xml):
scenario_name = os.path.basename(scenario_xml)
scenario_etree = etree.parse(scenario_xml, etree.XMLParser(remove_blank_text=True))
populate(schema_etree, schema_root, scenario_etree.getroot(), False)
DefaultValuePopulator(schema_etree).transform(scenario_etree)
it = schema.iter_errors(scenario_etree)
for error in it: