ci: elasticsearch: Upload script improvements

Multiple improvements to the `upload_test_results_es.py` script:

 * JSON object flattening.

   This feature allows preprocessing of the `twister.json` file to
   simplify the Elasticsearch index structure for complex hierarchical
   objects, for example memory footprint or code coverage data.

   A new command line option `--flatten` is added to change the
   testsuite data structure with regard to one of its list objects,
   either `testcases` or `recording`: each item there becomes an
   independent data record inheriting all other testsuite properties,
   while the child object's properties are renamed with the parent
   object's name as a prefix, 'testcases_' or 'recording_' respectively
   (a sketch follows the option list below).
   Only one testsuite property can be flattened this way per index
   upload. Other child objects will be treated according to the index
   structure.

   Related new command line options (with help text explanations):
    `--flatten-dict-name`,
    `--flatten-list-names`,
    `--flatten-separator`,
    `--transpose-separator`,
    `--escape-separator`
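
   For illustration, a minimal sketch of the record cloning performed
   for `--flatten recording` (the testsuite names and values here are
   hypothetical; the real ones come from `twister.json`):

      testsuite = {"name": "tests.kernel.latency", "platform": "qemu_x86",
                   "recording": [{"metric": "k.sem.give", "cycles": 212},
                                 {"metric": "k.sem.take", "cycles": 241}]}
      records = []
      for item in testsuite["recording"]:
          # Clone the testsuite without the flattened list itself.
          clone = {k: v for k, v in testsuite.items() if k != "recording"}
          # Prefix the item's properties with the parent object's name.
          clone.update({"recording_" + k: v for k, v in item.items()})
          records.append(clone)
      # Two independent index records result, each inheriting 'name' and
      # 'platform' and carrying its own 'recording_metric'/'recording_cycles'.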

 * A new command line option `--transform` is added to allow regexp
   group parsing of string properties to extract additional derived
   properties (see the sketch below).
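
   A sketch of the idea, applying the pattern from the option's help
   text to a hypothetical metric value:

      import re

      t = {"recording_metric": "k.sem.give.blocking"}
      match = re.match(r"(?P<object>[^.]+)\.(?P<action>[^.]+)\.",
                       t["recording_metric"])
      if match:
          # Named groups become new, derived properties of the record.
          t.update(match.groupdict(default=""))
      # t now also carries 'object': 'k' and 'action': 'sem'.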

 * A new command line option `--exclude` is added to exclude testsuite
   properties which need not be stored in the Elasticsearch index.
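
   For example, `--exclude log reason` would prune each record roughly
   like this before indexing (property names are illustrative):

      t = {"name": "tests.foo", "status": "passed", "log": "...", "reason": ""}
      for excl_item in ["log", "reason"]:
          t.pop(excl_item, None)  # drop the property if present
      # t keeps only {'name': 'tests.foo', 'status': 'passed'}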

 * New Branch name (`--run-branch`) and Workflow ID (`--run-workflow`)
   command line options provide additional key fields to allow data from
   different branches, workflows and triggering events in the same index.

 * A new command line option `--map-file` is added to apply
   an explicit index structure to the `twister.json` input data.
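
   For example, a minimal map file could mirror the script's built-in
   default mapping (this content is an assumption for illustration):

      {
          "properties": {
              "execution_time": {"type": "float"},
              "retries": {"type": "integer"}
          }
      }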

 * A new `--bulk-timeout` command line option is added to set the bulk
   operation request timeout for heavy/long uploads.

Other changes:
  * batch upload error handling and logging;
  * inline documentation improvements;
  * corner case fixes for empty objects.

Signed-off-by: Dmitrii Golovanov <dmitrii.golovanov@intel.com>
@@ -1,20 +1,179 @@
#!/usr/bin/env python3
# Copyright (c) 2022 Intel Corporation
# Copyright (c) 2022-2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""
This script uploads the ``twister.json`` file to an Elasticsearch index for reporting and analysis.
see https://kibana.zephyrproject.io/
# This script upload test ci results to the zephyr ES instance for reporting and analysis.
# see https://kibana.zephyrproject.io/
The script expects two environment variables with the Elasticsearch server connection parameters:
`ELASTICSEARCH_SERVER`
`ELASTICSEARCH_KEY`
"""
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.helpers import bulk, BulkIndexError
import sys
import os
import json
import argparse
import re
def gendata(f, index, run_date=None, run_id=None, run_attempt=None):
def flatten(name, value, name_sep="_", names_dict=None, parent_name=None, escape_sep=""):
"""
Flatten ``value`` into a plain dictionary.
:param name: the flattened name of the ``value`` to be used as a name prefix for all its items.
:param name_sep: string to separate flattened names; if the same string is already present
in the names it will be repeated twice.
:param names_dict: An optional dictionary with 'foo':'bar' items to flatten 'foo' list properties
where each item should be a dictionary with the 'bar' item storing a unique
name, so it will be taken as a part of the flattened item's name instead of
the item's index in its parent list.
:param parent_name: the short, single-level, name of the ``value``.
:param value: object to flatten, for example, a dictionary:
{
"ROM":{
"symbols":{
"name":"Root",
"size":4320,
"identifier":"root",
"address":0,
"children":[
{
"name":"(no paths)",
"size":2222,
"identifier":":",
"address":0,
"children":[
{
"name":"var1",
"size":20,
"identifier":":/var1",
"address":1234
}, ...
]
} ...
]
}
} ...
}
:return: the ``value`` flattened to a plain dictionary where each key is concatenated from
names of its initially nested items being separated by the ``name_sep``,
for the above example:
{
"ROM/symbols/name": "Root",
"ROM/symbols/size": 4320,
"ROM/symbols/identifier": "root",
"ROM/symbols/address": 0,
"ROM/symbols/(no paths)/size": 2222,
"ROM/symbols/(no paths)/identifier": ":",
"ROM/symbols/(no paths)/address": 0,
"ROM/symbols/(no paths)/var1/size": 20,
"ROM/symbols/(no paths)/var1/identifier": ":/var1",
"ROM/symbols/(no paths)/var1/address": 1234,
}
"""
res_dict = {}
name_prefix = name + name_sep if name and len(name) else ''
if isinstance(value, list) and len(value):
for idx,val in enumerate(value):
if isinstance(val, dict) and names_dict and parent_name and isinstance(names_dict, dict) and parent_name in names_dict:
flat_name = name_prefix + str(val[names_dict[parent_name]]).replace(name_sep, escape_sep + name_sep)
val_ = val.copy()
val_.pop(names_dict[parent_name])
flat_item = flatten(flat_name, val_, name_sep, names_dict, parent_name, escape_sep)
else:
flat_name = name_prefix + str(idx)
flat_item = flatten(flat_name, val, name_sep, names_dict, parent_name, escape_sep)
res_dict = { **res_dict, **flat_item }
elif isinstance(value, dict) and len(value):
for key,val in value.items():
if names_dict and key in names_dict:
name_k = name
else:
name_k = name_prefix + str(key).replace(name_sep, escape_sep + name_sep)
flat_item = flatten(name_k, val, name_sep, names_dict, key, escape_sep)
res_dict = { **res_dict, **flat_item }
elif len(name):
res_dict[name] = value
return res_dict
def unflatten(src_dict, name_sep):
"""
Unflatten ``src_dict`` at its deepest level, splitting keys with ``name_sep``
and using the rightmost chunk to name properties.
:param src_dict: a dictionary to unflatten, for example:
{
"ROM/symbols/name": "Root",
"ROM/symbols/size": 4320,
"ROM/symbols/identifier": "root",
"ROM/symbols/address": 0,
"ROM/symbols/(no paths)/size": 2222,
"ROM/symbols/(no paths)/identifier": ":",
"ROM/symbols/(no paths)/address": 0,
"ROM/symbols/(no paths)/var1/size": 20,
"ROM/symbols/(no paths)/var1/identifier": ":/var1",
"ROM/symbols/(no paths)/var1/address": 1234,
}
:param name_sep: string to split the dictionary keys.
:return: the unflattened dictionary, for the above example:
{
"ROM/symbols": {
"name": "Root",
"size": 4320,
"identifier": "root",
"address": 0
},
"ROM/symbols/(no paths)": {
"size": 2222,
"identifier": ":",
"address": 0
},
"ROM/symbols/(no paths)/var1": {
"size": 20,
"identifier": ":/var1",
"address": 1234
}
}
"""
res_dict = {}
for k,v in src_dict.items():
k_pref, _, k_suff = k.rpartition(name_sep)
if k_pref not in res_dict:
res_dict[k_pref] = {k_suff: v}
else:
if k_suff in res_dict[k_pref]:
if not isinstance(res_dict[k_pref][k_suff], list):
res_dict[k_pref][k_suff] = [res_dict[k_pref][k_suff]]
res_dict[k_pref][k_suff].append(v)
else:
res_dict[k_pref][k_suff] = v
return res_dict
def transform(t, args):
if args.transform:
rules = json.loads(str(args.transform).replace("'", "\"").replace("\\", "\\\\"))
for property_name, rule in rules.items():
if property_name in t:
match = re.match(rule, t[property_name])
if match:
t.update(match.groupdict(default=""))
#
#
for excl_item in args.exclude:
if excl_item in t:
t.pop(excl_item)
return t
def gendata(f, args):
with open(f, "r") as j:
data = json.load(j)
for t in data['testsuites']:
@@ -23,34 +182,84 @@ def gendata(f, index, run_date=None, run_id=None, run_attempt=None):
main_group = _grouping.split(".")[0]
sub_group = _grouping.split(".")[1]
env = data['environment']
if run_date:
env['run_date'] = run_date
if run_id:
env['run_id'] = run_id
if run_attempt:
env['run_attempt'] = run_attempt
if args.run_date:
env['run_date'] = args.run_date
if args.run_id:
env['run_id'] = args.run_id
if args.run_attempt:
env['run_attempt'] = args.run_attempt
if args.run_branch:
env['run_branch'] = args.run_branch
if args.run_workflow:
env['run_workflow'] = args.run_workflow
t['environment'] = env
t['component'] = main_group
t['sub_component'] = sub_group
yield {
"_index": index,
"_source": t
yield_records = 0
# If the flattened property is a dictionary, convert it to a plain list
# where each item is a flat dictionary.
if args.flatten and args.flatten in t and isinstance(t[args.flatten], dict):
flat = t.pop(args.flatten)
flat_list_dict = {}
if args.flatten_list_names:
flat_list_dict = json.loads(str(args.flatten_list_names).replace("'", "\"").replace("\\", "\\\\"))
#
# Normalize flattening to a plain dictionary.
flat = flatten('', flat, args.transpose_separator, flat_list_dict, str(args.escape_separator))
# Unflatten one (the deepest) level, expecting a similar set of property names there.
flat = unflatten(flat, args.transpose_separator)
# Keep dictionary names as their properties and flatten the dictionary to a list of dictionaries.
as_name = args.flatten_dict_name
if len(as_name):
flat_list = []
for k,v in flat.items():
v[as_name] = k + args.transpose_separator + v[as_name] if as_name in v else k
v[as_name + '_depth'] = v[as_name].count(args.transpose_separator)
flat_list.append(v)
t[args.flatten] = flat_list
else:
t[args.flatten] = flat
# Flatten a list by cloning the record for each of its items, renaming the item's
# properties with the flattened property's name as a prefix plus the item's key or index respectively.
if args.flatten and args.flatten in t and isinstance(t[args.flatten], list):
flat = t.pop(args.flatten)
for flat_item in flat:
t_clone = t.copy()
if isinstance(flat_item, dict):
t_clone.update({ args.flatten + args.flatten_separator + k : v for k,v in flat_item.items() })
elif isinstance(flat_item, list):
t_clone.update({ args.flatten + args.flatten_separator + str(idx) : v for idx,v in enumerate(flat_item) })
yield {
"_index": args.index,
"_source": transform(t_clone, args)
}
yield_records += 1
if not yield_records: # also yield the record itself when it was not flattened into separate records.
yield {
"_index": args.index,
"_source": transform(t, args)
}
def main():
args = parse_args()
if args.index:
index_name = args.index
else:
index_name = 'tests-zephyr-1'
settings = {
"index": {
"number_of_shards": 4
}
}
mappings = {
mappings = {}
if args.map_file:
with open(args.map_file, "rt") as json_map:
mappings = json.load(json_map)
else:
mappings = {
"properties": {
"execution_time": {"type": "float"},
"retries": {"type": "integer"},
@@ -61,9 +270,9 @@ def main():
if args.dry_run:
xx = None
for f in args.files:
xx = gendata(f, index_name, args.run_date, args.run_id, args.run_attempt)
for x in xx:
print(x)
xx = gendata(f, args)
for x in xx:
print(json.dumps(x, indent=4))
sys.exit(0)
es = Elasticsearch(
@@ -73,24 +282,84 @@ def main():
)
if args.create_index:
es.indices.create(index=index_name, mappings=mappings, settings=settings)
es.indices.create(index=args.index, mappings=mappings, settings=settings)
else:
if args.run_date:
print(f"Setting run date from command line: {args.run_date}")
for f in args.files:
bulk(es, gendata(f, index_name, args.run_date, args.run_id, args.run_attempt))
for f in args.files:
print(f"Process: '{f}'")
try:
bulk(es, gendata(f, args), request_timeout=args.bulk_timeout)
except BulkIndexError as e:
print(f"ERROR adding '{f}' exception: {e}")
error_0 = e.errors[0].get("index", {}).get("error", {})
reason_0 = error_0.get('reason')
print(f"ERROR reason: {reason_0}")
raise e
#
#
#
def parse_args():
parser = argparse.ArgumentParser(allow_abbrev=False)
parser = argparse.ArgumentParser(allow_abbrev=False,
formatter_class=argparse.RawTextHelpFormatter,
description=__doc__)
parser.add_argument('-y','--dry-run', action="store_true", help='Dry run.')
parser.add_argument('-c','--create-index', action="store_true", help='Create index.')
parser.add_argument('-i', '--index', help='index to push to.', required=True)
parser.add_argument('-m', '--map-file', required=False,
help='JSON map file with Elasticsearch index structure and data types.')
parser.add_argument('-i', '--index', required=True, default='tests-zephyr-1',
help='Elasticsearch index to push to.')
parser.add_argument('-r', '--run-date', help='Run date in ISO format', required=False)
parser.add_argument('--flatten', required=False, default=None,
metavar='TESTSUITE_PROPERTY',
help="Flatten one of the test suite's properties:\n"
"it will be converted to a list where each list item becomes a separate index record\n"
"with all other properties of the test suite object duplicated and the flattened\n"
"property name used as a prefix for all its items, e.g.\n"
"'recording.cycles' becomes 'recording_cycles'.")
parser.add_argument('--flatten-dict-name', required=False, default="name",
metavar='PROPERTY_NAME',
help="For dictionaries flattened into a list, use this name for additional property\n"
"to store the item's flat concatenated name. One more property with that name\n"
"and'_depth' suffix will be added for number of `--transpose_separator`s in the name.\n"
"Default: '%(default)s'. Set empty string to disable.")
parser.add_argument('--flatten-list-names', required=False, default=None,
metavar='DICT',
help="An optional string with json dictionary like {'children':'name', ...}\n"
"to use it for flattening lists of dictionaries named 'children' which should\n"
"contain keys 'name' with unique string value as an actual name for the item.\n"
"This name value will be composed instead of the container's name 'children' and\n"
"the item's numeric index.")
parser.add_argument('--flatten-separator', required=False, default="_",
help="Separator to use it for the flattened property names. Default: '%(default)s'")
parser.add_argument('--transpose-separator', required=False, default="/",
help="Separator to use it for the transposed dictionary names stored in\n"
"`flatten-dict-name` properties. Default: '%(default)s'")
parser.add_argument('--escape-separator', required=False, default='',
help="Prepend name separators with the escape string if already present in names. "
"Default: '%(default)s'.")
parser.add_argument('--transform', required=False,
metavar='RULE',
help="Apply regexp group parsing to selected string properties after flattening.\n"
"The string is a json dictionary with property names and regexp strings to apply\n"
"on them to extract values, for example:\n"
r"\"{ 'recording_metric': '(?P<object>[^\.]+)\.(?P<action>[^\.]+)\.' }\"")
parser.add_argument('--exclude', required=False, nargs='*', default=[],
metavar='TESTSUITE_PROPERTY',
help="Don't store these properties in the Elasticsearch index.")
parser.add_argument('--run-workflow', required=False,
help="Source workflow identificator, e.g. the workflow short name "
"and its triggering event name.")
parser.add_argument('--run-branch', required=False,
help="Source branch identificator.")
parser.add_argument('--run-id', required=False,
help="unique run-id (e.g. from github.run_id context)")
parser.add_argument('--run-attempt', required=False,
help="unique run attempt number (e.g. from github.run_attempt context)")
parser.add_argument('--bulk-timeout', required=False, type=int, default=60,
help="Elasticsearch bulk request timeout, seconds. Default %(default)s.")
parser.add_argument('files', metavar='FILE', nargs='+', help='file with test data.')
args = parser.parse_args()