Mainflux.mainflux/coap/adapter.go

150 lines
3.6 KiB
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
// Package coap contains the domain concept definitions needed to support
// Mainflux coap adapter service functionality. All constant values are taken
// from RFC, and could be adjusted based on specific use case.
package coap
import (
"context"
"errors"
"sync"
"time"
"github.com/mainflux/mainflux"
broker "github.com/nats-io/go-nats"
)
const (
chanID = "id"
keyHeader = "key"
// AckRandomFactor is default ACK coefficient.
AckRandomFactor = 1.5
// AckTimeout is the amount of time to wait for a response.
AckTimeout = 2000 * time.Millisecond
// MaxRetransmit is the maximum number of times a message will be retransmitted.
MaxRetransmit = 4
)
var (
errBadOption = errors.New("bad option")
// ErrFailedMessagePublish indicates that message publishing failed.
ErrFailedMessagePublish = errors.New("failed to publish message")
// ErrFailedSubscription indicates that client couldn't subscribe to specified channel.
ErrFailedSubscription = errors.New("failed to subscribe to a channel")
// ErrFailedConnection indicates that service couldn't connect to message broker.
ErrFailedConnection = errors.New("failed to connect to message broker")
)
// Broker represents NATS broker instance.
type Broker interface {
mainflux.MessagePublisher
// Subscribes to channel with specified id, subtopic and adds subscription to
// service map of subscriptions under given ID.
Subscribe(string, string, string, *Observer) error
}
// Service specifies coap service API.
type Service interface {
Broker
// Unsubscribe method is used to stop observing resource.
Unsubscribe(string)
}
var _ Service = (*adapterService)(nil)
type adapterService struct {
auth mainflux.ThingsServiceClient
pubsub Broker
obs map[string]*Observer
obsLock sync.Mutex
}
// New instantiates the CoAP adapter implementation.
func New(pubsub Broker, auth mainflux.ThingsServiceClient, responses <-chan string) Service {
as := &adapterService{
auth: auth,
pubsub: pubsub,
obs: make(map[string]*Observer),
obsLock: sync.Mutex{},
}
go as.listenResponses(responses)
return as
}
func (svc *adapterService) get(obsID string) (*Observer, bool) {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()
val, ok := svc.obs[obsID]
return val, ok
}
func (svc *adapterService) put(obsID string, o *Observer) {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()
val, ok := svc.obs[obsID]
if ok {
close(val.Cancel)
}
svc.obs[obsID] = o
}
func (svc *adapterService) remove(obsID string) {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()
val, ok := svc.obs[obsID]
if ok {
close(val.Cancel)
delete(svc.obs, obsID)
}
}
// ListenResponses method handles ACK messages received from client.
func (svc *adapterService) listenResponses(responses <-chan string) {
for {
id := <-responses
val, ok := svc.get(id)
if ok {
val.StoreExpired(false)
}
}
}
func (svc *adapterService) Publish(ctx context.Context, token string, msg mainflux.Message) error {
if err := svc.pubsub.Publish(ctx, token, msg); err != nil {
switch err {
case broker.ErrConnectionClosed, broker.ErrInvalidConnection:
return ErrFailedConnection
default:
return ErrFailedMessagePublish
}
}
return nil
}
func (svc *adapterService) Subscribe(chanID, subtopic, obsID string, o *Observer) error {
if err := svc.pubsub.Subscribe(chanID, subtopic, obsID, o); err != nil {
return ErrFailedSubscription
}
// Put method removes Observer if already exists.
svc.put(obsID, o)
return nil
}
func (svc *adapterService) Unsubscribe(obsID string) {
svc.remove(obsID)
}