add rrpc & fix none config

This commit is contained in:
jamesliu 2021-01-29 10:37:42 +08:00
parent 9809fe4350
commit e29ddeff0b
3 changed files with 65 additions and 14 deletions

View File

@ -7,7 +7,10 @@ char *app_name = NULL;
char *app_productSN = NULL;
char *app_deviceSN = NULL;
char *app_info = NULL;
msg_handler cb = NULL;
msg_handler normal_cb = NULL;
msg_handler rrpc_cb = NULL;
char edge_router_subject[NATS_SUBJECT_MAX_LEN] = {0};
app_status nats_subscribe(const char *subject, natsMsgHandler cb, void *cbClosure)
@ -206,6 +209,16 @@ char *app_get_info()
return app_info;
}
static app_status app_rrpc_check(char *topic)
{
if(strstr(topic,"/rrpc/request/") == NULL){
return APP_ERR;
}else{
return APP_OK;
}
}
static void _handle_message(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
log_write(LOG_DEBUG, "Received msg: %s - %.*s", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg));
@ -244,9 +257,17 @@ static void _handle_message(natsConnection *nc, natsSubscription *sub, natsMsg *
base64_decode(msg_base64code->valuestring, strlen(msg_base64code->valuestring), msg_base64decode);
log_write(LOG_DEBUG, "_handle_message msg_base64decode:%s", msg_base64decode);
if(NULL != cb)
cb(topic->valuestring, msg_base64decode);
if(NULL != rrpc_cb){
if(app_rrpc_check(topic->valuestring) == APP_OK){
rrpc_cb(topic->valuestring, msg_base64decode);
APP_FREE(msg_base64decode);
goto end;
}
}
if(NULL != normal_cb){
normal_cb(topic->valuestring, msg_base64decode);
}
APP_FREE(msg_base64decode);
end:
natsMsg_Destroy(msg);
@ -254,17 +275,17 @@ end:
return;
}
app_status app_register_cb(msg_handler handle)
app_status app_register_cb(msg_handler normal_handler, msg_handler rrpc_handler)
{
app_status status = APP_OK;
app_status status = APP_OK;
char edge_app_subject[NATS_SUBJECT_MAX_LEN] = {0};
if(normal_handler != NULL)
normal_cb = normal_handler;
rrpc_cb = rrpc_handler;
if((NULL == cb) && (NULL != handle))
cb = handle;
snprintf(edge_app_subject, NATS_SUBJECT_MAX_LEN, EDGE_APP_SUBJECT_FORMAT, app_get_name());
log_write(LOG_DEBUG, "edge_app_subject:%s",edge_app_subject);
status = nats_subscribe(edge_app_subject, _handle_message, NULL);
if(APP_OK != status)
{
@ -389,6 +410,20 @@ app_status app_common_init(void)
return APP_OK;
}
app_status app_rrpc_response(char *topic,char *payload)
{
app_status status;
char response_topic[128];
replace_str(response_topic, topic, "request", "response");
status = app_publish(response_topic, payload);
if(APP_OK != status){
log_write(LOG_ERROR, "app_publish rrpc fail");
return APP_ERR;
}
log_write(LOG_DEBUG, "app_publish rrpc :%s",payload);
return APP_OK;
}

View File

@ -131,9 +131,12 @@ char *app_get_name();
char *app_get_productSN();
char *app_get_deviceSN();
char *app_get_info();
app_status app_register_cb(msg_handler handle);
app_status app_register_cb(msg_handler normal_handler, msg_handler rrpc_handler);
app_status app_publish(const char *topic, const char *str);
app_status app_common_init(void);
void log_write(log_level level, const char *format,...);
app_status app_rrpc_response(char *topic,char *payload);
#endif

View File

@ -1,12 +1,20 @@
#include "app.h"
void recvmsg_handler(char *topic, char *payload)
#define DEFAULT_TOPIC_FMT "/%s/%s/upload"
void app_normal_msg_handler(char *topic, char *payload)
{
log_write(LOG_INFO, "receive topic:%s",topic);
log_write(LOG_INFO, "receive payload:%s",payload);
return;
}
void app_rrpc_msg_handler(char *topic, char *payload)
{
log_write(LOG_INFO, "rrpc topic:%s",topic);
log_write(LOG_INFO, "rrpc payload:%s",payload);
app_rrpc_response(topic, "rrpc response sample!");
return;
}
int main(int argc, char **argv)
{
app_status status = APP_OK;
@ -26,7 +34,7 @@ int main(int argc, char **argv)
log_write(LOG_INFO, "app info:%s",app_get_info());
//注册回调函数
status = app_register_cb(recvmsg_handler);
status = app_register_cb(app_normal_msg_handler, app_rrpc_msg_handler);
if(APP_OK != status)
{
log_write(LOG_ERROR, "app_register_cb fail");
@ -40,12 +48,17 @@ int main(int argc, char **argv)
log_write(LOG_ERROR, "parse app info fail");
goto end;
}
/*
"topic":"/%s/%s/upload"
*/
cJSON *topic_format = cJSON_GetObjectItem(app_info, "topic");
if(topic_format == NULL){
snprintf(topic_str, 100, DEFAULT_TOPIC_FMT, app_get_productSN(), app_get_deviceSN());
}else{
snprintf(topic_str, 100, topic_format->valuestring, app_get_productSN(), app_get_deviceSN());
}
snprintf(topic_str, 100, topic_format->valuestring, app_get_productSN(), app_get_deviceSN());
while(1)
{
sleep(5);