From e9632b8575382b34927772e742da44b6a4682e37 Mon Sep 17 00:00:00 2001 From: "ethan.du" Date: Tue, 11 May 2021 14:31:36 +0800 Subject: [PATCH] support pub sub nats subject --- README.md | 30 +++++++++++++++++++--- app/app.c | 65 ++++++++++++++++++++++++++++++++++++++++------- app/app.h | 4 ++- samples/samples.c | 23 ++++++++++++++++- 4 files changed, 108 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index f7ed3f4..4684cb2 100644 --- a/README.md +++ b/README.md @@ -71,14 +71,16 @@ char *app_get_info(void) /** * @brief 注册下行消息回调函数 * - * @param handle: 下行消息回调函数指针 + * @param normal_handler: 下行消息回调函数指针 * - * @param handle: rrpc消息回调函数指针 + * @param rrpc_handler: rrpc消息回调函数指针 + * + * @param nats_msg_handle: nats消息回调函数指针 * * @retval : 成功则返回APP_OK */ -app_status app_register_cb(msg_handler normal_handler, msg_handler rrpc_handler) +app_status app_register_cb(msg_handler normal_handler, msg_handler rrpc_handler, msg_handler nats_msg_handle) /** * @brief 发送消息,可发送二进制流 @@ -104,6 +106,28 @@ app_status app_publish(const char *topic, const char *data, int dataLen) */ app_status app_publishString(const char *topic, const char *data) +/** + * @brief 订阅nats subject + * + * @param subject: nats subject名称 + * + * @retval : 成功则返回APP_OK + */ +app_status nats_subscribe(const char *subject) + +/** + * @brief 向nats subject发送消息 + * + * @param subject: nats subject名称 + * + * @param data: 发送内容 + * + * @param dataLen: 发送内容长度 + * + * @retval : 成功则返回APP_OK + */ +app_status nats_publish(const char *subject, const char *data, int dataLen) + /** * @brief 记录日志 * diff --git a/app/app.c b/app/app.c index 5ec1436..3a8da9e 100644 --- a/app/app.c +++ b/app/app.c @@ -9,11 +9,12 @@ char *app_deviceSN = NULL; char *app_info = NULL; msg_handler normal_cb = NULL; msg_handler rrpc_cb = NULL; +msg_handler app_nats_msg_handle = NULL; char edge_router_subject[NATS_SUBJECT_MAX_LEN] = {0}; -app_status nats_subscribe(const char *subject, natsMsgHandler cb, void *cbClosure) +app_status _nats_subscribe(const char *subject, natsMsgHandler cb, void *cbClosure) { app_status status = APP_OK; @@ -22,11 +23,11 @@ app_status nats_subscribe(const char *subject, natsMsgHandler cb, void *cbClosur return status; } -app_status nats_publish(const char *topic, const char *str) +app_status _nats_publish(const char *subject, const char *str) { app_status status = APP_OK; - natsConnection_PublishString(conn, topic, str); + natsConnection_PublishString(conn, subject, str); natsConnection_Flush(conn); return status; @@ -87,7 +88,7 @@ void log_write(log_level level, const char *format,...) replace_str(msg_str_rep, msg_str, "\"", "\\\""); snprintf(log_msg, NATS_MSG_MAX_LEN, LOG_UPLOAD_FORMAT, app_get_name(), log_lev[level], msg_str_rep, stamp.tv_sec); - nats_publish(EDGE_LOG_UPLOAD_SUBJECT, log_msg); + _nats_publish(EDGE_LOG_UPLOAD_SUBJECT, log_msg); APP_FREE(msg_str_rep); APP_FREE(msg_str); @@ -275,18 +276,43 @@ end: return; } -app_status app_register_cb(msg_handler normal_handler, msg_handler rrpc_handler) +static void _on_nats_message(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) +{ + log_write(LOG_DEBUG, "Received local msg: %s - %.*s", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg)); + + char *msg_base64decode = (char *)APP_MALLOC(NATS_MSG_MAX_LEN); + if(NULL == msg_base64decode) + { + log_write(LOG_ERROR, "msg_base64decode malloc fail!"); + return; + } + memset(msg_base64decode, 0, NATS_MSG_MAX_LEN); + int msg_base64decodeLen = base64_decode(natsMsg_GetData(msg), natsMsg_GetDataLength(msg), msg_base64decode); + log_write(LOG_DEBUG, "_on_local_message msg_base64decode:%s", msg_base64decode); + + if(app_nats_msg_handle) + app_nats_msg_handle((char *)natsMsg_GetSubject(msg), msg_base64decode, msg_base64decodeLen); + + APP_FREE(msg_base64decode); + return; +} + + +app_status app_register_cb(msg_handler normal_handler, msg_handler rrpc_handler, msg_handler nats_msg_handle) { app_status status = APP_OK; char edge_app_subject[NATS_SUBJECT_MAX_LEN] = {0}; if(normal_handler != NULL) normal_cb = normal_handler; - rrpc_cb = rrpc_handler; + if(rrpc_handler != NULL) + rrpc_cb = rrpc_handler; + if(nats_msg_handle != NULL) + app_nats_msg_handle = nats_msg_handle; snprintf(edge_app_subject, NATS_SUBJECT_MAX_LEN, EDGE_APP_SUBJECT_FORMAT, app_get_name()); log_write(LOG_DEBUG, "edge_app_subject:%s",edge_app_subject); - status = nats_subscribe(edge_app_subject, _handle_message, NULL); + status = _nats_subscribe(edge_app_subject, _handle_message, NULL); if(APP_OK != status) { log_write(LOG_ERROR, "edge_subscribe %s fail! status:%d", edge_app_subject, status); @@ -324,7 +350,7 @@ app_status app_publish(const char *topic, const char *data, int dataLen) memset(normal_msg, 0, NATS_MSG_MAX_LEN); snprintf(normal_msg, NATS_MSG_MAX_LEN, NORMAL_MSG_FORMAT, topic, normal_payload_base64, app_get_name()); - status = nats_publish(edge_router_subject, normal_msg); + status = _nats_publish(edge_router_subject, normal_msg); APP_FREE(normal_payload_base64); APP_FREE(normal_msg); @@ -354,7 +380,7 @@ static void _handle_status_sync(union sigval v) memset(sync_msg, 0, NATS_MSG_MAX_LEN); snprintf(sync_msg, NATS_MSG_MAX_LEN, STATUS_SYNC_FORMAT, app_get_name()); log_write(LOG_DEBUG, "sync_msg:%s",sync_msg); - status = nats_publish(EDGE_APP_STATUS_SUBJECT, sync_msg); + status = _nats_publish(EDGE_APP_STATUS_SUBJECT, sync_msg); if(APP_OK != status) { log_write(LOG_ERROR, "publish sync msg fail!"); @@ -433,8 +459,29 @@ app_status app_rrpc_response(char *topic,char *payload, int payloadLen) return APP_OK; } +app_status nats_subscribe(const char *subject) +{ + app_status status; + + status = _nats_subscribe(subject, _on_nats_message, NULL); + return status; +} +app_status nats_publish(const char *subject, const char *data, int dataLen) +{ + app_status status; + char *normal_payload_base64 = (char *)APP_MALLOC(NATS_MSG_MAX_LEN); + if(NULL == normal_payload_base64) + { + log_write(LOG_ERROR, "normal_payload_base64 malloc fail!"); + return APP_NO_MEMORY; + } + memset(normal_payload_base64, 0, NATS_MSG_MAX_LEN); + base64_encode(data, dataLen, normal_payload_base64); + status = _nats_publish(subject, normal_payload_base64); + return status; +} diff --git a/app/app.h b/app/app.h index 5677b9c..248465f 100644 --- a/app/app.h +++ b/app/app.h @@ -131,11 +131,13 @@ char *app_get_name(); char *app_get_productSN(); char *app_get_deviceSN(); char *app_get_info(); -app_status app_register_cb(msg_handler normal_handler, msg_handler rrpc_handler); +app_status app_register_cb(msg_handler normal_handler, msg_handler rrpc_handler, msg_handler nats_msg_handle); app_status app_publish(const char *topic, const char *data, int dataLen); app_status app_publishString(const char *topic, const char *str); app_status app_common_init(void); void log_write(log_level level, const char *format,...); app_status app_rrpc_response(char *topic,char *payload, int payloadLen); +app_status nats_subscribe(const char *subject); +app_status nats_publish(const char *subject, const char *data, int dataLen); #endif diff --git a/samples/samples.c b/samples/samples.c index 1ce2777..b74c864 100644 --- a/samples/samples.c +++ b/samples/samples.c @@ -16,6 +16,12 @@ void app_rrpc_msg_handler(char *topic, char *payload, int payloadLen) return; } +void app_nats_msg_handler_user(char *topic, char *payload, int payloadLen) +{ + log_write(LOG_INFO, "topic:%s payload:%s payloadLen:%d", topic, payload, payloadLen); + return; +} + int main(int argc, char **argv) { app_status status = APP_OK; @@ -35,7 +41,7 @@ int main(int argc, char **argv) log_write(LOG_INFO, "app info:%s",app_get_info()); //注册回调函数 - status = app_register_cb(app_normal_msg_handler, app_rrpc_msg_handler); + status = app_register_cb(app_normal_msg_handler, app_rrpc_msg_handler, app_nats_msg_handler_user); if(APP_OK != status) { log_write(LOG_ERROR, "app_register_cb fail"); @@ -60,6 +66,14 @@ int main(int argc, char **argv) snprintf(topic_str, 100, topic_format->valuestring, app_get_productSN(), app_get_deviceSN()); } + //subscribe nats subject + status = nats_subscribe("/a/b/c"); + if(APP_OK != status) + { + log_write(LOG_ERROR, "nats_subscribe nats subject fail"); + return APP_ERR; + } + while(1) { sleep(5); @@ -75,6 +89,13 @@ int main(int argc, char **argv) log_write(LOG_ERROR, "app_publish fail"); goto end; } + //publish msg to subscribed nats subject + status = nats_publish("/a/b/c",time_stamp,strlen(time_stamp)); + if(APP_OK != status) + { + log_write(LOG_ERROR, "edge_publish nats subject fail"); + goto end; + } } end: