support pub sub nats subject

This commit is contained in:
ethan.du 2021-05-11 14:31:36 +08:00
parent 9820af6298
commit e9632b8575
4 changed files with 108 additions and 14 deletions

View File

@ -71,14 +71,16 @@ char *app_get_info(void)
/** /**
* @brief 注册下行消息回调函数 * @brief 注册下行消息回调函数
* *
* @param handle: 下行消息回调函数指针 * @param normal_handler: 下行消息回调函数指针
* *
* @param handle: rrpc消息回调函数指针 * @param rrpc_handler: rrpc消息回调函数指针
*
* @param nats_msg_handle: nats消息回调函数指针
* *
* @retval : 成功则返回APP_OK * @retval : 成功则返回APP_OK
*/ */
app_status app_register_cb(msg_handler normal_handler, msg_handler rrpc_handler) app_status app_register_cb(msg_handler normal_handler, msg_handler rrpc_handler, msg_handler nats_msg_handle)
/** /**
* @brief 发送消息,可发送二进制流 * @brief 发送消息,可发送二进制流
@ -104,6 +106,28 @@ app_status app_publish(const char *topic, const char *data, int dataLen)
*/ */
app_status app_publishString(const char *topic, const char *data) app_status app_publishString(const char *topic, const char *data)
/**
* @brief 订阅nats subject
*
* @param subject: nats subject名称
*
* @retval : 成功则返回APP_OK
*/
app_status nats_subscribe(const char *subject)
/**
* @brief 向nats subject发送消息
*
* @param subject: nats subject名称
*
* @param data: 发送内容
*
* @param dataLen: 发送内容长度
*
* @retval : 成功则返回APP_OK
*/
app_status nats_publish(const char *subject, const char *data, int dataLen)
/** /**
* @brief 记录日志 * @brief 记录日志
* *

View File

@ -9,11 +9,12 @@ char *app_deviceSN = NULL;
char *app_info = NULL; char *app_info = NULL;
msg_handler normal_cb = NULL; msg_handler normal_cb = NULL;
msg_handler rrpc_cb = NULL; msg_handler rrpc_cb = NULL;
msg_handler app_nats_msg_handle = NULL;
char edge_router_subject[NATS_SUBJECT_MAX_LEN] = {0}; char edge_router_subject[NATS_SUBJECT_MAX_LEN] = {0};
app_status nats_subscribe(const char *subject, natsMsgHandler cb, void *cbClosure) app_status _nats_subscribe(const char *subject, natsMsgHandler cb, void *cbClosure)
{ {
app_status status = APP_OK; app_status status = APP_OK;
@ -22,11 +23,11 @@ app_status nats_subscribe(const char *subject, natsMsgHandler cb, void *cbClosur
return status; return status;
} }
app_status nats_publish(const char *topic, const char *str) app_status _nats_publish(const char *subject, const char *str)
{ {
app_status status = APP_OK; app_status status = APP_OK;
natsConnection_PublishString(conn, topic, str); natsConnection_PublishString(conn, subject, str);
natsConnection_Flush(conn); natsConnection_Flush(conn);
return status; return status;
@ -87,7 +88,7 @@ void log_write(log_level level, const char *format,...)
replace_str(msg_str_rep, msg_str, "\"", "\\\""); replace_str(msg_str_rep, msg_str, "\"", "\\\"");
snprintf(log_msg, NATS_MSG_MAX_LEN, LOG_UPLOAD_FORMAT, app_get_name(), log_lev[level], msg_str_rep, stamp.tv_sec); snprintf(log_msg, NATS_MSG_MAX_LEN, LOG_UPLOAD_FORMAT, app_get_name(), log_lev[level], msg_str_rep, stamp.tv_sec);
nats_publish(EDGE_LOG_UPLOAD_SUBJECT, log_msg); _nats_publish(EDGE_LOG_UPLOAD_SUBJECT, log_msg);
APP_FREE(msg_str_rep); APP_FREE(msg_str_rep);
APP_FREE(msg_str); APP_FREE(msg_str);
@ -275,18 +276,43 @@ end:
return; return;
} }
app_status app_register_cb(msg_handler normal_handler, msg_handler rrpc_handler) static void _on_nats_message(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
log_write(LOG_DEBUG, "Received local msg: %s - %.*s", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg));
char *msg_base64decode = (char *)APP_MALLOC(NATS_MSG_MAX_LEN);
if(NULL == msg_base64decode)
{
log_write(LOG_ERROR, "msg_base64decode malloc fail!");
return;
}
memset(msg_base64decode, 0, NATS_MSG_MAX_LEN);
int msg_base64decodeLen = base64_decode(natsMsg_GetData(msg), natsMsg_GetDataLength(msg), msg_base64decode);
log_write(LOG_DEBUG, "_on_local_message msg_base64decode:%s", msg_base64decode);
if(app_nats_msg_handle)
app_nats_msg_handle((char *)natsMsg_GetSubject(msg), msg_base64decode, msg_base64decodeLen);
APP_FREE(msg_base64decode);
return;
}
app_status app_register_cb(msg_handler normal_handler, msg_handler rrpc_handler, msg_handler nats_msg_handle)
{ {
app_status status = APP_OK; app_status status = APP_OK;
char edge_app_subject[NATS_SUBJECT_MAX_LEN] = {0}; char edge_app_subject[NATS_SUBJECT_MAX_LEN] = {0};
if(normal_handler != NULL) if(normal_handler != NULL)
normal_cb = normal_handler; normal_cb = normal_handler;
rrpc_cb = rrpc_handler; if(rrpc_handler != NULL)
rrpc_cb = rrpc_handler;
if(nats_msg_handle != NULL)
app_nats_msg_handle = nats_msg_handle;
snprintf(edge_app_subject, NATS_SUBJECT_MAX_LEN, EDGE_APP_SUBJECT_FORMAT, app_get_name()); snprintf(edge_app_subject, NATS_SUBJECT_MAX_LEN, EDGE_APP_SUBJECT_FORMAT, app_get_name());
log_write(LOG_DEBUG, "edge_app_subject:%s",edge_app_subject); log_write(LOG_DEBUG, "edge_app_subject:%s",edge_app_subject);
status = nats_subscribe(edge_app_subject, _handle_message, NULL); status = _nats_subscribe(edge_app_subject, _handle_message, NULL);
if(APP_OK != status) if(APP_OK != status)
{ {
log_write(LOG_ERROR, "edge_subscribe %s fail! status:%d", edge_app_subject, status); log_write(LOG_ERROR, "edge_subscribe %s fail! status:%d", edge_app_subject, status);
@ -324,7 +350,7 @@ app_status app_publish(const char *topic, const char *data, int dataLen)
memset(normal_msg, 0, NATS_MSG_MAX_LEN); memset(normal_msg, 0, NATS_MSG_MAX_LEN);
snprintf(normal_msg, NATS_MSG_MAX_LEN, NORMAL_MSG_FORMAT, topic, normal_payload_base64, app_get_name()); snprintf(normal_msg, NATS_MSG_MAX_LEN, NORMAL_MSG_FORMAT, topic, normal_payload_base64, app_get_name());
status = nats_publish(edge_router_subject, normal_msg); status = _nats_publish(edge_router_subject, normal_msg);
APP_FREE(normal_payload_base64); APP_FREE(normal_payload_base64);
APP_FREE(normal_msg); APP_FREE(normal_msg);
@ -354,7 +380,7 @@ static void _handle_status_sync(union sigval v)
memset(sync_msg, 0, NATS_MSG_MAX_LEN); memset(sync_msg, 0, NATS_MSG_MAX_LEN);
snprintf(sync_msg, NATS_MSG_MAX_LEN, STATUS_SYNC_FORMAT, app_get_name()); snprintf(sync_msg, NATS_MSG_MAX_LEN, STATUS_SYNC_FORMAT, app_get_name());
log_write(LOG_DEBUG, "sync_msg:%s",sync_msg); log_write(LOG_DEBUG, "sync_msg:%s",sync_msg);
status = nats_publish(EDGE_APP_STATUS_SUBJECT, sync_msg); status = _nats_publish(EDGE_APP_STATUS_SUBJECT, sync_msg);
if(APP_OK != status) if(APP_OK != status)
{ {
log_write(LOG_ERROR, "publish sync msg fail!"); log_write(LOG_ERROR, "publish sync msg fail!");
@ -433,8 +459,29 @@ app_status app_rrpc_response(char *topic,char *payload, int payloadLen)
return APP_OK; return APP_OK;
} }
app_status nats_subscribe(const char *subject)
{
app_status status;
status = _nats_subscribe(subject, _on_nats_message, NULL);
return status;
}
app_status nats_publish(const char *subject, const char *data, int dataLen)
{
app_status status;
char *normal_payload_base64 = (char *)APP_MALLOC(NATS_MSG_MAX_LEN);
if(NULL == normal_payload_base64)
{
log_write(LOG_ERROR, "normal_payload_base64 malloc fail!");
return APP_NO_MEMORY;
}
memset(normal_payload_base64, 0, NATS_MSG_MAX_LEN);
base64_encode(data, dataLen, normal_payload_base64);
status = _nats_publish(subject, normal_payload_base64);
return status;
}

View File

@ -131,11 +131,13 @@ char *app_get_name();
char *app_get_productSN(); char *app_get_productSN();
char *app_get_deviceSN(); char *app_get_deviceSN();
char *app_get_info(); char *app_get_info();
app_status app_register_cb(msg_handler normal_handler, msg_handler rrpc_handler); app_status app_register_cb(msg_handler normal_handler, msg_handler rrpc_handler, msg_handler nats_msg_handle);
app_status app_publish(const char *topic, const char *data, int dataLen); app_status app_publish(const char *topic, const char *data, int dataLen);
app_status app_publishString(const char *topic, const char *str); app_status app_publishString(const char *topic, const char *str);
app_status app_common_init(void); app_status app_common_init(void);
void log_write(log_level level, const char *format,...); void log_write(log_level level, const char *format,...);
app_status app_rrpc_response(char *topic,char *payload, int payloadLen); app_status app_rrpc_response(char *topic,char *payload, int payloadLen);
app_status nats_subscribe(const char *subject);
app_status nats_publish(const char *subject, const char *data, int dataLen);
#endif #endif

View File

@ -16,6 +16,12 @@ void app_rrpc_msg_handler(char *topic, char *payload, int payloadLen)
return; return;
} }
void app_nats_msg_handler_user(char *topic, char *payload, int payloadLen)
{
log_write(LOG_INFO, "topic:%s payload:%s payloadLen:%d", topic, payload, payloadLen);
return;
}
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
app_status status = APP_OK; app_status status = APP_OK;
@ -35,7 +41,7 @@ int main(int argc, char **argv)
log_write(LOG_INFO, "app info:%s",app_get_info()); log_write(LOG_INFO, "app info:%s",app_get_info());
//注册回调函数 //注册回调函数
status = app_register_cb(app_normal_msg_handler, app_rrpc_msg_handler); status = app_register_cb(app_normal_msg_handler, app_rrpc_msg_handler, app_nats_msg_handler_user);
if(APP_OK != status) if(APP_OK != status)
{ {
log_write(LOG_ERROR, "app_register_cb fail"); log_write(LOG_ERROR, "app_register_cb fail");
@ -60,6 +66,14 @@ int main(int argc, char **argv)
snprintf(topic_str, 100, topic_format->valuestring, app_get_productSN(), app_get_deviceSN()); snprintf(topic_str, 100, topic_format->valuestring, app_get_productSN(), app_get_deviceSN());
} }
//subscribe nats subject
status = nats_subscribe("/a/b/c");
if(APP_OK != status)
{
log_write(LOG_ERROR, "nats_subscribe nats subject fail");
return APP_ERR;
}
while(1) while(1)
{ {
sleep(5); sleep(5);
@ -75,6 +89,13 @@ int main(int argc, char **argv)
log_write(LOG_ERROR, "app_publish fail"); log_write(LOG_ERROR, "app_publish fail");
goto end; goto end;
} }
//publish msg to subscribed nats subject
status = nats_publish("/a/b/c",time_stamp,strlen(time_stamp));
if(APP_OK != status)
{
log_write(LOG_ERROR, "edge_publish nats subject fail");
goto end;
}
} }
end: end: