增加 Golang: Using Paho Mqtt.
Signed-off-by: chen.yang <chen.yang@yuzhen-iot.com>
This commit is contained in:
parent
765ef02a7c
commit
47928007cb
|
@ -0,0 +1,200 @@
|
|||
# Golang: Using Paho Mqtt
|
||||
|
||||
In this post we I’ll be creating a shell to publish messages on a particular topic using Mosquitto mqtt broker, and another application to subscribe to a topic and print incoming messages on the terminal.
|
||||
|
||||
For this we need to have mosquitto and mosquitto-server installed. On Fedora you can install it with following command
|
||||
|
||||
```bash
|
||||
sudo dnf install mosquitto mosquitto-server
|
||||
```
|
||||
|
||||
To install go package for mqtt
|
||||
|
||||
```bash
|
||||
go get github.com/eclipse/paho.mqtt.golang
|
||||
```
|
||||
|
||||
First we will create a tool to publish messages on a given topic.
|
||||
|
||||
First import the package.
|
||||
|
||||
```go
|
||||
import "github.com/eclipse/paho.mqtt.golang"
|
||||
```
|
||||
|
||||
Then we need to create a new MQTT client.
|
||||
|
||||
```go
|
||||
// set the protocol, ip and port of the broker.
|
||||
opts := MQTT.NewClientOptions().AddBroker("tcp://localhost:1883")
|
||||
|
||||
// set the id to the client.
|
||||
opts.SetClientID("Device-pub")
|
||||
|
||||
// create a new client.
|
||||
c := MQTT.NewClient(opts)
|
||||
```
|
||||
|
||||
Connect to the broker and obtain the token.
|
||||
|
||||
```go
|
||||
token := c.Connect();
|
||||
```
|
||||
|
||||
Once the client is connected to the broker, you can now publish messages on a particular topic.
|
||||
|
||||
```go
|
||||
message := "hello this is the trial message"
|
||||
c.Publish("some_topic", 0, false, message)
|
||||
```
|
||||
|
||||
Once that is done, subscribe to a particular topic.
|
||||
|
||||
```go
|
||||
c.Subscribe("some_topic", 0, nil);
|
||||
```
|
||||
|
||||
You can now recieve/listen to the messages published on the topic named some_topic.
|
||||
|
||||
Now let’s build a tool to subscribe to a topic and recieve the messages published on that topic.
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
//import the Paho Go MQTT library
|
||||
MQTT "github.com/eclipse/paho.mqtt.golang"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
var flag bool = false
|
||||
|
||||
//var wcount int = 0
|
||||
|
||||
//define a function for the default message handler
|
||||
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
|
||||
topic := msg.Topic()
|
||||
payload := msg.Payload()
|
||||
if strings.Compare(string(payload), "\n") > 0 {
|
||||
fmt.Printf("TOPIC: %s\n", topic)
|
||||
fmt.Printf("MSG: %s\n", payload)
|
||||
}
|
||||
|
||||
if strings.Compare("bye\n", string(payload)) == 0 {
|
||||
fmt.Println("exitting")
|
||||
flag = true
|
||||
}
|
||||
}
|
||||
func main() {
|
||||
//create a ClientOptions struct setting the broker address, clientid, turn
|
||||
//off trace output and set the default message handler
|
||||
opts := MQTT.NewClientOptions().AddBroker("tcp://localhost:1883")
|
||||
opts.SetClientID("Device-sub")
|
||||
opts.SetDefaultPublishHandler(f)
|
||||
|
||||
//create and start a client using the above ClientOptions
|
||||
c := MQTT.NewClient(opts)
|
||||
if token := c.Connect(); token.Wait() && token.Error() != nil {
|
||||
panic(token.Error())
|
||||
}
|
||||
|
||||
//subscribe to the topic /go-mqtt/sample and request messages to be delivered
|
||||
//at a maximum qos of zero, wait for the receipt to confirm the subscription
|
||||
if token := c.Subscribe("some_topic", 0, nil); token.Wait() && token.Error() != nil {
|
||||
fmt.Println(token.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
for flag == false {
|
||||
time.Sleep(1 * time.Second)
|
||||
//fmt.Println("waiting: ", wcount)
|
||||
//wcount += 1
|
||||
}
|
||||
|
||||
//unsubscribe from /go-mqtt/sample
|
||||
if token := c.Unsubscribe("some_topic"); token.Wait() && token.Error() != nil {
|
||||
fmt.Println(token.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
c.Disconnect(250)
|
||||
}
|
||||
```
|
||||
|
||||
Now that we have built a tool to publish messages over mqtt, lets build a tool to receive messages.
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
MQTT "github.com/eclipse/paho.mqtt.golang"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main(){
|
||||
//create a ClientOptions struct setting the broker address, clientid, turn
|
||||
//off trace output and set the default message handler
|
||||
opts := MQTT.NewClientOptions().AddBroker("tcp://localhost:1883")
|
||||
opts.SetClientID("Device-pub")
|
||||
|
||||
//create and start a client using the above ClientOptions
|
||||
c := MQTT.NewClient(opts)
|
||||
|
||||
//we are going to try connecting for max 10 times to the server if the connection fails.
|
||||
for i := 0; i < 10; i++ {
|
||||
if token := c.Connect(); token.Wait() && token.Error() == nil {
|
||||
break
|
||||
} else {
|
||||
fmt.Println(token.Error())
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
//subscribe to the topic /go-mqtt/sample and request messages to be delivered
|
||||
//at a maximum qos of zero, wait for the receipt to confirm the subscription
|
||||
//same thing needs to go here as well.
|
||||
if token := c.Subscribe("some_topic", 0, nil); token.Wait() && token.Error() != nil {
|
||||
fmt.Println(token.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// this is the shell where we will take input from the user and publish the message on the topic until user enters `bye`.
|
||||
|
||||
for {
|
||||
var message string
|
||||
fmt.Print(">> ")
|
||||
// create a new bffer reader.
|
||||
reader := bufio.NewReader(os.Stdin)
|
||||
// read a string.
|
||||
message, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
if strings.Compare(message, "\n") > 0 {
|
||||
// if there is a message, publish it.
|
||||
token := c.Publish("some_topic", 0, false, message)
|
||||
if strings.Compare(message, "bye\n") == 0 {
|
||||
// if message == "bye" then exit the shell.
|
||||
break
|
||||
}
|
||||
token.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
//unsubscribe from /go-mqtt/sample
|
||||
if token := c.Unsubscribe("some_topic"); token.Wait() && token.Error() != nil {
|
||||
fmt.Println(token.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
c.Disconnect(250)
|
||||
|
||||
}
|
||||
```
|
Loading…
Reference in New Issue