NOISSUE - Add aggregate attribute-based search for twin retrieval (#1027)

* Add attribute map for twin retrieval

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Restructure attributes from map[string] to []

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Remove RAM attribute map and use mongo aggregation

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Update tests

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Remove attribute map service property

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>
This commit is contained in:
Darko Draskovic 2020-02-04 23:25:51 +01:00 committed by GitHub
parent 3f6a0cd14b
commit f785116a6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 135 additions and 40 deletions

View File

@ -145,6 +145,7 @@ func main() {
defer closer.Close()
svc := newService(nc, ncTracer, mc, mcTracer, auth, dbTracer, db, logger)
errs := make(chan error, 2)
go startHTTPServer(twapi.MakeHandler(tracer, svc), cfg.httpPort, cfg, logger, errs)
@ -257,6 +258,7 @@ func connectToAuth(cfg config, logger logger.Logger) *grpc.ClientConn {
func newService(nc *nats.Conn, ncTracer opentracing.Tracer, mc mqtt.Mqtt, mcTracer opentracing.Tracer, users mainflux.AuthNServiceClient, dbTracer opentracing.Tracer, db *mongo.Database, logger logger.Logger) twins.Service {
twinRepo := twmongodb.NewTwinRepository(db)
stateRepo := twmongodb.NewStateRepository(db)
idp := uuid.New()

View File

@ -20,7 +20,6 @@ func addTwinEndpoint(svc twins.Service) endpoint.Endpoint {
twin := twins.Twin{
Name: req.Name,
ThingID: req.ThingID,
Metadata: req.Metadata,
}
saved, err := svc.AddTwin(ctx, req.token, twin, req.Definition)

View File

@ -124,8 +124,8 @@ func TestAddTwin(t *testing.T) {
req: "{}",
contentType: contentType,
auth: token,
status: http.StatusBadRequest,
location: "",
status: http.StatusCreated,
location: "/twins/123e4567-e89b-12d3-a456-000000000002",
},
{
desc: "add twin with invalid auth token",

View File

@ -17,7 +17,6 @@ type apiReq interface {
type addTwinReq struct {
token string
Name string `json:"name,omitempty"`
ThingID string `json:"thing_id"`
Definition twins.Definition `json:"definition,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
@ -27,10 +26,6 @@ func (req addTwinReq) validate() error {
return twins.ErrUnauthorizedAccess
}
if req.ThingID == "" {
return twins.ErrMalformedEntity
}
if len(req.Name) > maxNameSize {
return twins.ErrMalformedEntity
}

View File

@ -79,9 +79,9 @@ func (lm *loggingMiddleware) ListTwins(ctx context.Context, token string, offset
return lm.svc.ListTwins(ctx, token, offset, limit, name, metadata)
}
func (lm *loggingMiddleware) SaveState(msg *mainflux.Message) (err error) {
func (lm *loggingMiddleware) SaveStates(msg *mainflux.Message) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method save_state took %s to complete", time.Since(begin))
message := fmt.Sprintf("Method save_states took %s to complete", time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
@ -89,7 +89,7 @@ func (lm *loggingMiddleware) SaveState(msg *mainflux.Message) (err error) {
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.SaveState(msg)
return lm.svc.SaveStates(msg)
}
func (lm *loggingMiddleware) ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (st twins.StatesPage, err error) {

View File

@ -68,13 +68,13 @@ func (ms *metricsMiddleware) ListTwins(ctx context.Context, token string, offset
return ms.svc.ListTwins(ctx, token, offset, limit, name, metadata)
}
func (ms *metricsMiddleware) SaveState(msg *mainflux.Message) error {
func (ms *metricsMiddleware) SaveStates(msg *mainflux.Message) error {
defer func(begin time.Time) {
ms.counter.With("method", "save_state").Add(1)
ms.latency.With("method", "save_state").Observe(time.Since(begin).Seconds())
ms.counter.With("method", "save_states").Add(1)
ms.latency.With("method", "save_states").Observe(time.Since(begin).Seconds())
}(time.Now())
return ms.svc.SaveState(msg)
return ms.svc.SaveStates(msg)
}
func (ms *metricsMiddleware) ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (st twins.StatesPage, err error) {

View File

@ -71,6 +71,21 @@ func (trm *twinRepositoryMock) RetrieveByID(_ context.Context, id string) (twins
return twins.Twin{}, twins.ErrNotFound
}
func (trm *twinRepositoryMock) RetrieveByAttribute(ctx context.Context, channel, subtopic string) ([]string, error) {
var ids []string
for _, twin := range trm.twins {
def := twin.Definitions[len(twin.Definitions)-1]
for _, attr := range def.Attributes {
if attr.Channel == channel && attr.Subtopic == subtopic {
ids = append(ids, twin.ID)
break
}
}
}
return ids, nil
}
func (trm *twinRepositoryMock) RetrieveByThing(_ context.Context, thingid string) (twins.Twin, error) {
trm.mu.Lock()
defer trm.mu.Unlock()

View File

@ -88,6 +88,57 @@ func (tr *twinRepository) RetrieveByThing(ctx context.Context, thingid string) (
return tw, nil
}
func (tr *twinRepository) RetrieveByAttribute(ctx context.Context, channel, subtopic string) ([]string, error) {
coll := tr.db.Collection(twinsCollection)
findOptions := options.Aggregate()
prj1 := bson.M{
"$project": bson.M{
"definition": bson.M{
"$arrayElemAt": []interface{}{"$definitions.attributes", -1},
},
"id": true,
"_id": 0,
},
}
match := bson.M{
"$match": bson.M{
"definition.channel": channel,
"definition.subtopic": subtopic,
},
}
prj2 := bson.M{
"$project": bson.M{
"id": true,
},
}
cur, err := coll.Aggregate(ctx, []bson.M{prj1, match, prj2}, findOptions)
var ids []string
if err != nil {
return ids, err
}
defer cur.Close(ctx)
if err := cur.Err(); err != nil {
return ids, nil
}
for cur.Next(ctx) {
var elem struct {
ID string `json:"id"`
}
err := cur.Decode(&elem)
if err != nil {
return ids, nil
}
ids = append(ids, elem.ID)
}
return ids, nil
}
func (tr *twinRepository) RetrieveAll(ctx context.Context, owner string, offset uint64, limit uint64, name string, metadata twins.Metadata) (twins.TwinsPage, error) {
coll := tr.db.Collection(twinsCollection)

View File

@ -39,10 +39,11 @@ func Subscribe(nc *nats.Conn, mc mqtt.Mqtt, svc twins.Service, logger log.Logger
logger: logger,
svc: svc,
}
ps.natsClient.QueueSubscribe(input, queue, ps.handleMsg)
}
func (ps pubsub) handleMsg(m *nats.Msg) {
func (ps *pubsub) handleMsg(m *nats.Msg) {
var msg mainflux.Message
if err := proto.Unmarshal(m.Data, &msg); err != nil {
ps.logger.Warn(fmt.Sprintf("Unmarshalling failed: %s", err))
@ -53,5 +54,8 @@ func (ps pubsub) handleMsg(m *nats.Msg) {
return
}
ps.svc.SaveState(&msg)
if err := ps.svc.SaveStates(&msg); err != nil {
ps.logger.Error(fmt.Sprintf("State save failed: %s", err))
return
}
}

View File

@ -54,8 +54,8 @@ type Service interface {
// twin identified by the id.
ListStates(context.Context, string, uint64, uint64, string) (StatesPage, error)
// SaveState persists state into database
SaveState(*mainflux.Message) error
// SaveStates persists states into database
SaveStates(*mainflux.Message) error
// ListTwinsByThing retrieves data about subset of twins that represent
// specified thing belong to the user identified by
@ -123,7 +123,7 @@ func (ts *twinsService) AddTwin(ctx context.Context, token string, twin Twin, de
if len(def.Attributes) == 0 {
def = Definition{}
def.Attributes = make(map[string]Attribute)
def.Attributes = []Attribute{}
}
def.Created = time.Now()
def.ID = 0
@ -249,13 +249,36 @@ func (ts *twinsService) ListTwins(ctx context.Context, token string, offset uint
return ts.twins.RetrieveAll(ctx, res.GetValue(), offset, limit, name, metadata)
}
func (ts *twinsService) SaveState(msg *mainflux.Message) error {
func (ts *twinsService) ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (StatesPage, error) {
_, err := ts.auth.Identify(ctx, &mainflux.Token{Value: token})
if err != nil {
return StatesPage{}, ErrUnauthorizedAccess
}
return ts.states.RetrieveAll(ctx, offset, limit, id)
}
func (ts *twinsService) SaveStates(msg *mainflux.Message) error {
ids, err := ts.twins.RetrieveByAttribute(context.TODO(), msg.Channel, msg.Subtopic)
if err != nil {
return err
}
for _, id := range ids {
if err := ts.saveState(msg, id); err != nil {
return err
}
}
return nil
}
func (ts *twinsService) saveState(msg *mainflux.Message, id string) error {
var b []byte
var id string
var err error
defer ts.mqttClient.Publish(&id, &err, crudOp["stateSucc"], crudOp["stateFail"], &b)
tw, err := ts.twins.RetrieveByThing(context.TODO(), msg.Publisher)
tw, err := ts.twins.RetrieveByID(context.TODO(), id)
if err != nil {
return fmt.Errorf("Retrieving twin for %s failed: %s", msg.Publisher, err)
}
@ -284,15 +307,6 @@ func (ts *twinsService) SaveState(msg *mainflux.Message) error {
return nil
}
func (ts *twinsService) ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (StatesPage, error) {
_, err := ts.auth.Identify(ctx, &mainflux.Token{Value: token})
if err != nil {
return StatesPage{}, ErrUnauthorizedAccess
}
return ts.states.RetrieveAll(ctx, offset, limit, id)
}
func prepareState(st *State, tw *Twin, recs []senml.Record, msg *mainflux.Message) bool {
def := tw.Definitions[len(tw.Definitions)-1]
st.TwinID = tw.ID
@ -304,19 +318,20 @@ func prepareState(st *State, tw *Twin, recs []senml.Record, msg *mainflux.Messag
st.Payload = make(map[string]interface{})
} else {
for k := range st.Payload {
if _, ok := def.Attributes[k]; !ok || !def.Attributes[k].PersistState {
idx := findAttribute(k, def.Attributes)
if idx < 0 || !def.Attributes[idx].PersistState {
delete(st.Payload, k)
}
}
}
save := false
for k, a := range def.Attributes {
if !a.PersistState {
for _, attr := range def.Attributes {
if !attr.PersistState {
continue
}
if a.Channel == msg.Channel && a.Subtopic == msg.Subtopic {
st.Payload[k] = recs[0].Value
if attr.Channel == msg.Channel && attr.Subtopic == msg.Subtopic {
st.Payload[attr.Name] = recs[0].Value
save = true
break
}
@ -324,3 +339,12 @@ func prepareState(st *State, tw *Twin, recs []senml.Record, msg *mainflux.Messag
return save
}
func findAttribute(name string, attrs []Attribute) (idx int) {
for idx, attr := range attrs {
if attr.Name == name {
return idx
}
}
return -1
}

View File

@ -13,6 +13,7 @@ type Metadata map[string]interface{}
// Attribute stores individual attribute data
type Attribute struct {
Name string `json:"name"`
Channel string `json:"channel"`
Subtopic string `json:"subtopic"`
PersistState bool `json:"persist_state"`
@ -20,9 +21,9 @@ type Attribute struct {
// Definition stores entity's attributes
type Definition struct {
ID int `json:"id"`
Created time.Time `json:"created"`
Attributes map[string]Attribute `json:"attributes"`
ID int `json:"id"`
Created time.Time `json:"created"`
Attributes []Attribute `json:"attributes"`
}
// Twin represents a Mainflux thing digital twin. Each twin is owned by one thing, and
@ -66,6 +67,10 @@ type TwinRepository interface {
// RetrieveByID retrieves the twin having the provided identifier.
RetrieveByID(ctx context.Context, id string) (Twin, error)
// RetrieveByAttribute retrieves twin ids whose definition contains
// the attribute with given channel and subtopic
RetrieveByAttribute(ctx context.Context, channel, subtopic string) ([]string, error)
// RetrieveAll retrieves the subset of things owned by the specified user.
RetrieveAll(context.Context, string, uint64, uint64, string, Metadata) (TwinsPage, error)