Merge pull request #7 from ucloud/feature_add_nats_subject

support pub&sub nats subject
This commit is contained in:
杜敏敏 2021-05-11 14:06:56 +08:00 committed by GitHub
commit cb2e4745ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 141 additions and 0 deletions

View File

@ -158,6 +158,16 @@ void edge_set_topo_notify_handle(edge_topo_notify_handler topo_notify_handle)
void edge_set_subdev_status_handle(edge_subdev_status_handler subdev_status_handle)
/**
* @brief 设置nats消息回调接口
*
* @param nats_msg_handle: nats 消息回调接口(void (*edge_nats_msg_handler)(char *topic, char *payload, int payloadLen)).
*
* @retval : void
*/
void edge_set_nats_msg_handle(edge_nats_msg_handler nats_msg_handle)
/**
* @brief 获取网关在线状态
*
@ -212,3 +222,23 @@ edge_status edge_delete_topo(subdev_client *pst_subdev_client, uint32_t time_out
void log_write(log_level level, const char *format,...)
/**
* @brief 向指定 nats subject,可以发送二进制消息
*
* @param subject: subject名称
* @param data: 发送消息内容
* @param dataLen: 发送消息内容长度
*
* @retval : 成功则返回EDGE_OK
*/
edge_status nats_publish(const char *subject, const char *data, int dataLen)
/**
* @brief 订阅nats subject
*
* @param subject: nats subject名称
*
* @retval : 成功则返回EDGE_OK
*/
edge_status nats_subscribe(const char *subject)

View File

@ -83,6 +83,24 @@ edge_status edge_publishString(const char *topic, const char *str)
return status;
}
edge_status nats_publish(const char *subject, const char *data, int dataLen)
{
edge_status status;
char *normal_payload_base64 = (char *)EDGE_MALLOC(NATS_MSG_MAX_LEN);
if(NULL == normal_payload_base64)
{
log_write(LOG_ERROR, "normal_payload_base64 malloc fail!");
return EDGE_NO_MEMORY;
}
memset(normal_payload_base64, 0, NATS_MSG_MAX_LEN);
base64_encode(data, dataLen, normal_payload_base64);
status = _publish_string(subject, normal_payload_base64);
return status;
}
edge_status _msg_parse_str_init(msg_parse **msg_parse_str, uint32_t request_id)
{
*msg_parse_str = (msg_parse *)EDGE_MALLOC(sizeof(msg_parse));

View File

@ -231,4 +231,16 @@ edge_status edge_rrpc_check(char *topic);
*/
edge_status edge_rrpc_response(char *topic,char *payload, int payloadLen);
/**
* @brief nats subject
*
* @param subject: nats subject
* @param data:
* @param dataLen:
*
* @retval : EDGE_OK
*/
edge_status nats_publish(const char *subject, const char *data, int dataLen);
#endif

View File

@ -26,6 +26,7 @@ natsSubscription *sub = NULL;
bool edge_state = false;
edge_topo_notify_handler edge_topo_handle = NULL;
edge_subdev_status_handler edge_subdev_status_handle = NULL;
edge_nats_msg_handler edge_nats_msg_handle = NULL;
/* global variable end */
static edge_status _edge_subscribe(const char *subject, natsMsgHandler cb, void *cbClosure)
@ -211,6 +212,13 @@ void edge_set_subdev_status_handle(edge_subdev_status_handler subdev_status_hand
return;
}
void edge_set_nats_msg_handle(edge_nats_msg_handler nats_msg_handle)
{
if(NULL != nats_msg_handle)
edge_nats_msg_handle = nats_msg_handle;
return;
}
static void _on_local_message(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
log_write(LOG_DEBUG, "Received local msg: %s - %.*s", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg));
@ -417,6 +425,28 @@ static void _on_edge_state_message(natsConnection *nc, natsSubscription *sub, na
return;
}
static void _on_nats_message(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
log_write(LOG_DEBUG, "Received local msg: %s - %.*s", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg));
char *msg_base64decode = (char *)EDGE_MALLOC(NATS_MSG_MAX_LEN);
if(NULL == msg_base64decode)
{
log_write(LOG_ERROR, "msg_base64decode malloc fail!");
return;
}
memset(msg_base64decode, 0, NATS_MSG_MAX_LEN);
int msg_base64decodeLen = base64_decode(natsMsg_GetData(msg), natsMsg_GetDataLength(msg), msg_base64decode);
log_write(LOG_DEBUG, "_on_local_message msg_base64decode:%s", msg_base64decode);
if(edge_nats_msg_handle)
edge_nats_msg_handle((char *)natsMsg_GetSubject(msg), msg_base64decode, msg_base64decodeLen);
EDGE_FREE(msg_base64decode);
return;
}
bool edge_get_online_status(void)
{
return edge_state;
@ -899,3 +929,11 @@ end:
return status;
}
edge_status nats_subscribe(const char *subject)
{
edge_status status;
status = _edge_subscribe(subject, _on_nats_message, NULL);
return status;
}

View File

@ -22,6 +22,9 @@ typedef void (*edge_topo_notify_handler)(topo_operation opera, char *payload);
// /topic:$system/{productSN}/{deviceSN}/subdev/enable || /$system/{productSN}/{deviceSN}/subdev/delete msg handle
typedef void (*edge_subdev_status_handler)(subdev_able opera,char *payload);
//nats message handle
typedef void (*edge_nats_msg_handler)(char *topic, char *payload, int payloadLen);
/**
* @brief
*
@ -85,6 +88,15 @@ void edge_set_topo_notify_handle(edge_topo_notify_handler topo_notify_h
*/
void edge_set_subdev_status_handle(edge_subdev_status_handler subdev_status_handle);
/**
* @brief nats消息回调接口
*
* @param nats_msg_handle: nats消息回调接口(void (*edge_nats_msg_handler)(char *topic, char *payload, int payloadLen)).
*
* @retval : void
*/
void edge_set_nats_msg_handle(edge_nats_msg_handler nats_msg_handle);
/**
* @brief
*
@ -115,4 +127,13 @@ edge_status edge_add_topo(subdev_client *pst_subdev_client, uint32_t time_out_ms
*/
edge_status edge_delete_topo(subdev_client *pst_subdev_client, uint32_t time_out_ms);
/**
* @brief nats subject
*
* @param subject: nats subject
*
* @retval : EDGE_OK
*/
edge_status nats_subscribe(const char *subject);
#endif

View File

@ -27,6 +27,12 @@ static void edge_subdev_status_handler_user(subdev_able opera,char *payload)
return;
}
static void edge_nats_msg_handler_user(char *topic, char *payload, int payloadLen)
{
log_write(LOG_INFO, "topic:%s payload:%s payloadLen:%d", topic, payload, payloadLen);
return;
}
int main(int argc, char **argv)
{
edge_status status = EDGE_OK;
@ -66,6 +72,16 @@ int main(int argc, char **argv)
edge_set_subdev_status_handle(edge_subdev_status_handler_user);
edge_set_nats_msg_handle(edge_nats_msg_handler_user);
//subscribe nats subject
status = nats_subscribe("/a/b/c");
if(EDGE_OK != status)
{
log_write(LOG_ERROR, "nats_subscribe nats topic fail");
return EDGE_ERR;
}
// 解析驱动配置
/*
{
@ -194,6 +210,12 @@ int main(int argc, char **argv)
{
log_write(LOG_ERROR, "edge_publish fail");
goto end;
}
status = nats_publish("/a/b/c",time_stamp,strlen(time_stamp));
if(EDGE_OK != status)
{
log_write(LOG_ERROR, "edge_publish nats subject fail");
goto end;
}
}