NOISSUE - Add nats wrapper for COAP (#1569)
* Add nats wrapper for COAP Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Pass pubsub as argument Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Defer close connection Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Defer close connection Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Rename endpoint to topic Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com>
This commit is contained in:
parent
5ca8495f35
commit
cc5d5195ab
|
@ -20,8 +20,8 @@ import (
|
|||
"github.com/mainflux/mainflux/coap"
|
||||
"github.com/mainflux/mainflux/coap/api"
|
||||
logger "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/pkg/messaging/nats"
|
||||
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc"
|
||||
broker "github.com/nats-io/nats.go"
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
gocoap "github.com/plgd-dev/go-coap/v2"
|
||||
stdprometheus "github.com/prometheus/client_golang/prometheus"
|
||||
|
@ -77,13 +77,14 @@ func main() {
|
|||
|
||||
tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)
|
||||
|
||||
nc, err := broker.Connect(cfg.natsURL)
|
||||
pubsub, err := nats.NewPubSub(cfg.natsURL, "coap", logger)
|
||||
if err != nil {
|
||||
log.Fatalf(err.Error())
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
svc := coap.New(tc, nc)
|
||||
defer pubsub.Close()
|
||||
|
||||
svc := coap.New(tc, pubsub)
|
||||
|
||||
svc = api.LoggingMiddleware(svc, logger)
|
||||
|
||||
|
|
|
@ -11,9 +11,8 @@ import (
|
|||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/mainflux/mainflux/pkg/errors"
|
||||
broker "github.com/nats-io/nats.go"
|
||||
"github.com/mainflux/mainflux/pkg/messaging/nats"
|
||||
|
||||
"github.com/mainflux/mainflux"
|
||||
"github.com/mainflux/mainflux/pkg/messaging"
|
||||
|
@ -42,16 +41,16 @@ var _ Service = (*adapterService)(nil)
|
|||
// Observers is a map of maps,
|
||||
type adapterService struct {
|
||||
auth mainflux.ThingsServiceClient
|
||||
conn *broker.Conn
|
||||
pubsub nats.PubSub
|
||||
observers map[string]observers
|
||||
obsLock sync.Mutex
|
||||
}
|
||||
|
||||
// New instantiates the CoAP adapter implementation.
|
||||
func New(auth mainflux.ThingsServiceClient, nc *broker.Conn) Service {
|
||||
func New(auth mainflux.ThingsServiceClient, pubsub nats.PubSub) Service {
|
||||
as := &adapterService{
|
||||
auth: auth,
|
||||
conn: nc,
|
||||
pubsub: pubsub,
|
||||
observers: make(map[string]observers),
|
||||
obsLock: sync.Mutex{},
|
||||
}
|
||||
|
@ -70,17 +69,7 @@ func (svc *adapterService) Publish(ctx context.Context, key string, msg messagin
|
|||
}
|
||||
msg.Publisher = thid.GetValue()
|
||||
|
||||
data, err := proto.Marshal(&msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
subject := fmt.Sprintf("%s.%s", chansPrefix, msg.Channel)
|
||||
if msg.Subtopic != "" {
|
||||
subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic)
|
||||
}
|
||||
|
||||
return svc.conn.Publish(subject, data)
|
||||
return svc.pubsub.Publish(msg.Channel, msg)
|
||||
}
|
||||
|
||||
func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic string, c Client) error {
|
||||
|
@ -102,7 +91,7 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic
|
|||
svc.remove(subject, c.Token())
|
||||
}()
|
||||
|
||||
obs, err := NewObserver(subject, c, svc.conn)
|
||||
obs, err := NewObserver(subject, c, svc.pubsub)
|
||||
if err != nil {
|
||||
c.Cancel()
|
||||
return err
|
||||
|
@ -126,20 +115,20 @@ func (svc *adapterService) Unsubscribe(ctx context.Context, key, chanID, subtopi
|
|||
return svc.remove(subject, token)
|
||||
}
|
||||
|
||||
func (svc *adapterService) put(endpoint, token string, o Observer) error {
|
||||
func (svc *adapterService) put(topic, token string, o Observer) error {
|
||||
svc.obsLock.Lock()
|
||||
defer svc.obsLock.Unlock()
|
||||
|
||||
obs, ok := svc.observers[endpoint]
|
||||
// If there are no observers, create map and assign it to the endpoint.
|
||||
obs, ok := svc.observers[topic]
|
||||
// If there are no observers, create map and assign it to the topic.
|
||||
if !ok {
|
||||
obs = observers{token: o}
|
||||
svc.observers[endpoint] = obs
|
||||
svc.observers[topic] = obs
|
||||
return nil
|
||||
}
|
||||
// If observer exists, cancel subscription and replace it.
|
||||
if sub, ok := obs[token]; ok {
|
||||
if err := sub.Cancel(); err != nil {
|
||||
if err := sub.Cancel(topic); err != nil {
|
||||
return errors.Wrap(ErrUnsubscribe, err)
|
||||
}
|
||||
}
|
||||
|
@ -147,23 +136,23 @@ func (svc *adapterService) put(endpoint, token string, o Observer) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (svc *adapterService) remove(endpoint, token string) error {
|
||||
func (svc *adapterService) remove(topic, token string) error {
|
||||
svc.obsLock.Lock()
|
||||
defer svc.obsLock.Unlock()
|
||||
|
||||
obs, ok := svc.observers[endpoint]
|
||||
obs, ok := svc.observers[topic]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if current, ok := obs[token]; ok {
|
||||
if err := current.Cancel(); err != nil {
|
||||
if err := current.Cancel(topic); err != nil {
|
||||
return errors.Wrap(ErrUnsubscribe, err)
|
||||
}
|
||||
}
|
||||
delete(obs, token)
|
||||
// If there are no observers left for the endpint, remove the map.
|
||||
if len(obs) == 0 {
|
||||
delete(svc.observers, endpoint)
|
||||
delete(svc.observers, topic)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -4,43 +4,35 @@
|
|||
package coap
|
||||
|
||||
import (
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/mainflux/mainflux/pkg/messaging"
|
||||
"github.com/mainflux/mainflux/pkg/messaging/nats"
|
||||
broker "github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
// Observer represents an internal observer used to handle CoAP observe messages.
|
||||
type Observer interface {
|
||||
Cancel() error
|
||||
Cancel(topic string) error
|
||||
}
|
||||
|
||||
// NewObserver returns a new Observer instance.
|
||||
func NewObserver(subject string, c Client, conn *broker.Conn) (Observer, error) {
|
||||
sub, err := conn.Subscribe(subject, func(m *broker.Msg) {
|
||||
var msg messaging.Message
|
||||
if err := proto.Unmarshal(m.Data, &msg); err != nil {
|
||||
return
|
||||
}
|
||||
// There is no error handling, but the client takes care to log the error.
|
||||
c.SendMessage(msg)
|
||||
})
|
||||
func NewObserver(subject string, c Client, pubsub nats.PubSub) (Observer, error) {
|
||||
err := pubsub.Subscribe(subject, c.SendMessage)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ret := &observer{
|
||||
client: c,
|
||||
sub: sub,
|
||||
pubsub: pubsub,
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
type observer struct {
|
||||
client Client
|
||||
sub *broker.Subscription
|
||||
pubsub nats.PubSub
|
||||
}
|
||||
|
||||
func (o *observer) Cancel() error {
|
||||
if err := o.sub.Unsubscribe(); err != nil && err != broker.ErrConnectionClosed {
|
||||
func (o *observer) Cancel(topic string) error {
|
||||
if err := o.pubsub.Unsubscribe(topic); err != nil && err != broker.ErrConnectionClosed {
|
||||
return err
|
||||
}
|
||||
return o.client.Cancel()
|
||||
|
|
Loading…
Reference in New Issue