MF-1180 - Add redis based twins and states cache (#1184)

* Add twins redis cache

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add connectToRedis to twins main and twinCache to twins service

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add tracing to twins cache

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add twins cache mock and test setup for redis cache

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add TestTwinSave to redis twins cache tests

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add remove twin redis cache test

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add channels param to CreateDefinition helper method in mocks

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add IDs test to redis twins cache

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Simplify senml rec array and attribute creation funcs by removing unnecessary params

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Align cache remove twin method with service remove twin method

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add cache funcs to twins save, update and remove

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add def SaveIDs to redis cache and ref to service SaveStates

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add TwinSaveIDs tests for redis cache

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add cache related env vars desc to README.md

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add twinid bson key constant

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add Update method to cache

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Integrate uuid unification related changes

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Use named arguments in interface method declarations

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add env vars to docker-compose file

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Make parameter names in interface methods and implementations consistent

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Wrap vars and consts in var and const blocks

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>
This commit is contained in:
Darko Draskovic 2020-06-05 11:42:16 +02:00 committed by GitHub
parent 97f8d65885
commit 340e685d70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 886 additions and 182 deletions

7
.env
View File

@ -239,6 +239,7 @@ MF_TWINS_SINGLE_USER_TOKEN=""
MF_TWINS_CLIENT_TLS=""
MF_TWINS_CA_CERTS=""
MF_TWINS_MQTT_URL=tcp://mqtt-adapter:1883
MF_TWINS_THING_ID=
MF_TWINS_THING_KEY=
MF_TWINS_CHANNEL_ID=
MF_TWINS_CHANNEL_ID=""
MF_TWINS_CACHE_URL=localhost:6379
MF_TWINS_CACHE_PASS=""
MF_TWINS_CACHE_DB=0

View File

@ -16,18 +16,20 @@ import (
"time"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/go-redis/redis"
"github.com/mainflux/mainflux"
authapi "github.com/mainflux/mainflux/authn/api/grpc"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/pkg/messaging/nats"
uuidProvider "github.com/mainflux/mainflux/pkg/uuid"
localusers "github.com/mainflux/mainflux/things/users"
"github.com/mainflux/mainflux/twins"
"github.com/mainflux/mainflux/twins/api"
twapi "github.com/mainflux/mainflux/twins/api/http"
twmongodb "github.com/mainflux/mainflux/twins/mongodb"
rediscache "github.com/mainflux/mainflux/twins/redis"
"github.com/mainflux/mainflux/twins/tracing"
uuidProvider "github.com/mainflux/mainflux/pkg/uuid"
opentracing "github.com/opentracing/opentracing-go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
jconfig "github.com/uber/jaeger-client-go/config"
@ -48,6 +50,9 @@ const (
defDB = "mainflux-twins"
defDBHost = "localhost"
defDBPort = "27017"
defCacheURL = "localhost:6379"
defCachePass = ""
defCacheDB = "0"
defSingleUserEmail = ""
defSingleUserToken = ""
defClientTLS = "false"
@ -65,6 +70,9 @@ const (
envDB = "MF_TWINS_DB"
envDBHost = "MF_TWINS_DB_HOST"
envDBPort = "MF_TWINS_DB_PORT"
envCacheURL = "MF_TWINS_CACHE_URL"
envCachePass = "MF_TWINS_CACHE_PASS"
envCacheDB = "MF_TWINS_CACHE_DB"
envSingleUserEmail = "MF_TWINS_SINGLE_USER_EMAIL"
envSingleUserToken = "MF_TWINS_SINGLE_USER_TOKEN"
envClientTLS = "MF_TWINS_CLIENT_TLS"
@ -82,6 +90,9 @@ type config struct {
serverCert string
serverKey string
dbCfg twmongodb.Config
cacheURL string
cachePass string
cacheDB string
singleUserEmail string
singleUserToken string
clientTLS bool
@ -101,20 +112,22 @@ func main() {
log.Fatalf(err.Error())
}
cacheClient := connectToRedis(cfg.cacheURL, cfg.cachePass, cfg.cacheDB, logger)
cacheTracer, cacheCloser := initJaeger("twins_cache", cfg.jaegerURL, logger)
defer cacheCloser.Close()
db, err := twmongodb.Connect(cfg.dbCfg, logger)
if err != nil {
logger.Error(err.Error())
os.Exit(1)
}
dbTracer, dbCloser := initJaeger("twins_db", cfg.jaegerURL, logger)
defer dbCloser.Close()
authTracer, authCloser := initJaeger("auth", cfg.jaegerURL, logger)
defer authCloser.Close()
auth, _ := createAuthClient(cfg, authTracer, logger)
dbTracer, dbCloser := initJaeger("twins_db", cfg.jaegerURL, logger)
defer dbCloser.Close()
pubSub, err := nats.NewPubSub(cfg.natsURL, queue, logger)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
@ -122,7 +135,7 @@ func main() {
}
defer pubSub.Close()
svc := newService(pubSub, cfg.channelID, auth, dbTracer, db, logger)
svc := newService(pubSub, cfg.channelID, auth, dbTracer, db, cacheTracer, cacheClient, logger)
tracer, closer := initJaeger("twins", cfg.jaegerURL, logger)
defer closer.Close()
@ -163,6 +176,9 @@ func loadConfig() config {
serverKey: mainflux.Env(envServerKey, defServerKey),
jaegerURL: mainflux.Env(envJaegerURL, defJaegerURL),
dbCfg: dbCfg,
cacheURL: mainflux.Env(envCacheURL, defCacheURL),
cachePass: mainflux.Env(envCachePass, defCachePass),
cacheDB: mainflux.Env(envCacheDB, defCacheDB),
singleUserEmail: mainflux.Env(envSingleUserEmail, defSingleUserEmail),
singleUserToken: mainflux.Env(envSingleUserToken, defSingleUserToken),
clientTLS: tls,
@ -232,7 +248,21 @@ func connectToAuth(cfg config, logger logger.Logger) *grpc.ClientConn {
return conn
}
func newService(ps messaging.PubSub, chanID string, users mainflux.AuthNServiceClient, dbTracer opentracing.Tracer, db *mongo.Database, logger logger.Logger) twins.Service {
func connectToRedis(cacheURL, cachePass, cacheDB string, logger logger.Logger) *redis.Client {
db, err := strconv.Atoi(cacheDB)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to cache: %s", err))
os.Exit(1)
}
return redis.NewClient(&redis.Options{
Addr: cacheURL,
Password: cachePass,
DB: db,
})
}
func newService(ps messaging.PubSub, chanID string, users mainflux.AuthNServiceClient, dbTracer opentracing.Tracer, db *mongo.Database, cacheTracer opentracing.Tracer, cacheClient *redis.Client, logger logger.Logger) twins.Service {
twinRepo := twmongodb.NewTwinRepository(db)
twinRepo = tracing.TwinRepositoryMiddleware(dbTracer, twinRepo)
@ -240,8 +270,10 @@ func newService(ps messaging.PubSub, chanID string, users mainflux.AuthNServiceC
stateRepo = tracing.StateRepositoryMiddleware(dbTracer, stateRepo)
up := uuidProvider.New()
twinCache := rediscache.NewTwinCache(cacheClient)
twinCache = tracing.TwinCacheMiddleware(cacheTracer, twinCache)
svc := twins.New(ps, users, twinRepo, stateRepo, up, chanID, logger)
svc := twins.New(ps, users, twinRepo, twinCache, stateRepo, up, chanID, logger)
svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
svc,

View File

@ -47,6 +47,10 @@ services:
MF_TWINS_MQTT_URL: ${MF_TWINS_MQTT_URL}
MF_AUTHN_GRPC_URL: ${MF_AUTHN_GRPC_URL}
MF_AUTHN_GRPC_TIMEOUT: ${MF_AUTHN_GRPC_TIMEOUT}
MF_TWINS_CACHE_URL: ${MF_TWINS_CACHE_URL}
MF_TWINS_CACHE_PASS: ${MF_TWINS_CACHE_PASS}
MF_TWINS_CACHE_DB: ${MF_TWINS_CACHE_DB}
ports:
- ${MF_TWINS_HTTP_PORT}:${MF_TWINS_HTTP_PORT}
networks:

View File

@ -31,6 +31,10 @@ default values.
| MF_NATS_URL | Mainflux NATS broker URL | nats://localhost:4222 |
| MF_AUTHN_GRPC_URL | AuthN service gRPC URL | localhost:8181 |
| MF_AUTHN_GRPC_TIMEOUT | AuthN service gRPC request timeout in seconds | 1 |
| MF_TWINS_CACHE_URL | Cache database URL | localhost:6379 |
| MF_TWINS_CACHE_PASS | Cache database password | |
| MF_TWINS_CACHE_DB | Cache instance name | 0 |
## Deployment
@ -64,6 +68,9 @@ services:
MF_NATS_URL: [Mainflux NATS broker URL]
MF_AUTHN_GRPC_URL: [AuthN service gRPC URL]
MF_AUTHN_GRPC_TIMEOUT: [AuthN service gRPC request timeout in seconds]
MF_TWINS_ES_URL: [Event store URL]
MF_TWINS_ES_PASS: [Event store password]
MF_TWINS_ES_DB: [Event store instance name]
```
To start the service outside of the container, execute the following shell

View File

@ -19,14 +19,14 @@ import (
)
const (
nanosec = 1e9
attrName1 = "temperature"
attrSubtopic1 = "engine"
attrName2 = "humidity"
attrSubtopic2 = "chassis"
nanosec = 1e9
publisher = "twins"
numRecs = 100
)
var (
subtopics = []string{"engine", "chassis", "wheel_2"}
channels = []string{"01ec3c3e-0e66-4e69-9751-a0545b44e08f", "48061e4f-7c23-4f5c-9012-0f9b7cd9d18d", "5b2180e4-e96b-4469-9dc1-b6745078d0b6"}
)
type stateRes struct {
@ -49,12 +49,13 @@ func TestListStates(t *testing.T) {
twin := twins.Twin{
Owner: email,
}
def := mocks.CreateDefinition([]string{attrName1, attrName2}, []string{attrSubtopic1, attrSubtopic2})
def := mocks.CreateDefinition(channels[0:2], subtopics[0:2])
tw, err := svc.AddTwin(context.Background(), token, twin, def)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
attr := def.Attributes[0]
recs := mocks.CreateSenML(100, attrName1)
var recs = make([]senml.Record, numRecs)
mocks.CreateSenML(numRecs, recs)
message, err := mocks.CreateMessage(attr, recs)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
err = svc.SaveStates(message)

View File

@ -14,7 +14,6 @@ import (
"strings"
"testing"
"github.com/mainflux/mainflux/pkg/uuid"
"github.com/mainflux/mainflux/twins"
httpapi "github.com/mainflux/mainflux/twins/api/http"
"github.com/mainflux/mainflux/twins/mocks"
@ -84,24 +83,17 @@ func (tr testRequest) make() (*http.Response, error) {
return tr.client.Do(req)
}
func newService(tokens map[string]string) twins.Service {
auth := mocks.NewAuthNServiceClient(tokens)
twinsRepo := mocks.NewTwinRepository()
statesRepo := mocks.NewStateRepository()
subs := map[string]string{"chanID": "chanID"}
broker := mocks.NewBroker(subs)
uuidProvider := uuid.NewMock()
return twins.New(broker, auth, twinsRepo, statesRepo, uuidProvider, "chanID", nil)
}
func newServer(svc twins.Service) *httptest.Server {
mux := httpapi.MakeHandler(mocktracer.New(), svc)
return httptest.NewServer(mux)
}
func toJSON(data interface{}) string {
jsonData, _ := json.Marshal(data)
return string(jsonData)
func toJSON(data interface{}) (string, error) {
jsonData, err := json.Marshal(data)
if err != nil {
return "", err
}
return string(jsonData), nil
}
func TestAddTwin(t *testing.T) {
@ -110,10 +102,12 @@ func TestAddTwin(t *testing.T) {
defer ts.Close()
tw := twinReq{}
data := toJSON(tw)
data, err := toJSON(tw)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
tw.Name = invalidName
invalidData := toJSON(tw)
invalidData, err := toJSON(tw)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
cases := []struct {
desc string
@ -218,11 +212,13 @@ func TestUpdateTwin(t *testing.T) {
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
twin.Name = twinName
data := toJSON(twin)
data, err := toJSON(twin)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
tw := twin
tw.Name = invalidName
invalidData := toJSON(tw)
invalidData, err := toJSON(tw)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
cases := []struct {
desc string

View File

@ -7,8 +7,10 @@ import (
"github.com/mainflux/mainflux/twins"
)
const maxNameSize = 1024
const maxLimitSize = 100
const (
maxNameSize = 1024
maxLimitSize = 100
)
type apiReq interface {
validate() error

View File

@ -27,7 +27,7 @@ func LoggingMiddleware(svc twins.Service, logger log.Logger) twins.Service {
return &loggingMiddleware{logger, svc}
}
func (lm *loggingMiddleware) AddTwin(ctx context.Context, token string, twin twins.Twin, def twins.Definition) (saved twins.Twin, err error) {
func (lm *loggingMiddleware) AddTwin(ctx context.Context, token string, twin twins.Twin, def twins.Definition) (tw twins.Twin, err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method add_twin for token %s and twin %s took %s to complete", token, twin.ID, time.Since(begin))
if err != nil {
@ -53,9 +53,9 @@ func (lm *loggingMiddleware) UpdateTwin(ctx context.Context, token string, twin
return lm.svc.UpdateTwin(ctx, token, twin, def)
}
func (lm *loggingMiddleware) ViewTwin(ctx context.Context, token, id string) (viewed twins.Twin, err error) {
func (lm *loggingMiddleware) ViewTwin(ctx context.Context, token, twinID string) (tw twins.Twin, err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method view_twin for token %s and twin %s took %s to complete", token, id, time.Since(begin))
message := fmt.Sprintf("Method view_twin for token %s and twin %s took %s to complete", token, twinID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
@ -63,10 +63,10 @@ func (lm *loggingMiddleware) ViewTwin(ctx context.Context, token, id string) (vi
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.ViewTwin(ctx, token, id)
return lm.svc.ViewTwin(ctx, token, twinID)
}
func (lm *loggingMiddleware) ListTwins(ctx context.Context, token string, offset uint64, limit uint64, name string, metadata twins.Metadata) (tw twins.Page, err error) {
func (lm *loggingMiddleware) ListTwins(ctx context.Context, token string, offset uint64, limit uint64, name string, metadata twins.Metadata) (page twins.Page, err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method list_twins for token %s took %s to complete", token, time.Since(begin))
if err != nil {
@ -92,7 +92,7 @@ func (lm *loggingMiddleware) SaveStates(msg *messaging.Message) (err error) {
return lm.svc.SaveStates(msg)
}
func (lm *loggingMiddleware) ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (st twins.StatesPage, err error) {
func (lm *loggingMiddleware) ListStates(ctx context.Context, token string, offset uint64, limit uint64, twinID string) (page twins.StatesPage, err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method list_states for token %s took %s to complete", token, time.Since(begin))
if err != nil {
@ -102,12 +102,12 @@ func (lm *loggingMiddleware) ListStates(ctx context.Context, token string, offse
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.ListStates(ctx, token, offset, limit, id)
return lm.svc.ListStates(ctx, token, offset, limit, twinID)
}
func (lm *loggingMiddleware) RemoveTwin(ctx context.Context, token, id string) (err error) {
func (lm *loggingMiddleware) RemoveTwin(ctx context.Context, token, twinID string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method remove_twin for token %s and twin %s took %s to complete", token, id, time.Since(begin))
message := fmt.Sprintf("Method remove_twin for token %s and twin %s took %s to complete", token, twinID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
@ -115,5 +115,5 @@ func (lm *loggingMiddleware) RemoveTwin(ctx context.Context, token, id string) (
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.RemoveTwin(ctx, token, id)
return lm.svc.RemoveTwin(ctx, token, twinID)
}

View File

@ -50,16 +50,16 @@ func (ms *metricsMiddleware) UpdateTwin(ctx context.Context, token string, twin
return ms.svc.UpdateTwin(ctx, token, twin, def)
}
func (ms *metricsMiddleware) ViewTwin(ctx context.Context, token, id string) (viewed twins.Twin, err error) {
func (ms *metricsMiddleware) ViewTwin(ctx context.Context, token, twinID string) (tw twins.Twin, err error) {
defer func(begin time.Time) {
ms.counter.With("method", "view_twin").Add(1)
ms.latency.With("method", "view_twin").Observe(time.Since(begin).Seconds())
}(time.Now())
return ms.svc.ViewTwin(ctx, token, id)
return ms.svc.ViewTwin(ctx, token, twinID)
}
func (ms *metricsMiddleware) ListTwins(ctx context.Context, token string, offset uint64, limit uint64, name string, metadata twins.Metadata) (tw twins.Page, err error) {
func (ms *metricsMiddleware) ListTwins(ctx context.Context, token string, offset uint64, limit uint64, name string, metadata twins.Metadata) (page twins.Page, err error) {
defer func(begin time.Time) {
ms.counter.With("method", "list_twins").Add(1)
ms.latency.With("method", "list_twins").Observe(time.Since(begin).Seconds())
@ -77,20 +77,20 @@ func (ms *metricsMiddleware) SaveStates(msg *messaging.Message) error {
return ms.svc.SaveStates(msg)
}
func (ms *metricsMiddleware) ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (st twins.StatesPage, err error) {
func (ms *metricsMiddleware) ListStates(ctx context.Context, token string, offset uint64, limit uint64, twinID string) (st twins.StatesPage, err error) {
defer func(begin time.Time) {
ms.counter.With("method", "list_states").Add(1)
ms.latency.With("method", "list_states").Observe(time.Since(begin).Seconds())
}(time.Now())
return ms.svc.ListStates(ctx, token, offset, limit, id)
return ms.svc.ListStates(ctx, token, offset, limit, twinID)
}
func (ms *metricsMiddleware) RemoveTwin(ctx context.Context, token, id string) (err error) {
func (ms *metricsMiddleware) RemoveTwin(ctx context.Context, token, twinID string) (err error) {
defer func(begin time.Time) {
ms.counter.With("method", "remove_twin").Add(1)
ms.latency.With("method", "remove_twin").Observe(time.Since(begin).Seconds())
}(time.Now())
return ms.svc.RemoveTwin(ctx, token, id)
return ms.svc.RemoveTwin(ctx, token, twinID)
}

View File

@ -10,29 +10,26 @@ import (
"github.com/mainflux/senml"
)
const (
publisher = "twins"
)
const publisher = "twins"
// NewService use mock dependencies to create real twins service
func NewService(tokens map[string]string) twins.Service {
auth := NewAuthNServiceClient(tokens)
twinsRepo := NewTwinRepository()
twinCache := NewTwinCache()
statesRepo := NewStateRepository()
uuidProvider := uuid.NewMock()
subs := map[string]string{"chanID": "chanID"}
broker := NewBroker(subs)
return twins.New(broker, auth, twinsRepo, statesRepo, uuidProvider, "chanID", nil)
return twins.New(broker, auth, twinsRepo, twinCache, statesRepo, uuidProvider, "chanID", nil)
}
// CreateDefinition creates twin definition
func CreateDefinition(names []string, subtopics []string) twins.Definition {
func CreateDefinition(channels []string, subtopics []string) twins.Definition {
var def twins.Definition
for i, v := range names {
id, _ := uuid.New().ID()
for i := range channels {
attr := twins.Attribute{
Name: v,
Channel: id,
Channel: channels[i],
Subtopic: subtopics[i],
PersistState: true,
}
@ -42,18 +39,12 @@ func CreateDefinition(names []string, subtopics []string) twins.Definition {
}
// CreateSenML creates SenML record array
func CreateSenML(n int, bn string) []senml.Record {
var recs []senml.Record
for i := 0; i < n; i++ {
rec := senml.Record{
BaseName: bn,
BaseTime: float64(time.Now().Unix()),
Time: float64(i),
Value: nil,
}
recs = append(recs, rec)
func CreateSenML(n int, recs []senml.Record) {
for i, rec := range recs {
rec.BaseTime = float64(time.Now().Unix())
rec.Time = float64(i)
rec.Value = nil
}
return recs
}
// CreateMessage creates Mainflux message using SenML record array

View File

@ -92,13 +92,13 @@ func (srm *stateRepositoryMock) RetrieveAll(ctx context.Context, offset uint64,
}
// RetrieveLast returns the last state related to twin spec by id
func (srm *stateRepositoryMock) RetrieveLast(ctx context.Context, id string) (twins.State, error) {
func (srm *stateRepositoryMock) RetrieveLast(ctx context.Context, twinID string) (twins.State, error) {
srm.mu.Lock()
defer srm.mu.Unlock()
items := make([]twins.State, 0)
for _, v := range srm.states {
if v.TwinID == id {
if v.TwinID == twinID {
items = append(items, v)
}
}

View File

@ -57,12 +57,12 @@ func (trm *twinRepositoryMock) Update(ctx context.Context, twin twins.Twin) erro
return nil
}
func (trm *twinRepositoryMock) RetrieveByID(_ context.Context, id string) (twins.Twin, error) {
func (trm *twinRepositoryMock) RetrieveByID(_ context.Context, twinID string) (twins.Twin, error) {
trm.mu.Lock()
defer trm.mu.Unlock()
for k, v := range trm.twins {
if id == v.ID {
if twinID == v.ID {
return trm.twins[k], nil
}
}
@ -132,12 +132,12 @@ func (trm *twinRepositoryMock) RetrieveAll(_ context.Context, owner string, offs
return page, nil
}
func (trm *twinRepositoryMock) Remove(ctx context.Context, id string) error {
func (trm *twinRepositoryMock) Remove(ctx context.Context, twinID string) error {
trm.mu.Lock()
defer trm.mu.Unlock()
for k, v := range trm.twins {
if id == v.ID {
if twinID == v.ID {
delete(trm.twins, k)
return nil
}
@ -145,3 +145,120 @@ func (trm *twinRepositoryMock) Remove(ctx context.Context, id string) error {
return nil
}
type twinCacheMock struct {
mu sync.Mutex
attrIds map[string]map[string]bool
idAttrs map[string]map[string]bool
}
// NewTwinCache returns mock cache instance.
func NewTwinCache() twins.TwinCache {
return &twinCacheMock{
attrIds: make(map[string]map[string]bool),
idAttrs: make(map[string]map[string]bool),
}
}
func (tcm *twinCacheMock) Save(_ context.Context, twin twins.Twin) error {
tcm.mu.Lock()
defer tcm.mu.Unlock()
if len(twin.Definitions) < 1 {
return nil
}
def := twin.Definitions[len(twin.Definitions)-1]
tcm.save(def, twin.ID)
return nil
}
func (tcm *twinCacheMock) SaveIDs(ctx context.Context, channel, subtopic string, ids []string) error {
tcm.mu.Lock()
defer tcm.mu.Unlock()
for _, id := range ids {
attrKey := channel + subtopic
if _, ok := tcm.attrIds[attrKey]; !ok {
tcm.attrIds[attrKey] = make(map[string]bool)
}
tcm.attrIds[attrKey][id] = true
if _, ok := tcm.idAttrs[id]; !ok {
tcm.idAttrs[id] = make(map[string]bool)
}
tcm.idAttrs[id][attrKey] = true
}
return nil
}
func (tcm *twinCacheMock) Update(_ context.Context, twin twins.Twin) error {
tcm.mu.Lock()
defer tcm.mu.Unlock()
if err := tcm.remove(twin.ID); err != nil {
return nil
}
if len(twin.Definitions) < 1 {
return nil
}
def := twin.Definitions[len(twin.Definitions)-1]
tcm.save(def, twin.ID)
return nil
}
func (tcm *twinCacheMock) IDs(_ context.Context, channel, subtopic string) ([]string, error) {
tcm.mu.Lock()
defer tcm.mu.Unlock()
var ids []string
idsMap, ok := tcm.attrIds[channel+subtopic]
if !ok {
return ids, nil
}
for k := range idsMap {
ids = append(ids, k)
}
return ids, nil
}
func (tcm *twinCacheMock) Remove(_ context.Context, twinID string) error {
tcm.mu.Lock()
defer tcm.mu.Unlock()
return tcm.remove(twinID)
}
func (tcm *twinCacheMock) remove(twinID string) error {
attrKeys, ok := tcm.idAttrs[twinID]
if !ok {
return nil
}
delete(tcm.idAttrs, twinID)
for attrKey := range attrKeys {
delete(tcm.attrIds[attrKey], twinID)
}
return nil
}
func (tcm *twinCacheMock) save(def twins.Definition, twinID string) {
for _, attr := range def.Attributes {
attrKey := attr.Channel + attr.Subtopic
if _, ok := tcm.attrIds[attrKey]; !ok {
tcm.attrIds[attrKey] = make(map[string]bool)
}
tcm.attrIds[attrKey][twinID] = true
idKey := twinID
if _, ok := tcm.idAttrs[idKey]; !ok {
tcm.idAttrs[idKey] = make(map[string]bool)
}
tcm.idAttrs[idKey][attrKey] = true
}
}

View File

@ -12,7 +12,10 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
)
const statesCollection string = "states"
const (
statesCollection string = "states"
twinid = "twinid"
)
type stateRepository struct {
db *mongo.Database
@ -43,7 +46,7 @@ func (sr *stateRepository) Save(ctx context.Context, st twins.State) error {
func (sr *stateRepository) Update(ctx context.Context, st twins.State) error {
coll := sr.db.Collection(statesCollection)
filter := bson.M{"id": st.ID, "twinid": st.TwinID}
filter := bson.M{"id": st.ID, twinid: st.TwinID}
update := bson.M{"$set": st}
if _, err := coll.UpdateOne(context.Background(), filter, update); err != nil {
return err
@ -56,7 +59,7 @@ func (sr *stateRepository) Update(ctx context.Context, st twins.State) error {
func (sr *stateRepository) Count(ctx context.Context, tw twins.Twin) (int64, error) {
coll := sr.db.Collection(statesCollection)
filter := bson.D{{"twinid", tw.ID}}
filter := bson.M{twinid: tw.ID}
total, err := coll.CountDocuments(ctx, filter)
if err != nil {
return 0, err
@ -66,14 +69,14 @@ func (sr *stateRepository) Count(ctx context.Context, tw twins.Twin) (int64, err
}
// RetrieveAll retrieves the subset of states related to twin specified by id
func (sr *stateRepository) RetrieveAll(ctx context.Context, offset uint64, limit uint64, id string) (twins.StatesPage, error) {
func (sr *stateRepository) RetrieveAll(ctx context.Context, offset uint64, limit uint64, twinID string) (twins.StatesPage, error) {
coll := sr.db.Collection(statesCollection)
findOptions := options.Find()
findOptions.SetSkip(int64(offset))
findOptions.SetLimit(int64(limit))
filter := bson.D{{"twinid", id}}
filter := bson.M{twinid: twinID}
cur, err := coll.Find(ctx, filter, findOptions)
if err != nil {
@ -101,10 +104,10 @@ func (sr *stateRepository) RetrieveAll(ctx context.Context, offset uint64, limit
}
// RetrieveLast returns the last state related to twin spec by id
func (sr *stateRepository) RetrieveLast(ctx context.Context, id string) (twins.State, error) {
func (sr *stateRepository) RetrieveLast(ctx context.Context, twinID string) (twins.State, error) {
coll := sr.db.Collection(statesCollection)
filter := bson.D{{"twinid", id}}
filter := bson.M{twinid: twinID}
total, err := coll.CountDocuments(ctx, filter)
if err != nil {
return twins.State{}, err

View File

@ -51,8 +51,8 @@ func (tr *twinRepository) Update(ctx context.Context, tw twins.Twin) error {
coll := tr.db.Collection(twinsCollection)
filter := bson.D{{"id", tw.ID}}
update := bson.D{{"$set", tw}}
filter := bson.M{"id": tw.ID}
update := bson.M{"$set": tw}
res, err := coll.UpdateOne(context.Background(), filter, update)
if err != nil {
return err
@ -65,11 +65,11 @@ func (tr *twinRepository) Update(ctx context.Context, tw twins.Twin) error {
return nil
}
func (tr *twinRepository) RetrieveByID(_ context.Context, id string) (twins.Twin, error) {
func (tr *twinRepository) RetrieveByID(_ context.Context, twinID string) (twins.Twin, error) {
coll := tr.db.Collection(twinsCollection)
var tw twins.Twin
filter := bson.D{{"id", id}}
filter := bson.M{"id": twinID}
if err := coll.FindOne(context.Background(), filter).Decode(&tw); err != nil {
return tw, twins.ErrNotFound
}
@ -135,16 +135,16 @@ func (tr *twinRepository) RetrieveAll(ctx context.Context, owner string, offset
findOptions.SetSkip(int64(offset))
findOptions.SetLimit(int64(limit))
filter := bson.D{}
filter := bson.M{}
if owner != "" {
filter = append(filter, bson.E{"owner", owner})
filter["owner"] = owner
}
if name != "" {
filter = append(filter, bson.E{"name", name})
filter["name"] = name
}
if len(metadata) > 0 {
filter = append(filter, bson.E{"metadata", metadata})
filter["metadata"] = metadata
}
cur, err := coll.Find(ctx, filter, findOptions)
if err != nil {
@ -171,10 +171,10 @@ func (tr *twinRepository) RetrieveAll(ctx context.Context, owner string, offset
}, nil
}
func (tr *twinRepository) Remove(ctx context.Context, id string) error {
func (tr *twinRepository) Remove(ctx context.Context, twinID string) error {
coll := tr.db.Collection(twinsCollection)
filter := bson.D{{"id", id}}
filter := bson.M{"id": twinID}
res, err := coll.DeleteOne(context.Background(), filter)
if err != nil {
return err

53
twins/redis/setup_test.go Normal file
View File

@ -0,0 +1,53 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package redis_test
import (
"fmt"
"log"
"os"
"testing"
"github.com/go-redis/redis"
dockertest "github.com/ory/dockertest/v3"
)
const (
wrongID = 0
wrongValue = "wrong-value"
)
var redisClient *redis.Client
func TestMain(m *testing.M) {
pool, err := dockertest.NewPool("")
if err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}
container, err := pool.Run("redis", "5.0-alpine", nil)
if err != nil {
log.Fatalf("Could not start container: %s", err)
}
if err := pool.Retry(func() error {
redisClient = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("localhost:%s", container.GetPort("6379/tcp")),
Password: "",
DB: 0,
})
return redisClient.Ping().Err()
}); err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}
code := m.Run()
if err := pool.Purge(container); err != nil {
log.Fatalf("Could not purge container: %s", err)
}
os.Exit(code)
}

123
twins/redis/twins.go Normal file
View File

@ -0,0 +1,123 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package redis
import (
"context"
"fmt"
"github.com/go-redis/redis"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/twins"
)
const (
prefix = "twin"
)
var (
// ErrRedisTwinSave indicates error while saving Twin in redis cache
ErrRedisTwinSave = errors.New("failed to save twin in redis cache")
// ErrRedisTwinUpdate indicates error while saving Twin in redis cache
ErrRedisTwinUpdate = errors.New("failed to update twin in redis cache")
// ErrRedisTwinIDs indicates error while geting Twin IDs from redis cache
ErrRedisTwinIDs = errors.New("failed to get twin id from redis cache")
// ErrRedisTwinRemove indicates error while removing Twin from redis cache
ErrRedisTwinRemove = errors.New("failed to remove twin from redis cache")
)
var _ twins.TwinCache = (*twinCache)(nil)
type twinCache struct {
client *redis.Client
}
// NewTwinCache returns redis twin cache implementation.
func NewTwinCache(client *redis.Client) twins.TwinCache {
return &twinCache{
client: client,
}
}
func (tc *twinCache) Save(_ context.Context, twin twins.Twin) error {
return tc.save(twin)
}
func (tc *twinCache) Update(_ context.Context, twin twins.Twin) error {
if err := tc.remove(twin.ID); err != nil {
return errors.Wrap(ErrRedisTwinUpdate, err)
}
if err := tc.save(twin); err != nil {
return errors.Wrap(ErrRedisTwinUpdate, err)
}
return nil
}
func (tc *twinCache) SaveIDs(_ context.Context, channel, subtopic string, ids []string) error {
for _, id := range ids {
if err := tc.client.SAdd(attrKey(channel, subtopic), id).Err(); err != nil {
return errors.Wrap(ErrRedisTwinSave, err)
}
if err := tc.client.SAdd(twinKey(id), attrKey(channel, subtopic)).Err(); err != nil {
return errors.Wrap(ErrRedisTwinSave, err)
}
}
return nil
}
func (tc *twinCache) IDs(_ context.Context, channel, subtopic string) ([]string, error) {
ids, err := tc.client.SMembers(attrKey(channel, subtopic)).Result()
if err != nil {
return nil, errors.Wrap(ErrRedisTwinIDs, err)
}
return ids, nil
}
func (tc *twinCache) Remove(_ context.Context, twinID string) error {
return tc.remove(twinID)
}
func (tc *twinCache) save(twin twins.Twin) error {
if len(twin.Definitions) < 1 {
return nil
}
attributes := twin.Definitions[len(twin.Definitions)-1].Attributes
for _, attr := range attributes {
if err := tc.client.SAdd(attrKey(attr.Channel, attr.Subtopic), twin.ID).Err(); err != nil {
return errors.Wrap(ErrRedisTwinSave, err)
}
if err := tc.client.SAdd(twinKey(twin.ID), attrKey(attr.Channel, attr.Subtopic)).Err(); err != nil {
return errors.Wrap(ErrRedisTwinSave, err)
}
}
return nil
}
func (tc *twinCache) remove(twinID string) error {
twinKey := twinKey(twinID)
attrKeys, err := tc.client.SMembers(twinKey).Result()
if err != nil {
return errors.Wrap(ErrRedisTwinRemove, err)
}
if err := tc.client.Del(twinKey).Err(); err != nil {
return errors.Wrap(ErrRedisTwinRemove, err)
}
for _, attrKey := range attrKeys {
if err := tc.client.SRem(attrKey, twinID).Err(); err != nil {
return errors.Wrap(ErrRedisTwinRemove, err)
}
}
return nil
}
func twinKey(twinID string) string {
return fmt.Sprintf("%s:%s", prefix, twinID)
}
func attrKey(channel, subtopic string) string {
return fmt.Sprintf("%s:%s-%s", prefix, channel, subtopic)
}

299
twins/redis/twins_test.go Normal file
View File

@ -0,0 +1,299 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package redis_test
import (
"context"
"fmt"
"testing"
"github.com/mainflux/mainflux/pkg/uuid"
"github.com/mainflux/mainflux/twins"
"github.com/mainflux/mainflux/twins/mocks"
"github.com/mainflux/mainflux/twins/redis"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var (
subtopics = []string{"engine", "chassis", "wheel_2"}
channels = []string{"01ec3c3e-0e66-4e69-9751-a0545b44e08f", "48061e4f-7c23-4f5c-9012-0f9b7cd9d18d", "5b2180e4-e96b-4469-9dc1-b6745078d0b6"}
)
func TestTwinSave(t *testing.T) {
redisClient.FlushAll()
twinCache := redis.NewTwinCache(redisClient)
twin1, err := createTwin(channels[0:2], subtopics[0:2])
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
twin2, err := createTwin(channels[1:3], subtopics[1:3])
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
cases := []struct {
desc string
twin twins.Twin
err error
}{
{
desc: "Save twin to cache",
twin: twin1,
err: nil,
},
{
desc: "Save already cached twin to cache",
twin: twin1,
err: nil,
},
{
desc: "Save another twin to cache",
twin: twin2,
err: nil,
},
{
desc: "Save already cached twin to cache",
twin: twin2,
err: nil,
},
}
for _, tc := range cases {
ctx := context.Background()
err := twinCache.Save(ctx, tc.twin)
assert.Nil(t, err, fmt.Sprintf("%s: expected %s got %s", tc.desc, tc.err, err))
def := tc.twin.Definitions[len(tc.twin.Definitions)-1]
for _, attr := range def.Attributes {
ids, err := twinCache.IDs(ctx, attr.Channel, attr.Subtopic)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s\n", err))
assert.Contains(t, ids, tc.twin.ID, fmt.Sprintf("%s: id %s not found in %v", tc.desc, tc.twin.ID, ids))
}
}
}
func TestTwinSaveIDs(t *testing.T) {
redisClient.FlushAll()
twinCache := redis.NewTwinCache(redisClient)
twinIDs := []string{"7956f132-0b42-488d-9bd1-0f6dd9d77f98", "a2210c42-1eaf-41ad-b8c1-813317719ed9", "6e815c79-a159-41b0-9ff0-cfa14430e07e"}
cases := []struct {
desc string
channel string
subtopic string
ids []string
err error
}{
{
desc: "Save ids to cache",
channel: channels[0],
subtopic: subtopics[0],
ids: twinIDs,
err: nil,
},
{
desc: "Save empty ids array to cache",
channel: channels[2],
subtopic: subtopics[2],
ids: []string{},
err: nil,
},
{
desc: "Save already saved ids to cache",
channel: channels[0],
subtopic: subtopics[0],
ids: twinIDs,
err: nil,
},
{
desc: "Save ids to cache",
channel: channels[1],
subtopic: subtopics[1],
ids: twinIDs[0:2],
err: nil,
},
}
for _, tc := range cases {
ctx := context.Background()
err := twinCache.SaveIDs(ctx, tc.channel, tc.subtopic, tc.ids)
assert.Nil(t, err, fmt.Sprintf("%s: expected %s got %s", tc.desc, tc.err, err))
ids, err := twinCache.IDs(ctx, tc.channel, tc.subtopic)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s\n", err))
assert.ElementsMatch(t, ids, tc.ids, fmt.Sprintf("%s: ids %v not found in %v", tc.desc, tc.ids, ids))
}
}
func TestTwinUpdate(t *testing.T) {
redisClient.FlushAll()
twinCache := redis.NewTwinCache(redisClient)
ctx := context.Background()
var tws []twins.Twin
for i := range channels {
tw, err := createTwin(channels[i:i+1], subtopics[i:i+1])
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
tws = append(tws, tw)
}
err := twinCache.Save(ctx, tws[0])
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
tws[1].ID = tws[0].ID
cases := []struct {
desc string
twinID string
twin twins.Twin
err error
}{
{
desc: "Update saved twin",
twinID: tws[0].ID,
twin: tws[1],
err: nil,
},
{
desc: "Update twin with same definition",
twinID: tws[0].ID,
twin: tws[1],
err: nil,
},
{
desc: "Update unsaved twin definition",
twinID: tws[2].ID,
twin: tws[2],
err: nil,
},
}
for _, tc := range cases {
err := twinCache.Update(ctx, tc.twin)
assert.Nil(t, err, fmt.Sprintf("%s: expected %s got %s", tc.desc, tc.err, err))
attr := tc.twin.Definitions[0].Attributes[0]
ids, err := twinCache.IDs(ctx, attr.Channel, attr.Subtopic)
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
assert.Contains(t, ids, tc.twinID, fmt.Sprintf("%s: ids %v do not contain id %s", tc.desc, ids, tc.twinID))
}
}
func TestTwinIDs(t *testing.T) {
redisClient.FlushAll()
twinCache := redis.NewTwinCache(redisClient)
ctx := context.Background()
var tws []twins.Twin
for i := 0; i < len(channels); i++ {
tw, err := createTwin(channels[0:1], subtopics[0:1])
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
err = twinCache.Save(ctx, tw)
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
tws = append(tws, tw)
}
for i := 0; i < len(channels); i++ {
tw, err := createTwin(channels[1:2], subtopics[1:2])
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
err = twinCache.Save(ctx, tw)
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
tws = append(tws, tw)
}
nonExistAttr := twins.Attribute{
Channel: channels[2],
Subtopic: subtopics[0],
PersistState: true,
}
cases := []struct {
desc string
ids []string
attr twins.Attribute
err error
}{
{
desc: "Get twin IDs from cache for subset of ids",
ids: []string{tws[0].ID, tws[1].ID, tws[2].ID},
attr: tws[0].Definitions[0].Attributes[0],
err: nil,
},
{
desc: "Get twin IDs from cache for subset of ids",
ids: []string{tws[3].ID, tws[4].ID, tws[5].ID},
attr: tws[3].Definitions[0].Attributes[0],
err: nil,
},
{
desc: "Get twin IDs from cache for non existing attribute",
ids: []string{},
attr: nonExistAttr,
err: nil,
},
}
for _, tc := range cases {
ids, err := twinCache.IDs(ctx, tc.attr.Channel, tc.attr.Subtopic)
assert.Nil(t, err, fmt.Sprintf("%s: expected %s got %s", tc.desc, tc.err, err))
assert.ElementsMatch(t, ids, tc.ids, fmt.Sprintf("%s: expected ids %v got ids %v", tc.desc, tc.ids, ids))
}
}
func TestTwinRemove(t *testing.T) {
redisClient.FlushAll()
twinCache := redis.NewTwinCache(redisClient)
ctx := context.Background()
var tws []twins.Twin
for i := range channels {
tw, err := createTwin(channels[i:i+1], subtopics[i:i+1])
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
err = twinCache.Save(ctx, tw)
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
tws = append(tws, tw)
}
cases := []struct {
desc string
twin twins.Twin
err error
}{
{
desc: "Remove twin from cache",
twin: tws[0],
err: nil,
},
{
desc: "Remove already removed twin from cache",
twin: tws[0],
err: nil,
},
{
desc: "Remove another twin from cache",
twin: tws[1],
err: nil,
},
}
for _, tc := range cases {
err := twinCache.Remove(ctx, tc.twin.ID)
assert.Nil(t, err, fmt.Sprintf("%s: expected %s got %s", tc.desc, tc.err, err))
def := tc.twin.Definitions[len(tc.twin.Definitions)-1]
for _, attr := range def.Attributes {
ids, err := twinCache.IDs(ctx, attr.Channel, attr.Subtopic)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s\n", err))
assert.NotContains(t, ids, tc.twin.ID, fmt.Sprintf("%s: id %s found in %v", tc.desc, tc.twin.ID, ids))
}
}
}
func createTwin(channels []string, subtopics []string) (twins.Twin, error) {
id, err := uuid.New().ID()
if err != nil {
return twins.Twin{}, err
}
return twins.Twin{
ID: id,
Definitions: []twins.Definition{mocks.CreateDefinition(channels, subtopics)},
}, nil
}

View File

@ -18,9 +18,7 @@ import (
"github.com/mainflux/senml"
)
const (
publisher = "twins"
)
const publisher = "twins"
var (
// ErrMalformedEntity indicates malformed entity specification (e.g.
@ -50,11 +48,11 @@ type Service interface {
// ViewTwin retrieves data about twin with the provided
// ID belonging to the user identified by the provided key.
ViewTwin(ctx context.Context, token, id string) (tw Twin, err error)
ViewTwin(ctx context.Context, token, twinID string) (tw Twin, err error)
// RemoveTwin removes the twin identified with the provided ID, that
// belongs to the user identified by the provided key.
RemoveTwin(ctx context.Context, token, id string) (err error)
RemoveTwin(ctx context.Context, token, twinID string) (err error)
// ListTwins retrieves data about subset of twins that belongs to the
// user identified by the provided key.
@ -62,7 +60,7 @@ type Service interface {
// ListStates retrieves data about subset of states that belongs to the
// twin identified by the id.
ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (StatesPage, error)
ListStates(ctx context.Context, token string, offset uint64, limit uint64, twinID string) (StatesPage, error)
// SaveStates persists states into database
SaveStates(msg *messaging.Message) error
@ -96,19 +94,21 @@ type twinsService struct {
states StateRepository
uuidProvider mainflux.UUIDProvider
channelID string
twinCache TwinCache
logger logger.Logger
}
var _ Service = (*twinsService)(nil)
// New instantiates the twins service implementation.
func New(publisher messaging.Publisher, auth mainflux.AuthNServiceClient, twins TwinRepository, sr StateRepository, up mainflux.UUIDProvider, chann string, logger logger.Logger) Service {
func New(publisher messaging.Publisher, auth mainflux.AuthNServiceClient, twins TwinRepository, tcache TwinCache, sr StateRepository, idp mainflux.UUIDProvider, chann string, logger logger.Logger) Service {
return &twinsService{
publisher: publisher,
auth: auth,
twins: twins,
twinCache: tcache,
states: sr,
uuidProvider: up,
uuidProvider: idp,
channelID: chann,
logger: logger,
}
@ -154,7 +154,7 @@ func (ts *twinsService) AddTwin(ctx context.Context, token string, twin Twin, de
id = twin.ID
b, err = json.Marshal(twin)
return twin, nil
return twin, ts.twinCache.Save(ctx, twin)
}
func (ts *twinsService) UpdateTwin(ctx context.Context, token string, twin Twin, def Definition) (err error) {
@ -205,19 +205,19 @@ func (ts *twinsService) UpdateTwin(ctx context.Context, token string, twin Twin,
id = twin.ID
b, err = json.Marshal(tw)
return nil
return ts.twinCache.Update(ctx, twin)
}
func (ts *twinsService) ViewTwin(ctx context.Context, token, id string) (tw Twin, err error) {
func (ts *twinsService) ViewTwin(ctx context.Context, token, twinID string) (tw Twin, err error) {
var b []byte
defer ts.publish(&id, &err, crudOp["getSucc"], crudOp["getFail"], &b)
defer ts.publish(&twinID, &err, crudOp["getSucc"], crudOp["getFail"], &b)
_, err = ts.auth.Identify(ctx, &mainflux.Token{Value: token})
if err != nil {
return Twin{}, ErrUnauthorizedAccess
}
twin, err := ts.twins.RetrieveByID(ctx, id)
twin, err := ts.twins.RetrieveByID(ctx, twinID)
if err != nil {
return Twin{}, err
}
@ -227,20 +227,20 @@ func (ts *twinsService) ViewTwin(ctx context.Context, token, id string) (tw Twin
return twin, nil
}
func (ts *twinsService) RemoveTwin(ctx context.Context, token, id string) (err error) {
func (ts *twinsService) RemoveTwin(ctx context.Context, token, twinID string) (err error) {
var b []byte
defer ts.publish(&id, &err, crudOp["removeSucc"], crudOp["removeFail"], &b)
defer ts.publish(&twinID, &err, crudOp["removeSucc"], crudOp["removeFail"], &b)
_, err = ts.auth.Identify(ctx, &mainflux.Token{Value: token})
if err != nil {
return ErrUnauthorizedAccess
}
if err := ts.twins.Remove(ctx, id); err != nil {
if err := ts.twins.Remove(ctx, twinID); err != nil {
return err
}
return nil
return ts.twinCache.Remove(ctx, twinID)
}
func (ts *twinsService) ListTwins(ctx context.Context, token string, offset uint64, limit uint64, name string, metadata Metadata) (Page, error) {
@ -252,20 +252,36 @@ func (ts *twinsService) ListTwins(ctx context.Context, token string, offset uint
return ts.twins.RetrieveAll(ctx, res.GetValue(), offset, limit, name, metadata)
}
func (ts *twinsService) ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (StatesPage, error) {
func (ts *twinsService) ListStates(ctx context.Context, token string, offset uint64, limit uint64, twinID string) (StatesPage, error) {
_, err := ts.auth.Identify(ctx, &mainflux.Token{Value: token})
if err != nil {
return StatesPage{}, ErrUnauthorizedAccess
}
return ts.states.RetrieveAll(ctx, offset, limit, id)
return ts.states.RetrieveAll(ctx, offset, limit, twinID)
}
func (ts *twinsService) SaveStates(msg *messaging.Message) error {
ids, err := ts.twins.RetrieveByAttribute(context.TODO(), msg.Channel, msg.Subtopic)
var ids []string
ctx := context.TODO()
channel, subtopic := msg.Channel, msg.Subtopic
ids, err := ts.twinCache.IDs(ctx, channel, subtopic)
if err != nil {
return err
}
if len(ids) < 1 {
ids, err = ts.twins.RetrieveByAttribute(ctx, channel, subtopic)
if err != nil {
return err
}
if len(ids) < 1 {
return nil
}
if err := ts.twinCache.SaveIDs(ctx, channel, subtopic, ids); err != nil {
return err
}
}
for _, id := range ids {
if err := ts.saveState(msg, id); err != nil {
@ -276,12 +292,13 @@ func (ts *twinsService) SaveStates(msg *messaging.Message) error {
return nil
}
func (ts *twinsService) saveState(msg *messaging.Message, id string) error {
func (ts *twinsService) saveState(msg *messaging.Message, twinID string) error {
var b []byte
var err error
defer ts.publish(&id, &err, crudOp["stateSucc"], crudOp["stateFail"], &b)
defer ts.publish(&twinID, &err, crudOp["stateSucc"], crudOp["stateFail"], &b)
tw, err := ts.twins.RetrieveByID(context.TODO(), id)
ctx := context.TODO()
tw, err := ts.twins.RetrieveByID(ctx, twinID)
if err != nil {
return fmt.Errorf("Retrieving twin for %s failed: %s", msg.Publisher, err)
}
@ -291,7 +308,7 @@ func (ts *twinsService) saveState(msg *messaging.Message, id string) error {
return fmt.Errorf("Unmarshal payload for %s failed: %s", msg.Publisher, err)
}
st, err := ts.states.RetrieveLast(context.TODO(), tw.ID)
st, err := ts.states.RetrieveLast(ctx, tw.ID)
if err != nil {
return fmt.Errorf("Retrieve last state for %s failed: %s", msg.Publisher, err)
}
@ -302,17 +319,17 @@ func (ts *twinsService) saveState(msg *messaging.Message, id string) error {
case noop:
return nil
case update:
if err := ts.states.Update(context.TODO(), st); err != nil {
if err := ts.states.Update(ctx, st); err != nil {
return fmt.Errorf("Update state for %s failed: %s", msg.Publisher, err)
}
case save:
if err := ts.states.Save(context.TODO(), st); err != nil {
if err := ts.states.Save(ctx, st); err != nil {
return fmt.Errorf("Save state for %s failed: %s", msg.Publisher, err)
}
}
}
id = msg.Publisher
twinID = msg.Publisher
b = msg.Payload
return nil

View File

@ -8,7 +8,6 @@ import (
"fmt"
"testing"
"github.com/mainflux/mainflux/pkg/uuid"
"github.com/mainflux/mainflux/twins"
"github.com/mainflux/mainflux/twins/mocks"
"github.com/mainflux/senml"
@ -17,30 +16,19 @@ import (
)
const (
twinName = "name"
wrongID = ""
token = "token"
wrongToken = "wrong-token"
email = "user@example.com"
natsURL = "nats://localhost:4222"
attrName1 = "temperature"
attrSubtopic1 = "engine"
attrName2 = "humidity"
attrSubtopic2 = "chassis"
attrName3 = "speed"
attrSubtopic3 = "wheel_2"
numRecs = 100
twinName = "name"
wrongID = ""
token = "token"
wrongToken = "wrong-token"
email = "user@example.com"
natsURL = "nats://localhost:4222"
numRecs = 100
)
func newService(tokens map[string]string) twins.Service {
auth := mocks.NewAuthNServiceClient(tokens)
twinsRepo := mocks.NewTwinRepository()
statesRepo := mocks.NewStateRepository()
uuidProvider := uuid.NewMock()
subs := map[string]string{"chanID": "chanID"}
broker := mocks.NewBroker(subs)
return twins.New(broker, auth, twinsRepo, statesRepo, uuidProvider, "chanID", nil)
}
var (
subtopics = []string{"engine", "chassis", "wheel_2"}
channels = []string{"01ec3c3e-0e66-4e69-9751-a0545b44e08f", "48061e4f-7c23-4f5c-9012-0f9b7cd9d18d", "5b2180e4-e96b-4469-9dc1-b6745078d0b6"}
)
func TestAddTwin(t *testing.T) {
svc := mocks.NewService(map[string]string{token: email})
@ -259,13 +247,15 @@ func TestSaveStates(t *testing.T) {
svc := mocks.NewService(map[string]string{token: email})
twin := twins.Twin{Owner: email}
def := mocks.CreateDefinition([]string{attrName1, attrName2}, []string{attrSubtopic1, attrSubtopic2})
def := mocks.CreateDefinition(channels[0:2], subtopics[0:2])
attr := def.Attributes[0]
attrSansTwin := mocks.CreateDefinition([]string{attrName3}, []string{attrSubtopic3}).Attributes[0]
attrSansTwin := mocks.CreateDefinition(channels[2:3], subtopics[2:3]).Attributes[0]
tw, err := svc.AddTwin(context.Background(), token, twin, def)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
recs := mocks.CreateSenML(numRecs, attrName1)
var recs = make([]senml.Record, numRecs)
mocks.CreateSenML(numRecs, recs)
var ttlAdded uint64
cases := []struct {
@ -323,17 +313,18 @@ func TestListStates(t *testing.T) {
svc := mocks.NewService(map[string]string{token: email})
twin := twins.Twin{Owner: email}
def := mocks.CreateDefinition([]string{attrName1, attrName2}, []string{attrSubtopic1, attrSubtopic2})
def := mocks.CreateDefinition(channels[0:2], subtopics[0:2])
attr := def.Attributes[0]
tw, err := svc.AddTwin(context.Background(), token, twin, def)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
tw2, err := svc.AddTwin(context.Background(), token,
twins.Twin{Owner: email},
mocks.CreateDefinition([]string{attrName3}, []string{attrSubtopic3}))
mocks.CreateDefinition(channels[2:3], subtopics[2:3]))
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
recs := mocks.CreateSenML(numRecs, attrName1)
var recs = make([]senml.Record, numRecs)
mocks.CreateSenML(numRecs, recs)
message, err := mocks.CreateMessage(attr, recs)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
err = svc.SaveStates(message)

View File

@ -27,17 +27,17 @@ type StatesPage struct {
// StateRepository specifies a state persistence API.
type StateRepository interface {
// Save persists the state
Save(context.Context, State) error
Save(ctx context.Context, state State) error
// Update updates the state
Update(context.Context, State) error
Update(ctx context.Context, state State) error
// Count returns the number of states related to state
Count(context.Context, Twin) (int64, error)
Count(ctx context.Context, twin Twin) (int64, error)
// RetrieveAll retrieves the subset of states related to twin specified by id
RetrieveAll(ctx context.Context, offset uint64, limit uint64, id string) (StatesPage, error)
RetrieveAll(ctx context.Context, offset uint64, limit uint64, twinID string) (StatesPage, error)
// RetrieveLast retrieves the last saved state
RetrieveLast(ctx context.Context, id string) (State, error)
RetrieveLast(ctx context.Context, twinID string) (State, error)
}

View File

@ -18,9 +18,7 @@ const (
retrieveLastStateOp = "retrieve_states_by_attribute"
)
var (
_ twins.StateRepository = (*stateRepositoryMiddleware)(nil)
)
var _ twins.StateRepository = (*stateRepositoryMiddleware)(nil)
type stateRepositoryMiddleware struct {
tracer opentracing.Tracer
@ -60,18 +58,18 @@ func (trm stateRepositoryMiddleware) Count(ctx context.Context, tw twins.Twin) (
return trm.repo.Count(ctx, tw)
}
func (trm stateRepositoryMiddleware) RetrieveAll(ctx context.Context, offset, limit uint64, id string) (twins.StatesPage, error) {
func (trm stateRepositoryMiddleware) RetrieveAll(ctx context.Context, offset, limit uint64, twinID string) (twins.StatesPage, error) {
span := createSpan(ctx, trm.tracer, retrieveAllStatesOp)
defer span.Finish()
ctx = opentracing.ContextWithSpan(ctx, span)
return trm.repo.RetrieveAll(ctx, offset, limit, id)
return trm.repo.RetrieveAll(ctx, offset, limit, twinID)
}
func (trm stateRepositoryMiddleware) RetrieveLast(ctx context.Context, id string) (twins.State, error) {
func (trm stateRepositoryMiddleware) RetrieveLast(ctx context.Context, twinID string) (twins.State, error) {
span := createSpan(ctx, trm.tracer, retrieveAllStatesOp)
defer span.Finish()
ctx = opentracing.ContextWithSpan(ctx, span)
return trm.repo.RetrieveLast(ctx, id)
return trm.repo.RetrieveLast(ctx, twinID)
}

View File

@ -12,6 +12,7 @@ import (
const (
saveTwinOp = "save_twin"
saveTwinsOp = "save_twins"
updateTwinOp = "update_twin"
retrieveTwinByIDOp = "retrieve_twin_by_id"
retrieveAllTwinsOp = "retrieve_all_twins"
@ -19,17 +20,14 @@ const (
removeTwinOp = "remove_twin"
)
var (
_ twins.TwinRepository = (*twinRepositoryMiddleware)(nil)
)
var _ twins.TwinRepository = (*twinRepositoryMiddleware)(nil)
type twinRepositoryMiddleware struct {
tracer opentracing.Tracer
repo twins.TwinRepository
}
// TwinRepositoryMiddleware tracks request and their latency, and adds spans
// to context.
// TwinRepositoryMiddleware tracks request and their latency, and adds spans to context.
func TwinRepositoryMiddleware(tracer opentracing.Tracer, repo twins.TwinRepository) twins.TwinRepository {
return twinRepositoryMiddleware{
tracer: tracer,
@ -53,12 +51,12 @@ func (trm twinRepositoryMiddleware) Update(ctx context.Context, tw twins.Twin) e
return trm.repo.Update(ctx, tw)
}
func (trm twinRepositoryMiddleware) RetrieveByID(ctx context.Context, id string) (twins.Twin, error) {
func (trm twinRepositoryMiddleware) RetrieveByID(ctx context.Context, twinID string) (twins.Twin, error) {
span := createSpan(ctx, trm.tracer, retrieveTwinByIDOp)
defer span.Finish()
ctx = opentracing.ContextWithSpan(ctx, span)
return trm.repo.RetrieveByID(ctx, id)
return trm.repo.RetrieveByID(ctx, twinID)
}
func (trm twinRepositoryMiddleware) RetrieveAll(ctx context.Context, owner string, offset, limit uint64, name string, metadata twins.Metadata) (twins.Page, error) {
@ -77,12 +75,65 @@ func (trm twinRepositoryMiddleware) RetrieveByAttribute(ctx context.Context, cha
return trm.repo.RetrieveByAttribute(ctx, channel, subtopic)
}
func (trm twinRepositoryMiddleware) Remove(ctx context.Context, id string) error {
func (trm twinRepositoryMiddleware) Remove(ctx context.Context, twinID string) error {
span := createSpan(ctx, trm.tracer, removeTwinOp)
defer span.Finish()
ctx = opentracing.ContextWithSpan(ctx, span)
return trm.repo.Remove(ctx, id)
return trm.repo.Remove(ctx, twinID)
}
type twinCacheMiddleware struct {
tracer opentracing.Tracer
cache twins.TwinCache
}
// TwinCacheMiddleware tracks request and their latency, and adds spans to context.
func TwinCacheMiddleware(tracer opentracing.Tracer, cache twins.TwinCache) twins.TwinCache {
return twinCacheMiddleware{
tracer: tracer,
cache: cache,
}
}
func (tcm twinCacheMiddleware) Save(ctx context.Context, twin twins.Twin) error {
span := createSpan(ctx, tcm.tracer, saveTwinOp)
defer span.Finish()
ctx = opentracing.ContextWithSpan(ctx, span)
return tcm.cache.Save(ctx, twin)
}
func (tcm twinCacheMiddleware) SaveIDs(ctx context.Context, channel, subtopic string, ids []string) error {
span := createSpan(ctx, tcm.tracer, saveTwinsOp)
defer span.Finish()
ctx = opentracing.ContextWithSpan(ctx, span)
return tcm.cache.SaveIDs(ctx, channel, subtopic, ids)
}
func (tcm twinCacheMiddleware) Update(ctx context.Context, twin twins.Twin) error {
span := createSpan(ctx, tcm.tracer, updateTwinOp)
defer span.Finish()
ctx = opentracing.ContextWithSpan(ctx, span)
return tcm.cache.Update(ctx, twin)
}
func (tcm twinCacheMiddleware) IDs(ctx context.Context, channel, subtopic string) ([]string, error) {
span := createSpan(ctx, tcm.tracer, retrieveTwinsByAttributeOp)
defer span.Finish()
ctx = opentracing.ContextWithSpan(ctx, span)
return tcm.cache.IDs(ctx, channel, subtopic)
}
func (tcm twinCacheMiddleware) Remove(ctx context.Context, twinID string) error {
span := createSpan(ctx, tcm.tracer, removeTwinOp)
defer span.Finish()
ctx = opentracing.ContextWithSpan(ctx, span)
return tcm.cache.Remove(ctx, twinID)
}
func createSpan(ctx context.Context, tracer opentracing.Tracer, opName string) opentracing.Span {

View File

@ -57,22 +57,40 @@ type Page struct {
// TwinRepository specifies a twin persistence API.
type TwinRepository interface {
// Save persists the twin
Save(context.Context, Twin) (string, error)
Save(ctx context.Context, twin Twin) (string, error)
// Update performs an update to the existing twin. A non-nil error is
// returned to indicate operation failure.
Update(context.Context, Twin) error
Update(ctx context.Context, twin Twin) error
// RetrieveByID retrieves the twin having the provided identifier.
RetrieveByID(ctx context.Context, id string) (Twin, error)
RetrieveByID(ctx context.Context, twinID string) (Twin, error)
// RetrieveByAttribute retrieves twin ids whose definition contains
// the attribute with given channel and subtopic
RetrieveByAttribute(ctx context.Context, channel, subtopic string) ([]string, error)
// RetrieveAll retrieves the subset of twins owned by the specified user.
RetrieveAll(context.Context, string, uint64, uint64, string, Metadata) (Page, error)
RetrieveAll(ctx context.Context, owner string, offset, limit uint64, name string, metadata Metadata) (Page, error)
// Remove removes the twin having the provided identifier.
Remove(ctx context.Context, id string) error
Remove(ctx context.Context, twinID string) error
}
// TwinCache contains twin caching interface.
type TwinCache interface {
// Save stores twin ID as element of channel-subtopic keyed set and vice versa.
Save(ctx context.Context, twin Twin) error
// SaveIDs stores twin IDs as elements of channel-subtopic keyed set and vice versa.
SaveIDs(ctx context.Context, channel, subtopic string, twinIDs []string) error
// Update updates update twin id and channel-subtopic attributes mapping
Update(ctx context.Context, twin Twin) error
// ID returns twin IDs for given attribute.
IDs(ctx context.Context, channel, subtopic string) ([]string, error)
// Removes twin from cache based on twin id.
Remove(ctx context.Context, twinID string) error
}