zephyr/subsys/net/lib/mqtt/mqtt_rx.c

293 lines
7.0 KiB
C

/*
* Copyright (c) 2018 Nordic Semiconductor ASA
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/logging/log.h>
LOG_MODULE_REGISTER(net_mqtt_rx, CONFIG_MQTT_LOG_LEVEL);
#include "mqtt_internal.h"
#include "mqtt_transport.h"
#include "mqtt_os.h"
/** @file mqtt_rx.c
*
* @brief MQTT Received data handling.
*/
static int mqtt_handle_packet(struct mqtt_client *client,
uint8_t type_and_flags,
uint32_t var_length,
struct buf_ctx *buf)
{
int err_code = 0;
bool notify_event = true;
struct mqtt_evt evt;
/* Success by default, overwritten in special cases. */
evt.result = 0;
switch (type_and_flags & 0xF0) {
case MQTT_PKT_TYPE_CONNACK:
NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_CONNACK!", client);
evt.type = MQTT_EVT_CONNACK;
err_code = connect_ack_decode(client, buf, &evt.param.connack);
if (err_code == 0) {
NET_DBG("[CID %p]: return_code: %d", client,
evt.param.connack.return_code);
if (evt.param.connack.return_code ==
MQTT_CONNECTION_ACCEPTED) {
/* Set state. */
MQTT_SET_STATE(client, MQTT_STATE_CONNECTED);
} else {
err_code = -ECONNREFUSED;
}
evt.result = evt.param.connack.return_code;
} else {
evt.result = err_code;
}
break;
case MQTT_PKT_TYPE_PUBLISH:
NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBLISH", client);
evt.type = MQTT_EVT_PUBLISH;
err_code = publish_decode(type_and_flags, var_length, buf,
&evt.param.publish);
evt.result = err_code;
client->internal.remaining_payload =
evt.param.publish.message.payload.len;
NET_DBG("PUB QoS:%02x, message len %08x, topic len %08x",
evt.param.publish.message.topic.qos,
evt.param.publish.message.payload.len,
evt.param.publish.message.topic.topic.size);
break;
case MQTT_PKT_TYPE_PUBACK:
NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBACK!", client);
evt.type = MQTT_EVT_PUBACK;
err_code = publish_ack_decode(buf, &evt.param.puback);
evt.result = err_code;
break;
case MQTT_PKT_TYPE_PUBREC:
NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBREC!", client);
evt.type = MQTT_EVT_PUBREC;
err_code = publish_receive_decode(buf, &evt.param.pubrec);
evt.result = err_code;
break;
case MQTT_PKT_TYPE_PUBREL:
NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBREL!", client);
evt.type = MQTT_EVT_PUBREL;
err_code = publish_release_decode(buf, &evt.param.pubrel);
evt.result = err_code;
break;
case MQTT_PKT_TYPE_PUBCOMP:
NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBCOMP!", client);
evt.type = MQTT_EVT_PUBCOMP;
err_code = publish_complete_decode(buf, &evt.param.pubcomp);
evt.result = err_code;
break;
case MQTT_PKT_TYPE_SUBACK:
NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_SUBACK!", client);
evt.type = MQTT_EVT_SUBACK;
err_code = subscribe_ack_decode(buf, &evt.param.suback);
evt.result = err_code;
break;
case MQTT_PKT_TYPE_UNSUBACK:
NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_UNSUBACK!", client);
evt.type = MQTT_EVT_UNSUBACK;
err_code = unsubscribe_ack_decode(buf, &evt.param.unsuback);
evt.result = err_code;
break;
case MQTT_PKT_TYPE_PINGRSP:
NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PINGRSP!", client);
if (client->unacked_ping <= 0) {
NET_WARN("Unexpected PINGRSP");
client->unacked_ping = 0;
} else {
client->unacked_ping--;
}
evt.type = MQTT_EVT_PINGRESP;
break;
default:
/* Nothing to notify. */
notify_event = false;
break;
}
if (notify_event == true) {
event_notify(client, &evt);
}
return err_code;
}
static int mqtt_read_message_chunk(struct mqtt_client *client,
struct buf_ctx *buf, uint32_t length)
{
uint32_t remaining;
int len;
/* In case all data requested has already been buffered, return. */
if (length <= (buf->end - buf->cur)) {
return 0;
}
/* Calculate how much data we need to read from the transport,
* given the already buffered data.
*/
remaining = length - (buf->end - buf->cur);
/* Check if read does not exceed the buffer. */
if ((buf->end + remaining > client->rx_buf + client->rx_buf_size) ||
(buf->end + remaining < client->rx_buf)) {
NET_ERR("[CID %p]: Read would exceed RX buffer bounds.",
client);
return -ENOMEM;
}
len = mqtt_transport_read(client, buf->end, remaining, false);
if (len < 0) {
if (len != -EAGAIN) {
NET_ERR("[CID %p]: Transport read error: %d", client, len);
}
return len;
}
if (len == 0) {
NET_ERR("[CID %p]: Connection closed.", client);
return -ENOTCONN;
}
client->internal.rx_buf_datalen += len;
buf->end += len;
if (len < remaining) {
NET_ERR("[CID %p]: Message partially received.", client);
return -EAGAIN;
}
return 0;
}
static int mqtt_read_publish_var_header(struct mqtt_client *client,
uint8_t type_and_flags,
struct buf_ctx *buf)
{
uint8_t qos = (type_and_flags & MQTT_HEADER_QOS_MASK) >> 1;
int err_code;
uint32_t variable_header_length;
/* Read topic length field. */
err_code = mqtt_read_message_chunk(client, buf, sizeof(uint16_t));
if (err_code < 0) {
return err_code;
}
variable_header_length = *buf->cur << 8; /* MSB */
variable_header_length |= *(buf->cur + 1); /* LSB */
/* Add two bytes for topic length field. */
variable_header_length += sizeof(uint16_t);
/* Add two bytes for message_id, if needed. */
if (qos > MQTT_QOS_0_AT_MOST_ONCE) {
variable_header_length += sizeof(uint16_t);
}
/* Now we can read the whole header. */
err_code = mqtt_read_message_chunk(client, buf,
variable_header_length);
if (err_code < 0) {
return err_code;
}
return 0;
}
static int mqtt_read_and_parse_fixed_header(struct mqtt_client *client,
uint8_t *type_and_flags,
uint32_t *var_length,
struct buf_ctx *buf)
{
/* Read the mandatory part of the fixed header in first iteration. */
uint8_t chunk_size = MQTT_FIXED_HEADER_MIN_SIZE;
int err_code;
do {
err_code = mqtt_read_message_chunk(client, buf, chunk_size);
if (err_code < 0) {
return err_code;
}
/* Reset to pointer to the beginning of the frame. */
buf->cur = client->rx_buf;
chunk_size = 1U;
err_code = fixed_header_decode(buf, type_and_flags, var_length);
} while (err_code == -EAGAIN);
return err_code;
}
int mqtt_handle_rx(struct mqtt_client *client)
{
int err_code;
uint8_t type_and_flags;
uint32_t var_length;
struct buf_ctx buf;
buf.cur = client->rx_buf;
buf.end = client->rx_buf + client->internal.rx_buf_datalen;
err_code = mqtt_read_and_parse_fixed_header(client, &type_and_flags,
&var_length, &buf);
if (err_code < 0) {
return (err_code == -EAGAIN) ? 0 : err_code;
}
if ((type_and_flags & 0xF0) == MQTT_PKT_TYPE_PUBLISH) {
err_code = mqtt_read_publish_var_header(client, type_and_flags,
&buf);
} else {
err_code = mqtt_read_message_chunk(client, &buf, var_length);
}
if (err_code < 0) {
return (err_code == -EAGAIN) ? 0 : err_code;
}
/* At this point, packet is ready to be passed to the application. */
err_code = mqtt_handle_packet(client, type_and_flags, var_length, &buf);
if (err_code < 0) {
return err_code;
}
client->internal.rx_buf_datalen = 0U;
return 0;
}