MF-730 - Add digital twin service for things (#855)

* Add starter kit

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add http

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add cmd/main.go

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Remove reference to things from README.md

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add starter kit

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add http

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add cmd/main.go

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Remove reference to things from README.md

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix env vars in README.md

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix env vars in README.md

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Rename kit to mfxkit

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Rename kit to mfxkit

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add docker compose related files

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add docker compose related files

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add Dockerfile to mfxkit

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add Dockerfile to mfxkit

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add twin service to docker-compose.yml

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add twin service to docker-compose.yml

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add mongo db connection

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add mongo db connection

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add TwinRepository mockup

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix docker env vars

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix docker env vars

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add twins repo mongodb routines

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add twins repo mongodb routines

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add mongo db docker test suite

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add mongo db docker test suite

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add idp and toDBTwin

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add idp and toDBTwin

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add test cases to repo twin save test

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add test cases to repo twin save test

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add users grpc

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add users grpc

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add methods and tests for udpate and update key

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add methods and tests for udpate and update key

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add methods and tests for remove twin

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add methods and tests for remove twin

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add svc methods to loggin and metrics

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add svc methods to loggin and metrics

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add AddTwin endpoint

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add AddTwin endpoint

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add update endpoints

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add update endpoints

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add view and remove endpoints

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add view and remove endpoints

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add twin repo mock

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add twin repo mock

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add owner arg to twins repo methods

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add owner arg to twins repo methods

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add mock idp service

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add mock users service

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add tests for service

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add tests for service

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add mqtt client to twins service

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add mqtt client to twins service

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add protocol to mqtt string var

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add protocol to mqtt string var

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add time and attributes related data to Twin struct

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add time and attributes related data to Twin struct

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add mqtt publish JSON serialized twin to service

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add mqtt publish JSON serialized twin to service

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add endpoint tests for save and update twin

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add endpoint tests for save and update twin

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add endpoint tests for update key and view thing

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add endpoint tests for update key and view thing

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix test error for view twin

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix test error for view twin

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add endpoint tests for remove twin

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add endpoint tests for remove twin

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add NATS client

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add NATS client

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add normalizer to nats

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add normalizer to nats

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Refactor nats publish() and remove normalizer

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Refactor nats publish() and remove normalizer

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add ListThingsByChannel() and RetrieveByChannel()

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add ListThingsByChannel() and RetrieveByChannel()

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add Twin struct directly in mongodb

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add Twin struct directly in mongodb

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Change limit type to uint64

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Change limit type to uint64

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Compose nats subject from msg ch and subtopic

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Compose nats subject from msg ch and subtopic

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add mqtt notifs for thing creation and key update

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add mqtt notifs for thing creation and key update

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add RetrieveAll to twinRepository and ListTwins to service

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add RetrieveAll to twinRepository and ListTwins to service

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add tests for RetrieveAll and ListTwins

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add tests for RetrieveAll and ListTwins

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix Service interface error

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Remove test Ping endpoint

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add mqtt topic to service config

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Remove UpdateKey request and add fields to Twin related requests

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add Attributes and State add and view

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add ListTwins endpoint

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix service and database tests

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add ListTwinsByThing endpoint

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Remove twin directory

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add States and Definitions to Twin

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add defer publish to AddTwin

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add refactored and deferred mqtt client publish to Service methods

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add paho subservice

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add paho client to nats

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add mqtt publish and state update

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add Publish wrapper

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Use paho Publish wrapper

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix tests

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Save states in separate collection

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Replace []byte payload by []map[string]interface{}

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add offset to PageMetadata

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Rename TwinsPage to Page

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add StateRepository

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add StateRepository, mongodb StateRepository adapter and ListStates endpoint

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add RetrieveLast twin to states repository

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Parse def in nats to update state attrib

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add prepareState() helper

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Replace list things by id by view thing by id

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add state repo mock

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix service tests

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix mongo twins repo tests and remove owner from RetrieveByID params

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix mongo twins repo tests and remove owner from Remove params

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Replace ChannelID in Attribute by Channel

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix endpoint tests for add and update twin

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix endpoint tests for view and remove twin

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Use new auth service

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix mocks auth and mongodb tests

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix licensing info

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix 'for for'

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Replace short dec by var for default vals and add string map for mqtt op info

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Use Record from mainflux/senml and rename broker to nats in main.go

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Remove Key from Twin

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Remove redundant id uniqueness check from mongodb

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add attrib name to attrib update info

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix typos and remove isZeroOfUnderlyingType() helper

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Rename paho to mqtt

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix env vars in main.go and README.md

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Move twins/api/twins to twins/api and rename twins- prefix to tw- in main.go

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Remove flows for manual testing and revert to master docker-compose.yml

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Remove publish from nats and fix tests by updating package names

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Rename users.go to authn.go

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix crud op names

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Change id to twinID

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Inline if err != nil and change <prefix>ID to <prefix>_id

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix metadata db search test

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Rename mqtt and nats related files to publisher and subscriber respectively

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Move save state logic from nats to service

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Fix endpoint tests

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Remove close check from main.go

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Complete deploy section

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add unit tests for states

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Remove debugging artefacts and unneeded comments to exported funcs

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add port num to defAuthnURL

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>
This commit is contained in:
Darko Draskovic 2020-01-10 17:25:36 +01:00 committed by Drasko DRASKOVIC
parent 5834d364ad
commit b3991b8497
34 changed files with 4114 additions and 0 deletions

296
cmd/twins/main.go Normal file
View File

@ -0,0 +1,296 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package main
import (
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"github.com/mainflux/mainflux/twins/mqtt"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux"
authapi "github.com/mainflux/mainflux/authn/api/grpc"
"github.com/mainflux/mainflux/logger"
localusers "github.com/mainflux/mainflux/things/users"
"github.com/mainflux/mainflux/twins"
"github.com/mainflux/mainflux/twins/api"
twapi "github.com/mainflux/mainflux/twins/api/http"
twmongodb "github.com/mainflux/mainflux/twins/mongodb"
twnats "github.com/mainflux/mainflux/twins/nats"
"github.com/mainflux/mainflux/twins/uuid"
nats "github.com/nats-io/go-nats"
opentracing "github.com/opentracing/opentracing-go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
jconfig "github.com/uber/jaeger-client-go/config"
"go.mongodb.org/mongo-driver/mongo"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
const (
defLogLevel = "info"
defHTTPPort = "9021"
defJaegerURL = ""
defServerCert = ""
defServerKey = ""
defDBName = "mainflux"
defDBHost = "localhost"
defDBPort = "27017"
defSingleUserEmail = ""
defSingleUserToken = ""
defClientTLS = "false"
defCACerts = ""
defMqttURL = "tcp://localhost:1883"
defThingID = ""
defThingKey = ""
defChannelID = ""
defNatsURL = nats.DefaultURL
defAuthnTimeout = "1" // in seconds
defAuthnURL = "localhost:8181"
envLogLevel = "MF_TWINS_LOG_LEVEL"
envHTTPPort = "MF_TWINS_HTTP_PORT"
envJaegerURL = "MF_JAEGER_URL"
envServerCert = "MF_TWINS_SERVER_CERT"
envServerKey = "MF_TWINS_SERVER_KEY"
envDBName = "MF_TWINS_DB_NAME"
envDBHost = "MF_TWINS_DB_HOST"
envDBPort = "MF_TWINS_DB_PORT"
envSingleUserEmail = "MF_TWINS_SINGLE_USER_EMAIL"
envSingleUserToken = "MF_TWINS_SINGLE_USER_TOKEN"
envClientTLS = "MF_TWINS_CLIENT_TLS"
envCACerts = "MF_TWINS_CA_CERTS"
envMqttURL = "MF_TWINS_MQTT_URL"
envThingID = "MF_TWINS_THING_ID"
envThingKey = "MF_TWINS_THING_KEY"
envChannelID = "MF_TWINS_CHANNEL_ID"
envNatsURL = "MF_NATS_URL"
envAuthnTimeout = "MF_AUTHN_TIMEOUT"
envAuthnURL = "MF_AUTHN_URL"
)
type config struct {
logLevel string
httpPort string
jaegerURL string
serverCert string
serverKey string
dbCfg twmongodb.Config
singleUserEmail string
singleUserToken string
clientTLS bool
caCerts string
mqttURL string
thingID string
thingKey string
channelID string
NatsURL string
authnTimeout time.Duration
authnURL string
}
func main() {
cfg := loadConfig()
logger, err := logger.New(os.Stdout, cfg.logLevel)
if err != nil {
log.Fatalf(err.Error())
}
db, err := twmongodb.Connect(cfg.dbCfg, logger)
if err != nil {
logger.Error(err.Error())
os.Exit(1)
}
authTracer, authCloser := initJaeger("auth", cfg.jaegerURL, logger)
defer authCloser.Close()
auth, _ := createAuthClient(cfg, authTracer, logger)
dbTracer, dbCloser := initJaeger("twins_db", cfg.jaegerURL, logger)
defer dbCloser.Close()
pc := mqtt.Connect(cfg.mqttURL, cfg.thingID, cfg.thingKey, logger)
mc := mqtt.New(pc, cfg.channelID)
mcTracer, mcCloser := initJaeger("twins_mqtt", cfg.jaegerURL, logger)
defer mcCloser.Close()
nc, err := nats.Connect(cfg.NatsURL)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer nc.Close()
ncTracer, ncCloser := initJaeger("twins_nats", cfg.jaegerURL, logger)
defer ncCloser.Close()
tracer, closer := initJaeger("twins", cfg.jaegerURL, logger)
defer closer.Close()
svc := newService(nc, ncTracer, mc, mcTracer, auth, dbTracer, db, logger)
errs := make(chan error, 2)
go startHTTPServer(twapi.MakeHandler(tracer, svc), cfg.httpPort, cfg, logger, errs)
go func() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT)
errs <- fmt.Errorf("%s", <-c)
}()
err = <-errs
logger.Error(fmt.Sprintf("Twins service terminated: %s", err))
}
func loadConfig() config {
tls, err := strconv.ParseBool(mainflux.Env(envClientTLS, defClientTLS))
if err != nil {
log.Fatalf("Invalid value passed for %s\n", envClientTLS)
}
timeout, err := strconv.ParseInt(mainflux.Env(envAuthnTimeout, defAuthnTimeout), 10, 64)
if err != nil {
log.Fatalf("Invalid %s value: %s", envAuthnTimeout, err.Error())
}
dbCfg := twmongodb.Config{
Name: mainflux.Env(envDBName, defDBName),
Host: mainflux.Env(envDBHost, defDBHost),
Port: mainflux.Env(envDBPort, defDBPort),
}
return config{
logLevel: mainflux.Env(envLogLevel, defLogLevel),
httpPort: mainflux.Env(envHTTPPort, defHTTPPort),
serverCert: mainflux.Env(envServerCert, defServerCert),
serverKey: mainflux.Env(envServerKey, defServerKey),
jaegerURL: mainflux.Env(envJaegerURL, defJaegerURL),
dbCfg: dbCfg,
singleUserEmail: mainflux.Env(envSingleUserEmail, defSingleUserEmail),
singleUserToken: mainflux.Env(envSingleUserToken, defSingleUserToken),
clientTLS: tls,
caCerts: mainflux.Env(envCACerts, defCACerts),
mqttURL: mainflux.Env(envMqttURL, defMqttURL),
thingID: mainflux.Env(envThingID, defThingID),
channelID: mainflux.Env(envChannelID, defChannelID),
thingKey: mainflux.Env(envThingKey, defThingKey),
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
authnURL: mainflux.Env(envAuthnURL, defAuthnURL),
authnTimeout: time.Duration(timeout) * time.Second,
}
}
func initJaeger(svcName, url string, logger logger.Logger) (opentracing.Tracer, io.Closer) {
if url == "" {
return opentracing.NoopTracer{}, ioutil.NopCloser(nil)
}
tracer, closer, err := jconfig.Configuration{
ServiceName: svcName,
Sampler: &jconfig.SamplerConfig{
Type: "const",
Param: 1,
},
Reporter: &jconfig.ReporterConfig{
LocalAgentHostPort: url,
LogSpans: true,
},
}.NewTracer()
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger client: %s", err))
os.Exit(1)
}
return tracer, closer
}
func createAuthClient(cfg config, tracer opentracing.Tracer, logger logger.Logger) (mainflux.AuthNServiceClient, func() error) {
if cfg.singleUserEmail != "" && cfg.singleUserToken != "" {
return localusers.NewSingleUserService(cfg.singleUserEmail, cfg.singleUserToken), nil
}
conn := connectToAuth(cfg, logger)
return authapi.NewClient(tracer, conn, cfg.authnTimeout), conn.Close
}
func connectToAuth(cfg config, logger logger.Logger) *grpc.ClientConn {
var opts []grpc.DialOption
if cfg.clientTLS {
if cfg.caCerts != "" {
tpc, err := credentials.NewClientTLSFromFile(cfg.caCerts, "")
if err != nil {
logger.Error(fmt.Sprintf("Failed to create tls credentials: %s", err))
os.Exit(1)
}
opts = append(opts, grpc.WithTransportCredentials(tpc))
}
} else {
opts = append(opts, grpc.WithInsecure())
logger.Info("gRPC communication is not encrypted")
}
conn, err := grpc.Dial(cfg.authnURL, opts...)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to auth service: %s", err))
os.Exit(1)
}
return conn
}
func newService(nc *nats.Conn, ncTracer opentracing.Tracer, mc mqtt.Mqtt, mcTracer opentracing.Tracer, users mainflux.AuthNServiceClient, dbTracer opentracing.Tracer, db *mongo.Database, logger logger.Logger) twins.Service {
twinRepo := twmongodb.NewTwinRepository(db)
stateRepo := twmongodb.NewStateRepository(db)
idp := uuid.New()
svc := twins.New(nc, mc, users, twinRepo, stateRepo, idp)
svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
svc,
kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: "twins",
Subsystem: "api",
Name: "request_count",
Help: "Number of requests received.",
}, []string{"method"}),
kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
Namespace: "twins",
Subsystem: "api",
Name: "request_latency_microseconds",
Help: "Total duration of requests in microseconds.",
}, []string{"method"}),
)
twnats.Subscribe(nc, mc, svc, logger)
return svc
}
func startHTTPServer(handler http.Handler, port string, cfg config, logger logger.Logger, errs chan error) {
p := fmt.Sprintf(":%s", port)
if cfg.serverCert != "" || cfg.serverKey != "" {
logger.Info(fmt.Sprintf("Twins service started using https on port %s with cert %s key %s",
port, cfg.serverCert, cfg.serverKey))
errs <- http.ListenAndServeTLS(p, cfg.serverCert, cfg.serverKey, handler)
return
}
logger.Info(fmt.Sprintf("Twins service started using http on port %s", cfg.httpPort))
errs <- http.ListenAndServe(p, handler)
}

