This commit is contained in:
joy,zhou 2021-03-17 11:53:00 +08:00
parent 4471e8338c
commit 192e794748
16 changed files with 111 additions and 1019 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
.vscode
build
dist
MANIFEST

View File

@ -1,5 +1,6 @@
{
"cSpell.words": [
"iotedgedriverlinksdk"
]
"iotedgeapplicationlinksdk"
],
"python.pythonPath": "/usr/bin/python3"
}

View File

@ -2,9 +2,8 @@ import json
import logging
import time
from iotedgedriverlinksdk import getLogger
from iotedgedriverlinksdk.client import Config, SubDevice
from iotedgedriverlinksdk.exception import BaseEdgeException
from iotedgeapplicationlinksdk import getLogger
from iotedgeapplicationlinksdk.client import get_application_config, get_application_name, get_gateway_product_sn, get_gateway_device_sn, publish, register_callback
# 配置log
log = getLogger()
@ -14,75 +13,48 @@ log.setLevel(logging.DEBUG)
if __name__ == "__main__":
try:
# 获取驱动及子设备配置信息
driverConfig = Config().getDriverInfo()
log.info('driver config:{}'.format(driverConfig))
appConfig = get_application_config()
log.info('app config:{}'.format(appConfig))
# 从驱动配置获取设备数据上报周期
uploadPeriod = 5
if "period" in driverConfig.keys() and isinstance(driverConfig['period'], int):
uploadPeriod = int(driverConfig['period'])
if "period" in appConfig.keys() and isinstance(appConfig['period'], int):
uploadPeriod = int(appConfig['period'])
deviceInfoList = Config().getDeviceInfos()
log.info('device list config:{}'.format(deviceInfoList))
except Exception as e:
log.error('load driver config error: {}'.format(str(e)))
exit(1)
appName = get_application_name()
productSN = get_gateway_product_sn()
deviceSN = get_gateway_device_sn()
log.info('app info:{}, {}, {}'.format(appName, productSN, deviceSN))
try:
# 判断是否绑定子设备
if len(deviceInfoList) < 1:
log.error(
'subdevice null, please bind sub device for driver')
while True:
time.sleep(60)
topic = '/{}/{}/upload'.format(productSN, deviceSN)
# 取其中一个子设备
subDeviceInfo = deviceInfoList[0]
# 获取子设备的ProductSN key值为 productSN
productSN = subDeviceInfo['productSN']
# 获取子设备的DeviceSN key值为 deviceSN
deviceSN = subDeviceInfo['deviceSN']
def callback(topic: str, payload: b''):
def callback(topic: str, payload: bytes):
log.info("recv message from {} : {}".format(topic, str(payload)))
# 初始化一个子设备对象
subDevice = SubDevice(product_sn=productSN,
device_sn=deviceSN, on_msg_callback=callback)
# 子设备上线
subDevice.login()
def rrpc_callback(topic: str, payload: bytes):
log.info("recv rrpc message from {} : {}".format(
topic, str(payload)))
# 获取当前子设备的配置
deviceConfig = subDeviceInfo['config']
log.info('sub device config:{}'.format(deviceConfig))
# 从子设备配置获取子设备上报topic定义
topic = "/{}/{}/upload".format(productSN, deviceSN) # 此处为默认topic
if 'topic' in deviceConfig and isinstance(deviceConfig['topic'], str):
topic = deviceConfig['topic'].format(productSN, deviceSN)
# 从子设备配置获取子设备上报参数名称
param = 'RelayStatus' # 此处定义默认属性名称: RelayStatus
if 'paramName' in deviceConfig and isinstance(deviceConfig['paramName'], str):
param = deviceConfig['paramName']
register_callback(callback, rrpc_callback)
i = 0
while True:
relayStatus = ("on", "off")[i % 2 == 0]
payload = {
"timestamp": time.time(),
param: relayStatus
'RelayStatus': relayStatus
}
byts = json.dumps(payload).encode('utf-8')
subDevice.publish(topic, byts)
publish(topic, byts)
log.info("upload {} : {}".format(topic, str(byts)))
time.sleep(uploadPeriod)
i = i+1
except BaseEdgeException:
log.error('Edge Exception: {}'.format(str(e)))
except Exception as e:
log.error('Exception error: {}'.format(str(e)))
log.error('load app config error: {}'.format(str(e)))
exit(1)

View File

