Register http adapter to consul

Signed-off-by: Dejan Mijic <dejan@mainflux.com>
This commit is contained in:
Dejan Mijic 2017-10-15 18:07:29 +02:00
parent 2e5316e3fd
commit 3f9f56d0d4
1 changed files with 71 additions and 29 deletions

View File

@ -9,34 +9,65 @@ import (
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
kitprometheus "github.com/go-kit/kit/metrics/prometheus" kitprometheus "github.com/go-kit/kit/metrics/prometheus"
kitconsul "github.com/go-kit/kit/sd/consul"
stdconsul "github.com/hashicorp/consul/api"
adapter "github.com/mainflux/mainflux/http" adapter "github.com/mainflux/mainflux/http"
"github.com/mainflux/mainflux/http/api" "github.com/mainflux/mainflux/http/api"
"github.com/mainflux/mainflux/http/nats" "github.com/mainflux/mainflux/http/nats"
broker "github.com/nats-io/go-nats" broker "github.com/nats-io/go-nats"
stdprometheus "github.com/prometheus/client_golang/prometheus" stdprometheus "github.com/prometheus/client_golang/prometheus"
uuid "github.com/satori/go.uuid"
) )
const ( const (
port int = 7070 port int = 9002
defNatsURL string = broker.DefaultURL natsKey string = "nats"
envNatsURL string = "HTTP_ADAPTER_NATS_URL"
) )
type config struct { var (
Port int kv *stdconsul.KV
NatsURL string logger log.Logger
} )
func main() { func main() {
cfg := config{ logger = log.NewJSONLogger(log.NewSyncWriter(os.Stdout))
Port: port,
NatsURL: env(envNatsURL, defNatsURL),
}
logger := log.NewJSONLogger(log.NewSyncWriter(os.Stdout))
logger = log.With(logger, "ts", log.DefaultTimestampUTC) logger = log.With(logger, "ts", log.DefaultTimestampUTC)
nc, err := broker.Connect(cfg.NatsURL) consulAddr := os.Getenv("CONSUL_ADDR")
if consulAddr == "" {
logger.Log("status", "Cannot start the service: CONSUL_ADDR not set.")
os.Exit(1)
}
consul, err := stdconsul.NewClient(&stdconsul.Config{
Address: consulAddr,
})
if err != nil {
status := fmt.Sprintf("Cannot connect to Consul due to %s", err)
logger.Log("status", status)
os.Exit(1)
}
kv = consul.KV()
asr := &stdconsul.AgentServiceRegistration{
ID: uuid.NewV4().String(),
Name: "http-adapter",
Tags: []string{},
Port: port,
Address: "",
EnableTagOverride: false,
}
sd := kitconsul.NewClient(consul)
if err = sd.Register(asr); err != nil {
status := fmt.Sprintf("Cannot register service due to %s", err)
logger.Log("status", status)
os.Exit(1)
}
nc, err := broker.Connect(get(natsKey))
if err != nil { if err != nil {
status := fmt.Sprintf("Cannot connect to NATS due to %s", err.Error()) status := fmt.Sprintf("Cannot connect to NATS due to %s", err.Error())
logger.Log("status", status) logger.Log("status", status)
@ -66,29 +97,40 @@ func main() {
svc, svc,
) )
errs := make(chan error, 2) errChan := make(chan error, 10)
go func() { go func() {
p := fmt.Sprintf(":%d", cfg.Port) p := fmt.Sprintf(":%d", port)
logger.Log("status", "HTTP adapter started.") logger.Log("status", "HTTP adapter started.")
errs <- http.ListenAndServe(p, api.MakeHandler(svc)) errChan <- http.ListenAndServe(p, api.MakeHandler(svc))
}() }()
go func() { sigChan := make(chan os.Signal, 1)
c := make(chan os.Signal) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
signal.Notify(c, syscall.SIGINT)
errs <- fmt.Errorf("%s", <-c)
}()
status := fmt.Sprintf("HTTP adapter stopped due to %s", <-errs) for {
logger.Log("status", status) select {
case err := <-errChan:
status := fmt.Sprintf("HTTP adapter stopped due to %s", err)
logger.Log("status", status)
sd.Deregister(asr)
os.Exit(1)
case <-sigChan:
status := fmt.Sprintf("HTTP adapter terminated.")
logger.Log("status", status)
sd.Deregister(asr)
os.Exit(0)
}
}
} }
func env(key, fallback string) string { func get(key string) string {
value := os.Getenv(key) pair, _, err := kv.Get(key, nil)
if value == "" { if err != nil {
return fallback status := fmt.Sprintf("Cannot retrieve %s due to %s", key, err)
logger.Log("status", status)
os.Exit(1)
} }
return value return string(pair.Value)
} }