#!/usr/bin/env python3 # # Copyright (c) 2019 Intel Corporation # # SPDX-License-Identifier: Apache-2.0 # # Author: Sathish Kuttan # This file defines a message class that contains functions to create # commands to the target and to parse responses from the target. import bitstruct class Message: """ Message class containing the methods to create command messages and parse response messages. """ message_id = {1: 'Control'} cmd_rsp = {2: 'Load Firmware', 4: 'Mode Select', 0x10: 'Memory Read', 0x11: 'Memory Write', 0x12: 'Memory Block Write', 0x13: 'Execute', 0x14: 'Wait', 0x20: 'Ready'} tx_data = None tx_bulk_data = None tx_index = 0 cmd_word_fmt = 'u1 u1 u1 u5 u16 u8' cmd_keys = ['cmd', 'rsvd1', 'rsp', 'msg_id', 'rsvd2', 'cmd_rsp'] def __init__(self): """ Intialize a byte array of 64 bytes for command messages Intialize another byte array of 4096 bytes for bulk messages """ self.tx_data = bytearray(64) self.tx_bulk_data = bytearray(4096) def init_tx_data(self): """ Intialize transmit message buffers to zeros """ for index in range(len(self.tx_data)): self.tx_data[index] = 0 self.tx_index = 0 @staticmethod def endian_swap(dst, dst_offset, src): """ Performs a byte swap of a 32-bit word to change it's endianness """ for index in range(0, len(src), 4): dst[dst_offset + index + 0] = src[index + 3] dst[dst_offset + index + 1] = src[index + 2] dst[dst_offset + index + 2] = src[index + 1] dst[dst_offset + index + 3] = src[index + 0] def print_cmd_message(self): """ Prints the contents of the command message buffer """ for index in range(0, self.tx_index, 4): offset = index * 8 word = bitstruct.unpack_from('u32', self.tx_data, offset) print('Index: %2d Content: 0x%08x' %(index, word[0])) def print_response(self, msg, verbose=False): """ Parses and prints the contents of the response message """ unpacked = bitstruct.unpack_from_dict(self.cmd_word_fmt, self.cmd_keys, msg) msg_id = unpacked['msg_id'] rsp = unpacked['cmd_rsp'] if msg_id == 0 and rsp == 0: print('RSP <<< NULL.') else: print('RSP <<< %s.' % self.cmd_rsp[rsp]) if verbose: count = bitstruct.unpack_from('u32', msg, 4 * 8)[0] count &= 0x1ff for index in range(0, 8 + (count * 4), 4): offset = index * 8 word = bitstruct.unpack_from('u32', msg, offset) print('Index: %2d Content: 0x%08x' %(index, word[0])) def get_cmd_code(self, cmd): """ Looks up the command and returns the numeric code """ index = list(self.cmd_rsp.values()).index(cmd) return list(self.cmd_rsp.keys())[index] def print_cmd_code(self, cmd): """ Prints the numeric code for the given command """ key = self.get_cmd_code(cmd) print('CMD >>> %s. Command Code: 0x%02x' % (cmd, key)) def create_null_cmd(self): """ Creates a NULL command """ print('CMD >>> NULL.') for index in range(len(self.tx_data)): self.tx_data[index] = 0 self.tx_index = len(self.tx_data) return self.tx_data def create_memwrite_cmd(self, tuple): """ Creates a memory write command with memory address and value pairs """ cmd = 'Memory Write' print('CMD >>> %s.' % cmd) code = self.get_cmd_code(cmd) self.init_tx_data() index = list(self.message_id.values()).index('Control') msg_id = list(self.message_id.keys())[index] bitstruct.pack_into_dict(self.cmd_word_fmt, self.cmd_keys, self.tx_data, 0, {'cmd': 1, 'rsvd1': 0, 'rsp': 0, 'msg_id': msg_id, 'rsvd2': 0, 'cmd_rsp': code}) self.tx_index += 4 bitstruct.pack_into('u32', self.tx_data, self.tx_index * 8, len(tuple)) self.tx_index += 4 for elm in tuple: bitstruct.pack_into('u32', self.tx_data, self.tx_index * 8, elm) self.tx_index += 4 return self.tx_data def create_memread_cmd(self, tuple): """ Creates a memory read command with memory addresses """ cmd = 'Memory Read' print('CMD >>> %s.' % cmd) code = self.get_cmd_code(cmd) self.init_tx_data() index = list(self.message_id.values()).index('Control') msg_id = list(self.message_id.keys())[index] bitstruct.pack_into_dict(self.cmd_word_fmt, self.cmd_keys, self.tx_data, 0, {'cmd': 1, 'rsvd1': 0, 'rsp': 0, 'msg_id': msg_id, 'rsvd2': 0, 'cmd_rsp': code}) self.tx_index += 4 bitstruct.pack_into('u32', self.tx_data, self.tx_index * 8, len(tuple)) self.tx_index += 4 for elm in tuple: bitstruct.pack_into('u32', self.tx_data, self.tx_index * 8, elm) self.tx_index += 4 return self.tx_data def create_loadfw_cmd(self, size, sha): """ Creates a command to load firmware with associated parameters """ cmd = 'Load Firmware' print('CMD >>> %s.' % cmd) code = self.get_cmd_code(cmd) FW_NO_EXEC_FLAG = (1 << 26) SEL_HP_CLK = (1 << 21) LD_FW_HEADER_LEN = 3 count_flags = FW_NO_EXEC_FLAG | SEL_HP_CLK count_flags |= (LD_FW_HEADER_LEN + int(len(sha) / 4)) self.init_tx_data() index = list(self.message_id.values()).index('Control') msg_id = list(self.message_id.keys())[index] bitstruct.pack_into_dict(self.cmd_word_fmt, self.cmd_keys, self.tx_data, 0, {'cmd': 1, 'rsvd1': 0, 'rsp': 0, 'msg_id': msg_id, 'rsvd2': 0, 'cmd_rsp': code}) self.tx_index += 4 bitstruct.pack_into('u32', self.tx_data, self.tx_index * 8, count_flags) self.tx_index += 4 bitstruct.pack_into('u32', self.tx_data, self.tx_index * 8, 0xbe000000) self.tx_index += 4 bitstruct.pack_into('u32', self.tx_data, self.tx_index * 8, 0) self.tx_index += 4 bitstruct.pack_into('u32', self.tx_data, self.tx_index * 8, size) self.tx_index += 4 self.endian_swap(self.tx_data, self.tx_index, sha) self.tx_index += len(sha) return self.tx_data def create_execfw_cmd(self): """ Creates a command to excute firmware """ cmd = 'Execute' print('CMD >>> %s.' % cmd) code = self.get_cmd_code(cmd) EXE_FW_HEADER_LEN = 1 count = EXE_FW_HEADER_LEN self.init_tx_data() index = list(self.message_id.values()).index('Control') msg_id = list(self.message_id.keys())[index] bitstruct.pack_into_dict(self.cmd_word_fmt, self.cmd_keys, self.tx_data, 0, {'cmd': 1, 'rsvd1': 0, 'rsp': 0, 'msg_id': msg_id, 'rsvd2': 0, 'cmd_rsp': code}) self.tx_index += 4 bitstruct.pack_into('u32', self.tx_data, self.tx_index * 8, count) self.tx_index += 4 bitstruct.pack_into('u32', self.tx_data, self.tx_index * 8, 0xbe000000) self.tx_index += 4 return self.tx_data def create_bulk_message(self, data): """ Copies the input byte stream to the bulk message buffer """ self.endian_swap(self.tx_bulk_data, 0, data) return self.tx_bulk_data[:len(data)] def get_bulk_message_size(self): """ Returns the size of the bulk message buffer """ return len(self.tx_bulk_data)