add nats
This commit is contained in:
parent
23dc5e34e5
commit
124b010221
88
README.md
88
README.md
|
@ -1,63 +1,113 @@
|
|||
## API参考文档
|
||||
## API 参考文档
|
||||
|
||||
主要的API参考文档如下:
|
||||
主要的 API 参考文档如下:
|
||||
|
||||
#### from iotedgeapplicationlinksdk
|
||||
* **[getLogger()](#getLogger)**
|
||||
|
||||
- **[getLogger()](#getLogger)**
|
||||
|
||||
---
|
||||
|
||||
#### from iotedgeapplicationlinksdk.client
|
||||
* **[get_application_config()](#get_application_config)**
|
||||
* **[get_application_name()](#get_application_name)**
|
||||
* **[get_gateway_product_sn()](#get_gateway_product_sn)**
|
||||
* **[get_gateway_device_sn()](#get_gateway_device_sn)**
|
||||
* **[publish()](#publish)**
|
||||
* **[register_callback()](#register_callback)**
|
||||
|
||||
- **[get_application_config()](#get_application_config)**
|
||||
- **[get_application_name()](#get_application_name)**
|
||||
- **[get_gateway_product_sn()](#get_gateway_product_sn)**
|
||||
- **[get_gateway_device_sn()](#get_gateway_device_sn)**
|
||||
- **[publish()](#publish)**
|
||||
- **[register_callback()](#register_callback)**
|
||||
- **[natsPublish()](#natsPublish)**
|
||||
- **[natsSubscribe()](#natsSubscribe)**
|
||||
|
||||
---
|
||||
|
||||
<a name="getLogger"></a>
|
||||
|
||||
### getLogger()
|
||||
返回应用内置logger。
|
||||
|
||||
返回应用内置 logger。
|
||||
|
||||
---
|
||||
|
||||
<a name="get_application_config"></a>
|
||||
|
||||
### get_application_config()
|
||||
|
||||
返回应用配置。
|
||||
|
||||
---
|
||||
|
||||
<a name="get_application_name"></a>
|
||||
|
||||
### get_application_name()
|
||||
|
||||
返回应用名称。
|
||||
|
||||
---
|
||||
|
||||
<a name="get_gateway_product_sn"></a>
|
||||
|
||||
### get_gateway_product_sn()
|
||||
返回应用网关ProductSN。
|
||||
|
||||
返回应用网关 ProductSN。
|
||||
|
||||
---
|
||||
|
||||
<a name="get_gateway_device_sn"></a>
|
||||
|
||||
### get_gateway_device_sn()
|
||||
返回应用网关DeviceSN。
|
||||
|
||||
返回应用网关 DeviceSN。
|
||||
|
||||
---
|
||||
|
||||
<a name="publish"></a>
|
||||
### publish()
|
||||
上报消息到Link IoT Edge。参数(topic: str, payload: bytes)
|
||||
|
||||
* topic`str`: 上报消息到Link IoT Edge的mqtt topic。
|
||||
* payload`bytes`: 上报消息到Link IoT Edge的消息内容
|
||||
### publish()
|
||||
|
||||
上报消息到 Link IoT Edge。参数(topic: str, payload: bytes)
|
||||
|
||||
- topic`str`: 上报消息到 Link IoT Edge 的 mqtt topic。
|
||||
- payload`bytes`: 上报消息到 Link IoT Edge 的消息内容
|
||||
|
||||
---
|
||||
|
||||
<a name="register_callback"></a>
|
||||
|
||||
### register_callback
|
||||
|
||||
设置应用接收消息的回调函数,参数(cb, rrpc_cb)
|
||||
|
||||
* cb`func`: 应用消息回调,例如:` def callback(topic:str, msg:bytes): print(str(msg,'utf-8')`
|
||||
* rrpc_cb`func`: 应用RRPC消息回调,例如:` def callback(topic:str, msg:bytes): print(str(msg,'utf-8')`
|
||||
- cb`func`: 应用消息回调,例如:` def callback(topic:str, msg:bytes): print(str(msg,'utf-8')`
|
||||
- rrpc_cb`func`: 应用 RRPC 消息回调,例如:` def callback(topic:str, msg:bytes): print(str(msg,'utf-8')`
|
||||
|
||||
---
|
||||
|
||||
<a name="natsPublish"></a>
|
||||
|
||||
### natsPublish(subject, payload)
|
||||
|
||||
通过 nats Publish 消息
|
||||
|
||||
- subject`str`: nats Subject
|
||||
- payload`bytes`: nats 消息
|
||||
|
||||
---
|
||||
|
||||
<a name="natsSubscribe"></a>
|
||||
|
||||
### natsSubscribe(subject, queue, callback)
|
||||
|
||||
通过 nats 订阅 主题
|
||||
|
||||
- subject`str`: nats Subject
|
||||
- queue`str`: nats 队列
|
||||
- callback`func(msg)`: nats 消息回调, msg 为 nats 消息
|
||||
|
||||
---
|
||||
|
||||
## Demo
|
||||
|
||||
```
|
||||
import json
|
||||
import logging
|
||||
|
@ -124,4 +174,4 @@ if __name__ == "__main__":
|
|||
log.error('load app config error: {}'.format(str(e)))
|
||||
exit(1)
|
||||
|
||||
```
|
||||
```
|
||||
|
|
|
@ -22,6 +22,38 @@ _msg_cb = None
|
|||
_msg_rrpc_cb = None
|
||||
|
||||
_nat_publish_queue = queue.Queue()
|
||||
_nat_client_publish_queue = queue.Queue()
|
||||
|
||||
|
||||
class _natsPublish(object):
|
||||
def __init__(self):
|
||||
self.url = os.environ.get(
|
||||
'IOTEDGE_NATS_ADDRESS') or 'tcp://127.0.0.1:4222'
|
||||
self.nc = NATS()
|
||||
self.loop = asyncio.new_event_loop()
|
||||
|
||||
async def _publish(self):
|
||||
global _nat_client_publish_queue
|
||||
try:
|
||||
await self.nc.connect(servers=[self.url], loop=self.loop)
|
||||
except Exception as e1:
|
||||
_logger.error(e1)
|
||||
sys.exit(1)
|
||||
|
||||
while True:
|
||||
try:
|
||||
msg = _nat_client_publish_queue.get()
|
||||
bty = msg['payload']
|
||||
await self.nc.publish(subject=msg['subject'],
|
||||
payload=bty.encode('utf-8'))
|
||||
await self.nc.flush()
|
||||
except NatsError as e:
|
||||
_logger.error(e)
|
||||
except Exception as e:
|
||||
_logger.error(e)
|
||||
|
||||
def start(self):
|
||||
self.loop.run_until_complete(self._publish())
|
||||
|
||||
|
||||
class _natsClientPub(object):
|
||||
|
@ -56,6 +88,30 @@ class _natsClientPub(object):
|
|||
# self.loop.run_forever()
|
||||
|
||||
|
||||
class _natsSubscribe(object):
|
||||
def __init__(self, subject: str, queue: str, cb):
|
||||
self.url = os.environ.get(
|
||||
'IOTEDGE_NATS_ADDRESS') or 'tcp://127.0.0.1:4222'
|
||||
self.nc = NATS()
|
||||
self.loop = asyncio.new_event_loop()
|
||||
self.cb = cb
|
||||
self.subject = subject
|
||||
self.queue = queue
|
||||
|
||||
async def _connect(self):
|
||||
try:
|
||||
await self.nc.connect(servers=[self.url], loop=self.loop)
|
||||
except Exception as e1:
|
||||
_logger.error(e1)
|
||||
sys.exit(1)
|
||||
await self.nc.subscribe(self.subject, queue=self.queue, cb=self.cb, is_async=True)
|
||||
await self.nc.flush()
|
||||
|
||||
def start(self):
|
||||
self.loop.run_until_complete(self._connect())
|
||||
self.loop.run_forever()
|
||||
|
||||
|
||||
class _natsClientSub(object):
|
||||
def __init__(self):
|
||||
self.url = os.environ.get(
|
||||
|
@ -137,14 +193,39 @@ def publish(topic: str, msg: bytes):
|
|||
_nat_publish_queue.put(data)
|
||||
|
||||
|
||||
def natsPublish(subject: str, payload: bytes):
|
||||
global _nat_client_publish_queue
|
||||
data = {
|
||||
'subject': subject,
|
||||
'payload': payload
|
||||
}
|
||||
_nat_client_publish_queue.put(data)
|
||||
|
||||
|
||||
def natsSubscribe(subject, queue, cb):
|
||||
def _nats_sub():
|
||||
_natsSubscribe(subject, queue, cb).start()
|
||||
t = threading.Thread(target=_nats_sub)
|
||||
t.setDaemon(True)
|
||||
t.start()
|
||||
|
||||
|
||||
def _start_pub():
|
||||
_natsClientPub().start()
|
||||
|
||||
|
||||
def _nats_pub():
|
||||
_natsPublish().start()
|
||||
|
||||
|
||||
def _start_sub():
|
||||
_natsClientSub().start()
|
||||
|
||||
|
||||
_t_pub = threading.Thread(target=_nats_pub)
|
||||
_t_pub.setDaemon(True)
|
||||
_t_pub.start()
|
||||
|
||||
_t_nats_pub = threading.Thread(target=_start_pub)
|
||||
_t_nats_pub.setDaemon(True)
|
||||
_t_nats_pub.start()
|
||||
|
|
Loading…
Reference in New Issue