diff --git a/Software/Development/Language/Go/Package/Mqtt/Paho/Golang_Using_Paho_Mqtt.md b/Software/Development/Language/Go/Package/Mqtt/Paho/Golang_Using_Paho_Mqtt.md new file mode 100644 index 0000000..15ef60d --- /dev/null +++ b/Software/Development/Language/Go/Package/Mqtt/Paho/Golang_Using_Paho_Mqtt.md @@ -0,0 +1,200 @@ +# Golang: Using Paho Mqtt + +In this post we I’ll be creating a shell to publish messages on a particular topic using Mosquitto mqtt broker, and another application to subscribe to a topic and print incoming messages on the terminal. + +For this we need to have mosquitto and mosquitto-server installed. On Fedora you can install it with following command + +```bash +sudo dnf install mosquitto mosquitto-server +``` + +To install go package for mqtt + +```bash +go get github.com/eclipse/paho.mqtt.golang +``` + +First we will create a tool to publish messages on a given topic. + +First import the package. + +```go +import "github.com/eclipse/paho.mqtt.golang" +``` + +Then we need to create a new MQTT client. + +```go +// set the protocol, ip and port of the broker. +opts := MQTT.NewClientOptions().AddBroker("tcp://localhost:1883") + +// set the id to the client. +opts.SetClientID("Device-pub") + +// create a new client. +c := MQTT.NewClient(opts) +``` + +Connect to the broker and obtain the token. + +```go +token := c.Connect(); +``` + +Once the client is connected to the broker, you can now publish messages on a particular topic. + +```go +message := "hello this is the trial message" +c.Publish("some_topic", 0, false, message) +``` + +Once that is done, subscribe to a particular topic. + +```go +c.Subscribe("some_topic", 0, nil); +``` + +You can now recieve/listen to the messages published on the topic named some_topic. + +Now let’s build a tool to subscribe to a topic and recieve the messages published on that topic. + +```go +package main + +import ( + "fmt" + //import the Paho Go MQTT library + MQTT "github.com/eclipse/paho.mqtt.golang" + "os" + "strings" + "time" +) + +var flag bool = false + +//var wcount int = 0 + +//define a function for the default message handler +var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) { + topic := msg.Topic() + payload := msg.Payload() + if strings.Compare(string(payload), "\n") > 0 { + fmt.Printf("TOPIC: %s\n", topic) + fmt.Printf("MSG: %s\n", payload) + } + + if strings.Compare("bye\n", string(payload)) == 0 { + fmt.Println("exitting") + flag = true + } +} +func main() { + //create a ClientOptions struct setting the broker address, clientid, turn + //off trace output and set the default message handler + opts := MQTT.NewClientOptions().AddBroker("tcp://localhost:1883") + opts.SetClientID("Device-sub") + opts.SetDefaultPublishHandler(f) + + //create and start a client using the above ClientOptions + c := MQTT.NewClient(opts) + if token := c.Connect(); token.Wait() && token.Error() != nil { + panic(token.Error()) + } + + //subscribe to the topic /go-mqtt/sample and request messages to be delivered + //at a maximum qos of zero, wait for the receipt to confirm the subscription + if token := c.Subscribe("some_topic", 0, nil); token.Wait() && token.Error() != nil { + fmt.Println(token.Error()) + os.Exit(1) + } + + for flag == false { + time.Sleep(1 * time.Second) + //fmt.Println("waiting: ", wcount) + //wcount += 1 + } + + //unsubscribe from /go-mqtt/sample + if token := c.Unsubscribe("some_topic"); token.Wait() && token.Error() != nil { + fmt.Println(token.Error()) + os.Exit(1) + } + + c.Disconnect(250) +} +``` + +Now that we have built a tool to publish messages over mqtt, lets build a tool to receive messages. + +```go +package main + +import ( + "bufio" + "fmt" + MQTT "github.com/eclipse/paho.mqtt.golang" + "os" + "strings" + "time" +) + +func main(){ + //create a ClientOptions struct setting the broker address, clientid, turn + //off trace output and set the default message handler + opts := MQTT.NewClientOptions().AddBroker("tcp://localhost:1883") + opts.SetClientID("Device-pub") + + //create and start a client using the above ClientOptions + c := MQTT.NewClient(opts) + + //we are going to try connecting for max 10 times to the server if the connection fails. + for i := 0; i < 10; i++ { + if token := c.Connect(); token.Wait() && token.Error() == nil { + break + } else { + fmt.Println(token.Error()) + time.Sleep(1 * time.Second) + } + } + + //subscribe to the topic /go-mqtt/sample and request messages to be delivered + //at a maximum qos of zero, wait for the receipt to confirm the subscription + //same thing needs to go here as well. + if token := c.Subscribe("some_topic", 0, nil); token.Wait() && token.Error() != nil { + fmt.Println(token.Error()) + os.Exit(1) + } + + // this is the shell where we will take input from the user and publish the message on the topic until user enters `bye`. + + for { + var message string + fmt.Print(">> ") + // create a new bffer reader. + reader := bufio.NewReader(os.Stdin) + // read a string. + message, err := reader.ReadString('\n') + if err != nil { + fmt.Println(err) + } + if strings.Compare(message, "\n") > 0 { + // if there is a message, publish it. + token := c.Publish("some_topic", 0, false, message) + if strings.Compare(message, "bye\n") == 0 { + // if message == "bye" then exit the shell. + break + } + token.Wait() + } + } + + //unsubscribe from /go-mqtt/sample + if token := c.Unsubscribe("some_topic"); token.Wait() && token.Error() != nil { + fmt.Println(token.Error()) + os.Exit(1) + } + + c.Disconnect(250) + +} +```