53 lines
1.6 KiB
Python
Executable File
53 lines
1.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
import serial
|
|
import json
|
|
from time import sleep
|
|
import argparse
|
|
|
|
# JSON description of the channels handled by this proxy. The position of an
# entry in the array is the channel id used on the wire, and "message_size"
# is the number of payload bytes read for that channel.
# NOTE(review): presumably this has to mirror the channel list of the
# firmware on the other end of the serial link - confirm.
j = """
[
{"name":"version","on_changed": false, "read_only": true, "message_size": 4},
{"name":"sensor_data","on_changed": true, "read_only": false, "message_size": 4},
{"name":"start_measurement","on_changed": false, "read_only": false, "message_size": 1}
]"""

# Decoded channel table: a list of dicts, looked up by channel id.
channels = json.loads(j)

# Command line: one required positional argument naming the serial port.
parser = argparse.ArgumentParser(
    description="Read zbus events via serial.",
    allow_abbrev=False,
)
parser.add_argument(
    "port",
    type=str,
    help="The tty or COM port to be used",
)

args = parser.parse_args()
|
|
|
|
|
|
def fetch_sentence(ser):
    """Read the remainder of one framed message from the serial port.

    The caller is expected to have consumed the ``$`` start marker already.
    What is read here is a one-byte channel id, a payload whose length is
    the ``message_size`` of that channel in ``channels``, and one trailing
    byte (the ``*`` end marker), which is discarded without being checked.

    Args:
        ser: Serial-like object exposing ``read``.

    Returns:
        Tuple ``(channel_name, msg_size, msg)`` where ``msg`` holds the raw
        payload bytes exactly as returned by ``ser.read``.

    Raises:
        IndexError: If the received id has no entry in ``channels``.
    """
    index = int.from_bytes(ser.read(), byteorder="little")
    entry = channels[index]
    name, size = entry['name'], entry['message_size']
    payload = ser.read(size)
    ser.read(1)  # drop the '*' end marker; its value is not validated
    return name, size, payload
|
|
|
|
|
|
def pub_start_measurement(ser, action: bool):
    """Publish a one-byte message on the ``start_measurement`` channel.

    Logs the publication, then writes the frame ``$``, channel index
    ``0x02``, payload ``0x01``, ``*`` to the port piece by piece and
    flushes it.

    Args:
        ser: Serial-like object exposing ``write`` and ``flush``.
        action: Currently unused; the payload byte is always ``0x01``.
            NOTE(review): presumably meant to select the payload - confirm.
    """
    target = channels[2]['name']
    print(f"Proxy PUB [{target}] -> start measurement")
    frame = (
        b'$',     # start marker
        b'\x02',  # idx
        b'\x01',  # payload
        b'*',     # end marker
    )
    for piece in frame:
        ser.write(piece)
    ser.flush()
|
|
|
|
|
|
# Open the port as a context manager so that it is closed again when the
# endless loop below is left through an exception (Ctrl-C, serial error,
# unknown channel id, ...), instead of leaking the handle.
with serial.Serial(args.port) as ser:
    # Request the first measurement; every later request is sent after a
    # sensor reading has been received.
    pub_start_measurement(ser, True)
    while True:
        # Skip bytes until the '$' start marker of a message shows up.
        if ser.read() != b'$':
            continue

        channel_name, _, msg = fetch_sentence(ser)

        # Only sensor readings are reported; other channels are ignored.
        if channel_name != "sensor_data":
            continue

        print(
            f"Proxy NOTIFY: [{channel_name}] -> sensor value {int.from_bytes(msg, 'little')}")

        # Wait a second before asking for the next measurement.
        sleep(1)
        pub_start_measurement(ser, True)
|