MF-1421 - Make flattening of JSON transformer only available on InfluxDB (#1432)

* MF-1421 - Add a flag for making flattening JSON transformer optional

Signed-off-by: Burak Sekili <buraksekili@gmail.com>

* Add test cases for JSON transformer without flattening

Signed-off-by: Burak Sekili <buraksekili@gmail.com>

* Add a comment for Transform

Signed-off-by: Burak Sekili <buraksekili@gmail.com>

* Separate TestTransformJSON into two tests

Signed-off-by: Burak Sekili <buraksekili@gmail.com>

* Replace flatten flag

Signed-off-by: Burak Sekili <buraksekili@gmail.com>

* Remove unnecessary flattening while reading a message

Signed-off-by: Burak Sekili <buraksekili@gmail.com>
This commit is contained in:
Burak Sekili 2021-07-22 12:20:47 +03:00 committed by GitHub
parent 19f0437f57
commit 5ac1203b55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 102 additions and 45 deletions

View File

@ -83,6 +83,12 @@ func (repo *influxRepo) jsonPoints(pts influxdata.BatchPoints, msgs json.Message
for i, m := range msgs.Data { for i, m := range msgs.Data {
t := time.Unix(0, m.Created+int64(i)) t := time.Unix(0, m.Created+int64(i))
flat, err := json.Flatten(m.Payload)
if err != nil {
return nil, errors.Wrap(json.ErrTransform, err)
}
m.Payload = flat
// Copy first-level fields so that the original Payload is unchanged. // Copy first-level fields so that the original Payload is unchanged.
fields := make(map[string]interface{}) fields := make(map[string]interface{})
for k, v := range m.Payload { for k, v := range m.Payload {

View File

@ -9,6 +9,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/gofrs/uuid" "github.com/gofrs/uuid"
influxdata "github.com/influxdata/influxdb/client/v2" influxdata "github.com/influxdata/influxdb/client/v2"
writer "github.com/mainflux/mainflux/consumers/writers/influxdb" writer "github.com/mainflux/mainflux/consumers/writers/influxdb"
@ -159,22 +161,78 @@ func TestSaveJSON(t *testing.T) {
}, },
} }
invalidKeySepMsg := msg
invalidKeySepMsg.Payload = map[string]interface{}{
"field_1": 123,
"field_2": "value",
"field_3": false,
"field_4": 12.344,
"field_5": map[string]interface{}{
"field_1": "value",
"field_2": 42,
},
"field_6/field_7": "value",
}
invalidKeyNameMsg := msg
invalidKeyNameMsg.Payload = map[string]interface{}{
"field_1": 123,
"field_2": "value",
"field_3": false,
"field_4": 12.344,
"field_5": map[string]interface{}{
"field_1": "value",
"field_2": 42,
},
"publisher": "value",
}
now := time.Now().Unix() now := time.Now().Unix()
msgs := json.Messages{ msgs := json.Messages{
Format: "some_json", Format: "some_json",
} }
invalidKeySepMsgs := json.Messages{
Format: "some_json",
}
invalidKeyNameMsgs := json.Messages{
Format: "some_json",
}
for i := 0; i < streamsSize; i++ { for i := 0; i < streamsSize; i++ {
msg.Created = now + int64(i) msg.Created = now + int64(i)
msgs.Data = append(msgs.Data, msg) msgs.Data = append(msgs.Data, msg)
invalidKeySepMsgs.Data = append(invalidKeySepMsgs.Data, invalidKeySepMsg)
invalidKeyNameMsgs.Data = append(invalidKeyNameMsgs.Data, invalidKeyNameMsg)
} }
err = repo.Consume(msgs) cases := []struct {
assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) desc string
msgs json.Messages
err error
}{
{
desc: "consume valid json messages",
msgs: msgs,
err: nil,
},
{
desc: "consume invalid json messages containing invalid key separator",
msgs: invalidKeySepMsgs,
err: json.ErrInvalidKey,
},
{
desc: "consume invalid json messages containing invalid key name",
msgs: invalidKeySepMsgs,
err: json.ErrInvalidKey,
},
}
for _, tc := range cases {
err = repo.Consume(tc.msgs)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s expected %s, got %s", tc.desc, tc.err, err))
row, err := queryDB(selectMsgs) row, err := queryDB(selectMsgs)
assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err)) assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err))
count := len(row) count := len(row)
assert.Equal(t, streamsSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", streamsSize, count)) assert.Equal(t, streamsSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", streamsSize, count))
}
} }

View File