2
go.mod
View File

@ -36,6 +36,7 @@ require (
github.com/mainflux/senml v1.0.0
github.com/mattn/go-colorable v0.0.9 // indirect
github.com/mattn/go-isatty v0.0.3 // indirect
github.com/nats-io/gnatsd v1.4.1 // indirect
github.com/nats-io/go-nats v1.6.0
github.com/nats-io/nuid v1.0.0 // indirect
github.com/onsi/ginkgo v1.10.3 // indirect
@ -54,6 +55,7 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.5.0
github.com/stretchr/testify v1.4.0
github.com/tidwall/pretty v1.0.0 // indirect
github.com/uber/jaeger-client-go v2.16.0+incompatible
github.com/uber/jaeger-lib v2.0.0+incompatible // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect

15
go.sum
View File

@ -8,6 +8,7 @@ github.com/Microsoft/go-winio v0.4.7/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyv
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@ -22,6 +23,7 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf
github.com/cisco/senml v0.0.0-20181031221301-910a55054e16 h1:gVMpF6unIWOG8JgCt3XhlYdT3lRFDcMMVJdMesU+TQY=
github.com/cisco/senml v0.0.0-20181031221301-910a55054e16/go.mod h1:pLFASTQEf1nGfvoMwxmOjLeImwMKQMx18w38UcI9ZfI=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/containerd/continuity v0.0.0-20180416230128-c6cef3483023 h1:ydDbSX89iFHufaVN8xlS22aWpajSFfmXL+fQNWnhrIg=
github.com/containerd/continuity v0.0.0-20180416230128-c6cef3483023/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y=
@ -64,6 +66,7 @@ github.com/go-logfmt/logfmt v0.4.0 h1:MP4Eh7ZCb31lleYCFuwm0oe4/YGak+5l1vA2NOE80n
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-redis/redis v6.15.0+incompatible h1:/Wib9cA7CF3SQxBZRMHyQvqzlwzc8PJGDMkRfqQebSE=
github.com/go-redis/redis v6.15.0+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-sql-driver/mysql v1.4.0 h1:7LxgVwFb2hIQtMm87NdgAVfXjnt4OePseqT1tKx+opk=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.7.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
@ -81,6 +84,7 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
@ -92,6 +96,7 @@ github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8l
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/gopcua/opcua v0.1.6 h1:B9SVRKQGzcWcwP2QPYN93Uku32+3wL+v5cgzBxE6V5I=
github.com/gopcua/opcua v0.1.6/go.mod h1:INwnDoRxmNWAt7+tzqxuGqQkSF2c1C69VAL0c2q6AcY=
@ -107,6 +112,7 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hokaccha/go-prettyjson v0.0.0-20180920040306-f579f869bbfe h1:MCgzztuoH5LZNr9AkIaicIDvCfACu11KUCCZQnRHDC0=
github.com/hokaccha/go-prettyjson v0.0.0-20180920040306-f579f869bbfe/go.mod h1:pFlLw2CfqZiIBOx6BuCeRLCrfxBJipTY0nIOF/VbGcI=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
@ -143,6 +149,7 @@ github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRU
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-sqlite3 v1.9.0 h1:pDRiWfl+++eC2FEFRy6jXmQlvp4Yh3z1MJKg4UeYM/4=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/matttproud/golang_protobuf_extensions v1.0.0/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
@ -150,13 +157,17 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nats-io/gnatsd v1.4.1 h1:RconcfDeWpKCD6QIIwiVFcvForlXpWeJP7i5/lDLy44=
github.com/nats-io/gnatsd v1.4.1/go.mod h1:nqco77VO78hLCJpIcVfygDP2rPGfsEHkGTUk94uh5DQ=
github.com/nats-io/go-nats v1.6.0 h1:FznPwMfrVwGnSCh7JTXyJDRW0TIkD4Tr+M1LPJt9T70=
github.com/nats-io/go-nats v1.6.0/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0=
github.com/nats-io/nuid v1.0.0 h1:44QGdhbiANq8ZCbUkdn6W5bqtg+mHuDE4wOUuxxndFs=
github.com/nats-io/nuid v1.0.0/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.3 h1:OoxbjfXVZyod1fmWYhI7SEyaD8B00ynP3T+D5GiyHOY=
github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.7.1 h1:K0jcRCwNQM3vFGh1ppMtDh/+7ApJrjldlX8fA0jDTLQ=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/opencontainers/go-digest v1.0.0-rc1 h1:WzifXhOVOEOuFYOJAW6aQqW0TooG2iki3E3Ii+WN7gQ=
github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
@ -228,6 +239,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/uber/jaeger-client-go v2.16.0+incompatible h1:Q2Pp6v3QYiocMxomCaJuwQGFt7E53bPYqEgug/AoBtY=
github.com/uber/jaeger-client-go v2.16.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
@ -314,6 +327,7 @@ google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRn
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/gorp.v1 v1.7.1 h1:GBB9KrWRATQZh95HJyVGUZrWwOPswitEYEyqlK8JbAA=
gopkg.in/gorp.v1 v1.7.1/go.mod h1:Wo3h+DBQZIxATwftsglhdD/62zRFPhGhTiu5jUJmCaw=
@ -322,6 +336,7 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/ory-am/dockertest.v3 v3.3.2 h1:NgIHJacfXajJResc7luKYPF/F2kul6MXqbleEjv4PAY=
gopkg.in/ory-am/dockertest.v3 v3.3.2/go.mod h1:s9mmoLkaGeAh97qygnNj4xWkiN7e1SKekYC6CovU+ek=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

98
twins/README.md Normal file
View File

@ -0,0 +1,98 @@
# Twins
Service twins is used for CRUD and update of digital twins. Twin is a semantic
representation of a real world entity, be it device, application or something
else. It holds the sequence of attribute based definitions of a real world thing
and refers to the time series of definition based states that hold the
historical data about the represented real world thing.
## Configuration
The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.
| Variable | Description | Default |
|----------------------------|----------------------------------------------------------------------|-----------------------|
| MF_TWINS_LOG_LEVEL | Log level for twin service (debug, info, warn, error) | error |
| MF_TWINS_HTTP_PORT | Twins service HTTP port | 9021 |
| MF_TWINS_SERVER_CERT | Path to server certificate in PEM format | |
| MF_TWINS_SERVER_KEY | Path to server key in PEM format | |
| MF_JAEGER_URL | Jaeger server URL | |
| MF_TWINS_DB_NAME | Database name | mainflux |
| MF_TWINS_DB_HOST | Database host address | localhost |
| MF_TWINS_DB_PORT | Database host port | 27017 |
| MF_TWINS_SINGLE_USER_EMAIL | User email for single user mode (no gRPC communication with users) | |
| MF_TWINS_SINGLE_USER_TOKEN | User token for single user mode that should be passed in auth header | |
| MF_TWINS_CLIENT_TLS | Flag that indicates if TLS should be turned on | false |
| MF_TWINS_CA_CERTS | Path to trusted CAs in PEM format | |
| MF_TWINS_MQTT_URL | Mqtt broker URL for twin CRUD and states update notifications | tcp://localhost:1883 |
| MF_TWINS_THING_ID | ID of thing representing twins service & mqtt user | |
| MF_TWINS_THING_KEY | Key of thing representing twins service & mqtt pass | |
| MF_TWINS_CHANNEL_ID | Mqtt notifications topic | |
| MF_NATS_URL | Mainflux NATS broker URL | nats://127.0.0.1:4222 |
| MF_AUTHN_GRPC_PORT | Authn service gRPC port | 8181 |
| MF_AUTHN_TIMEOUT | Authn gRPC request timeout in seconds | 1 |
| MF_AUTHN_URL | Authn service URL | localhost:8181 |
## Deployment
The service itself is distributed as Docker container. The following snippet
provides a compose file template that can be used to deploy the service container
locally:
```yaml
version: "3"
services:
twins:
image: mainflux/twins:[version]
container_name: [instance name]
ports:
- [host machine port]:[configured HTTP port]
environment:
MF_TWINS_LOG_LEVEL: [Twins log level]
MF_TWINS_HTTP_PORT: [Service HTTP port]
MF_TWINS_SERVER_CERT: [String path to server cert in pem format]
MF_TWINS_SERVER_KEY: [String path to server key in pem format]
MF_JAEGER_URL: [Jaeger server URL]
MF_TWINS_DB_NAME: [Database name]
MF_TWINS_DB_HOST: [Database host address]
MF_TWINS_DB_PORT: [Database host port]
MF_TWINS_SINGLE_USER_EMAIL: [User email for single user mode]
MF_TWINS_SINGLE_USER_TOKEN: [User token for single user mode]
MF_TWINS_CLIENT_TLS: [Flag that indicates if TLS should be turned on]
MF_TWINS_CA_CERTS: [Path to trusted CAs in PEM format]
MF_TWINS_MQTT_URL: [Mqtt broker URL for twin CRUD and states]
MF_TWINS_THING_ID: [ID of thing representing twins service]
MF_TWINS_THING_KEY: [Key of thing representing twins service]
MF_TWINS_CHANNEL_ID: [Mqtt notifications topic]
MF_NATS_URL: [Mainflux NATS broker URL]
MF_AUTHN_GRPC_PORT: [Authn service gRPC port]
MF_AUTHN_TIMEOUT: [Authn gRPC request timeout in seconds]
MF_AUTHN_URL: [Authn service URL]
```
To start the service outside of the container, execute the following shell script:
```bash
# download the latest version of the service
go get github.com/mainflux/mainflux
cd $GOPATH/src/github.com/mainflux/mainflux
# compile the twins
make twins
# copy binary to bin
make install
# set the environment variables and run the service
MF_TWINS_LOG_LEVEL: [Twins log level] MF_TWINS_HTTP_PORT: [Service HTTP port] MF_TWINS_SERVER_CERT: [String path to server cert in pem format] MF_TWINS_SERVER_KEY: [String path to server key in pem format] MF_JAEGER_URL: [Jaeger server URL] MF_TWINS_DB_NAME: [Database name] MF_TWINS_DB_HOST: [Database host address] MF_TWINS_DB_PORT: [Database host port] MF_TWINS_SINGLE_USER_EMAIL: [User email for single user mode] MF_TWINS_SINGLE_USER_TOKEN: [User token for single user mode] MF_TWINS_CLIENT_TLS: [Flag that indicates if TLS should be turned on] MF_TWINS_CA_CERTS: [Path to trusted CAs in PEM format] MF_TWINS_MQTT_URL: [Mqtt broker URL for twin CRUD and states] MF_TWINS_THING_ID: [ID of thing representing twins service] MF_TWINS_THING_KEY: [Key of thing representing twins service] MF_TWINS_CHANNEL_ID: [Mqtt notifications topic] MF_NATS_URL: [Mainflux NATS broker URL] MF_AUTHN_GRPC_PORT: [Authn service gRPC port] MF_AUTHN_TIMEOUT: [Authn gRPC request timeout in seconds] MF_AUTHN_URL: [Authn service URL] $GOBIN/mainflux-twins
```
## Usage
For more information about service capabilities and its usage, please check out
the [API documentation](swagger.yaml).
[doc]: http://mainflux.readthedocs.io

6
twins/api/doc.go Normal file
View File

@ -0,0 +1,6 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
// Package api contains API-related concerns: endpoint definitions, middlewares
// and all resource representations.
package api

5
twins/api/http/doc.go Normal file
View File

@ -0,0 +1,5 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
// Package http contains implementation of kit service HTTP API.
package http

211
twins/api/http/endpoint.go Normal file
View File

@ -0,0 +1,211 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package http
import (
"context"
"github.com/go-kit/kit/endpoint"
"github.com/mainflux/mainflux/twins"
)
func addTwinEndpoint(svc twins.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(addTwinReq)
if err := req.validate(); err != nil {
return nil, err
}
twin := twins.Twin{
Name: req.Name,
ThingID: req.ThingID,
Metadata: req.Metadata,
}
saved, err := svc.AddTwin(ctx, req.token, twin, req.Definition)
if err != nil {
return nil, err
}
res := twinRes{
id: saved.ID,
created: true,
}
return res, nil
}
}
func updateTwinEndpoint(svc twins.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(updateTwinReq)
if err := req.validate(); err != nil {
return nil, err
}
twin := twins.Twin{
ID: req.id,
Name: req.Name,
ThingID: req.ThingID,
Metadata: req.Metadata,
}
if err := svc.UpdateTwin(ctx, req.token, twin, req.Definition); err != nil {
return nil, err
}
res := twinRes{id: req.id, created: false}
return res, nil
}
}
func viewTwinEndpoint(svc twins.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(viewTwinReq)
if err := req.validate(); err != nil {
return nil, err
}
twin, err := svc.ViewTwin(ctx, req.token, req.id)
if err != nil {
return nil, err
}
res := viewTwinRes{
Owner: twin.Owner,
ID: twin.ID,
Name: twin.Name,
ThingID: twin.ThingID,
Created: twin.Created,
Updated: twin.Updated,
Revision: twin.Revision,
Definitions: twin.Definitions,
Metadata: twin.Metadata,
}
return res, nil
}
}
func viewTwinByThingEndpoint(svc twins.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(viewTwinReq)
if err := req.validate(); err != nil {
return nil, err
}
twin, err := svc.ViewTwinByThing(ctx, req.token, req.id)
if err != nil {
return nil, err
}
res := viewTwinRes{
Owner: twin.Owner,
ID: twin.ID,
Name: twin.Name,
ThingID: twin.ThingID,
Created: twin.Created,
Updated: twin.Updated,
Revision: twin.Revision,
Definitions: twin.Definitions,
Metadata: twin.Metadata,
}
return res, nil
}
}
func listTwinsEndpoint(svc twins.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(listReq)
if err := req.validate(); err != nil {
return nil, err
}
page, err := svc.ListTwins(ctx, req.token, req.offset, req.limit, req.name, req.metadata)
if err != nil {
return nil, err
}
res := twinsPageRes{
pageRes: pageRes{
Total: page.Total,
Offset: page.Offset,
Limit: page.Limit,
},
Twins: []viewTwinRes{},
}
for _, twin := range page.Twins {
view := viewTwinRes{
Owner: twin.Owner,
ID: twin.ID,
Name: twin.Name,
ThingID: twin.ThingID,
Created: twin.Created,
Updated: twin.Updated,
Revision: twin.Revision,
Definitions: twin.Definitions,
Metadata: twin.Metadata,
}
res.Twins = append(res.Twins, view)
}
return res, nil
}
}
func removeTwinEndpoint(svc twins.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(viewTwinReq)
err := req.validate()
if err != nil {
return nil, err
}
if err := svc.RemoveTwin(ctx, req.token, req.id); err != nil {
return nil, err
}
return removeRes{}, nil
}
}
func listStatesEndpoint(svc twins.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(listStatesReq)
if err := req.validate(); err != nil {
return nil, err
}
page, err := svc.ListStates(ctx, req.token, req.offset, req.limit, req.id)
if err != nil {
return nil, err
}
res := statesPageRes{
pageRes: pageRes{
Total: page.Total,
Offset: page.Offset,
Limit: page.Limit,
},
States: []viewStateRes{},
}
for _, state := range page.States {
view := viewStateRes{
TwinID: state.TwinID,
ID: state.ID,
Definition: state.Definition,
Created: state.Created,
Payload: state.Payload,
}
res.States = append(res.States, view)
}
return res, nil
}
}

