Mainflux.mainflux/cmd/twins/main.go

325 lines
9.7 KiB
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package main
import (
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/go-redis/redis"
"github.com/mainflux/mainflux"
authapi "github.com/mainflux/mainflux/auth/api/grpc"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/pkg/messaging/nats"
uuidProvider "github.com/mainflux/mainflux/pkg/uuid"
localusers "github.com/mainflux/mainflux/things/users"
"github.com/mainflux/mainflux/twins"
"github.com/mainflux/mainflux/twins/api"
twapi "github.com/mainflux/mainflux/twins/api/http"
twmongodb "github.com/mainflux/mainflux/twins/mongodb"
rediscache "github.com/mainflux/mainflux/twins/redis"
"github.com/mainflux/mainflux/twins/tracing"
opentracing "github.com/opentracing/opentracing-go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
jconfig "github.com/uber/jaeger-client-go/config"
"go.mongodb.org/mongo-driver/mongo"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
const (
queue = "twins"
defLogLevel = "error"
defHTTPPort = "8180"
defJaegerURL = ""
defServerCert = ""
defServerKey = ""
defDB = "mainflux-twins"
defDBHost = "localhost"
defDBPort = "27017"
defCacheURL = "localhost:6379"
defCachePass = ""
defCacheDB = "0"
defSingleUserEmail = ""
defSingleUserToken = ""
defClientTLS = "false"
defCACerts = ""
defChannelID = ""
defNatsURL = "nats://localhost:4222"
defAuthnURL = "localhost:8181"
defAuthnTimeout = "1s"
envLogLevel = "MF_TWINS_LOG_LEVEL"
envHTTPPort = "MF_TWINS_HTTP_PORT"
envJaegerURL = "MF_JAEGER_URL"
envServerCert = "MF_TWINS_SERVER_CERT"
envServerKey = "MF_TWINS_SERVER_KEY"
envDB = "MF_TWINS_DB"
envDBHost = "MF_TWINS_DB_HOST"
envDBPort = "MF_TWINS_DB_PORT"
envCacheURL = "MF_TWINS_CACHE_URL"
envCachePass = "MF_TWINS_CACHE_PASS"
envCacheDB = "MF_TWINS_CACHE_DB"
envSingleUserEmail = "MF_TWINS_SINGLE_USER_EMAIL"
envSingleUserToken = "MF_TWINS_SINGLE_USER_TOKEN"
envClientTLS = "MF_TWINS_CLIENT_TLS"
envCACerts = "MF_TWINS_CA_CERTS"
envChannelID = "MF_TWINS_CHANNEL_ID"
envNatsURL = "MF_NATS_URL"
envAuthnURL = "MF_AUTH_GRPC_URL"
envAuthnTimeout = "MF_AUTH_GRPC_TIMEOUT"
)
type config struct {
logLevel string
httpPort string
jaegerURL string
serverCert string
serverKey string
dbCfg twmongodb.Config
cacheURL string
cachePass string
cacheDB string
singleUserEmail string
singleUserToken string
clientTLS bool
caCerts string
channelID string
natsURL string
authnURL string
authnTimeout time.Duration
}
func main() {
cfg := loadConfig()
logger, err := logger.New(os.Stdout, cfg.logLevel)
if err != nil {
log.Fatalf(err.Error())
}
cacheClient := connectToRedis(cfg.cacheURL, cfg.cachePass, cfg.cacheDB, logger)
cacheTracer, cacheCloser := initJaeger("twins_cache", cfg.jaegerURL, logger)
defer cacheCloser.Close()
db, err := twmongodb.Connect(cfg.dbCfg, logger)
if err != nil {
logger.Error(err.Error())
os.Exit(1)
}
dbTracer, dbCloser := initJaeger("twins_db", cfg.jaegerURL, logger)
defer dbCloser.Close()
authTracer, authCloser := initJaeger("auth", cfg.jaegerURL, logger)
defer authCloser.Close()
auth, _ := createAuthClient(cfg, authTracer, logger)
pubSub, err := nats.NewPubSub(cfg.natsURL, queue, logger)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer pubSub.Close()
svc := newService(pubSub, cfg.channelID, auth, dbTracer, db, cacheTracer, cacheClient, logger)
tracer, closer := initJaeger("twins", cfg.jaegerURL, logger)
defer closer.Close()
errs := make(chan error, 2)
go startHTTPServer(twapi.MakeHandler(tracer, svc), cfg.httpPort, cfg, logger, errs)
go func() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT)
errs <- fmt.Errorf("%s", <-c)
}()
err = <-errs
logger.Error(fmt.Sprintf("Twins service terminated: %s", err))
}
func loadConfig() config {
tls, err := strconv.ParseBool(mainflux.Env(envClientTLS, defClientTLS))
if err != nil {
log.Fatalf("Invalid value passed for %s\n", envClientTLS)
}
authnTimeout, err := time.ParseDuration(mainflux.Env(envAuthnTimeout, defAuthnTimeout))
if err != nil {
log.Fatalf("Invalid %s value: %s", envAuthnTimeout, err.Error())
}
dbCfg := twmongodb.Config{
Name: mainflux.Env(envDB, defDB),
Host: mainflux.Env(envDBHost, defDBHost),
Port: mainflux.Env(envDBPort, defDBPort),
}
return config{
logLevel: mainflux.Env(envLogLevel, defLogLevel),
httpPort: mainflux.Env(envHTTPPort, defHTTPPort),
serverCert: mainflux.Env(envServerCert, defServerCert),
serverKey: mainflux.Env(envServerKey, defServerKey),
jaegerURL: mainflux.Env(envJaegerURL, defJaegerURL),
dbCfg: dbCfg,
cacheURL: mainflux.Env(envCacheURL, defCacheURL),
cachePass: mainflux.Env(envCachePass, defCachePass),
cacheDB: mainflux.Env(envCacheDB, defCacheDB),
singleUserEmail: mainflux.Env(envSingleUserEmail, defSingleUserEmail),
singleUserToken: mainflux.Env(envSingleUserToken, defSingleUserToken),
clientTLS: tls,
caCerts: mainflux.Env(envCACerts, defCACerts),
channelID: mainflux.Env(envChannelID, defChannelID),
natsURL: mainflux.Env(envNatsURL, defNatsURL),
authnURL: mainflux.Env(envAuthnURL, defAuthnURL),
authnTimeout: authnTimeout,
}
}
func initJaeger(svcName, url string, logger logger.Logger) (opentracing.Tracer, io.Closer) {
if url == "" {
return opentracing.NoopTracer{}, ioutil.NopCloser(nil)
}
tracer, closer, err := jconfig.Configuration{
ServiceName: svcName,
Sampler: &jconfig.SamplerConfig{
Type: "const",
Param: 1,
},
Reporter: &jconfig.ReporterConfig{
LocalAgentHostPort: url,
LogSpans: true,
},
}.NewTracer()
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger client: %s", err))
os.Exit(1)
}
return tracer, closer
}
func createAuthClient(cfg config, tracer opentracing.Tracer, logger logger.Logger) (mainflux.AuthServiceClient, func() error) {
if cfg.singleUserEmail != "" && cfg.singleUserToken != "" {
return localusers.NewSingleUserService(cfg.singleUserEmail, cfg.singleUserToken), nil
}
conn := connectToAuth(cfg, logger)
return authapi.NewClient(tracer, conn, cfg.authnTimeout), conn.Close
}
func connectToAuth(cfg config, logger logger.Logger) *grpc.ClientConn {
var opts []grpc.DialOption
if cfg.clientTLS {
if cfg.caCerts != "" {
tpc, err := credentials.NewClientTLSFromFile(cfg.caCerts, "")
if err != nil {
logger.Error(fmt.Sprintf("Failed to create tls credentials: %s", err))
os.Exit(1)
}
opts = append(opts, grpc.WithTransportCredentials(tpc))
}
} else {
opts = append(opts, grpc.WithInsecure())
logger.Info("gRPC communication is not encrypted")
}
conn, err := grpc.Dial(cfg.authnURL, opts...)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to authn service: %s", err))
os.Exit(1)
}
return conn
}
func connectToRedis(cacheURL, cachePass, cacheDB string, logger logger.Logger) *redis.Client {
db, err := strconv.Atoi(cacheDB)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to cache: %s", err))
os.Exit(1)
}
return redis.NewClient(&redis.Options{
Addr: cacheURL,
Password: cachePass,
DB: db,
})
}
func newService(ps messaging.PubSub, chanID string, users mainflux.AuthServiceClient, dbTracer opentracing.Tracer, db *mongo.Database, cacheTracer opentracing.Tracer, cacheClient *redis.Client, logger logger.Logger) twins.Service {
twinRepo := twmongodb.NewTwinRepository(db)
twinRepo = tracing.TwinRepositoryMiddleware(dbTracer, twinRepo)
stateRepo := twmongodb.NewStateRepository(db)
stateRepo = tracing.StateRepositoryMiddleware(dbTracer, stateRepo)
up := uuidProvider.New()
twinCache := rediscache.NewTwinCache(cacheClient)
twinCache = tracing.TwinCacheMiddleware(cacheTracer, twinCache)
svc := twins.New(ps, users, twinRepo, twinCache, stateRepo, up, chanID, logger)
svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
svc,
kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: "twins",
Subsystem: "api",
Name: "request_count",
Help: "Number of requests received.",
}, []string{"method"}),
kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
Namespace: "twins",
Subsystem: "api",
Name: "request_latency_microseconds",
Help: "Total duration of requests in microseconds.",
}, []string{"method"}),
)
err := ps.Subscribe(nats.SubjectAllChannels, func(msg messaging.Message) error {
if msg.Channel == chanID {
return nil
}
if err := svc.SaveStates(&msg); err != nil {
logger.Error(fmt.Sprintf("State save failed: %s", err))
return err
}
return nil
})
if err != nil {
logger.Error(err.Error())
os.Exit(1)
}
return svc
}
func startHTTPServer(handler http.Handler, port string, cfg config, logger logger.Logger, errs chan error) {
p := fmt.Sprintf(":%s", port)
if cfg.serverCert != "" || cfg.serverKey != "" {
logger.Info(fmt.Sprintf("Twins service started using https on port %s with cert %s key %s",
port, cfg.serverCert, cfg.serverKey))
errs <- http.ListenAndServeTLS(p, cfg.serverCert, cfg.serverKey, handler)
return
}
logger.Info(fmt.Sprintf("Twins service started using http on port %s", cfg.httpPort))
errs <- http.ListenAndServe(p, handler)
}