diff --git a/cmd/twins/main.go b/cmd/twins/main.go new file mode 100644 index 00000000..d30487a1 --- /dev/null +++ b/cmd/twins/main.go @@ -0,0 +1,296 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package main + +import ( + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "os" + "os/signal" + "strconv" + "syscall" + "time" + + "github.com/mainflux/mainflux/twins/mqtt" + + kitprometheus "github.com/go-kit/kit/metrics/prometheus" + "github.com/mainflux/mainflux" + authapi "github.com/mainflux/mainflux/authn/api/grpc" + "github.com/mainflux/mainflux/logger" + localusers "github.com/mainflux/mainflux/things/users" + "github.com/mainflux/mainflux/twins" + "github.com/mainflux/mainflux/twins/api" + twapi "github.com/mainflux/mainflux/twins/api/http" + twmongodb "github.com/mainflux/mainflux/twins/mongodb" + twnats "github.com/mainflux/mainflux/twins/nats" + "github.com/mainflux/mainflux/twins/uuid" + nats "github.com/nats-io/go-nats" + opentracing "github.com/opentracing/opentracing-go" + stdprometheus "github.com/prometheus/client_golang/prometheus" + jconfig "github.com/uber/jaeger-client-go/config" + "go.mongodb.org/mongo-driver/mongo" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +const ( + defLogLevel = "info" + defHTTPPort = "9021" + defJaegerURL = "" + defServerCert = "" + defServerKey = "" + defDBName = "mainflux" + defDBHost = "localhost" + defDBPort = "27017" + defSingleUserEmail = "" + defSingleUserToken = "" + defClientTLS = "false" + defCACerts = "" + defMqttURL = "tcp://localhost:1883" + defThingID = "" + defThingKey = "" + defChannelID = "" + defNatsURL = nats.DefaultURL + + defAuthnTimeout = "1" // in seconds + defAuthnURL = "localhost:8181" + + envLogLevel = "MF_TWINS_LOG_LEVEL" + envHTTPPort = "MF_TWINS_HTTP_PORT" + envJaegerURL = "MF_JAEGER_URL" + envServerCert = "MF_TWINS_SERVER_CERT" + envServerKey = "MF_TWINS_SERVER_KEY" + envDBName = "MF_TWINS_DB_NAME" + envDBHost = "MF_TWINS_DB_HOST" + envDBPort = "MF_TWINS_DB_PORT" + envSingleUserEmail = "MF_TWINS_SINGLE_USER_EMAIL" + envSingleUserToken = "MF_TWINS_SINGLE_USER_TOKEN" + envClientTLS = "MF_TWINS_CLIENT_TLS" + envCACerts = "MF_TWINS_CA_CERTS" + envMqttURL = "MF_TWINS_MQTT_URL" + envThingID = "MF_TWINS_THING_ID" + envThingKey = "MF_TWINS_THING_KEY" + envChannelID = "MF_TWINS_CHANNEL_ID" + envNatsURL = "MF_NATS_URL" + + envAuthnTimeout = "MF_AUTHN_TIMEOUT" + envAuthnURL = "MF_AUTHN_URL" +) + +type config struct { + logLevel string + httpPort string + jaegerURL string + serverCert string + serverKey string + dbCfg twmongodb.Config + singleUserEmail string + singleUserToken string + clientTLS bool + caCerts string + mqttURL string + thingID string + thingKey string + channelID string + NatsURL string + + authnTimeout time.Duration + authnURL string +} + +func main() { + cfg := loadConfig() + + logger, err := logger.New(os.Stdout, cfg.logLevel) + if err != nil { + log.Fatalf(err.Error()) + } + + db, err := twmongodb.Connect(cfg.dbCfg, logger) + if err != nil { + logger.Error(err.Error()) + os.Exit(1) + } + + authTracer, authCloser := initJaeger("auth", cfg.jaegerURL, logger) + defer authCloser.Close() + + auth, _ := createAuthClient(cfg, authTracer, logger) + + dbTracer, dbCloser := initJaeger("twins_db", cfg.jaegerURL, logger) + defer dbCloser.Close() + + pc := mqtt.Connect(cfg.mqttURL, cfg.thingID, cfg.thingKey, logger) + mc := mqtt.New(pc, cfg.channelID) + + mcTracer, mcCloser := initJaeger("twins_mqtt", cfg.jaegerURL, logger) + defer mcCloser.Close() + + nc, err := nats.Connect(cfg.NatsURL) + if err != nil { + logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err)) + os.Exit(1) + } + defer nc.Close() + + ncTracer, ncCloser := initJaeger("twins_nats", cfg.jaegerURL, logger) + defer ncCloser.Close() + + tracer, closer := initJaeger("twins", cfg.jaegerURL, logger) + defer closer.Close() + + svc := newService(nc, ncTracer, mc, mcTracer, auth, dbTracer, db, logger) + errs := make(chan error, 2) + + go startHTTPServer(twapi.MakeHandler(tracer, svc), cfg.httpPort, cfg, logger, errs) + + go func() { + c := make(chan os.Signal) + signal.Notify(c, syscall.SIGINT) + errs <- fmt.Errorf("%s", <-c) + }() + + err = <-errs + logger.Error(fmt.Sprintf("Twins service terminated: %s", err)) +} + +func loadConfig() config { + tls, err := strconv.ParseBool(mainflux.Env(envClientTLS, defClientTLS)) + if err != nil { + log.Fatalf("Invalid value passed for %s\n", envClientTLS) + } + + timeout, err := strconv.ParseInt(mainflux.Env(envAuthnTimeout, defAuthnTimeout), 10, 64) + if err != nil { + log.Fatalf("Invalid %s value: %s", envAuthnTimeout, err.Error()) + } + + dbCfg := twmongodb.Config{ + Name: mainflux.Env(envDBName, defDBName), + Host: mainflux.Env(envDBHost, defDBHost), + Port: mainflux.Env(envDBPort, defDBPort), + } + + return config{ + logLevel: mainflux.Env(envLogLevel, defLogLevel), + httpPort: mainflux.Env(envHTTPPort, defHTTPPort), + serverCert: mainflux.Env(envServerCert, defServerCert), + serverKey: mainflux.Env(envServerKey, defServerKey), + jaegerURL: mainflux.Env(envJaegerURL, defJaegerURL), + dbCfg: dbCfg, + singleUserEmail: mainflux.Env(envSingleUserEmail, defSingleUserEmail), + singleUserToken: mainflux.Env(envSingleUserToken, defSingleUserToken), + clientTLS: tls, + caCerts: mainflux.Env(envCACerts, defCACerts), + mqttURL: mainflux.Env(envMqttURL, defMqttURL), + thingID: mainflux.Env(envThingID, defThingID), + channelID: mainflux.Env(envChannelID, defChannelID), + thingKey: mainflux.Env(envThingKey, defThingKey), + NatsURL: mainflux.Env(envNatsURL, defNatsURL), + authnURL: mainflux.Env(envAuthnURL, defAuthnURL), + authnTimeout: time.Duration(timeout) * time.Second, + } +} + +func initJaeger(svcName, url string, logger logger.Logger) (opentracing.Tracer, io.Closer) { + if url == "" { + return opentracing.NoopTracer{}, ioutil.NopCloser(nil) + } + + tracer, closer, err := jconfig.Configuration{ + ServiceName: svcName, + Sampler: &jconfig.SamplerConfig{ + Type: "const", + Param: 1, + }, + Reporter: &jconfig.ReporterConfig{ + LocalAgentHostPort: url, + LogSpans: true, + }, + }.NewTracer() + if err != nil { + logger.Error(fmt.Sprintf("Failed to init Jaeger client: %s", err)) + os.Exit(1) + } + + return tracer, closer +} + +func createAuthClient(cfg config, tracer opentracing.Tracer, logger logger.Logger) (mainflux.AuthNServiceClient, func() error) { + if cfg.singleUserEmail != "" && cfg.singleUserToken != "" { + return localusers.NewSingleUserService(cfg.singleUserEmail, cfg.singleUserToken), nil + } + + conn := connectToAuth(cfg, logger) + return authapi.NewClient(tracer, conn, cfg.authnTimeout), conn.Close +} + +func connectToAuth(cfg config, logger logger.Logger) *grpc.ClientConn { + var opts []grpc.DialOption + if cfg.clientTLS { + if cfg.caCerts != "" { + tpc, err := credentials.NewClientTLSFromFile(cfg.caCerts, "") + if err != nil { + logger.Error(fmt.Sprintf("Failed to create tls credentials: %s", err)) + os.Exit(1) + } + opts = append(opts, grpc.WithTransportCredentials(tpc)) + } + } else { + opts = append(opts, grpc.WithInsecure()) + logger.Info("gRPC communication is not encrypted") + } + + conn, err := grpc.Dial(cfg.authnURL, opts...) + if err != nil { + logger.Error(fmt.Sprintf("Failed to connect to auth service: %s", err)) + os.Exit(1) + } + + return conn +} + +func newService(nc *nats.Conn, ncTracer opentracing.Tracer, mc mqtt.Mqtt, mcTracer opentracing.Tracer, users mainflux.AuthNServiceClient, dbTracer opentracing.Tracer, db *mongo.Database, logger logger.Logger) twins.Service { + twinRepo := twmongodb.NewTwinRepository(db) + stateRepo := twmongodb.NewStateRepository(db) + idp := uuid.New() + + svc := twins.New(nc, mc, users, twinRepo, stateRepo, idp) + svc = api.LoggingMiddleware(svc, logger) + svc = api.MetricsMiddleware( + svc, + kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: "twins", + Subsystem: "api", + Name: "request_count", + Help: "Number of requests received.", + }, []string{"method"}), + kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ + Namespace: "twins", + Subsystem: "api", + Name: "request_latency_microseconds", + Help: "Total duration of requests in microseconds.", + }, []string{"method"}), + ) + + twnats.Subscribe(nc, mc, svc, logger) + + return svc +} + +func startHTTPServer(handler http.Handler, port string, cfg config, logger logger.Logger, errs chan error) { + p := fmt.Sprintf(":%s", port) + if cfg.serverCert != "" || cfg.serverKey != "" { + logger.Info(fmt.Sprintf("Twins service started using https on port %s with cert %s key %s", + port, cfg.serverCert, cfg.serverKey)) + errs <- http.ListenAndServeTLS(p, cfg.serverCert, cfg.serverKey, handler) + return + } + logger.Info(fmt.Sprintf("Twins service started using http on port %s", cfg.httpPort)) + errs <- http.ListenAndServe(p, handler) +} diff --git a/go.mod b/go.mod index e85a8ff4..de6275ce 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/mainflux/senml v1.0.0 github.com/mattn/go-colorable v0.0.9 // indirect github.com/mattn/go-isatty v0.0.3 // indirect + github.com/nats-io/gnatsd v1.4.1 // indirect github.com/nats-io/go-nats v1.6.0 github.com/nats-io/nuid v1.0.0 // indirect github.com/onsi/ginkgo v1.10.3 // indirect @@ -54,6 +55,7 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/viper v1.5.0 github.com/stretchr/testify v1.4.0 + github.com/tidwall/pretty v1.0.0 // indirect github.com/uber/jaeger-client-go v2.16.0+incompatible github.com/uber/jaeger-lib v2.0.0+incompatible // indirect github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect diff --git a/go.sum b/go.sum index 678217c7..55aaae75 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,7 @@ github.com/Microsoft/go-winio v0.4.7/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyv github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -22,6 +23,7 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf github.com/cisco/senml v0.0.0-20181031221301-910a55054e16 h1:gVMpF6unIWOG8JgCt3XhlYdT3lRFDcMMVJdMesU+TQY= github.com/cisco/senml v0.0.0-20181031221301-910a55054e16/go.mod h1:pLFASTQEf1nGfvoMwxmOjLeImwMKQMx18w38UcI9ZfI= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/containerd/continuity v0.0.0-20180416230128-c6cef3483023 h1:ydDbSX89iFHufaVN8xlS22aWpajSFfmXL+fQNWnhrIg= github.com/containerd/continuity v0.0.0-20180416230128-c6cef3483023/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= @@ -64,6 +66,7 @@ github.com/go-logfmt/logfmt v0.4.0 h1:MP4Eh7ZCb31lleYCFuwm0oe4/YGak+5l1vA2NOE80n github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-redis/redis v6.15.0+incompatible h1:/Wib9cA7CF3SQxBZRMHyQvqzlwzc8PJGDMkRfqQebSE= github.com/go-redis/redis v6.15.0+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-sql-driver/mysql v1.4.0 h1:7LxgVwFb2hIQtMm87NdgAVfXjnt4OePseqT1tKx+opk= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.7.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= @@ -81,6 +84,7 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -92,6 +96,7 @@ github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8l github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/gopcua/opcua v0.1.6 h1:B9SVRKQGzcWcwP2QPYN93Uku32+3wL+v5cgzBxE6V5I= github.com/gopcua/opcua v0.1.6/go.mod h1:INwnDoRxmNWAt7+tzqxuGqQkSF2c1C69VAL0c2q6AcY= @@ -107,6 +112,7 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hokaccha/go-prettyjson v0.0.0-20180920040306-f579f869bbfe h1:MCgzztuoH5LZNr9AkIaicIDvCfACu11KUCCZQnRHDC0= github.com/hokaccha/go-prettyjson v0.0.0-20180920040306-f579f869bbfe/go.mod h1:pFlLw2CfqZiIBOx6BuCeRLCrfxBJipTY0nIOF/VbGcI= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= @@ -143,6 +149,7 @@ github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRU github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= +github.com/mattn/go-sqlite3 v1.9.0 h1:pDRiWfl+++eC2FEFRy6jXmQlvp4Yh3z1MJKg4UeYM/4= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.0/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= @@ -150,13 +157,17 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5 github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nats-io/gnatsd v1.4.1 h1:RconcfDeWpKCD6QIIwiVFcvForlXpWeJP7i5/lDLy44= +github.com/nats-io/gnatsd v1.4.1/go.mod h1:nqco77VO78hLCJpIcVfygDP2rPGfsEHkGTUk94uh5DQ= github.com/nats-io/go-nats v1.6.0 h1:FznPwMfrVwGnSCh7JTXyJDRW0TIkD4Tr+M1LPJt9T70= github.com/nats-io/go-nats v1.6.0/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0= github.com/nats-io/nuid v1.0.0 h1:44QGdhbiANq8ZCbUkdn6W5bqtg+mHuDE4wOUuxxndFs= github.com/nats-io/nuid v1.0.0/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.10.3 h1:OoxbjfXVZyod1fmWYhI7SEyaD8B00ynP3T+D5GiyHOY= github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.7.1 h1:K0jcRCwNQM3vFGh1ppMtDh/+7ApJrjldlX8fA0jDTLQ= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/opencontainers/go-digest v1.0.0-rc1 h1:WzifXhOVOEOuFYOJAW6aQqW0TooG2iki3E3Ii+WN7gQ= github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= @@ -228,6 +239,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= +github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/uber/jaeger-client-go v2.16.0+incompatible h1:Q2Pp6v3QYiocMxomCaJuwQGFt7E53bPYqEgug/AoBtY= github.com/uber/jaeger-client-go v2.16.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= @@ -314,6 +327,7 @@ google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRn gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gorp.v1 v1.7.1 h1:GBB9KrWRATQZh95HJyVGUZrWwOPswitEYEyqlK8JbAA= gopkg.in/gorp.v1 v1.7.1/go.mod h1:Wo3h+DBQZIxATwftsglhdD/62zRFPhGhTiu5jUJmCaw= @@ -322,6 +336,7 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/ory-am/dockertest.v3 v3.3.2 h1:NgIHJacfXajJResc7luKYPF/F2kul6MXqbleEjv4PAY= gopkg.in/ory-am/dockertest.v3 v3.3.2/go.mod h1:s9mmoLkaGeAh97qygnNj4xWkiN7e1SKekYC6CovU+ek= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/twins/README.md b/twins/README.md new file mode 100644 index 00000000..b4f8c232 --- /dev/null +++ b/twins/README.md @@ -0,0 +1,98 @@ +# Twins + +Service twins is used for CRUD and update of digital twins. Twin is a semantic +representation of a real world entity, be it device, application or something +else. It holds the sequence of attribute based definitions of a real world thing +and refers to the time series of definition based states that hold the +historical data about the represented real world thing. + +## Configuration + +The service is configured using the environment variables presented in the +following table. Note that any unset variables will be replaced with their +default values. + +| Variable | Description | Default | +|----------------------------|----------------------------------------------------------------------|-----------------------| +| MF_TWINS_LOG_LEVEL | Log level for twin service (debug, info, warn, error) | error | +| MF_TWINS_HTTP_PORT | Twins service HTTP port | 9021 | +| MF_TWINS_SERVER_CERT | Path to server certificate in PEM format | | +| MF_TWINS_SERVER_KEY | Path to server key in PEM format | | +| MF_JAEGER_URL | Jaeger server URL | | +| MF_TWINS_DB_NAME | Database name | mainflux | +| MF_TWINS_DB_HOST | Database host address | localhost | +| MF_TWINS_DB_PORT | Database host port | 27017 | +| MF_TWINS_SINGLE_USER_EMAIL | User email for single user mode (no gRPC communication with users) | | +| MF_TWINS_SINGLE_USER_TOKEN | User token for single user mode that should be passed in auth header | | +| MF_TWINS_CLIENT_TLS | Flag that indicates if TLS should be turned on | false | +| MF_TWINS_CA_CERTS | Path to trusted CAs in PEM format | | +| MF_TWINS_MQTT_URL | Mqtt broker URL for twin CRUD and states update notifications | tcp://localhost:1883 | +| MF_TWINS_THING_ID | ID of thing representing twins service & mqtt user | | +| MF_TWINS_THING_KEY | Key of thing representing twins service & mqtt pass | | +| MF_TWINS_CHANNEL_ID | Mqtt notifications topic | | +| MF_NATS_URL | Mainflux NATS broker URL | nats://127.0.0.1:4222 | +| MF_AUTHN_GRPC_PORT | Authn service gRPC port | 8181 | +| MF_AUTHN_TIMEOUT | Authn gRPC request timeout in seconds | 1 | +| MF_AUTHN_URL | Authn service URL | localhost:8181 | + +## Deployment + +The service itself is distributed as Docker container. The following snippet +provides a compose file template that can be used to deploy the service container +locally: + +```yaml +version: "3" +services: + twins: + image: mainflux/twins:[version] + container_name: [instance name] + ports: + - [host machine port]:[configured HTTP port] + environment: + MF_TWINS_LOG_LEVEL: [Twins log level] + MF_TWINS_HTTP_PORT: [Service HTTP port] + MF_TWINS_SERVER_CERT: [String path to server cert in pem format] + MF_TWINS_SERVER_KEY: [String path to server key in pem format] + MF_JAEGER_URL: [Jaeger server URL] + MF_TWINS_DB_NAME: [Database name] + MF_TWINS_DB_HOST: [Database host address] + MF_TWINS_DB_PORT: [Database host port] + MF_TWINS_SINGLE_USER_EMAIL: [User email for single user mode] + MF_TWINS_SINGLE_USER_TOKEN: [User token for single user mode] + MF_TWINS_CLIENT_TLS: [Flag that indicates if TLS should be turned on] + MF_TWINS_CA_CERTS: [Path to trusted CAs in PEM format] + MF_TWINS_MQTT_URL: [Mqtt broker URL for twin CRUD and states] + MF_TWINS_THING_ID: [ID of thing representing twins service] + MF_TWINS_THING_KEY: [Key of thing representing twins service] + MF_TWINS_CHANNEL_ID: [Mqtt notifications topic] + MF_NATS_URL: [Mainflux NATS broker URL] + MF_AUTHN_GRPC_PORT: [Authn service gRPC port] + MF_AUTHN_TIMEOUT: [Authn gRPC request timeout in seconds] + MF_AUTHN_URL: [Authn service URL] +``` + +To start the service outside of the container, execute the following shell script: + +```bash +# download the latest version of the service +go get github.com/mainflux/mainflux + +cd $GOPATH/src/github.com/mainflux/mainflux + +# compile the twins +make twins + +# copy binary to bin +make install + +# set the environment variables and run the service +MF_TWINS_LOG_LEVEL: [Twins log level] MF_TWINS_HTTP_PORT: [Service HTTP port] MF_TWINS_SERVER_CERT: [String path to server cert in pem format] MF_TWINS_SERVER_KEY: [String path to server key in pem format] MF_JAEGER_URL: [Jaeger server URL] MF_TWINS_DB_NAME: [Database name] MF_TWINS_DB_HOST: [Database host address] MF_TWINS_DB_PORT: [Database host port] MF_TWINS_SINGLE_USER_EMAIL: [User email for single user mode] MF_TWINS_SINGLE_USER_TOKEN: [User token for single user mode] MF_TWINS_CLIENT_TLS: [Flag that indicates if TLS should be turned on] MF_TWINS_CA_CERTS: [Path to trusted CAs in PEM format] MF_TWINS_MQTT_URL: [Mqtt broker URL for twin CRUD and states] MF_TWINS_THING_ID: [ID of thing representing twins service] MF_TWINS_THING_KEY: [Key of thing representing twins service] MF_TWINS_CHANNEL_ID: [Mqtt notifications topic] MF_NATS_URL: [Mainflux NATS broker URL] MF_AUTHN_GRPC_PORT: [Authn service gRPC port] MF_AUTHN_TIMEOUT: [Authn gRPC request timeout in seconds] MF_AUTHN_URL: [Authn service URL] $GOBIN/mainflux-twins +``` + +## Usage + +For more information about service capabilities and its usage, please check out +the [API documentation](swagger.yaml). + +[doc]: http://mainflux.readthedocs.io diff --git a/twins/api/doc.go b/twins/api/doc.go new file mode 100644 index 00000000..fb3127e4 --- /dev/null +++ b/twins/api/doc.go @@ -0,0 +1,6 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// Package api contains API-related concerns: endpoint definitions, middlewares +// and all resource representations. +package api diff --git a/twins/api/http/doc.go b/twins/api/http/doc.go new file mode 100644 index 00000000..b198bb55 --- /dev/null +++ b/twins/api/http/doc.go @@ -0,0 +1,5 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// Package http contains implementation of kit service HTTP API. +package http diff --git a/twins/api/http/endpoint.go b/twins/api/http/endpoint.go new file mode 100644 index 00000000..43017892 --- /dev/null +++ b/twins/api/http/endpoint.go @@ -0,0 +1,211 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package http + +import ( + "context" + + "github.com/go-kit/kit/endpoint" + "github.com/mainflux/mainflux/twins" +) + +func addTwinEndpoint(svc twins.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(addTwinReq) + + if err := req.validate(); err != nil { + return nil, err + } + + twin := twins.Twin{ + Name: req.Name, + ThingID: req.ThingID, + Metadata: req.Metadata, + } + saved, err := svc.AddTwin(ctx, req.token, twin, req.Definition) + if err != nil { + return nil, err + } + + res := twinRes{ + id: saved.ID, + created: true, + } + return res, nil + } +} + +func updateTwinEndpoint(svc twins.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(updateTwinReq) + + if err := req.validate(); err != nil { + return nil, err + } + + twin := twins.Twin{ + ID: req.id, + Name: req.Name, + ThingID: req.ThingID, + Metadata: req.Metadata, + } + + if err := svc.UpdateTwin(ctx, req.token, twin, req.Definition); err != nil { + return nil, err + } + + res := twinRes{id: req.id, created: false} + return res, nil + } +} + +func viewTwinEndpoint(svc twins.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(viewTwinReq) + + if err := req.validate(); err != nil { + return nil, err + } + + twin, err := svc.ViewTwin(ctx, req.token, req.id) + if err != nil { + return nil, err + } + + res := viewTwinRes{ + Owner: twin.Owner, + ID: twin.ID, + Name: twin.Name, + ThingID: twin.ThingID, + Created: twin.Created, + Updated: twin.Updated, + Revision: twin.Revision, + Definitions: twin.Definitions, + Metadata: twin.Metadata, + } + return res, nil + } +} + +func viewTwinByThingEndpoint(svc twins.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(viewTwinReq) + + if err := req.validate(); err != nil { + return nil, err + } + + twin, err := svc.ViewTwinByThing(ctx, req.token, req.id) + if err != nil { + return nil, err + } + + res := viewTwinRes{ + Owner: twin.Owner, + ID: twin.ID, + Name: twin.Name, + ThingID: twin.ThingID, + Created: twin.Created, + Updated: twin.Updated, + Revision: twin.Revision, + Definitions: twin.Definitions, + Metadata: twin.Metadata, + } + + return res, nil + } +} + +func listTwinsEndpoint(svc twins.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(listReq) + + if err := req.validate(); err != nil { + return nil, err + } + + page, err := svc.ListTwins(ctx, req.token, req.offset, req.limit, req.name, req.metadata) + if err != nil { + return nil, err + } + + res := twinsPageRes{ + pageRes: pageRes{ + Total: page.Total, + Offset: page.Offset, + Limit: page.Limit, + }, + Twins: []viewTwinRes{}, + } + for _, twin := range page.Twins { + view := viewTwinRes{ + Owner: twin.Owner, + ID: twin.ID, + Name: twin.Name, + ThingID: twin.ThingID, + Created: twin.Created, + Updated: twin.Updated, + Revision: twin.Revision, + Definitions: twin.Definitions, + Metadata: twin.Metadata, + } + res.Twins = append(res.Twins, view) + } + + return res, nil + } +} + +func removeTwinEndpoint(svc twins.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(viewTwinReq) + + err := req.validate() + if err != nil { + return nil, err + } + + if err := svc.RemoveTwin(ctx, req.token, req.id); err != nil { + return nil, err + } + + return removeRes{}, nil + } +} + +func listStatesEndpoint(svc twins.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(listStatesReq) + + if err := req.validate(); err != nil { + return nil, err + } + + page, err := svc.ListStates(ctx, req.token, req.offset, req.limit, req.id) + if err != nil { + return nil, err + } + + res := statesPageRes{ + pageRes: pageRes{ + Total: page.Total, + Offset: page.Offset, + Limit: page.Limit, + }, + States: []viewStateRes{}, + } + for _, state := range page.States { + view := viewStateRes{ + TwinID: state.TwinID, + ID: state.ID, + Definition: state.Definition, + Created: state.Created, + Payload: state.Payload, + } + res.States = append(res.States, view) + } + + return res, nil + } +} diff --git a/twins/api/http/endpoint_test.go b/twins/api/http/endpoint_test.go new file mode 100644 index 00000000..57bc9e40 --- /dev/null +++ b/twins/api/http/endpoint_test.go @@ -0,0 +1,468 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package http_test + +import ( + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "strconv" + "strings" + "testing" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/mainflux/mainflux/twins" + httpapi "github.com/mainflux/mainflux/twins/api/http" + "github.com/mainflux/mainflux/twins/mocks" + twmqtt "github.com/mainflux/mainflux/twins/mqtt" + nats "github.com/nats-io/go-nats" + "github.com/opentracing/opentracing-go/mocktracer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + contentType = "application/json" + email = "user@example.com" + token = "token" + wrongValue = "wrong_value" + thingID = "5b68df78-86f7-48a6-ac4f-bb24dd75c39e" + wrongID = 0 + maxNameSize = 1024 + natsURL = "nats://localhost:4222" + topic = "topic" +) + +var invalidName = strings.Repeat("m", maxNameSize+1) + +type testRequest struct { + client *http.Client + method string + url string + contentType string + token string + body io.Reader +} + +func (tr testRequest) make() (*http.Response, error) { + req, err := http.NewRequest(tr.method, tr.url, tr.body) + if err != nil { + return nil, err + } + if tr.token != "" { + req.Header.Set("Authorization", tr.token) + } + if tr.contentType != "" { + req.Header.Set("Content-Type", tr.contentType) + } + return tr.client.Do(req) +} + +func newService(tokens map[string]string) twins.Service { + auth := mocks.NewAuthNServiceClient(tokens) + twinsRepo := mocks.NewTwinRepository() + statesRepo := mocks.NewStateRepository() + idp := mocks.NewIdentityProvider() + + nc, _ := nats.Connect(natsURL) + + opts := mqtt.NewClientOptions() + pc := mqtt.NewClient(opts) + + mc := twmqtt.New(pc, topic) + + return twins.New(nc, mc, auth, twinsRepo, statesRepo, idp) +} + +func newServer(svc twins.Service) *httptest.Server { + mux := httpapi.MakeHandler(mocktracer.New(), svc) + return httptest.NewServer(mux) +} + +func toJSON(data interface{}) string { + jsonData, _ := json.Marshal(data) + return string(jsonData) +} + +func TestAddTwin(t *testing.T) { + svc := newService(map[string]string{token: email}) + ts := newServer(svc) + defer ts.Close() + + // tw := twins.Twin{ThingID: thingID} + tw := twinReq{ThingID: thingID} + data := toJSON(tw) + + tw.Name = invalidName + invalidData := toJSON(tw) + + cases := []struct { + desc string + req string + contentType string + auth string + status int + location string + }{ + { + desc: "add valid twin", + req: data, + contentType: contentType, + auth: token, + status: http.StatusCreated, + location: "/twins/123e4567-e89b-12d3-a456-000000000001", + }, + { + desc: "add twin with empty JSON request", + req: "{}", + contentType: contentType, + auth: token, + status: http.StatusBadRequest, + location: "", + }, + { + desc: "add twin with invalid auth token", + req: data, + contentType: contentType, + auth: wrongValue, + status: http.StatusForbidden, + location: "", + }, + { + desc: "add twin with empty auth token", + req: data, + contentType: contentType, + auth: "", + status: http.StatusForbidden, + location: "", + }, + { + desc: "add twin with invalid request format", + req: "}", + contentType: contentType, + auth: token, + status: http.StatusBadRequest, + location: "", + }, + { + desc: "add twin with empty request", + req: "", + contentType: contentType, + auth: token, + status: http.StatusBadRequest, + location: "", + }, + { + desc: "add twin without content type", + req: data, + contentType: "", + auth: token, + status: http.StatusUnsupportedMediaType, + location: "", + }, + { + desc: "add twin with invalid name", + req: invalidData, + contentType: contentType, + auth: token, + status: http.StatusBadRequest, + location: "", + }, + } + + for _, tc := range cases { + req := testRequest{ + client: ts.Client(), + method: http.MethodPost, + url: fmt.Sprintf("%s/twins", ts.URL), + contentType: tc.contentType, + token: tc.auth, + body: strings.NewReader(tc.req), + } + res, err := req.make() + assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err)) + + location := res.Header.Get("Location") + assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code %d got %d", tc.desc, tc.status, res.StatusCode)) + assert.Equal(t, tc.location, location, fmt.Sprintf("%s: expected location %s got %s", tc.desc, tc.location, location)) + } +} + +func TestUpdateTwin(t *testing.T) { + svc := newService(map[string]string{token: email}) + ts := newServer(svc) + defer ts.Close() + + twin := twins.Twin{ThingID: thingID} + def := twins.Definition{} + data := toJSON(twin) + stw, _ := svc.AddTwin(context.Background(), token, twin, def) + + tw := twin + tw.Name = invalidName + invalidData := toJSON(tw) + + cases := []struct { + desc string + req string + id string + contentType string + auth string + status int + }{ + { + desc: "update existing twin", + req: data, + id: stw.ID, + contentType: contentType, + auth: token, + status: http.StatusOK, + }, + { + desc: "update twin with empty JSON request", + req: "{}", + id: stw.ID, + contentType: contentType, + auth: token, + status: http.StatusOK, + }, + { + desc: "update non-existent twin", + req: data, + id: strconv.FormatUint(wrongID, 10), + contentType: contentType, + auth: token, + status: http.StatusNotFound, + }, + { + desc: "update twin with invalid user token", + req: data, + id: stw.ID, + contentType: contentType, + auth: wrongValue, + status: http.StatusForbidden, + }, + { + desc: "update twin with empty user token", + req: data, + id: stw.ID, + contentType: contentType, + auth: "", + status: http.StatusForbidden, + }, + { + desc: "update twin with invalid data format", + req: "{", + id: stw.ID, + contentType: contentType, + auth: token, + status: http.StatusBadRequest, + }, + { + desc: "update twin with empty request", + req: "", + id: stw.ID, + contentType: contentType, + auth: token, + status: http.StatusBadRequest, + }, + { + desc: "update twin without content type", + req: data, + id: stw.ID, + contentType: "", + auth: token, + status: http.StatusUnsupportedMediaType, + }, + { + desc: "update twin with invalid name", + req: invalidData, + contentType: contentType, + auth: token, + status: http.StatusBadRequest, + }, + } + + for _, tc := range cases { + req := testRequest{ + client: ts.Client(), + method: http.MethodPut, + url: fmt.Sprintf("%s/twins/%s", ts.URL, tc.id), + contentType: tc.contentType, + token: tc.auth, + body: strings.NewReader(tc.req), + } + res, err := req.make() + assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err)) + assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code %d got %d", tc.desc, tc.status, res.StatusCode)) + } +} + +func TestViewTwin(t *testing.T) { + svc := newService(map[string]string{token: email}) + ts := newServer(svc) + defer ts.Close() + + def := twins.Definition{} + twin := twins.Twin{ThingID: thingID} + stw, err := svc.AddTwin(context.Background(), token, twin, def) + require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) + + twres := twinRes{ + Owner: stw.Owner, + Name: stw.Name, + ID: stw.ID, + ThingID: stw.ThingID, + Revision: stw.Revision, + Created: stw.Created, + Updated: stw.Updated, + Definitions: stw.Definitions, + Metadata: stw.Metadata, + } + data := toJSON(twres) + + cases := []struct { + desc string + id string + auth string + status int + res string + }{ + { + desc: "view existing twin", + id: stw.ID, + auth: token, + status: http.StatusOK, + res: data, + }, + { + desc: "view non-existent twin", + id: strconv.FormatUint(wrongID, 10), + auth: token, + status: http.StatusNotFound, + res: "", + }, + { + desc: "view twin by passing invalid token", + id: stw.ID, + auth: wrongValue, + status: http.StatusForbidden, + res: "", + }, + { + desc: "view twin by passing empty id", + id: "", + auth: token, + status: http.StatusBadRequest, + res: "", + }, + { + desc: "view twin by passing empty token", + id: stw.ID, + auth: "", + status: http.StatusForbidden, + res: "", + }, + } + + for _, tc := range cases { + req := testRequest{ + client: ts.Client(), + method: http.MethodGet, + url: fmt.Sprintf("%s/twins/%s", ts.URL, tc.id), + token: tc.auth, + } + res, err := req.make() + assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err)) + assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code %d got %d", tc.desc, tc.status, res.StatusCode)) + body, err := ioutil.ReadAll(res.Body) + data := strings.Trim(string(body), "\n") + assert.Equal(t, tc.res, data, fmt.Sprintf("%s: expected body %s got %s", tc.desc, tc.res, data)) + } +} + +func TestRemoveTwin(t *testing.T) { + svc := newService(map[string]string{token: email}) + ts := newServer(svc) + defer ts.Close() + + def := twins.Definition{} + twin := twins.Twin{ThingID: thingID} + stw, _ := svc.AddTwin(context.Background(), token, twin, def) + + cases := []struct { + desc string + id string + auth string + status int + }{ + { + desc: "delete existing twin", + id: stw.ID, + auth: token, + status: http.StatusNoContent, + }, + { + desc: "delete non-existent twin", + id: strconv.FormatUint(wrongID, 10), + auth: token, + status: http.StatusNoContent, + }, + { + desc: "delete twin by passing empty id", + id: "", + auth: token, + status: http.StatusBadRequest, + }, + { + desc: "delete twin with invalid token", + id: stw.ID, + auth: wrongValue, + status: http.StatusForbidden, + }, + { + desc: "delete twin with empty token", + id: stw.ID, + auth: "", + status: http.StatusForbidden, + }, + } + + for _, tc := range cases { + req := testRequest{ + client: ts.Client(), + method: http.MethodDelete, + url: fmt.Sprintf("%s/twins/%s", ts.URL, tc.id), + token: tc.auth, + } + res, err := req.make() + assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err)) + assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code %d got %d", tc.desc, tc.status, res.StatusCode)) + } +} + +type twinReq struct { + token string + Name string `json:"name,omitempty"` + ThingID string `json:"thing_id"` + Definition twins.Definition `json:"definition,omitempty"` + Metadata map[string]interface{} `json:"metadata,omitempty"` +} + +type twinRes struct { + Owner string `json:"owner"` + Name string `json:"name,omitempty"` + ID string `json:"id"` + ThingID string `json:"thing_id"` + Revision int `json:"revision"` + Created time.Time `json:"created"` + Updated time.Time `json:"updated"` + Definitions []twins.Definition `json:"definitions"` + Metadata map[string]interface{} `json:"metadata,omitempty"` +} diff --git a/twins/api/http/requests.go b/twins/api/http/requests.go new file mode 100644 index 00000000..ccbd7bcc --- /dev/null +++ b/twins/api/http/requests.go @@ -0,0 +1,128 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package http + +import ( + "github.com/mainflux/mainflux/twins" +) + +const maxNameSize = 1024 +const maxLimitSize = 100 + +type apiReq interface { + validate() error +} + +type addTwinReq struct { + token string + Name string `json:"name,omitempty"` + ThingID string `json:"thing_id"` + Definition twins.Definition `json:"definition,omitempty"` + Metadata map[string]interface{} `json:"metadata,omitempty"` +} + +func (req addTwinReq) validate() error { + if req.token == "" { + return twins.ErrUnauthorizedAccess + } + + if req.ThingID == "" { + return twins.ErrMalformedEntity + } + + if len(req.Name) > maxNameSize { + return twins.ErrMalformedEntity + } + + return nil +} + +type updateTwinReq struct { + token string + id string + Name string `json:"name,omitempty"` + ThingID string `json:"thing_id,omitempty"` + Definition twins.Definition `json:"definition,omitempty"` + Metadata map[string]interface{} `json:"metadata,omitempty"` +} + +func (req updateTwinReq) validate() error { + if req.token == "" { + return twins.ErrUnauthorizedAccess + } + + if req.id == "" { + return twins.ErrMalformedEntity + } + + if len(req.Name) > maxNameSize { + return twins.ErrMalformedEntity + } + + return nil +} + +type viewTwinReq struct { + token string + id string +} + +func (req viewTwinReq) validate() error { + if req.token == "" { + return twins.ErrUnauthorizedAccess + } + + if req.id == "" { + return twins.ErrMalformedEntity + } + + return nil +} + +type listReq struct { + token string + offset uint64 + limit uint64 + name string + metadata map[string]interface{} +} + +func (req *listReq) validate() error { + if req.token == "" { + return twins.ErrUnauthorizedAccess + } + + if req.limit == 0 || req.limit > maxLimitSize { + return twins.ErrMalformedEntity + } + + if len(req.name) > maxNameSize { + return twins.ErrMalformedEntity + } + + return nil +} + +type listStatesReq struct { + token string + offset uint64 + limit uint64 + id string +} + +func (req *listStatesReq) validate() error { + if req.token == "" { + return twins.ErrUnauthorizedAccess + } + + if req.id == "" { + return twins.ErrMalformedEntity + } + + if req.limit == 0 || req.limit > maxLimitSize { + return twins.ErrMalformedEntity + } + + return nil +} diff --git a/twins/api/http/responses.go b/twins/api/http/responses.go new file mode 100644 index 00000000..a340f0fc --- /dev/null +++ b/twins/api/http/responses.go @@ -0,0 +1,147 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package http + +import ( + "fmt" + "net/http" + "time" + + "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/twins" +) + +var ( + _ mainflux.Response = (*twinRes)(nil) + _ mainflux.Response = (*viewTwinRes)(nil) + _ mainflux.Response = (*viewStateRes)(nil) + _ mainflux.Response = (*twinsPageRes)(nil) + _ mainflux.Response = (*statesPageRes)(nil) + _ mainflux.Response = (*removeRes)(nil) +) + +type twinRes struct { + id string + created bool +} + +func (res twinRes) Code() int { + if res.created { + return http.StatusCreated + } + + return http.StatusOK +} + +func (res twinRes) Headers() map[string]string { + if res.created { + return map[string]string{ + "Location": fmt.Sprintf("/twins/%s", res.id), + } + } + + return map[string]string{} +} + +func (res twinRes) Empty() bool { + return true +} + +type viewTwinRes struct { + Owner string `json:"owner,omitempty"` + ID string `json:"id"` + ThingID string `json:"thing_id"` + Name string `json:"name,omitempty"` + Revision int `json:"revision"` + Created time.Time `json:"created"` + Updated time.Time `json:"updated"` + Definitions []twins.Definition `json:"definitions,omitempty"` + Metadata map[string]interface{} `json:"metadata,omitempty"` +} + +func (res viewTwinRes) Code() int { + return http.StatusOK +} + +func (res viewTwinRes) Headers() map[string]string { + return map[string]string{} +} + +func (res viewTwinRes) Empty() bool { + return false +} + +type viewStateRes struct { + TwinID string `json:"twin_id"` + ID int64 `json:"id"` + Definition int `json:"definition"` + Created time.Time `json:"created"` + Payload map[string]interface{} `json:"payload"` +} + +func (res viewStateRes) Code() int { + return http.StatusOK +} + +func (res viewStateRes) Headers() map[string]string { + return map[string]string{} +} + +func (res viewStateRes) Empty() bool { + return false +} + +type pageRes struct { + Total uint64 `json:"total"` + Offset uint64 `json:"offset"` + Limit uint64 `json:"limit"` +} + +type twinsPageRes struct { + pageRes + Twins []viewTwinRes `json:"twins"` +} + +func (res twinsPageRes) Code() int { + return http.StatusOK +} + +func (res twinsPageRes) Headers() map[string]string { + return map[string]string{} +} + +func (res twinsPageRes) Empty() bool { + return false +} + +type statesPageRes struct { + pageRes + States []viewStateRes `json:"states"` +} + +func (res statesPageRes) Code() int { + return http.StatusOK +} + +func (res statesPageRes) Headers() map[string]string { + return map[string]string{} +} + +func (res statesPageRes) Empty() bool { + return false +} + +type removeRes struct{} + +func (res removeRes) Code() int { + return http.StatusNoContent +} + +func (res removeRes) Headers() map[string]string { + return map[string]string{} +} + +func (res removeRes) Empty() bool { + return true +} diff --git a/twins/api/http/transport.go b/twins/api/http/transport.go new file mode 100644 index 00000000..cb81d58d --- /dev/null +++ b/twins/api/http/transport.go @@ -0,0 +1,303 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package http + +import ( + "context" + "encoding/json" + "errors" + "io" + "net/http" + "strconv" + "strings" + + kitot "github.com/go-kit/kit/tracing/opentracing" + kithttp "github.com/go-kit/kit/transport/http" + "github.com/go-zoo/bone" + "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/twins" + opentracing "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +const ( + contentType = "application/json" + + offset = "offset" + limit = "limit" + name = "name" + metadata = "metadata" + + defLimit = 10 + defOffset = 0 +) + +var ( + errUnsupportedContentType = errors.New("unsupported content type") + errInvalidQueryParams = errors.New("invalid query params") +) + +// MakeHandler returns a HTTP handler for API endpoints. +func MakeHandler(tracer opentracing.Tracer, svc twins.Service) http.Handler { + opts := []kithttp.ServerOption{ + kithttp.ServerErrorEncoder(encodeError), + } + + r := bone.New() + + r.Post("/twins", kithttp.NewServer( + kitot.TraceServer(tracer, "add_twin")(addTwinEndpoint(svc)), + decodeTwinCreation, + encodeResponse, + opts..., + )) + + r.Put("/twins/:id", kithttp.NewServer( + kitot.TraceServer(tracer, "update_twin")(updateTwinEndpoint(svc)), + decodeTwinUpdate, + encodeResponse, + opts..., + )) + + r.Get("/twins/:id", kithttp.NewServer( + kitot.TraceServer(tracer, "view_twin")(viewTwinEndpoint(svc)), + decodeView, + encodeResponse, + opts..., + )) + + r.Delete("/twins/:id", kithttp.NewServer( + kitot.TraceServer(tracer, "remove_twin")(removeTwinEndpoint(svc)), + decodeView, + encodeResponse, + opts..., + )) + + r.Get("/twins", kithttp.NewServer( + kitot.TraceServer(tracer, "list_twins")(listTwinsEndpoint(svc)), + decodeList, + encodeResponse, + opts..., + )) + + r.Get("/things/:id", kithttp.NewServer( + kitot.TraceServer(tracer, "view_twin_by_thing")(viewTwinByThingEndpoint(svc)), + decodeViewTwinByThing, + encodeResponse, + opts..., + )) + + r.Get("/states/:id", kithttp.NewServer( + kitot.TraceServer(tracer, "list_states")(listStatesEndpoint(svc)), + decodeListStates, + encodeResponse, + opts..., + )) + + r.GetFunc("/version", mainflux.Version("twins")) + r.Handle("/metrics", promhttp.Handler()) + + return r +} + +func decodeTwinCreation(_ context.Context, r *http.Request) (interface{}, error) { + if !strings.Contains(r.Header.Get("Content-Type"), contentType) { + return nil, errUnsupportedContentType + } + + req := addTwinReq{token: r.Header.Get("Authorization")} + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + return nil, err + } + + return req, nil +} + +func decodeTwinUpdate(_ context.Context, r *http.Request) (interface{}, error) { + if !strings.Contains(r.Header.Get("Content-Type"), contentType) { + return nil, errUnsupportedContentType + } + + req := updateTwinReq{ + token: r.Header.Get("Authorization"), + id: bone.GetValue(r, "id"), + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + return nil, err + } + + return req, nil +} + +func decodeView(_ context.Context, r *http.Request) (interface{}, error) { + req := viewTwinReq{ + token: r.Header.Get("Authorization"), + id: bone.GetValue(r, "id"), + } + + return req, nil +} + +func decodeList(_ context.Context, r *http.Request) (interface{}, error) { + l, err := readUintQuery(r, limit, defLimit) + if err != nil { + return nil, err + } + + o, err := readUintQuery(r, offset, defOffset) + if err != nil { + return nil, err + } + + n, err := readStringQuery(r, name) + if err != nil { + return nil, err + } + + m, err := readMetadataQuery(r, "metadata") + if err != nil { + return nil, err + } + + req := listReq{ + token: r.Header.Get("Authorization"), + limit: l, + offset: o, + name: n, + metadata: m, + } + + return req, nil +} + +func decodeListStates(_ context.Context, r *http.Request) (interface{}, error) { + l, err := readUintQuery(r, limit, defLimit) + if err != nil { + return nil, err + } + + o, err := readUintQuery(r, offset, defOffset) + if err != nil { + return nil, err + } + + req := listStatesReq{ + token: r.Header.Get("Authorization"), + limit: l, + offset: o, + id: bone.GetValue(r, "id"), + } + + return req, nil +} + +func decodeViewTwinByThing(_ context.Context, r *http.Request) (interface{}, error) { + req := viewTwinReq{ + token: r.Header.Get("Authorization"), + id: bone.GetValue(r, "id"), + } + + return req, nil +} + +func encodeResponse(_ context.Context, w http.ResponseWriter, response interface{}) error { + w.Header().Set("Content-Type", contentType) + + if ar, ok := response.(mainflux.Response); ok { + for k, v := range ar.Headers() { + w.Header().Set(k, v) + } + + w.WriteHeader(ar.Code()) + + if ar.Empty() { + return nil + } + } + + return json.NewEncoder(w).Encode(response) +} + +func encodeError(_ context.Context, err error, w http.ResponseWriter) { + w.Header().Set("Content-Type", contentType) + + switch err { + case twins.ErrMalformedEntity: + w.WriteHeader(http.StatusBadRequest) + case twins.ErrUnauthorizedAccess: + w.WriteHeader(http.StatusForbidden) + case twins.ErrNotFound: + w.WriteHeader(http.StatusNotFound) + case twins.ErrConflict: + w.WriteHeader(http.StatusUnprocessableEntity) + case errUnsupportedContentType: + w.WriteHeader(http.StatusUnsupportedMediaType) + case errInvalidQueryParams: + w.WriteHeader(http.StatusBadRequest) + case io.ErrUnexpectedEOF: + w.WriteHeader(http.StatusBadRequest) + case io.EOF: + w.WriteHeader(http.StatusBadRequest) + default: + switch err.(type) { + case *json.SyntaxError: + w.WriteHeader(http.StatusBadRequest) + case *json.UnmarshalTypeError: + w.WriteHeader(http.StatusBadRequest) + default: + w.WriteHeader(http.StatusInternalServerError) + } + } +} + +func readUintQuery(r *http.Request, key string, def uint64) (uint64, error) { + vals := bone.GetQuery(r, key) + if len(vals) > 1 { + return 0, errInvalidQueryParams + } + + if len(vals) == 0 { + return def, nil + } + + strval := vals[0] + val, err := strconv.ParseUint(strval, 10, 64) + if err != nil { + return 0, errInvalidQueryParams + } + + return val, nil +} + +func readStringQuery(r *http.Request, key string) (string, error) { + vals := bone.GetQuery(r, key) + if len(vals) > 1 { + return "", errInvalidQueryParams + } + + if len(vals) == 0 { + return "", nil + } + + return vals[0], nil +} + +func readMetadataQuery(r *http.Request, key string) (map[string]interface{}, error) { + vals := bone.GetQuery(r, key) + if len(vals) > 1 { + return nil, errInvalidQueryParams + } + + if len(vals) == 0 { + return nil, nil + } + + m := make(map[string]interface{}) + err := json.Unmarshal([]byte(vals[0]), &m) + if err != nil { + return nil, err + } + + return m, nil +} diff --git a/twins/api/logging.go b/twins/api/logging.go new file mode 100644 index 00000000..45b8c762 --- /dev/null +++ b/twins/api/logging.go @@ -0,0 +1,132 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// +build !test + +package api + +import ( + "context" + "fmt" + "time" + + "github.com/mainflux/mainflux" + log "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/twins" +) + +var _ twins.Service = (*loggingMiddleware)(nil) + +type loggingMiddleware struct { + logger log.Logger + svc twins.Service +} + +// LoggingMiddleware adds logging facilities to the core service. +func LoggingMiddleware(svc twins.Service, logger log.Logger) twins.Service { + return &loggingMiddleware{logger, svc} +} + +func (lm *loggingMiddleware) AddTwin(ctx context.Context, token string, twin twins.Twin, def twins.Definition) (saved twins.Twin, err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("Method add_twin for token %s and twin %s took %s to complete", token, twin.ID, time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.AddTwin(ctx, token, twin, def) +} + +func (lm *loggingMiddleware) UpdateTwin(ctx context.Context, token string, twin twins.Twin, def twins.Definition) (err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("Method update_twin for token %s and twin %s took %s to complete", token, twin.ID, time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.UpdateTwin(ctx, token, twin, def) +} + +func (lm *loggingMiddleware) ViewTwin(ctx context.Context, token, id string) (viewed twins.Twin, err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("Method view_twin for token %s and twin %s took %s to complete", token, id, time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.ViewTwin(ctx, token, id) +} + +func (lm *loggingMiddleware) ListTwins(ctx context.Context, token string, offset uint64, limit uint64, name string, metadata twins.Metadata) (tw twins.TwinsPage, err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("Method list_twins for token %s took %s to complete", token, time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.ListTwins(ctx, token, offset, limit, name, metadata) +} + +func (lm *loggingMiddleware) SaveState(msg *mainflux.Message) (err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("Method save_state took %s to complete", time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.SaveState(msg) +} + +func (lm *loggingMiddleware) ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (st twins.StatesPage, err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("Method list_states for token %s took %s to complete", token, time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.ListStates(ctx, token, offset, limit, id) +} + +func (lm *loggingMiddleware) ViewTwinByThing(ctx context.Context, token, thingid string) (tw twins.Twin, err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("Method view_twin_by_thing for token %s and thing %s took %s to complete", token, thingid, time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.ViewTwinByThing(ctx, token, thingid) +} + +func (lm *loggingMiddleware) RemoveTwin(ctx context.Context, token, id string) (err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("Method remove_twin for token %s and twin %s took %s to complete", token, id, time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.RemoveTwin(ctx, token, id) +} diff --git a/twins/api/metrics.go b/twins/api/metrics.go new file mode 100644 index 00000000..4bb51521 --- /dev/null +++ b/twins/api/metrics.go @@ -0,0 +1,105 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// +build !test + +package api + +import ( + "context" + "time" + + "github.com/go-kit/kit/metrics" + "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/twins" +) + +var _ twins.Service = (*metricsMiddleware)(nil) + +type metricsMiddleware struct { + counter metrics.Counter + latency metrics.Histogram + svc twins.Service +} + +// MetricsMiddleware instruments core service by tracking request count and +// latency. +func MetricsMiddleware(svc twins.Service, counter metrics.Counter, latency metrics.Histogram) twins.Service { + return &metricsMiddleware{ + counter: counter, + latency: latency, + svc: svc, + } +} + +func (ms *metricsMiddleware) AddTwin(ctx context.Context, token string, twin twins.Twin, def twins.Definition) (saved twins.Twin, err error) { + defer func(begin time.Time) { + ms.counter.With("method", "add_twin").Add(1) + ms.latency.With("method", "add_twin").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return ms.svc.AddTwin(ctx, token, twin, def) +} + +func (ms *metricsMiddleware) UpdateTwin(ctx context.Context, token string, twin twins.Twin, def twins.Definition) (err error) { + defer func(begin time.Time) { + ms.counter.With("method", "update_twin").Add(1) + ms.latency.With("method", "update_twin").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return ms.svc.UpdateTwin(ctx, token, twin, def) +} + +func (ms *metricsMiddleware) ViewTwin(ctx context.Context, token, id string) (viewed twins.Twin, err error) { + defer func(begin time.Time) { + ms.counter.With("method", "view_twin").Add(1) + ms.latency.With("method", "view_twin").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return ms.svc.ViewTwin(ctx, token, id) +} + +func (ms *metricsMiddleware) ListTwins(ctx context.Context, token string, offset uint64, limit uint64, name string, metadata twins.Metadata) (tw twins.TwinsPage, err error) { + defer func(begin time.Time) { + ms.counter.With("method", "list_twins").Add(1) + ms.latency.With("method", "list_twins").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return ms.svc.ListTwins(ctx, token, offset, limit, name, metadata) +} + +func (ms *metricsMiddleware) SaveState(msg *mainflux.Message) error { + defer func(begin time.Time) { + ms.counter.With("method", "save_state").Add(1) + ms.latency.With("method", "save_state").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return ms.svc.SaveState(msg) +} + +func (ms *metricsMiddleware) ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (st twins.StatesPage, err error) { + defer func(begin time.Time) { + ms.counter.With("method", "list_states").Add(1) + ms.latency.With("method", "list_states").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return ms.svc.ListStates(ctx, token, offset, limit, id) +} + +func (ms *metricsMiddleware) ViewTwinByThing(ctx context.Context, token, thingid string) (twins.Twin, error) { + defer func(begin time.Time) { + ms.counter.With("method", "view_twin_by_thing").Add(1) + ms.latency.With("method", "view_twin_by_thing").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return ms.svc.ViewTwinByThing(ctx, token, thingid) +} + +func (ms *metricsMiddleware) RemoveTwin(ctx context.Context, token, id string) (err error) { + defer func(begin time.Time) { + ms.counter.With("method", "remove_twin").Add(1) + ms.latency.With("method", "remove_twin").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return ms.svc.RemoveTwin(ctx, token, id) +} diff --git a/twins/doc.go b/twins/doc.go new file mode 100644 index 00000000..2466db8c --- /dev/null +++ b/twins/doc.go @@ -0,0 +1,11 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// Package twins contains the domain concept definitions needed to support +// Mainflux twins service functionality. Twin is a semantic representation +// of a real world entity, be it device, application or something else. +// It holds the sequence of attribute based definitions of a real world +// thing and refers to the time series of definition based states that +// hold the historical data about the represented real world thing. + +package twins diff --git a/twins/idp.go b/twins/idp.go new file mode 100644 index 00000000..9770ba43 --- /dev/null +++ b/twins/idp.go @@ -0,0 +1,13 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package twins + +// IdentityProvider specifies an API for generating unique identifiers. +type IdentityProvider interface { + // ID generates the unique identifier. + ID() (string, error) + + // IsValid checks whether string is a valid uuid4. + IsValid(u4 string) error +} diff --git a/twins/mocks/authn.go b/twins/mocks/authn.go new file mode 100644 index 00000000..6cd1e77c --- /dev/null +++ b/twins/mocks/authn.go @@ -0,0 +1,34 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package mocks + +import ( + "context" + + "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/users" + "google.golang.org/grpc" +) + +var _ mainflux.AuthNServiceClient = (*authNServiceClient)(nil) + +type authNServiceClient struct { + users map[string]string +} + +// NewAuthNServiceClient creates mock of auth service. +func NewAuthNServiceClient(users map[string]string) mainflux.AuthNServiceClient { + return &authNServiceClient{users} +} + +func (svc authNServiceClient) Identify(ctx context.Context, in *mainflux.Token, opts ...grpc.CallOption) (*mainflux.UserID, error) { + if id, ok := svc.users[in.Value]; ok { + return &mainflux.UserID{Value: id}, nil + } + return nil, users.ErrUnauthorizedAccess +} + +func (c *authNServiceClient) Issue(ctx context.Context, in *mainflux.IssueReq, opts ...grpc.CallOption) (*mainflux.Token, error) { + return new(mainflux.Token), nil +} diff --git a/twins/mocks/commons.go b/twins/mocks/commons.go new file mode 100644 index 00000000..532ddc81 --- /dev/null +++ b/twins/mocks/commons.go @@ -0,0 +1,13 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package mocks + +import "fmt" + +// Since mocks will store data in map, and they need to resemble the real +// identifiers as much as possible, a key will be created as combination of +// owner and their id. This will allow searching by prefix or suffix. +func key(owner string, id string) string { + return fmt.Sprintf("%s-%s", owner, id) +} diff --git a/twins/mocks/idp.go b/twins/mocks/idp.go new file mode 100644 index 00000000..a759bf5f --- /dev/null +++ b/twins/mocks/idp.go @@ -0,0 +1,43 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package mocks + +import ( + "fmt" + "strings" + "sync" + + "github.com/mainflux/mainflux/twins" +) + +const u4Pref = "123e4567-e89b-12d3-a456-" + +var _ twins.IdentityProvider = (*identityProviderMock)(nil) + +type identityProviderMock struct { + mu sync.Mutex + counter int +} + +func (idp *identityProviderMock) ID() (string, error) { + idp.mu.Lock() + defer idp.mu.Unlock() + + idp.counter++ + return fmt.Sprintf("%s%012d", u4Pref, idp.counter), nil +} + +func (idp *identityProviderMock) IsValid(u4 string) error { + if !strings.Contains(u4Pref, u4) { + return twins.ErrMalformedEntity + } + + return nil +} + +// NewIdentityProvider creates "mirror" identity provider, i.e. generated +// token will hold value provided by the caller. +func NewIdentityProvider() twins.IdentityProvider { + return &identityProviderMock{} +} diff --git a/twins/mocks/states.go b/twins/mocks/states.go new file mode 100644 index 00000000..5fabc7f3 --- /dev/null +++ b/twins/mocks/states.go @@ -0,0 +1,101 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package mocks + +import ( + "context" + "fmt" + "sort" + "strings" + "sync" + + "github.com/mainflux/mainflux/twins" +) + +var _ twins.StateRepository = (*stateRepositoryMock)(nil) + +type stateRepositoryMock struct { + mu sync.Mutex + counter uint64 + states map[string]twins.State +} + +// NewStateRepository creates in-memory twin repository. +func NewStateRepository() twins.StateRepository { + return &stateRepositoryMock{ + states: make(map[string]twins.State), + } +} + +// SaveState persists the state +func (srm *stateRepositoryMock) Save(ctx context.Context, st twins.State) error { + srm.mu.Lock() + defer srm.mu.Unlock() + + srm.states[key(st.TwinID, string(st.ID))] = st + + return nil +} + +// CountStates returns the number of states related to twin +func (srm *stateRepositoryMock) Count(ctx context.Context, tw twins.Twin) (int64, error) { + return int64(len(srm.states)), nil +} + +func (srm *stateRepositoryMock) RetrieveAll(ctx context.Context, offset uint64, limit uint64, id string) (twins.StatesPage, error) { + srm.mu.Lock() + defer srm.mu.Unlock() + + items := make([]twins.State, 0) + + if limit <= 0 { + return twins.StatesPage{}, nil + } + + // This obscure way to examine map keys is enforced by the key structure in mocks/commons.go + prefix := fmt.Sprintf("%s-", id) + for k, v := range srm.states { + if !strings.HasPrefix(k, prefix) { + continue + } + id := uint64(v.ID) + if id > offset && id < limit { + items = append(items, v) + } + if (uint64)(len(items)) >= limit { + break + } + } + + sort.SliceStable(items, func(i, j int) bool { + return items[i].ID < items[j].ID + }) + + page := twins.StatesPage{ + States: items, + PageMetadata: twins.PageMetadata{ + Total: srm.counter, + Offset: offset, + Limit: limit, + }, + } + + return page, nil +} + +// RetrieveLast returns the last state related to twin spec by id +func (srm *stateRepositoryMock) RetrieveLast(ctx context.Context, id string) (twins.State, error) { + srm.mu.Lock() + defer srm.mu.Unlock() + + items := make([]twins.State, 0) + for _, v := range srm.states { + items = append(items, v) + } + sort.SliceStable(items, func(i, j int) bool { + return items[i].ID < items[j].ID + }) + + return items[len(items)-1], nil +} diff --git a/twins/mocks/twins.go b/twins/mocks/twins.go new file mode 100644 index 00000000..a6b339e4 --- /dev/null +++ b/twins/mocks/twins.go @@ -0,0 +1,141 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package mocks + +import ( + "context" + "fmt" + "sort" + "strconv" + "strings" + "sync" + + "github.com/mainflux/mainflux/twins" +) + +var _ twins.TwinRepository = (*twinRepositoryMock)(nil) + +type twinRepositoryMock struct { + mu sync.Mutex + counter uint64 + twins map[string]twins.Twin +} + +// NewTwinRepository creates in-memory twin repository. +func NewTwinRepository() twins.TwinRepository { + return &twinRepositoryMock{ + twins: make(map[string]twins.Twin), + } +} + +func (trm *twinRepositoryMock) Save(ctx context.Context, twin twins.Twin) (string, error) { + trm.mu.Lock() + defer trm.mu.Unlock() + + for _, tw := range trm.twins { + if tw.ID == twin.ID { + return "", twins.ErrConflict + } + } + + trm.twins[key(twin.Owner, twin.ID)] = twin + + return twin.ID, nil +} + +func (trm *twinRepositoryMock) Update(ctx context.Context, twin twins.Twin) error { + trm.mu.Lock() + defer trm.mu.Unlock() + + dbKey := key(twin.Owner, twin.ID) + if _, ok := trm.twins[dbKey]; !ok { + return twins.ErrNotFound + } + + trm.twins[dbKey] = twin + + return nil +} + +func (trm *twinRepositoryMock) RetrieveByID(_ context.Context, id string) (twins.Twin, error) { + trm.mu.Lock() + defer trm.mu.Unlock() + + for k, v := range trm.twins { + if id == v.ID { + return trm.twins[k], nil + } + } + + return twins.Twin{}, twins.ErrNotFound +} + +func (trm *twinRepositoryMock) RetrieveByThing(_ context.Context, thingid string) (twins.Twin, error) { + trm.mu.Lock() + defer trm.mu.Unlock() + + for _, twin := range trm.twins { + if twin.ThingID == thingid { + return twin, nil + } + } + + return twins.Twin{}, twins.ErrNotFound + +} + +func (trm *twinRepositoryMock) RetrieveAll(_ context.Context, owner string, offset uint64, limit uint64, name string, metadata twins.Metadata) (twins.TwinsPage, error) { + trm.mu.Lock() + defer trm.mu.Unlock() + + items := make([]twins.Twin, 0) + + if limit <= 0 { + return twins.TwinsPage{}, nil + } + + // This obscure way to examine map keys is enforced by the key structure in mocks/commons.go + prefix := fmt.Sprintf("%s-", owner) + for k, v := range trm.twins { + if (uint64)(len(items)) >= limit { + break + } + if !strings.HasPrefix(k, prefix) { + continue + } + suffix := string(v.ID[len(u4Pref):]) + id, _ := strconv.ParseUint(suffix, 10, 64) + if id > offset && id <= uint64(offset+limit) { + items = append(items, v) + } + } + + sort.SliceStable(items, func(i, j int) bool { + return items[i].ID < items[j].ID + }) + + page := twins.TwinsPage{ + Twins: items, + PageMetadata: twins.PageMetadata{ + Total: trm.counter, + Offset: offset, + Limit: limit, + }, + } + + return page, nil +} + +func (trm *twinRepositoryMock) Remove(ctx context.Context, id string) error { + trm.mu.Lock() + defer trm.mu.Unlock() + + for k, v := range trm.twins { + if id == v.ID { + delete(trm.twins, k) + } + } + + return nil +} diff --git a/twins/mongodb/doc.go b/twins/mongodb/doc.go new file mode 100644 index 00000000..616fd6fc --- /dev/null +++ b/twins/mongodb/doc.go @@ -0,0 +1,6 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// Package mongodb contains repository implementations using MongoDB as +// the underlying database. +package mongodb diff --git a/twins/mongodb/init.go b/twins/mongodb/init.go new file mode 100644 index 00000000..420967bf --- /dev/null +++ b/twins/mongodb/init.go @@ -0,0 +1,33 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package mongodb + +import ( + "context" + "fmt" + + "github.com/mainflux/mainflux/logger" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +// Config defines the options that are used when connecting to a MongoDB instance +type Config struct { + Host string + Port string + Name string +} + +// Connect creates a connection to the MongoDB instance +func Connect(cfg Config, logger logger.Logger) (*mongo.Database, error) { + addr := fmt.Sprintf("mongodb://%s:%s", cfg.Host, cfg.Port) + client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr)) + if err != nil { + logger.Error(fmt.Sprintf("Failed to connect to database: %s", err)) + return nil, err + } + + db := client.Database(cfg.Name) + return db, nil +} diff --git a/twins/mongodb/setup_test.go b/twins/mongodb/setup_test.go new file mode 100644 index 00000000..85426d2d --- /dev/null +++ b/twins/mongodb/setup_test.go @@ -0,0 +1,55 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package mongodb_test + +import ( + "context" + "fmt" + "os" + "testing" + + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + + dockertest "gopkg.in/ory-am/dockertest.v3" +) + +const ( + wrongID = "0" + wrongValue = "wrong-value" +) + +func TestMain(m *testing.M) { + pool, err := dockertest.NewPool("") + if err != nil { + testLog.Error(fmt.Sprintf("Could not connect to docker: %s", err)) + } + + cfg := []string{ + "MONGO_INITDB_DATABASE=test", + } + + container, err := pool.Run("mongo", "3.6-jessie", cfg) + if err != nil { + testLog.Error(fmt.Sprintf("Could not start container: %s", err)) + } + + port = container.GetPort("27017/tcp") + addr = fmt.Sprintf("mongodb://localhost:%s", port) + + if err := pool.Retry(func() error { + _, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr)) + return err + }); err != nil { + testLog.Error(fmt.Sprintf("Could not connect to docker: %s", err)) + } + + code := m.Run() + + if err := pool.Purge(container); err != nil { + testLog.Error(fmt.Sprintf("Could not purge container: %s", err)) + } + + os.Exit(code) +} diff --git a/twins/mongodb/states.go b/twins/mongodb/states.go new file mode 100644 index 00000000..b7b1bf7d --- /dev/null +++ b/twins/mongodb/states.go @@ -0,0 +1,140 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package mongodb + +import ( + "context" + + "github.com/mainflux/mainflux/twins" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +const statesCollection string = "states" + +type stateRepository struct { + db *mongo.Database +} + +var _ twins.StateRepository = (*stateRepository)(nil) + +// NewStateRepository instantiates a MongoDB implementation of state +// repository. +func NewStateRepository(db *mongo.Database) twins.StateRepository { + return &stateRepository{ + db: db, + } +} + +// SaveState persists the state +func (sr *stateRepository) Save(ctx context.Context, st twins.State) error { + coll := sr.db.Collection(statesCollection) + + if _, err := coll.InsertOne(context.Background(), st); err != nil { + return err + } + + return nil +} + +// CountStates returns the number of states related to twin +func (sr *stateRepository) Count(ctx context.Context, tw twins.Twin) (int64, error) { + coll := sr.db.Collection(statesCollection) + + filter := bson.D{{"twinid", tw.ID}} + total, err := coll.CountDocuments(ctx, filter) + if err != nil { + return 0, err + } + + return total, nil +} + +// RetrieveAll retrieves the subset of states related to twin specified by id +func (sr *stateRepository) RetrieveAll(ctx context.Context, offset uint64, limit uint64, id string) (twins.StatesPage, error) { + coll := sr.db.Collection(statesCollection) + + findOptions := options.Find() + findOptions.SetSkip(int64(offset)) + findOptions.SetLimit(int64(limit)) + + filter := bson.D{{"twinid", id}} + + cur, err := coll.Find(ctx, filter, findOptions) + if err != nil { + return twins.StatesPage{}, err + } + + results, err := decodeStates(ctx, cur) + if err != nil { + return twins.StatesPage{}, err + } + + total, err := coll.CountDocuments(ctx, filter) + if err != nil { + return twins.StatesPage{}, err + } + + return twins.StatesPage{ + States: results, + PageMetadata: twins.PageMetadata{ + Total: uint64(total), + Offset: offset, + Limit: limit, + }, + }, nil +} + +// RetrieveLast returns the last state related to twin spec by id +func (sr *stateRepository) RetrieveLast(ctx context.Context, id string) (twins.State, error) { + coll := sr.db.Collection(statesCollection) + + filter := bson.D{{"twinid", id}} + total, err := coll.CountDocuments(ctx, filter) + if err != nil { + return twins.State{}, err + } + + findOptions := options.Find() + var skip int64 + if total > 0 { + skip = total - 1 + } + findOptions.SetSkip(skip) + findOptions.SetLimit(1) + + cur, err := coll.Find(ctx, filter, findOptions) + if err != nil { + return twins.State{}, err + } + + results, err := decodeStates(ctx, cur) + if err != nil { + return twins.State{}, err + } + + if len(results) < 1 { + return twins.State{}, nil + } + return results[0], nil +} + +func decodeStates(ctx context.Context, cur *mongo.Cursor) ([]twins.State, error) { + defer cur.Close(ctx) + + var results []twins.State + for cur.Next(ctx) { + var elem twins.State + if err := cur.Decode(&elem); err != nil { + return []twins.State{}, nil + } + results = append(results, elem) + } + + if err := cur.Err(); err != nil { + return []twins.State{}, nil + } + return results, nil +} diff --git a/twins/mongodb/states_test.go b/twins/mongodb/states_test.go new file mode 100644 index 00000000..a8d22744 --- /dev/null +++ b/twins/mongodb/states_test.go @@ -0,0 +1,165 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package mongodb_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/mainflux/mainflux/twins" + "github.com/mainflux/mainflux/twins/mongodb" + "github.com/mainflux/mainflux/twins/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func TestStateSave(t *testing.T) { + client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr)) + require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err)) + + db := client.Database(testDB) + repo := mongodb.NewStateRepository(db) + + twid, err := idp.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + var id int64 + state := twins.State{ + TwinID: twid, + ID: id, + Created: time.Now(), + } + + cases := []struct { + desc string + state twins.State + err error + }{ + { + desc: "save state", + state: state, + err: nil, + }, + } + + for _, tc := range cases { + err := repo.Save(context.Background(), tc.state) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + } +} + +func TestStatesRetrieveAll(t *testing.T) { + idp := uuid.New() + + client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr)) + require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err)) + + db := client.Database(testDB) + db.Collection("states").DeleteMany(context.Background(), bson.D{}) + + repo := mongodb.NewStateRepository(db) + + twid, err := idp.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + n := uint64(10) + for i := uint64(0); i < n; i++ { + st := twins.State{ + TwinID: twid, + ID: int64(i), + Created: time.Now(), + } + + repo.Save(context.Background(), st) + } + + cases := map[string]struct { + twid string + limit uint64 + offset uint64 + size uint64 + total uint64 + }{ + "retrieve all states with existing twin": { + twid: twid, + offset: 0, + limit: n, + size: n, + total: n, + }, + "retrieve subset of states with existing twin": { + twid: twid, + offset: 0, + limit: n / 2, + size: n / 2, + total: n, + }, + "retrieve states with non-existing twin": { + twid: wrongValue, + offset: 0, + limit: n, + size: 0, + total: 0, + }, + } + + for desc, tc := range cases { + page, err := repo.RetrieveAll(context.Background(), tc.offset, tc.limit, tc.twid) + size := uint64(len(page.States)) + assert.Equal(t, tc.size, size, fmt.Sprintf("%s: expected %d got %d\n", desc, tc.size, size)) + assert.Equal(t, tc.total, page.Total, fmt.Sprintf("%s: expected %d got %d\n", desc, tc.total, page.Total)) + assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %d\n", desc, err)) + } +} + +func TestStatesRetrieveLast(t *testing.T) { + idp := uuid.New() + + client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr)) + require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err)) + + db := client.Database(testDB) + db.Collection("states").DeleteMany(context.Background(), bson.D{}) + + repo := mongodb.NewStateRepository(db) + + twid, err := idp.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + n := int64(10) + for i := int64(1); i <= n; i++ { + st := twins.State{ + TwinID: twid, + ID: i, + Created: time.Now(), + } + + repo.Save(context.Background(), st) + } + + cases := map[string]struct { + twid string + id int64 + }{ + "retrieve last state with existing twin": { + twid: twid, + id: n, + }, + "retrieve states with non-existing owner": { + twid: wrongValue, + id: 0, + }, + } + + for desc, tc := range cases { + state, err := repo.RetrieveLast(context.Background(), tc.twid) + assert.Equal(t, tc.id, state.ID, fmt.Sprintf("%s: expected %d got %d\n", desc, tc.id, state.ID)) + assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %d\n", desc, err)) + } +} diff --git a/twins/mongodb/twins.go b/twins/mongodb/twins.go new file mode 100644 index 00000000..4226f3f6 --- /dev/null +++ b/twins/mongodb/twins.go @@ -0,0 +1,199 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package mongodb + +import ( + "context" + + "github.com/mainflux/mainflux/twins" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +const ( + maxNameSize = 1024 + twinsCollection string = "twins" +) + +type twinRepository struct { + db *mongo.Database +} + +var _ twins.TwinRepository = (*twinRepository)(nil) + +// NewTwinRepository instantiates a MongoDB implementation of twin repository. +func NewTwinRepository(db *mongo.Database) twins.TwinRepository { + return &twinRepository{ + db: db, + } +} + +func (tr *twinRepository) Save(ctx context.Context, tw twins.Twin) (string, error) { + if len(tw.Name) > maxNameSize { + return "", twins.ErrMalformedEntity + } + + coll := tr.db.Collection(twinsCollection) + + if _, err := coll.InsertOne(context.Background(), tw); err != nil { + return "", err + } + + return tw.ID, nil +} + +func (tr *twinRepository) Update(ctx context.Context, tw twins.Twin) error { + if len(tw.Name) > maxNameSize { + return twins.ErrMalformedEntity + } + + coll := tr.db.Collection(twinsCollection) + + filter := bson.D{{"id", tw.ID}} + update := bson.D{{"$set", tw}} + res, err := coll.UpdateOne(context.Background(), filter, update) + if err != nil { + return err + } + + if res.ModifiedCount < 1 { + return twins.ErrNotFound + } + + return nil +} + +func (tr *twinRepository) RetrieveByID(_ context.Context, id string) (twins.Twin, error) { + coll := tr.db.Collection(twinsCollection) + var tw twins.Twin + + filter := bson.D{{"id", id}} + if err := coll.FindOne(context.Background(), filter).Decode(&tw); err != nil { + return tw, twins.ErrNotFound + } + + return tw, nil +} + +func (tr *twinRepository) RetrieveByThing(ctx context.Context, thingid string) (twins.Twin, error) { + coll := tr.db.Collection(twinsCollection) + tw := twins.Twin{} + filter := bson.D{{"thingid", thingid}} + if err := coll.FindOne(context.Background(), filter).Decode(&tw); err != nil { + return tw, twins.ErrNotFound + } + + return tw, nil +} + +func (tr *twinRepository) RetrieveAll(ctx context.Context, owner string, offset uint64, limit uint64, name string, metadata twins.Metadata) (twins.TwinsPage, error) { + coll := tr.db.Collection(twinsCollection) + + findOptions := options.Find() + findOptions.SetSkip(int64(offset)) + findOptions.SetLimit(int64(limit)) + + filter := bson.D{} + + if owner != "" { + filter = append(filter, bson.E{"owner", owner}) + } + if name != "" { + filter = append(filter, bson.E{"name", name}) + } + if len(metadata) > 0 { + filter = append(filter, bson.E{"metadata", metadata}) + } + cur, err := coll.Find(ctx, filter, findOptions) + if err != nil { + return twins.TwinsPage{}, err + } + + results, err := decodeTwins(ctx, cur) + if err != nil { + return twins.TwinsPage{}, err + } + + total, err := coll.CountDocuments(ctx, filter) + if err != nil { + return twins.TwinsPage{}, err + } + + return twins.TwinsPage{ + Twins: results, + PageMetadata: twins.PageMetadata{ + Total: uint64(total), + Offset: offset, + Limit: limit, + }, + }, nil +} + +func (tr *twinRepository) RetrieveAllByThing(ctx context.Context, thingid string, offset uint64, limit uint64) (twins.TwinsPage, error) { + coll := tr.db.Collection(twinsCollection) + + findOptions := options.Find() + findOptions.SetSkip(int64(offset)) + findOptions.SetLimit(int64(limit)) + + filter := bson.D{{"thingid", thingid}} + cur, err := coll.Find(ctx, filter, findOptions) + if err != nil { + return twins.TwinsPage{}, err + } + + results, err := decodeTwins(ctx, cur) + if err != nil { + return twins.TwinsPage{}, err + } + + total, err := coll.CountDocuments(ctx, filter) + if err != nil { + return twins.TwinsPage{}, err + } + + return twins.TwinsPage{ + Twins: results, + PageMetadata: twins.PageMetadata{ + Total: uint64(total), + Offset: offset, + Limit: limit, + }, + }, nil +} + +func (tr *twinRepository) Remove(ctx context.Context, id string) error { + coll := tr.db.Collection(twinsCollection) + + filter := bson.D{{"id", id}} + res, err := coll.DeleteOne(context.Background(), filter) + if err != nil { + return err + } + + if res.DeletedCount < 1 { + return twins.ErrNotFound + } + + return nil +} + +func decodeTwins(ctx context.Context, cur *mongo.Cursor) ([]twins.Twin, error) { + defer cur.Close(ctx) + var results []twins.Twin + for cur.Next(ctx) { + var elem twins.Twin + err := cur.Decode(&elem) + if err != nil { + return []twins.Twin{}, nil + } + results = append(results, elem) + } + + if err := cur.Err(); err != nil { + return []twins.Twin{}, nil + } + return results, nil +} diff --git a/twins/mongodb/twins_test.go b/twins/mongodb/twins_test.go new file mode 100644 index 00000000..3a45a036 --- /dev/null +++ b/twins/mongodb/twins_test.go @@ -0,0 +1,385 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package mongodb_test + +import ( + "context" + "fmt" + "os" + "strings" + "testing" + + log "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/twins" + "github.com/mainflux/mainflux/twins/mongodb" + "github.com/mainflux/mainflux/twins/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +const ( + maxNameSize = 1024 + msgsNum = 10 + testDB = "test" + collection = "twins" + email = "mfx_twin@example.com" + validName = "mfx_twin" +) + +var ( + port string + addr string + testLog, _ = log.New(os.Stdout, log.Info.String()) + idp = uuid.New() + db mongo.Database + invalidName = strings.Repeat("m", maxNameSize+1) +) + +func TestTwinsSave(t *testing.T) { + client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr)) + require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err)) + + db := client.Database(testDB) + repo := mongodb.NewTwinRepository(db) + + twid, err := idp.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + nonexistentTwinID, err := idp.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + twin := twins.Twin{ + Owner: email, + ID: twid, + } + + cases := []struct { + desc string + twin twins.Twin + err error + }{ + { + desc: "create new twin", + twin: twin, + err: nil, + }, + { + desc: "create twin with invalid name", + twin: twins.Twin{ + ID: nonexistentTwinID, + Owner: email, + Name: invalidName, + }, + err: twins.ErrMalformedEntity, + }, + } + + for _, tc := range cases { + _, err := repo.Save(context.Background(), tc.twin) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + } +} + +func TestTwinsUpdate(t *testing.T) { + client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr)) + require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err)) + + db := client.Database(testDB) + repo := mongodb.NewTwinRepository(db) + + twid, err := idp.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + nonexistentTwinID, err := idp.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + twin := twins.Twin{ + ID: twid, + Name: validName, + } + + if _, err := repo.Save(context.Background(), twin); err != nil { + testLog.Error(err.Error()) + } + + twin.Name = "new_name" + cases := []struct { + desc string + twin twins.Twin + err error + }{ + { + desc: "update existing twin", + twin: twin, + err: nil, + }, + { + desc: "update non-existing twin", + twin: twins.Twin{ + ID: nonexistentTwinID, + }, + err: twins.ErrNotFound, + }, + { + desc: "update twin with invalid name", + twin: twins.Twin{ + ID: twid, + Owner: email, + Name: invalidName, + }, + err: twins.ErrMalformedEntity, + }, + } + + for _, tc := range cases { + err := repo.Update(context.Background(), tc.twin) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + } +} + +func TestTwinsRetrieveByID(t *testing.T) { + client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr)) + require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err)) + + db := client.Database(testDB) + repo := mongodb.NewTwinRepository(db) + + twid, err := idp.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + nonexistentTwinID, err := idp.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + twin := twins.Twin{ + ID: twid, + } + + if _, err := repo.Save(context.Background(), twin); err != nil { + testLog.Error(err.Error()) + } + + cases := []struct { + desc string + id string + err error + }{ + { + desc: "retrieve an existing twin", + id: twin.ID, + err: nil, + }, + { + desc: "retrieve a non-existing twin", + id: nonexistentTwinID, + err: twins.ErrNotFound, + }, + } + + for _, tc := range cases { + _, err := repo.RetrieveByID(context.Background(), tc.id) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + } +} + +func TestTwinsRetrieveByThing(t *testing.T) { + client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr)) + require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err)) + + db := client.Database(testDB) + repo := mongodb.NewTwinRepository(db) + + twid, err := idp.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + thingid, err := idp.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + nonexistentThingID, err := idp.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + twin := twins.Twin{ + ID: twid, + ThingID: thingid, + } + + if _, err := repo.Save(context.Background(), twin); err != nil { + testLog.Error(err.Error()) + } + + cases := []struct { + desc string + thingid string + err error + }{ + { + desc: "retrieve an existing twin", + thingid: thingid, + err: nil, + }, + { + desc: "retrieve a non-existing twin", + thingid: nonexistentThingID, + err: twins.ErrNotFound, + }, + } + + for _, tc := range cases { + _, err := repo.RetrieveByThing(context.Background(), tc.thingid) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + } +} + +func TestTwinsRetrieveAll(t *testing.T) { + email := "twin-multi-retrieval@example.com" + name := "mainflux" + metadata := twins.Metadata{ + "type": "test", + } + wrongMetadata := twins.Metadata{ + "wrong": "wrong", + } + idp := uuid.New() + + client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr)) + require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err)) + + db := client.Database(testDB) + db.Collection(collection).DeleteMany(context.Background(), bson.D{}) + + twinRepo := mongodb.NewTwinRepository(db) + + n := uint64(10) + for i := uint64(0); i < n; i++ { + twid, err := idp.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + tw := twins.Twin{ + Owner: email, + ID: twid, + Metadata: metadata, + } + + // Create first two Twins with name. + if i < 2 { + tw.Name = name + } + + twinRepo.Save(context.Background(), tw) + } + + cases := map[string]struct { + owner string + limit uint64 + offset uint64 + name string + size uint64 + total uint64 + metadata twins.Metadata + }{ + "retrieve all twins with existing owner": { + owner: email, + offset: 0, + limit: n, + size: n, + total: n, + }, + "retrieve subset of twins with existing owner": { + owner: email, + offset: 0, + limit: n / 2, + size: n / 2, + total: n, + }, + "retrieve twins with non-existing owner": { + owner: wrongValue, + offset: 0, + limit: n, + size: 0, + total: 0, + }, + "retrieve twins with existing name": { + offset: 0, + limit: 1, + name: name, + size: 1, + total: 2, + }, + "retrieve twins with non-existing name": { + offset: 0, + limit: n, + name: "wrong", + size: 0, + total: 0, + }, + "retrieve twins with metadata": { + offset: 0, + limit: n, + size: n, + total: n, + metadata: metadata, + }, + "retrieve twins with wrong metadata": { + offset: 0, + limit: n, + size: 0, + total: 0, + metadata: wrongMetadata, + }, + } + + for desc, tc := range cases { + page, err := twinRepo.RetrieveAll(context.Background(), tc.owner, tc.offset, tc.limit, tc.name, tc.metadata) + size := uint64(len(page.Twins)) + assert.Equal(t, tc.size, size, fmt.Sprintf("%s: expected %d got %d\n", desc, tc.size, size)) + assert.Equal(t, tc.total, page.Total, fmt.Sprintf("%s: expected %d got %d\n", desc, tc.total, page.Total)) + assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %d\n", desc, err)) + } +} + +func TestTwinsRemove(t *testing.T) { + client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr)) + require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err)) + + db := client.Database(testDB) + repo := mongodb.NewTwinRepository(db) + + twid, err := idp.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + nonexistentTwinID, err := idp.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + twin := twins.Twin{ + ID: twid, + } + + if _, err := repo.Save(context.Background(), twin); err != nil { + testLog.Error(err.Error()) + } + + cases := []struct { + desc string + id string + err error + }{ + { + desc: "remove an existing twin", + id: twin.ID, + err: nil, + }, + { + desc: "remove a non-existing twin", + id: nonexistentTwinID, + err: twins.ErrNotFound, + }, + } + + for _, tc := range cases { + err := repo.Remove(context.Background(), tc.id) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + } +} diff --git a/twins/mqtt/publisher.go b/twins/mqtt/publisher.go new file mode 100644 index 00000000..44f8e5b2 --- /dev/null +++ b/twins/mqtt/publisher.go @@ -0,0 +1,84 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package mqtt + +import ( + "fmt" + "os" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/mainflux/mainflux/logger" +) + +// Mqtt stores mqtt client and topic +type Mqtt struct { + client mqtt.Client + channelID string +} + +// New instantiates the mqtt service. +func New(mc mqtt.Client, channelID string) Mqtt { + return Mqtt{ + client: mc, + channelID: channelID, + } +} + +// Connect to MQTT broker +func Connect(mqttURL, id, key string, logger logger.Logger) mqtt.Client { + opts := mqtt.NewClientOptions() + opts.AddBroker(mqttURL) + opts.SetClientID("twins") + opts.SetUsername(id) + opts.SetPassword(key) + opts.SetCleanSession(true) + opts.SetAutoReconnect(true) + opts.SetOnConnectHandler(func(c mqtt.Client) { + logger.Info("Connected to MQTT broker") + }) + opts.SetConnectionLostHandler(func(c mqtt.Client, err error) { + logger.Error(fmt.Sprintf("MQTT connection lost: %s", err.Error())) + os.Exit(1) + }) + + client := mqtt.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + logger.Error(fmt.Sprintf("Failed to connect to MQTT broker: %s", token.Error())) + os.Exit(1) + } + + return client +} + +func (m *Mqtt) Channel() string { + return m.channelID +} + +func (m *Mqtt) publish(twinID, crudOp string, payload *[]byte) error { + topic := fmt.Sprintf("channels/%s/messages/%s/%s", m.channelID, twinID, crudOp) + if len(twinID) < 1 { + topic = fmt.Sprintf("channels/%s/messages/%s", m.channelID, crudOp) + } + + token := m.client.Publish(topic, 0, false, *payload) + token.Wait() + + return token.Error() +} + +// Publish sends mqtt message to a predefined topic +func (m *Mqtt) Publish(twinID *string, err *error, succOp, failOp string, payload *[]byte) error { + op := succOp + if *err != nil { + op = failOp + esb := []byte((*err).Error()) + payload = &esb + } + + if err := m.publish(*twinID, op, payload); err != nil { + return err + } + + return nil +} diff --git a/twins/nats/subscriber.go b/twins/nats/subscriber.go new file mode 100644 index 00000000..8676f742 --- /dev/null +++ b/twins/nats/subscriber.go @@ -0,0 +1,57 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package nats + +import ( + "fmt" + + "github.com/gogo/protobuf/proto" + "github.com/mainflux/mainflux" + log "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/twins" + "github.com/mainflux/mainflux/twins/mqtt" + "github.com/nats-io/go-nats" +) + +const ( + queue = "twins" + input = "channel.>" +) + +var crudOp = map[string]string{ + "stateSucc": "state/success", + "stateFail": "state/failure", +} + +type pubsub struct { + natsClient *nats.Conn + mqttClient mqtt.Mqtt + logger log.Logger + svc twins.Service +} + +// Subscribe to appropriate NATS topic +func Subscribe(nc *nats.Conn, mc mqtt.Mqtt, svc twins.Service, logger log.Logger) { + ps := pubsub{ + natsClient: nc, + mqttClient: mc, + logger: logger, + svc: svc, + } + ps.natsClient.QueueSubscribe(input, queue, ps.handleMsg) +} + +func (ps pubsub) handleMsg(m *nats.Msg) { + var msg mainflux.Message + if err := proto.Unmarshal(m.Data, &msg); err != nil { + ps.logger.Warn(fmt.Sprintf("Unmarshalling failed: %s", err)) + return + } + + if msg.Channel == ps.mqttClient.Channel() { + return + } + + ps.svc.SaveState(&msg) +} diff --git a/twins/service.go b/twins/service.go new file mode 100644 index 00000000..0f574b74 --- /dev/null +++ b/twins/service.go @@ -0,0 +1,308 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package twins + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/twins/mqtt" + "github.com/mainflux/senml" + "github.com/nats-io/go-nats" +) + +var ( + // ErrMalformedEntity indicates malformed entity specification (e.g. + // invalid username or password). + ErrMalformedEntity = errors.New("malformed entity specification") + + // ErrUnauthorizedAccess indicates missing or invalid credentials provided + // when accessing a protected resource. + ErrUnauthorizedAccess = errors.New("missing or invalid credentials provided") + + // ErrNotFound indicates a non-existent entity request. + ErrNotFound = errors.New("non-existent entity") + + // ErrConflict indicates that entity already exists. + ErrConflict = errors.New("entity already exists") +) + +// Service specifies an API that must be fullfiled by the domain service +// implementation, and all of its decorators (e.g. logging & metrics). +type Service interface { + // AddTwin adds new twin related to user identified by the provided key. + AddTwin(context.Context, string, Twin, Definition) (Twin, error) + + // UpdateTwin updates twin identified by the provided Twin that + // belongs to the user identified by the provided key. + UpdateTwin(context.Context, string, Twin, Definition) error + + // ViewTwin retrieves data about twin with the provided + // ID belonging to the user identified by the provided key. + ViewTwin(context.Context, string, string) (Twin, error) + + // ListTwins retrieves data about subset of twins that belongs to the + // user identified by the provided key. + ListTwins(context.Context, string, uint64, uint64, string, Metadata) (TwinsPage, error) + + // ListStates retrieves data about subset of states that belongs to the + // twin identified by the id. + ListStates(context.Context, string, uint64, uint64, string) (StatesPage, error) + + // SaveState persists state into database + SaveState(*mainflux.Message) error + + // ListTwinsByThing retrieves data about subset of twins that represent + // specified thing belong to the user identified by + // the provided key. + ViewTwinByThing(context.Context, string, string) (Twin, error) + + // RemoveTwin removes the twin identified with the provided ID, that + // belongs to the user identified by the provided key. + RemoveTwin(context.Context, string, string) error +} + +var crudOp = map[string]string{ + "createSucc": "create/success", + "createFail": "create/failure", + "updateSucc": "update/success", + "updateFail": "update/failure", + "getSucc": "get/success", + "getFail": "get/failure", + "removeSucc": "remove/success", + "removeFail": "remove/failure", +} + +type twinsService struct { + natsClient *nats.Conn + mqttClient mqtt.Mqtt + auth mainflux.AuthNServiceClient + twins TwinRepository + states StateRepository + idp IdentityProvider +} + +var _ Service = (*twinsService)(nil) + +// New instantiates the twins service implementation. +func New(nc *nats.Conn, mc mqtt.Mqtt, auth mainflux.AuthNServiceClient, twins TwinRepository, sr StateRepository, idp IdentityProvider) Service { + return &twinsService{ + natsClient: nc, + mqttClient: mc, + auth: auth, + twins: twins, + states: sr, + idp: idp, + } +} + +func (ts *twinsService) AddTwin(ctx context.Context, token string, twin Twin, def Definition) (tw Twin, err error) { + var id string + var b []byte + defer ts.mqttClient.Publish(&id, &err, crudOp["createSucc"], crudOp["createFail"], &b) + + res, err := ts.auth.Identify(ctx, &mainflux.Token{Value: token}) + if err != nil { + return Twin{}, ErrUnauthorizedAccess + } + + twin.ID, err = ts.idp.ID() + if err != nil { + return Twin{}, err + } + + twin.Owner = res.GetValue() + + twin.Created = time.Now() + twin.Updated = time.Now() + + if len(def.Attributes) == 0 { + def = Definition{} + def.Attributes = make(map[string]Attribute) + } + def.Created = time.Now() + def.ID = 0 + twin.Definitions = append(twin.Definitions, def) + + twin.Revision = 0 + if _, err = ts.twins.Save(ctx, twin); err != nil { + return Twin{}, err + } + + id = twin.ID + b, err = json.Marshal(twin) + + return twin, nil +} + +func (ts *twinsService) UpdateTwin(ctx context.Context, token string, twin Twin, def Definition) (err error) { + var b []byte + var id string + defer ts.mqttClient.Publish(&id, &err, crudOp["updateSucc"], crudOp["updateFail"], &b) + + _, err = ts.auth.Identify(ctx, &mainflux.Token{Value: token}) + if err != nil { + return ErrUnauthorizedAccess + } + + tw, err := ts.twins.RetrieveByID(ctx, twin.ID) + if err != nil { + return err + } + tw.Updated = time.Now() + tw.Revision++ + + if twin.Name != "" { + tw.Name = twin.Name + } + + if twin.ThingID != "" { + tw.ThingID = twin.ThingID + } + + if len(def.Attributes) > 0 { + def.Created = time.Now() + def.ID = tw.Definitions[len(tw.Definitions)-1].ID + 1 + tw.Definitions = append(tw.Definitions, def) + } + + if len(twin.Metadata) == 0 { + tw.Metadata = twin.Metadata + } + + if err := ts.twins.Update(ctx, tw); err != nil { + return err + } + + id = twin.ID + b, err = json.Marshal(tw) + + return nil +} + +func (ts *twinsService) ViewTwin(ctx context.Context, token, id string) (tw Twin, err error) { + var b []byte + defer ts.mqttClient.Publish(&id, &err, crudOp["getSucc"], crudOp["getFail"], &b) + + _, err = ts.auth.Identify(ctx, &mainflux.Token{Value: token}) + if err != nil { + return Twin{}, ErrUnauthorizedAccess + } + + twin, err := ts.twins.RetrieveByID(ctx, id) + if err != nil { + return Twin{}, err + } + + b, err = json.Marshal(twin) + + return twin, nil +} + +func (ts *twinsService) ViewTwinByThing(ctx context.Context, token, thingid string) (Twin, error) { + _, err := ts.auth.Identify(ctx, &mainflux.Token{Value: token}) + if err != nil { + return Twin{}, ErrUnauthorizedAccess + } + + return ts.twins.RetrieveByThing(ctx, thingid) +} + +func (ts *twinsService) RemoveTwin(ctx context.Context, token, id string) (err error) { + var b []byte + defer ts.mqttClient.Publish(&id, &err, crudOp["removeSucc"], crudOp["removeFail"], &b) + + _, err = ts.auth.Identify(ctx, &mainflux.Token{Value: token}) + if err != nil { + return ErrUnauthorizedAccess + } + + if err := ts.twins.Remove(ctx, id); err != nil { + return err + } + + return nil +} + +func (ts *twinsService) ListTwins(ctx context.Context, token string, offset uint64, limit uint64, name string, metadata Metadata) (TwinsPage, error) { + res, err := ts.auth.Identify(ctx, &mainflux.Token{Value: token}) + if err != nil { + return TwinsPage{}, ErrUnauthorizedAccess + } + + return ts.twins.RetrieveAll(ctx, res.GetValue(), offset, limit, name, metadata) +} + +func (ts *twinsService) SaveState(msg *mainflux.Message) error { + var b []byte + var id string + var err error + defer ts.mqttClient.Publish(&id, &err, crudOp["stateSucc"], crudOp["stateFail"], &b) + + tw, err := ts.twins.RetrieveByThing(context.TODO(), msg.Publisher) + if err != nil { + return fmt.Errorf("Retrieving twin for %s failed: %s", msg.Publisher, err) + } + + var recs []senml.Record + if err := json.Unmarshal(msg.Payload, &recs); err != nil { + return fmt.Errorf("Unmarshal payload for %s failed: %s", msg.Publisher, err) + } + + st, err := ts.states.RetrieveLast(context.TODO(), tw.ID) + if err != nil { + return fmt.Errorf("Retrieve last state for %s failed: %s", msg.Publisher, err) + } + + if save := prepareState(&st, &tw, recs, msg); !save { + return nil + } + + if err := ts.states.Save(context.TODO(), st); err != nil { + return fmt.Errorf("Updating state for %s failed: %s", msg.Publisher, err) + } + + id = msg.Publisher + b = msg.Payload + + return nil +} + +func (ts *twinsService) ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (StatesPage, error) { + _, err := ts.auth.Identify(ctx, &mainflux.Token{Value: token}) + if err != nil { + return StatesPage{}, ErrUnauthorizedAccess + } + + return ts.states.RetrieveAll(ctx, offset, limit, id) +} + +func prepareState(st *State, tw *Twin, recs []senml.Record, msg *mainflux.Message) bool { + def := tw.Definitions[len(tw.Definitions)-1] + st.TwinID = tw.ID + st.ID++ + st.Created = time.Now() + st.Definition = def.ID + if st.Payload == nil { + st.Payload = make(map[string]interface{}) + } + + save := false + for k, a := range def.Attributes { + if !a.PersistState { + continue + } + if a.Channel == msg.Channel && a.Subtopic == msg.Subtopic { + st.Payload[k] = recs[0].Value + save = true + break + } + } + + return save +} diff --git a/twins/service_test.go b/twins/service_test.go new file mode 100644 index 00000000..7a1f6aa4 --- /dev/null +++ b/twins/service_test.go @@ -0,0 +1,257 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package twins_test + +import ( + "context" + "fmt" + "testing" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/mainflux/mainflux/twins" + "github.com/mainflux/mainflux/twins/mocks" + nats "github.com/nats-io/go-nats" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + twmqtt "github.com/mainflux/mainflux/twins/mqtt" +) + +const ( + twinName = "name" + wrongID = "" + token = "token" + wrongToken = "wrong-token" + email = "user@example.com" + natsURL = "nats://localhost:4222" + mqttURL = "tcp://localhost:1883" + topic = "topic" +) + +func newService(tokens map[string]string) twins.Service { + auth := mocks.NewAuthNServiceClient(tokens) + twinsRepo := mocks.NewTwinRepository() + statesRepo := mocks.NewStateRepository() + idp := mocks.NewIdentityProvider() + + nc, _ := nats.Connect(natsURL) + + opts := mqtt.NewClientOptions() + pc := mqtt.NewClient(opts) + + mc := twmqtt.New(pc, topic) + + return twins.New(nc, mc, auth, twinsRepo, statesRepo, idp) +} + +func TestAddTwin(t *testing.T) { + svc := newService(map[string]string{token: email}) + twin := twins.Twin{} + def := twins.Definition{} + + cases := []struct { + desc string + twin twins.Twin + token string + err error + }{ + { + desc: "add new twin", + twin: twin, + token: token, + err: nil, + }, + { + desc: "add twin with wrong credentials", + twin: twin, + token: wrongToken, + err: twins.ErrUnauthorizedAccess, + }, + } + + for _, tc := range cases { + _, err := svc.AddTwin(context.Background(), tc.token, tc.twin, def) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + } +} + +func TestUpdateTwin(t *testing.T) { + svc := newService(map[string]string{token: email}) + twin := twins.Twin{} + other := twins.Twin{} + def := twins.Definition{} + + other.ID = wrongID + saved, err := svc.AddTwin(context.Background(), token, twin, def) + require.Nil(t, err, fmt.Sprintf("unexpected error: %s\n", err)) + + cases := []struct { + desc string + twin twins.Twin + token string + err error + }{ + { + desc: "update existing twin", + twin: saved, + token: token, + err: nil, + }, + { + desc: "update twin with wrong credentials", + twin: saved, + token: wrongToken, + err: twins.ErrUnauthorizedAccess, + }, + { + desc: "update non-existing twin", + twin: other, + token: token, + err: twins.ErrNotFound, + }, + } + + for _, tc := range cases { + err := svc.UpdateTwin(context.Background(), tc.token, tc.twin, def) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + } +} + +func TestViewTwin(t *testing.T) { + svc := newService(map[string]string{token: email}) + twin := twins.Twin{} + def := twins.Definition{} + saved, err := svc.AddTwin(context.Background(), token, twin, def) + require.Nil(t, err, fmt.Sprintf("unexpected error: %s\n", err)) + + cases := map[string]struct { + id string + token string + err error + }{ + "view existing twin": { + id: saved.ID, + token: token, + err: nil, + }, + "view twin with wrong credentials": { + id: saved.ID, + token: wrongToken, + err: twins.ErrUnauthorizedAccess, + }, + "view non-existing twin": { + id: wrongID, + token: token, + err: twins.ErrNotFound, + }, + } + + for desc, tc := range cases { + _, err := svc.ViewTwin(context.Background(), tc.token, tc.id) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", desc, tc.err, err)) + } +} + +func TestListTwins(t *testing.T) { + svc := newService(map[string]string{token: email}) + twin := twins.Twin{Name: twinName, Owner: email} + def := twins.Definition{} + m := make(map[string]interface{}) + m["serial"] = "123456" + twin.Metadata = m + + n := uint64(10) + for i := uint64(0); i < n; i++ { + svc.AddTwin(context.Background(), token, twin, def) + } + + cases := map[string]struct { + token string + offset uint64 + limit uint64 + size uint64 + metadata map[string]interface{} + err error + }{ + "list all twins": { + token: token, + offset: 0, + limit: n, + size: n, + err: nil, + }, + "list with zero limit": { + token: token, + limit: 0, + offset: 0, + size: 0, + err: nil, + }, + "list with offset and limit": { + token: token, + offset: 8, + limit: 5, + size: 2, + err: nil, + }, + "list with wrong credentials": { + token: wrongToken, + limit: 0, + offset: n, + err: twins.ErrUnauthorizedAccess, + }, + } + + for desc, tc := range cases { + page, err := svc.ListTwins(context.Background(), tc.token, tc.offset, tc.limit, twinName, tc.metadata) + size := uint64(len(page.Twins)) + assert.Equal(t, tc.size, size, fmt.Sprintf("%s: expected %d got %d\n", desc, tc.size, size)) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", desc, tc.err, err)) + } +} + +func TestRemoveTwin(t *testing.T) { + svc := newService(map[string]string{token: email}) + twin := twins.Twin{} + def := twins.Definition{} + saved, err := svc.AddTwin(context.Background(), token, twin, def) + require.Nil(t, err, fmt.Sprintf("unexpected error: %s\n", err)) + + cases := []struct { + desc string + id string + token string + err error + }{ + { + desc: "remove twin with wrong credentials", + id: saved.ID, + token: wrongToken, + err: twins.ErrUnauthorizedAccess, + }, + { + desc: "remove existing twin", + id: saved.ID, + token: token, + err: nil, + }, + { + desc: "remove removed twin", + id: saved.ID, + token: token, + err: nil, + }, + { + desc: "remove non-existing twin", + id: wrongID, + token: token, + err: nil, + }, + } + + for _, tc := range cases { + err := svc.RemoveTwin(context.Background(), tc.token, tc.id) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + } +} diff --git a/twins/states.go b/twins/states.go new file mode 100644 index 00000000..6aa11046 --- /dev/null +++ b/twins/states.go @@ -0,0 +1,40 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package twins + +import ( + "context" + "time" +) + +// State stores actual snapshot of entity's values +type State struct { + TwinID string + ID int64 + Definition int + Created time.Time + Payload map[string]interface{} +} + +// StatesPage contains page related metadata as well as a list of twins that +// belong to this page. +type StatesPage struct { + PageMetadata + States []State +} + +// StateRepository specifies a state persistence API. +type StateRepository interface { + // Save persists the state + Save(context.Context, State) error + + // Count returns the number of states related to state + Count(context.Context, Twin) (int64, error) + + // RetrieveAll retrieves the subset of states related to twin specified by id + RetrieveAll(ctx context.Context, offset uint64, limit uint64, id string) (StatesPage, error) + + // RetrieveLast retrieves the last saved state + RetrieveLast(ctx context.Context, id string) (State, error) +} diff --git a/twins/twins.go b/twins/twins.go new file mode 100644 index 00000000..a7f01a9b --- /dev/null +++ b/twins/twins.go @@ -0,0 +1,77 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package twins + +import ( + "context" + "time" +) + +// Metadata stores arbitrary twin data +type Metadata map[string]interface{} + +// Attribute stores individual attribute data +type Attribute struct { + Channel string + Subtopic string + PersistState bool +} + +// Definition stores entity's attributes +type Definition struct { + ID int + Created time.Time + Attributes map[string]Attribute +} + +// Twin represents a Mainflux thing digital twin. Each twin is owned by one thing, and +// is assigned with the unique identifier. +type Twin struct { + Owner string + ID string + Name string + ThingID string + Created time.Time + Updated time.Time + Revision int + Definitions []Definition + Metadata Metadata +} + +// PageMetadata contains page metadata that helps navigation. +type PageMetadata struct { + Total uint64 + Offset uint64 + Limit uint64 + Name string +} + +// TwinsPage contains page related metadata as well as a list of twins that +// belong to this page. +type TwinsPage struct { + PageMetadata + Twins []Twin +} + +// TwinRepository specifies a twin persistence API. +type TwinRepository interface { + // Save persists the twin + Save(context.Context, Twin) (string, error) + + // Update performs an update to the existing twin. A non-nil error is + // returned to indicate operation failure. + Update(context.Context, Twin) error + + // RetrieveByID retrieves the twin having the provided identifier. + RetrieveByID(ctx context.Context, id string) (Twin, error) + + // RetrieveAll retrieves the subset of things owned by the specified user. + RetrieveAll(context.Context, string, uint64, uint64, string, Metadata) (TwinsPage, error) + + // RetrieveByThing retrieves twin that represents specified thing + RetrieveByThing(context.Context, string) (Twin, error) + + // Remove removes the twin having the provided identifier. + Remove(ctx context.Context, id string) error +} diff --git a/twins/uuid/idp.go b/twins/uuid/idp.go new file mode 100644 index 00000000..4b4a4237 --- /dev/null +++ b/twins/uuid/idp.go @@ -0,0 +1,36 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// Package uuid provides a UUID identity provider. +package uuid + +import ( + "github.com/gofrs/uuid" + "github.com/mainflux/mainflux/twins" +) + +var _ twins.IdentityProvider = (*uuidIdentityProvider)(nil) + +type uuidIdentityProvider struct{} + +// New instantiates a UUID identity provider. +func New() twins.IdentityProvider { + return &uuidIdentityProvider{} +} + +func (idp *uuidIdentityProvider) ID() (string, error) { + id, err := uuid.NewV4() + if err != nil { + return "", err + } + + return id.String(), nil +} + +func (idp *uuidIdentityProvider) IsValid(u4 string) error { + if _, err := uuid.FromString(u4); err != nil { + return twins.ErrMalformedEntity + } + + return nil +}