@ -6,7 +6,7 @@ fi
mkdir tmp
cp index.py tmp/
cd tmp
pip3 install -t . iotedge_driver_link_sdk==0.0.1 #打包驱动SDK
pip3 install -t . iotedge_application_link_sdk==0.0.1 #打包驱动SDK
zip -r demo.zip .
cd ..
cp tmp/demo.zip .

Binary file not shown.

View File

@ -1,81 +0,0 @@
import json
import logging
import time
from iotedgedriverlinksdk import getLogger
from iotedgedriverlinksdk.client import Config, SubDevice
from iotedgedriverlinksdk.exception import BaseEdgeException
# 配置log
log = getLogger()
log.setLevel(logging.DEBUG)
# 主函数
if __name__ == "__main__":
try:
# 获取驱动及子设备配置信息
driverConfig = Config().getDriverInfo()
log.info('driver config:{}'.format(driverConfig))
# 从驱动配置获取设备数据上报周期
uploadPeriod = 5
if "period" in driverConfig.keys() and isinstance(driverConfig['period'], int):
uploadPeriod = int(driverConfig['period'])
deviceInfoList = Config().getDeviceInfos()
log.info('device list config:{}'.format(deviceInfoList))
except Exception as e:
log.error('load driver config error: {}'.format(str(e)))
exit(1)
try:
# 判断是否绑定子设备
if len(deviceInfoList) < 1:
log.error(
'subdevice null, please bind sub device for driver')
while True:
time.sleep(60)
# 取其中一个子设备
subDeviceInfo = deviceInfoList[0]
# 获取子设备的ProductSN key值为 productSN
productSN = subDeviceInfo['productSN']
# 获取子设备的DeviceSN key值为 deviceSN
deviceSN = subDeviceInfo['deviceSN']
def callback(topic: str, payload: b''):
log.info("recv message from {} : {}".format(topic, str(payload)))
def rrpc_callback(topic: str, payload: b''):
return b'{"a":1}'
# 初始化一个子设备对象
subDevice = SubDevice(product_sn=productSN,
device_sn=deviceSN, on_msg_callback=callback)
subDevice.set_rrpc_callback(rrpc_callback)
# 子设备上线
subDevice.login()
# 获取当前子设备的配置
deviceConfig = subDeviceInfo['config']
log.info('sub device config:{}'.format(deviceConfig))
# 从子设备配置获取子设备上报topic定义
topic = "/{}/{}/upload".format(productSN, deviceSN) # 此处为默认topic
if 'topic' in deviceConfig and isinstance(deviceConfig['topic'], str):
topic = deviceConfig['topic'].format(productSN, deviceSN)
# 从子设备配置获取子设备上报参数名称
param = 'RelayStatus' # 此处定义默认属性名称: RelayStatus
if 'paramName' in deviceConfig and isinstance(deviceConfig['paramName'], str):
param = deviceConfig['paramName']
while True:
time.sleep(uploadPeriod)
except BaseEdgeException:
log.error('Edge Exception: {}'.format(str(e)))
except Exception as e:
log.error('Exception error: {}'.format(str(e)))

View File

@ -1,13 +0,0 @@
#!/bin/bash -e
if [ -e "./demo.zip" ] ; then
rm ./demo.zip
fi
rm -rf tmp
mkdir tmp
cp index.py tmp/
cd tmp
pip3 install -t . iotedge_driver_link_sdk==0.0.2 #打包驱动SDK
zip -r demo.zip .
cd ..
cp tmp/demo.zip .
rm -rf tmp

View File

