From b2ccbaec2724e848a50ea53445d09145c271285f Mon Sep 17 00:00:00 2001 From: Manuel Imperiale Date: Fri, 25 Dec 2020 19:23:54 +0100 Subject: [PATCH] MF-1061 - Implement InfluxDB filters value, v, vb, vs, vd, from, to (#1312) * MF-1061 - Implement InfluxDB filters value, v, vb, vs, vd, from, to Signed-off-by: Manuel Imperiale * Use time filters as float64 instead of int64 Signed-off-by: Manuel Imperiale * Fix reviews Signed-off-by: Manuel Imperiale * Remove unnecessary cast Signed-off-by: Manuel Imperiale * Use a const for limit in tests Signed-off-by: Manuel Imperiale * Fix typo Signed-off-by: Manuel Imperiale * Revert float64 cast when dividing Signed-off-by: Manuel Imperiale * Remove value filter in favour to v Signed-off-by: Manuel Imperiale * Use v, vb, vs, vd Signed-off-by: Manuel Imperiale * Use v, vb, vs, vd Signed-off-by: Manuel Imperiale * Rm unecessary cast Signed-off-by: Manuel Imperiale --- readers/api/endpoint_test.go | 46 +++++++++-- readers/api/transport.go | 2 +- readers/influxdb/messages.go | 24 ++++++ readers/influxdb/messages_test.go | 132 +++++++++++++++++++++++------- readers/openapi.yml | 48 +++++++++++ 5 files changed, 213 insertions(+), 39 deletions(-) diff --git a/readers/api/endpoint_test.go b/readers/api/endpoint_test.go index afde4373..4f06dbfc 100644 --- a/readers/api/endpoint_test.go +++ b/readers/api/endpoint_test.go @@ -27,11 +27,11 @@ const ( ) var ( - v float64 = 5 - stringV = "value" - boolV = true - dataV = "base64" - sum float64 = 42 + v float64 = 5 + vs = "value" + vb = true + vd = "dataValue" + sum float64 = 42 ) func newService() readers.MessageRepository { @@ -49,11 +49,11 @@ func newService() readers.MessageRepository { case 0: msg.Value = &v case 1: - msg.BoolValue = &boolV + msg.BoolValue = &vb case 2: - msg.StringValue = &stringV + msg.StringValue = &vs case 3: - msg.DataValue = &dataV + msg.DataValue = &vd case 4: msg.Sum = &sum } @@ -166,6 +166,36 @@ func TestReadAll(t *testing.T) { token: token, status: http.StatusOK, }, + "read page with value": { + url: fmt.Sprintf("%s/channels/%s/messages?v=%f", ts.URL, chanID, v), + token: token, + status: http.StatusOK, + }, + "read page with boolean value": { + url: fmt.Sprintf("%s/channels/%s/messages?vb=%t", ts.URL, chanID, vb), + token: token, + status: http.StatusOK, + }, + "read page with string value": { + url: fmt.Sprintf("%s/channels/%s/messages?vs=%s", ts.URL, chanID, vd), + token: token, + status: http.StatusOK, + }, + "read page with data value": { + url: fmt.Sprintf("%s/channels/%s/messages?vd=%s", ts.URL, chanID, vd), + token: token, + status: http.StatusOK, + }, + "read page with from": { + url: fmt.Sprintf("%s/channels/%s/messages?from=1608651539.673909", ts.URL, chanID), + token: token, + status: http.StatusOK, + }, + "read page with to": { + url: fmt.Sprintf("%s/channels/%s/messages?to=1508651539.673909", ts.URL, chanID), + token: token, + status: http.StatusOK, + }, } for desc, tc := range cases { diff --git a/readers/api/transport.go b/readers/api/transport.go index e8130b39..7fce57e6 100644 --- a/readers/api/transport.go +++ b/readers/api/transport.go @@ -30,7 +30,7 @@ var ( errInvalidRequest = errors.New("received invalid request") errUnauthorizedAccess = errors.New("missing or invalid credentials provided") auth mainflux.ThingsServiceClient - queryFields = []string{"subtopic", "publisher", "protocol", "name", "value", "v", "vs", "vb", "vd"} + queryFields = []string{"subtopic", "publisher", "protocol", "name", "v", "vs", "vb", "vd", "from", "to"} ) // MakeHandler returns a HTTP handler for API endpoints. diff --git a/readers/influxdb/messages.go b/readers/influxdb/messages.go index 1f588996..1f2951c3 100644 --- a/readers/influxdb/messages.go +++ b/readers/influxdb/messages.go @@ -131,6 +131,30 @@ func fmtCondition(chanID string, query map[string]string) string { "protocol": condition = fmt.Sprintf(`%s AND "%s"='%s'`, condition, name, strings.Replace(value, "\"", "\\\"", -1)) + case "v": + condition = fmt.Sprintf(`%s AND value = %s`, condition, value) + case "vb": + condition = fmt.Sprintf(`%s AND boolValue = %s`, condition, value) + case "vs": + condition = fmt.Sprintf(`%s AND "stringValue"='%s'`, condition, + strings.Replace(value, "\"", "\\\"", -1)) + case "vd": + condition = fmt.Sprintf(`%s AND "dataValue"='%s'`, condition, + strings.Replace(value, "\"", "\\\"", -1)) + case "from": + fVal, err := strconv.ParseFloat(value, 64) + if err != nil { + continue + } + iVal := int64(fVal * 1e9) + condition = fmt.Sprintf(`%s AND time >= %d`, condition, iVal) + case "to": + fVal, err := strconv.ParseFloat(value, 64) + if err != nil { + continue + } + iVal := int64(fVal * 1e9) + condition = fmt.Sprintf(`%s AND time < %d`, condition, iVal) } } return condition diff --git a/readers/influxdb/messages_test.go b/readers/influxdb/messages_test.go index 1702ba3d..33ab8a56 100644 --- a/readers/influxdb/messages_test.go +++ b/readers/influxdb/messages_test.go @@ -18,18 +18,21 @@ import ( ) const ( - testDB = "test" - chanID = "1" - subtopic = "topic" - msgsNum = 101 + testDB = "test" + chanID = "1" + subtopic = "topic" + msgsNum = 100 + fromToNum = 4 + msgsValNum = 20 + limit = 10 ) var ( - v float64 = 5 - stringV = "value" - boolV = true - dataV = "base64" - sum float64 = 42 + v float64 = 5 + vs = "value" + vb = true + vd = "dataValue" + sum float64 = 42 ) var ( @@ -58,31 +61,37 @@ func TestReadAll(t *testing.T) { writer := writer.New(client, testDB) messages := []senml.Message{} - subtopicMsgs := []senml.Message{} + valSubtopicMsgs := []senml.Message{} + boolMsgs := []senml.Message{} + stringMsgs := []senml.Message{} + dataMsgs := []senml.Message{} + now := time.Now().UnixNano() for i := 0; i < msgsNum; i++ { // Mix possible values as well as value sum. - count := i % valueFields msg := m + msg.Time = float64(now)/1e9 - float64(i) + + count := i % valueFields switch count { case 0: msg.Subtopic = subtopic msg.Value = &v + valSubtopicMsgs = append(valSubtopicMsgs, msg) case 1: - msg.BoolValue = &boolV + msg.BoolValue = &vb + boolMsgs = append(boolMsgs, msg) case 2: - msg.StringValue = &stringV + msg.StringValue = &vs + stringMsgs = append(stringMsgs, msg) case 3: - msg.DataValue = &dataV + msg.DataValue = &vd + dataMsgs = append(dataMsgs, msg) case 4: msg.Sum = &sum } - msg.Time = float64(now)/float64(1e9) - float64(i) messages = append(messages, msg) - if count == 0 { - subtopicMsgs = append(subtopicMsgs, msg) - } } err := writer.Save(messages...) @@ -101,34 +110,34 @@ func TestReadAll(t *testing.T) { "read message page for existing channel": { chanID: chanID, offset: 0, - limit: 10, + limit: limit, page: readers.MessagesPage{ Total: msgsNum, Offset: 0, - Limit: 10, - Messages: messages[0:10], + Limit: limit, + Messages: messages[0:limit], }, }, "read message page for non-existent channel": { chanID: "2", offset: 0, - limit: 10, + limit: limit, page: readers.MessagesPage{ Total: 0, Offset: 0, - Limit: 10, + Limit: limit, Messages: []senml.Message{}, }, }, "read message last page": { chanID: chanID, offset: 95, - limit: 10, + limit: limit, page: readers.MessagesPage{ Total: msgsNum, Offset: 95, - Limit: 10, - Messages: messages[95:101], + Limit: limit, + Messages: messages[95:msgsNum], }, }, "read message with non-existent subtopic": { @@ -146,13 +155,76 @@ func TestReadAll(t *testing.T) { "read message with subtopic": { chanID: chanID, offset: 0, - limit: 10, + limit: limit, query: map[string]string{"subtopic": subtopic}, page: readers.MessagesPage{ - Total: uint64(len(subtopicMsgs)), + Total: uint64(len(valSubtopicMsgs)), Offset: 0, - Limit: 10, - Messages: subtopicMsgs[0:10], + Limit: limit, + Messages: valSubtopicMsgs[0:limit], + }, + }, + "read message with value": { + chanID: chanID, + offset: 0, + limit: limit, + query: map[string]string{"v": fmt.Sprintf("%f", v)}, + page: readers.MessagesPage{ + Total: msgsValNum, + Offset: 0, + Limit: limit, + Messages: valSubtopicMsgs[0:limit], + }, + }, + "read message with boolean value": { + chanID: chanID, + offset: 0, + limit: limit, + query: map[string]string{"vb": fmt.Sprintf("%t", vb)}, + page: readers.MessagesPage{ + Total: msgsValNum, + Offset: 0, + Limit: limit, + Messages: boolMsgs[0:limit], + }, + }, + "read message with string value": { + chanID: chanID, + offset: 0, + limit: limit, + query: map[string]string{"vs": vs}, + page: readers.MessagesPage{ + Total: msgsValNum, + Offset: 0, + Limit: limit, + Messages: stringMsgs[0:limit], + }, + }, + "read message with data value": { + chanID: chanID, + offset: 0, + limit: limit, + query: map[string]string{"vd": vd}, + page: readers.MessagesPage{ + Total: msgsValNum, + Offset: 0, + Limit: limit, + Messages: dataMsgs[0:limit], + }, + }, + "read message with from/to": { + chanID: chanID, + offset: 0, + limit: limit, + query: map[string]string{ + "from": fmt.Sprintf("%f", messages[fromToNum].Time), + "to": fmt.Sprintf("%f", messages[0].Time), + }, + page: readers.MessagesPage{ + Total: fromToNum, + Offset: 0, + Limit: limit, + Messages: messages[1:5], }, }, } diff --git a/readers/openapi.yml b/readers/openapi.yml index ae047992..8629de61 100644 --- a/readers/openapi.yml +++ b/readers/openapi.yml @@ -22,6 +22,12 @@ paths: - $ref: "#/components/parameters/Offset" - $ref: "#/components/parameters/Publisher" - $ref: "#/components/parameters/Name" + - $ref: "#/components/parameters/Value" + - $ref: "#/components/parameters/BoolValue" + - $ref: "#/components/parameters/StringValue" + - $ref: "#/components/parameters/DataValue" + - $ref: "#/components/parameters/From" + - $ref: "#/components/parameters/To" responses: '200': $ref: "#/components/responses/MessagesPageRes" @@ -140,6 +146,48 @@ components: schema: type: string required: false + Value: + name: v + description: SenML message value. + in: query + schema: + type: string + required: false + BoolValue: + name: vb + description: SenML message bool value. + in: query + schema: + type: boolean + required: false + StringValue: + name: vs + description: SenML message string value. + in: query + schema: + type: string + required: false + DataValue: + name: vd + description: SenML message data value. + in: query + schema: + type: string + required: false + From: + name: from + description: SenML message time in nanoseconds (integer part represents seconds). + in: query + schema: + type: float + required: false + To: + name: to + description: SenML message time in nanoseconds (integer part represents seconds). + in: query + schema: + type: float + required: false responses: MessagesPageRes: