MF-1061 - Implement v, vb, vs, vd and from/to cassandra-reader filters (#1325)

* MF-1061 - Implement v, vb, vs, vd and from/to cassandra-reader filters

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Revert JSON implementation

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
This commit is contained in:
Manuel Imperiale 2021-01-12 20:39:31 +01:00 committed by GitHub
parent 6b7dc54c8b
commit 8e5a9cfc9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 205 additions and 71 deletions

View File

@ -6,6 +6,7 @@ package cassandra
import (
"encoding/json"
"fmt"
"strconv"
"github.com/gocql/gocql"
"github.com/mainflux/mainflux/pkg/errors"
@ -43,16 +44,18 @@ func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query
// Remove format filter and format the rest properly.
delete(query, format)
names := []string{}
vals := []interface{}{chanID}
for name, val := range query {
names = append(names, name)
vals = append(vals, val)
}
vals = append(vals, offset+limit)
q, vals := buildQuery(chanID, offset, limit, query)
selectCQL := buildSelectQuery(table, chanID, offset, limit, names)
countCQL := buildCountQuery(table, chanID, names)
selectCQL := fmt.Sprintf(`SELECT channel, subtopic, publisher, protocol, name, unit,
value, string_value, bool_value, data_value, sum, time,
update_time FROM messages WHERE channel = ? %s LIMIT ?
ALLOW FILTERING`, q)
if table != defTable {
selectCQL = fmt.Sprintf(`SELECT channel, subtopic, publisher, protocol, created, payload FROM %s WHERE channel = ? %s LIMIT ?
ALLOW FILTERING`, table, q)
}
countCQL := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE channel = ? %s ALLOW FILTERING`, defTable, q)
iter := cr.session.Query(selectCQL, vals...).Iter()
defer iter.Close()
@ -106,17 +109,11 @@ func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query
return page, nil
}
func buildSelectQuery(table, chanID string, offset, limit uint64, names []string) string {
func buildQuery(chanID string, offset, limit uint64, query map[string]string) (string, []interface{}) {
var condCQL string
cql := `SELECT channel, subtopic, publisher, protocol, name, unit,
value, string_value, bool_value, data_value, sum, time,
update_time FROM messages WHERE channel = ? %s LIMIT ?
ALLOW FILTERING`
if table != defTable {
cql = fmt.Sprintf(`SELECT channel, subtopic, publisher, protocol, created, payload FROM %s WHERE channel = ? %s LIMIT ?
ALLOW FILTERING`, table, "%s")
}
for _, name := range names {
vals := []interface{}{chanID}
for name, val := range query {
switch name {
case
"channel",
@ -124,30 +121,47 @@ func buildSelectQuery(table, chanID string, offset, limit uint64, names []string
"publisher",
"name",
"protocol":
vals = append(vals, val)
condCQL = fmt.Sprintf(`%s AND %s = ?`, condCQL, name)
case "v":
fVal, err := strconv.ParseFloat(val, 64)
if err != nil {
continue
}
vals = append(vals, fVal)
condCQL = fmt.Sprintf(`%s AND value = ?`, condCQL)
case "vb":
bVal, err := strconv.ParseBool(val)
if err != nil {
continue
}
vals = append(vals, bVal)
condCQL = fmt.Sprintf(`%s AND bool_value = ?`, condCQL)
case "vs":
vals = append(vals, val)
condCQL = fmt.Sprintf(`%s AND string_value = ?`, condCQL)
case "vd":
vals = append(vals, val)
condCQL = fmt.Sprintf(`%s AND data_value = ?`, condCQL)
case "from":
fVal, err := strconv.ParseFloat(val, 64)
if err != nil {
continue
}
vals = append(vals, fVal)
condCQL = fmt.Sprintf(`%s AND time >= ?`, condCQL)
case "to":
fVal, err := strconv.ParseFloat(val, 64)
if err != nil {
continue
}
vals = append(vals, fVal)
condCQL = fmt.Sprintf(`%s AND time < ?`, condCQL)
}
}
vals = append(vals, offset+limit)
return fmt.Sprintf(cql, condCQL)
}
func buildCountQuery(table, chanID string, names []string) string {
var condCQL string
cql := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE channel = ? %s ALLOW FILTERING`, table, "%s")
for _, name := range names {
switch name {
case
"channel",
"subtopic",
"publisher",
"name",
"protocol":
condCQL = fmt.Sprintf(`%s AND %s = ?`, condCQL, name)
}
}
return fmt.Sprintf(cql, condCQL)
return condCQL, vals
}
type jsonMessage struct {

View File

@ -10,6 +10,7 @@ import (
writer "github.com/mainflux/mainflux/consumers/writers/cassandra"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/pkg/uuid"
"github.com/mainflux/mainflux/readers"
reader "github.com/mainflux/mainflux/readers/cassandra"
"github.com/stretchr/testify/assert"
@ -18,27 +19,23 @@ import (
const (
keyspace = "mainflux"
chanID = "1"
subtopic = "subtopic"
msgsNum = 42
msgsNum = 100
limit = 10
valueFields = 5
mqttProt = "mqtt"
httpProt = "http"
msgName = "temperature"
)
var (
addr = "localhost"
msg = senml.Message{
Channel: chanID,
Publisher: "1",
Protocol: "mqtt",
}
)
var (
v float64 = 5
stringV = "value"
boolV = true
dataV = "base64"
sum float64 = 42
v float64 = 5
vs = "value"
vb = true
vd = "base64"
sum float64 = 42
)
func TestReadSenml(t *testing.T) {
@ -50,32 +47,56 @@ func TestReadSenml(t *testing.T) {
defer session.Close()
writer := writer.New(session)
chanID, err := uuid.New().ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
pubID, err := uuid.New().ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
pub2ID, err := uuid.New().ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
m := senml.Message{
Channel: chanID,
Publisher: pubID,
Protocol: mqttProt,
}
messages := []senml.Message{}
subtopicMsgs := []senml.Message{}
now := time.Now().Unix()
valueMsgs := []senml.Message{}
boolMsgs := []senml.Message{}
stringMsgs := []senml.Message{}
dataMsgs := []senml.Message{}
queryMsgs := []senml.Message{}
now := float64(time.Now().Unix())
for i := 0; i < msgsNum; i++ {
// Mix possible values as well as value sum.
msg := m
msg.Time = now - float64(i)
count := i % valueFields
msg.Subtopic = ""
switch count {
case 0:
msg.Subtopic = subtopic
msg.Value = &v
valueMsgs = append(valueMsgs, msg)
case 1:
msg.BoolValue = &boolV
msg.BoolValue = &vb
boolMsgs = append(boolMsgs, msg)
case 2:
msg.StringValue = &stringV
msg.StringValue = &vs
stringMsgs = append(stringMsgs, msg)
case 3:
msg.DataValue = &dataV
msg.DataValue = &vd
dataMsgs = append(dataMsgs, msg)
case 4:
msg.Sum = &sum
msg.Subtopic = subtopic
msg.Protocol = httpProt
msg.Publisher = pub2ID
msg.Name = msgName
queryMsgs = append(queryMsgs, msg)
}
msg.Time = float64(now - int64(i))
messages = append(messages, msg)
if count == 0 {
subtopicMsgs = append(subtopicMsgs, msg)
}
}
err = writer.Consume(messages)
@ -117,13 +138,13 @@ func TestReadSenml(t *testing.T) {
},
"read message last page": {
chanID: chanID,
offset: 40,
limit: 5,
offset: 95,
limit: limit,
page: readers.MessagesPage{
Total: msgsNum,
Offset: 40,
Limit: 5,
Messages: fromSenml(messages[40:42]),
Offset: 95,
Limit: limit,
Messages: fromSenml(messages[95:msgsNum]),
},
},
"read message with non-existent subtopic": {
@ -144,10 +165,109 @@ func TestReadSenml(t *testing.T) {
limit: msgsNum,
query: map[string]string{"subtopic": subtopic},
page: readers.MessagesPage{
Total: uint64(len(subtopicMsgs)),
Total: uint64(len(queryMsgs)),
Offset: 5,
Limit: msgsNum,
Messages: fromSenml(subtopicMsgs[5:]),
Messages: fromSenml(queryMsgs[5:]),
},
},
"read message with publisher": {
chanID: chanID,
offset: 0,
limit: limit,
query: map[string]string{"publisher": pub2ID},
page: readers.MessagesPage{
Total: uint64(len(queryMsgs)),
Offset: 0,
Limit: limit,
Messages: fromSenml(queryMsgs[0:limit]),
},
},
"read message with protocol": {
chanID: chanID,
offset: 0,
limit: limit,
query: map[string]string{"protocol": httpProt},
page: readers.MessagesPage{
Total: uint64(len(queryMsgs)),
Offset: 0,
Limit: limit,
Messages: fromSenml(queryMsgs[0:limit]),
},
},
"read message with name": {
chanID: chanID,
offset: 0,
limit: limit,
query: map[string]string{"name": msgName},
page: readers.MessagesPage{
Total: uint64(len(queryMsgs)),
Offset: 0,
Limit: limit,
Messages: fromSenml(queryMsgs[0:limit]),
},
},
"read message with value": {
chanID: chanID,
offset: 0,
limit: limit,
query: map[string]string{"v": fmt.Sprintf("%f", v)},
page: readers.MessagesPage{
Total: uint64(len(valueMsgs)),
Offset: 0,
Limit: limit,
Messages: fromSenml(valueMsgs[0:limit]),
},
},
"read message with boolean value": {
chanID: chanID,
offset: 0,
limit: limit,
query: map[string]string{"vb": fmt.Sprintf("%t", vb)},
page: readers.MessagesPage{
Total: uint64(len(boolMsgs)),
Offset: 0,
Limit: limit,
Messages: fromSenml(boolMsgs[0:limit]),
},
},
"read message with string value": {
chanID: chanID,
offset: 0,
limit: limit,
query: map[string]string{"vs": vs},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Offset: 0,
Limit: limit,
Messages: fromSenml(stringMsgs[0:limit]),
},
},
"read message with data value": {
chanID: chanID,
offset: 0,
limit: limit,
query: map[string]string{"vd": vd},
page: readers.MessagesPage{
Total: uint64(len(dataMsgs)),
Offset: 0,
Limit: limit,
Messages: fromSenml(dataMsgs[0:limit]),
},
},
"read message with from/to": {
chanID: chanID,
offset: 0,
limit: limit,
query: map[string]string{
"from": fmt.Sprintf("%f", messages[5].Time),
"to": fmt.Sprintf("%f", messages[0].Time),
},
page: readers.MessagesPage{
Total: 5,
Offset: 0,
Limit: limit,
Messages: fromSenml(messages[1:6]),
},
},
}