diff --git a/cmd/twins/main.go b/cmd/twins/main.go index d30487a1..da0dfa19 100644 --- a/cmd/twins/main.go +++ b/cmd/twins/main.go @@ -145,6 +145,7 @@ func main() { defer closer.Close() svc := newService(nc, ncTracer, mc, mcTracer, auth, dbTracer, db, logger) + errs := make(chan error, 2) go startHTTPServer(twapi.MakeHandler(tracer, svc), cfg.httpPort, cfg, logger, errs) @@ -257,6 +258,7 @@ func connectToAuth(cfg config, logger logger.Logger) *grpc.ClientConn { func newService(nc *nats.Conn, ncTracer opentracing.Tracer, mc mqtt.Mqtt, mcTracer opentracing.Tracer, users mainflux.AuthNServiceClient, dbTracer opentracing.Tracer, db *mongo.Database, logger logger.Logger) twins.Service { twinRepo := twmongodb.NewTwinRepository(db) + stateRepo := twmongodb.NewStateRepository(db) idp := uuid.New() diff --git a/twins/api/http/endpoint.go b/twins/api/http/endpoint.go index 43017892..38e381e4 100644 --- a/twins/api/http/endpoint.go +++ b/twins/api/http/endpoint.go @@ -20,7 +20,6 @@ func addTwinEndpoint(svc twins.Service) endpoint.Endpoint { twin := twins.Twin{ Name: req.Name, - ThingID: req.ThingID, Metadata: req.Metadata, } saved, err := svc.AddTwin(ctx, req.token, twin, req.Definition) diff --git a/twins/api/http/endpoint_test.go b/twins/api/http/endpoint_test.go index 9d36a7ee..e3f7f564 100644 --- a/twins/api/http/endpoint_test.go +++ b/twins/api/http/endpoint_test.go @@ -124,8 +124,8 @@ func TestAddTwin(t *testing.T) { req: "{}", contentType: contentType, auth: token, - status: http.StatusBadRequest, - location: "", + status: http.StatusCreated, + location: "/twins/123e4567-e89b-12d3-a456-000000000002", }, { desc: "add twin with invalid auth token", diff --git a/twins/api/http/requests.go b/twins/api/http/requests.go index ccbd7bcc..509f127e 100644 --- a/twins/api/http/requests.go +++ b/twins/api/http/requests.go @@ -17,7 +17,6 @@ type apiReq interface { type addTwinReq struct { token string Name string `json:"name,omitempty"` - ThingID string `json:"thing_id"` Definition twins.Definition `json:"definition,omitempty"` Metadata map[string]interface{} `json:"metadata,omitempty"` } @@ -27,10 +26,6 @@ func (req addTwinReq) validate() error { return twins.ErrUnauthorizedAccess } - if req.ThingID == "" { - return twins.ErrMalformedEntity - } - if len(req.Name) > maxNameSize { return twins.ErrMalformedEntity } diff --git a/twins/api/logging.go b/twins/api/logging.go index 45b8c762..172c8cc9 100644 --- a/twins/api/logging.go +++ b/twins/api/logging.go @@ -79,9 +79,9 @@ func (lm *loggingMiddleware) ListTwins(ctx context.Context, token string, offset return lm.svc.ListTwins(ctx, token, offset, limit, name, metadata) } -func (lm *loggingMiddleware) SaveState(msg *mainflux.Message) (err error) { +func (lm *loggingMiddleware) SaveStates(msg *mainflux.Message) (err error) { defer func(begin time.Time) { - message := fmt.Sprintf("Method save_state took %s to complete", time.Since(begin)) + message := fmt.Sprintf("Method save_states took %s to complete", time.Since(begin)) if err != nil { lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) return @@ -89,7 +89,7 @@ func (lm *loggingMiddleware) SaveState(msg *mainflux.Message) (err error) { lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) - return lm.svc.SaveState(msg) + return lm.svc.SaveStates(msg) } func (lm *loggingMiddleware) ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (st twins.StatesPage, err error) { diff --git a/twins/api/metrics.go b/twins/api/metrics.go index 4bb51521..aa72ca05 100644 --- a/twins/api/metrics.go +++ b/twins/api/metrics.go @@ -68,13 +68,13 @@ func (ms *metricsMiddleware) ListTwins(ctx context.Context, token string, offset return ms.svc.ListTwins(ctx, token, offset, limit, name, metadata) } -func (ms *metricsMiddleware) SaveState(msg *mainflux.Message) error { +func (ms *metricsMiddleware) SaveStates(msg *mainflux.Message) error { defer func(begin time.Time) { - ms.counter.With("method", "save_state").Add(1) - ms.latency.With("method", "save_state").Observe(time.Since(begin).Seconds()) + ms.counter.With("method", "save_states").Add(1) + ms.latency.With("method", "save_states").Observe(time.Since(begin).Seconds()) }(time.Now()) - return ms.svc.SaveState(msg) + return ms.svc.SaveStates(msg) } func (ms *metricsMiddleware) ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (st twins.StatesPage, err error) { diff --git a/twins/mocks/twins.go b/twins/mocks/twins.go index a6b339e4..78b8732a 100644 --- a/twins/mocks/twins.go +++ b/twins/mocks/twins.go @@ -71,6 +71,21 @@ func (trm *twinRepositoryMock) RetrieveByID(_ context.Context, id string) (twins return twins.Twin{}, twins.ErrNotFound } +func (trm *twinRepositoryMock) RetrieveByAttribute(ctx context.Context, channel, subtopic string) ([]string, error) { + var ids []string + for _, twin := range trm.twins { + def := twin.Definitions[len(twin.Definitions)-1] + for _, attr := range def.Attributes { + if attr.Channel == channel && attr.Subtopic == subtopic { + ids = append(ids, twin.ID) + break + } + } + } + + return ids, nil +} + func (trm *twinRepositoryMock) RetrieveByThing(_ context.Context, thingid string) (twins.Twin, error) { trm.mu.Lock() defer trm.mu.Unlock() diff --git a/twins/mongodb/twins.go b/twins/mongodb/twins.go index 4226f3f6..299118cd 100644 --- a/twins/mongodb/twins.go +++ b/twins/mongodb/twins.go @@ -88,6 +88,57 @@ func (tr *twinRepository) RetrieveByThing(ctx context.Context, thingid string) ( return tw, nil } +func (tr *twinRepository) RetrieveByAttribute(ctx context.Context, channel, subtopic string) ([]string, error) { + coll := tr.db.Collection(twinsCollection) + + findOptions := options.Aggregate() + prj1 := bson.M{ + "$project": bson.M{ + "definition": bson.M{ + "$arrayElemAt": []interface{}{"$definitions.attributes", -1}, + }, + "id": true, + "_id": 0, + }, + } + match := bson.M{ + "$match": bson.M{ + "definition.channel": channel, + "definition.subtopic": subtopic, + }, + } + prj2 := bson.M{ + "$project": bson.M{ + "id": true, + }, + } + + cur, err := coll.Aggregate(ctx, []bson.M{prj1, match, prj2}, findOptions) + + var ids []string + if err != nil { + return ids, err + } + defer cur.Close(ctx) + + if err := cur.Err(); err != nil { + return ids, nil + } + + for cur.Next(ctx) { + var elem struct { + ID string `json:"id"` + } + err := cur.Decode(&elem) + if err != nil { + return ids, nil + } + ids = append(ids, elem.ID) + } + + return ids, nil +} + func (tr *twinRepository) RetrieveAll(ctx context.Context, owner string, offset uint64, limit uint64, name string, metadata twins.Metadata) (twins.TwinsPage, error) { coll := tr.db.Collection(twinsCollection) diff --git a/twins/nats/subscriber.go b/twins/nats/subscriber.go index 8676f742..b3b81451 100644 --- a/twins/nats/subscriber.go +++ b/twins/nats/subscriber.go @@ -39,10 +39,11 @@ func Subscribe(nc *nats.Conn, mc mqtt.Mqtt, svc twins.Service, logger log.Logger logger: logger, svc: svc, } + ps.natsClient.QueueSubscribe(input, queue, ps.handleMsg) } -func (ps pubsub) handleMsg(m *nats.Msg) { +func (ps *pubsub) handleMsg(m *nats.Msg) { var msg mainflux.Message if err := proto.Unmarshal(m.Data, &msg); err != nil { ps.logger.Warn(fmt.Sprintf("Unmarshalling failed: %s", err)) @@ -53,5 +54,8 @@ func (ps pubsub) handleMsg(m *nats.Msg) { return } - ps.svc.SaveState(&msg) + if err := ps.svc.SaveStates(&msg); err != nil { + ps.logger.Error(fmt.Sprintf("State save failed: %s", err)) + return + } } diff --git a/twins/service.go b/twins/service.go index bdb60618..04e3181b 100644 --- a/twins/service.go +++ b/twins/service.go @@ -54,8 +54,8 @@ type Service interface { // twin identified by the id. ListStates(context.Context, string, uint64, uint64, string) (StatesPage, error) - // SaveState persists state into database - SaveState(*mainflux.Message) error + // SaveStates persists states into database + SaveStates(*mainflux.Message) error // ListTwinsByThing retrieves data about subset of twins that represent // specified thing belong to the user identified by @@ -123,7 +123,7 @@ func (ts *twinsService) AddTwin(ctx context.Context, token string, twin Twin, de if len(def.Attributes) == 0 { def = Definition{} - def.Attributes = make(map[string]Attribute) + def.Attributes = []Attribute{} } def.Created = time.Now() def.ID = 0 @@ -249,13 +249,36 @@ func (ts *twinsService) ListTwins(ctx context.Context, token string, offset uint return ts.twins.RetrieveAll(ctx, res.GetValue(), offset, limit, name, metadata) } -func (ts *twinsService) SaveState(msg *mainflux.Message) error { +func (ts *twinsService) ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (StatesPage, error) { + _, err := ts.auth.Identify(ctx, &mainflux.Token{Value: token}) + if err != nil { + return StatesPage{}, ErrUnauthorizedAccess + } + + return ts.states.RetrieveAll(ctx, offset, limit, id) +} + +func (ts *twinsService) SaveStates(msg *mainflux.Message) error { + ids, err := ts.twins.RetrieveByAttribute(context.TODO(), msg.Channel, msg.Subtopic) + if err != nil { + return err + } + + for _, id := range ids { + if err := ts.saveState(msg, id); err != nil { + return err + } + } + + return nil +} + +func (ts *twinsService) saveState(msg *mainflux.Message, id string) error { var b []byte - var id string var err error defer ts.mqttClient.Publish(&id, &err, crudOp["stateSucc"], crudOp["stateFail"], &b) - tw, err := ts.twins.RetrieveByThing(context.TODO(), msg.Publisher) + tw, err := ts.twins.RetrieveByID(context.TODO(), id) if err != nil { return fmt.Errorf("Retrieving twin for %s failed: %s", msg.Publisher, err) } @@ -284,15 +307,6 @@ func (ts *twinsService) SaveState(msg *mainflux.Message) error { return nil } -func (ts *twinsService) ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (StatesPage, error) { - _, err := ts.auth.Identify(ctx, &mainflux.Token{Value: token}) - if err != nil { - return StatesPage{}, ErrUnauthorizedAccess - } - - return ts.states.RetrieveAll(ctx, offset, limit, id) -} - func prepareState(st *State, tw *Twin, recs []senml.Record, msg *mainflux.Message) bool { def := tw.Definitions[len(tw.Definitions)-1] st.TwinID = tw.ID @@ -304,19 +318,20 @@ func prepareState(st *State, tw *Twin, recs []senml.Record, msg *mainflux.Messag st.Payload = make(map[string]interface{}) } else { for k := range st.Payload { - if _, ok := def.Attributes[k]; !ok || !def.Attributes[k].PersistState { + idx := findAttribute(k, def.Attributes) + if idx < 0 || !def.Attributes[idx].PersistState { delete(st.Payload, k) } } } save := false - for k, a := range def.Attributes { - if !a.PersistState { + for _, attr := range def.Attributes { + if !attr.PersistState { continue } - if a.Channel == msg.Channel && a.Subtopic == msg.Subtopic { - st.Payload[k] = recs[0].Value + if attr.Channel == msg.Channel && attr.Subtopic == msg.Subtopic { + st.Payload[attr.Name] = recs[0].Value save = true break } @@ -324,3 +339,12 @@ func prepareState(st *State, tw *Twin, recs []senml.Record, msg *mainflux.Messag return save } + +func findAttribute(name string, attrs []Attribute) (idx int) { + for idx, attr := range attrs { + if attr.Name == name { + return idx + } + } + return -1 +} diff --git a/twins/twins.go b/twins/twins.go index fa2c57e8..33672eab 100644 --- a/twins/twins.go +++ b/twins/twins.go @@ -13,6 +13,7 @@ type Metadata map[string]interface{} // Attribute stores individual attribute data type Attribute struct { + Name string `json:"name"` Channel string `json:"channel"` Subtopic string `json:"subtopic"` PersistState bool `json:"persist_state"` @@ -20,9 +21,9 @@ type Attribute struct { // Definition stores entity's attributes type Definition struct { - ID int `json:"id"` - Created time.Time `json:"created"` - Attributes map[string]Attribute `json:"attributes"` + ID int `json:"id"` + Created time.Time `json:"created"` + Attributes []Attribute `json:"attributes"` } // Twin represents a Mainflux thing digital twin. Each twin is owned by one thing, and @@ -66,6 +67,10 @@ type TwinRepository interface { // RetrieveByID retrieves the twin having the provided identifier. RetrieveByID(ctx context.Context, id string) (Twin, error) + // RetrieveByAttribute retrieves twin ids whose definition contains + // the attribute with given channel and subtopic + RetrieveByAttribute(ctx context.Context, channel, subtopic string) ([]string, error) + // RetrieveAll retrieves the subset of things owned by the specified user. RetrieveAll(context.Context, string, uint64, uint64, string, Metadata) (TwinsPage, error)