add rrpc handler

This commit is contained in:
jamesliu 2021-01-28 15:11:53 +08:00
parent cf644690f8
commit 13293fd7b8
4 changed files with 29 additions and 9 deletions

View File

@ -4,7 +4,7 @@
#include <errno.h>
#include "client.h"
subdev_client * edge_subdev_construct(const char *product_sn, const char *device_sn, edge_normal_msg_handler normal_msg_handle)
subdev_client * edge_subdev_construct(const char *product_sn, const char *device_sn, edge_normal_msg_handler normal_msg_handle, edge_rrpc_msg_handler rrpc_msg_handle)
{
subdev_client *pst_subdev_client;
@ -23,6 +23,7 @@ subdev_client * edge_subdev_construct(const char *product_sn, const char *device
pst_subdev_client->product_sn = product_sn;
pst_subdev_client->device_sn = device_sn;
pst_subdev_client->normal_msg_handle = normal_msg_handle;
pst_subdev_client->rrpc_msg_handle = rrpc_msg_handle;
return pst_subdev_client;
}
@ -528,11 +529,11 @@ void log_write(log_level level, const char *format,...)
edge_status edge_rrpc_check(char *topic)
{
if(strstr(topic,"/rrpc/request/") == NULL){
if(strstr(topic,"/rrpc/request/") == NULL){
return EDGE_ERR;
}else{
}else{
return EDGE_OK;
}
}
}
edge_status edge_rrpc_response(char *topic,char *payload)
@ -540,7 +541,7 @@ edge_status edge_rrpc_response(char *topic,char *payload)
edge_status status;
char response_topic[128];
_replace_str(response_topic, topic, "request", "response");
status = edge_publish(response_topic, payload);
status = edge_publish(response_topic, payload);
if(EDGE_OK != status){
log_write(LOG_ERROR, "edge_publish rrpc fail");
return EDGE_ERR;

View File

@ -7,6 +7,9 @@
// normal message
typedef void (*edge_normal_msg_handler)(char *topic, char *payload);
// rrpc message
typedef void (*edge_rrpc_msg_handler)(char *topic, char *payload);
typedef enum
{
SUBDEV_LOGIN = 0,
@ -25,6 +28,7 @@ typedef struct
const char *product_sn;
const char *device_sn;
edge_normal_msg_handler normal_msg_handle;
edge_rrpc_msg_handler rrpc_msg_handle;
}subdev_client;
typedef enum
@ -121,10 +125,11 @@ void log_print(const char *format,...);
* @param product_sn:
* @param device_sn:
* @param normal_msg_handle: (void (*edge_normal_msg_handler)(char *topic, char *payload)).
* @param rrpc_msg_handle rrpc消息回调处理接口(void (*edge_rrpc_msg_handler)(char *topic, char *payload)).
*
* @retval : NULL
*/
subdev_client * edge_subdev_construct(const char *product_sn, const char *device_sn, edge_normal_msg_handler normal_msg_handle);
subdev_client * edge_subdev_construct(const char *product_sn, const char *device_sn, edge_normal_msg_handler normal_msg_handle,edge_rrpc_msg_handler rrpc_msg_handle);
/**

View File

@ -272,7 +272,16 @@ static void _on_local_message(natsConnection *nc, natsSubscription *sub, natsMsg
log_write(LOG_WARN, "cannot find conn device[%s]", subdev_client_tmp.device_sn);
goto end;
}
pst_subdev_client = (subdev_client *)node_tmp->val;
if(pst_subdev_client->rrpc_msg_handle){
if(edge_rrpc_check(topic->valuestring) == EDGE_OK){
pst_subdev_client->rrpc_msg_handle(topic->valuestring, msg_base64decode);
goto end;
}
}
if(pst_subdev_client->normal_msg_handle)
pst_subdev_client->normal_msg_handle(topic->valuestring, msg_base64decode);

View File

@ -4,8 +4,13 @@
static void edge_normal_msg_handler_user(char *topic, char *payload)
{
log_write(LOG_INFO, "topic:%s payload:%s", topic, payload);
if(edge_rrpc_check(topic) == EDGE_OK)
edge_rrpc_response(topic,"rrpc response test!");
return;
}
static void edge_rrpc_msg_handler_user(char *topic, char *payload)
{
log_write(LOG_INFO, "rrpc topic:%s payload:%s", topic, payload);
edge_rrpc_response(topic, "rrpc response message!");
return;
}
@ -122,7 +127,7 @@ int main(int argc, char **argv)
}
// 初始化一个子设备
subdevClient = edge_subdev_construct(arrary_productsn->valuestring, arrary_devicesn->valuestring, edge_normal_msg_handler_user);
subdevClient = edge_subdev_construct(arrary_productsn->valuestring, arrary_devicesn->valuestring, edge_normal_msg_handler_user, edge_rrpc_msg_handler_user);
if(NULL == subdevClient)
{
log_write(LOG_ERROR, "edge construct fail!");