View File

@ -0,0 +1,468 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package http_test
import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/mainflux/mainflux/twins"
httpapi "github.com/mainflux/mainflux/twins/api/http"
"github.com/mainflux/mainflux/twins/mocks"
twmqtt "github.com/mainflux/mainflux/twins/mqtt"
nats "github.com/nats-io/go-nats"
"github.com/opentracing/opentracing-go/mocktracer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
contentType = "application/json"
email = "user@example.com"
token = "token"
wrongValue = "wrong_value"
thingID = "5b68df78-86f7-48a6-ac4f-bb24dd75c39e"
wrongID = 0
maxNameSize = 1024
natsURL = "nats://localhost:4222"
topic = "topic"
)
var invalidName = strings.Repeat("m", maxNameSize+1)
type testRequest struct {
client *http.Client
method string
url string
contentType string
token string
body io.Reader
}
func (tr testRequest) make() (*http.Response, error) {
req, err := http.NewRequest(tr.method, tr.url, tr.body)
if err != nil {
return nil, err
}
if tr.token != "" {
req.Header.Set("Authorization", tr.token)
}
if tr.contentType != "" {
req.Header.Set("Content-Type", tr.contentType)
}
return tr.client.Do(req)
}
func newService(tokens map[string]string) twins.Service {
auth := mocks.NewAuthNServiceClient(tokens)
twinsRepo := mocks.NewTwinRepository()
statesRepo := mocks.NewStateRepository()
idp := mocks.NewIdentityProvider()
nc, _ := nats.Connect(natsURL)
opts := mqtt.NewClientOptions()
pc := mqtt.NewClient(opts)
mc := twmqtt.New(pc, topic)
return twins.New(nc, mc, auth, twinsRepo, statesRepo, idp)
}
func newServer(svc twins.Service) *httptest.Server {
mux := httpapi.MakeHandler(mocktracer.New(), svc)
return httptest.NewServer(mux)
}
func toJSON(data interface{}) string {
jsonData, _ := json.Marshal(data)
return string(jsonData)
}
func TestAddTwin(t *testing.T) {
svc := newService(map[string]string{token: email})
ts := newServer(svc)
defer ts.Close()
// tw := twins.Twin{ThingID: thingID}
tw := twinReq{ThingID: thingID}
data := toJSON(tw)
tw.Name = invalidName
invalidData := toJSON(tw)
cases := []struct {
desc string
req string
contentType string
auth string
status int
location string
}{
{
desc: "add valid twin",
req: data,
contentType: contentType,
auth: token,
status: http.StatusCreated,
location: "/twins/123e4567-e89b-12d3-a456-000000000001",
},
{
desc: "add twin with empty JSON request",
req: "{}",
contentType: contentType,
auth: token,
status: http.StatusBadRequest,
location: "",
},
{
desc: "add twin with invalid auth token",
req: data,
contentType: contentType,
auth: wrongValue,
status: http.StatusForbidden,
location: "",
},
{
desc: "add twin with empty auth token",
req: data,
contentType: contentType,
auth: "",
status: http.StatusForbidden,
location: "",
},
{
desc: "add twin with invalid request format",
req: "}",
contentType: contentType,
auth: token,
status: http.StatusBadRequest,
location: "",
},
{
desc: "add twin with empty request",
req: "",
contentType: contentType,
auth: token,
status: http.StatusBadRequest,
location: "",
},
{
desc: "add twin without content type",
req: data,
contentType: "",
auth: token,
status: http.StatusUnsupportedMediaType,
location: "",
},
{
desc: "add twin with invalid name",
req: invalidData,
contentType: contentType,
auth: token,
status: http.StatusBadRequest,
location: "",
},
}
for _, tc := range cases {
req := testRequest{
client: ts.Client(),
method: http.MethodPost,
url: fmt.Sprintf("%s/twins", ts.URL),
contentType: tc.contentType,
token: tc.auth,
body: strings.NewReader(tc.req),
}
res, err := req.make()
assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err))
location := res.Header.Get("Location")
assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code %d got %d", tc.desc, tc.status, res.StatusCode))
assert.Equal(t, tc.location, location, fmt.Sprintf("%s: expected location %s got %s", tc.desc, tc.location, location))
}
}
func TestUpdateTwin(t *testing.T) {
svc := newService(map[string]string{token: email})
ts := newServer(svc)
defer ts.Close()
twin := twins.Twin{ThingID: thingID}
def := twins.Definition{}
data := toJSON(twin)
stw, _ := svc.AddTwin(context.Background(), token, twin, def)
tw := twin
tw.Name = invalidName
invalidData := toJSON(tw)
cases := []struct {
desc string
req string
id string
contentType string
auth string
status int
}{
{
desc: "update existing twin",
req: data,
id: stw.ID,
contentType: contentType,
auth: token,
status: http.StatusOK,
},
{
desc: "update twin with empty JSON request",
req: "{}",
id: stw.ID,
contentType: contentType,
auth: token,
status: http.StatusOK,
},
{
desc: "update non-existent twin",
req: data,
id: strconv.FormatUint(wrongID, 10),
contentType: contentType,
auth: token,
status: http.StatusNotFound,
},
{
desc: "update twin with invalid user token",
req: data,
id: stw.ID,
contentType: contentType,
auth: wrongValue,
status: http.StatusForbidden,
},
{
desc: "update twin with empty user token",
req: data,
id: stw.ID,
contentType: contentType,
auth: "",
status: http.StatusForbidden,
},
{
desc: "update twin with invalid data format",
req: "{",
id: stw.ID,
contentType: contentType,
auth: token,
status: http.StatusBadRequest,
},
{
desc: "update twin with empty request",
req: "",
id: stw.ID,
contentType: contentType,
auth: token,
status: http.StatusBadRequest,
},
{
desc: "update twin without content type",
req: data,
id: stw.ID,
contentType: "",
auth: token,
status: http.StatusUnsupportedMediaType,
},
{
desc: "update twin with invalid name",
req: invalidData,
contentType: contentType,
auth: token,
status: http.StatusBadRequest,
},
}
for _, tc := range cases {
req := testRequest{
client: ts.Client(),
method: http.MethodPut,
url: fmt.Sprintf("%s/twins/%s", ts.URL, tc.id),
contentType: tc.contentType,
token: tc.auth,
body: strings.NewReader(tc.req),
}
res, err := req.make()
assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err))
assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code %d got %d", tc.desc, tc.status, res.StatusCode))
}
}
func TestViewTwin(t *testing.T) {
svc := newService(map[string]string{token: email})
ts := newServer(svc)
defer ts.Close()
def := twins.Definition{}
twin := twins.Twin{ThingID: thingID}
stw, err := svc.AddTwin(context.Background(), token, twin, def)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
twres := twinRes{
Owner: stw.Owner,
Name: stw.Name,
ID: stw.ID,
ThingID: stw.ThingID,
Revision: stw.Revision,
Created: stw.Created,
Updated: stw.Updated,
Definitions: stw.Definitions,
Metadata: stw.Metadata,
}
data := toJSON(twres)
cases := []struct {
desc string
id string
auth string
status int
res string
}{
{
desc: "view existing twin",
id: stw.ID,
auth: token,
status: http.StatusOK,
res: data,
},
{
desc: "view non-existent twin",
id: strconv.FormatUint(wrongID, 10),
auth: token,
status: http.StatusNotFound,
res: "",
},
{
desc: "view twin by passing invalid token",
id: stw.ID,
auth: wrongValue,
status: http.StatusForbidden,
res: "",
},
{
desc: "view twin by passing empty id",
id: "",
auth: token,
status: http.StatusBadRequest,
res: "",
},
{
desc: "view twin by passing empty token",
id: stw.ID,
auth: "",
status: http.StatusForbidden,
res: "",
},
}
for _, tc := range cases {
req := testRequest{
client: ts.Client(),
method: http.MethodGet,
url: fmt.Sprintf("%s/twins/%s", ts.URL, tc.id),
token: tc.auth,
}
res, err := req.make()
assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err))
assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code %d got %d", tc.desc, tc.status, res.StatusCode))
body, err := ioutil.ReadAll(res.Body)
data := strings.Trim(string(body), "\n")
assert.Equal(t, tc.res, data, fmt.Sprintf("%s: expected body %s got %s", tc.desc, tc.res, data))
}
}
func TestRemoveTwin(t *testing.T) {
svc := newService(map[string]string{token: email})
ts := newServer(svc)
defer ts.Close()
def := twins.Definition{}
twin := twins.Twin{ThingID: thingID}
stw, _ := svc.AddTwin(context.Background(), token, twin, def)
cases := []struct {
desc string
id string
auth string
status int
}{
{
desc: "delete existing twin",
id: stw.ID,
auth: token,
status: http.StatusNoContent,
},
{
desc: "delete non-existent twin",
id: strconv.FormatUint(wrongID, 10),
auth: token,
status: http.StatusNoContent,
},
{
desc: "delete twin by passing empty id",
id: "",
auth: token,
status: http.StatusBadRequest,
},
{
desc: "delete twin with invalid token",
id: stw.ID,
auth: wrongValue,
status: http.StatusForbidden,
},
{
desc: "delete twin with empty token",
id: stw.ID,
auth: "",
status: http.StatusForbidden,
},
}
for _, tc := range cases {
req := testRequest{
client: ts.Client(),
method: http.MethodDelete,
url: fmt.Sprintf("%s/twins/%s", ts.URL, tc.id),
token: tc.auth,
}
res, err := req.make()
assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err))
assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code %d got %d", tc.desc, tc.status, res.StatusCode))
}
}
type twinReq struct {
token string
Name string `json:"name,omitempty"`
ThingID string `json:"thing_id"`
Definition twins.Definition `json:"definition,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
type twinRes struct {
Owner string `json:"owner"`
Name string `json:"name,omitempty"`
ID string `json:"id"`
ThingID string `json:"thing_id"`
Revision int `json:"revision"`
Created time.Time `json:"created"`
Updated time.Time `json:"updated"`
Definitions []twins.Definition `json:"definitions"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}

128
twins/api/http/requests.go Normal file
View File

