diff --git a/readers/influxdb/messages.go b/readers/influxdb/messages.go index b53cf39f..6841418c 100644 --- a/readers/influxdb/messages.go +++ b/readers/influxdb/messages.go @@ -53,10 +53,7 @@ func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) ( Database: repo.database, } - var ret []readers.Message - resp, err := repo.client.Query(q) - if err != nil { return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) } @@ -64,17 +61,18 @@ func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) ( return readers.MessagesPage{}, errors.Wrap(errReadMessages, resp.Error()) } - if len(resp.Results) < 1 || len(resp.Results[0].Series) < 1 { + if len(resp.Results) == 0 || len(resp.Results[0].Series) == 0 { return readers.MessagesPage{}, nil } + var messages []readers.Message result := resp.Results[0].Series[0] for _, v := range result.Values { msg, err := parseMessage(format, result.Columns, v) if err != nil { return readers.MessagesPage{}, err } - ret = append(ret, msg) + messages = append(messages, msg) } total, err := repo.count(format, condition) @@ -85,7 +83,7 @@ func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) ( page := readers.MessagesPage{ PageMetadata: rpm, Total: total, - Messages: ret, + Messages: messages, } return page, nil @@ -106,9 +104,9 @@ func (repo *influxRepository) count(measurement, condition string) (uint64, erro return 0, resp.Error() } - if len(resp.Results) < 1 || - len(resp.Results[0].Series) < 1 || - len(resp.Results[0].Series[0].Values) < 1 { + if len(resp.Results) == 0 || + len(resp.Results[0].Series) == 0 || + len(resp.Results[0].Series[0].Values) == 0 { return 0, nil } @@ -140,7 +138,10 @@ func fmtCondition(chanID string, rpm readers.PageMetadata) string { if err != nil { return condition } - json.Unmarshal(meta, &query) + + if err := json.Unmarshal(meta, &query); err != nil { + return condition + } for name, value := range query { switch name { @@ -239,19 +240,27 @@ func parseSenml(names []string, fields []interface{}) interface{} { msgField.SetString(s) } case float64: + fs, ok := fields[i].(string) + if !ok { + continue + } + if name == "time" { - t, err := time.Parse(time.RFC3339Nano, fields[i].(string)) + t, err := time.Parse(time.RFC3339Nano, fs) if err != nil { continue } - v := float64(t.UnixNano()) / float64(1e9) + v := float64(t.UnixNano()) / 1e9 msgField.SetFloat(v) continue } - val, _ := strconv.ParseFloat(fields[i].(string), 64) - msgField.SetFloat(val) + v, err := strconv.ParseFloat(fs, 64) + if err != nil { + continue + } + msgField.SetFloat(v) } } diff --git a/readers/influxdb/messages_test.go b/readers/influxdb/messages_test.go index 034902a3..882a5ae9 100644 --- a/readers/influxdb/messages_test.go +++ b/readers/influxdb/messages_test.go @@ -2,6 +2,7 @@ package influxdb_test import ( "fmt" + "math/rand" "testing" "time" @@ -26,6 +27,7 @@ const ( mqttProt = "mqtt" httpProt = "http" msgName = "temperature" + offset = 21 format1 = "format1" format2 = "format2" @@ -61,7 +63,6 @@ func TestReadAll(t *testing.T) { Protocol: mqttProt, Name: "name", Unit: "U", - Time: 123456, UpdateTime: 1234, } @@ -71,7 +72,9 @@ func TestReadAll(t *testing.T) { stringMsgs := []senml.Message{} dataMsgs := []senml.Message{} queryMsgs := []senml.Message{} - now := float64(time.Now().UTC().Second()) + rand.Seed(time.Now().UnixNano()) + to := msgsNum + now := float64(rand.Intn(to) + offset) for i := 0; i < msgsNum; i++ { // Mix possible values as well as value sum. @@ -335,24 +338,24 @@ func TestReadAll(t *testing.T) { chanID: chanID, pageMeta: readers.PageMetadata{ Offset: 0, - Limit: uint64(len(messages[0:21])), - From: messages[20].Time, + Limit: uint64(len(messages[0:offset])), + From: messages[offset-1].Time, }, page: readers.MessagesPage{ - Total: uint64(len(messages[0:21])), - Messages: fromSenml(messages[0:21]), + Total: uint64(len(messages[0:offset])), + Messages: fromSenml(messages[0:offset]), }, }, "read message with to": { chanID: chanID, pageMeta: readers.PageMetadata{ Offset: 0, - Limit: uint64(len(messages[21:])), - To: messages[20].Time, + Limit: uint64(len(messages[offset:])), + To: messages[offset-1].Time, }, page: readers.MessagesPage{ - Total: uint64(len(messages[21:])), - Messages: fromSenml(messages[21:]), + Total: uint64(len(messages[offset:])), + Messages: fromSenml(messages[offset:]), }, }, "read message with from/to": { @@ -498,6 +501,7 @@ func TestReadJSON(t *testing.T) { for desc, tc := range cases { result, err := reader.ReadAll(tc.chanID, tc.pageMeta) + assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err)) for i := 0; i < len(result.Messages); i++ { m := result.Messages[i] @@ -506,7 +510,6 @@ func TestReadJSON(t *testing.T) { result.Messages[i] = m } - assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err)) assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected \n%v got \n%v", desc, tc.page.Messages, result.Messages)) assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total)) } @@ -530,8 +533,7 @@ func fromJSON(msg []map[string]interface{}) []readers.Message { func toMap(msg json.Message) map[string]interface{} { return map[string]interface{}{ - "channel": msg.Channel, - // "created": msg.Created, + "channel": msg.Channel, "subtopic": msg.Subtopic, "publisher": msg.Publisher, "protocol": msg.Protocol,