# SPDX-License-Identifier: Apache-2.0
import re
import os
import subprocess
from collections import OrderedDict
import xml.etree.ElementTree as ET
import logging

logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)

# Matches ztest result lines such as "PASS - test_foo in 0.123 seconds".
result_re = re.compile(r".*(PASS|FAIL|SKIP) - (test_)?(.*) in (\d*[.,]?\d*) seconds")


class Harness:
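    '''Base class for all output harnesses.

    A harness is fed device/console output one line at a time and derives
    the overall test state plus per-testcase results from it.
    '''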

    GCOV_START = "GCOV_COVERAGE_DUMP_START"
    GCOV_END = "GCOV_COVERAGE_DUMP_END"
    FAULT = "ZEPHYR FATAL ERROR"
    RUN_PASSED = "PROJECT EXECUTION SUCCESSFUL"
    RUN_FAILED = "PROJECT EXECUTION FAILED"
    run_id_pattern = r"RunID: (?P<run_id>.*)"

    ztest_to_status = {
        'PASS': 'passed',
        'SKIP': 'skipped',
        'BLOCK': 'blocked',
        'FAIL': 'failed'
    }

    def __init__(self):
        self.state = None
        self.type = None
        self.regex = []
        self.matches = OrderedDict()
        self.ordered = True
        self.repeat = 1
        self.testcases = []
        self.id = None
        self.fail_on_fault = True
        self.fault = False
        self.capture_coverage = False
        self.next_pattern = 0
        self.record = None
        self.recording = []
        self.fieldnames = []
        self.ztest = False
        self.is_pytest = False
        self.detected_suite_names = []
        self.run_id = None
        self.matched_run_id = False
        self.run_id_exists = False
        self.instance = None
        self.testcase_output = ""
        self._match = False

    def configure(self, instance):
        self.instance = instance
        config = instance.testsuite.harness_config
        self.id = instance.testsuite.id
        self.run_id = instance.run_id
        if "ignore_faults" in instance.testsuite.tags:
            self.fail_on_fault = False

        if config:
            self.type = config.get('type', None)
            self.regex = config.get('regex', [])
            self.repeat = config.get('repeat', 1)
            self.ordered = config.get('ordered', True)
            self.record = config.get('record', {})
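
    # For reference, a harness is typically configured from the suite's
    # testcase.yaml. A minimal sketch of a console harness entry (values
    # are illustrative, not taken from this file):
    #
    #   harness: console
    #   harness_config:
    #     type: multi_line
    #     ordered: true
    #     regex:
    #       - "Hello World!"
    #       - "Done"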

    def process_test(self, line):
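        '''Shared per-line bookkeeping: RunID matching, overall pass/fail
        banners, fatal-error detection and coverage-dump markers.
        '''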
        runid_match = re.search(self.run_id_pattern, line)
        if runid_match:
            run_id = runid_match.group("run_id")
            self.run_id_exists = True
            if run_id == str(self.run_id):
                self.matched_run_id = True

        if self.RUN_PASSED in line:
            if self.fault:
                self.state = "failed"
            else:
                self.state = "passed"

        if self.RUN_FAILED in line:
            self.state = "failed"

        if self.fail_on_fault:
            # Note: an exact line match is required here, unlike the
            # substring match in Console.handle().
            if self.FAULT == line:
                self.fault = True

        if self.GCOV_START in line:
            self.capture_coverage = True
        elif self.GCOV_END in line:
            self.capture_coverage = False


class Console(Harness):
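    '''Harness that decides pass/fail by matching console output against
    one regex ("one_line") or a set of regexes ("multi_line").
    '''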

    def configure(self, instance):
        super().configure(instance)
        if self.type == "one_line":
            self.pattern = re.compile(self.regex[0])
        elif self.type == "multi_line":
            self.patterns = []
            for r in self.regex:
                self.patterns.append(re.compile(r))

    def handle(self, line):
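        '''Track regex matches for this line and update the single
        console test case accordingly.
        '''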
        if self.type == "one_line":
            if self.pattern.search(line):
                self.state = "passed"
        elif self.type == "multi_line" and self.ordered:
            if (self.next_pattern < len(self.patterns) and
                    self.patterns[self.next_pattern].search(line)):
                self.next_pattern += 1
                if self.next_pattern >= len(self.patterns):
                    self.state = "passed"
        elif self.type == "multi_line" and not self.ordered:
            for i, pattern in enumerate(self.patterns):
                r = self.regex[i]
                if pattern.search(line) and r not in self.matches:
                    self.matches[r] = line
                    if len(self.matches) == len(self.regex):
                        self.state = "passed"
        else:
            logger.error("Unknown harness_config type")

        if self.fail_on_fault:
            if self.FAULT in line:
                self.fault = True

        if self.GCOV_START in line:
            self.capture_coverage = True
        elif self.GCOV_END in line:
            self.capture_coverage = False

        if self.record:
            # Optionally extract named regex groups from this line into a
            # CSV-style recording for later reporting.
            pattern = re.compile(self.record.get("regex", ""))
            match = pattern.search(line)
            if match:
                csv = []
                if not self.fieldnames:
                    for k, v in match.groupdict().items():
                        self.fieldnames.append(k)

                for k, v in match.groupdict().items():
                    csv.append(v.strip())
                self.recording.append(csv)

        self.process_test(line)

        tc = self.instance.get_case_or_create(self.id)
        if self.state == "passed":
            tc.status = "passed"
        else:
            tc.status = "failed"


class Pytest(Harness):
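    '''Harness that delegates the actual testing to an external pytest
    suite and reads the verdict back from its JUnit XML report.
    '''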

    def configure(self, instance):
        super().configure(instance)
        self.running_dir = instance.build_dir
        self.source_dir = instance.testsuite.source_dir
        self.pytest_root = 'pytest'
        self.pytest_args = []
        self.is_pytest = True
        config = instance.testsuite.harness_config

        if config:
            self.pytest_root = config.get('pytest_root', 'pytest')
            self.pytest_args = config.get('pytest_args', [])

    def handle(self, line):
        '''Test cases that use pytest care about the results reported by
        the pytest tool itself, which is invoked in pytest_run(). This
        handler therefore only marks the case as passed to avoid a
        timeout; nothing is written into handler.log here.
        '''
        self.state = "passed"
        tc = self.instance.get_case_or_create(self.id)
        tc.status = "passed"

    def pytest_run(self, log_file):
        '''Run the external pytest suite and record its outcome.

        To keep pytest artifacts in self.running_dir, that directory is
        passed via "--cmdopt"; on the pytest side, a matching command-line
        option must be registered and exposed through a fixture. If the
        pytest harness reports a failure, twister directs the user to
        handler.log, so this method writes the test result there.
        '''
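        # A minimal sketch of the pytest-side conftest.py that consumes
        # "--cmdopt" (assumed here for illustration; the real option and
        # fixture live in the test suite, not in this file):
        #
        #   import pytest
        #
        #   def pytest_addoption(parser):
        #       parser.addoption('--cmdopt')
        #
        #   @pytest.fixture()
        #   def cmdopt(request):
        #       return request.config.getoption('--cmdopt')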
        cmd = [
            'pytest',
            '-s',
            os.path.join(self.source_dir, self.pytest_root),
            '--cmdopt',
            self.running_dir,
            '--junit-xml',
            os.path.join(self.running_dir, 'report.xml'),
            '-q'
        ]

        for arg in self.pytest_args:
            cmd.append(arg)

        log = open(log_file, "a")
        # Initialize as bytes so the decode() calls below cannot fail if
        # communicate() never assigns them.
        outs = b''
        errs = b''

        with subprocess.Popen(cmd,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE) as proc:
            try:
                outs, errs = proc.communicate()
                tree = ET.parse(os.path.join(self.running_dir, "report.xml"))
                root = tree.getroot()
                for child in root:
                    if child.tag == 'testsuite':
                        if child.attrib['failures'] != '0':
                            self.state = "failed"
                        elif child.attrib['skipped'] != '0':
                            self.state = "skipped"
                        elif child.attrib['errors'] != '0':
                            self.state = "errors"
                        else:
                            self.state = "passed"
            except subprocess.TimeoutExpired:
                proc.kill()
                self.state = "failed"
            except ET.ParseError:
                self.state = "failed"
            except IOError:
                log.write("Can't access report.xml\n")
                self.state = "failed"

        tc = self.instance.get_case_or_create(self.id)
        if self.state == "passed":
            tc.status = "passed"
            log.write("Pytest cases passed\n")
        elif self.state == "skipped":
            tc.status = "skipped"
            log.write("Pytest cases skipped\n")
            log.write("Please refer to report.xml for details\n")
        else:
            tc.status = "failed"
            log.write("Pytest cases failed\n")

        log.write("\nOutput from pytest:\n")
        log.write(outs.decode('UTF-8'))
        log.write(errs.decode('UTF-8'))
        log.close()


class Test(Harness):
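    '''Harness for ztest-based suites: parses per-testcase results from
    the console output, falling back to the overall execution banner for
    non-ztest binaries.
    '''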
    RUN_PASSED = "PROJECT EXECUTION SUCCESSFUL"
    RUN_FAILED = "PROJECT EXECUTION FAILED"
    test_suite_start_pattern = r"Running TESTSUITE (?P<suite_name>.*)"
    ZTEST_START_PATTERN = r"START - (test_)?(.*)"

    def handle(self, line):
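        '''Accumulate per-testcase output between the START marker and the
        matching result line, then record status, duration and (on
        failure) the captured output on the test case.
        '''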
        test_suite_match = re.search(self.test_suite_start_pattern, line)
        if test_suite_match:
            suite_name = test_suite_match.group("suite_name")
            self.detected_suite_names.append(suite_name)

        testcase_match = re.search(self.ZTEST_START_PATTERN, line)
        if testcase_match or self._match:
            self.testcase_output += line + "\n"
            self._match = True

        match = result_re.match(line)

        if match and match.group(2):
            name = "{}.{}".format(self.id, match.group(3))
            tc = self.instance.get_case_or_create(name)

            matched_status = match.group(1)
            tc.status = self.ztest_to_status[matched_status]
            if tc.status == "skipped":
                tc.reason = "ztest skip"
            # The regex allows a comma as the decimal separator, which
            # float() would reject, so normalize it first.
            tc.duration = float(match.group(4).replace(',', '.'))
            if tc.status == "failed":
                tc.output = self.testcase_output
            self.testcase_output = ""
            self._match = False
            self.ztest = True

        self.process_test(line)

        if not self.ztest and self.state:
            tc = self.instance.get_case_or_create(self.id)
            if self.state == "passed":
                tc.status = "passed"
            else:
                tc.status = "failed"


class Ztest(Test):
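    '''Alias of Test, kept so the "ztest" harness name resolves to the
    same handling.
    '''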
    pass