fix msg callback

This commit is contained in:
ethan.du 2021-01-25 13:28:31 +08:00
parent 17edc680a1
commit 390689219d
1 changed files with 38 additions and 40 deletions

View File

@ -211,19 +211,23 @@ void edge_set_subdev_status_handle(edge_subdev_status_handler subdev_status_hand
return; return;
} }
static void _on_local_message(natsMsg *msg) static void _on_local_message(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{ {
log_write(LOG_DEBUG, "Received local msg: %s - %.*s", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg));
cJSON *msg_json = cJSON_Parse(natsMsg_GetData(msg)); cJSON *msg_json = cJSON_Parse(natsMsg_GetData(msg));
if (!msg_json) if (!msg_json)
{ {
log_write(LOG_ERROR, "on local message json parse error: [%s]",cJSON_GetErrorPtr()); log_write(LOG_ERROR, "on local message json parse error: [%s]",cJSON_GetErrorPtr());
natsMsg_Destroy(msg);
return; return;
} }
cJSON *topic = cJSON_GetObjectItem(msg_json, "topic"); cJSON *topic = cJSON_GetObjectItem(msg_json, "topic");
if(NULL == topic) if(NULL == topic)
{ {
log_write(LOG_ERROR, "cannot find topic, illegal msg"); log_write(LOG_ERROR, "cannot find topic, illegal msg");
cJSON_Delete(msg_json); cJSON_Delete(msg_json);
natsMsg_Destroy(msg);
return; return;
} }
log_write(LOG_DEBUG, "_on_broadcast_message topic:%s", topic->valuestring); log_write(LOG_DEBUG, "_on_broadcast_message topic:%s", topic->valuestring);
@ -232,7 +236,8 @@ static void _on_local_message(natsMsg *msg)
if(NULL == device_sn_tmp) if(NULL == device_sn_tmp)
{ {
log_write(LOG_ERROR, "cannot find topic, illegal msg"); log_write(LOG_ERROR, "cannot find topic, illegal msg");
cJSON_Delete(msg_json); cJSON_Delete(msg_json);
natsMsg_Destroy(msg);
return; return;
} }
log_write(LOG_DEBUG, "_on_local_message deviceSN:%s", device_sn_tmp->valuestring); log_write(LOG_DEBUG, "_on_local_message deviceSN:%s", device_sn_tmp->valuestring);
@ -242,6 +247,7 @@ static void _on_local_message(natsMsg *msg)
{ {
log_write(LOG_ERROR, "cannot find payload, illegal msg"); log_write(LOG_ERROR, "cannot find payload, illegal msg");
cJSON_Delete(msg_json); cJSON_Delete(msg_json);
natsMsg_Destroy(msg);
return; return;
} }
log_write(LOG_DEBUG, "_on_local_message msg_base64code:%s", msg_base64code->valuestring); log_write(LOG_DEBUG, "_on_local_message msg_base64code:%s", msg_base64code->valuestring);
@ -249,7 +255,8 @@ static void _on_local_message(natsMsg *msg)
if(NULL == msg_base64decode) if(NULL == msg_base64decode)
{ {
log_write(LOG_ERROR, "msg_base64decode malloc fail!"); log_write(LOG_ERROR, "msg_base64decode malloc fail!");
cJSON_Delete(msg_json); cJSON_Delete(msg_json);
natsMsg_Destroy(msg);
return; return;
} }
memset(msg_base64decode, 0, NATS_MSG_MAX_LEN); memset(msg_base64decode, 0, NATS_MSG_MAX_LEN);
@ -271,23 +278,28 @@ static void _on_local_message(natsMsg *msg)
end: end:
cJSON_Delete(msg_json); cJSON_Delete(msg_json);
EDGE_FREE(msg_base64decode); EDGE_FREE(msg_base64decode);
natsMsg_Destroy(msg);
return; return;
} }
static void _on_broadcast_message(natsMsg *msg) static void _on_broadcast_message(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{ {
log_write(LOG_DEBUG, "Received broadcast msg: %s - %.*s", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg));
cJSON *msg_json = cJSON_Parse(natsMsg_GetData(msg)); cJSON *msg_json = cJSON_Parse(natsMsg_GetData(msg));
if (!msg_json) if (!msg_json)
{ {
log_write(LOG_ERROR, "on broadcast message json parse msg error: [%s]",cJSON_GetErrorPtr()); log_write(LOG_ERROR, "on broadcast message json parse msg error: [%s]",cJSON_GetErrorPtr());
natsMsg_Destroy(msg);
return; return;
} }
cJSON *topic = cJSON_GetObjectItem(msg_json, "topic"); cJSON *topic = cJSON_GetObjectItem(msg_json, "topic");
if(NULL == topic) if(NULL == topic)
{ {
log_write(LOG_ERROR, "cannot find topic, illegal msg"); log_write(LOG_ERROR, "cannot find topic, illegal msg");
cJSON_Delete(msg_json); cJSON_Delete(msg_json);
natsMsg_Destroy(msg);
return; return;
} }
log_write(LOG_DEBUG, "_on_broadcast_message topic:%s", topic->valuestring); log_write(LOG_DEBUG, "_on_broadcast_message topic:%s", topic->valuestring);
@ -295,7 +307,8 @@ static void _on_broadcast_message(natsMsg *msg)
if(NULL == msg_base64code) if(NULL == msg_base64code)
{ {
log_write(LOG_ERROR, "cannot find payload, illegal msg"); log_write(LOG_ERROR, "cannot find payload, illegal msg");
cJSON_Delete(msg_json); cJSON_Delete(msg_json);
natsMsg_Destroy(msg);
return; return;
} }
log_write(LOG_DEBUG, "_on_broadcast_message msg_base64code:%s", msg_base64code->valuestring); log_write(LOG_DEBUG, "_on_broadcast_message msg_base64code:%s", msg_base64code->valuestring);
@ -303,7 +316,8 @@ static void _on_broadcast_message(natsMsg *msg)
if(NULL == msg_base64decode) if(NULL == msg_base64decode)
{ {
log_write(LOG_ERROR, "msg_base64decode malloc fail!"); log_write(LOG_ERROR, "msg_base64decode malloc fail!");
cJSON_Delete(msg_json); cJSON_Delete(msg_json);
natsMsg_Destroy(msg);
return; return;
} }
memset(msg_base64decode, 0, NATS_MSG_MAX_LEN); memset(msg_base64decode, 0, NATS_MSG_MAX_LEN);
@ -356,21 +370,26 @@ static void _on_broadcast_message(natsMsg *msg)
end: end:
cJSON_Delete(msg_json); cJSON_Delete(msg_json);
EDGE_FREE(msg_base64decode); EDGE_FREE(msg_base64decode);
natsMsg_Destroy(msg);
return; return;
} }
static void _on_edge_state_message(natsMsg *msg) static void _on_edge_state_message(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{ {
log_write(LOG_DEBUG, "Received state msg: %s - %.*s", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg));
cJSON *msg_json = cJSON_Parse(natsMsg_GetData(msg)); cJSON *msg_json = cJSON_Parse(natsMsg_GetData(msg));
if (!msg_json) if (!msg_json)
{ {
log_write(LOG_ERROR, "on broadcast message json parse error: [%s]",cJSON_GetErrorPtr()); log_write(LOG_ERROR, "on broadcast message json parse error: [%s]",cJSON_GetErrorPtr());
natsMsg_Destroy(msg);
return; return;
} }
cJSON *state = cJSON_GetObjectItem(msg_json, "state"); cJSON *state = cJSON_GetObjectItem(msg_json, "state");
if(NULL == state) if(NULL == state)
{ {
log_write(LOG_ERROR, "cannot find state, illegal msg"); log_write(LOG_ERROR, "cannot find state, illegal msg");
natsMsg_Destroy(msg);
return; return;
} }
char *state_str = cJSON_Print(state); char *state_str = cJSON_Print(state);
@ -384,7 +403,8 @@ static void _on_edge_state_message(natsMsg *msg)
edge_state = false; edge_state = false;
} }
EDGE_FREE(state); EDGE_FREE(state);
natsMsg_Destroy(msg);
return; return;
} }
@ -431,28 +451,6 @@ static int _subdev_client_match(void *a, void *b)
return false; return false;
} }
//subject: edge.local.driverId msg handle
static void _recv_msg_handle(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
log_write(LOG_DEBUG, "Received msg: %s - %.*s", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg));
if(0 == strncmp(natsMsg_GetSubject(msg), edge_local_subject, strlen(edge_local_subject)))
{
_on_local_message(msg);
}
else if(0 == strncmp(natsMsg_GetSubject(msg), EDGE_LOCAL_BROADCAST_SUBJECT, strlen(EDGE_LOCAL_BROADCAST_SUBJECT)))
{
_on_broadcast_message(msg);
}
else if(0 == strncmp(natsMsg_GetSubject(msg), EDGE_STATE_REPLY_SUBJECT, strlen(EDGE_STATE_REPLY_SUBJECT)))
{
_on_edge_state_message(msg);
}
natsMsg_Destroy(msg);
return;
}
static void _handle_status_sync(int signo) static void _handle_status_sync(int signo)
{ {
edge_status status; edge_status status;
@ -613,7 +611,7 @@ edge_status edge_common_init(void)
log_write(LOG_DEBUG, "edge_local_subject:%s",edge_local_subject); log_write(LOG_DEBUG, "edge_local_subject:%s",edge_local_subject);
//sub edge.local.driverId //sub edge.local.driverId
status = _edge_subscribe(edge_local_subject, _recv_msg_handle, NULL); status = _edge_subscribe(edge_local_subject, _on_local_message, NULL);
if(EDGE_OK != status) if(EDGE_OK != status)
{ {
log_write(LOG_ERROR, "edge_subscribe %s fail! status:%d", edge_local_subject, status); log_write(LOG_ERROR, "edge_subscribe %s fail! status:%d", edge_local_subject, status);
@ -621,7 +619,7 @@ edge_status edge_common_init(void)
} }
//edge.local.broadcast //edge.local.broadcast
status = _edge_subscribe(EDGE_LOCAL_BROADCAST_SUBJECT, _recv_msg_handle, NULL); status = _edge_subscribe(EDGE_LOCAL_BROADCAST_SUBJECT, _on_broadcast_message, NULL);
if(EDGE_OK != status) if(EDGE_OK != status)
{ {
log_write(LOG_ERROR, "edge_subscribe %s fail! status:%d", EDGE_LOCAL_BROADCAST_SUBJECT, status); log_write(LOG_ERROR, "edge_subscribe %s fail! status:%d", EDGE_LOCAL_BROADCAST_SUBJECT, status);
@ -629,7 +627,7 @@ edge_status edge_common_init(void)
} }
//edge.state.reply //edge.state.reply
status = _edge_subscribe(EDGE_STATE_REPLY_SUBJECT, _recv_msg_handle, NULL); status = _edge_subscribe(EDGE_STATE_REPLY_SUBJECT, _on_edge_state_message, NULL);
if(EDGE_OK != status) if(EDGE_OK != status)
{ {
log_write(LOG_ERROR, "edge_subscribe %s fail! status:%d", EDGE_STATE_REPLY_SUBJECT, status); log_write(LOG_ERROR, "edge_subscribe %s fail! status:%d", EDGE_STATE_REPLY_SUBJECT, status);