@ -0,0 +1,128 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package http
import (
"github.com/mainflux/mainflux/twins"
)
const maxNameSize = 1024
const maxLimitSize = 100
type apiReq interface {
validate() error
}
type addTwinReq struct {
token string
Name string `json:"name,omitempty"`
ThingID string `json:"thing_id"`
Definition twins.Definition `json:"definition,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
func (req addTwinReq) validate() error {
if req.token == "" {
return twins.ErrUnauthorizedAccess
}
if req.ThingID == "" {
return twins.ErrMalformedEntity
}
if len(req.Name) > maxNameSize {
return twins.ErrMalformedEntity
}
return nil
}
type updateTwinReq struct {
token string
id string
Name string `json:"name,omitempty"`
ThingID string `json:"thing_id,omitempty"`
Definition twins.Definition `json:"definition,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
func (req updateTwinReq) validate() error {
if req.token == "" {
return twins.ErrUnauthorizedAccess
}
if req.id == "" {
return twins.ErrMalformedEntity
}
if len(req.Name) > maxNameSize {
return twins.ErrMalformedEntity
}
return nil
}
type viewTwinReq struct {
token string
id string
}
func (req viewTwinReq) validate() error {
if req.token == "" {
return twins.ErrUnauthorizedAccess
}
if req.id == "" {
return twins.ErrMalformedEntity
}
return nil
}
type listReq struct {
token string
offset uint64
limit uint64
name string
metadata map[string]interface{}
}
func (req *listReq) validate() error {
if req.token == "" {
return twins.ErrUnauthorizedAccess
}
if req.limit == 0 || req.limit > maxLimitSize {
return twins.ErrMalformedEntity
}
if len(req.name) > maxNameSize {
return twins.ErrMalformedEntity
}
return nil
}
type listStatesReq struct {
token string
offset uint64
limit uint64
id string
}
func (req *listStatesReq) validate() error {
if req.token == "" {
return twins.ErrUnauthorizedAccess
}
if req.id == "" {
return twins.ErrMalformedEntity
}
if req.limit == 0 || req.limit > maxLimitSize {
return twins.ErrMalformedEntity
}
return nil
}

147
twins/api/http/responses.go Normal file
View File

@ -0,0 +1,147 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package http
import (
"fmt"
"net/http"
"time"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/twins"
)
var (
_ mainflux.Response = (*twinRes)(nil)
_ mainflux.Response = (*viewTwinRes)(nil)
_ mainflux.Response = (*viewStateRes)(nil)
_ mainflux.Response = (*twinsPageRes)(nil)
_ mainflux.Response = (*statesPageRes)(nil)
_ mainflux.Response = (*removeRes)(nil)
)
type twinRes struct {
id string
created bool
}
func (res twinRes) Code() int {
if res.created {
return http.StatusCreated
}
return http.StatusOK
}
func (res twinRes) Headers() map[string]string {
if res.created {
return map[string]string{
"Location": fmt.Sprintf("/twins/%s", res.id),
}
}
return map[string]string{}
}
func (res twinRes) Empty() bool {
return true
}
type viewTwinRes struct {
Owner string `json:"owner,omitempty"`
ID string `json:"id"`
ThingID string `json:"thing_id"`
Name string `json:"name,omitempty"`
Revision int `json:"revision"`
Created time.Time `json:"created"`
Updated time.Time `json:"updated"`
Definitions []twins.Definition `json:"definitions,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
func (res viewTwinRes) Code() int {
return http.StatusOK
}
func (res viewTwinRes) Headers() map[string]string {
return map[string]string{}
}
func (res viewTwinRes) Empty() bool {
return false
}
type viewStateRes struct {
TwinID string `json:"twin_id"`
ID int64 `json:"id"`
Definition int `json:"definition"`
Created time.Time `json:"created"`
Payload map[string]interface{} `json:"payload"`
}
func (res viewStateRes) Code() int {
return http.StatusOK
}
func (res viewStateRes) Headers() map[string]string {
return map[string]string{}
}
func (res viewStateRes) Empty() bool {
return false
}
type pageRes struct {
Total uint64 `json:"total"`
Offset uint64 `json:"offset"`
Limit uint64 `json:"limit"`
}
type twinsPageRes struct {
pageRes
Twins []viewTwinRes `json:"twins"`
}
func (res twinsPageRes) Code() int {
return http.StatusOK
}
func (res twinsPageRes) Headers() map[string]string {
return map[string]string{}
}
func (res twinsPageRes) Empty() bool {
return false
}
type statesPageRes struct {
pageRes
States []viewStateRes `json:"states"`
}
func (res statesPageRes) Code() int {
return http.StatusOK
}
func (res statesPageRes) Headers() map[string]string {
return map[string]string{}
}
func (res statesPageRes) Empty() bool {
return false
}
type removeRes struct{}
func (res removeRes) Code() int {
return http.StatusNoContent
}
func (res removeRes) Headers() map[string]string {
return map[string]string{}
}
func (res removeRes) Empty() bool {
return true
}

303
twins/api/http/transport.go Normal file
View File

@ -0,0 +1,303 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package http
import (
"context"
"encoding/json"
"errors"
"io"
"net/http"
"strconv"
"strings"
kitot "github.com/go-kit/kit/tracing/opentracing"
kithttp "github.com/go-kit/kit/transport/http"
"github.com/go-zoo/bone"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/twins"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
const (
contentType = "application/json"
offset = "offset"
limit = "limit"
name = "name"
metadata = "metadata"
defLimit = 10
defOffset = 0
)
var (
errUnsupportedContentType = errors.New("unsupported content type")
errInvalidQueryParams = errors.New("invalid query params")
)
// MakeHandler returns a HTTP handler for API endpoints.
func MakeHandler(tracer opentracing.Tracer, svc twins.Service) http.Handler {
opts := []kithttp.ServerOption{
kithttp.ServerErrorEncoder(encodeError),
}
r := bone.New()
r.Post("/twins", kithttp.NewServer(
kitot.TraceServer(tracer, "add_twin")(addTwinEndpoint(svc)),
decodeTwinCreation,
encodeResponse,
opts...,
))
r.Put("/twins/:id", kithttp.NewServer(
kitot.TraceServer(tracer, "update_twin")(updateTwinEndpoint(svc)),
decodeTwinUpdate,
encodeResponse,
opts...,
))
r.Get("/twins/:id", kithttp.NewServer(
kitot.TraceServer(tracer, "view_twin")(viewTwinEndpoint(svc)),
decodeView,
encodeResponse,
opts...,
))
r.Delete("/twins/:id", kithttp.NewServer(
kitot.TraceServer(tracer, "remove_twin")(removeTwinEndpoint(svc)),
decodeView,
encodeResponse,
opts...,
))
r.Get("/twins", kithttp.NewServer(
kitot.TraceServer(tracer, "list_twins")(listTwinsEndpoint(svc)),
decodeList,
encodeResponse,
opts...,
))
r.Get("/things/:id", kithttp.NewServer(
kitot.TraceServer(tracer, "view_twin_by_thing")(viewTwinByThingEndpoint(svc)),
decodeViewTwinByThing,
encodeResponse,
opts...,
))
r.Get("/states/:id", kithttp.NewServer(
kitot.TraceServer(tracer, "list_states")(listStatesEndpoint(svc)),
decodeListStates,
encodeResponse,
opts...,
))
r.GetFunc("/version", mainflux.Version("twins"))
r.Handle("/metrics", promhttp.Handler())
return r
}
func decodeTwinCreation(_ context.Context, r *http.Request) (interface{}, error) {
if !strings.Contains(r.Header.Get("Content-Type"), contentType) {
return nil, errUnsupportedContentType
}
req := addTwinReq{token: r.Header.Get("Authorization")}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return nil, err
}
return req, nil
}
func decodeTwinUpdate(_ context.Context, r *http.Request) (interface{}, error) {
if !strings.Contains(r.Header.Get("Content-Type"), contentType) {
return nil, errUnsupportedContentType
}
req := updateTwinReq{
token: r.Header.Get("Authorization"),
id: bone.GetValue(r, "id"),
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return nil, err
}
return req, nil
}
func decodeView(_ context.Context, r *http.Request) (interface{}, error) {
req := viewTwinReq{
token: r.Header.Get("Authorization"),
id: bone.GetValue(r, "id"),
}
return req, nil
}
func decodeList(_ context.Context, r *http.Request) (interface{}, error) {
l, err := readUintQuery(r, limit, defLimit)
if err != nil {
return nil, err
}
o, err := readUintQuery(r, offset, defOffset)
if err != nil {
return nil, err
}
n, err := readStringQuery(r, name)
if err != nil {
return nil, err
}
m, err := readMetadataQuery(r, "metadata")
if err != nil {
return nil, err
}
req := listReq{
token: r.Header.Get("Authorization"),
limit: l,
offset: o,
name: n,
metadata: m,
}
return req, nil
}
func decodeListStates(_ context.Context, r *http.Request) (interface{}, error) {
l, err := readUintQuery(r, limit, defLimit)
if err != nil {
return nil, err
}
o, err := readUintQuery(r, offset, defOffset)
if err != nil {
return nil, err
}
req := listStatesReq{
token: r.Header.Get("Authorization"),
limit: l,
offset: o,
id: bone.GetValue(r, "id"),
}
return req, nil
}
func decodeViewTwinByThing(_ context.Context, r *http.Request) (interface{}, error) {
req := viewTwinReq{
token: r.Header.Get("Authorization"),
id: bone.GetValue(r, "id"),
}
return req, nil
}
func encodeResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
w.Header().Set("Content-Type", contentType)
if ar, ok := response.(mainflux.Response); ok {
for k, v := range ar.Headers() {
w.Header().Set(k, v)
}
w.WriteHeader(ar.Code())
if ar.Empty() {
return nil
}
}
return json.NewEncoder(w).Encode(response)
}
func encodeError(_ context.Context, err error, w http.ResponseWriter) {
w.Header().Set("Content-Type", contentType)
switch err {
case twins.ErrMalformedEntity:
w.WriteHeader(http.StatusBadRequest)
case twins.ErrUnauthorizedAccess:
w.WriteHeader(http.StatusForbidden)
case twins.ErrNotFound:
w.WriteHeader(http.StatusNotFound)
case twins.ErrConflict:
w.WriteHeader(http.StatusUnprocessableEntity)
case errUnsupportedContentType:
w.WriteHeader(http.StatusUnsupportedMediaType)
case errInvalidQueryParams:
w.WriteHeader(http.StatusBadRequest)
case io.ErrUnexpectedEOF:
w.WriteHeader(http.StatusBadRequest)
case io.EOF:
w.WriteHeader(http.StatusBadRequest)
default:
switch err.(type) {
case *json.SyntaxError:
w.WriteHeader(http.StatusBadRequest)
case *json.UnmarshalTypeError:
w.WriteHeader(http.StatusBadRequest)
default:
w.WriteHeader(http.StatusInternalServerError)
}
}
}
func readUintQuery(r *http.Request, key string, def uint64) (uint64, error) {
vals := bone.GetQuery(r, key)
if len(vals) > 1 {
return 0, errInvalidQueryParams
}
if len(vals) == 0 {
return def, nil
}
strval := vals[0]
val, err := strconv.ParseUint(strval, 10, 64)
if err != nil {
return 0, errInvalidQueryParams
}
return val, nil
}
func readStringQuery(r *http.Request, key string) (string, error) {
vals := bone.GetQuery(r, key)
if len(vals) > 1 {
return "", errInvalidQueryParams
}
if len(vals) == 0 {
return "", nil
}
return vals[0], nil
}
func readMetadataQuery(r *http.Request, key string) (map[string]interface{}, error) {
vals := bone.GetQuery(r, key)
if len(vals) > 1 {
return nil, errInvalidQueryParams
}
if len(vals) == 0 {
return nil, nil
}
m := make(map[string]interface{})
err := json.Unmarshal([]byte(vals[0]), &m)
if err != nil {
return nil, err
}
return m, nil
}

132
twins/api/logging.go Normal file
View File

@ -0,0 +1,132 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
// +build !test
package api
import (
"context"
"fmt"
"time"
"github.com/mainflux/mainflux"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/twins"
)
var _ twins.Service = (*loggingMiddleware)(nil)
type loggingMiddleware struct {
logger log.Logger
svc twins.Service
}
// LoggingMiddleware adds logging facilities to the core service.
func LoggingMiddleware(svc twins.Service, logger log.Logger) twins.Service {
return &loggingMiddleware{logger, svc}
}
func (lm *loggingMiddleware) AddTwin(ctx context.Context, token string, twin twins.Twin, def twins.Definition) (saved twins.Twin, err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method add_twin for token %s and twin %s took %s to complete", token, twin.ID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.AddTwin(ctx, token, twin, def)
}
func (lm *loggingMiddleware) UpdateTwin(ctx context.Context, token string, twin twins.Twin, def twins.Definition) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method update_twin for token %s and twin %s took %s to complete", token, twin.ID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.UpdateTwin(ctx, token, twin, def)
}
func (lm *loggingMiddleware) ViewTwin(ctx context.Context, token, id string) (viewed twins.Twin, err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method view_twin for token %s and twin %s took %s to complete", token, id, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.ViewTwin(ctx, token, id)
}
func (lm *loggingMiddleware) ListTwins(ctx context.Context, token string, offset uint64, limit uint64, name string, metadata twins.Metadata) (tw twins.TwinsPage, err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method list_twins for token %s took %s to complete", token, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.ListTwins(ctx, token, offset, limit, name, metadata)
}
func (lm *loggingMiddleware) SaveState(msg *mainflux.Message) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method save_state took %s to complete", time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.SaveState(msg)
}
func (lm *loggingMiddleware) ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (st twins.StatesPage, err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method list_states for token %s took %s to complete", token, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.ListStates(ctx, token, offset, limit, id)
}
func (lm *loggingMiddleware) ViewTwinByThing(ctx context.Context, token, thingid string) (tw twins.Twin, err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method view_twin_by_thing for token %s and thing %s took %s to complete", token, thingid, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.ViewTwinByThing(ctx, token, thingid)
}
func (lm *loggingMiddleware) RemoveTwin(ctx context.Context, token, id string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method remove_twin for token %s and twin %s took %s to complete", token, id, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.RemoveTwin(ctx, token, id)
}

105
twins/api/metrics.go Normal file
View File

@ -0,0 +1,105 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
// +build !test
package api
import (
"context"
"time"
"github.com/go-kit/kit/metrics"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/twins"
)
var _ twins.Service = (*metricsMiddleware)(nil)
type metricsMiddleware struct {
counter metrics.Counter
latency metrics.Histogram
svc twins.Service
}
// MetricsMiddleware instruments core service by tracking request count and
// latency.
func MetricsMiddleware(svc twins.Service, counter metrics.Counter, latency metrics.Histogram) twins.Service {
return &metricsMiddleware{
counter: counter,
latency: latency,
svc: svc,
}
}
func (ms *metricsMiddleware) AddTwin(ctx context.Context, token string, twin twins.Twin, def twins.Definition) (saved twins.Twin, err error) {
defer func(begin time.Time) {
ms.counter.With("method", "add_twin").Add(1)
ms.latency.With("method", "add_twin").Observe(time.Since(begin).Seconds())
}(time.Now())
return ms.svc.AddTwin(ctx, token, twin, def)
}
func (ms *metricsMiddleware) UpdateTwin(ctx context.Context, token string, twin twins.Twin, def twins.Definition) (err error) {
defer func(begin time.Time) {
ms.counter.With("method", "update_twin").Add(1)
ms.latency.With("method", "update_twin").Observe(time.Since(begin).Seconds())
}(time.Now())
return ms.svc.UpdateTwin(ctx, token, twin, def)
}
func (ms *metricsMiddleware) ViewTwin(ctx context.Context, token, id string) (viewed twins.Twin, err error) {
defer func(begin time.Time) {
ms.counter.With("method", "view_twin").Add(1)
ms.latency.With("method", "view_twin").Observe(time.Since(begin).Seconds())
}(time.Now())
return ms.svc.ViewTwin(ctx, token, id)
}
func (ms *metricsMiddleware) ListTwins(ctx context.Context, token string, offset uint64, limit uint64, name string, metadata twins.Metadata) (tw twins.TwinsPage, err error) {
defer func(begin time.Time) {
ms.counter.With("method", "list_twins").Add(1)
ms.latency.With("method", "list_twins").Observe(time.Since(begin).Seconds())
}(time.Now())
return ms.svc.ListTwins(ctx, token, offset, limit, name, metadata)
}
func (ms *metricsMiddleware) SaveState(msg *mainflux.Message) error {
defer func(begin time.Time) {
ms.counter.With("method", "save_state").Add(1)
ms.latency.With("method", "save_state").Observe(time.Since(begin).Seconds())
}(time.Now())
return ms.svc.SaveState(msg)
}
func (ms *metricsMiddleware) ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (st twins.StatesPage, err error) {
defer func(begin time.Time) {
ms.counter.With("method", "list_states").Add(1)
ms.latency.With("method", "list_states").Observe(time.Since(begin).Seconds())
}(time.Now())
return ms.svc.ListStates(ctx, token, offset, limit, id)
}
func (ms *metricsMiddleware) ViewTwinByThing(ctx context.Context, token, thingid string) (twins.Twin, error) {
defer func(begin time.Time) {
ms.counter.With("method", "view_twin_by_thing").Add(1)
ms.latency.With("method", "view_twin_by_thing").Observe(time.Since(begin).Seconds())
}(time.Now())
return ms.svc.ViewTwinByThing(ctx, token, thingid)
}
func (ms *metricsMiddleware) RemoveTwin(ctx context.Context, token, id string) (err error) {
defer func(begin time.Time) {
ms.counter.With("method", "remove_twin").Add(1)
ms.latency.With("method", "remove_twin").Observe(time.Since(begin).Seconds())
}(time.Now())
return ms.svc.RemoveTwin(ctx, token, id)
}

11
twins/doc.go Normal file
View File

@ -0,0 +1,11 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
// Package twins contains the domain concept definitions needed to support
// Mainflux twins service functionality. Twin is a semantic representation
// of a real world entity, be it device, application or something else.
// It holds the sequence of attribute based definitions of a real world
// thing and refers to the time series of definition based states that
// hold the historical data about the represented real world thing.
package twins

13
twins/idp.go Normal file
View File

@ -0,0 +1,13 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package twins
// IdentityProvider specifies an API for generating unique identifiers.
type IdentityProvider interface {
// ID generates the unique identifier.
ID() (string, error)
// IsValid checks whether string is a valid uuid4.
IsValid(u4 string) error
}

34
twins/mocks/authn.go Normal file
View File

@ -0,0 +1,34 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package mocks
import (
"context"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/users"
"google.golang.org/grpc"
)
var _ mainflux.AuthNServiceClient = (*authNServiceClient)(nil)
type authNServiceClient struct {
users map[string]string
}
// NewAuthNServiceClient creates mock of auth service.
func NewAuthNServiceClient(users map[string]string) mainflux.AuthNServiceClient {
return &authNServiceClient{users}
}
func (svc authNServiceClient) Identify(ctx context.Context, in *mainflux.Token, opts ...grpc.CallOption) (*mainflux.UserID, error) {
if id, ok := svc.users[in.Value]; ok {
return &mainflux.UserID{Value: id}, nil
}
return nil, users.ErrUnauthorizedAccess
}
func (c *authNServiceClient) Issue(ctx context.Context, in *mainflux.IssueReq, opts ...grpc.CallOption) (*mainflux.Token, error) {
return new(mainflux.Token), nil
}

13
twins/mocks/commons.go Normal file
View File

@ -0,0 +1,13 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package mocks
import "fmt"
// Since mocks will store data in map, and they need to resemble the real
// identifiers as much as possible, a key will be created as combination of
// owner and their id. This will allow searching by prefix or suffix.
func key(owner string, id string) string {
return fmt.Sprintf("%s-%s", owner, id)
}

43
twins/mocks/idp.go Normal file
View File

@ -0,0 +1,43 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package mocks
import (
"fmt"
"strings"
"sync"
"github.com/mainflux/mainflux/twins"
)
const u4Pref = "123e4567-e89b-12d3-a456-"
var _ twins.IdentityProvider = (*identityProviderMock)(nil)
type identityProviderMock struct {
mu sync.Mutex
counter int
}
func (idp *identityProviderMock) ID() (string, error) {
idp.mu.Lock()
defer idp.mu.Unlock()
idp.counter++
return fmt.Sprintf("%s%012d", u4Pref, idp.counter), nil
}
func (idp *identityProviderMock) IsValid(u4 string) error {
if !strings.Contains(u4Pref, u4) {
return twins.ErrMalformedEntity
}
return nil
}
// NewIdentityProvider creates "mirror" identity provider, i.e. generated
// token will hold value provided by the caller.
func NewIdentityProvider() twins.IdentityProvider {
return &identityProviderMock{}
}

101
twins/mocks/states.go Normal file
View File

@ -0,0 +1,101 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package mocks
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"github.com/mainflux/mainflux/twins"
)
var _ twins.StateRepository = (*stateRepositoryMock)(nil)
type stateRepositoryMock struct {
mu sync.Mutex
counter uint64
states map[string]twins.State
}
// NewStateRepository creates in-memory twin repository.
func NewStateRepository() twins.StateRepository {
return &stateRepositoryMock{
states: make(map[string]twins.State),
}
}
// SaveState persists the state
func (srm *stateRepositoryMock) Save(ctx context.Context, st twins.State) error {
srm.mu.Lock()
defer srm.mu.Unlock()
srm.states[key(st.TwinID, string(st.ID))] = st
return nil
}
// CountStates returns the number of states related to twin
func (srm *stateRepositoryMock) Count(ctx context.Context, tw twins.Twin) (int64, error) {
return int64(len(srm.states)), nil
}
func (srm *stateRepositoryMock) RetrieveAll(ctx context.Context, offset uint64, limit uint64, id string) (twins.StatesPage, error) {
srm.mu.Lock()
defer srm.mu.Unlock()
items := make([]twins.State, 0)
if limit <= 0 {
return twins.StatesPage{}, nil
}
// This obscure way to examine map keys is enforced by the key structure in mocks/commons.go
prefix := fmt.Sprintf("%s-", id)
for k, v := range srm.states {
if !strings.HasPrefix(k, prefix) {
continue
}
id := uint64(v.ID)
if id > offset && id < limit {
items = append(items, v)
}
if (uint64)(len(items)) >= limit {
break
}
}
sort.SliceStable(items, func(i, j int) bool {
return items[i].ID < items[j].ID
})
page := twins.StatesPage{
States: items,
PageMetadata: twins.PageMetadata{
Total: srm.counter,
Offset: offset,
Limit: limit,
},
}
return page, nil
}
// RetrieveLast returns the last state related to twin spec by id
func (srm *stateRepositoryMock) RetrieveLast(ctx context.Context, id string) (twins.State, error) {
srm.mu.Lock()
defer srm.mu.Unlock()
items := make([]twins.State, 0)
for _, v := range srm.states {
items = append(items, v)
}
sort.SliceStable(items, func(i, j int) bool {
return items[i].ID < items[j].ID
})
return items[len(items)-1], nil
}

141
twins/mocks/twins.go Normal file
View File

@ -0,0 +1,141 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package mocks
import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"sync"
"github.com/mainflux/mainflux/twins"
)
var _ twins.TwinRepository = (*twinRepositoryMock)(nil)
type twinRepositoryMock struct {
mu sync.Mutex
counter uint64
twins map[string]twins.Twin
}
// NewTwinRepository creates in-memory twin repository.
func NewTwinRepository() twins.TwinRepository {
return &twinRepositoryMock{
twins: make(map[string]twins.Twin),
}
}
func (trm *twinRepositoryMock) Save(ctx context.Context, twin twins.Twin) (string, error) {
trm.mu.Lock()
defer trm.mu.Unlock()
for _, tw := range trm.twins {
if tw.ID == twin.ID {
return "", twins.ErrConflict
}
}
trm.twins[key(twin.Owner, twin.ID)] = twin
return twin.ID, nil
}
func (trm *twinRepositoryMock) Update(ctx context.Context, twin twins.Twin) error {
trm.mu.Lock()
defer trm.mu.Unlock()
dbKey := key(twin.Owner, twin.ID)
if _, ok := trm.twins[dbKey]; !ok {
return twins.ErrNotFound
}
trm.twins[dbKey] = twin
return nil
}
func (trm *twinRepositoryMock) RetrieveByID(_ context.Context, id string) (twins.Twin, error) {
trm.mu.Lock()
defer trm.mu.Unlock()
for k, v := range trm.twins {
if id == v.ID {
return trm.twins[k], nil
}
}
return twins.Twin{}, twins.ErrNotFound
}
func (trm *twinRepositoryMock) RetrieveByThing(_ context.Context, thingid string) (twins.Twin, error) {
trm.mu.Lock()
defer trm.mu.Unlock()
for _, twin := range trm.twins {
if twin.ThingID == thingid {
return twin, nil
}
}
return twins.Twin{}, twins.ErrNotFound
}
func (trm *twinRepositoryMock) RetrieveAll(_ context.Context, owner string, offset uint64, limit uint64, name string, metadata twins.Metadata) (twins.TwinsPage, error) {
trm.mu.Lock()
defer trm.mu.Unlock()
items := make([]twins.Twin, 0)
if limit <= 0 {
return twins.TwinsPage{}, nil
}
// This obscure way to examine map keys is enforced by the key structure in mocks/commons.go
prefix := fmt.Sprintf("%s-", owner)
for k, v := range trm.twins {
if (uint64)(len(items)) >= limit {
break
}
if !strings.HasPrefix(k, prefix) {
continue
}
suffix := string(v.ID[len(u4Pref):])
id, _ := strconv.ParseUint(suffix, 10, 64)
if id > offset && id <= uint64(offset+limit) {
items = append(items, v)
}
}
sort.SliceStable(items, func(i, j int) bool {
return items[i].ID < items[j].ID
})
page := twins.TwinsPage{
Twins: items,
PageMetadata: twins.PageMetadata{
Total: trm.counter,
Offset: offset,
Limit: limit,
},
}
return page, nil
}
func (trm *twinRepositoryMock) Remove(ctx context.Context, id string) error {
trm.mu.Lock()
defer trm.mu.Unlock()
for k, v := range trm.twins {
if id == v.ID {
delete(trm.twins, k)
}
}
return nil
}

6
twins/mongodb/doc.go Normal file
View File

@ -0,0 +1,6 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
// Package mongodb contains repository implementations using MongoDB as
// the underlying database.
package mongodb

33
twins/mongodb/init.go Normal file
View File

@ -0,0 +1,33 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package mongodb
import (
"context"
"fmt"
"github.com/mainflux/mainflux/logger"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// Config defines the options that are used when connecting to a MongoDB instance
type Config struct {
Host string
Port string
Name string
}
// Connect creates a connection to the MongoDB instance
func Connect(cfg Config, logger logger.Logger) (*mongo.Database, error) {
addr := fmt.Sprintf("mongodb://%s:%s", cfg.Host, cfg.Port)
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr))
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to database: %s", err))
return nil, err
}
db := client.Database(cfg.Name)
return db, nil
}

View File

@ -0,0 +1,55 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package mongodb_test
import (
"context"
"fmt"
"os"
"testing"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
dockertest "gopkg.in/ory-am/dockertest.v3"
)
const (
wrongID = "0"
wrongValue = "wrong-value"
)
func TestMain(m *testing.M) {
pool, err := dockertest.NewPool("")
if err != nil {
testLog.Error(fmt.Sprintf("Could not connect to docker: %s", err))
}
cfg := []string{
"MONGO_INITDB_DATABASE=test",
}
container, err := pool.Run("mongo", "3.6-jessie", cfg)
if err != nil {
testLog.Error(fmt.Sprintf("Could not start container: %s", err))
}
port = container.GetPort("27017/tcp")
addr = fmt.Sprintf("mongodb://localhost:%s", port)
if err := pool.Retry(func() error {
_, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr))
return err
}); err != nil {
testLog.Error(fmt.Sprintf("Could not connect to docker: %s", err))
}
code := m.Run()
if err := pool.Purge(container); err != nil {
testLog.Error(fmt.Sprintf("Could not purge container: %s", err))
}
os.Exit(code)
}

140
twins/mongodb/states.go Normal file
View File

@ -0,0 +1,140 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package mongodb
import (
"context"
"github.com/mainflux/mainflux/twins"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
const statesCollection string = "states"
type stateRepository struct {
db *mongo.Database
}
var _ twins.StateRepository = (*stateRepository)(nil)
// NewStateRepository instantiates a MongoDB implementation of state
// repository.
func NewStateRepository(db *mongo.Database) twins.StateRepository {
return &stateRepository{
db: db,
}
}
// SaveState persists the state
func (sr *stateRepository) Save(ctx context.Context, st twins.State) error {
coll := sr.db.Collection(statesCollection)
if _, err := coll.InsertOne(context.Background(), st); err != nil {
return err
}
return nil
}
// CountStates returns the number of states related to twin
func (sr *stateRepository) Count(ctx context.Context, tw twins.Twin) (int64, error) {
coll := sr.db.Collection(statesCollection)
filter := bson.D{{"twinid", tw.ID}}
total, err := coll.CountDocuments(ctx, filter)
if err != nil {
return 0, err
}
return total, nil
}
// RetrieveAll retrieves the subset of states related to twin specified by id
func (sr *stateRepository) RetrieveAll(ctx context.Context, offset uint64, limit uint64, id string) (twins.StatesPage, error) {
coll := sr.db.Collection(statesCollection)
findOptions := options.Find()
findOptions.SetSkip(int64(offset))
findOptions.SetLimit(int64(limit))
filter := bson.D{{"twinid", id}}
cur, err := coll.Find(ctx, filter, findOptions)
if err != nil {
return twins.StatesPage{}, err
}
results, err := decodeStates(ctx, cur)
if err != nil {
return twins.StatesPage{}, err
}
total, err := coll.CountDocuments(ctx, filter)
if err != nil {
return twins.StatesPage{}, err
}
return twins.StatesPage{
States: results,
PageMetadata: twins.PageMetadata{
Total: uint64(total),
Offset: offset,
Limit: limit,
},
}, nil
}
// RetrieveLast returns the last state related to twin spec by id
func (sr *stateRepository) RetrieveLast(ctx context.Context, id string) (twins.State, error) {
coll := sr.db.Collection(statesCollection)
filter := bson.D{{"twinid", id}}
total, err := coll.CountDocuments(ctx, filter)
if err != nil {
return twins.State{}, err
}
findOptions := options.Find()
var skip int64
if total > 0 {
skip = total - 1
}
findOptions.SetSkip(skip)
findOptions.SetLimit(1)
cur, err := coll.Find(ctx, filter, findOptions)
if err != nil {
return twins.State{}, err
}
results, err := decodeStates(ctx, cur)
if err != nil {
return twins.State{}, err
}
if len(results) < 1 {
return twins.State{}, nil
}
return results[0], nil
}
func decodeStates(ctx context.Context, cur *mongo.Cursor) ([]twins.State, error) {
defer cur.Close(ctx)
var results []twins.State
for cur.Next(ctx) {
var elem twins.State
if err := cur.Decode(&elem); err != nil {
return []twins.State{}, nil
}
results = append(results, elem)
}
if err := cur.Err(); err != nil {
return []twins.State{}, nil
}
return results, nil
}

View File

@ -0,0 +1,165 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package mongodb_test
import (
"context"
"fmt"
"testing"
"time"
"github.com/mainflux/mainflux/twins"
"github.com/mainflux/mainflux/twins/mongodb"
"github.com/mainflux/mainflux/twins/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func TestStateSave(t *testing.T) {
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr))
require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err))
db := client.Database(testDB)
repo := mongodb.NewStateRepository(db)
twid, err := idp.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
var id int64
state := twins.State{
TwinID: twid,
ID: id,
Created: time.Now(),
}
cases := []struct {
desc string
state twins.State
err error
}{
{
desc: "save state",
state: state,
err: nil,
},
}
for _, tc := range cases {
err := repo.Save(context.Background(), tc.state)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
}
}
func TestStatesRetrieveAll(t *testing.T) {
idp := uuid.New()
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr))
require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err))
db := client.Database(testDB)
db.Collection("states").DeleteMany(context.Background(), bson.D{})
repo := mongodb.NewStateRepository(db)
twid, err := idp.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
n := uint64(10)
for i := uint64(0); i < n; i++ {
st := twins.State{
TwinID: twid,
ID: int64(i),
Created: time.Now(),
}
repo.Save(context.Background(), st)
}
cases := map[string]struct {
twid string
limit uint64
offset uint64
size uint64
total uint64
}{
"retrieve all states with existing twin": {
twid: twid,
offset: 0,
limit: n,
size: n,
total: n,
},
"retrieve subset of states with existing twin": {
twid: twid,
offset: 0,
limit: n / 2,
size: n / 2,
total: n,
},
"retrieve states with non-existing twin": {
twid: wrongValue,
offset: 0,
limit: n,
size: 0,
total: 0,
},
}
for desc, tc := range cases {
page, err := repo.RetrieveAll(context.Background(), tc.offset, tc.limit, tc.twid)
size := uint64(len(page.States))
assert.Equal(t, tc.size, size, fmt.Sprintf("%s: expected %d got %d\n", desc, tc.size, size))
assert.Equal(t, tc.total, page.Total, fmt.Sprintf("%s: expected %d got %d\n", desc, tc.total, page.Total))
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %d\n", desc, err))
}
}
func TestStatesRetrieveLast(t *testing.T) {
idp := uuid.New()
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr))
require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err))
db := client.Database(testDB)
db.Collection("states").DeleteMany(context.Background(), bson.D{})
repo := mongodb.NewStateRepository(db)
twid, err := idp.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
n := int64(10)
for i := int64(1); i <= n; i++ {
st := twins.State{
TwinID: twid,
ID: i,
Created: time.Now(),
}
repo.Save(context.Background(), st)
}
cases := map[string]struct {
twid string
id int64
}{
"retrieve last state with existing twin": {
twid: twid,
id: n,
},
"retrieve states with non-existing owner": {
twid: wrongValue,
id: 0,
},
}
for desc, tc := range cases {
state, err := repo.RetrieveLast(context.Background(), tc.twid)
assert.Equal(t, tc.id, state.ID, fmt.Sprintf("%s: expected %d got %d\n", desc, tc.id, state.ID))
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %d\n", desc, err))
}
}

199
twins/mongodb/twins.go Normal file
View File

@ -0,0 +1,199 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package mongodb
import (
"context"
"github.com/mainflux/mainflux/twins"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
const (
maxNameSize = 1024
twinsCollection string = "twins"
)
type twinRepository struct {
db *mongo.Database
}
var _ twins.TwinRepository = (*twinRepository)(nil)
// NewTwinRepository instantiates a MongoDB implementation of twin repository.
func NewTwinRepository(db *mongo.Database) twins.TwinRepository {
return &twinRepository{
db: db,
}
}
func (tr *twinRepository) Save(ctx context.Context, tw twins.Twin) (string, error) {
if len(tw.Name) > maxNameSize {
return "", twins.ErrMalformedEntity
}
coll := tr.db.Collection(twinsCollection)
if _, err := coll.InsertOne(context.Background(), tw); err != nil {
return "", err
}
return tw.ID, nil
}
func (tr *twinRepository) Update(ctx context.Context, tw twins.Twin) error {
if len(tw.Name) > maxNameSize {
return twins.ErrMalformedEntity
}
coll := tr.db.Collection(twinsCollection)
filter := bson.D{{"id", tw.ID}}
update := bson.D{{"$set", tw}}
res, err := coll.UpdateOne(context.Background(), filter, update)
if err != nil {
return err
}
if res.ModifiedCount < 1 {
return twins.ErrNotFound
}
return nil
}
func (tr *twinRepository) RetrieveByID(_ context.Context, id string) (twins.Twin, error) {
coll := tr.db.Collection(twinsCollection)
var tw twins.Twin
filter := bson.D{{"id", id}}
if err := coll.FindOne(context.Background(), filter).Decode(&tw); err != nil {
return tw, twins.ErrNotFound
}
return tw, nil
}
func (tr *twinRepository) RetrieveByThing(ctx context.Context, thingid string) (twins.Twin, error) {
coll := tr.db.Collection(twinsCollection)
tw := twins.Twin{}
filter := bson.D{{"thingid", thingid}}
if err := coll.FindOne(context.Background(), filter).Decode(&tw); err != nil {
return tw, twins.ErrNotFound
}
return tw, nil
}
func (tr *twinRepository) RetrieveAll(ctx context.Context, owner string, offset uint64, limit uint64, name string, metadata twins.Metadata) (twins.TwinsPage, error) {
coll := tr.db.Collection(twinsCollection)
findOptions := options.Find()
findOptions.SetSkip(int64(offset))
findOptions.SetLimit(int64(limit))
filter := bson.D{}
if owner != "" {
filter = append(filter, bson.E{"owner", owner})
}
if name != "" {
filter = append(filter, bson.E{"name", name})
}
if len(metadata) > 0 {
filter = append(filter, bson.E{"metadata", metadata})
}
cur, err := coll.Find(ctx, filter, findOptions)
if err != nil {
return twins.TwinsPage{}, err
}
results, err := decodeTwins(ctx, cur)
if err != nil {
return twins.TwinsPage{}, err
}
total, err := coll.CountDocuments(ctx, filter)
if err != nil {
return twins.TwinsPage{}, err
}
return twins.TwinsPage{
Twins: results,
PageMetadata: twins.PageMetadata{
Total: uint64(total),
Offset: offset,
Limit: limit,
},
}, nil
}
func (tr *twinRepository) RetrieveAllByThing(ctx context.Context, thingid string, offset uint64, limit uint64) (twins.TwinsPage, error) {
coll := tr.db.Collection(twinsCollection)
findOptions := options.Find()
findOptions.SetSkip(int64(offset))
findOptions.SetLimit(int64(limit))
filter := bson.D{{"thingid", thingid}}
cur, err := coll.Find(ctx, filter, findOptions)
if err != nil {
return twins.TwinsPage{}, err
}
results, err := decodeTwins(ctx, cur)
if err != nil {
return twins.TwinsPage{}, err
}
total, err := coll.CountDocuments(ctx, filter)
if err != nil {
return twins.TwinsPage{}, err
}
return twins.TwinsPage{
Twins: results,
PageMetadata: twins.PageMetadata{
Total: uint64(total),
Offset: offset,
Limit: limit,
},
}, nil
}
func (tr *twinRepository) Remove(ctx context.Context, id string) error {
coll := tr.db.Collection(twinsCollection)
filter := bson.D{{"id", id}}
res, err := coll.DeleteOne(context.Background(), filter)
if err != nil {
return err
}
if res.DeletedCount < 1 {
return twins.ErrNotFound
}
return nil
}
func decodeTwins(ctx context.Context, cur *mongo.Cursor) ([]twins.Twin, error) {
defer cur.Close(ctx)
var results []twins.Twin
for cur.Next(ctx) {
var elem twins.Twin
err := cur.Decode(&elem)
if err != nil {
return []twins.Twin{}, nil
}
results = append(results, elem)
}
if err := cur.Err(); err != nil {
return []twins.Twin{}, nil
}
return results, nil
}

385
twins/mongodb/twins_test.go Normal file
View File

@ -0,0 +1,385 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package mongodb_test
import (
"context"
"fmt"
"os"
"strings"
"testing"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/twins"
"github.com/mainflux/mainflux/twins/mongodb"
"github.com/mainflux/mainflux/twins/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
const (
maxNameSize = 1024
msgsNum = 10
testDB = "test"
collection = "twins"
email = "mfx_twin@example.com"
validName = "mfx_twin"
)
var (
port string
addr string
testLog, _ = log.New(os.Stdout, log.Info.String())
idp = uuid.New()
db mongo.Database
invalidName = strings.Repeat("m", maxNameSize+1)
)
func TestTwinsSave(t *testing.T) {
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr))
require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err))
db := client.Database(testDB)
repo := mongodb.NewTwinRepository(db)
twid, err := idp.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
nonexistentTwinID, err := idp.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
twin := twins.Twin{
Owner: email,
ID: twid,
}
cases := []struct {
desc string
twin twins.Twin
err error
}{
{
desc: "create new twin",
twin: twin,
err: nil,
},
{
desc: "create twin with invalid name",
twin: twins.Twin{
ID: nonexistentTwinID,
Owner: email,
Name: invalidName,
},
err: twins.ErrMalformedEntity,
},
}
for _, tc := range cases {
_, err := repo.Save(context.Background(), tc.twin)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
}
}
func TestTwinsUpdate(t *testing.T) {
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr))
require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err))
db := client.Database(testDB)
repo := mongodb.NewTwinRepository(db)
twid, err := idp.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
nonexistentTwinID, err := idp.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
twin := twins.Twin{
ID: twid,
Name: validName,
}
if _, err := repo.Save(context.Background(), twin); err != nil {
testLog.Error(err.Error())
}
twin.Name = "new_name"
cases := []struct {
desc string
twin twins.Twin
err error
}{
{
desc: "update existing twin",
twin: twin,
err: nil,
},
{
desc: "update non-existing twin",
twin: twins.Twin{
ID: nonexistentTwinID,
},
err: twins.ErrNotFound,
},
{
desc: "update twin with invalid name",
twin: twins.Twin{
ID: twid,
Owner: email,
Name: invalidName,
},
err: twins.ErrMalformedEntity,
},
}
for _, tc := range cases {
err := repo.Update(context.Background(), tc.twin)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
}
}
func TestTwinsRetrieveByID(t *testing.T) {
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr))
require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err))
db := client.Database(testDB)
repo := mongodb.NewTwinRepository(db)
twid, err := idp.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
nonexistentTwinID, err := idp.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
twin := twins.Twin{
ID: twid,
}
if _, err := repo.Save(context.Background(), twin); err != nil {
testLog.Error(err.Error())
}
cases := []struct {
desc string
id string
err error
}{
{
desc: "retrieve an existing twin",
id: twin.ID,
err: nil,
},
{
desc: "retrieve a non-existing twin",
id: nonexistentTwinID,
err: twins.ErrNotFound,
},
}
for _, tc := range cases {
_, err := repo.RetrieveByID(context.Background(), tc.id)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
}
}
func TestTwinsRetrieveByThing(t *testing.T) {
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr))
require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err))
db := client.Database(testDB)
repo := mongodb.NewTwinRepository(db)
twid, err := idp.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
thingid, err := idp.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
nonexistentThingID, err := idp.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
twin := twins.Twin{
ID: twid,
ThingID: thingid,
}
if _, err := repo.Save(context.Background(), twin); err != nil {
testLog.Error(err.Error())
}
cases := []struct {
desc string
thingid string
err error
}{
{
desc: "retrieve an existing twin",
thingid: thingid,
err: nil,
},
{
desc: "retrieve a non-existing twin",
thingid: nonexistentThingID,
err: twins.ErrNotFound,
},
}
for _, tc := range cases {
_, err := repo.RetrieveByThing(context.Background(), tc.thingid)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
}
}
func TestTwinsRetrieveAll(t *testing.T) {
email := "twin-multi-retrieval@example.com"
name := "mainflux"
metadata := twins.Metadata{
"type": "test",
}
wrongMetadata := twins.Metadata{
"wrong": "wrong",
}
idp := uuid.New()
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr))
require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err))
db := client.Database(testDB)
db.Collection(collection).DeleteMany(context.Background(), bson.D{})
twinRepo := mongodb.NewTwinRepository(db)
n := uint64(10)
for i := uint64(0); i < n; i++ {
twid, err := idp.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
tw := twins.Twin{
Owner: email,
ID: twid,
Metadata: metadata,
}
// Create first two Twins with name.
if i < 2 {
tw.Name = name
}
twinRepo.Save(context.Background(), tw)
}
cases := map[string]struct {
owner string
limit uint64
offset uint64
name string
size uint64
total uint64
metadata twins.Metadata
}{
"retrieve all twins with existing owner": {
owner: email,
offset: 0,
limit: n,
size: n,
total: n,
},
"retrieve subset of twins with existing owner": {
owner: email,
offset: 0,
limit: n / 2,
size: n / 2,
total: n,
},
"retrieve twins with non-existing owner": {
owner: wrongValue,
offset: 0,
limit: n,
size: 0,
total: 0,
},
"retrieve twins with existing name": {
offset: 0,
limit: 1,
name: name,
size: 1,
total: 2,
},
"retrieve twins with non-existing name": {
offset: 0,
limit: n,
name: "wrong",
size: 0,
total: 0,
},
"retrieve twins with metadata": {
offset: 0,
limit: n,
size: n,
total: n,
metadata: metadata,
},
"retrieve twins with wrong metadata": {
offset: 0,
limit: n,
size: 0,
total: 0,
metadata: wrongMetadata,
},
}
for desc, tc := range cases {
page, err := twinRepo.RetrieveAll(context.Background(), tc.owner, tc.offset, tc.limit, tc.name, tc.metadata)
size := uint64(len(page.Twins))
assert.Equal(t, tc.size, size, fmt.Sprintf("%s: expected %d got %d\n", desc, tc.size, size))
assert.Equal(t, tc.total, page.Total, fmt.Sprintf("%s: expected %d got %d\n", desc, tc.total, page.Total))
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %d\n", desc, err))
}
}
func TestTwinsRemove(t *testing.T) {
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr))
require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err))
db := client.Database(testDB)
repo := mongodb.NewTwinRepository(db)
twid, err := idp.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
nonexistentTwinID, err := idp.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
twin := twins.Twin{
ID: twid,
}
if _, err := repo.Save(context.Background(), twin); err != nil {
testLog.Error(err.Error())
}
cases := []struct {
desc string
id string
err error
}{
{
desc: "remove an existing twin",
id: twin.ID,
err: nil,
},
{
desc: "remove a non-existing twin",
id: nonexistentTwinID,
err: twins.ErrNotFound,
},
}
for _, tc := range cases {
err := repo.Remove(context.Background(), tc.id)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
}
}

84
twins/mqtt/publisher.go Normal file
View File

@ -0,0 +1,84 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package mqtt
import (
"fmt"
"os"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/mainflux/mainflux/logger"
)
// Mqtt stores mqtt client and topic
type Mqtt struct {
client mqtt.Client
channelID string
}
// New instantiates the mqtt service.
func New(mc mqtt.Client, channelID string) Mqtt {
return Mqtt{
client: mc,
channelID: channelID,
}
}
// Connect to MQTT broker
func Connect(mqttURL, id, key string, logger logger.Logger) mqtt.Client {
opts := mqtt.NewClientOptions()
opts.AddBroker(mqttURL)
opts.SetClientID("twins")
opts.SetUsername(id)
opts.SetPassword(key)
opts.SetCleanSession(true)
opts.SetAutoReconnect(true)
opts.SetOnConnectHandler(func(c mqtt.Client) {
logger.Info("Connected to MQTT broker")
})
opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
logger.Error(fmt.Sprintf("MQTT connection lost: %s", err.Error()))
os.Exit(1)
})
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
logger.Error(fmt.Sprintf("Failed to connect to MQTT broker: %s", token.Error()))
os.Exit(1)
}
return client
}
func (m *Mqtt) Channel() string {
return m.channelID
}
func (m *Mqtt) publish(twinID, crudOp string, payload *[]byte) error {
topic := fmt.Sprintf("channels/%s/messages/%s/%s", m.channelID, twinID, crudOp)
if len(twinID) < 1 {
topic = fmt.Sprintf("channels/%s/messages/%s", m.channelID, crudOp)
}
token := m.client.Publish(topic, 0, false, *payload)
token.Wait()
return token.Error()
}
// Publish sends mqtt message to a predefined topic
func (m *Mqtt) Publish(twinID *string, err *error, succOp, failOp string, payload *[]byte) error {
op := succOp
if *err != nil {
op = failOp
esb := []byte((*err).Error())
payload = &esb
}
if err := m.publish(*twinID, op, payload); err != nil {
return err
}
return nil
}

57
twins/nats/subscriber.go Normal file
View File

@ -0,0 +1,57 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package nats
import (
"fmt"
"github.com/gogo/protobuf/proto"
"github.com/mainflux/mainflux"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/twins"
"github.com/mainflux/mainflux/twins/mqtt"
"github.com/nats-io/go-nats"
)
const (
queue = "twins"
input = "channel.>"
)
var crudOp = map[string]string{
"stateSucc": "state/success",
"stateFail": "state/failure",
}
type pubsub struct {
natsClient *nats.Conn
mqttClient mqtt.Mqtt
logger log.Logger
svc twins.Service
}
// Subscribe to appropriate NATS topic
func Subscribe(nc *nats.Conn, mc mqtt.Mqtt, svc twins.Service, logger log.Logger) {
ps := pubsub{
natsClient: nc,
mqttClient: mc,
logger: logger,
svc: svc,
}
ps.natsClient.QueueSubscribe(input, queue, ps.handleMsg)
}
func (ps pubsub) handleMsg(m *nats.Msg) {
var msg mainflux.Message
if err := proto.Unmarshal(m.Data, &msg); err != nil {
ps.logger.Warn(fmt.Sprintf("Unmarshalling failed: %s", err))
return
}
if msg.Channel == ps.mqttClient.Channel() {
return
}
ps.svc.SaveState(&msg)
}

308
twins/service.go Normal file
View File

@ -0,0 +1,308 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package twins
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/twins/mqtt"
"github.com/mainflux/senml"
"github.com/nats-io/go-nats"
)
var (
// ErrMalformedEntity indicates malformed entity specification (e.g.
// invalid username or password).
ErrMalformedEntity = errors.New("malformed entity specification")
// ErrUnauthorizedAccess indicates missing or invalid credentials provided
// when accessing a protected resource.
ErrUnauthorizedAccess = errors.New("missing or invalid credentials provided")
// ErrNotFound indicates a non-existent entity request.
ErrNotFound = errors.New("non-existent entity")
// ErrConflict indicates that entity already exists.
ErrConflict = errors.New("entity already exists")
)
// Service specifies an API that must be fullfiled by the domain service
// implementation, and all of its decorators (e.g. logging & metrics).
type Service interface {
// AddTwin adds new twin related to user identified by the provided key.
AddTwin(context.Context, string, Twin, Definition) (Twin, error)
// UpdateTwin updates twin identified by the provided Twin that
// belongs to the user identified by the provided key.
UpdateTwin(context.Context, string, Twin, Definition) error
// ViewTwin retrieves data about twin with the provided
// ID belonging to the user identified by the provided key.
ViewTwin(context.Context, string, string) (Twin, error)
// ListTwins retrieves data about subset of twins that belongs to the
// user identified by the provided key.
ListTwins(context.Context, string, uint64, uint64, string, Metadata) (TwinsPage, error)
// ListStates retrieves data about subset of states that belongs to the
// twin identified by the id.
ListStates(context.Context, string, uint64, uint64, string) (StatesPage, error)
// SaveState persists state into database
SaveState(*mainflux.Message) error
// ListTwinsByThing retrieves data about subset of twins that represent
// specified thing belong to the user identified by
// the provided key.
ViewTwinByThing(context.Context, string, string) (Twin, error)
// RemoveTwin removes the twin identified with the provided ID, that
// belongs to the user identified by the provided key.
RemoveTwin(context.Context, string, string) error
}
var crudOp = map[string]string{
"createSucc": "create/success",
"createFail": "create/failure",
"updateSucc": "update/success",
"updateFail": "update/failure",
"getSucc": "get/success",
"getFail": "get/failure",
"removeSucc": "remove/success",
"removeFail": "remove/failure",
}
type twinsService struct {
natsClient *nats.Conn
mqttClient mqtt.Mqtt
auth mainflux.AuthNServiceClient
twins TwinRepository
states StateRepository
idp IdentityProvider
}
var _ Service = (*twinsService)(nil)
// New instantiates the twins service implementation.
func New(nc *nats.Conn, mc mqtt.Mqtt, auth mainflux.AuthNServiceClient, twins TwinRepository, sr StateRepository, idp IdentityProvider) Service {
return &twinsService{
natsClient: nc,
mqttClient: mc,
auth: auth,
twins: twins,
states: sr,
idp: idp,
}
}
func (ts *twinsService) AddTwin(ctx context.Context, token string, twin Twin, def Definition) (tw Twin, err error) {
var id string
var b []byte
defer ts.mqttClient.Publish(&id, &err, crudOp["createSucc"], crudOp["createFail"], &b)
res, err := ts.auth.Identify(ctx, &mainflux.Token{Value: token})
if err != nil {
return Twin{}, ErrUnauthorizedAccess
}
twin.ID, err = ts.idp.ID()
if err != nil {
return Twin{}, err
}
twin.Owner = res.GetValue()
twin.Created = time.Now()
twin.Updated = time.Now()
if len(def.Attributes) == 0 {
def = Definition{}
def.Attributes = make(map[string]Attribute)
}
def.Created = time.Now()
def.ID = 0
twin.Definitions = append(twin.Definitions, def)
twin.Revision = 0
if _, err = ts.twins.Save(ctx, twin); err != nil {
return Twin{}, err
}
id = twin.ID
b, err = json.Marshal(twin)
return twin, nil
}
func (ts *twinsService) UpdateTwin(ctx context.Context, token string, twin Twin, def Definition) (err error) {
var b []byte
var id string
defer ts.mqttClient.Publish(&id, &err, crudOp["updateSucc"], crudOp["updateFail"], &b)
_, err = ts.auth.Identify(ctx, &mainflux.Token{Value: token})
if err != nil {
return ErrUnauthorizedAccess
}
tw, err := ts.twins.RetrieveByID(ctx, twin.ID)
if err != nil {
return err
}
tw.Updated = time.Now()
tw.Revision++
if twin.Name != "" {
tw.Name = twin.Name
}
if twin.ThingID != "" {
tw.ThingID = twin.ThingID
}
if len(def.Attributes) > 0 {
def.Created = time.Now()
def.ID = tw.Definitions[len(tw.Definitions)-1].ID + 1
tw.Definitions = append(tw.Definitions, def)
}
if len(twin.Metadata) == 0 {
tw.Metadata = twin.Metadata
}
if err := ts.twins.Update(ctx, tw); err != nil {
return err
}
id = twin.ID
b, err = json.Marshal(tw)
return nil
}
func (ts *twinsService) ViewTwin(ctx context.Context, token, id string) (tw Twin, err error) {
var b []byte
defer ts.mqttClient.Publish(&id, &err, crudOp["getSucc"], crudOp["getFail"], &b)
_, err = ts.auth.Identify(ctx, &mainflux.Token{Value: token})
if err != nil {
return Twin{}, ErrUnauthorizedAccess
}
twin, err := ts.twins.RetrieveByID(ctx, id)
if err != nil {
return Twin{}, err
}
b, err = json.Marshal(twin)
return twin, nil
}
func (ts *twinsService) ViewTwinByThing(ctx context.Context, token, thingid string) (Twin, error) {
_, err := ts.auth.Identify(ctx, &mainflux.Token{Value: token})
if err != nil {
return Twin{}, ErrUnauthorizedAccess
}
return ts.twins.RetrieveByThing(ctx, thingid)
}
func (ts *twinsService) RemoveTwin(ctx context.Context, token, id string) (err error) {
var b []byte
defer ts.mqttClient.Publish(&id, &err, crudOp["removeSucc"], crudOp["removeFail"], &b)
_, err = ts.auth.Identify(ctx, &mainflux.Token{Value: token})
if err != nil {
return ErrUnauthorizedAccess
}
if err := ts.twins.Remove(ctx, id); err != nil {
return err
}
return nil
}
func (ts *twinsService) ListTwins(ctx context.Context, token string, offset uint64, limit uint64, name string, metadata Metadata) (TwinsPage, error) {
res, err := ts.auth.Identify(ctx, &mainflux.Token{Value: token})
if err != nil {
return TwinsPage{}, ErrUnauthorizedAccess
}
return ts.twins.RetrieveAll(ctx, res.GetValue(), offset, limit, name, metadata)
}
func (ts *twinsService) SaveState(msg *mainflux.Message) error {
var b []byte
var id string
var err error
defer ts.mqttClient.Publish(&id, &err, crudOp["stateSucc"], crudOp["stateFail"], &b)
tw, err := ts.twins.RetrieveByThing(context.TODO(), msg.Publisher)
if err != nil {
return fmt.Errorf("Retrieving twin for %s failed: %s", msg.Publisher, err)
}
var recs []senml.Record
if err := json.Unmarshal(msg.Payload, &recs); err != nil {
return fmt.Errorf("Unmarshal payload for %s failed: %s", msg.Publisher, err)
}
st, err := ts.states.RetrieveLast(context.TODO(), tw.ID)
if err != nil {
return fmt.Errorf("Retrieve last state for %s failed: %s", msg.Publisher, err)
}
if save := prepareState(&st, &tw, recs, msg); !save {
return nil
}
if err := ts.states.Save(context.TODO(), st); err != nil {
return fmt.Errorf("Updating state for %s failed: %s", msg.Publisher, err)
}
id = msg.Publisher
b = msg.Payload
return nil
}
func (ts *twinsService) ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (StatesPage, error) {
_, err := ts.auth.Identify(ctx, &mainflux.Token{Value: token})
if err != nil {
return StatesPage{}, ErrUnauthorizedAccess
}
return ts.states.RetrieveAll(ctx, offset, limit, id)
}
func prepareState(st *State, tw *Twin, recs []senml.Record, msg *mainflux.Message) bool {
def := tw.Definitions[len(tw.Definitions)-1]
st.TwinID = tw.ID
st.ID++
st.Created = time.Now()
st.Definition = def.ID
if st.Payload == nil {
st.Payload = make(map[string]interface{})
}
save := false
for k, a := range def.Attributes {
if !a.PersistState {
continue
}
if a.Channel == msg.Channel && a.Subtopic == msg.Subtopic {
st.Payload[k] = recs[0].Value
save = true
break
}
}
return save
}

257
twins/service_test.go Normal file
View File

@ -0,0 +1,257 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package twins_test
import (
"context"
"fmt"
"testing"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/mainflux/mainflux/twins"
"github.com/mainflux/mainflux/twins/mocks"
nats "github.com/nats-io/go-nats"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
twmqtt "github.com/mainflux/mainflux/twins/mqtt"
)
const (
twinName = "name"
wrongID = ""
token = "token"
wrongToken = "wrong-token"
email = "user@example.com"
natsURL = "nats://localhost:4222"
mqttURL = "tcp://localhost:1883"
topic = "topic"
)
func newService(tokens map[string]string) twins.Service {
auth := mocks.NewAuthNServiceClient(tokens)
twinsRepo := mocks.NewTwinRepository()
statesRepo := mocks.NewStateRepository()
idp := mocks.NewIdentityProvider()
nc, _ := nats.Connect(natsURL)
opts := mqtt.NewClientOptions()
pc := mqtt.NewClient(opts)
mc := twmqtt.New(pc, topic)
return twins.New(nc, mc, auth, twinsRepo, statesRepo, idp)
}
func TestAddTwin(t *testing.T) {
svc := newService(map[string]string{token: email})
twin := twins.Twin{}
def := twins.Definition{}
cases := []struct {
desc string
twin twins.Twin
token string
err error
}{
{
desc: "add new twin",
twin: twin,
token: token,
err: nil,
},
{
desc: "add twin with wrong credentials",
twin: twin,
token: wrongToken,
err: twins.ErrUnauthorizedAccess,
},
}
for _, tc := range cases {
_, err := svc.AddTwin(context.Background(), tc.token, tc.twin, def)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
}
}
func TestUpdateTwin(t *testing.T) {
svc := newService(map[string]string{token: email})
twin := twins.Twin{}
other := twins.Twin{}
def := twins.Definition{}
other.ID = wrongID
saved, err := svc.AddTwin(context.Background(), token, twin, def)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s\n", err))
cases := []struct {
desc string
twin twins.Twin
token string
err error
}{
{
desc: "update existing twin",
twin: saved,
token: token,
err: nil,
},
{
desc: "update twin with wrong credentials",
twin: saved,
token: wrongToken,
err: twins.ErrUnauthorizedAccess,
},
{
desc: "update non-existing twin",
twin: other,
token: token,
err: twins.ErrNotFound,
},
}
for _, tc := range cases {
err := svc.UpdateTwin(context.Background(), tc.token, tc.twin, def)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
}
}
func TestViewTwin(t *testing.T) {
svc := newService(map[string]string{token: email})
twin := twins.Twin{}
def := twins.Definition{}
saved, err := svc.AddTwin(context.Background(), token, twin, def)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s\n", err))
cases := map[string]struct {
id string
token string
err error
}{
"view existing twin": {
id: saved.ID,
token: token,
err: nil,
},
"view twin with wrong credentials": {
id: saved.ID,
token: wrongToken,
err: twins.ErrUnauthorizedAccess,
},
"view non-existing twin": {
id: wrongID,
token: token,
err: twins.ErrNotFound,
},
}
for desc, tc := range cases {
_, err := svc.ViewTwin(context.Background(), tc.token, tc.id)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", desc, tc.err, err))
}
}
func TestListTwins(t *testing.T) {
svc := newService(map[string]string{token: email})
twin := twins.Twin{Name: twinName, Owner: email}
def := twins.Definition{}
m := make(map[string]interface{})
m["serial"] = "123456"
twin.Metadata = m
n := uint64(10)
for i := uint64(0); i < n; i++ {
svc.AddTwin(context.Background(), token, twin, def)
}
cases := map[string]struct {
token string
offset uint64
limit uint64
size uint64
metadata map[string]interface{}
err error
}{
"list all twins": {
token: token,
offset: 0,
limit: n,
size: n,
err: nil,
},
"list with zero limit": {
token: token,
limit: 0,
offset: 0,
size: 0,
err: nil,
},
"list with offset and limit": {
token: token,
offset: 8,
limit: 5,
size: 2,
err: nil,
},
"list with wrong credentials": {
token: wrongToken,
limit: 0,
offset: n,
err: twins.ErrUnauthorizedAccess,
},
}
for desc, tc := range cases {
page, err := svc.ListTwins(context.Background(), tc.token, tc.offset, tc.limit, twinName, tc.metadata)
size := uint64(len(page.Twins))
assert.Equal(t, tc.size, size, fmt.Sprintf("%s: expected %d got %d\n", desc, tc.size, size))
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", desc, tc.err, err))
}
}
func TestRemoveTwin(t *testing.T) {
svc := newService(map[string]string{token: email})
twin := twins.Twin{}
def := twins.Definition{}
saved, err := svc.AddTwin(context.Background(), token, twin, def)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s\n", err))
cases := []struct {
desc string
id string
token string
err error
}{
{
desc: "remove twin with wrong credentials",
id: saved.ID,
token: wrongToken,
err: twins.ErrUnauthorizedAccess,
},
{
desc: "remove existing twin",
id: saved.ID,
token: token,
err: nil,
},
{
desc: "remove removed twin",
id: saved.ID,
token: token,
err: nil,
},
{
desc: "remove non-existing twin",
id: wrongID,
token: token,
err: nil,
},
}
for _, tc := range cases {
err := svc.RemoveTwin(context.Background(), tc.token, tc.id)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
}
}

40
twins/states.go Normal file
View File

@ -0,0 +1,40 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package twins
import (
"context"
"time"
)
// State stores actual snapshot of entity's values
type State struct {
TwinID string
ID int64
Definition int
Created time.Time
Payload map[string]interface{}
}
// StatesPage contains page related metadata as well as a list of twins that
// belong to this page.
type StatesPage struct {
PageMetadata
States []State
}
// StateRepository specifies a state persistence API.
type StateRepository interface {
// Save persists the state
Save(context.Context, State) error
// Count returns the number of states related to state
Count(context.Context, Twin) (int64, error)
// RetrieveAll retrieves the subset of states related to twin specified by id
RetrieveAll(ctx context.Context, offset uint64, limit uint64, id string) (StatesPage, error)
// RetrieveLast retrieves the last saved state
RetrieveLast(ctx context.Context, id string) (State, error)
}

77
twins/twins.go Normal file
View File

@ -0,0 +1,77 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package twins
import (
"context"
"time"
)
// Metadata stores arbitrary twin data
type Metadata map[string]interface{}
// Attribute stores individual attribute data
type Attribute struct {
Channel string
Subtopic string
PersistState bool
}
// Definition stores entity's attributes
type Definition struct {
ID int
Created time.Time
Attributes map[string]Attribute
}
// Twin represents a Mainflux thing digital twin. Each twin is owned by one thing, and
// is assigned with the unique identifier.
type Twin struct {
Owner string
ID string
Name string
ThingID string
Created time.Time
Updated time.Time
Revision int
Definitions []Definition
Metadata Metadata
}
// PageMetadata contains page metadata that helps navigation.
type PageMetadata struct {
Total uint64
Offset uint64
Limit uint64
Name string
}
// TwinsPage contains page related metadata as well as a list of twins that
// belong to this page.
type TwinsPage struct {
PageMetadata
Twins []Twin
}
// TwinRepository specifies a twin persistence API.
type TwinRepository interface {
// Save persists the twin
Save(context.Context, Twin) (string, error)
// Update performs an update to the existing twin. A non-nil error is
// returned to indicate operation failure.
Update(context.Context, Twin) error
// RetrieveByID retrieves the twin having the provided identifier.
RetrieveByID(ctx context.Context, id string) (Twin, error)
// RetrieveAll retrieves the subset of things owned by the specified user.
RetrieveAll(context.Context, string, uint64, uint64, string, Metadata) (TwinsPage, error)
// RetrieveByThing retrieves twin that represents specified thing
RetrieveByThing(context.Context, string) (Twin, error)
// Remove removes the twin having the provided identifier.
Remove(ctx context.Context, id string) error
}

36
twins/uuid/idp.go Normal file
View File

@ -0,0 +1,36 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
// Package uuid provides a UUID identity provider.
package uuid
import (
"github.com/gofrs/uuid"
"github.com/mainflux/mainflux/twins"
)
var _ twins.IdentityProvider = (*uuidIdentityProvider)(nil)
type uuidIdentityProvider struct{}
// New instantiates a UUID identity provider.
func New() twins.IdentityProvider {
return &uuidIdentityProvider{}
}
func (idp *uuidIdentityProvider) ID() (string, error) {
id, err := uuid.NewV4()
if err != nil {
return "", err
}
return id.String(), nil
}
func (idp *uuidIdentityProvider) IsValid(u4 string) error {
if _, err := uuid.FromString(u4); err != nil {
return twins.ErrMalformedEntity
}
return nil
}