From 124b0102210e48ae4ecf251b08c1f67a3c255393 Mon Sep 17 00:00:00 2001 From: "joy,zhou" Date: Wed, 26 May 2021 10:59:17 +0800 Subject: [PATCH] add nats --- README.md | 88 ++++++++++++++++++++++------- iotedgeapplicationlinksdk/client.py | 81 ++++++++++++++++++++++++++ setup.py | 2 +- 3 files changed, 151 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 095339c..4c6d646 100644 --- a/README.md +++ b/README.md @@ -1,63 +1,113 @@ -## API参考文档 +## API 参考文档 -主要的API参考文档如下: +主要的 API 参考文档如下: #### from iotedgeapplicationlinksdk -* **[getLogger()](#getLogger)** + +- **[getLogger()](#getLogger)** --- #### from iotedgeapplicationlinksdk.client -* **[get_application_config()](#get_application_config)** -* **[get_application_name()](#get_application_name)** -* **[get_gateway_product_sn()](#get_gateway_product_sn)** -* **[get_gateway_device_sn()](#get_gateway_device_sn)** -* **[publish()](#publish)** -* **[register_callback()](#register_callback)** +- **[get_application_config()](#get_application_config)** +- **[get_application_name()](#get_application_name)** +- **[get_gateway_product_sn()](#get_gateway_product_sn)** +- **[get_gateway_device_sn()](#get_gateway_device_sn)** +- **[publish()](#publish)** +- **[register_callback()](#register_callback)** +- **[natsPublish()](#natsPublish)** +- **[natsSubscribe()](#natsSubscribe)** --- + + ### getLogger() -返回应用内置logger。 + +返回应用内置 logger。 --- + + ### get_application_config() + 返回应用配置。 --- + + ### get_application_name() + 返回应用名称。 --- + + ### get_gateway_product_sn() -返回应用网关ProductSN。 + +返回应用网关 ProductSN。 --- + + ### get_gateway_device_sn() -返回应用网关DeviceSN。 + +返回应用网关 DeviceSN。 --- + -### publish() -上报消息到Link IoT Edge。参数(topic: str, payload: bytes) -* topic`str`: 上报消息到Link IoT Edge的mqtt topic。 -* payload`bytes`: 上报消息到Link IoT Edge的消息内容 +### publish() + +上报消息到 Link IoT Edge。参数(topic: str, payload: bytes) + +- topic`str`: 上报消息到 Link IoT Edge 的 mqtt topic。 +- payload`bytes`: 上报消息到 Link IoT Edge 的消息内容 --- + + ### register_callback + 设置应用接收消息的回调函数,参数(cb, rrpc_cb) -* cb`func`: 应用消息回调,例如:` def callback(topic:str, msg:bytes): print(str(msg,'utf-8')` -* rrpc_cb`func`: 应用RRPC消息回调,例如:` def callback(topic:str, msg:bytes): print(str(msg,'utf-8')` +- cb`func`: 应用消息回调,例如:` def callback(topic:str, msg:bytes): print(str(msg,'utf-8')` +- rrpc_cb`func`: 应用 RRPC 消息回调,例如:` def callback(topic:str, msg:bytes): print(str(msg,'utf-8')` + +--- + + + +### natsPublish(subject, payload) + +通过 nats Publish 消息 + +- subject`str`: nats Subject +- payload`bytes`: nats 消息 + +--- + + + +### natsSubscribe(subject, queue, callback) + +通过 nats 订阅 主题 + +- subject`str`: nats Subject +- queue`str`: nats 队列 +- callback`func(msg)`: nats 消息回调, msg 为 nats 消息 + +--- ## Demo + ``` import json import logging @@ -124,4 +174,4 @@ if __name__ == "__main__": log.error('load app config error: {}'.format(str(e))) exit(1) -``` \ No newline at end of file +``` diff --git a/iotedgeapplicationlinksdk/client.py b/iotedgeapplicationlinksdk/client.py index e06c341..631bdc1 100644 --- a/iotedgeapplicationlinksdk/client.py +++ b/iotedgeapplicationlinksdk/client.py @@ -22,6 +22,38 @@ _msg_cb = None _msg_rrpc_cb = None _nat_publish_queue = queue.Queue() +_nat_client_publish_queue = queue.Queue() + + +class _natsPublish(object): + def __init__(self): + self.url = os.environ.get( + 'IOTEDGE_NATS_ADDRESS') or 'tcp://127.0.0.1:4222' + self.nc = NATS() + self.loop = asyncio.new_event_loop() + + async def _publish(self): + global _nat_client_publish_queue + try: + await self.nc.connect(servers=[self.url], loop=self.loop) + except Exception as e1: + _logger.error(e1) + sys.exit(1) + + while True: + try: + msg = _nat_client_publish_queue.get() + bty = msg['payload'] + await self.nc.publish(subject=msg['subject'], + payload=bty.encode('utf-8')) + await self.nc.flush() + except NatsError as e: + _logger.error(e) + except Exception as e: + _logger.error(e) + + def start(self): + self.loop.run_until_complete(self._publish()) class _natsClientPub(object): @@ -56,6 +88,30 @@ class _natsClientPub(object): # self.loop.run_forever() +class _natsSubscribe(object): + def __init__(self, subject: str, queue: str, cb): + self.url = os.environ.get( + 'IOTEDGE_NATS_ADDRESS') or 'tcp://127.0.0.1:4222' + self.nc = NATS() + self.loop = asyncio.new_event_loop() + self.cb = cb + self.subject = subject + self.queue = queue + + async def _connect(self): + try: + await self.nc.connect(servers=[self.url], loop=self.loop) + except Exception as e1: + _logger.error(e1) + sys.exit(1) + await self.nc.subscribe(self.subject, queue=self.queue, cb=self.cb, is_async=True) + await self.nc.flush() + + def start(self): + self.loop.run_until_complete(self._connect()) + self.loop.run_forever() + + class _natsClientSub(object): def __init__(self): self.url = os.environ.get( @@ -137,14 +193,39 @@ def publish(topic: str, msg: bytes): _nat_publish_queue.put(data) +def natsPublish(subject: str, payload: bytes): + global _nat_client_publish_queue + data = { + 'subject': subject, + 'payload': payload + } + _nat_client_publish_queue.put(data) + + +def natsSubscribe(subject, queue, cb): + def _nats_sub(): + _natsSubscribe(subject, queue, cb).start() + t = threading.Thread(target=_nats_sub) + t.setDaemon(True) + t.start() + + def _start_pub(): _natsClientPub().start() +def _nats_pub(): + _natsPublish().start() + + def _start_sub(): _natsClientSub().start() +_t_pub = threading.Thread(target=_nats_pub) +_t_pub.setDaemon(True) +_t_pub.start() + _t_nats_pub = threading.Thread(target=_start_pub) _t_nats_pub.setDaemon(True) _t_nats_pub.start() diff --git a/setup.py b/setup.py index e4e1d5e..98eedfd 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ if not (sys.version_info[0] == 3): setup( name='iotedge_application_link_sdk', - version='0.0.3', + version='0.1.0', author='ucloud.cn', url='https://pypi.org/project/iotedge_application_link_sdk/', author_email='joy.zhou@ucloud.cn',