MF-1061 - Add name, protocol and publisher tests to influxdb-reader (#1320)

* MF-1061 - Add name, protocol and publisher tests to influxdb-reader

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix typo

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Use short package aliases

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
This commit is contained in:
Manuel Imperiale 2021-01-11 10:53:38 +01:00 committed by GitHub
parent ac09815457
commit a185855c06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 98 additions and 54 deletions

View File

@ -136,38 +136,32 @@ func fmtCondition(chanID string, query map[string]string) string {
case
"channel",
"subtopic",
"publisher":
condition = fmt.Sprintf(`%s AND %s='%s'`, condition, name,
strings.Replace(value, "'", "\\'", -1))
case
"publisher",
"name",
"protocol":
condition = fmt.Sprintf(`%s AND "%s"='%s'`, condition, name,
strings.Replace(value, "\"", "\\\"", -1))
condition = fmt.Sprintf(`%s AND "%s"='%s'`, condition, name, value)
case "v":
condition = fmt.Sprintf(`%s AND value = %s`, condition, value)
condition = fmt.Sprintf(`%s AND "value" = %s`, condition, value)
case "vb":
condition = fmt.Sprintf(`%s AND boolValue = %s`, condition, value)
condition = fmt.Sprintf(`%s AND "boolValue" = %s`, condition, value)
case "vs":
condition = fmt.Sprintf(`%s AND "stringValue"='%s'`, condition,
strings.Replace(value, "\"", "\\\"", -1))
condition = fmt.Sprintf(`%s AND "stringValue"='%s'`, condition, value)
case "vd":
condition = fmt.Sprintf(`%s AND "dataValue"='%s'`, condition,
strings.Replace(value, "\"", "\\\"", -1))
condition = fmt.Sprintf(`%s AND "dataValue"='%s'`, condition, value)
case "from":
fVal, err := strconv.ParseFloat(value, 64)
if err != nil {
continue
}
iVal := int64(fVal * 1e9)
condition = fmt.Sprintf(`%s AND time >= %d`, condition, iVal)
condition = fmt.Sprintf(`%s AND "time" >= %d`, condition, iVal)
case "to":
fVal, err := strconv.ParseFloat(value, 64)
if err != nil {
continue
}
iVal := int64(fVal * 1e9)
condition = fmt.Sprintf(`%s AND time < %d`, condition, iVal)
condition = fmt.Sprintf(`%s AND "time" < %d`, condition, iVal)
}
}
return condition

View File

