diff --git a/README.md b/README.md index 3066228..050f0b0 100644 --- a/README.md +++ b/README.md @@ -158,6 +158,16 @@ void edge_set_topo_notify_handle(edge_topo_notify_handler topo_notify_handle) void edge_set_subdev_status_handle(edge_subdev_status_handler subdev_status_handle) +/** + * @brief 设置nats消息回调接口 + * + * @param nats_msg_handle: nats 消息回调接口(void (*edge_nats_msg_handler)(char *topic, char *payload, int payloadLen)). + * + * @retval : void + */ + +void edge_set_nats_msg_handle(edge_nats_msg_handler nats_msg_handle) + /** * @brief 获取网关在线状态 * @@ -212,3 +222,23 @@ edge_status edge_delete_topo(subdev_client *pst_subdev_client, uint32_t time_out void log_write(log_level level, const char *format,...) +/** + * @brief 向指定 nats subject,可以发送二进制消息 + * + * @param subject: subject名称 + * @param data: 发送消息内容 + * @param dataLen: 发送消息内容长度 + * + * @retval : 成功则返回EDGE_OK + */ + +edge_status nats_publish(const char *subject, const char *data, int dataLen) + +/** + * @brief 订阅nats subject + * + * @param subject: nats subject名称 + * + * @retval : 成功则返回EDGE_OK + */ +edge_status nats_subscribe(const char *subject) \ No newline at end of file diff --git a/edge/client.c b/edge/client.c index 4d83fee..da19aef 100644 --- a/edge/client.c +++ b/edge/client.c @@ -83,6 +83,24 @@ edge_status edge_publishString(const char *topic, const char *str) return status; } +edge_status nats_publish(const char *subject, const char *data, int dataLen) +{ + edge_status status; + + char *normal_payload_base64 = (char *)EDGE_MALLOC(NATS_MSG_MAX_LEN); + if(NULL == normal_payload_base64) + { + log_write(LOG_ERROR, "normal_payload_base64 malloc fail!"); + return EDGE_NO_MEMORY; + } + memset(normal_payload_base64, 0, NATS_MSG_MAX_LEN); + base64_encode(data, dataLen, normal_payload_base64); + + status = _publish_string(subject, normal_payload_base64); + + return status; +} + edge_status _msg_parse_str_init(msg_parse **msg_parse_str, uint32_t request_id) { *msg_parse_str = (msg_parse *)EDGE_MALLOC(sizeof(msg_parse)); diff --git a/edge/client.h b/edge/client.h index 8d572d5..1ec6bb5 100644 --- a/edge/client.h +++ b/edge/client.h @@ -231,4 +231,16 @@ edge_status edge_rrpc_check(char *topic); */ edge_status edge_rrpc_response(char *topic,char *payload, int payloadLen); +/** + * @brief 向nats subject + * + * @param subject: nats subject + * @param data: 发送数据 + * @param dataLen: 发送数据长度 + * + * @retval : 成功返回EDGE_OK + */ + +edge_status nats_publish(const char *subject, const char *data, int dataLen); + #endif diff --git a/edge/edge.c b/edge/edge.c index a982c50..1763e7c 100644 --- a/edge/edge.c +++ b/edge/edge.c @@ -26,6 +26,7 @@ natsSubscription *sub = NULL; bool edge_state = false; edge_topo_notify_handler edge_topo_handle = NULL; edge_subdev_status_handler edge_subdev_status_handle = NULL; +edge_nats_msg_handler edge_nats_msg_handle = NULL; /* global variable end */ static edge_status _edge_subscribe(const char *subject, natsMsgHandler cb, void *cbClosure) @@ -211,6 +212,13 @@ void edge_set_subdev_status_handle(edge_subdev_status_handler subdev_status_hand return; } +void edge_set_nats_msg_handle(edge_nats_msg_handler nats_msg_handle) +{ + if(NULL != nats_msg_handle) + edge_nats_msg_handle = nats_msg_handle; + return; +} + static void _on_local_message(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) { log_write(LOG_DEBUG, "Received local msg: %s - %.*s", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg)); @@ -417,6 +425,28 @@ static void _on_edge_state_message(natsConnection *nc, natsSubscription *sub, na return; } +static void _on_nats_message(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) +{ + log_write(LOG_DEBUG, "Received local msg: %s - %.*s", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg)); + + char *msg_base64decode = (char *)EDGE_MALLOC(NATS_MSG_MAX_LEN); + if(NULL == msg_base64decode) + { + log_write(LOG_ERROR, "msg_base64decode malloc fail!"); + return; + } + memset(msg_base64decode, 0, NATS_MSG_MAX_LEN); + int msg_base64decodeLen = base64_decode(natsMsg_GetData(msg), natsMsg_GetDataLength(msg), msg_base64decode); + log_write(LOG_DEBUG, "_on_local_message msg_base64decode:%s", msg_base64decode); + + if(edge_nats_msg_handle) + edge_nats_msg_handle((char *)natsMsg_GetSubject(msg), msg_base64decode, msg_base64decodeLen); + + EDGE_FREE(msg_base64decode); + return; +} + + bool edge_get_online_status(void) { return edge_state; @@ -899,3 +929,11 @@ end: return status; } +edge_status nats_subscribe(const char *subject) +{ + edge_status status; + + status = _edge_subscribe(subject, _on_nats_message, NULL); + return status; +} + diff --git a/edge/edge.h b/edge/edge.h index 4397bec..39e72d0 100644 --- a/edge/edge.h +++ b/edge/edge.h @@ -22,6 +22,9 @@ typedef void (*edge_topo_notify_handler)(topo_operation opera, char *payload); // /topic:$system/{productSN}/{deviceSN}/subdev/enable || /$system/{productSN}/{deviceSN}/subdev/delete msg handle typedef void (*edge_subdev_status_handler)(subdev_able opera,char *payload); +//nats message handle +typedef void (*edge_nats_msg_handler)(char *topic, char *payload, int payloadLen); + /** * @brief 获取驱动信息 * @@ -85,6 +88,15 @@ void edge_set_topo_notify_handle(edge_topo_notify_handler topo_notify_h */ void edge_set_subdev_status_handle(edge_subdev_status_handler subdev_status_handle); +/** + * @brief 设置nats消息回调接口(控制台启动禁用设备) + * + * @param nats_msg_handle: nats消息回调接口(void (*edge_nats_msg_handler)(char *topic, char *payload, int payloadLen)). + * + * @retval : void + */ +void edge_set_nats_msg_handle(edge_nats_msg_handler nats_msg_handle); + /** * @brief 获取子设备拓扑 * @@ -115,4 +127,13 @@ edge_status edge_add_topo(subdev_client *pst_subdev_client, uint32_t time_out_ms */ edge_status edge_delete_topo(subdev_client *pst_subdev_client, uint32_t time_out_ms); +/** + * @brief 订阅nats subject + * + * @param subject: nats subject + * + * @retval : 成功则返回EDGE_OK + */ +edge_status nats_subscribe(const char *subject); + #endif diff --git a/samples/iotstack_driver_test.c b/samples/iotstack_driver_test.c index 612543f..d20a171 100644 --- a/samples/iotstack_driver_test.c +++ b/samples/iotstack_driver_test.c @@ -27,6 +27,12 @@ static void edge_subdev_status_handler_user(subdev_able opera,char *payload) return; } +static void edge_nats_msg_handler_user(char *topic, char *payload, int payloadLen) +{ + log_write(LOG_INFO, "topic:%s payload:%s payloadLen:%d", topic, payload, payloadLen); + return; +} + int main(int argc, char **argv) { edge_status status = EDGE_OK; @@ -66,6 +72,16 @@ int main(int argc, char **argv) edge_set_subdev_status_handle(edge_subdev_status_handler_user); + edge_set_nats_msg_handle(edge_nats_msg_handler_user); + + //subscribe nats subject + status = nats_subscribe("/a/b/c"); + if(EDGE_OK != status) + { + log_write(LOG_ERROR, "nats_subscribe nats topic fail"); + return EDGE_ERR; + } + // 解析驱动配置 /* { @@ -194,6 +210,12 @@ int main(int argc, char **argv) { log_write(LOG_ERROR, "edge_publish fail"); goto end; + } + status = nats_publish("/a/b/c",time_stamp,strlen(time_stamp)); + if(EDGE_OK != status) + { + log_write(LOG_ERROR, "edge_publish nats subject fail"); + goto end; } }