NOISSUE - Fix CoAP adapter (#1572)

* Revert "NOISSUE - Add nats wrapper for COAP (#1569)"

This reverts commit cc5d5195ab.

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix CoAP adapter

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Update CoAP observation cancel

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix observe

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix GET handling

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Revert authorization

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Use constants instead of magic numbers

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Remove an empty line

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Extract special observe value to constant

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
This commit is contained in:
Dušan Borovčanin 2022-03-30 16:52:10 +02:00 committed by GitHub
parent ce46723a8f
commit c6f7c69798
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 137 additions and 84 deletions

View File

@ -20,8 +20,8 @@ import (
"github.com/mainflux/mainflux/coap" "github.com/mainflux/mainflux/coap"
"github.com/mainflux/mainflux/coap/api" "github.com/mainflux/mainflux/coap/api"
logger "github.com/mainflux/mainflux/logger" logger "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/nats"
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc" thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc"
broker "github.com/nats-io/nats.go"
opentracing "github.com/opentracing/opentracing-go" opentracing "github.com/opentracing/opentracing-go"
gocoap "github.com/plgd-dev/go-coap/v2" gocoap "github.com/plgd-dev/go-coap/v2"
stdprometheus "github.com/prometheus/client_golang/prometheus" stdprometheus "github.com/prometheus/client_golang/prometheus"
@ -77,14 +77,13 @@ func main() {
tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout) tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)
pubsub, err := nats.NewPubSub(cfg.natsURL, "coap", logger) nc, err := broker.Connect(cfg.natsURL)
if err != nil { if err != nil {
log.Fatalf(err.Error()) log.Fatalf(err.Error())
} }
defer nc.Close()
defer pubsub.Close() svc := coap.New(tc, nc)
svc := coap.New(tc, pubsub)
svc = api.LoggingMiddleware(svc, logger) svc = api.LoggingMiddleware(svc, logger)

View File

@ -11,8 +11,9 @@ import (
"fmt" "fmt"
"sync" "sync"
"github.com/gogo/protobuf/proto"
"github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging/nats" broker "github.com/nats-io/nats.go"
"github.com/mainflux/mainflux" "github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/pkg/messaging" "github.com/mainflux/mainflux/pkg/messaging"
@ -41,16 +42,16 @@ var _ Service = (*adapterService)(nil)
// Observers is a map of maps, // Observers is a map of maps,
type adapterService struct { type adapterService struct {
auth mainflux.ThingsServiceClient auth mainflux.ThingsServiceClient
pubsub nats.PubSub conn *broker.Conn
observers map[string]observers observers map[string]observers
obsLock sync.Mutex obsLock sync.Mutex
} }
// New instantiates the CoAP adapter implementation. // New instantiates the CoAP adapter implementation.
func New(auth mainflux.ThingsServiceClient, pubsub nats.PubSub) Service { func New(auth mainflux.ThingsServiceClient, nc *broker.Conn) Service {
as := &adapterService{ as := &adapterService{
auth: auth, auth: auth,
pubsub: pubsub, conn: nc,
observers: make(map[string]observers), observers: make(map[string]observers),
obsLock: sync.Mutex{}, obsLock: sync.Mutex{},
} }
@ -69,7 +70,17 @@ func (svc *adapterService) Publish(ctx context.Context, key string, msg messagin
} }
msg.Publisher = thid.GetValue() msg.Publisher = thid.GetValue()
return svc.pubsub.Publish(msg.Channel, msg) data, err := proto.Marshal(&msg)
if err != nil {
return err
}
subject := fmt.Sprintf("%s.%s", chansPrefix, msg.Channel)
if msg.Subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic)
}
return svc.conn.Publish(subject, data)
} }
func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic string, c Client) error { func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic string, c Client) error {
@ -86,12 +97,7 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic
subject = fmt.Sprintf("%s.%s", subject, subtopic) subject = fmt.Sprintf("%s.%s", subject, subtopic)
} }
go func() { obs, err := NewObserver(subject, c, svc.conn)
<-c.Done()
svc.remove(subject, c.Token())
}()
obs, err := NewObserver(subject, c, svc.pubsub)
if err != nil { if err != nil {
c.Cancel() c.Cancel()
return err return err
@ -128,7 +134,7 @@ func (svc *adapterService) put(topic, token string, o Observer) error {
} }
// If observer exists, cancel subscription and replace it. // If observer exists, cancel subscription and replace it.
if sub, ok := obs[token]; ok { if sub, ok := obs[token]; ok {
if err := sub.Cancel(topic); err != nil { if err := sub.Cancel(); err != nil {
return errors.Wrap(ErrUnsubscribe, err) return errors.Wrap(ErrUnsubscribe, err)
} }
} }
@ -139,13 +145,12 @@ func (svc *adapterService) put(topic, token string, o Observer) error {
func (svc *adapterService) remove(topic, token string) error { func (svc *adapterService) remove(topic, token string) error {
svc.obsLock.Lock() svc.obsLock.Lock()
defer svc.obsLock.Unlock() defer svc.obsLock.Unlock()
obs, ok := svc.observers[topic] obs, ok := svc.observers[topic]
if !ok { if !ok {
return nil return nil
} }
if current, ok := obs[token]; ok { if current, ok := obs[token]; ok {
if err := current.Cancel(topic); err != nil { if err := current.Cancel(); err != nil {
return errors.Wrap(ErrUnsubscribe, err) return errors.Wrap(ErrUnsubscribe, err)
} }
} }

View File

@ -61,11 +61,20 @@ func (lm *loggingMiddleware) Subscribe(ctx context.Context, key, chanID, subtopi
return lm.svc.Subscribe(ctx, key, chanID, subtopic, c) return lm.svc.Subscribe(ctx, key, chanID, subtopic, c)
} }
func (lm *loggingMiddleware) Unsubscribe(ctx context.Context, key, chanID, subtopic, token string) error { func (lm *loggingMiddleware) Unsubscribe(ctx context.Context, key, chanID, subtopic, token string) (err error) {
defer func(begin time.Time) { defer func(begin time.Time) {
message := fmt.Sprintf("Method unsubscribe for the client %s from the channel %s and subtopic %s took %s to complete without errors.", token, chanID, subtopic, time.Since(begin)) destChannel := chanID
lm.logger.Info(fmt.Sprintf(message)) if subtopic != "" {
destChannel = fmt.Sprintf("%s.%s", destChannel, subtopic)
}
message := fmt.Sprintf("Method unsubscribe for the client %s from the channel %s took %s to complete", token, destChannel, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now()) }(time.Now())
return lm.svc.Unsubscribe(ctx, key, chanID, subtopic, token) return lm.svc.Unsubscribe(ctx, key, chanID, subtopic, token)
} }

View File

@ -27,13 +27,22 @@ import (
) )
const ( const (
protocol = "coap" protocol = "coap"
authQuery = "auth" authQuery = "auth"
startObserve = 0 // observe option value that indicates start of observation
) )
var channelPartRegExp = regexp.MustCompile(`^channels/([\w\-]+)/messages(/[^?]*)?(\?.*)?$`) var channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages(/[^?]*)?(\?.*)?$`)
var errMalformedSubtopic = errors.New("malformed subtopic") const (
numGroups = 3 // entire expression + channel group + subtopic group
channelGroup = 2 // channel group is second in channel regexp
)
var (
errMalformedSubtopic = errors.New("malformed subtopic")
errBadOptions = errors.New("bad options")
)
var ( var (
logger log.Logger logger log.Logger
@ -70,73 +79,79 @@ func handler(w mux.ResponseWriter, m *mux.Message) {
Context: m.Context, Context: m.Context,
Options: make(message.Options, 0, 16), Options: make(message.Options, 0, 16),
} }
defer sendResp(w, &resp)
if m.Options == nil {
logger.Warn("Nil options")
resp.Code = codes.BadOption
return
}
msg, err := decodeMessage(m) msg, err := decodeMessage(m)
if err != nil { if err != nil {
logger.Warn(fmt.Sprintf("Error decoding message: %s", err)) logger.Warn(fmt.Sprintf("Error decoding message: %s", err))
resp.Code = codes.BadRequest resp.Code = codes.BadRequest
sendResp(w, &resp)
return return
} }
key, err := parseKey(m) key, err := parseKey(m)
if err != nil { if err != nil {
logger.Warn(fmt.Sprintf("Error parsing auth: %s", err)) logger.Warn(fmt.Sprintf("Error parsing auth: %s", err))
resp.Code = codes.Unauthorized resp.Code = codes.Unauthorized
sendResp(w, &resp)
return return
} }
switch m.Code { switch m.Code {
case codes.GET: case codes.GET:
var obs uint32 err = handleGet(m, w.Client(), msg, key)
obs, err = m.Options.Observe()
if err != nil {
resp.Code = codes.BadOption
logger.Warn(fmt.Sprintf("Error reading observe option: %s", err))
return
}
if obs == 0 {
c := coap.NewClient(w.Client(), m.Token, logger)
err = service.Subscribe(context.Background(), key, msg.Channel, msg.Subtopic, c)
break
}
service.Unsubscribe(context.Background(), key, msg.Channel, msg.Subtopic, m.Token.String())
case codes.POST: case codes.POST:
err = service.Publish(context.Background(), key, msg) err = service.Publish(context.Background(), key, msg)
default: default:
resp.Code = codes.NotFound err = errors.ErrNotFound
return
} }
if err != nil { if err != nil {
switch { switch {
case errors.Contains(err, errors.ErrAuthorization): case err == errBadOptions:
resp.Code = codes.BadOption
case err == errors.ErrNotFound:
resp.Code = codes.NotFound
case errors.Contains(err, errors.ErrAuthorization),
errors.Contains(err, errors.ErrAuthentication):
resp.Code = codes.Unauthorized resp.Code = codes.Unauthorized
return default:
case errors.Contains(err, coap.ErrUnsubscribe):
resp.Code = codes.InternalServerError resp.Code = codes.InternalServerError
} }
sendResp(w, &resp)
} }
} }
func handleGet(m *mux.Message, c mux.Client, msg messaging.Message, key string) error {
var obs uint32
obs, err := m.Options.Observe()
if err != nil {
logger.Warn(fmt.Sprintf("Error reading observe option: %s", err))
return errBadOptions
}
if obs == startObserve {
c := coap.NewClient(c, m.Token, logger)
return service.Subscribe(context.Background(), key, msg.Channel, msg.Subtopic, c)
}
return service.Unsubscribe(context.Background(), key, msg.Channel, msg.Subtopic, m.Token.String())
}
func decodeMessage(msg *mux.Message) (messaging.Message, error) { func decodeMessage(msg *mux.Message) (messaging.Message, error) {
if msg.Options == nil {
return messaging.Message{}, errBadOptions
}
path, err := msg.Options.Path() path, err := msg.Options.Path()
if err != nil { if err != nil {
return messaging.Message{}, err return messaging.Message{}, err
} }
channelParts := channelPartRegExp.FindStringSubmatch(path) channelParts := channelPartRegExp.FindStringSubmatch(path)
if len(channelParts) < 2 { if len(channelParts) < numGroups {
return messaging.Message{}, errMalformedSubtopic return messaging.Message{}, errMalformedSubtopic
} }
st, err := parseSubtopic(channelParts[2]) st, err := parseSubtopic(channelParts[channelGroup])
if err != nil { if err != nil {
return messaging.Message{}, err return messaging.Message{}, err
} }
ret := messaging.Message{ ret := messaging.Message{
Protocol: protocol, Protocol: protocol,
Channel: parseID(path), Channel: channelParts[1],
Subtopic: st, Subtopic: st,
Payload: []byte{}, Payload: []byte{},
Created: time.Now().UnixNano(), Created: time.Now().UnixNano(),
@ -152,15 +167,10 @@ func decodeMessage(msg *mux.Message) (messaging.Message, error) {
return ret, nil return ret, nil
} }
func parseID(path string) string {
vars := strings.Split(path, "/")
if len(vars) > 1 {
return vars[1]
}
return ""
}
func parseKey(msg *mux.Message) (string, error) { func parseKey(msg *mux.Message) (string, error) {
if obs, _ := msg.Options.Observe(); obs != 0 && msg.Code == codes.GET {
return "", nil
}
authKey, err := msg.Options.GetString(message.URIQuery) authKey, err := msg.Options.GetString(message.URIQuery)
if err != nil { if err != nil {
return "", err return "", err

View File

@ -5,7 +5,9 @@ package coap
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"sync/atomic"
"github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/errors"
@ -30,25 +32,36 @@ type observers map[string]Observer
var ErrOption = errors.New("unable to set option") var ErrOption = errors.New("unable to set option")
type client struct { type client struct {
client mux.Client client mux.Client
token message.Token token message.Token
logger logger.Logger observe uint32
logger logger.Logger
} }
// NewClient instantiates a new Observer. // NewClient instantiates a new Observer.
func NewClient(mc mux.Client, token message.Token, l logger.Logger) Client { func NewClient(c mux.Client, tkn message.Token, l logger.Logger) Client {
return &client{ return &client{
client: mc, client: c,
token: token, token: tkn,
logger: l, logger: l,
observe: 0,
} }
} }
func (c *client) Done() <-chan struct{} { func (c *client) Done() <-chan struct{} {
return c.client.Context().Done() return c.client.Done()
} }
func (c *client) Cancel() error { func (c *client) Cancel() error {
m := message.Message{
Code: codes.Content,
Token: c.token,
Context: context.Background(),
Options: make(message.Options, 0, 16),
}
if err := c.client.WriteMessage(&m); err != nil {
c.logger.Error(fmt.Sprintf("Error sending message: %s.", err))
}
return c.client.Close() return c.client.Close()
} }
@ -63,22 +76,29 @@ func (c *client) SendMessage(msg messaging.Message) error {
Context: c.client.Context(), Context: c.client.Context(),
Body: bytes.NewReader(msg.Payload), Body: bytes.NewReader(msg.Payload),
} }
atomic.AddUint32(&c.observe, 1)
var opts message.Options var opts message.Options
var buff []byte var buff []byte
opts, n, err := opts.SetContentFormat(buff, message.TextPlain) opts, n, err := opts.SetContentFormat(buff, message.TextPlain)
if err == message.ErrTooSmall { if err == message.ErrTooSmall {
buff = append(buff, make([]byte, n)...) buff = append(buff, make([]byte, n)...)
opts, n, err = opts.SetContentFormat(buff, message.TextPlain) _, _, err = opts.SetContentFormat(buff, message.TextPlain)
} }
if err != nil { if err != nil {
c.logger.Error(fmt.Sprintf("Can't set content format: %s.", err)) c.logger.Error(fmt.Sprintf("Can't set content format: %s.", err))
return errors.Wrap(ErrOption, err) return errors.Wrap(ErrOption, err)
} }
m.Options = opts opts = append(opts, message.Option{ID: message.Observe, Value: []byte{byte(c.observe)}})
if err := c.client.WriteMessage(&m); err != nil { opts, n, err = opts.SetObserve(buff, uint32(c.observe))
c.logger.Error(fmt.Sprintf("Error sending message: %s.", err)) if err == message.ErrTooSmall {
return err buff = append(buff, make([]byte, n)...)
opts, _, err = opts.SetObserve(buff, uint32(c.observe))
} }
return nil if err != nil {
return fmt.Errorf("cannot set options to response: %w", err)
}
m.Options = opts
return c.client.WriteMessage(&m)
} }

View File

@ -4,35 +4,43 @@
package coap package coap
import ( import (
"github.com/mainflux/mainflux/pkg/messaging/nats" "github.com/gogo/protobuf/proto"
"github.com/mainflux/mainflux/pkg/messaging"
broker "github.com/nats-io/nats.go" broker "github.com/nats-io/nats.go"
) )
// Observer represents an internal observer used to handle CoAP observe messages. // Observer represents an internal observer used to handle CoAP observe messages.
type Observer interface { type Observer interface {
Cancel(topic string) error Cancel() error
} }
// NewObserver returns a new Observer instance. // NewObserver returns a new Observer instance.
func NewObserver(subject string, c Client, pubsub nats.PubSub) (Observer, error) { func NewObserver(subject string, c Client, conn *broker.Conn) (Observer, error) {
err := pubsub.Subscribe(subject, c.SendMessage) sub, err := conn.Subscribe(subject, func(m *broker.Msg) {
var msg messaging.Message
if err := proto.Unmarshal(m.Data, &msg); err != nil {
return
}
// There is no error handling, but the client takes care to log the error.
c.SendMessage(msg)
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
ret := &observer{ ret := &observer{
client: c, client: c,
pubsub: pubsub, sub: sub,
} }
return ret, nil return ret, nil
} }
type observer struct { type observer struct {
client Client client Client
pubsub nats.PubSub sub *broker.Subscription
} }
func (o *observer) Cancel(topic string) error { func (o *observer) Cancel() error {
if err := o.pubsub.Unsubscribe(topic); err != nil && err != broker.ErrConnectionClosed { if err := o.sub.Unsubscribe(); err != nil && err != broker.ErrConnectionClosed {
return err return err
} }
return o.client.Cancel() return o.client.Cancel()

View File

@ -105,6 +105,8 @@ services:
- ./nats/:/etc/nats - ./nats/:/etc/nats
networks: networks:
- mainflux-base-net - mainflux-base-net
ports:
- 4222:4222
auth-db: auth-db:
image: postgres:13.3-alpine image: postgres:13.3-alpine