@ -2,28 +2,29 @@ package influxdb_test
import (
"fmt"
"os"
"testing"
"time"
influxdata "github.com/influxdata/influxdb/client/v2"
"github.com/mainflux/mainflux/pkg/transformers/senml"
uuidProvider "github.com/mainflux/mainflux/pkg/uuid"
"github.com/mainflux/mainflux/readers"
reader "github.com/mainflux/mainflux/readers/influxdb"
writer "github.com/mainflux/mainflux/writers/influxdb"
ireader "github.com/mainflux/mainflux/readers/influxdb"
iwriter "github.com/mainflux/mainflux/writers/influxdb"
log "github.com/mainflux/mainflux/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
testDB = "test"
chanID = "1"
subtopic = "topic"
msgsNum = 100
msgsValNum = 20
limit = 10
testDB = "test"
subtopic = "topic"
msgsNum = 100
limit = 10
valueFields = 5
mqttProt = "mqtt"
httpProt = "http"
msgName = "temperature"
)
var (
@ -32,38 +33,36 @@ var (
vb = true
vd = "dataValue"
sum float64 = 42
client influxdata.Client
)
var (
valueFields = 5
port string
client influxdata.Client
testLog, _ = log.New(os.Stdout, log.Info.String())
func TestReadAll(t *testing.T) {
writer := iwriter.New(client, testDB)
clientCfg = influxdata.HTTPConfig{
Username: "test",
Password: "test",
}
chanID, err := uuidProvider.New().ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
pubID, err := uuidProvider.New().ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
pub2ID, err := uuidProvider.New().ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
m = senml.Message{
m := senml.Message{
Channel: chanID,
Publisher: "1",
Protocol: "mqtt",
Publisher: pubID,
Protocol: mqttProt,
Name: "name",
Unit: "U",
Time: 123456,
UpdateTime: 1234,
}
)
func TestReadAll(t *testing.T) {
writer := writer.New(client, testDB)
messages := []senml.Message{}
valSubtopicMsgs := []senml.Message{}
valueMsgs := []senml.Message{}
boolMsgs := []senml.Message{}
stringMsgs := []senml.Message{}
dataMsgs := []senml.Message{}
queryMsgs := []senml.Message{}
now := float64(time.Now().UTC().Second())
for i := 0; i < msgsNum; i++ {
@ -74,9 +73,8 @@ func TestReadAll(t *testing.T) {
count := i % valueFields
switch count {
case 0:
msg.Subtopic = subtopic
msg.Value = &v
valSubtopicMsgs = append(valSubtopicMsgs, msg)
valueMsgs = append(valueMsgs, msg)
case 1:
msg.BoolValue = &vb
boolMsgs = append(boolMsgs, msg)
@ -88,15 +86,20 @@ func TestReadAll(t *testing.T) {
dataMsgs = append(dataMsgs, msg)
case 4:
msg.Sum = &sum
msg.Subtopic = subtopic
msg.Protocol = httpProt
msg.Publisher = pub2ID
msg.Name = msgName
queryMsgs = append(queryMsgs, msg)
}
messages = append(messages, msg)
}
err := writer.Save(messages)
err = writer.Save(messages)
require.Nil(t, err, fmt.Sprintf("failed to store message to InfluxDB: %s", err))
reader := reader.New(client, testDB)
reader := ireader.New(client, testDB)
require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB reader expected to succeed: %s.\n", err))
cases := map[string]struct {
@ -118,7 +121,7 @@ func TestReadAll(t *testing.T) {
},
},
"read message page for non-existent channel": {
chanID: "2",
chanID: "wrong",
offset: 0,
limit: limit,
page: readers.MessagesPage{
@ -157,10 +160,46 @@ func TestReadAll(t *testing.T) {
limit: limit,
query: map[string]string{"subtopic": subtopic},
page: readers.MessagesPage{
Total: uint64(len(valSubtopicMsgs)),
Total: uint64(len(queryMsgs)),
Offset: 0,
Limit: limit,
Messages: fromSenml(valSubtopicMsgs[0:limit]),
Messages: fromSenml(queryMsgs[0:limit]),
},
},
"read message with publisher": {
chanID: chanID,
offset: 0,
limit: limit,
query: map[string]string{"publisher": pub2ID},
page: readers.MessagesPage{
Total: uint64(len(queryMsgs)),
Offset: 0,
Limit: limit,
Messages: fromSenml(queryMsgs[0:limit]),
},
},
"read message with protocol": {
chanID: chanID,
offset: 0,
limit: limit,
query: map[string]string{"protocol": httpProt},
page: readers.MessagesPage{
Total: uint64(len(queryMsgs)),
Offset: 0,
Limit: limit,
Messages: fromSenml(queryMsgs[0:limit]),
},
},
"read message with name": {
chanID: chanID,
offset: 0,
limit: limit,
query: map[string]string{"name": msgName},
page: readers.MessagesPage{
Total: uint64(len(queryMsgs)),
Offset: 0,
Limit: limit,
Messages: fromSenml(queryMsgs[0:limit]),
},
},
"read message with value": {
@ -169,10 +208,10 @@ func TestReadAll(t *testing.T) {
limit: limit,
query: map[string]string{"v": fmt.Sprintf("%f", v)},
page: readers.MessagesPage{
Total: msgsValNum,
Total: uint64(len(valueMsgs)),
Offset: 0,
Limit: limit,
Messages: fromSenml(valSubtopicMsgs[0:limit]),
Messages: fromSenml(valueMsgs[0:limit]),
},
},
"read message with boolean value": {
@ -181,7 +220,7 @@ func TestReadAll(t *testing.T) {
limit: limit,
query: map[string]string{"vb": fmt.Sprintf("%t", vb)},
page: readers.MessagesPage{
Total: msgsValNum,
Total: uint64(len(boolMsgs)),
Offset: 0,
Limit: limit,
Messages: fromSenml(boolMsgs[0:limit]),
@ -193,7 +232,7 @@ func TestReadAll(t *testing.T) {
limit: limit,
query: map[string]string{"vs": vs},
page: readers.MessagesPage{
Total: msgsValNum,
Total: uint64(len(stringMsgs)),
Offset: 0,
Limit: limit,
Messages: fromSenml(stringMsgs[0:limit]),
@ -205,7 +244,7 @@ func TestReadAll(t *testing.T) {
limit: limit,
query: map[string]string{"vd": vd},
page: readers.MessagesPage{
Total: msgsValNum,
Total: uint64(len(dataMsgs)),
Offset: 0,
Limit: limit,
Messages: fromSenml(dataMsgs[0:limit]),

View File

@ -6,10 +6,21 @@ import (
"testing"
"time"
influxdata "github.com/influxdata/influxdb/client/v2"
influxdb "github.com/influxdata/influxdb/client/v2"
log "github.com/mainflux/mainflux/logger"
dockertest "github.com/ory/dockertest/v3"
)
var (
testLog, _ = log.New(os.Stdout, log.Info.String())
clientCfg = influxdata.HTTPConfig{
Username: "test",
Password: "test",
}
)
func TestMain(m *testing.M) {
pool, err := dockertest.NewPool("")
if err != nil {
@ -26,7 +37,7 @@ func TestMain(m *testing.M) {
testLog.Error(fmt.Sprintf("Could not start container: %s", err))
}
port = container.GetPort("8086/tcp")
port := container.GetPort("8086/tcp")
clientCfg.Addr = fmt.Sprintf("http://localhost:%s", port)
if err := pool.Retry(func() error {