// Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 package main import ( "fmt" "io" "io/ioutil" "log" "net/http" "os" "os/signal" "strconv" "syscall" "time" kitprometheus "github.com/go-kit/kit/metrics/prometheus" "github.com/go-redis/redis" "github.com/mainflux/mainflux" authapi "github.com/mainflux/mainflux/auth/api/grpc" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging" "github.com/mainflux/mainflux/pkg/messaging/nats" "github.com/mainflux/mainflux/pkg/uuid" localusers "github.com/mainflux/mainflux/things/users" "github.com/mainflux/mainflux/twins" "github.com/mainflux/mainflux/twins/api" twapi "github.com/mainflux/mainflux/twins/api/http" twmongodb "github.com/mainflux/mainflux/twins/mongodb" rediscache "github.com/mainflux/mainflux/twins/redis" "github.com/mainflux/mainflux/twins/tracing" opentracing "github.com/opentracing/opentracing-go" stdprometheus "github.com/prometheus/client_golang/prometheus" jconfig "github.com/uber/jaeger-client-go/config" "go.mongodb.org/mongo-driver/mongo" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) const ( queue = "twins" defLogLevel = "error" defHTTPPort = "8180" defJaegerURL = "" defServerCert = "" defServerKey = "" defDB = "mainflux-twins" defDBHost = "localhost" defDBPort = "27017" defCacheURL = "localhost:6379" defCachePass = "" defCacheDB = "0" defSingleUserEmail = "" defSingleUserToken = "" defClientTLS = "false" defCACerts = "" defChannelID = "" defNatsURL = "nats://localhost:4222" defAuthURL = "localhost:8181" defAuthTimeout = "1s" envLogLevel = "MF_TWINS_LOG_LEVEL" envHTTPPort = "MF_TWINS_HTTP_PORT" envJaegerURL = "MF_JAEGER_URL" envServerCert = "MF_TWINS_SERVER_CERT" envServerKey = "MF_TWINS_SERVER_KEY" envDB = "MF_TWINS_DB" envDBHost = "MF_TWINS_DB_HOST" envDBPort = "MF_TWINS_DB_PORT" envCacheURL = "MF_TWINS_CACHE_URL" envCachePass = "MF_TWINS_CACHE_PASS" envCacheDB = "MF_TWINS_CACHE_DB" envSingleUserEmail = "MF_TWINS_SINGLE_USER_EMAIL" envSingleUserToken = "MF_TWINS_SINGLE_USER_TOKEN" envClientTLS = "MF_TWINS_CLIENT_TLS" envCACerts = "MF_TWINS_CA_CERTS" envChannelID = "MF_TWINS_CHANNEL_ID" envNatsURL = "MF_NATS_URL" envAuthURL = "MF_AUTH_GRPC_URL" envAuthTimeout = "MF_AUTH_GRPC_TIMEOUT" ) type config struct { logLevel string httpPort string jaegerURL string serverCert string serverKey string dbCfg twmongodb.Config cacheURL string cachePass string cacheDB string singleUserEmail string singleUserToken string clientTLS bool caCerts string channelID string natsURL string authURL string authTimeout time.Duration } func main() { cfg := loadConfig() logger, err := logger.New(os.Stdout, cfg.logLevel) if err != nil { log.Fatalf(err.Error()) } cacheClient := connectToRedis(cfg.cacheURL, cfg.cachePass, cfg.cacheDB, logger) cacheTracer, cacheCloser := initJaeger("twins_cache", cfg.jaegerURL, logger) defer cacheCloser.Close() db, err := twmongodb.Connect(cfg.dbCfg, logger) if err != nil { logger.Error(err.Error()) os.Exit(1) } dbTracer, dbCloser := initJaeger("twins_db", cfg.jaegerURL, logger) defer dbCloser.Close() authTracer, authCloser := initJaeger("auth", cfg.jaegerURL, logger) defer authCloser.Close() auth, _ := createAuthClient(cfg, authTracer, logger) pubSub, err := nats.NewPubSub(cfg.natsURL, queue, logger) if err != nil { logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err)) os.Exit(1) } defer pubSub.Close() svc := newService(pubSub, cfg.channelID, auth, dbTracer, db, cacheTracer, cacheClient, logger) tracer, closer := initJaeger("twins", cfg.jaegerURL, logger) defer closer.Close() errs := make(chan error, 2) go startHTTPServer(twapi.MakeHandler(tracer, svc), cfg.httpPort, cfg, logger, errs) go func() { c := make(chan os.Signal) signal.Notify(c, syscall.SIGINT) errs <- fmt.Errorf("%s", <-c) }() err = <-errs logger.Error(fmt.Sprintf("Twins service terminated: %s", err)) } func loadConfig() config { tls, err := strconv.ParseBool(mainflux.Env(envClientTLS, defClientTLS)) if err != nil { log.Fatalf("Invalid value passed for %s\n", envClientTLS) } authTimeout, err := time.ParseDuration(mainflux.Env(envAuthTimeout, defAuthTimeout)) if err != nil { log.Fatalf("Invalid %s value: %s", envAuthTimeout, err.Error()) } dbCfg := twmongodb.Config{ Name: mainflux.Env(envDB, defDB), Host: mainflux.Env(envDBHost, defDBHost), Port: mainflux.Env(envDBPort, defDBPort), } return config{ logLevel: mainflux.Env(envLogLevel, defLogLevel), httpPort: mainflux.Env(envHTTPPort, defHTTPPort), serverCert: mainflux.Env(envServerCert, defServerCert), serverKey: mainflux.Env(envServerKey, defServerKey), jaegerURL: mainflux.Env(envJaegerURL, defJaegerURL), dbCfg: dbCfg, cacheURL: mainflux.Env(envCacheURL, defCacheURL), cachePass: mainflux.Env(envCachePass, defCachePass), cacheDB: mainflux.Env(envCacheDB, defCacheDB), singleUserEmail: mainflux.Env(envSingleUserEmail, defSingleUserEmail), singleUserToken: mainflux.Env(envSingleUserToken, defSingleUserToken), clientTLS: tls, caCerts: mainflux.Env(envCACerts, defCACerts), channelID: mainflux.Env(envChannelID, defChannelID), natsURL: mainflux.Env(envNatsURL, defNatsURL), authURL: mainflux.Env(envAuthURL, defAuthURL), authTimeout: authTimeout, } } func initJaeger(svcName, url string, logger logger.Logger) (opentracing.Tracer, io.Closer) { if url == "" { return opentracing.NoopTracer{}, ioutil.NopCloser(nil) } tracer, closer, err := jconfig.Configuration{ ServiceName: svcName, Sampler: &jconfig.SamplerConfig{ Type: "const", Param: 1, }, Reporter: &jconfig.ReporterConfig{ LocalAgentHostPort: url, LogSpans: true, }, }.NewTracer() if err != nil { logger.Error(fmt.Sprintf("Failed to init Jaeger client: %s", err)) os.Exit(1) } return tracer, closer } func createAuthClient(cfg config, tracer opentracing.Tracer, logger logger.Logger) (mainflux.AuthServiceClient, func() error) { if cfg.singleUserEmail != "" && cfg.singleUserToken != "" { return localusers.NewSingleUserService(cfg.singleUserEmail, cfg.singleUserToken), nil } conn := connectToAuth(cfg, logger) return authapi.NewClient(tracer, conn, cfg.authTimeout), conn.Close } func connectToAuth(cfg config, logger logger.Logger) *grpc.ClientConn { var opts []grpc.DialOption if cfg.clientTLS { if cfg.caCerts != "" { tpc, err := credentials.NewClientTLSFromFile(cfg.caCerts, "") if err != nil { logger.Error(fmt.Sprintf("Failed to create tls credentials: %s", err)) os.Exit(1) } opts = append(opts, grpc.WithTransportCredentials(tpc)) } } else { opts = append(opts, grpc.WithInsecure()) logger.Info("gRPC communication is not encrypted") } conn, err := grpc.Dial(cfg.authURL, opts...) if err != nil { logger.Error(fmt.Sprintf("Failed to connect to auth service: %s", err)) os.Exit(1) } return conn } func connectToRedis(cacheURL, cachePass, cacheDB string, logger logger.Logger) *redis.Client { db, err := strconv.Atoi(cacheDB) if err != nil { logger.Error(fmt.Sprintf("Failed to connect to cache: %s", err)) os.Exit(1) } return redis.NewClient(&redis.Options{ Addr: cacheURL, Password: cachePass, DB: db, }) } func newService(ps messaging.PubSub, chanID string, users mainflux.AuthServiceClient, dbTracer opentracing.Tracer, db *mongo.Database, cacheTracer opentracing.Tracer, cacheClient *redis.Client, logger logger.Logger) twins.Service { twinRepo := twmongodb.NewTwinRepository(db) twinRepo = tracing.TwinRepositoryMiddleware(dbTracer, twinRepo) stateRepo := twmongodb.NewStateRepository(db) stateRepo = tracing.StateRepositoryMiddleware(dbTracer, stateRepo) idProvider := uuid.New() twinCache := rediscache.NewTwinCache(cacheClient) twinCache = tracing.TwinCacheMiddleware(cacheTracer, twinCache) svc := twins.New(ps, users, twinRepo, twinCache, stateRepo, idProvider, chanID, logger) svc = api.LoggingMiddleware(svc, logger) svc = api.MetricsMiddleware( svc, kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{ Namespace: "twins", Subsystem: "api", Name: "request_count", Help: "Number of requests received.", }, []string{"method"}), kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ Namespace: "twins", Subsystem: "api", Name: "request_latency_microseconds", Help: "Total duration of requests in microseconds.", }, []string{"method"}), ) err := ps.Subscribe(nats.SubjectAllChannels, func(msg messaging.Message) error { if msg.Channel == chanID { return nil } if err := svc.SaveStates(&msg); err != nil { logger.Error(fmt.Sprintf("State save failed: %s", err)) return err } return nil }) if err != nil { logger.Error(err.Error()) os.Exit(1) } return svc } func startHTTPServer(handler http.Handler, port string, cfg config, logger logger.Logger, errs chan error) { p := fmt.Sprintf(":%s", port) if cfg.serverCert != "" || cfg.serverKey != "" { logger.Info(fmt.Sprintf("Twins service started using https on port %s with cert %s key %s", port, cfg.serverCert, cfg.serverKey)) errs <- http.ListenAndServeTLS(p, cfg.serverCert, cfg.serverKey, handler) return } logger.Info(fmt.Sprintf("Twins service started using http on port %s", cfg.httpPort)) errs <- http.ListenAndServe(p, handler) }