Merge pull request #2 from ucloud/bugfix_support_send_binary_message

support send binary message
This commit is contained in:
杜敏敏 2021-03-16 16:56:58 +08:00 committed by GitHub
commit 9e33b6e793
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 59 additions and 21 deletions

View File

@ -73,10 +73,36 @@ char *app_get_info(void)
*
* @param handle: 下行消息回调函数指针
*
* @param handle: rrpc消息回调函数指针
*
* @retval : 成功则返回APP_OK
*/
app_status app_register_cb(msg_handler handle)
app_status app_register_cb(msg_handler normal_handler, msg_handler rrpc_handler)
/**
* @brief 发送消息,可发送二进制流
*
* @param topic: topic名字
*
* @param data: 发送数据内容
*
* @param dataLen: 发送数据长度
*
* @retval : 成功则返回APP_OK
*/
app_status app_publish(const char *topic, const char *data, int dataLen)
/**
* @brief 发送字符串消息
*
* @param topic: topic名字
*
* @param data: 发送字符串内容
*
* @retval : 成功则返回APP_OK
*/
app_status app_publishString(const char *topic, const char *data)
/**
* @brief 记录日志

View File

@ -254,19 +254,19 @@ static void _handle_message(natsConnection *nc, natsSubscription *sub, natsMsg *
goto end;
}
memset(msg_base64decode, 0, NATS_MSG_MAX_LEN);
base64_decode(msg_base64code->valuestring, strlen(msg_base64code->valuestring), msg_base64decode);
int msg_base64decodeLen = base64_decode(msg_base64code->valuestring, strlen(msg_base64code->valuestring), msg_base64decode);
log_write(LOG_DEBUG, "_handle_message msg_base64decode:%s", msg_base64decode);
if(NULL != rrpc_cb){
if(app_rrpc_check(topic->valuestring) == APP_OK){
rrpc_cb(topic->valuestring, msg_base64decode);
rrpc_cb(topic->valuestring, msg_base64decode, msg_base64decodeLen);
APP_FREE(msg_base64decode);
goto end;
}
}
if(NULL != normal_cb){
normal_cb(topic->valuestring, msg_base64decode);
normal_cb(topic->valuestring, msg_base64decode, msg_base64decodeLen);
}
APP_FREE(msg_base64decode);
end:
@ -295,11 +295,11 @@ app_status app_register_cb(msg_handler normal_handler, msg_handler rrpc_handler)
return status;
}
app_status app_publish(const char *topic, const char *str)
app_status app_publish(const char *topic, const char *data, int dataLen)
{
app_status status = APP_OK;
if((NULL == topic) || (NULL == str))
if((NULL == topic) || (NULL == data))
{
return APP_INVALID_ARG;
}
@ -311,8 +311,8 @@ app_status app_publish(const char *topic, const char *str)
return APP_NO_MEMORY;
}
memset(normal_payload_base64, 0, NATS_MSG_MAX_LEN);
base64_encode(str, strlen(str), normal_payload_base64);
log_write(LOG_DEBUG, "dyn_reg_payload_base64:%s",normal_payload_base64);
base64_encode(data, dataLen, normal_payload_base64);
log_write(LOG_DEBUG, "send data:%s",data);
char *normal_msg = (char *)APP_MALLOC(NATS_MSG_MAX_LEN);
if(NULL == normal_msg)
@ -323,7 +323,6 @@ app_status app_publish(const char *topic, const char *str)
}
memset(normal_msg, 0, NATS_MSG_MAX_LEN);
snprintf(normal_msg, NATS_MSG_MAX_LEN, NORMAL_MSG_FORMAT, topic, normal_payload_base64, app_get_name());
log_write(LOG_DEBUG, "normal_msg:%s",normal_msg);
status = nats_publish(edge_router_subject, normal_msg);
@ -332,6 +331,16 @@ app_status app_publish(const char *topic, const char *str)
return status;
}
app_status app_publishString(const char *topic, const char *str)
{
app_status status;
status = app_publish(topic,str,strlen(str));
return status;
}
static void _handle_status_sync(union sigval v)
{
app_status status = APP_OK;
@ -410,12 +419,12 @@ app_status app_common_init(void)
return APP_OK;
}
app_status app_rrpc_response(char *topic,char *payload)
app_status app_rrpc_response(char *topic,char *payload, int payloadLen)
{
app_status status;
char response_topic[128];
replace_str(response_topic, topic, "request", "response");
status = app_publish(response_topic, payload);
status = app_publish(response_topic, payload, payloadLen);
if(APP_OK != status){
log_write(LOG_ERROR, "app_publish rrpc fail");
return APP_ERR;

View File

@ -125,16 +125,17 @@ typedef enum
} app_status;
typedef void (*msg_handler)(char *topic, char *payload);
typedef void (*msg_handler)(char *topic, char *payload, int payloadLen);
char *app_get_name();
char *app_get_productSN();
char *app_get_deviceSN();
char *app_get_info();
app_status app_register_cb(msg_handler normal_handler, msg_handler rrpc_handler);
app_status app_publish(const char *topic, const char *str);
app_status app_publish(const char *topic, const char *data, int dataLen);
app_status app_publishString(const char *topic, const char *str);
app_status app_common_init(void);
void log_write(log_level level, const char *format,...);
app_status app_rrpc_response(char *topic,char *payload);
app_status app_rrpc_response(char *topic,char *payload, int payloadLen);
#endif

View File

@ -120,7 +120,7 @@ int base64_decode(const char *indata, int inlen, char *outdata) {
y = t = 0;
}
}
return ret;
return i;
}
int calc_file_len(const char *file_path)

View File

@ -1,17 +1,18 @@
#include "app.h"
#define DEFAULT_TOPIC_FMT "/%s/%s/upload"
void app_normal_msg_handler(char *topic, char *payload)
void app_normal_msg_handler(char *topic, char *payload, int payloadLen)
{
log_write(LOG_INFO, "receive topic:%s",topic);
log_write(LOG_INFO, "receive payload:%s",payload);
log_write(LOG_INFO, "receive payload:%s payloadLen:%d",payload, payloadLen);
return;
}
void app_rrpc_msg_handler(char *topic, char *payload)
void app_rrpc_msg_handler(char *topic, char *payload, int payloadLen)
{
log_write(LOG_INFO, "rrpc topic:%s",topic);
log_write(LOG_INFO, "rrpc payload:%s",payload);
app_rrpc_response(topic, "rrpc response sample!");
log_write(LOG_INFO, "rrpc payload:%s payloadLen:%d",payload, payloadLen);
char *response_str = "rrpc response sample!";
app_rrpc_response(topic, response_str, strlen(response_str));
return;
}
@ -67,7 +68,8 @@ int main(int argc, char **argv)
snprintf(time_stamp, 100, "{\"timestamp\": \"%ld\"}", stamp.tv_sec);
log_write(LOG_INFO, "send message[%s]", time_stamp);
status = app_publish(topic_str, time_stamp);
status = app_publishString(topic_str, time_stamp);
status |= app_publish(topic_str, "0D0A2131", 8);
if(APP_OK != status)
{
log_write(LOG_ERROR, "app_publish fail");