diff --git a/consumers/writers/influxdb/fields.go b/consumers/writers/influxdb/fields.go index 32bba4f6..1bd83b48 100644 --- a/consumers/writers/influxdb/fields.go +++ b/consumers/writers/influxdb/fields.go @@ -1,19 +1,16 @@ package influxdb import ( - "strconv" - "github.com/mainflux/mainflux/pkg/transformers/senml" ) type fields map[string]interface{} func senmlFields(msg senml.Message) fields { - updateTime := strconv.FormatFloat(msg.UpdateTime, 'f', -1, 64) ret := fields{ "protocol": msg.Protocol, "unit": msg.Unit, - "updateTime": updateTime, + "updateTime": msg.UpdateTime, } switch { diff --git a/readers/influxdb/messages.go b/readers/influxdb/messages.go index 536967da..cc201118 100644 --- a/readers/influxdb/messages.go +++ b/readers/influxdb/messages.go @@ -3,10 +3,9 @@ package influxdb import ( "encoding/json" "fmt" - "reflect" "strconv" - "strings" "time" + "unicode" "github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/readers" @@ -24,6 +23,11 @@ const ( var _ readers.MessageRepository = (*influxRepository)(nil) +var ( + errResultSet = errors.New("invalid result set") + errResultTime = errors.New("invalid result time") +) + type influxRepository struct { database string client influxdata.Client @@ -170,99 +174,63 @@ func fmtCondition(chanID string, rpm readers.PageMetadata) string { return condition } -// ParseMessage and parseValues are util methods. Since InfluxDB client returns -// results in form of rows and columns, this obscure message conversion is needed -// to return actual []broker.Message from the query result. -func parseValues(value interface{}, name string, msg *senml.Message) { - if name == "sum" && value != nil { - if valSum, ok := value.(json.Number); ok { - sum, err := valSum.Float64() - if err != nil { - return - } - - msg.Sum = &sum - } - return - } - - if strings.HasSuffix(strings.ToLower(name), "value") { - switch value.(type) { - case bool: - v := value.(bool) - msg.BoolValue = &v - case json.Number: - num, err := value.(json.Number).Float64() - if err != nil { - return - } - msg.Value = &num - case string: - if strings.HasPrefix(name, "string") { - v := value.(string) - msg.StringValue = &v - return - } - - if strings.HasPrefix(name, "data") { - v := value.(string) - msg.DataValue = &v - } - } - } -} - func parseMessage(measurement string, names []string, fields []interface{}) (interface{}, error) { switch measurement { case defMeasurement: - return parseSenml(names, fields), nil + return parseSenml(names, fields) default: return parseJSON(names, fields) } } -func parseSenml(names []string, fields []interface{}) interface{} { - m := senml.Message{} - v := reflect.ValueOf(&m).Elem() +func underscore(names []string) { for i, name := range names { - parseValues(fields[i], name, &m) - msgField := v.FieldByName(strings.Title(name)) - if !msgField.IsValid() { + var buff []rune + idx := 0 + for i, c := range name { + if unicode.IsUpper(c) { + buff = append(buff, []rune(name[idx:i])...) + buff = append(buff, []rune{'_', unicode.ToLower(c)}...) + idx = i + 1 + continue + } + } + buff = append(buff, []rune(name[idx:])...) + names[i] = string(buff) + } +} + +func parseSenml(names []string, fields []interface{}) (interface{}, error) { + m := make(map[string]interface{}) + if len(names) > len(fields) { + return nil, errResultSet + } + underscore(names) + for i, name := range names { + if name == "time" { + val, ok := fields[i].(string) + if !ok { + return nil, errResultTime + } + t, err := time.Parse(time.RFC3339Nano, val) + if err != nil { + return nil, err + } + v := float64(t.UnixNano()) / 1e9 + m[name] = v continue } - - f := msgField.Interface() - switch f.(type) { - case string: - if s, ok := fields[i].(string); ok { - msgField.SetString(s) - } - case float64: - fs, ok := fields[i].(string) - if !ok { - continue - } - - if name == "time" { - t, err := time.Parse(time.RFC3339Nano, fs) - if err != nil { - continue - } - - v := float64(t.UnixNano()) / 1e9 - msgField.SetFloat(v) - continue - } - - v, err := strconv.ParseFloat(fs, 64) - if err != nil { - continue - } - msgField.SetFloat(v) - } + m[name] = fields[i] } - - return m + data, err := json.Marshal(m) + if err != nil { + return nil, err + } + msg := senml.Message{} + if err := json.Unmarshal(data, &msg); err != nil { + return nil, err + } + return msg, nil } func parseJSON(names []string, fields []interface{}) (interface{}, error) {