Add some new MQTT adaptor functions with QOS

#644
This commit is contained in:
Trevor Rosen 2019-01-02 18:12:38 -06:00 committed by Ron Evans
parent 79de57d475
commit 013817c24c
1 changed files with 34 additions and 6 deletions

View File

@ -9,6 +9,12 @@ import (
paho "github.com/eclipse/paho.mqtt.golang"
multierror "github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
)
var (
// ErrNilClient is returned when a client action can't be taken because the struct has no client
ErrNilClient = errors.New("no MQTT client available")
)
// Message is a message received from the broker.
@ -131,21 +137,43 @@ func (a *Adaptor) Finalize() (err error) {
// Publish a message under a specific topic
func (a *Adaptor) Publish(topic string, message []byte) bool {
if a.client == nil {
_, err := a.PublishWithQOS(topic, a.qos, message)
if err != nil {
return false
}
a.client.Publish(topic, byte(a.qos), false, message)
return true
}
// PublishWithQOS allows per-publish QOS values to be set and returns a poken.Token
func (a *Adaptor) PublishWithQOS(topic string, qos int, message []byte) (paho.Token, error) {
if a.client == nil {
return nil, ErrNilClient
}
token := a.client.Publish(topic, byte(qos), false, message)
return token, nil
}
// OnWithQOS allows per-subscribe QOS values to be set and returns a paho.Token
func (a *Adaptor) OnWithQOS(event string, qos int, f func(msg Message)) (paho.Token, error) {
if a.client == nil {
return nil, ErrNilClient
}
token := a.client.Subscribe(event, byte(qos), func(client paho.Client, msg paho.Message) {
f(msg)
})
return token, nil
}
// On subscribes to a topic, and then calls the message handler function when data is received
func (a *Adaptor) On(event string, f func(msg Message)) bool {
if a.client == nil {
_, err := a.OnWithQOS(event, a.qos, f)
if err != nil {
return false
}
a.client.Subscribe(event, byte(a.qos), func(client paho.Client, msg paho.Message) {
f(msg)
})
return true
}