diff --git a/README.md b/README.md index 54055f7..095339c 100644 --- a/README.md +++ b/README.md @@ -56,3 +56,72 @@ * cb`func`: 应用消息回调,例如:` def callback(topic:str, msg:bytes): print(str(msg,'utf-8')` * rrpc_cb`func`: 应用RRPC消息回调,例如:` def callback(topic:str, msg:bytes): print(str(msg,'utf-8')` + +## Demo +``` +import json +import logging +import time + +from iotedgeapplicationlinksdk import getLogger +from iotedgeapplicationlinksdk.client import get_application_config, get_application_name, get_gateway_product_sn, get_gateway_device_sn, publish, register_callback + +# 配置log +log = getLogger() +log.setLevel(logging.DEBUG) + +# 主函数 +if __name__ == "__main__": + try: + # 获取应用配置信息 + appConfig = get_application_config() + log.info('app config:{}'.format(appConfig)) + + # 从应用配置获取设备数据上报周期 + uploadPeriod = 5 + if "period" in appConfig.keys() and isinstance(appConfig['period'], int): + uploadPeriod = int(appConfig['period']) + + appName = get_application_name() + productSN = get_gateway_product_sn() + deviceSN = get_gateway_device_sn() + log.info('app info:{}, {}, {}'.format(appName, productSN, deviceSN)) + + # 设置应用上报topic + topic = '/{}/{}/upload'.format(productSN, deviceSN) + + # 设置应用消息回调 + def callback(topic: str, payload: bytes): + log.info("recv message from {} : {}".format(topic, str(payload))) + + # 设置应用RRPC消息回调 + def rrpc_callback(topic: str, payload: bytes): + log.info("recv rrpc message from {} : {}".format( + topic, str(payload))) + + register_callback(callback, rrpc_callback) + i = 0 + # 周期上报消息 + while True: + relayStatus = ("on", "off")[i % 2 == 0] + payload = { + "timestamp": time.time(), + 'RelayStatus': relayStatus + } + + byts = json.dumps(payload).encode('utf-8') + + publish(topic, byts) + + log.info("upload {} : {}".format(topic, str(byts))) + + time.sleep(uploadPeriod) + i = i+1 + if (i > 1000): + i = 1 + + except Exception as e: + log.error('load app config error: {}'.format(str(e))) + exit(1) + +``` \ No newline at end of file diff --git a/examples/demo/index.py b/examples/demo/index.py index 0b6d755..32b74dc 100644 --- a/examples/demo/index.py +++ b/examples/demo/index.py @@ -12,11 +12,11 @@ log.setLevel(logging.DEBUG) # 主函数 if __name__ == "__main__": try: - # 获取驱动及子设备配置信息 + # 获取应用配置信息 appConfig = get_application_config() log.info('app config:{}'.format(appConfig)) - # 从驱动配置获取设备数据上报周期 + # 从应用配置获取设备数据上报周期 uploadPeriod = 5 if "period" in appConfig.keys() and isinstance(appConfig['period'], int): uploadPeriod = int(appConfig['period']) @@ -26,19 +26,21 @@ if __name__ == "__main__": deviceSN = get_gateway_device_sn() log.info('app info:{}, {}, {}'.format(appName, productSN, deviceSN)) - # 判断是否绑定子设备 + # 设置应用上报topic topic = '/{}/{}/upload'.format(productSN, deviceSN) + # 设置应用消息回调 def callback(topic: str, payload: bytes): log.info("recv message from {} : {}".format(topic, str(payload))) + # 设置应用RRPC消息回调 def rrpc_callback(topic: str, payload: bytes): log.info("recv rrpc message from {} : {}".format( topic, str(payload))) register_callback(callback, rrpc_callback) - i = 0 + # 周期上报消息 while True: relayStatus = ("on", "off")[i % 2 == 0] payload = { @@ -54,6 +56,8 @@ if __name__ == "__main__": time.sleep(uploadPeriod) i = i+1 + if (i > 1000): + i = 1 except Exception as e: log.error('load app config error: {}'.format(str(e))) diff --git a/iotedgeapplicationlinksdk/client.py b/iotedgeapplicationlinksdk/client.py index 0c25078..e06c341 100644 --- a/iotedgeapplicationlinksdk/client.py +++ b/iotedgeapplicationlinksdk/client.py @@ -80,8 +80,11 @@ class _natsClientSub(object): data = base64.b64decode(js['payload']) if isinstance(topic, str) and topic.startswith("/$system/") and topic.count('/rrpc/request/') > 0: topic = topic.replace("/request/", "/response/", 1) - _msg_rrpc_cb(topic, data) - else: + if _msg_rrpc_cb: + _msg_rrpc_cb(topic, data) + return + + if _msg_cb: _msg_cb(topic, data) except Exception as e: _logger.error('handle msg error {}'.format(str(e)))