diff --git a/cmd/coap/main.go b/cmd/coap/main.go index cac5bc3c..7c3b8566 100644 --- a/cmd/coap/main.go +++ b/cmd/coap/main.go @@ -20,8 +20,8 @@ import ( "github.com/mainflux/mainflux/coap" "github.com/mainflux/mainflux/coap/api" logger "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/messaging/nats" thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc" - broker "github.com/nats-io/nats.go" opentracing "github.com/opentracing/opentracing-go" gocoap "github.com/plgd-dev/go-coap/v2" stdprometheus "github.com/prometheus/client_golang/prometheus" @@ -77,13 +77,14 @@ func main() { tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout) - nc, err := broker.Connect(cfg.natsURL) + pubsub, err := nats.NewPubSub(cfg.natsURL, "coap", logger) if err != nil { log.Fatalf(err.Error()) } - defer nc.Close() - svc := coap.New(tc, nc) + defer pubsub.Close() + + svc := coap.New(tc, pubsub) svc = api.LoggingMiddleware(svc, logger) diff --git a/coap/adapter.go b/coap/adapter.go index 8ec1f873..6a5246bc 100644 --- a/coap/adapter.go +++ b/coap/adapter.go @@ -11,9 +11,8 @@ import ( "fmt" "sync" - "github.com/gogo/protobuf/proto" "github.com/mainflux/mainflux/pkg/errors" - broker "github.com/nats-io/nats.go" + "github.com/mainflux/mainflux/pkg/messaging/nats" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/pkg/messaging" @@ -42,16 +41,16 @@ var _ Service = (*adapterService)(nil) // Observers is a map of maps, type adapterService struct { auth mainflux.ThingsServiceClient - conn *broker.Conn + pubsub nats.PubSub observers map[string]observers obsLock sync.Mutex } // New instantiates the CoAP adapter implementation. -func New(auth mainflux.ThingsServiceClient, nc *broker.Conn) Service { +func New(auth mainflux.ThingsServiceClient, pubsub nats.PubSub) Service { as := &adapterService{ auth: auth, - conn: nc, + pubsub: pubsub, observers: make(map[string]observers), obsLock: sync.Mutex{}, } @@ -70,17 +69,7 @@ func (svc *adapterService) Publish(ctx context.Context, key string, msg messagin } msg.Publisher = thid.GetValue() - data, err := proto.Marshal(&msg) - if err != nil { - return err - } - - subject := fmt.Sprintf("%s.%s", chansPrefix, msg.Channel) - if msg.Subtopic != "" { - subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic) - } - - return svc.conn.Publish(subject, data) + return svc.pubsub.Publish(msg.Channel, msg) } func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic string, c Client) error { @@ -102,7 +91,7 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic svc.remove(subject, c.Token()) }() - obs, err := NewObserver(subject, c, svc.conn) + obs, err := NewObserver(subject, c, svc.pubsub) if err != nil { c.Cancel() return err @@ -126,20 +115,20 @@ func (svc *adapterService) Unsubscribe(ctx context.Context, key, chanID, subtopi return svc.remove(subject, token) } -func (svc *adapterService) put(endpoint, token string, o Observer) error { +func (svc *adapterService) put(topic, token string, o Observer) error { svc.obsLock.Lock() defer svc.obsLock.Unlock() - obs, ok := svc.observers[endpoint] - // If there are no observers, create map and assign it to the endpoint. + obs, ok := svc.observers[topic] + // If there are no observers, create map and assign it to the topic. if !ok { obs = observers{token: o} - svc.observers[endpoint] = obs + svc.observers[topic] = obs return nil } // If observer exists, cancel subscription and replace it. if sub, ok := obs[token]; ok { - if err := sub.Cancel(); err != nil { + if err := sub.Cancel(topic); err != nil { return errors.Wrap(ErrUnsubscribe, err) } } @@ -147,23 +136,23 @@ func (svc *adapterService) put(endpoint, token string, o Observer) error { return nil } -func (svc *adapterService) remove(endpoint, token string) error { +func (svc *adapterService) remove(topic, token string) error { svc.obsLock.Lock() defer svc.obsLock.Unlock() - obs, ok := svc.observers[endpoint] + obs, ok := svc.observers[topic] if !ok { return nil } if current, ok := obs[token]; ok { - if err := current.Cancel(); err != nil { + if err := current.Cancel(topic); err != nil { return errors.Wrap(ErrUnsubscribe, err) } } delete(obs, token) // If there are no observers left for the endpint, remove the map. if len(obs) == 0 { - delete(svc.observers, endpoint) + delete(svc.observers, topic) } return nil } diff --git a/coap/observer.go b/coap/observer.go index 4dc2424c..9550c6f1 100644 --- a/coap/observer.go +++ b/coap/observer.go @@ -4,43 +4,35 @@ package coap import ( - "github.com/gogo/protobuf/proto" - "github.com/mainflux/mainflux/pkg/messaging" + "github.com/mainflux/mainflux/pkg/messaging/nats" broker "github.com/nats-io/nats.go" ) // Observer represents an internal observer used to handle CoAP observe messages. type Observer interface { - Cancel() error + Cancel(topic string) error } // NewObserver returns a new Observer instance. -func NewObserver(subject string, c Client, conn *broker.Conn) (Observer, error) { - sub, err := conn.Subscribe(subject, func(m *broker.Msg) { - var msg messaging.Message - if err := proto.Unmarshal(m.Data, &msg); err != nil { - return - } - // There is no error handling, but the client takes care to log the error. - c.SendMessage(msg) - }) +func NewObserver(subject string, c Client, pubsub nats.PubSub) (Observer, error) { + err := pubsub.Subscribe(subject, c.SendMessage) if err != nil { return nil, err } ret := &observer{ client: c, - sub: sub, + pubsub: pubsub, } return ret, nil } type observer struct { client Client - sub *broker.Subscription + pubsub nats.PubSub } -func (o *observer) Cancel() error { - if err := o.sub.Unsubscribe(); err != nil && err != broker.ErrConnectionClosed { +func (o *observer) Cancel(topic string) error { + if err := o.pubsub.Unsubscribe(topic); err != nil && err != broker.ErrConnectionClosed { return err } return o.client.Cancel()