@ -1,156 +0,0 @@
import datetime
import json
import logging
import signal
import sys
import tornado.httpserver
import tornado.ioloop
import tornado.web
from tornado.websocket import WebSocketHandler
from iotedgedriverlinksdk import getLogger
from iotedgedriverlinksdk.client import Config, SubDevice, getConfig
from iotedgedriverlinksdk.edge import (add_topo, delete_topo, get_topo,
register_device,
set_on_status_change_callback,
set_on_topo_change_callback)
from iotedgedriverlinksdk.exception import (
BaseEdgeException, EdgeDriverLinkDeviceOfflineException,
EdgeDriverLinkException, EdgeDriverLinkOfflineException,
EdgeDriverLinkTimeoutException)
log = getLogger()
log.setLevel(logging.DEBUG)
class WebSocketSever(WebSocketHandler):
def check_origin(self, origin):
return True
def set_default_headers(self):
self.set_header("Access-Control-Allow-Origin", "*") # 这个地方可以写域名
self.set_header("Access-Control-Allow-Headers", "*")
self.set_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS')
def initialize(self, loop):
self.loop = loop
self.client_id = ''
self.product_sn = ''
self.device_sn = ''
self.client = SubDevice()
def on_message_callback(self, topic, msg):
self.write_message(str(msg))
def open(self):
try:
product_sn = self.get_argument('product_sn')
device_sn = self.get_argument('device_sn')
except tornado.web.MissingArgumentError as e:
self.close(reason=str(e))
else:
log.info("websocket connect from: {}.{}".format(
product_sn, device_sn))
self.product_sn = product_sn
self.device_sn = device_sn
self.client_id = product_sn+'.'+device_sn
try:
self.client.set_product_sn(product_sn)
self.client.set_device_sn(device_sn)
self.client.login()
self.client.set_msg_callback(self.on_message_callback)
self.write_message('login success')
except Exception as e:
self.close(reason=str(e))
def on_message(self, message):
try:
if self.client_id == '' or self.client is None:
self.client.logout()
self.close(reason='unknown client identify')
return
data = json.loads(message)
log.info("websocket [{}] from:{}".format(
data, self.client_id))
if 'action' in data:
action = data['action']
if action == 'add_topo':
add_topo(self.product_sn, self.device_sn)
elif action == 'delete_topo':
delete_topo(self.product_sn, self.device_sn)
elif action == "logout":
self.client.logout()
self.close(reason='client exit')
return
elif action == 'get_topo':
topo = get_topo()
self.write_message(topo)
elif 'topic' in data and 'payload' in data:
payload = data['payload']
if isinstance(payload, dict):
byts = json.dumps(payload)
log.info('send time:{}'.format(
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')))
self.client.publish(
topic=data['topic'], payload=byts.encode('utf-8'))
log.info('send time:{}'.format(
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')))
elif isinstance(payload, str):
byts = payload.encode('utf-8')
log.info('send time:{}'.format(
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')))
self.client.publish(
topic=data['topic'], payload=byts)
else:
print('unknown message')
except Exception as e:
self.client.logout()
self.close(reason=str(e))
def allow_draft76(self):
return True
def on_close(self):
self.client.logout()
log.info("websocket closed from:{}, with reason: {}".format(
self.client_id, self.close_reason))
class Application(tornado.web.Application):
def __init__(self, handlers, setting):
super(Application, self).__init__(handlers, **setting)
def main():
from tornado.platform.asyncio import AnyThreadEventLoopPolicy
import asyncio
asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
lo = tornado.ioloop.IOLoop.current()
handlers = [
(r"/ws", WebSocketSever, dict(loop=lo))
]
setting = dict(xsrf_cookies=False)
app = Application(handlers, setting)
app.listen(port=4567)
print("websocket start listen port on:{}".format('4567'))
lo.start()
def exit_handler(signum, frame):
sys.exit(0)
if __name__ == '__main__':
signal.signal(signal.SIGINT, exit_handler)
signal.signal(signal.SIGTERM, exit_handler)
main()

View File

@ -1,18 +0,0 @@
#!/bin/bash -e
if [ -e "./ws.zip" ] ; then
rm ./ws.zip
fi
mkdir tmp
cp index.py tmp
cd tmp
pip3 install -t . iotedge_driver_link_sdk==0.0.1 #打包驱动SDK
pip3 install -t . tornado -i https://mirrors.aliyun.com/pypi/simple/ #打包自己的依赖
zip -r ws.zip .
cd ..
cp tmp/ws.zip .
rm -rf tmp

Binary file not shown.

View File

@ -10,11 +10,10 @@ import time
from nats.aio.client import Client as NATS
from nats.aio.errors import NatsError
_driver_id = ''
_driver_name = ''
_deviceInfos = []
_driverInfo = None
_driver_name = ''
_app_name = ''
_appInfo = None
_product_sn = ''
_device_sn = ''
# get Config
_config_path = './etc/iotedge/config.json'
@ -23,24 +22,18 @@ with open(_config_path, 'r') as load_f:
load_dict = json.load(load_f)
print(str(load_dict))
# print('----config: {} -------'.format(load_dict))
_app_name = load_dict['appName']
_product_sn = load_dict['productSN']
_device_sn = load_dict['deviceSN']
_driver_id = load_dict['driverID']
if 'appInfo' in load_dict.keys():
_appInfo = load_dict['appInfo']
if 'driverName' in load_dict.keys():
_driver_name = load_dict['driverName']
else:
_driver_name = _driver_id
if 'deviceList' in load_dict.keys():
_deviceInfos = load_dict['deviceList']
if 'driverInfo' in load_dict.keys():
_driverInfo = load_dict['driverInfo']
except Exception as e:
print('load config file error:{}'.format(e))
sys.exit(1)
print("driver_id: {}, driver name:{}".format(_driver_id, _driver_name))
print("application name:{}".format(_app_name))
class _Logger(object):
@ -135,12 +128,12 @@ class _Logger(object):
self.logger.setLevel(level)
_uiotedge_logger = _Logger(_driver_name)
_iotedge_logger = _Logger("app_"+_app_name)
def _init_logger():
global _uiotedge_logger
_uiotedge_logger.start()
global _iotedge_logger
_iotedge_logger.start()
logging.debug('init logger success')
@ -150,5 +143,5 @@ _t_logger.start()
def getLogger():
global _uiotedge_logger
return _uiotedge_logger
global _iotedge_logger
return _iotedge_logger

View File

@ -2,16 +2,14 @@ import asyncio
import json
import os
import queue
import random
import signal
import string
import base64
import sys
import threading
import time
from nats.aio.client import Client as NATS
from nats.aio.errors import NatsError
from iotedgedriverlinksdk import _driver_id, getLogger
from iotedgeapplicationlinksdk import _app_name, getLogger, _product_sn, _device_sn, _appInfo
_logger = getLogger()
@ -20,11 +18,13 @@ def exit_handler(signum, frame):
sys.exit(0)
_msg_cb = None
_msg_rrpc_cb = None
_nat_publish_queue = queue.Queue()
_nat_subscribe_queue = queue.Queue()
class natsClientPub(object):
class _natsClientPub(object):
def __init__(self):
self.url = os.environ.get(
'IOTEDGE_NATS_ADDRESS') or 'tcp://127.0.0.1:4222'
@ -56,7 +56,7 @@ class natsClientPub(object):
# self.loop.run_forever()
class natsClientSub(object):
class _natsClientSub(object):
def __init__(self):
self.url = os.environ.get(
'IOTEDGE_NATS_ADDRESS') or 'tcp://127.0.0.1:4222'
@ -71,16 +71,22 @@ class natsClientSub(object):
sys.exit(1)
async def message_handler(msg):
global _nat_subscribe_queue
# subject = msg.subject
# reply = msg.reply
# data = msg.data.decode()
# sdk_print("Received a message on '{subject} {reply}': {data}".format(
# subject=subject, reply=reply, data=data))
_nat_subscribe_queue.put(msg)
await self.nc.subscribe("edge.local."+_driver_id, queue=_driver_id, cb=message_handler, is_async=True)
await self.nc.subscribe("edge.local.broadcast", queue=_driver_id, cb=message_handler, is_async=True)
await self.nc.subscribe("edge.state.reply", queue=_driver_id, cb=message_handler, is_async=True)
global _msg_rrpc_cb
global _msg_cb
_logger.debug("recv message:{} " .format(str(msg)))
try:
js = json.loads(msg)
topic = js['topic']
data = base64.b64decode(js['payload'])
if isinstance(topic, str) and topic.startswith("/$system/") and topic.count('/rrpc/request/') > 0:
topic = topic.replace("/request/", "/response/", 1)
_msg_rrpc_cb(topic, data)
else:
_msg_cb(topic, data)
except Exception as e:
_logger.error('handle msg error {}'.format(str(e)))
await self.nc.subscribe("edge.app."+_app_name, queue=_app_name, cb=message_handler, is_async=True)
await self.nc.flush()
def start(self):
@ -88,27 +94,57 @@ class natsClientSub(object):
self.loop.run_forever()
def publish_nats_msg(msg):
def register_callback(cb, rrpc_cb):
global _msg_cb
_msg_cb = cb
global _msg_rrpc_cb
_msg_rrpc_cb = rrpc_cb
def get_gateway_product_sn():
return _product_sn
def get_gateway_device_sn():
return _device_sn
def get_application_name():
return _app_name
def get_application_config():
return _appInfo
def publish(topic: str, msg: bytes):
global _nat_publish_queue
payload_encode = base64.b64encode(msg)
payload = {
'src': "app",
'topic': topic,
'payload': str(payload_encode, encoding='utf-8')
}
data = {
'subject': 'edge.router.'+_driver_id,
'payload': msg
'subject': 'edge.router.'+_app_name,
'payload': payload
}
_nat_publish_queue.put(data)
def start_pub():
natsClientPub().start()
def _start_pub():
_natsClientPub().start()
def start_sub():
natsClientSub().start()
def _start_sub():
_natsClientSub().start()
_t_nats_pub = threading.Thread(target=start_pub)
_t_nats_pub = threading.Thread(target=_start_pub)
_t_nats_pub.setDaemon(True)
_t_nats_pub.start()
_t_nats_sub = threading.Thread(target=start_sub)
_t_nats_sub = threading.Thread(target=_start_sub)
_t_nats_sub.setDaemon(True)
_t_nats_sub.start()

View File

@ -1,109 +0,0 @@
import json
from iotedgedriverlinksdk import _deviceInfos, _driverInfo
from iotedgedriverlinksdk.edge import (add_connect_map, del_connect_map,
device_login_async, device_login_sync,
device_logout_async,
device_logout_sync, register_device,
send_message)
from iotedgedriverlinksdk.exception import (
EdgeDriverLinkDeviceConfigException, EdgeDriverLinkDeviceOfflineException,
EdgeDriverLinkDeviceProductSecretException)
class SubDevice(object):
def __init__(self, product_sn: str = '', device_sn: str = '', on_msg_callback=None):
self.device_sn = device_sn
self.product_sn = product_sn
self.product_secret = ''
self.callback = on_msg_callback
self.rrpc = None
self.online = False
if self.product_sn != '' and self.device_sn != '':
self._identity = self.product_sn+'.'+self.device_sn
def set_product_sn(self, product_sn: str):
self.product_sn = product_sn
if self.product_sn != '' and self.device_sn != '':
self._identity = self.product_sn+'.'+self.device_sn
def set_device_sn(self, device_sn: str):
self.device_sn = device_sn
if self.product_sn != '' and self.device_sn != '':
self._identity = self.product_sn+'.'+self.device_sn
def set_product_secret(self, product_secret: str):
self.product_secret = product_secret
def set_msg_callback(self, msg_callback):
self.callback = msg_callback
def set_rrpc_callback(self, rrpc_callback):
self.rrpc = rrpc_callback
def get_device_info(self):
return {
"productSN": self.product_sn,
"deviceSN": self.device_sn
}
def registerDevice(self, timeout=5):
if self.product_sn == '' or self.device_sn == '' or self.product_secret == '':
raise EdgeDriverLinkDeviceProductSecretException
register_device(self.product_sn, self.device_sn,
self.product_secret, timeout=timeout)
def logout(self, sync=False, timeout=5):
if self.online:
if sync:
device_logout_sync(product_sn=self.product_sn,
device_sn=self.device_sn,
timeout=timeout)
else:
device_logout_async(product_sn=self.product_sn,
device_sn=self.device_sn)
self.online = False
del_connect_map(self._identity)
def login(self, sync=False, timeout=5):
if self._identity == '':
raise EdgeDriverLinkDeviceConfigException
add_connect_map(self._identity, self)
if sync:
device_login_sync(product_sn=self.product_sn,
device_sn=self.device_sn, timeout=timeout)
else:
device_login_async(product_sn=self.product_sn,
device_sn=self.device_sn)
self.online = True
def publish(self, topic: str, payload: b''):
if self.online:
send_message(topic, payload, is_cached=False,
duration=0)
else:
raise EdgeDriverLinkDeviceOfflineException
class Config(object):
def __init__(self, config=None):
self.config = config
def getDriverInfo(self):
return _driverInfo
def getDeviceInfos(self):
return _deviceInfos
def getConfig():
if _driverInfo is None:
config = {"deviceList": _deviceInfos}
else:
config = {
"config": _driverInfo,
"deviceList": _deviceInfos}
return json.dumps(config)

View File

@ -1,495 +0,0 @@
from iotedgedriverlinksdk.nats import (_nat_publish_queue,
_nat_subscribe_queue, publish_nats_msg)
from iotedgedriverlinksdk.exception import (EdgeDriverLinkException,
EdgeDriverLinkOfflineException,
EdgeDriverLinkTimeoutException)
from iotedgedriverlinksdk import _driver_id, getLogger
from cachetools import TTLCache
import base64
import json
import queue
import random
import string
import threading
import time
import re
_logger = getLogger()
_action_queue_map = {}
_connect_map = {}
_cache = TTLCache(maxsize=10, ttl=30)
def get_edge_online_status():
global _cache
is_online = _cache.get('edge_status')
if is_online:
return True
return False
def add_connect_map(key: str, value):
global _connect_map
_connect_map[key] = value
def del_connect_map(key: str):
global _connect_map
_connect_map.pop(key)
def _generate_request_id():
return ''.join(random.sample(string.ascii_letters + string.digits, 16)).lower()
class _device_notify(object):
def __init__(self, callback):
self.callback = callback
def run(self, message):
self.callback(message)
_on_topo_change_callback = None
def set_on_topo_change_callback(callback):
global _on_topo_change_callback
_on_topo_change_callback = _device_notify(callback)
_on_status_change_callback = None
def set_on_status_change_callback(callback):
global _on_status_change_callback
_on_status_change_callback = _device_notify(callback)
def get_topo(timeout=5):
global _action_queue_map
request_id = _generate_request_id()
topic = '/$system/%s/%s/subdev/topo/get' % (
_generate_request_id(), "123456")
get_topo = {
'RequestID': request_id,
"Params": []
}
try:
data = json.dumps(get_topo)
_publish(topic=topic, payload=data.encode('utf-8'),
is_cached=False, duration=0)
q = queue.Queue()
_action_queue_map[request_id] = q
msg = q.get(timeout=5)
if msg['RetCode'] != 0:
raise EdgeDriverLinkException(msg['RetCode'], msg['Message'])
return msg
except queue.Empty:
raise EdgeDriverLinkTimeoutException
except EdgeDriverLinkException as e:
raise e
except Exception as e:
raise e
finally:
_action_queue_map.pop(request_id)
def add_topo(product_sn, device_sn, timeout=5):
global _action_queue_map
if get_edge_online_status():
request_id = _generate_request_id()
topic = '/$system/%s/%s/subdev/topo/add' % (
product_sn, device_sn)
add_topo = {
'RequestID': request_id,
"Params": [
{
'ProductSN': product_sn,
'DeviceSN': device_sn
}
]
}
try:
data = json.dumps(add_topo)
_publish(topic=topic, payload=data.encode('utf-8'),
is_cached=False, duration=0)
q = queue.Queue()
_action_queue_map[request_id] = q
msg = q.get(timeout=5)
_action_queue_map.pop(request_id)
if msg['RetCode'] != 0:
raise EdgeDriverLinkException(msg['RetCode'], msg['Message'])
except queue.Empty:
raise EdgeDriverLinkTimeoutException
except EdgeDriverLinkException as e:
raise e
except Exception as e:
raise e
else:
raise EdgeDriverLinkOfflineException
def delete_topo(product_sn, device_sn, timeout=5):
global _action_queue_map
if get_edge_online_status():
request_id = _generate_request_id()
topic = '/$system/%s/%s/subdev/topo/delete' % (
product_sn, device_sn)
delete_topo = {
'RequestID': request_id,
"Params": [
{
'ProductSN': product_sn,
'DeviceSN': device_sn
}
]
}
try:
data = json.dumps(delete_topo)
_publish(topic=topic, payload=data.encode('utf-8'),
is_cached=False, duration=0)
q = queue.Queue()
_action_queue_map[request_id] = q
msg = q.get(timeout=5)
_action_queue_map.pop(request_id)
if msg['RetCode'] != 0:
raise EdgeDriverLinkException(msg['RetCode'], msg['Message'])
except queue.Empty:
raise EdgeDriverLinkTimeoutException
except EdgeDriverLinkException as e:
raise e
except Exception as e:
raise e
else:
raise EdgeDriverLinkOfflineException
def register_device(product_sn, device_sn, product_secret, timeout=5):
global _action_queue_map
if get_edge_online_status():
request_id = _generate_request_id()
register_data = {
'RequestID': request_id,
"Params": [
{
'ProductSN': product_sn,
'DeviceSN': device_sn,
'ProductSecret': product_secret
}
]
}
topic = '/$system/%s/%s/subdev/register' % (
product_sn, device_sn)
try:
data = json.dumps(register_data)
_publish(topic=topic, payload=data.encode('utf-8'),
is_cached=False, duration=0)
q = queue.Queue()
_action_queue_map[request_id] = q
msg = q.get(timeout=timeout)
_action_queue_map.pop(request_id)
if msg['RetCode'] != 0:
raise EdgeDriverLinkException(msg['RetCode'], msg['Message'])
except queue.Empty:
raise EdgeDriverLinkTimeoutException
except EdgeDriverLinkException as e:
raise e
except Exception as e:
raise e
else:
raise EdgeDriverLinkOfflineException
def device_login_async(product_sn, device_sn):
request_id = _generate_request_id()
topic = '/$system/%s/%s/subdev/login' % (
product_sn, device_sn)
device_login_msg = {
'RequestID': request_id,
"Params": [
{
'ProductSN': product_sn,
'DeviceSN': device_sn
}
]
}
data = json.dumps(device_login_msg)
_publish(topic=topic, payload=data.encode('utf-8'),
is_cached=False, duration=0)
def device_logout_async(product_sn, device_sn):
request_id = _generate_request_id()
topic = '/$system/%s/%s/subdev/logout' % (
product_sn, device_sn)
device_logout_msg = {
'RequestID': request_id,
"Params": [
{
'ProductSN': product_sn,
'DeviceSN': device_sn
}
]
}
data = json.dumps(device_logout_msg)
_publish(topic=topic, payload=data.encode('utf-8'),
is_cached=False, duration=0)
def device_login_sync(product_sn, device_sn, timeout=5):
global _action_queue_map
request_id = _generate_request_id()
topic = '/$system/%s/%s/subdev/login' % (
product_sn, device_sn)
device_login_msg = {
'RequestID': request_id,
"Params": [
{
'ProductSN': product_sn,
'DeviceSN': device_sn
}
]
}
try:
data = json.dumps(device_login_msg)
_publish(topic=topic, payload=data.encode('utf-8'),
is_cached=False, duration=0)
q = queue.Queue()
_action_queue_map[request_id] = q
msg = q.get(timeout=timeout)
_action_queue_map.pop(request_id)
if msg['RetCode'] != 0:
raise EdgeDriverLinkException(msg['RetCode'], msg['Message'])
except queue.Empty:
raise EdgeDriverLinkTimeoutException
except EdgeDriverLinkException as e:
raise e
except Exception as e:
raise e
def device_logout_sync(product_sn, device_sn, timeout=5):
global _action_queue_map
request_id = _generate_request_id()
topic = '/$system/%s/%s/subdev/logout' % (
product_sn, device_sn)
device_logout_msg = {
'RequestID': request_id,
"Params": [
{
'ProductSN': product_sn,
'DeviceSN': device_sn
}
]
}
try:
data = json.dumps(device_logout_msg)
_publish(topic=topic, payload=data.encode('utf-8'),
is_cached=False, duration=0)
q = queue.Queue()
_action_queue_map[request_id] = q
msg = q.get(timeout=timeout)
_action_queue_map.pop(request_id)
if msg['RetCode'] != 0:
raise EdgeDriverLinkException(msg['RetCode'], msg['Message'])
except queue.Empty:
raise EdgeDriverLinkTimeoutException
except EdgeDriverLinkException as e:
raise e
except Exception as e:
raise e
def send_message(topic: str, payload: b'', is_cached=False, duration=0):
_publish(topic=topic, payload=payload,
is_cached=is_cached, duration=duration)
def _on_broadcast_message(message):
global _on_topo_change_callback
global _on_status_change_callback
global _action_queue_map
_logger.debug("recv message:{} " .format(str(message)))
try:
js = json.loads(message)
topic = js['topic']
data = str(base64.b64decode(js['payload']), "utf-8")
# _logger.debug("broadcast message payload: " + data)
msg = json.loads(data)
if isinstance(topic, str) and topic.startswith("/$system/"):
# on topo change callback
if topic.endswith("/subdev/topo/notify/add"):
if _on_topo_change_callback:
msg['operation'] = 'add'
_on_topo_change_callback.run(msg)
elif topic.endswith("/subdev/topo/notify/delete"):
if _on_topo_change_callback:
msg['operation'] = 'delete'
_on_topo_change_callback.run(msg)
elif topic.endswith('/subdev/enable'):
if _on_status_change_callback:
msg['operation'] = 'enable'
_on_status_change_callback.run(msg)
elif topic.endswith('/subdev/disable'):
if _on_status_change_callback:
msg['operation'] = 'disable'
_on_status_change_callback.run(msg)
else:
request_id = msg['RequestID']
if request_id in _action_queue_map:
q = _action_queue_map[request_id]
q.put(msg)
else:
_logger.debug('unknown message topic:{}'.format(topic))
return
except Exception as e:
_logger.error(e)
def _on_message(message):
global _connect_map
_logger.debug("recv message: {}".format(str(message)))
try:
js = json.loads(message)
identify = js['productSN'] + \
'.'+js['deviceSN']
topic = js['topic']
msg = base64.b64decode(js['payload'])
# _logger.debug("normal message payload: {}".format(str(msg, 'utf-8')))
if identify in _connect_map:
sub_dev = _connect_map[identify]
if isinstance(topic, str) and topic.startswith("/$system/") and topic.find("/rrpc/request/") > 0:
if sub_dev.rrpc:
_logger.debug("rrpc request: {}".format(str(message)))
resp = sub_dev.rrpc(topic, msg)
if resp:
_logger.debug("rrpc response: {}".format(str(resp)))
topic = topic.replace(
"/rrpc/request/", "/rrpc/response/", 1)
sub_dev.publish(topic, resp)
return
if sub_dev.callback:
sub_dev.callback(topic, msg)
else:
_logger.error('unknown message topic:{}'.format(topic))
return
except Exception as e:
_logger.error(e)
def _publish(topic: str, payload: b'', is_cached=False, duration=0):
try:
payload_encode = base64.b64encode(payload)
data = {
'src': 'local',
'topic': topic,
'isCatched': is_cached,
'duration': duration,
'payload': str(payload_encode, 'utf-8')
}
publish_nats_msg(data)
except Exception as e:
_logger.error(e)
raise e
def _set_edge_status():
global _cache
_cache['edge_status'] = True
def init_subscribe_handler():
while True:
msg = _nat_subscribe_queue.get()
# _logger.debug(msg)
subject = msg.subject
data = msg.data.decode()
if subject == "edge.local.broadcast":
_on_broadcast_message(data)
elif subject == "edge.state.reply":
_set_edge_status()
else:
_on_message(data)
def _get_device_list():
global _connect_map
result = []
for v in _connect_map.values():
result.append(v.get_device_info())
return result
def fetch_online_status():
min_retry_timeout = 1
max_retry_timeout = 15
retry_timeout = min_retry_timeout
while True:
device_list = _get_device_list()
data = {
'payload': {
'driverID': _driver_id,
'devices': device_list
},
'subject': 'edge.state.req'
}
_nat_publish_queue.put(data)
if get_edge_online_status():
time.sleep(max_retry_timeout)
else:
if retry_timeout < max_retry_timeout:
retry_timeout = retry_timeout + 1
time.sleep(retry_timeout)
_t_sub = threading.Thread(target=init_subscribe_handler)
_t_sub.setDaemon(True)
_t_sub.start()
_t_online = threading.Thread(target=fetch_online_status)
_t_online.setDaemon(True)
_t_online.start()

View File

@ -1,39 +0,0 @@
class BaseEdgeException(Exception):
def gatherAttrs(self):
return ",".join("{}={}"
.format(k, getattr(self, k))
for k in self.__dict__.keys())
def __str__(self):
return "[{}:{}]".format(self.__class__.__name__, self.gatherAttrs())
class EdgeDriverLinkException(BaseEdgeException):
def __init__(self, code, msg):
self.code = code
self.msg = msg
class EdgeDriverLinkTimeoutException(BaseEdgeException):
def __str__(self):
return "[{}:{}]".format(self.__class__.__name__, 'wait response timeout, please check network or edge connect state.')
class EdgeDriverLinkDeviceConfigException(BaseEdgeException):
def __str__(self):
return "[{}:{}]".format(self.__class__.__name__, 'device param error, please make sure product_sn and device_sn not null.')
class EdgeDriverLinkDeviceOfflineException(BaseEdgeException):
def __str__(self):
return "[{}:{}]".format(self.__class__.__name__, 'device offline, please login first.')
class EdgeDriverLinkOfflineException(BaseEdgeException):
def __str__(self):
return "[{}:{}]".format(self.__class__.__name__, 'edge offline, please connect first.')
class EdgeDriverLinkDeviceProductSecretException(BaseEdgeException):
def __str__(self):
return "[{}:{}]".format(self.__class__.__name__, 'product secret param error, please make sure product secret not null.')

View File

@ -6,18 +6,18 @@ if not (sys.version_info[0] == 3):
sys.exit("Link IoT Edge only support Python 3")
setup(
name='iotedge_driver_link_sdk',
version='0.0.2',
name='iotedge_application_link_sdk',
version='0.0.1',
author='ucloud.cn',
url='https://pypi.org/project/iotedge_driver_link_sdk/',
url='https://pypi.org/project/iotedge_application_link_sdk/',
author_email='joy.zhou@ucloud.cn',
packages=['iotedgedriverlinksdk'],
packages=['iotedgeapplicationlinksdk'],
platforms="any",
license='Apache 2 License',
install_requires=[
"asyncio-nats-client>=0.10.0",
"cachetools>=4.0.0"
],
description="IoT Edge Driver Link SDK",
long_description="UIoT Edge Driver Link SDK\n https://www.ucloud.cn/site/product/uiot.html"
description="IoT Stack Edge application Link SDK",
long_description="IoT Stack Edge application Link SDK\n https://www.ucloud.cn/site/product/uiot.html"
)