diff --git a/mqtt/forwarder.go b/mqtt/forwarder.go index 425e615d..b77ee5e1 100644 --- a/mqtt/forwarder.go +++ b/mqtt/forwarder.go @@ -43,15 +43,17 @@ func handle(ctx context.Context, pub messaging.Publisher, logger mflog.Logger) h } // Use concatenation instead of fmt.Sprintf for the // sake of simplicity and performance. - topic := fmt.Sprintf("channels/%s/messages", msg.Channel) + topic := "channels/" + msg.Channel + "/messages" if msg.Subtopic != "" { - topic = fmt.Sprintf("%s/%s", topic, strings.ReplaceAll(msg.Subtopic, ".", "/")) + topic = topic + "/" + strings.ReplaceAll(msg.Subtopic, ".", "/") } + go func() { if err := pub.Publish(ctx, topic, msg); err != nil { logger.Warn(fmt.Sprintf("Failed to forward message: %s", err)) } }() + return nil } } diff --git a/pkg/messaging/mqtt/publisher.go b/pkg/messaging/mqtt/publisher.go index 12445c41..1bb4c622 100644 --- a/pkg/messaging/mqtt/publisher.go +++ b/pkg/messaging/mqtt/publisher.go @@ -10,7 +10,6 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/mainflux/mainflux/pkg/messaging" - "google.golang.org/protobuf/proto" ) var errPublishTimeout = errors.New("failed to publish due to timeout reached") @@ -40,20 +39,18 @@ func (pub publisher) Publish(ctx context.Context, topic string, msg *messaging.M if topic == "" { return ErrEmptyTopic } - data, err := proto.Marshal(msg) - if err != nil { - return err - } - token := pub.client.Publish(topic, qos, false, data) + + // Publish only the payload and not the whole message. + token := pub.client.Publish(topic, qos, false, msg.GetPayload()) if token.Error() != nil { return token.Error() } - ok := token.WaitTimeout(pub.timeout) - if !ok { + + if ok := token.WaitTimeout(pub.timeout); !ok { return errPublishTimeout } - return token.Error() + return nil } func (pub publisher) Close() error { diff --git a/pkg/messaging/mqtt/pubsub.go b/pkg/messaging/mqtt/pubsub.go index 7a4b1ee1..4e14bbee 100644 --- a/pkg/messaging/mqtt/pubsub.go +++ b/pkg/messaging/mqtt/pubsub.go @@ -62,7 +62,7 @@ type pubsub struct { } // NewPubSub returns MQTT message publisher/subscriber. -func NewPubSub(url, queue string, timeout time.Duration, logger mflog.Logger) (messaging.PubSub, error) { +func NewPubSub(url, _ string, timeout time.Duration, logger mflog.Logger) (messaging.PubSub, error) { client, err := newClient(url, "mqtt-publisher", timeout) if err != nil { return nil, err @@ -194,7 +194,7 @@ func newClient(address, id string, timeout time.Duration) (mqtt.Client, error) { } func (ps *pubsub) mqttHandler(h messaging.MessageHandler) mqtt.MessageHandler { - return func(c mqtt.Client, m mqtt.Message) { + return func(_ mqtt.Client, m mqtt.Message) { var msg messaging.Message if err := proto.Unmarshal(m.Payload(), &msg); err != nil { ps.logger.Warn(fmt.Sprintf("Failed to unmarshal received message: %s", err)) diff --git a/pkg/messaging/mqtt/pubsub_test.go b/pkg/messaging/mqtt/pubsub_test.go index 27f7acae..c45488fd 100644 --- a/pkg/messaging/mqtt/pubsub_test.go +++ b/pkg/messaging/mqtt/pubsub_test.go @@ -111,7 +111,9 @@ func TestPublisher(t *testing.T) { assert.Nil(t, err, fmt.Sprintf("%s: failed to serialize protobuf error: %s\n", tc.desc, err)) receivedMsg := <-msgChan - assert.Equal(t, data, receivedMsg, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, data, receivedMsg)) + if tc.payload != nil { + assert.Equal(t, expectedMsg.GetPayload(), receivedMsg, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, data, receivedMsg)) + } } } @@ -271,9 +273,14 @@ func TestPubSub(t *testing.T) { Subtopic: subtopic, Payload: data, } + data, err := proto.Marshal(&expectedMsg) + assert.Nil(t, err, fmt.Sprintf("%s: failed to serialize protobuf error: %s\n", tc.desc, err)) + msg := messaging.Message{ + Payload: data, + } // Publish message, and then receive it on message channel. - err := pubsub.Publish(context.TODO(), topic, &expectedMsg) + err = pubsub.Publish(context.TODO(), topic, &msg) assert.Nil(t, err, fmt.Sprintf("%s: got unexpected error: %s\n", tc.desc, err)) receivedMsg := <-msgChan diff --git a/pkg/messaging/nats/pubsub.go b/pkg/messaging/nats/pubsub.go index bf0f45e3..8cb09364 100644 --- a/pkg/messaging/nats/pubsub.go +++ b/pkg/messaging/nats/pubsub.go @@ -161,6 +161,7 @@ func (ps *pubsub) natsHandler(h messaging.MessageHandler) broker.MsgHandler { ps.logger.Warn(fmt.Sprintf("Failed to unmarshal received message: %s", err)) return } + if err := h.Handle(&msg); err != nil { ps.logger.Warn(fmt.Sprintf("Failed to handle Mainflux message: %s", err)) }