Mainflux.mainflux/normalizer/normalizer.go

74 lines
1.6 KiB
Go

// Package writer provides message writer concept definitions.
package normalizer
import (
"github.com/cisco/senml"
"github.com/mainflux/mainflux/writer"
)
var _ Service = (*normalizerService)(nil)
// Message represents a message emitted by the mainflux adapters layer.
type Message struct {
Channel string `json:"channel"`
Publisher string `json:"publisher"`
Protocol string `json:"protocol"`
ContentType string `json:"content_type"`
Payload []byte `json:"payload"`
}
type normalizerService struct {
mr writer.MessageRepository
}
// NewService instantiates the domain service implementation.
func NewService(mr writer.MessageRepository) Service {
return &normalizerService{mr}
}
func (ns *normalizerService) Send(msgs []writer.Message) {
for _, msg := range msgs {
ns.mr.Save(msg)
}
}
func Normalize(msg Message) (msgs []writer.Message, err error) {
var s, n senml.SenML
if s, err = senml.Decode(msg.Payload, senml.JSON); err != nil {
return nil, err
}
n = senml.Normalize(s)
msgs = make([]writer.Message, len(n.Records))
for k, v := range n.Records {
m := writer.Message{}
m.Channel = msg.Channel
m.Publisher = msg.Publisher
m.Protocol = msg.Protocol
m.Version = v.BaseVersion
m.Name = v.Name
m.Unit = v.Unit
if v.Value != nil {
m.Value = *v.Value
}
m.StringValue = v.StringValue
if v.BoolValue != nil {
m.BoolValue = *v.BoolValue
}
m.DataValue = v.DataValue
if v.Sum != nil {
m.ValueSum = *v.Sum
}
m.Time = v.Time
m.UpdateTime = v.UpdateTime
m.Link = v.Link
msgs[k] = m
}
return msgs, nil
}