diff --git a/twins/mocks/states.go b/twins/mocks/states.go index 5fabc7f3..f59f39f1 100644 --- a/twins/mocks/states.go +++ b/twins/mocks/states.go @@ -38,6 +38,16 @@ func (srm *stateRepositoryMock) Save(ctx context.Context, st twins.State) error return nil } +// UpdateState updates the state +func (srm *stateRepositoryMock) Update(ctx context.Context, st twins.State) error { + srm.mu.Lock() + defer srm.mu.Unlock() + + srm.states[key(st.TwinID, string(st.ID))] = st + + return nil +} + // CountStates returns the number of states related to twin func (srm *stateRepositoryMock) Count(ctx context.Context, tw twins.Twin) (int64, error) { return int64(len(srm.states)), nil diff --git a/twins/mongodb/states.go b/twins/mongodb/states.go index b7b1bf7d..dd0866c3 100644 --- a/twins/mongodb/states.go +++ b/twins/mongodb/states.go @@ -39,6 +39,19 @@ func (sr *stateRepository) Save(ctx context.Context, st twins.State) error { return nil } +// Update persists the state +func (sr *stateRepository) Update(ctx context.Context, st twins.State) error { + coll := sr.db.Collection(statesCollection) + + filter := bson.M{"id": st.ID, "twinid": st.TwinID} + update := bson.M{"$set": st} + if _, err := coll.UpdateOne(context.Background(), filter, update); err != nil { + return err + } + + return nil +} + // CountStates returns the number of states related to twin func (sr *stateRepository) Count(ctx context.Context, tw twins.Twin) (int64, error) { coll := sr.db.Collection(statesCollection) diff --git a/twins/service.go b/twins/service.go index 175ad3e1..d2786513 100644 --- a/twins/service.go +++ b/twins/service.go @@ -6,10 +6,12 @@ package twins import ( "context" "encoding/json" - "errors" "fmt" + "math" "time" + "github.com/mainflux/mainflux/errors" + "github.com/mainflux/mainflux" nats "github.com/mainflux/mainflux/twins/nats/publisher" "github.com/mainflux/senml" @@ -66,6 +68,14 @@ type Service interface { RemoveTwin(context.Context, string, string) error } +const ( + noop = iota + update + save + millisec = 1e6 + nanosec = 1e9 +) + var crudOp = map[string]string{ "createSucc": "create.success", "createFail": "create.failure", @@ -120,10 +130,13 @@ func (ts *twinsService) AddTwin(ctx context.Context, token string, twin Twin, de twin.Created = time.Now() twin.Updated = time.Now() - if len(def.Attributes) == 0 { - def = Definition{} + if def.Attributes == nil { def.Attributes = []Attribute{} } + if def.Delta == 0 { + def.Delta = millisec + } + def.Created = time.Now() def.ID = 0 twin.Definitions = append(twin.Definitions, def) @@ -293,12 +306,18 @@ func (ts *twinsService) saveState(msg *mainflux.Message, id string) error { } for _, rec := range recs { - if save := prepareState(&st, &tw, rec, msg); !save { + action := prepareState(&st, &tw, rec, msg) + switch action { + case noop: return nil - } - - if err := ts.states.Save(context.TODO(), st); err != nil { - return fmt.Errorf("Updating state for %s failed: %s", msg.Publisher, err) + case update: + if err := ts.states.Update(context.TODO(), st); err != nil { + return fmt.Errorf("Update state for %s failed: %s", msg.Publisher, err) + } + case save: + if err := ts.states.Save(context.TODO(), st); err != nil { + return fmt.Errorf("Save state for %s failed: %s", msg.Publisher, err) + } } } @@ -308,11 +327,9 @@ func (ts *twinsService) saveState(msg *mainflux.Message, id string) error { return nil } -func prepareState(st *State, tw *Twin, rec senml.Record, msg *mainflux.Message) bool { +func prepareState(st *State, tw *Twin, rec senml.Record, msg *mainflux.Message) int { def := tw.Definitions[len(tw.Definitions)-1] st.TwinID = tw.ID - st.ID++ - st.Created = time.Now() st.Definition = def.ID if st.Payload == nil { @@ -326,20 +343,34 @@ func prepareState(st *State, tw *Twin, rec senml.Record, msg *mainflux.Message) } } - save := false + recSec := rec.BaseTime + rec.Time + recNano := recSec * nanosec + sec, dec := math.Modf(recSec) + recTime := time.Unix(int64(sec), int64(dec*nanosec)) + + action := noop for _, attr := range def.Attributes { if !attr.PersistState { continue } if attr.Channel == msg.Channel && attr.Subtopic == msg.Subtopic { + action = update + delta := math.Abs(float64(st.Created.UnixNano()) - recNano) + if recNano == 0 || delta > float64(def.Delta) { + action = save + st.ID++ + st.Created = time.Now() + if recNano != 0 { + st.Created = recTime + } + } val := findValue(rec) st.Payload[attr.Name] = val - save = true break } } - return save + return action } func findValue(rec senml.Record) interface{} { diff --git a/twins/states.go b/twins/states.go index 6aa11046..dce1463b 100644 --- a/twins/states.go +++ b/twins/states.go @@ -29,6 +29,9 @@ type StateRepository interface { // Save persists the state Save(context.Context, State) error + // Update updates the state + Update(context.Context, State) error + // Count returns the number of states related to state Count(context.Context, Twin) (int64, error) diff --git a/twins/twins.go b/twins/twins.go index 33672eab..64871154 100644 --- a/twins/twins.go +++ b/twins/twins.go @@ -24,6 +24,7 @@ type Definition struct { ID int `json:"id"` Created time.Time `json:"created"` Attributes []Attribute `json:"attributes"` + Delta int64 `json:"delta"` } // Twin represents a Mainflux thing digital twin. Each twin is owned by one thing, and