#!/usr/bin/env python3 # # Copyright (c) 2024 Intel Corporation # # SPDX-License-Identifier: Apache-2.0 """ This script converts memory footprint data prepared by `./footprint/scripts/track.py` into a JSON files compatible with Twister report schema making them ready for upload to the same ElasticSearch data storage together with other Twister reports for analysis, visualization, etc. The memory footprint input data files (rom.json, ram.json) are expected in directories sturctured as 'ZEPHYR_VERSION/APPLICATION/FEATURE/BOARD' under the input path(s). The BOARD name itself can be in HWMv2 format as 'BOARD/SOC' or 'BOARD/SOC/VARIANT' with the corresponding sub-directories. For example, an input path `./**/*v3.6.0-rc3-*/footprints/**/frdm_k64f/` will be expanded by bash to all sub-directories with the 'footprints' data `v3.6.0-rc3` release commits collected for `frdm_k64f` board. Note: for the above example to work the bash recursive globbing should be active: `shopt -s globstar`. The output `twister_footprint.json` files will be placed into the same directories as the corresponding input files. In Twister report a test instance has either long or short name, each needs test suite name from the test configuration yaml file. This scripts has `--test-name` parameter to customize how to compose test names from the plan.txt columns including an additional (last) one whth explicit test suite name ('dot separated' format). """ from __future__ import annotations from datetime import datetime, timezone import argparse import os import sys import re import csv import logging import json from git import Repo from git.exc import BadName VERSION_COMMIT_RE = re.compile(r".*-g([a-f0-9]{12})$") PLAN_HEADERS = ['name', 'feature', 'board', 'application', 'options', 'suite_name'] TESTSUITE_FILENAME = { 'tests': 'testcase.yaml', 'samples': 'sample.yaml' } FOOTPRINT_FILES = { 'ROM': 'rom.json', 'RAM': 'ram.json' } RESULT_FILENAME = 'twister_footprint.json' HWMv2_LEVELS = 3 logger = None LOG_LEVELS = { 'DEBUG': (logging.DEBUG, 3), 'INFO': (logging.INFO, 2), 'WARNING': (logging.WARNING, 1), 'ERROR': (logging.ERROR, 0) } def init_logs(logger_name=''): global logger log_level = os.environ.get('LOG_LEVEL', 'ERROR') log_level = LOG_LEVELS[log_level][0] if log_level in LOG_LEVELS else logging.ERROR console = logging.StreamHandler(sys.stdout) console.setFormatter(logging.Formatter('%(asctime)s - %(levelname)-8s - %(message)s')) logger = logging.getLogger(logger_name) logger.setLevel(log_level) logger.addHandler(console) def set_verbose(verbose: int): levels = { lvl[1]: lvl[0] for lvl in LOG_LEVELS.values() } if verbose > len(levels): verbose = len(levels) if verbose <= 0: verbose = 0 logger.setLevel(levels[verbose]) def parse_args(): parser = argparse.ArgumentParser(allow_abbrev=False, formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__) parser.add_argument('input_paths', metavar='INPUT_PATHS', nargs='+', help="Directories with the memory footprint data to convert. " "Each directory must have 'ZEPHYR_VERSION/APPLICATION/FEATURE/BOARD' path structure.") parser.add_argument('-p', '--plan', metavar='PLAN_FILE_CSV', required=True, help="An execution plan (CSV file) with details of what footprint applications " "and platforms were chosen to generate the input data. " "It is also applied to filter input directories and check their names.") parser.add_argument('-o', '--output-fname', metavar='OUTPUT_FNAME', required=False, default=RESULT_FILENAME, help="Destination JSON file name to create at each of INPUT_PATHS. " "Default: '%(default)s'") parser.add_argument('-z', '--zephyr_base', metavar='ZEPHYR_BASE', required=False, default = os.environ.get('ZEPHYR_BASE'), help="Zephyr code base path to use instead of the current ZEPHYR_BASE environment variable. " "The script needs Zephyr repository there to read SHA and commit time of builds. " "Current default: '%(default)s'") parser.add_argument("--test-name", choices=['application/suite_name', 'suite_name', 'application', 'name.feature'], default='name.feature', help="How to compose Twister test instance names using plan.txt columns. " "Default: '%(default)s'" ) parser.add_argument("--no-testsuite-check", dest='testsuite_check', action="store_false", help="Don't check for applications' testsuite configs in ZEPHYR_BASE.") parser.add_argument('-v', '--verbose', required=False, action='count', default=0, help="Increase the logging level for each occurrence. Default level: ERROR") return parser.parse_args() def read_plan(fname: str) -> list[dict]: plan = [] with open(fname) as plan_file: plan_rows = csv.reader(plan_file) plan_vals = [ dict(zip(PLAN_HEADERS, row)) for row in plan_rows ] plan = { f"{p['name']}/{p['feature']}/{p['board']}" : p for p in plan_vals } return plan def get_id_from_path(plan, in_path, max_levels=HWMv2_LEVELS): data_id = {} (in_path, data_id['board']) = os.path.split(in_path) if not data_id['board']: # trailing '/' (in_path, data_id['board']) = os.path.split(in_path) for _ in range(max_levels): (in_path, data_id['feature']) = os.path.split(in_path) (c_head, data_id['app']) = os.path.split(in_path) (c_head, data_id['version']) = os.path.split(c_head) if not all(data_id.values()): # incorrect plan id return None if f"{data_id['app']}/{data_id['feature']}/{data_id['board']}" in plan: return data_id else: # try with HWMv2 board name one more level deep data_id['board'] = f"{data_id['feature']}/{data_id['board']}" # not found return {} def main(): errors = 0 converted = 0 skipped = 0 filtered = 0 run_date = datetime.now(timezone.utc).isoformat(timespec='seconds') init_logs() args = parse_args() set_verbose(args.verbose) if not args.zephyr_base: logging.error("ZEPHYR_BASE is not defined.") sys.exit(1) zephyr_base = os.path.abspath(args.zephyr_base) zephyr_base_repo = Repo(zephyr_base) logging.info(f"scanning {len(args.input_paths)} directories ...") logging.info(f"use plan '{args.plan}'") plan = read_plan(args.plan) test_name_sep = '/' if '/' in args.test_name else '.' test_name_parts = args.test_name.split(test_name_sep) for report_path in args.input_paths: logging.info(f"convert {report_path}") # print(p) p_head = os.path.normcase(report_path) p_head = os.path.normpath(p_head) if not os.path.isdir(p_head): logging.error(f"not a directory '{p_head}'") errors += 1 continue data_id = get_id_from_path(plan, p_head) if data_id is None: logging.warning(f"skipped '{report_path}' - not a correct report directory") skipped += 1 continue elif not data_id: logging.info(f"filtered '{report_path}' - not in the plan") filtered += 1 continue r_plan = f"{data_id['app']}/{data_id['feature']}/{data_id['board']}" if 'suite_name' in test_name_parts and 'suite_name' not in plan[r_plan]: logging.info(f"filtered '{report_path}' - no Twister suite name in the plan.") filtered += 1 continue suite_name = test_name_sep.join([plan[r_plan][n] if n in plan[r_plan] else '' for n in test_name_parts]) # Just some sanity checks of the 'application' in the current ZEPHYR_BASE if args.testsuite_check: suite_type = plan[r_plan]['application'].split('/') if len(suite_type) and suite_type[0] in TESTSUITE_FILENAME: suite_conf_name = TESTSUITE_FILENAME[suite_type[0]] else: logging.error(f"unknown app type to get configuration in '{report_path}'") errors += 1 continue suite_conf_fname = os.path.join(zephyr_base, plan[r_plan]['application'], suite_conf_name) if not os.path.isfile(suite_conf_fname): logging.error(f"test configuration not found for '{report_path}' at '{suite_conf_fname}'") errors += 1 continue # Check SHA presence in the current ZEPHYR_BASE sha_match = VERSION_COMMIT_RE.search(data_id['version']) version_sha = sha_match.group(1) if sha_match else data_id['version'] try: git_commit = zephyr_base_repo.commit(version_sha) except BadName: logging.error(f"SHA:'{version_sha}' is not found in ZEPHYR_BASE for '{report_path}'") errors += 1 continue # Compose twister_footprint.json record - each application (test suite) will have its own # simplified header with options, SHA, etc. res = {} res['environment'] = { 'zephyr_version': data_id['version'], 'commit_date': git_commit.committed_datetime.astimezone(timezone.utc).isoformat(timespec='seconds'), 'run_date': run_date, 'options': { 'testsuite_root': [ plan[r_plan]['application'] ], 'build_only': True, 'create_rom_ram_report': True, 'footprint_report': 'all', 'platform': [ plan[r_plan]['board'] ] } } test_suite = { 'name': suite_name, 'arch': None, 'platform': plan[r_plan]['board'], 'status': 'passed', 'footprint': {} } for k,v in FOOTPRINT_FILES.items(): footprint_fname = os.path.join(report_path, v) try: with open(footprint_fname, "rt") as footprint_json: logger.debug(f"reading {footprint_fname}") test_suite['footprint'][k] = json.load(footprint_json) except FileNotFoundError: logger.warning(f"{report_path} missing {v}") res['testsuites'] = [test_suite] report_fname = os.path.join(report_path, args.output_fname) with open(report_fname, "wt") as json_file: logger.debug(f"writing {report_fname}") json.dump(res, json_file, indent=4, separators=(',',':')) converted += 1 logging.info(f'found={len(args.input_paths)}, converted={converted}, ' f'skipped={skipped}, filtered={filtered}, errors={errors}') sys.exit(errors != 0) if __name__ == '__main__': main()