NOISSUE - Update state based on SenML time value (#1075)

* Update state based on SenML time value

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Use Modf to parse SenML rec time

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add Update to State in mocks

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add Delta to Twin Definition and iota consts for state actions

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Use action consts for switch statement

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>
This commit is contained in:
Darko Draskovic 2020-03-18 19:56:39 +01:00 committed by GitHub
parent c91fe0d453
commit 19503742a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 72 additions and 14 deletions

View File

@ -38,6 +38,16 @@ func (srm *stateRepositoryMock) Save(ctx context.Context, st twins.State) error
return nil
}
// UpdateState updates the state
func (srm *stateRepositoryMock) Update(ctx context.Context, st twins.State) error {
srm.mu.Lock()
defer srm.mu.Unlock()
srm.states[key(st.TwinID, string(st.ID))] = st
return nil
}
// CountStates returns the number of states related to twin
func (srm *stateRepositoryMock) Count(ctx context.Context, tw twins.Twin) (int64, error) {
return int64(len(srm.states)), nil

View File

@ -39,6 +39,19 @@ func (sr *stateRepository) Save(ctx context.Context, st twins.State) error {
return nil
}
// Update persists the state
func (sr *stateRepository) Update(ctx context.Context, st twins.State) error {
coll := sr.db.Collection(statesCollection)
filter := bson.M{"id": st.ID, "twinid": st.TwinID}
update := bson.M{"$set": st}
if _, err := coll.UpdateOne(context.Background(), filter, update); err != nil {
return err
}
return nil
}
// CountStates returns the number of states related to twin
func (sr *stateRepository) Count(ctx context.Context, tw twins.Twin) (int64, error) {
coll := sr.db.Collection(statesCollection)

View File

@ -6,10 +6,12 @@ package twins
import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"time"
"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux"
nats "github.com/mainflux/mainflux/twins/nats/publisher"
"github.com/mainflux/senml"
@ -66,6 +68,14 @@ type Service interface {
RemoveTwin(context.Context, string, string) error
}
const (
noop = iota
update
save
millisec = 1e6
nanosec = 1e9
)
var crudOp = map[string]string{
"createSucc": "create.success",
"createFail": "create.failure",
@ -120,10 +130,13 @@ func (ts *twinsService) AddTwin(ctx context.Context, token string, twin Twin, de
twin.Created = time.Now()
twin.Updated = time.Now()
if len(def.Attributes) == 0 {
def = Definition{}
if def.Attributes == nil {
def.Attributes = []Attribute{}
}
if def.Delta == 0 {
def.Delta = millisec
}
def.Created = time.Now()
def.ID = 0
twin.Definitions = append(twin.Definitions, def)
@ -293,12 +306,18 @@ func (ts *twinsService) saveState(msg *mainflux.Message, id string) error {
}
for _, rec := range recs {
if save := prepareState(&st, &tw, rec, msg); !save {
action := prepareState(&st, &tw, rec, msg)
switch action {
case noop:
return nil
}
if err := ts.states.Save(context.TODO(), st); err != nil {
return fmt.Errorf("Updating state for %s failed: %s", msg.Publisher, err)
case update:
if err := ts.states.Update(context.TODO(), st); err != nil {
return fmt.Errorf("Update state for %s failed: %s", msg.Publisher, err)
}
case save:
if err := ts.states.Save(context.TODO(), st); err != nil {
return fmt.Errorf("Save state for %s failed: %s", msg.Publisher, err)
}
}
}
@ -308,11 +327,9 @@ func (ts *twinsService) saveState(msg *mainflux.Message, id string) error {
return nil
}
func prepareState(st *State, tw *Twin, rec senml.Record, msg *mainflux.Message) bool {
func prepareState(st *State, tw *Twin, rec senml.Record, msg *mainflux.Message) int {
def := tw.Definitions[len(tw.Definitions)-1]
st.TwinID = tw.ID
st.ID++
st.Created = time.Now()
st.Definition = def.ID
if st.Payload == nil {
@ -326,20 +343,34 @@ func prepareState(st *State, tw *Twin, rec senml.Record, msg *mainflux.Message)
}
}
save := false
recSec := rec.BaseTime + rec.Time
recNano := recSec * nanosec
sec, dec := math.Modf(recSec)
recTime := time.Unix(int64(sec), int64(dec*nanosec))
action := noop
for _, attr := range def.Attributes {
if !attr.PersistState {
continue
}
if attr.Channel == msg.Channel && attr.Subtopic == msg.Subtopic {
action = update
delta := math.Abs(float64(st.Created.UnixNano()) - recNano)
if recNano == 0 || delta > float64(def.Delta) {
action = save
st.ID++
st.Created = time.Now()
if recNano != 0 {
st.Created = recTime
}
}
val := findValue(rec)
st.Payload[attr.Name] = val
save = true
break
}
}
return save
return action
}
func findValue(rec senml.Record) interface{} {

View File

@ -29,6 +29,9 @@ type StateRepository interface {
// Save persists the state
Save(context.Context, State) error
// Update updates the state
Update(context.Context, State) error
// Count returns the number of states related to state
Count(context.Context, Twin) (int64, error)

View File

@ -24,6 +24,7 @@ type Definition struct {
ID int `json:"id"`
Created time.Time `json:"created"`
Attributes []Attribute `json:"attributes"`
Delta int64 `json:"delta"`
}
// Twin represents a Mainflux thing digital twin. Each twin is owned by one thing, and