From 390689219d5eaf35184a1d53b70e4d90b8175b72 Mon Sep 17 00:00:00 2001 From: "ethan.du" Date: Mon, 25 Jan 2021 13:28:31 +0800 Subject: [PATCH] fix msg callback --- edge/edge.c | 78 ++++++++++++++++++++++++++--------------------------- 1 file changed, 38 insertions(+), 40 deletions(-) diff --git a/edge/edge.c b/edge/edge.c index 1b35437..90b02cf 100644 --- a/edge/edge.c +++ b/edge/edge.c @@ -211,19 +211,23 @@ void edge_set_subdev_status_handle(edge_subdev_status_handler subdev_status_hand return; } -static void _on_local_message(natsMsg *msg) +static void _on_local_message(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) { + log_write(LOG_DEBUG, "Received local msg: %s - %.*s", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg)); + cJSON *msg_json = cJSON_Parse(natsMsg_GetData(msg)); if (!msg_json) { - log_write(LOG_ERROR, "on local message json parse error: [%s]",cJSON_GetErrorPtr()); + log_write(LOG_ERROR, "on local message json parse error: [%s]",cJSON_GetErrorPtr()); + natsMsg_Destroy(msg); return; } cJSON *topic = cJSON_GetObjectItem(msg_json, "topic"); if(NULL == topic) { log_write(LOG_ERROR, "cannot find topic, illegal msg"); - cJSON_Delete(msg_json); + cJSON_Delete(msg_json); + natsMsg_Destroy(msg); return; } log_write(LOG_DEBUG, "_on_broadcast_message topic:%s", topic->valuestring); @@ -232,7 +236,8 @@ static void _on_local_message(natsMsg *msg) if(NULL == device_sn_tmp) { log_write(LOG_ERROR, "cannot find topic, illegal msg"); - cJSON_Delete(msg_json); + cJSON_Delete(msg_json); + natsMsg_Destroy(msg); return; } log_write(LOG_DEBUG, "_on_local_message deviceSN:%s", device_sn_tmp->valuestring); @@ -242,6 +247,7 @@ static void _on_local_message(natsMsg *msg) { log_write(LOG_ERROR, "cannot find payload, illegal msg"); cJSON_Delete(msg_json); + natsMsg_Destroy(msg); return; } log_write(LOG_DEBUG, "_on_local_message msg_base64code:%s", msg_base64code->valuestring); @@ -249,7 +255,8 @@ static void _on_local_message(natsMsg *msg) if(NULL == msg_base64decode) { log_write(LOG_ERROR, "msg_base64decode malloc fail!"); - cJSON_Delete(msg_json); + cJSON_Delete(msg_json); + natsMsg_Destroy(msg); return; } memset(msg_base64decode, 0, NATS_MSG_MAX_LEN); @@ -271,23 +278,28 @@ static void _on_local_message(natsMsg *msg) end: cJSON_Delete(msg_json); - EDGE_FREE(msg_base64decode); + EDGE_FREE(msg_base64decode); + natsMsg_Destroy(msg); return; } -static void _on_broadcast_message(natsMsg *msg) +static void _on_broadcast_message(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) { + log_write(LOG_DEBUG, "Received broadcast msg: %s - %.*s", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg)); + cJSON *msg_json = cJSON_Parse(natsMsg_GetData(msg)); if (!msg_json) { - log_write(LOG_ERROR, "on broadcast message json parse msg error: [%s]",cJSON_GetErrorPtr()); + log_write(LOG_ERROR, "on broadcast message json parse msg error: [%s]",cJSON_GetErrorPtr()); + natsMsg_Destroy(msg); return; } cJSON *topic = cJSON_GetObjectItem(msg_json, "topic"); if(NULL == topic) { log_write(LOG_ERROR, "cannot find topic, illegal msg"); - cJSON_Delete(msg_json); + cJSON_Delete(msg_json); + natsMsg_Destroy(msg); return; } log_write(LOG_DEBUG, "_on_broadcast_message topic:%s", topic->valuestring); @@ -295,7 +307,8 @@ static void _on_broadcast_message(natsMsg *msg) if(NULL == msg_base64code) { log_write(LOG_ERROR, "cannot find payload, illegal msg"); - cJSON_Delete(msg_json); + cJSON_Delete(msg_json); + natsMsg_Destroy(msg); return; } log_write(LOG_DEBUG, "_on_broadcast_message msg_base64code:%s", msg_base64code->valuestring); @@ -303,7 +316,8 @@ static void _on_broadcast_message(natsMsg *msg) if(NULL == msg_base64decode) { log_write(LOG_ERROR, "msg_base64decode malloc fail!"); - cJSON_Delete(msg_json); + cJSON_Delete(msg_json); + natsMsg_Destroy(msg); return; } memset(msg_base64decode, 0, NATS_MSG_MAX_LEN); @@ -356,21 +370,26 @@ static void _on_broadcast_message(natsMsg *msg) end: cJSON_Delete(msg_json); EDGE_FREE(msg_base64decode); + natsMsg_Destroy(msg); return; } -static void _on_edge_state_message(natsMsg *msg) +static void _on_edge_state_message(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) { + log_write(LOG_DEBUG, "Received state msg: %s - %.*s", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg)); + cJSON *msg_json = cJSON_Parse(natsMsg_GetData(msg)); if (!msg_json) { - log_write(LOG_ERROR, "on broadcast message json parse error: [%s]",cJSON_GetErrorPtr()); + log_write(LOG_ERROR, "on broadcast message json parse error: [%s]",cJSON_GetErrorPtr()); + natsMsg_Destroy(msg); return; } cJSON *state = cJSON_GetObjectItem(msg_json, "state"); if(NULL == state) { - log_write(LOG_ERROR, "cannot find state, illegal msg"); + log_write(LOG_ERROR, "cannot find state, illegal msg"); + natsMsg_Destroy(msg); return; } char *state_str = cJSON_Print(state); @@ -384,7 +403,8 @@ static void _on_edge_state_message(natsMsg *msg) edge_state = false; } - EDGE_FREE(state); + EDGE_FREE(state); + natsMsg_Destroy(msg); return; } @@ -431,28 +451,6 @@ static int _subdev_client_match(void *a, void *b) return false; } -//subject: edge.local.driverId msg handle -static void _recv_msg_handle(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) -{ - log_write(LOG_DEBUG, "Received msg: %s - %.*s", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg)); - - if(0 == strncmp(natsMsg_GetSubject(msg), edge_local_subject, strlen(edge_local_subject))) - { - _on_local_message(msg); - } - else if(0 == strncmp(natsMsg_GetSubject(msg), EDGE_LOCAL_BROADCAST_SUBJECT, strlen(EDGE_LOCAL_BROADCAST_SUBJECT))) - { - _on_broadcast_message(msg); - } - else if(0 == strncmp(natsMsg_GetSubject(msg), EDGE_STATE_REPLY_SUBJECT, strlen(EDGE_STATE_REPLY_SUBJECT))) - { - _on_edge_state_message(msg); - } - - natsMsg_Destroy(msg); - return; -} - static void _handle_status_sync(int signo) { edge_status status; @@ -613,7 +611,7 @@ edge_status edge_common_init(void) log_write(LOG_DEBUG, "edge_local_subject:%s",edge_local_subject); //sub edge.local.driverId - status = _edge_subscribe(edge_local_subject, _recv_msg_handle, NULL); + status = _edge_subscribe(edge_local_subject, _on_local_message, NULL); if(EDGE_OK != status) { log_write(LOG_ERROR, "edge_subscribe %s fail! status:%d", edge_local_subject, status); @@ -621,7 +619,7 @@ edge_status edge_common_init(void) } //edge.local.broadcast - status = _edge_subscribe(EDGE_LOCAL_BROADCAST_SUBJECT, _recv_msg_handle, NULL); + status = _edge_subscribe(EDGE_LOCAL_BROADCAST_SUBJECT, _on_broadcast_message, NULL); if(EDGE_OK != status) { log_write(LOG_ERROR, "edge_subscribe %s fail! status:%d", EDGE_LOCAL_BROADCAST_SUBJECT, status); @@ -629,7 +627,7 @@ edge_status edge_common_init(void) } //edge.state.reply - status = _edge_subscribe(EDGE_STATE_REPLY_SUBJECT, _recv_msg_handle, NULL); + status = _edge_subscribe(EDGE_STATE_REPLY_SUBJECT, _on_edge_state_message, NULL); if(EDGE_OK != status) { log_write(LOG_ERROR, "edge_subscribe %s fail! status:%d", EDGE_STATE_REPLY_SUBJECT, status);