support pub&sub nats subject
This commit is contained in:
parent
c8467af84e
commit
f2cd9aa300
30
README.md
30
README.md
|
@ -158,6 +158,16 @@ void edge_set_topo_notify_handle(edge_topo_notify_handler topo_notify_handle)
|
|||
|
||||
void edge_set_subdev_status_handle(edge_subdev_status_handler subdev_status_handle)
|
||||
|
||||
/**
|
||||
* @brief 设置nats消息回调接口
|
||||
*
|
||||
* @param nats_msg_handle: nats 消息回调接口(void (*edge_nats_msg_handler)(char *topic, char *payload, int payloadLen)).
|
||||
*
|
||||
* @retval : void
|
||||
*/
|
||||
|
||||
void edge_set_nats_msg_handle(edge_nats_msg_handler nats_msg_handle)
|
||||
|
||||
/**
|
||||
* @brief 获取网关在线状态
|
||||
*
|
||||
|
@ -212,3 +222,23 @@ edge_status edge_delete_topo(subdev_client *pst_subdev_client, uint32_t time_out
|
|||
|
||||
void log_write(log_level level, const char *format,...)
|
||||
|
||||
/**
|
||||
* @brief 向指定 nats subject,可以发送二进制消息
|
||||
*
|
||||
* @param subject: subject名称
|
||||
* @param data: 发送消息内容
|
||||
* @param dataLen: 发送消息内容长度
|
||||
*
|
||||
* @retval : 成功则返回EDGE_OK
|
||||
*/
|
||||
|
||||
edge_status nats_publish(const char *subject, const char *data, int dataLen)
|
||||
|
||||
/**
|
||||
* @brief 订阅nats subject
|
||||
*
|
||||
* @param subject: nats subject名称
|
||||
*
|
||||
* @retval : 成功则返回EDGE_OK
|
||||
*/
|
||||
edge_status nats_subscribe(const char *subject)
|
|
@ -83,6 +83,24 @@ edge_status edge_publishString(const char *topic, const char *str)
|
|||
return status;
|
||||
}
|
||||
|
||||
edge_status nats_publish(const char *subject, const char *data, int dataLen)
|
||||
{
|
||||
edge_status status;
|
||||
|
||||
char *normal_payload_base64 = (char *)EDGE_MALLOC(NATS_MSG_MAX_LEN);
|
||||
if(NULL == normal_payload_base64)
|
||||
{
|
||||
log_write(LOG_ERROR, "normal_payload_base64 malloc fail!");
|
||||
return EDGE_NO_MEMORY;
|
||||
}
|
||||
memset(normal_payload_base64, 0, NATS_MSG_MAX_LEN);
|
||||
base64_encode(data, dataLen, normal_payload_base64);
|
||||
|
||||
status = _publish_string(subject, normal_payload_base64);
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
edge_status _msg_parse_str_init(msg_parse **msg_parse_str, uint32_t request_id)
|
||||
{
|
||||
*msg_parse_str = (msg_parse *)EDGE_MALLOC(sizeof(msg_parse));
|
||||
|
|
|
@ -231,4 +231,16 @@ edge_status edge_rrpc_check(char *topic);
|
|||
*/
|
||||
edge_status edge_rrpc_response(char *topic,char *payload, int payloadLen);
|
||||
|
||||
/**
|
||||
* @brief 向nats subject
|
||||
*
|
||||
* @param subject: nats subject
|
||||
* @param data: 发送数据
|
||||
* @param dataLen: 发送数据长度
|
||||
*
|
||||
* @retval : 成功返回EDGE_OK
|
||||
*/
|
||||
|
||||
edge_status nats_publish(const char *subject, const char *data, int dataLen);
|
||||
|
||||
#endif
|
||||
|
|
38
edge/edge.c
38
edge/edge.c
|
@ -26,6 +26,7 @@ natsSubscription *sub = NULL;
|
|||
bool edge_state = false;
|
||||
edge_topo_notify_handler edge_topo_handle = NULL;
|
||||
edge_subdev_status_handler edge_subdev_status_handle = NULL;
|
||||
edge_nats_msg_handler edge_nats_msg_handle = NULL;
|
||||
/* global variable end */
|
||||
|
||||
static edge_status _edge_subscribe(const char *subject, natsMsgHandler cb, void *cbClosure)
|
||||
|
@ -211,6 +212,13 @@ void edge_set_subdev_status_handle(edge_subdev_status_handler subdev_status_hand
|
|||
return;
|
||||
}
|
||||
|
||||
void edge_set_nats_msg_handle(edge_nats_msg_handler nats_msg_handle)
|
||||
{
|
||||
if(NULL != nats_msg_handle)
|
||||
edge_nats_msg_handle = nats_msg_handle;
|
||||
return;
|
||||
}
|
||||
|
||||
static void _on_local_message(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
|
||||
{
|
||||
log_write(LOG_DEBUG, "Received local msg: %s - %.*s", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg));
|
||||
|
@ -417,6 +425,28 @@ static void _on_edge_state_message(natsConnection *nc, natsSubscription *sub, na
|
|||
return;
|
||||
}
|
||||
|
||||
static void _on_nats_message(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
|
||||
{
|
||||
log_write(LOG_DEBUG, "Received local msg: %s - %.*s", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg));
|
||||
|
||||
char *msg_base64decode = (char *)EDGE_MALLOC(NATS_MSG_MAX_LEN);
|
||||
if(NULL == msg_base64decode)
|
||||
{
|
||||
log_write(LOG_ERROR, "msg_base64decode malloc fail!");
|
||||
return;
|
||||
}
|
||||
memset(msg_base64decode, 0, NATS_MSG_MAX_LEN);
|
||||
int msg_base64decodeLen = base64_decode(natsMsg_GetData(msg), natsMsg_GetDataLength(msg), msg_base64decode);
|
||||
log_write(LOG_DEBUG, "_on_local_message msg_base64decode:%s", msg_base64decode);
|
||||
|
||||
if(edge_nats_msg_handle)
|
||||
edge_nats_msg_handle((char *)natsMsg_GetSubject(msg), msg_base64decode, msg_base64decodeLen);
|
||||
|
||||
EDGE_FREE(msg_base64decode);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
bool edge_get_online_status(void)
|
||||
{
|
||||
return edge_state;
|
||||
|
@ -899,3 +929,11 @@ end:
|
|||
return status;
|
||||
}
|
||||
|
||||
edge_status nats_subscribe(const char *subject)
|
||||
{
|
||||
edge_status status;
|
||||
|
||||
status = _edge_subscribe(subject, _on_nats_message, NULL);
|
||||
return status;
|
||||
}
|
||||
|
||||
|
|
21
edge/edge.h
21
edge/edge.h
|
@ -22,6 +22,9 @@ typedef void (*edge_topo_notify_handler)(topo_operation opera, char *payload);
|
|||
// /topic:$system/{productSN}/{deviceSN}/subdev/enable || /$system/{productSN}/{deviceSN}/subdev/delete msg handle
|
||||
typedef void (*edge_subdev_status_handler)(subdev_able opera,char *payload);
|
||||
|
||||
//nats message handle
|
||||
typedef void (*edge_nats_msg_handler)(char *topic, char *payload, int payloadLen);
|
||||
|
||||
/**
|
||||
* @brief 获取驱动信息
|
||||
*
|
||||
|
@ -85,6 +88,15 @@ void edge_set_topo_notify_handle(edge_topo_notify_handler topo_notify_h
|
|||
*/
|
||||
void edge_set_subdev_status_handle(edge_subdev_status_handler subdev_status_handle);
|
||||
|
||||
/**
|
||||
* @brief 设置nats消息回调接口(控制台启动禁用设备)
|
||||
*
|
||||
* @param nats_msg_handle: nats消息回调接口(void (*edge_nats_msg_handler)(char *topic, char *payload, int payloadLen)).
|
||||
*
|
||||
* @retval : void
|
||||
*/
|
||||
void edge_set_nats_msg_handle(edge_nats_msg_handler nats_msg_handle);
|
||||
|
||||
/**
|
||||
* @brief 获取子设备拓扑
|
||||
*
|
||||
|
@ -115,4 +127,13 @@ edge_status edge_add_topo(subdev_client *pst_subdev_client, uint32_t time_out_ms
|
|||
*/
|
||||
edge_status edge_delete_topo(subdev_client *pst_subdev_client, uint32_t time_out_ms);
|
||||
|
||||
/**
|
||||
* @brief 订阅nats subject
|
||||
*
|
||||
* @param subject: nats subject
|
||||
*
|
||||
* @retval : 成功则返回EDGE_OK
|
||||
*/
|
||||
edge_status nats_subscribe(const char *subject);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -27,6 +27,12 @@ static void edge_subdev_status_handler_user(subdev_able opera,char *payload)
|
|||
return;
|
||||
}
|
||||
|
||||
static void edge_nats_msg_handler_user(char *topic, char *payload, int payloadLen)
|
||||
{
|
||||
log_write(LOG_INFO, "topic:%s payload:%s payloadLen:%d", topic, payload, payloadLen);
|
||||
return;
|
||||
}
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
edge_status status = EDGE_OK;
|
||||
|
@ -66,6 +72,16 @@ int main(int argc, char **argv)
|
|||
|
||||
edge_set_subdev_status_handle(edge_subdev_status_handler_user);
|
||||
|
||||
edge_set_nats_msg_handle(edge_nats_msg_handler_user);
|
||||
|
||||
//subscribe nats subject
|
||||
status = nats_subscribe("/a/b/c");
|
||||
if(EDGE_OK != status)
|
||||
{
|
||||
log_write(LOG_ERROR, "nats_subscribe nats topic fail");
|
||||
return EDGE_ERR;
|
||||
}
|
||||
|
||||
// 解析驱动配置
|
||||
/*
|
||||
{
|
||||
|
@ -194,6 +210,12 @@ int main(int argc, char **argv)
|
|||
{
|
||||
log_write(LOG_ERROR, "edge_publish fail");
|
||||
goto end;
|
||||
}
|
||||
status = nats_publish("/a/b/c",time_stamp,strlen(time_stamp));
|
||||
if(EDGE_OK != status)
|
||||
{
|
||||
log_write(LOG_ERROR, "edge_publish nats subject fail");
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue