NOISSUE - Refactor InfluxDB Reader: explicit check event + add safe conversion (#1460)

* refactore(reader/influxdb): explicit check event + add safe conversion

Signed-off-by: tzzed <zerouali.t@gmail.com>

* apply review suggestion

Signed-off-by: tzzed <zerouali.t@gmail.com>

* Delete useless space.

Signed-off-by: tzzed <zerouali.t@gmail.com>

* apply review suggestion

Signed-off-by: tzzed <zerouali.t@gmail.com>

* apply review suggestion, replace require.NoError(t, err) by assert.Nil

Signed-off-by: tzzed <zerouali.t@gmail.com>

* Fix tests when message time is randomly equal to 0 in tests cases.

Signed-off-by: tzzed <zerouali.t@gmail.com>

* apply review suggestion use constant

Signed-off-by: tzzed <zerouali.t@gmail.com>

* use const offset

Signed-off-by: tzzed <zerouali.t@gmail.com>

Co-authored-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
This commit is contained in:
__touk__ 2021-11-08 12:26:11 +01:00 committed by GitHub
parent b570c38ed0
commit 9f5a319519
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 27 deletions

View File

@ -53,10 +53,7 @@ func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
Database: repo.database,
}
var ret []readers.Message
resp, err := repo.client.Query(q)
if err != nil {
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}
@ -64,17 +61,18 @@ func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
return readers.MessagesPage{}, errors.Wrap(errReadMessages, resp.Error())
}
if len(resp.Results) < 1 || len(resp.Results[0].Series) < 1 {
if len(resp.Results) == 0 || len(resp.Results[0].Series) == 0 {
return readers.MessagesPage{}, nil
}
var messages []readers.Message
result := resp.Results[0].Series[0]
for _, v := range result.Values {
msg, err := parseMessage(format, result.Columns, v)
if err != nil {
return readers.MessagesPage{}, err
}
ret = append(ret, msg)
messages = append(messages, msg)
}
total, err := repo.count(format, condition)
@ -85,7 +83,7 @@ func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
page := readers.MessagesPage{
PageMetadata: rpm,
Total: total,
Messages: ret,
Messages: messages,
}
return page, nil
@ -106,9 +104,9 @@ func (repo *influxRepository) count(measurement, condition string) (uint64, erro
return 0, resp.Error()
}
if len(resp.Results) < 1 ||
len(resp.Results[0].Series) < 1 ||
len(resp.Results[0].Series[0].Values) < 1 {
if len(resp.Results) == 0 ||
len(resp.Results[0].Series) == 0 ||
len(resp.Results[0].Series[0].Values) == 0 {
return 0, nil
}
@ -140,7 +138,10 @@ func fmtCondition(chanID string, rpm readers.PageMetadata) string {
if err != nil {
return condition
}
json.Unmarshal(meta, &query)
if err := json.Unmarshal(meta, &query); err != nil {
return condition
}
for name, value := range query {
switch name {
@ -239,19 +240,27 @@ func parseSenml(names []string, fields []interface{}) interface{} {
msgField.SetString(s)
}
case float64:
fs, ok := fields[i].(string)
if !ok {
continue
}
if name == "time" {
t, err := time.Parse(time.RFC3339Nano, fields[i].(string))
t, err := time.Parse(time.RFC3339Nano, fs)
if err != nil {
continue
}
v := float64(t.UnixNano()) / float64(1e9)
v := float64(t.UnixNano()) / 1e9
msgField.SetFloat(v)
continue
}
val, _ := strconv.ParseFloat(fields[i].(string), 64)
msgField.SetFloat(val)
v, err := strconv.ParseFloat(fs, 64)
if err != nil {
continue
}
msgField.SetFloat(v)
}
}

View File

@ -2,6 +2,7 @@ package influxdb_test
import (
"fmt"
"math/rand"
"testing"
"time"
@ -26,6 +27,7 @@ const (
mqttProt = "mqtt"
httpProt = "http"
msgName = "temperature"
offset = 21
format1 = "format1"
format2 = "format2"
@ -61,7 +63,6 @@ func TestReadAll(t *testing.T) {
Protocol: mqttProt,
Name: "name",
Unit: "U",
Time: 123456,
UpdateTime: 1234,
}
@ -71,7 +72,9 @@ func TestReadAll(t *testing.T) {
stringMsgs := []senml.Message{}
dataMsgs := []senml.Message{}
queryMsgs := []senml.Message{}
now := float64(time.Now().UTC().Second())
rand.Seed(time.Now().UnixNano())
to := msgsNum
now := float64(rand.Intn(to) + offset)
for i := 0; i < msgsNum; i++ {
// Mix possible values as well as value sum.
@ -335,24 +338,24 @@ func TestReadAll(t *testing.T) {
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: uint64(len(messages[0:21])),
From: messages[20].Time,
Limit: uint64(len(messages[0:offset])),
From: messages[offset-1].Time,
},
page: readers.MessagesPage{
Total: uint64(len(messages[0:21])),
Messages: fromSenml(messages[0:21]),
Total: uint64(len(messages[0:offset])),
Messages: fromSenml(messages[0:offset]),
},
},
"read message with to": {
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: uint64(len(messages[21:])),
To: messages[20].Time,
Limit: uint64(len(messages[offset:])),
To: messages[offset-1].Time,
},
page: readers.MessagesPage{
Total: uint64(len(messages[21:])),
Messages: fromSenml(messages[21:]),
Total: uint64(len(messages[offset:])),
Messages: fromSenml(messages[offset:]),
},
},
"read message with from/to": {
@ -498,6 +501,7 @@ func TestReadJSON(t *testing.T) {
for desc, tc := range cases {
result, err := reader.ReadAll(tc.chanID, tc.pageMeta)
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err))
for i := 0; i < len(result.Messages); i++ {
m := result.Messages[i]
@ -506,7 +510,6 @@ func TestReadJSON(t *testing.T) {
result.Messages[i] = m
}
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err))
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected \n%v got \n%v", desc, tc.page.Messages, result.Messages))
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total))
}
@ -530,8 +533,7 @@ func fromJSON(msg []map[string]interface{}) []readers.Message {
func toMap(msg json.Message) map[string]interface{} {
return map[string]interface{}{
"channel": msg.Channel,
// "created": msg.Created,
"channel": msg.Channel,
"subtopic": msg.Subtopic,
"publisher": msg.Publisher,
"protocol": msg.Protocol,