MF-1199 - Add NATS messaging tests (#1209)
* Add test setup for mqtt Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com> * Add pubsub tests Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com> * Add message and payload helper funcs Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com> * Add time.Sleep after subscribe to topic Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com> * Add NATS tests setup Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com> * Add NATS test cases Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com> * Add message forwarded check to handler Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com> * Remove race affected mqtt tests Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>
This commit is contained in:
parent
2453cd75ed
commit
c1088b9315
|
@ -0,0 +1,84 @@
|
|||
// Copyright (c) Mainflux
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package nats_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/mainflux/mainflux/pkg/messaging"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
const (
|
||||
topic = "topic"
|
||||
chansPrefix = "channels"
|
||||
channel = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b"
|
||||
subtopic = "engine"
|
||||
)
|
||||
|
||||
var (
|
||||
msgChan = make(chan messaging.Message)
|
||||
data = []byte("payload")
|
||||
)
|
||||
|
||||
func TestPubsub(t *testing.T) {
|
||||
err := pubsub.Subscribe(fmt.Sprintf("%s.%s", chansPrefix, topic), handler)
|
||||
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
|
||||
err = pubsub.Subscribe(fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic), handler)
|
||||
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
channel string
|
||||
subtopic string
|
||||
payload []byte
|
||||
}{
|
||||
{
|
||||
desc: "publish message with nil payload",
|
||||
payload: nil,
|
||||
},
|
||||
{
|
||||
desc: "publish message with string payload",
|
||||
payload: data,
|
||||
},
|
||||
{
|
||||
desc: "publish message with channel",
|
||||
payload: data,
|
||||
channel: channel,
|
||||
},
|
||||
{
|
||||
desc: "publish message with subtopic",
|
||||
payload: data,
|
||||
subtopic: subtopic,
|
||||
},
|
||||
{
|
||||
desc: "publish message with channel and subtopic",
|
||||
payload: data,
|
||||
channel: channel,
|
||||
subtopic: subtopic,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
expectedMsg := messaging.Message{
|
||||
Channel: tc.channel,
|
||||
Subtopic: tc.subtopic,
|
||||
Payload: tc.payload,
|
||||
}
|
||||
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
|
||||
|
||||
err = pubsub.Publish(topic, expectedMsg)
|
||||
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
|
||||
|
||||
receivedMsg := <-msgChan
|
||||
assert.Equal(t, expectedMsg, receivedMsg, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, expectedMsg, receivedMsg))
|
||||
}
|
||||
}
|
||||
|
||||
func handler(msg messaging.Message) error {
|
||||
msgChan <- msg
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,74 @@
|
|||
// Copyright (c) Mainflux
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package nats_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"testing"
|
||||
|
||||
"github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/pkg/messaging"
|
||||
"github.com/mainflux/mainflux/pkg/messaging/nats"
|
||||
dockertest "github.com/ory/dockertest/v3"
|
||||
)
|
||||
|
||||
var (
|
||||
publisher messaging.Publisher
|
||||
pubsub messaging.PubSub
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
pool, err := dockertest.NewPool("")
|
||||
if err != nil {
|
||||
log.Fatalf("Could not connect to docker: %s", err)
|
||||
}
|
||||
|
||||
container, err := pool.Run("nats", "1.3.0", []string{})
|
||||
if err != nil {
|
||||
log.Fatalf("Could not start container: %s", err)
|
||||
}
|
||||
handleInterrupt(pool, container)
|
||||
|
||||
address := fmt.Sprintf("%s:%s", "localhost", container.GetPort("4222/tcp"))
|
||||
if err := pool.Retry(func() error {
|
||||
publisher, err = nats.NewPublisher(address)
|
||||
return err
|
||||
}); err != nil {
|
||||
log.Fatalf("Could not connect to docker: %s", err)
|
||||
}
|
||||
|
||||
logger, err := logger.New(os.Stdout, "error")
|
||||
if err != nil {
|
||||
log.Fatalf(err.Error())
|
||||
}
|
||||
if err := pool.Retry(func() error {
|
||||
pubsub, err = nats.NewPubSub(address, "", logger)
|
||||
return err
|
||||
}); err != nil {
|
||||
log.Fatalf("Could not connect to docker: %s", err)
|
||||
}
|
||||
|
||||
code := m.Run()
|
||||
if err := pool.Purge(container); err != nil {
|
||||
log.Fatalf("Could not purge container: %s", err)
|
||||
}
|
||||
|
||||
os.Exit(code)
|
||||
}
|
||||
|
||||
func handleInterrupt(pool *dockertest.Pool, container *dockertest.Resource) {
|
||||
c := make(chan os.Signal)
|
||||
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
|
||||
go func() {
|
||||
<-c
|
||||
if err := pool.Purge(container); err != nil {
|
||||
log.Fatalf("Could not purge container: %s", err)
|
||||
}
|
||||
os.Exit(0)
|
||||
}()
|
||||
}
|
Loading…
Reference in New Issue