diff --git a/scripts/ci/upload_test_results_es.py b/scripts/ci/upload_test_results_es.py
index c1f53c66381..8e1315cf070 100755
--- a/scripts/ci/upload_test_results_es.py
+++ b/scripts/ci/upload_test_results_es.py
@@ -1,20 +1,179 @@
 #!/usr/bin/env python3
-# Copyright (c) 2022 Intel Corporation
+# Copyright (c) 2022-2024 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
+"""
+This script uploads a ``twister.json`` file to an Elasticsearch index for reporting and analysis.
+See https://kibana.zephyrproject.io/
 
-# This script upload test ci results to the zephyr ES instance for reporting and analysis.
-# see https://kibana.zephyrproject.io/
+The script expects two environment variables with the Elasticsearch server connection parameters:
+ `ELASTICSEARCH_SERVER`
+ `ELASTICSEARCH_KEY`
+"""
 
 from elasticsearch import Elasticsearch
-from elasticsearch.helpers import bulk
+from elasticsearch.helpers import bulk, BulkIndexError
 import sys
 import os
 import json
 import argparse
+import re
 
-def gendata(f, index, run_date=None, run_id=None, run_attempt=None):
+
+def flatten(name, value, name_sep="_", names_dict=None, parent_name=None, escape_sep=""):
+    """
+    Flatten ``value`` into a plain dictionary.
+
+    :param name: the flattened name of the ``value`` to be used as a name prefix for all its items.
+    :param value: object to flatten, for example, a dictionary:
+        {
+            "ROM":{
+                "symbols":{
+                    "name":"Root",
+                    "size":4320,
+                    "identifier":"root",
+                    "address":0,
+                    "children":[
+                        {
+                            "name":"(no paths)",
+                            "size":2222,
+                            "identifier":":",
+                            "address":0,
+                            "children":[
+                                {
+                                    "name":"var1",
+                                    "size":20,
+                                    "identifier":":/var1",
+                                    "address":1234
+                                }, ...
+                            ]
+                        }, ...
+                    ]
+                }
+            }, ...
+        }
+    :param name_sep: string to separate the flattened names; occurrences of this string
+        already present in item names are prefixed with ``escape_sep``.
+    :param names_dict: an optional dictionary with 'foo':'bar' items to flatten 'foo' list properties
+        where each list item should be a dictionary whose 'bar' item stores a unique
+        name; that name is then used as a part of the flattened item's name instead of
+        the item's index in its parent list.
+    :param parent_name: the short, single-level name of the ``value``.
+    :param escape_sep: an optional string to prepend to ``name_sep`` occurrences
+        already present in item names.
+
+    :return: the ``value`` flattened to a plain dictionary where each key is concatenated from
+        the names of its initially nested items separated by ``name_sep``; for the above
+        example (flattened with ``name_sep='/'`` and ``names_dict={'children': 'name'}``):
+        {
+            "ROM/symbols/name": "Root",
+            "ROM/symbols/size": 4320,
+            "ROM/symbols/identifier": "root",
+            "ROM/symbols/address": 0,
+            "ROM/symbols/(no paths)/size": 2222,
+            "ROM/symbols/(no paths)/identifier": ":",
+            "ROM/symbols/(no paths)/address": 0,
+            "ROM/symbols/(no paths)/var1/size": 20,
+            "ROM/symbols/(no paths)/var1/identifier": ":/var1",
+            "ROM/symbols/(no paths)/var1/address": 1234,
+        }
+    """
+    res_dict = {}
+    name_prefix = name + name_sep if name and len(name) else ''
+    if isinstance(value, list) and len(value):
+        for idx, val in enumerate(value):
+            if isinstance(val, dict) and names_dict and parent_name and isinstance(names_dict, dict) and parent_name in names_dict:
+                # Take the item's unique name as the flattened name component.
+                flat_name = name_prefix + str(val[names_dict[parent_name]]).replace(name_sep, escape_sep + name_sep)
+                val_ = val.copy()
+                val_.pop(names_dict[parent_name])
+                flat_item = flatten(flat_name, val_, name_sep, names_dict, parent_name, escape_sep)
+            else:
+                # Fall back to the item's index in its parent list.
+                flat_name = name_prefix + str(idx)
+                flat_item = flatten(flat_name, val, name_sep, names_dict, parent_name, escape_sep)
+            res_dict = { **res_dict, **flat_item }
+    elif isinstance(value, dict) and len(value):
+        for key, val in value.items():
+            if names_dict and key in names_dict:
+                # The container's own name is dropped from the flattened name.
+                name_k = name
+            else:
+                name_k = name_prefix + str(key).replace(name_sep, escape_sep + name_sep)
+            flat_item = flatten(name_k, val, name_sep, names_dict, key, escape_sep)
+            res_dict = { **res_dict, **flat_item }
+    elif len(name):
+        res_dict[name] = value
+    return res_dict
+
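+# A small illustration of ``flatten`` (the data is hypothetical, shown only to
+# make the naming rules concrete): with ``names_dict={'children': 'name'}`` the
+# list items are keyed by their 'name' value instead of their list index.
+#
+#   >>> flatten('', {'a': {'children': [{'name': 'x', 'size': 1}]}},
+#   ...         name_sep='/', names_dict={'children': 'name'})
+#   {'a/x/size': 1}
+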
+def unflatten(src_dict, name_sep):
+    """
+    Unflatten ``src_dict`` at its deepest level, splitting each key on the rightmost
+    occurrence of ``name_sep`` and using the rightmost chunk to name the properties.
+
+    :param src_dict: a dictionary to unflatten, for example:
+        {
+            "ROM/symbols/name": "Root",
+            "ROM/symbols/size": 4320,
+            "ROM/symbols/identifier": "root",
+            "ROM/symbols/address": 0,
+            "ROM/symbols/(no paths)/size": 2222,
+            "ROM/symbols/(no paths)/identifier": ":",
+            "ROM/symbols/(no paths)/address": 0,
+            "ROM/symbols/(no paths)/var1/size": 20,
+            "ROM/symbols/(no paths)/var1/identifier": ":/var1",
+            "ROM/symbols/(no paths)/var1/address": 1234,
+        }
+    :param name_sep: string to split the dictionary keys.
+
+    :return: the unflattened dictionary; for the above example:
+        {
+            "ROM/symbols": {
+                "name": "Root",
+                "size": 4320,
+                "identifier": "root",
+                "address": 0
+            },
+            "ROM/symbols/(no paths)": {
+                "size": 2222,
+                "identifier": ":",
+                "address": 0
+            },
+            "ROM/symbols/(no paths)/var1": {
+                "size": 20,
+                "identifier": ":/var1",
+                "address": 1234
+            }
+        }
+    """
+    res_dict = {}
+    for k, v in src_dict.items():
+        k_pref, _, k_suff = k.rpartition(name_sep)
+        if k_pref not in res_dict:
+            res_dict[k_pref] = {k_suff: v}
+        else:
+            if k_suff in res_dict[k_pref]:
+                # Collect repeated property names under one key as a list of values.
+                if not isinstance(res_dict[k_pref][k_suff], list):
+                    res_dict[k_pref][k_suff] = [res_dict[k_pref][k_suff]]
+                res_dict[k_pref][k_suff].append(v)
+            else:
+                res_dict[k_pref][k_suff] = v
+    return res_dict
+
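+# An illustration with hypothetical keys: only the deepest level is regrouped,
+# so sibling leaves collect under their common prefix.
+#
+#   >>> unflatten({'a/x/size': 1, 'a/x/addr': 2, 'a/y/size': 3}, '/')
+#   {'a/x': {'size': 1, 'addr': 2}, 'a/y': {'size': 3}}
+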
+def transform(t, args):
+    if args.transform:
+        # Accept a single-quoted pseudo-JSON string from the command line and
+        # re-escape backslashes so the regexp rules survive json.loads().
+        rules = json.loads(str(args.transform).replace("'", "\"").replace("\\", "\\\\"))
+        for property_name, rule in rules.items():
+            if property_name in t:
+                match = re.match(rule, t[property_name])
+                if match:
+                    t.update(match.groupdict(default=""))
+
+    for excl_item in args.exclude:
+        if excl_item in t:
+            t.pop(excl_item)
+
+    return t
+
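+# For instance, an illustrative rule (the property value and the regexp group
+# names here are assumed, not prescribed):
+#   --transform "{ 'recording_metric': '(?P<object>[^\.]+)\.(?P<action>[^\.]+)\.' }"
+# applied to a record {'recording_metric': 'kernel.sem.cycles'} adds
+# {'object': 'kernel', 'action': 'sem'} to that record.
+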
+def gendata(f, args):
     with open(f, "r") as j:
         data = json.load(j)
         for t in data['testsuites']:
@@ -23,34 +182,84 @@ def gendata(f, index, run_date=None, run_id=None, run_attempt=None):
             main_group = _grouping.split(".")[0]
             sub_group = _grouping.split(".")[1]
             env = data['environment']
-            if run_date:
-                env['run_date'] = run_date
-            if run_id:
-                env['run_id'] = run_id
-            if run_attempt:
-                env['run_attempt'] = run_attempt
+            if args.run_date:
+                env['run_date'] = args.run_date
+            if args.run_id:
+                env['run_id'] = args.run_id
+            if args.run_attempt:
+                env['run_attempt'] = args.run_attempt
+            if args.run_branch:
+                env['run_branch'] = args.run_branch
+            if args.run_workflow:
+                env['run_workflow'] = args.run_workflow
             t['environment'] = env
             t['component'] = main_group
             t['sub_component'] = sub_group
-            yield {
-                "_index": index,
-                "_source": t
-            }
+
+            yield_records = 0
+            # If the flattened property is a dictionary, convert it to a plain list
+            # where each item is a flat dictionary.
+            if args.flatten and args.flatten in t and isinstance(t[args.flatten], dict):
+                flat = t.pop(args.flatten)
+                flat_list_dict = {}
+                if args.flatten_list_names:
+                    flat_list_dict = json.loads(str(args.flatten_list_names).replace("'", "\"").replace("\\", "\\\\"))
+
+                # Normalize flattening to a plain dictionary.
+                flat = flatten('', flat, args.transpose_separator, flat_list_dict,
+                               escape_sep=str(args.escape_separator))
+                # Unflatten one (the deepest) level, expecting a similar set of property names there.
+                flat = unflatten(flat, args.transpose_separator)
+                # Keep the dictionary names as their properties and flatten the dictionary
+                # to a list of dictionaries.
+                as_name = args.flatten_dict_name
+                if len(as_name):
+                    flat_list = []
+                    for k, v in flat.items():
+                        v[as_name] = (k + args.transpose_separator + v[as_name]) if as_name in v else k
+                        v[as_name + '_depth'] = v[as_name].count(args.transpose_separator)
+                        flat_list.append(v)
+                    t[args.flatten] = flat_list
+                else:
+                    t[args.flatten] = flat
+
+            # Flatten a list property by cloning the record for each of the list's items and
+            # renaming each item's keys (or indices) with the flattened property name as a prefix.
+            if args.flatten and args.flatten in t and isinstance(t[args.flatten], list):
+                flat = t.pop(args.flatten)
+                for flat_item in flat:
+                    t_clone = t.copy()
+                    if isinstance(flat_item, dict):
+                        t_clone.update({ args.flatten + args.flatten_separator + k: v for k, v in flat_item.items() })
+                    elif isinstance(flat_item, list):
+                        t_clone.update({ args.flatten + args.flatten_separator + str(idx): v for idx, v in enumerate(flat_item) })
+                    yield {
+                        "_index": args.index,
+                        "_source": transform(t_clone, args)
+                    }
+                    yield_records += 1
+
+            if not yield_records:  # nothing was flattened, so yield the record as-is.
+                yield {
+                    "_index": args.index,
+                    "_source": transform(t, args)
+                }
+
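+# A sketch of the cloning behavior above (hypothetical suite data): with
+# --flatten recording, a suite {'name': 'ts1', 'recording': [{'metric': 'a',
+# 'value': 1}, {'metric': 'b', 'value': 2}]} is uploaded as two records, one
+# carrying 'recording_metric': 'a' and 'recording_value': 1, the other
+# 'recording_metric': 'b' and 'recording_value': 2.
+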
 
 def main():
     args = parse_args()
 
-    if args.index:
-        index_name = args.index
-    else:
-        index_name = 'tests-zephyr-1'
-
     settings = {
         "index": {
             "number_of_shards": 4
         }
     }
-    mappings = {
+
+    mappings = {}
+
+    if args.map_file:
+        with open(args.map_file, "rt") as json_map:
+            mappings = json.load(json_map)
+    else:
+        mappings = {
         "properties": {
             "execution_time": {"type": "float"},
             "retries": {"type": "integer"},
@@ -61,9 +270,9 @@ def main():
     if args.dry_run:
         xx = None
         for f in args.files:
-            xx = gendata(f, index_name, args.run_date, args.run_id, args.run_attempt)
-        for x in xx:
-            print(x)
+            xx = gendata(f, args)
+            for x in xx:
+                print(json.dumps(x, indent=4))
         sys.exit(0)
 
     es = Elasticsearch(
@@ -73,24 +282,84 @@ def main():
     )
 
     if args.create_index:
-        es.indices.create(index=index_name, mappings=mappings, settings=settings)
+        es.indices.create(index=args.index, mappings=mappings, settings=settings)
     else:
         if args.run_date:
             print(f"Setting run date from command line: {args.run_date}")
-        for f in args.files:
-            bulk(es, gendata(f, index_name, args.run_date, args.run_id, args.run_attempt))
+        for f in args.files:
+            print(f"Process: '{f}'")
+            try:
+                bulk(es, gendata(f, args), request_timeout=args.bulk_timeout)
+            except BulkIndexError as e:
+                print(f"ERROR adding '{f}' exception: {e}")
+                error_0 = e.errors[0].get("index", {}).get("error", {})
+                reason_0 = error_0.get('reason')
+                print(f"ERROR reason: {reason_0}")
+                raise e
 
 
 def parse_args():
-    parser = argparse.ArgumentParser(allow_abbrev=False)
+    parser = argparse.ArgumentParser(allow_abbrev=False,
+                                     formatter_class=argparse.RawTextHelpFormatter,
+                                     description=__doc__)
     parser.add_argument('-y','--dry-run', action="store_true", help='Dry run.')
     parser.add_argument('-c','--create-index', action="store_true", help='Create index.')
-    parser.add_argument('-i', '--index', help='index to push to.', required=True)
+    parser.add_argument('-m', '--map-file', required=False,
+                        help='JSON map file with the Elasticsearch index structure and data types.')
+    parser.add_argument('-i', '--index', required=False, default='tests-zephyr-1',
+                        help="Elasticsearch index to push to. Default: '%(default)s'.")
     parser.add_argument('-r', '--run-date', help='Run date in ISO format', required=False)
+    parser.add_argument('--flatten', required=False, default=None,
+                        metavar='TESTSUITE_PROPERTY',
+                        help="Flatten one of the test suite's properties:\n"
+                             "it will be converted to a list where each list item becomes a separate index record\n"
+                             "with all other properties of the test suite object duplicated, and the flattened\n"
+                             "property name used as a prefix for all its items, e.g.\n"
+                             "'recording.cycles' becomes 'recording_cycles'.")
+    parser.add_argument('--flatten-dict-name', required=False, default="name",
+                        metavar='PROPERTY_NAME',
+                        help="For dictionaries flattened into a list, use this name for an additional property\n"
+                             "to store the item's flat concatenated name. One more property with that name\n"
+                             "and the '_depth' suffix will be added holding the number of `--transpose-separator`\n"
+                             "occurrences in the name. Default: '%(default)s'. Set an empty string to disable.")
+    parser.add_argument('--flatten-list-names', required=False, default=None,
+                        metavar='DICT',
+                        help="An optional JSON dictionary string like {'children':'name', ...}\n"
+                             "for flattening lists of dictionaries: for a list property 'children', each item's\n"
+                             "unique 'name' value is used as part of the flattened item's name instead of\n"
+                             "the container's name 'children' and the item's numeric index.")
+    parser.add_argument('--flatten-separator', required=False, default="_",
+                        help="Separator to use for the flattened property names. Default: '%(default)s'")
+    parser.add_argument('--transpose-separator', required=False, default="/",
+                        help="Separator to use for the transposed dictionary names stored in\n"
+                             "`--flatten-dict-name` properties. Default: '%(default)s'")
+    parser.add_argument('--escape-separator', required=False, default='',
+                        help="String to prepend to name separators already present in item names. "
+                             "Default: '%(default)s'.")
+    parser.add_argument('--transform', required=False,
+                        metavar='RULE',
+                        help="Apply regexp group parsing to selected string properties after flattening.\n"
+                             "The string is a JSON dictionary with property names and regexp strings to apply\n"
+                             "to them to extract values, for example:\n"
+                             r"\"{ 'recording_metric': '(?P<object>[^\.]+)\.(?P<action>[^\.]+)\.' }\"")
+    parser.add_argument('--exclude', required=False, nargs='*', default=[],
+                        metavar='TESTSUITE_PROPERTY',
+                        help="Don't store these properties in the Elasticsearch index.")
+    parser.add_argument('--run-workflow', required=False,
+                        help="Source workflow identifier, e.g. the workflow short name "
+                             "and its triggering event name.")
+    parser.add_argument('--run-branch', required=False,
+                        help="Source branch identifier.")
     parser.add_argument('--run-id', required=False, help="unique run-id (e.g. from github.run_id context)")
     parser.add_argument('--run-attempt', required=False, help="unique run attempt number (e.g. from github.run_attempt context)")
+    parser.add_argument('--bulk-timeout', required=False, type=int, default=60,
+                        help="Elasticsearch bulk request timeout, in seconds. Default: %(default)s.")
     parser.add_argument('files', metavar='FILE', nargs='+', help='file with test data.')
 
     args = parser.parse_args()
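+
+    # Example invocation (server, key, and file names are placeholders):
+    #   ELASTICSEARCH_SERVER='https://...' ELASTICSEARCH_KEY='...' \
+    #   ./upload_test_results_es.py --index tests-zephyr-1 \
+    #       --flatten recording twister.json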