298 lines
11 KiB
Python
298 lines
11 KiB
Python
|
#!/usr/bin/env python3
|
||
|
#
|
||
|
# Copyright (c) 2024 Intel Corporation
|
||
|
#
|
||
|
# SPDX-License-Identifier: Apache-2.0
|
||
|
|
||
|
"""
|
||
|
This script converts memory footprint data prepared by `./footprint/scripts/track.py`
|
||
|
into a JSON files compatible with Twister report schema making them ready for upload
|
||
|
to the same ElasticSearch data storage together with other Twister reports
|
||
|
for analysis, visualization, etc.
|
||
|
|
||
|
The memory footprint input data files (rom.json, ram.json) are expected in directories
|
||
|
sturctured as 'ZEPHYR_VERSION/APPLICATION/FEATURE/BOARD' under the input path(s).
|
||
|
The BOARD name itself can be in HWMv2 format as 'BOARD/SOC' or 'BOARD/SOC/VARIANT'
|
||
|
with the corresponding sub-directories.
|
||
|
|
||
|
For example, an input path `./**/*v3.6.0-rc3-*/footprints/**/frdm_k64f/` will be
|
||
|
expanded by bash to all sub-directories with the 'footprints' data `v3.6.0-rc3`
|
||
|
release commits collected for `frdm_k64f` board.
|
||
|
Note: for the above example to work the bash recursive globbing should be active:
|
||
|
`shopt -s globstar`.
|
||
|
|
||
|
The output `twister_footprint.json` files will be placed into the same directories
|
||
|
as the corresponding input files.
|
||
|
|
||
|
In Twister report a test instance has either long or short name, each needs test
|
||
|
suite name from the test configuration yaml file.
|
||
|
This scripts has `--test-name` parameter to customize how to compose test names
|
||
|
from the plan.txt columns including an additional (last) one whth explicit
|
||
|
test suite name ('dot separated' format).
|
||
|
"""
|
||
|
|
||
|
from __future__ import annotations
|
||
|
|
||
|
from datetime import datetime, timezone
|
||
|
import argparse
|
||
|
import os
|
||
|
import sys
|
||
|
import re
|
||
|
import csv
|
||
|
import logging
|
||
|
import json
|
||
|
from git import Repo
|
||
|
from git.exc import BadName
|
||
|
|
||
|
|
||
|
VERSION_COMMIT_RE = re.compile(r".*-g([a-f0-9]{12})$")
|
||
|
PLAN_HEADERS = ['name', 'feature', 'board', 'application', 'options', 'suite_name']
|
||
|
TESTSUITE_FILENAME = { 'tests': 'testcase.yaml', 'samples': 'sample.yaml' }
|
||
|
FOOTPRINT_FILES = { 'ROM': 'rom.json', 'RAM': 'ram.json' }
|
||
|
RESULT_FILENAME = 'twister_footprint.json'
|
||
|
HWMv2_LEVELS = 3
|
||
|
|
||
|
logger = None
|
||
|
LOG_LEVELS = {
|
||
|
'DEBUG': (logging.DEBUG, 3),
|
||
|
'INFO': (logging.INFO, 2),
|
||
|
'WARNING': (logging.WARNING, 1),
|
||
|
'ERROR': (logging.ERROR, 0)
|
||
|
}
|
||
|
|
||
|
|
||
|
def init_logs(logger_name=''):
|
||
|
global logger
|
||
|
|
||
|
log_level = os.environ.get('LOG_LEVEL', 'ERROR')
|
||
|
log_level = LOG_LEVELS[log_level][0] if log_level in LOG_LEVELS else logging.ERROR
|
||
|
|
||
|
console = logging.StreamHandler(sys.stdout)
|
||
|
console.setFormatter(logging.Formatter('%(asctime)s - %(levelname)-8s - %(message)s'))
|
||
|
|
||
|
logger = logging.getLogger(logger_name)
|
||
|
logger.setLevel(log_level)
|
||
|
logger.addHandler(console)
|
||
|
|
||
|
def set_verbose(verbose: int):
|
||
|
levels = { lvl[1]: lvl[0] for lvl in LOG_LEVELS.values() }
|
||
|
if verbose > len(levels):
|
||
|
verbose = len(levels)
|
||
|
if verbose <= 0:
|
||
|
verbose = 0
|
||
|
logger.setLevel(levels[verbose])
|
||
|
|
||
|
|
||
|
def parse_args():
|
||
|
parser = argparse.ArgumentParser(allow_abbrev=False,
|
||
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
|
description=__doc__)
|
||
|
|
||
|
parser.add_argument('input_paths', metavar='INPUT_PATHS', nargs='+',
|
||
|
help="Directories with the memory footprint data to convert. "
|
||
|
"Each directory must have 'ZEPHYR_VERSION/APPLICATION/FEATURE/BOARD' path structure.")
|
||
|
|
||
|
parser.add_argument('-p', '--plan', metavar='PLAN_FILE_CSV', required=True,
|
||
|
help="An execution plan (CSV file) with details of what footprint applications "
|
||
|
"and platforms were chosen to generate the input data. "
|
||
|
"It is also applied to filter input directories and check their names.")
|
||
|
|
||
|
parser.add_argument('-o', '--output-fname', metavar='OUTPUT_FNAME', required=False,
|
||
|
default=RESULT_FILENAME,
|
||
|
help="Destination JSON file name to create at each of INPUT_PATHS. "
|
||
|
"Default: '%(default)s'")
|
||
|
|
||
|
parser.add_argument('-z', '--zephyr_base', metavar='ZEPHYR_BASE', required=False,
|
||
|
default = os.environ.get('ZEPHYR_BASE'),
|
||
|
help="Zephyr code base path to use instead of the current ZEPHYR_BASE environment variable. "
|
||
|
"The script needs Zephyr repository there to read SHA and commit time of builds. "
|
||
|
"Current default: '%(default)s'")
|
||
|
|
||
|
parser.add_argument("--test-name",
|
||
|
choices=['application/suite_name', 'suite_name', 'application', 'name.feature'],
|
||
|
default='name.feature',
|
||
|
help="How to compose Twister test instance names using plan.txt columns. "
|
||
|
"Default: '%(default)s'" )
|
||
|
|
||
|
parser.add_argument("--no-testsuite-check",
|
||
|
dest='testsuite_check', action="store_false",
|
||
|
help="Don't check for applications' testsuite configs in ZEPHYR_BASE.")
|
||
|
|
||
|
parser.add_argument('-v', '--verbose', required=False, action='count', default=0,
|
||
|
help="Increase the logging level for each occurrence. Default level: ERROR")
|
||
|
|
||
|
return parser.parse_args()
|
||
|
|
||
|
|
||
|
def read_plan(fname: str) -> list[dict]:
|
||
|
plan = []
|
||
|
with open(fname) as plan_file:
|
||
|
plan_rows = csv.reader(plan_file)
|
||
|
plan_vals = [ dict(zip(PLAN_HEADERS, row)) for row in plan_rows ]
|
||
|
plan = { f"{p['name']}/{p['feature']}/{p['board']}" : p for p in plan_vals }
|
||
|
return plan
|
||
|
|
||
|
|
||
|
def get_id_from_path(plan, in_path, max_levels=HWMv2_LEVELS):
|
||
|
data_id = {}
|
||
|
(in_path, data_id['board']) = os.path.split(in_path)
|
||
|
if not data_id['board']:
|
||
|
# trailing '/'
|
||
|
(in_path, data_id['board']) = os.path.split(in_path)
|
||
|
|
||
|
for _ in range(max_levels):
|
||
|
(in_path, data_id['feature']) = os.path.split(in_path)
|
||
|
(c_head, data_id['app']) = os.path.split(in_path)
|
||
|
(c_head, data_id['version']) = os.path.split(c_head)
|
||
|
if not all(data_id.values()):
|
||
|
# incorrect plan id
|
||
|
return None
|
||
|
if f"{data_id['app']}/{data_id['feature']}/{data_id['board']}" in plan:
|
||
|
return data_id
|
||
|
else:
|
||
|
# try with HWMv2 board name one more level deep
|
||
|
data_id['board'] = f"{data_id['feature']}/{data_id['board']}"
|
||
|
|
||
|
# not found
|
||
|
return {}
|
||
|
|
||
|
|
||
|
def main():
|
||
|
errors = 0
|
||
|
converted = 0
|
||
|
skipped = 0
|
||
|
filtered = 0
|
||
|
|
||
|
run_date = datetime.now(timezone.utc).isoformat(timespec='seconds')
|
||
|
|
||
|
init_logs()
|
||
|
|
||
|
args = parse_args()
|
||
|
|
||
|
set_verbose(args.verbose)
|
||
|
|
||
|
if not args.zephyr_base:
|
||
|
logging.error("ZEPHYR_BASE is not defined.")
|
||
|
sys.exit(1)
|
||
|
|
||
|
zephyr_base = os.path.abspath(args.zephyr_base)
|
||
|
zephyr_base_repo = Repo(zephyr_base)
|
||
|
|
||
|
logging.info(f"scanning {len(args.input_paths)} directories ...")
|
||
|
|
||
|
logging.info(f"use plan '{args.plan}'")
|
||
|
plan = read_plan(args.plan)
|
||
|
|
||
|
test_name_sep = '/' if '/' in args.test_name else '.'
|
||
|
test_name_parts = args.test_name.split(test_name_sep)
|
||
|
|
||
|
for report_path in args.input_paths:
|
||
|
logging.info(f"convert {report_path}")
|
||
|
# print(p)
|
||
|
p_head = os.path.normcase(report_path)
|
||
|
p_head = os.path.normpath(p_head)
|
||
|
if not os.path.isdir(p_head):
|
||
|
logging.error(f"not a directory '{p_head}'")
|
||
|
errors += 1
|
||
|
continue
|
||
|
|
||
|
data_id = get_id_from_path(plan, p_head)
|
||
|
if data_id is None:
|
||
|
logging.warning(f"skipped '{report_path}' - not a correct report directory")
|
||
|
skipped += 1
|
||
|
continue
|
||
|
elif not data_id:
|
||
|
logging.info(f"filtered '{report_path}' - not in the plan")
|
||
|
filtered += 1
|
||
|
continue
|
||
|
|
||
|
r_plan = f"{data_id['app']}/{data_id['feature']}/{data_id['board']}"
|
||
|
|
||
|
if 'suite_name' in test_name_parts and 'suite_name' not in plan[r_plan]:
|
||
|
logging.info(f"filtered '{report_path}' - no Twister suite name in the plan.")
|
||
|
filtered += 1
|
||
|
continue
|
||
|
|
||
|
suite_name = test_name_sep.join([plan[r_plan][n] if n in plan[r_plan] else '' for n in test_name_parts])
|
||
|
|
||
|
# Just some sanity checks of the 'application' in the current ZEPHYR_BASE
|
||
|
if args.testsuite_check:
|
||
|
suite_type = plan[r_plan]['application'].split('/')
|
||
|
if len(suite_type) and suite_type[0] in TESTSUITE_FILENAME:
|
||
|
suite_conf_name = TESTSUITE_FILENAME[suite_type[0]]
|
||
|
else:
|
||
|
logging.error(f"unknown app type to get configuration in '{report_path}'")
|
||
|
errors += 1
|
||
|
continue
|
||
|
|
||
|
suite_conf_fname = os.path.join(zephyr_base, plan[r_plan]['application'], suite_conf_name)
|
||
|
if not os.path.isfile(suite_conf_fname):
|
||
|
logging.error(f"test configuration not found for '{report_path}' at '{suite_conf_fname}'")
|
||
|
errors += 1
|
||
|
continue
|
||
|
|
||
|
|
||
|
# Check SHA presence in the current ZEPHYR_BASE
|
||
|
sha_match = VERSION_COMMIT_RE.search(data_id['version'])
|
||
|
version_sha = sha_match.group(1) if sha_match else data_id['version']
|
||
|
try:
|
||
|
git_commit = zephyr_base_repo.commit(version_sha)
|
||
|
except BadName:
|
||
|
logging.error(f"SHA:'{version_sha}' is not found in ZEPHYR_BASE for '{report_path}'")
|
||
|
errors += 1
|
||
|
continue
|
||
|
|
||
|
|
||
|
# Compose twister_footprint.json record - each application (test suite) will have its own
|
||
|
# simplified header with options, SHA, etc.
|
||
|
|
||
|
res = {}
|
||
|
|
||
|
res['environment'] = {
|
||
|
'zephyr_version': data_id['version'],
|
||
|
'commit_date':
|
||
|
git_commit.committed_datetime.astimezone(timezone.utc).isoformat(timespec='seconds'),
|
||
|
'run_date': run_date,
|
||
|
'options': {
|
||
|
'testsuite_root': [ plan[r_plan]['application'] ],
|
||
|
'build_only': True,
|
||
|
'create_rom_ram_report': True,
|
||
|
'footprint_report': 'all',
|
||
|
'platform': [ plan[r_plan]['board'] ]
|
||
|
}
|
||
|
}
|
||
|
|
||
|
test_suite = {
|
||
|
'name': suite_name,
|
||
|
'arch': None,
|
||
|
'platform': plan[r_plan]['board'],
|
||
|
'status': 'passed',
|
||
|
'footprint': {}
|
||
|
}
|
||
|
|
||
|
for k,v in FOOTPRINT_FILES.items():
|
||
|
footprint_fname = os.path.join(report_path, v)
|
||
|
try:
|
||
|
with open(footprint_fname, "rt") as footprint_json:
|
||
|
logger.debug(f"reading {footprint_fname}")
|
||
|
test_suite['footprint'][k] = json.load(footprint_json)
|
||
|
except FileNotFoundError:
|
||
|
logger.warning(f"{report_path} missing {v}")
|
||
|
|
||
|
res['testsuites'] = [test_suite]
|
||
|
|
||
|
report_fname = os.path.join(report_path, args.output_fname)
|
||
|
with open(report_fname, "wt") as json_file:
|
||
|
logger.debug(f"writing {report_fname}")
|
||
|
json.dump(res, json_file, indent=4, separators=(',',':'))
|
||
|
|
||
|
converted += 1
|
||
|
|
||
|
logging.info(f'found={len(args.input_paths)}, converted={converted}, '
|
||
|
f'skipped={skipped}, filtered={filtered}, errors={errors}')
|
||
|
sys.exit(errors != 0)
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
main()
|