@ -17,9 +17,9 @@ const sep = "/"
var keys = [...]string{"publisher", "protocol", "channel", "subtopic"} var keys = [...]string{"publisher", "protocol", "channel", "subtopic"}
var ( var (
// ErrTransform reprents an error during parsing message. // ErrTransform represents an error during parsing message.
ErrTransform = errors.New("unable to parse JSON object") ErrTransform = errors.New("unable to parse JSON object")
errInvalidKey = errors.New("invalid object key") ErrInvalidKey = errors.New("invalid object key")
errUnknownFormat = errors.New("unknown format of JSON message") errUnknownFormat = errors.New("unknown format of JSON message")
errInvalidFormat = errors.New("invalid JSON object") errInvalidFormat = errors.New("invalid JSON object")
errInvalidNestedJSON = errors.New("invalid nested JSON object") errInvalidNestedJSON = errors.New("invalid nested JSON object")
@ -32,6 +32,7 @@ func New() transformers.Transformer {
return funcTransformer(transformer) return funcTransformer(transformer)
} }
// Transform transforms Mainflux message to a list of JSON messages.
func (fh funcTransformer) Transform(msg messaging.Message) (interface{}, error) { func (fh funcTransformer) Transform(msg messaging.Message) (interface{}, error) {
return fh(msg) return fh(msg)
} }
@ -58,11 +59,7 @@ func transformer(msg messaging.Message) (interface{}, error) {
} }
switch p := payload.(type) { switch p := payload.(type) {
case map[string]interface{}: case map[string]interface{}:
flat, err := Flatten(p) ret.Payload = p
if err != nil {
return nil, errors.Wrap(ErrTransform, err)
}
ret.Payload = flat
return Messages{[]Message{ret}, format}, nil return Messages{[]Message{ret}, format}, nil
case []interface{}: case []interface{}:
res := []Message{} res := []Message{}
@ -72,12 +69,8 @@ func transformer(msg messaging.Message) (interface{}, error) {
if !ok { if !ok {
return nil, errors.Wrap(ErrTransform, errInvalidNestedJSON) return nil, errors.Wrap(ErrTransform, errInvalidNestedJSON)
} }
flat, err := Flatten(v)
if err != nil {
return nil, errors.Wrap(ErrTransform, err)
}
newMsg := ret newMsg := ret
newMsg.Payload = flat newMsg.Payload = v
res = append(res, newMsg) res = append(res, newMsg)
} }
return Messages{res, format}, nil return Messages{res, format}, nil
@ -86,7 +79,7 @@ func transformer(msg messaging.Message) (interface{}, error) {
} }
} }
// ParseFlat receives flat map that reprents complex JSON objects and returns // ParseFlat receives flat map that represents complex JSON objects and returns
// the corresponding complex JSON object with nested maps. It's the opposite // the corresponding complex JSON object with nested maps. It's the opposite
// of the Flatten function. // of the Flatten function.
func ParseFlat(flat interface{}) interface{} { func ParseFlat(flat interface{}) interface{} {
@ -97,14 +90,14 @@ func ParseFlat(flat interface{}) interface{} {
if value == nil { if value == nil {
continue continue
} }
keys := strings.Split(key, sep) subKeys := strings.Split(key, sep)
n := len(keys) n := len(subKeys)
if n == 1 { if n == 1 {
msg[key] = value msg[key] = value
continue continue
} }
current := msg current := msg
for i, k := range keys { for i, k := range subKeys {
if _, ok := current[k]; !ok { if _, ok := current[k]; !ok {
current[k] = make(map[string]interface{}) current[k] = make(map[string]interface{})
} }
@ -127,11 +120,11 @@ func Flatten(m map[string]interface{}) (map[string]interface{}, error) {
func flatten(prefix string, m, m1 map[string]interface{}) (map[string]interface{}, error) { func flatten(prefix string, m, m1 map[string]interface{}) (map[string]interface{}, error) {
for k, v := range m1 { for k, v := range m1 {
if strings.Contains(k, sep) { if strings.Contains(k, sep) {
return nil, errInvalidKey return nil, ErrInvalidKey
} }
for _, key := range keys { for _, key := range keys {
if k == key { if k == key {
return nil, errInvalidKey return nil, ErrInvalidKey
} }
} }
switch val := v.(type) { switch val := v.(type) {

View File

@ -17,7 +17,7 @@ import (
const ( const (
validPayload = `{"key1": "val1", "key2": 123, "key3": "val3", "key4": {"key5": "val5"}}` validPayload = `{"key1": "val1", "key2": 123, "key3": "val3", "key4": {"key5": "val5"}}`
listPayload = `[{"key1": "val1", "key2": 123, "keylist3": "val3", "key4": {"key5": "val5"}}, {"key1": "val1", "key2": 123, "key3": "val3", "key4": {"key5": "val5"}}]` listPayload = `[{"key1": "val1", "key2": 123, "keylist3": "val3", "key4": {"key5": "val5"}}, {"key1": "val1", "key2": 123, "key3": "val3", "key4": {"key5": "val5"}}]`
invalidPayload = `{"key1": "val1", "key2": 123, "key3/1": "val3", "key4": {"key5": "val5"}}` invalidPayload = `{"key1": }`
) )
func TestTransformJSON(t *testing.T) { func TestTransformJSON(t *testing.T) {
@ -37,7 +37,7 @@ func TestTransformJSON(t *testing.T) {
listMsg := msg listMsg := msg
listMsg.Payload = []byte(listPayload) listMsg.Payload = []byte(listPayload)
jsonMsg := json.Messages{ jsonMsgs := json.Messages{
Data: []json.Message{ Data: []json.Message{
{ {
Channel: msg.Channel, Channel: msg.Channel,
@ -46,10 +46,12 @@ func TestTransformJSON(t *testing.T) {
Protocol: msg.Protocol, Protocol: msg.Protocol,
Created: msg.Created, Created: msg.Created,
Payload: map[string]interface{}{ Payload: map[string]interface{}{
"key1": "val1", "key1": "val1",
"key2": float64(123), "key2": float64(123),
"key3": "val3", "key3": "val3",
"key4/key5": "val5", "key4": map[string]interface{}{
"key5": "val5",
},
}, },
}, },
}, },
@ -68,10 +70,12 @@ func TestTransformJSON(t *testing.T) {
Protocol: msg.Protocol, Protocol: msg.Protocol,
Created: msg.Created, Created: msg.Created,
Payload: map[string]interface{}{ Payload: map[string]interface{}{
"key1": "val1", "key1": "val1",
"key2": float64(123), "key2": float64(123),
"keylist3": "val3", "keylist3": "val3",
"key4/key5": "val5", "key4": map[string]interface{}{
"key5": "val5",
},
}, },
}, },
{ {
@ -81,10 +85,12 @@ func TestTransformJSON(t *testing.T) {
Protocol: msg.Protocol, Protocol: msg.Protocol,
Created: msg.Created, Created: msg.Created,
Payload: map[string]interface{}{ Payload: map[string]interface{}{
"key1": "val1", "key1": "val1",
"key2": float64(123), "key2": float64(123),
"key3": "val3", "key3": "val3",
"key4/key5": "val5", "key4": map[string]interface{}{
"key5": "val5",
},
}, },
}, },
}, },
@ -100,7 +106,7 @@ func TestTransformJSON(t *testing.T) {
{ {
desc: "test transform JSON", desc: "test transform JSON",
msg: msg, msg: msg,
json: jsonMsg, json: jsonMsgs,
err: nil, err: nil,
}, },
{ {

View File

@ -9,7 +9,6 @@ import (
"github.com/gocql/gocql" "github.com/gocql/gocql"
"github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/errors"
jsont "github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml" "github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/readers" "github.com/mainflux/mainflux/readers"
) )
@ -107,7 +106,6 @@ func (cr cassandraRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
if err != nil { if err != nil {
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
} }
m["payload"] = jsont.ParseFlat(m["payload"])
page.Messages = append(page.Messages, m) page.Messages = append(page.Messages, m)
} }
} }

View File

@ -8,7 +8,6 @@ import (
"encoding/json" "encoding/json"
"github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/errors"
jsont "github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml" "github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/readers" "github.com/mainflux/mainflux/readers"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
@ -75,7 +74,6 @@ func (repo mongoRepository) ReadAll(chanID string, rpm readers.PageMetadata) (re
if err := cursor.Decode(&m); err != nil { if err := cursor.Decode(&m); err != nil {
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
} }
m["payload"] = jsont.ParseFlat(m["payload"])
messages = append(messages, m) messages = append(messages, m)
} }

View File

@ -10,7 +10,6 @@ import (
"github.com/jmoiron/sqlx" // required for DB access "github.com/jmoiron/sqlx" // required for DB access
"github.com/lib/pq" "github.com/lib/pq"
"github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/errors"
jsont "github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml" "github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/readers" "github.com/mainflux/mainflux/readers"
) )
@ -105,7 +104,6 @@ func (tr postgresRepository) ReadAll(chanID string, rpm readers.PageMetadata) (r
if err != nil { if err != nil {
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
} }
m["payload"] = jsont.ParseFlat(m["payload"])
page.Messages = append(page.Messages, m) page.Messages = append(page.Messages, m)
} }