diff --git a/cmd/lora/main.go b/cmd/lora/main.go index f2b2dbb0..fb93d7e6 100644 --- a/cmd/lora/main.go +++ b/cmd/lora/main.go @@ -4,6 +4,8 @@ package main import ( + "context" + "encoding/json" "fmt" "log" "net/http" @@ -11,14 +13,15 @@ import ( "os/signal" "strconv" "syscall" + "time" - mqttPaho "github.com/eclipse/paho.mqtt.golang" r "github.com/go-redis/redis" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/lora" "github.com/mainflux/mainflux/lora/api" - "github.com/mainflux/mainflux/lora/mqtt" + "github.com/mainflux/mainflux/pkg/messaging" + "github.com/mainflux/mainflux/pkg/messaging/mqtt" "github.com/mainflux/mainflux/pkg/messaging/nats" kitprometheus "github.com/go-kit/kit/metrics/prometheus" @@ -30,6 +33,7 @@ const ( defLogLevel = "error" defHTTPPort = "8180" defLoraMsgURL = "tcp://localhost:1883" + defSubTimeout = "30s" // 30 seconds defNatsURL = "nats://localhost:4222" defESURL = "localhost:6379" defESPass = "" @@ -41,6 +45,7 @@ const ( envHTTPPort = "MF_LORA_ADAPTER_HTTP_PORT" envLoraMsgURL = "MF_LORA_ADAPTER_MESSAGES_URL" + envSubTimeout = "MF_LORA_ADAPTER_SUBSCRIBER_TIMEOUT" envNatsURL = "MF_NATS_URL" envLogLevel = "MF_LORA_ADAPTER_LOG_LEVEL" envESURL = "MF_THINGS_ES_URL" @@ -61,6 +66,7 @@ type config struct { httpPort string loraMsgURL string natsURL string + subTimeout time.Duration logLevel string esURL string esPass string @@ -95,8 +101,6 @@ func main() { thingRM := newRouteMapRepositoy(rmConn, thingsRMPrefix, logger) chanRM := newRouteMapRepositoy(rmConn, channelsRMPrefix, logger) - mqttConn := connectToMQTTBroker(cfg.loraMsgURL, logger) - svc := lora.New(pub, thingRM, chanRM) svc = api.LoggingMiddleware(svc, logger) svc = api.MetricsMiddleware( @@ -115,7 +119,14 @@ func main() { }, []string{"method"}), ) - go subscribeToLoRaBroker(svc, mqttConn, logger) + msub, err := mqtt.NewSubscriber(cfg.loraMsgURL, cfg.subTimeout, logger) + if err != nil { + logger.Error(fmt.Sprintf("Failed to create MQTT subscriber: %s", err)) + os.Exit(1) + } + + go subscribeToLoRaBroker(svc, msub, logger) + go subscribeToThingsES(svc, esConn, cfg.esConsumerName, logger) errs := make(chan error, 2) @@ -133,9 +144,14 @@ func main() { } func loadConfig() config { + mqttTimeout, err := time.ParseDuration(mainflux.Env(envSubTimeout, defSubTimeout)) + if err != nil { + log.Fatalf("Invalid %s value: %s", envSubTimeout, err.Error()) + } return config{ httpPort: mainflux.Env(envHTTPPort, defHTTPPort), loraMsgURL: mainflux.Env(envLoraMsgURL, defLoraMsgURL), + subTimeout: mqttTimeout, natsURL: mainflux.Env(envNatsURL, defNatsURL), logLevel: mainflux.Env(envLogLevel, defLogLevel), esURL: mainflux.Env(envESURL, defESURL), @@ -148,28 +164,6 @@ func loadConfig() config { } } -func connectToMQTTBroker(loraURL string, logger logger.Logger) mqttPaho.Client { - opts := mqttPaho.NewClientOptions() - opts.AddBroker(loraURL) - opts.SetUsername("") - opts.SetPassword("") - opts.SetOnConnectHandler(func(c mqttPaho.Client) { - logger.Info("Connected to Lora MQTT broker") - }) - opts.SetConnectionLostHandler(func(c mqttPaho.Client, err error) { - logger.Error(fmt.Sprintf("MQTT connection lost: %s", err.Error())) - os.Exit(1) - }) - - client := mqttPaho.NewClient(opts) - if token := client.Connect(); token.Wait() && token.Error() != nil { - logger.Error(fmt.Sprintf("Failed to connect to Lora MQTT broker: %s", token.Error())) - os.Exit(1) - } - - return client -} - func connectToRedis(redisURL, redisPass, redisDB string, logger logger.Logger) *r.Client { db, err := strconv.Atoi(redisDB) if err != nil { @@ -184,20 +178,30 @@ func connectToRedis(redisURL, redisPass, redisDB string, logger logger.Logger) * }) } -func subscribeToLoRaBroker(svc lora.Service, mc mqttPaho.Client, logger logger.Logger) { - mqtt := mqtt.NewBroker(svc, mc, logger) - logger.Info("Subscribed to Lora MQTT broker") - if err := mqtt.Subscribe(loraServerTopic); err != nil { - logger.Error(fmt.Sprintf("Failed to subscribe to Lora MQTT broker: %s", err)) +func subscribeToLoRaBroker(svc lora.Service, msub messaging.Subscriber, logger logger.Logger) { + err := msub.Subscribe(loraServerTopic, func(msg messaging.Message) error { + var m lora.Message + if err := json.Unmarshal(msg.Payload, &m); err != nil { + logger.Warn(fmt.Sprintf("Failed to Unmarshal message: %s", err.Error())) + return err + } + if err := svc.Publish(context.Background(), "", m); err != nil { + return err + } + return nil + }) + if err != nil { + logger.Error(fmt.Sprintf("Failed to subscribe to LoRa MQTT broker: %s", err)) os.Exit(1) } + logger.Info("Subscribed to LoRa MQTT broker") } func subscribeToThingsES(svc lora.Service, client *r.Client, consumer string, logger logger.Logger) { eventStore := redis.NewEventStore(svc, client, consumer, logger) logger.Info("Subscribed to Redis Event Store") if err := eventStore.Subscribe("mainflux.things"); err != nil { - logger.Warn(fmt.Sprintf("Lora-adapter service failed to subscribe to Redis event source: %s", err)) + logger.Warn(fmt.Sprintf("LoRa-adapter service failed to subscribe to Redis event source: %s", err)) } } @@ -208,6 +212,6 @@ func newRouteMapRepositoy(client *r.Client, prefix string, logger logger.Logger) func startHTTPServer(cfg config, logger logger.Logger, errs chan error) { p := fmt.Sprintf(":%s", cfg.httpPort) - logger.Info(fmt.Sprintf("lora-adapter service started, exposed port %s", cfg.httpPort)) + logger.Info(fmt.Sprintf("LoRa-adapter service started, exposed port %s", cfg.httpPort)) errs <- http.ListenAndServe(p, api.MakeHandler()) } diff --git a/cmd/mqtt/main.go b/cmd/mqtt/main.go index d7fe2889..ed3a5d7b 100644 --- a/cmd/mqtt/main.go +++ b/cmd/mqtt/main.go @@ -152,14 +152,14 @@ func main() { } defer nps.Close() - mp, err := mqttpub.NewPublisher(fmt.Sprintf("%s:%s", cfg.mqttTargetHost, cfg.mqttTargetPort), cfg.mqttForwarderTimeout) + mpub, err := mqttpub.NewPublisher(fmt.Sprintf("%s:%s", cfg.mqttTargetHost, cfg.mqttTargetPort), cfg.mqttForwarderTimeout) if err != nil { logger.Error(fmt.Sprintf("Failed to create MQTT publisher: %s", err)) os.Exit(1) } fwd := mqtt.NewForwarder(nats.SubjectAllChannels, logger) - if err := fwd.Forward(nps, mp); err != nil { + if err := fwd.Forward(nps, mpub); err != nil { logger.Error(fmt.Sprintf("Failed to forward NATS messages: %s", err)) os.Exit(1) } @@ -216,7 +216,7 @@ func loadConfig() config { mqttTimeout, err := time.ParseDuration(mainflux.Env(envMQTTForwarderTimeout, defMQTTForwarderTimeout)) if err != nil { - log.Fatalf("Invalid %s value: %s", envThingsAuthTimeout, err.Error()) + log.Fatalf("Invalid %s value: %s", envMQTTForwarderTimeout, err.Error()) } return config{ diff --git a/lora/mqtt/sub.go b/lora/mqtt/sub.go deleted file mode 100644 index 03fa4237..00000000 --- a/lora/mqtt/sub.go +++ /dev/null @@ -1,56 +0,0 @@ -package mqtt - -// LoraSubscribe subscribe to lora server messages -import ( - "context" - "encoding/json" - "fmt" - - "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/lora" - - mqtt "github.com/eclipse/paho.mqtt.golang" -) - -// Subscriber represents the MQTT broker. -type Subscriber interface { - // Subscribes to given subject and receives events. - Subscribe(string) error -} - -type broker struct { - svc lora.Service - client mqtt.Client - logger logger.Logger -} - -// NewBroker returns new MQTT broker instance. -func NewBroker(svc lora.Service, client mqtt.Client, log logger.Logger) Subscriber { - return broker{ - svc: svc, - client: client, - logger: log, - } -} - -// Subscribe subscribes to the Lora MQTT message broker -func (b broker) Subscribe(subject string) error { - s := b.client.Subscribe(subject, 0, b.handleMsg) - if err := s.Error(); s.Wait() && err != nil { - return err - } - - return nil -} - -// handleMsg triggered when new message is received on Lora MQTT broker -func (b broker) handleMsg(c mqtt.Client, msg mqtt.Message) { - m := lora.Message{} - if err := json.Unmarshal(msg.Payload(), &m); err != nil { - b.logger.Warn(fmt.Sprintf("Failed to Unmarshal message: %s", err.Error())) - return - } - - b.svc.Publish(context.Background(), "", m) - return -}