Mainflux.mainflux/pkg/messaging/nats/pubsub_test.go

298 lines
8.6 KiB
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package nats_test
import (
"context"
"fmt"
"testing"
"time"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/pkg/messaging/nats"
"github.com/stretchr/testify/assert"
)
const (
topic = "topic"
chansPrefix = "channels"
channel = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b"
subtopic = "engine"
clientID = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b"
)
var (
msgChan = make(chan *messaging.Message)
message = &messaging.Message{
Channel: channel,
Subtopic: subtopic,
Publisher: "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b",
Protocol: "mqtt",
Payload: []byte("payload"),
Created: time.Now().UnixNano(),
}
)
func TestPublisher(t *testing.T) {
subCfg := messaging.SubscriberConfig{
ID: clientID,
Topic: fmt.Sprintf("%s.>", chansPrefix),
Handler: handler{},
}
err := pubsub.Subscribe(context.TODO(), subCfg)
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
cases := []struct {
desc string
topic string
subtopic string
message *messaging.Message
error error
}{
{
desc: "publish message with empty message",
topic: channel,
subtopic: subtopic,
message: &messaging.Message{},
error: nil,
},
{
desc: "publish message with message",
topic: channel,
subtopic: subtopic,
message: message,
error: nil,
},
{
desc: "publish message with topic and empty subtopic",
topic: channel,
subtopic: "",
message: message,
error: nil,
},
{
desc: "publish message with subtopic and empty topic",
topic: "",
subtopic: subtopic,
message: message,
error: nats.ErrEmptyTopic,
},
{
desc: "publish message with topic and subtopic",
topic: channel,
subtopic: subtopic,
message: message,
error: nil,
},
}
for _, tc := range cases {
tc.message.Subtopic = tc.subtopic
err := pubsub.Publish(context.TODO(), tc.topic, tc.message)
assert.Equal(t, tc.error, err, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, tc.error, err))
if err == nil {
receivedMsg := <-msgChan
assert.Equal(t, tc.message.Payload, receivedMsg.Payload, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, tc.message.Payload, receivedMsg))
assert.Equal(t, tc.message.Channel, receivedMsg.Channel, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &tc.message, receivedMsg))
assert.Equal(t, tc.message.Created, receivedMsg.Created, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &tc.message, receivedMsg))
assert.Equal(t, tc.message.Protocol, receivedMsg.Protocol, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &tc.message, receivedMsg))
assert.Equal(t, tc.message.Publisher, receivedMsg.Publisher, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &tc.message, receivedMsg))
assert.Equal(t, tc.message.Subtopic, receivedMsg.Subtopic, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &tc.message, receivedMsg))
assert.Equal(t, tc.message.Payload, receivedMsg.Payload, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &tc.message, receivedMsg))
}
}
}
func TestPubsub(t *testing.T) {
// Test Subscribe and Unsubscribe.
subcases := []struct {
desc string
topic string
clientID string
errorMessage error
pubsub bool // true for subscribe and false for unsubscribe.
handler messaging.MessageHandler
}{
{
desc: "Subscribe to a topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid1",
errorMessage: nil,
pubsub: true,
handler: handler{},
},
{
desc: "Subscribe using malformed topic and ID",
topic: fmt.Sprintf("%s.>", chansPrefix),
clientID: "clientid1",
errorMessage: nil,
pubsub: true,
handler: handler{},
},
{
desc: "Subscribe using malformed topic and ID",
topic: fmt.Sprintf("%s.*", chansPrefix),
clientID: "clientid1",
errorMessage: nil,
pubsub: true,
handler: handler{},
},
{
desc: "Subscribe to the same topic with a different ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid2",
errorMessage: nil,
pubsub: true,
handler: handler{},
},
{
desc: "Subscribe to an already subscribed topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid1",
errorMessage: nil,
pubsub: true,
handler: handler{},
},
{
desc: "Unsubscribe from a topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid1",
errorMessage: nil,
pubsub: false,
handler: handler{},
},
{
desc: "Unsubscribe from a non-existent topic with an ID",
topic: "h",
clientID: "clientid1",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{},
},
{
desc: "Unsubscribe from the same topic with a different ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientidd2",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{},
},
{
desc: "Unsubscribe from the same topic with a different ID not subscribed",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientidd3",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{},
},
{
desc: "Unsubscribe from an already unsubscribed topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid1",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{},
},
{
desc: "Subscribe to a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: nil,
pubsub: true,
handler: handler{},
},
{
desc: "Subscribe to an already subscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: nil,
pubsub: true,
handler: handler{},
},
{
desc: "Unsubscribe from a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: nil,
pubsub: false,
handler: handler{},
},
{
desc: "Unsubscribe from an already unsubscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientid1",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{},
},
{
desc: "Subscribe to an empty topic with an ID",
topic: "",
clientID: "clientid1",
errorMessage: nats.ErrEmptyTopic,
pubsub: true,
handler: handler{},
},
{
desc: "Unsubscribe from an empty topic with an ID",
topic: "",
clientID: "clientid1",
errorMessage: nats.ErrEmptyTopic,
pubsub: false,
handler: handler{},
},
{
desc: "Subscribe to a topic with empty id",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "",
errorMessage: nats.ErrEmptyID,
pubsub: true,
handler: handler{},
},
{
desc: "Unsubscribe from a topic with empty id",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "",
errorMessage: nats.ErrEmptyID,
pubsub: false,
handler: handler{},
},
}
for _, pc := range subcases {
subCfg := messaging.SubscriberConfig{
ID: pc.clientID,
Topic: pc.topic,
Handler: pc.handler,
}
if pc.pubsub == true {
err := pubsub.Subscribe(context.TODO(), subCfg)
if pc.errorMessage == nil {
assert.Nil(t, err, fmt.Sprintf("%s expected %+v got %+v\n", pc.desc, pc.errorMessage, err))
} else {
assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s expected %+v got %+v\n", pc.desc, pc.errorMessage, err))
}
} else {
err := pubsub.Unsubscribe(context.TODO(), pc.clientID, pc.topic)
if pc.errorMessage == nil {
assert.Nil(t, err, fmt.Sprintf("%s expected %+v got %+v\n", pc.desc, pc.errorMessage, err))
} else {
assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s expected %+v got %+v\n", pc.desc, pc.errorMessage, err))
}
}
}
}
type handler struct{}
func (h handler) Handle(msg *messaging.Message) error {
msgChan <- msg
return nil
}
func (h handler) Cancel() error {
return nil
}