From 97327ab05fc4426569843c36077f581029291741 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Fri, 6 Sep 2019 12:59:35 +0000 Subject: [PATCH] NOISSUE - Add measuring time from pub to sub (#839) * refactor code Signed-off-by: Mirko Teodorovic * connect each thing with each channel Signed-off-by: Mirko Teodorovic * reverting - structure fields must be exported Signed-off-by: Mirko Teodorovic * reverting - structure fields must be exported Signed-off-by: Mirko Teodorovic * revert some names Signed-off-by: Mirko Teodorovic * move meausuring time start Signed-off-by: Mirko Teodorovic * add pub-to-sub delivery time measure Signed-off-by: Mirko Teodorovic * add pub-to-sub delivery time measure Signed-off-by: Mirko Teodorovic * add pub-to-sub delivery time measure Signed-off-by: Mirko Teodorovic * add pub-to-sub delivery time measure Signed-off-by: Mirko Teodorovic * improve sync between pub and sub Signed-off-by: Mirko Teodorovic * improve sync between pub and sub Signed-off-by: Mirko Teodorovic * improve sync between pub and sub Signed-off-by: Mirko Teodorovic * improve sync between pub and sub Signed-off-by: Mirko Teodorovic * improve sync between pub and sub Signed-off-by: Mirko Teodorovic * improve sync between pub and sub Signed-off-by: Mirko Teodorovic * add random payload Signed-off-by: Mirko Teodorovic * revert changes for config.toml Signed-off-by: Mirko Teodorovic * add random payload Signed-off-by: Mirko Teodorovic * remove printfs Signed-off-by: Mirko Teodorovic * add logging Signed-off-by: Mirko Teodorovic * add payload Signed-off-by: Mirko Teodorovic * add payload Signed-off-by: Mirko Teodorovic * rename variable Signed-off-by: Mirko Teodorovic * add payload Signed-off-by: Mirko Teodorovic * small changes Signed-off-by: Mirko Teodorovic * refactor sync Signed-off-by: Mirko Teodorovic * refactor sync Signed-off-by: Mirko Teodorovic * refactor results Signed-off-by: Mirko Teodorovic * change sync and result collecting for sub Signed-off-by: Mirko Teodorovic * fix comments Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 185 ++++++++++++++++-- tools/mqtt-bench/client.go | 95 ++++++--- tools/mqtt-bench/cmd/main.go | 3 +- tools/mqtt-bench/results.go | 48 ++--- tools/mqtt-bench/templates/config.toml | 1 + tools/mqtt-bench/templates/fanin.toml | 1 + tools/mqtt-bench/templates/fanout-mtls.toml | 1 + tools/mqtt-bench/templates/fanout.toml | 1 + .../mqtt-bench/templates/subscribe-mtls.toml | 1 + tools/mqtt-bench/templates/subscribe.toml | 1 + writers/postgres/init.go | 2 +- 11 files changed, 268 insertions(+), 71 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index 69e064aa..fdc411e5 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -5,6 +5,7 @@ package bench import ( "crypto/tls" + "encoding/json" "fmt" "io/ioutil" "log" @@ -14,6 +15,7 @@ import ( "time" "github.com/BurntSushi/toml" + "github.com/cisco/senml" ) // Keep struct names exported, otherwise Viper unmarshaling won't work @@ -22,10 +24,11 @@ type mqttBrokerConfig struct { } type mqttMessageConfig struct { - Size int `toml:"size" mapstructure:"size"` - Format string `toml:"format" mapstructure:"format"` - QoS int `toml:"qos" mapstructure:"qos"` - Retain bool `toml:"retain" mapstructure:"retain"` + Size int `toml:"size" mapstructure:"size"` + Payload string `toml:"payload" mapstructure:"payload"` + Format string `toml:"format" mapstructure:"format"` + QoS int `toml:"qos" mapstructure:"qos"` + Retain bool `toml:"retain" mapstructure:"retain"` } type mqttTLSConfig struct { @@ -70,6 +73,12 @@ type mainflux struct { Channels []mfChannel `toml:"channels" mapstructure:"channels"` } +type testMsg struct { + ClientID string + Sent float64 + Payload []byte +} + // Config struct holds benchmark configuration type Config struct { MQTT mqttConfig `toml:"mqtt" mapstructure:"mqtt"` @@ -90,7 +99,7 @@ func Benchmark(cfg Config) { var err error checkConnection(cfg.MQTT.Broker.URL, 1) - subTimes := make(subTimes) + var subsResults map[string](*[]float64) var caByte []byte if cfg.MQTT.TLS.MTLS { caFile, err := os.Open(cfg.MQTT.TLS.CA) @@ -98,26 +107,40 @@ func Benchmark(cfg Config) { if err != nil { fmt.Println(err) } - caByte, _ = ioutil.ReadAll(caFile) } - payload := string(make([]byte, cfg.MQTT.Message.Size)) - mf := mainflux{} if _, err := toml.DecodeFile(cfg.Mf.ConnFile, &mf); err != nil { log.Fatalf("Cannot load Mainflux connections config %s \nuse tools/provision to create file", cfg.Mf.ConnFile) } resCh := make(chan *runResults) - done := make(chan bool) + donePub := make(chan bool) + finishedPub := make(chan bool) + finishedSub := make(chan bool) + + resR := make(chan *map[string](*[]float64)) + startStamp := time.Now() n := len(mf.Channels) var cert tls.Certificate + var msg *senml.SenML + getPload := getBytePayload + + if len(cfg.MQTT.Message.Payload) > 0 { + m := buildSenML(cfg.MQTT.Message.Size, cfg.MQTT.Message.Payload) + msg = &m + getPload = getSenMLPayload + } + + getSenML := func() *senml.SenML { + return msg + } // Subscribers for i := 0; i < cfg.Test.Subs; i++ { - mfChann := mf.Channels[i%n] + mfChan := mf.Channels[i%n] mfThing := mf.Things[i%n] if cfg.MQTT.TLS.MTLS { @@ -132,7 +155,7 @@ func Benchmark(cfg Config) { BrokerURL: cfg.MQTT.Broker.URL, BrokerUser: mfThing.ThingID, BrokerPass: mfThing.ThingKey, - MsgTopic: fmt.Sprintf("channels/%s/messages/test", mfChann.ChannelID), + MsgTopic: getTopic(mfChan.ChannelID, startStamp), MsgSize: cfg.MQTT.Message.Size, MsgCount: cfg.Test.Count, MsgQoS: byte(cfg.MQTT.Message.QoS), @@ -142,20 +165,21 @@ func Benchmark(cfg Config) { CA: caByte, ClientCert: cert, Retain: cfg.MQTT.Message.Retain, - Message: payload, + GetSenML: getSenML, } wg.Add(1) - go c.runSubscriber(&wg, &subTimes, &done) + go c.runSubscriber(&wg, cfg.Test.Count*cfg.Test.Pubs, &donePub, &resR) } wg.Wait() - start := time.Now() // Publishers + start := time.Now() + for i := 0; i < cfg.Test.Pubs; i++ { - mfChann := mf.Channels[i%n] + mfChan := mf.Channels[i%n] mfThing := mf.Things[i%n] if cfg.MQTT.TLS.MTLS { @@ -170,7 +194,7 @@ func Benchmark(cfg Config) { BrokerURL: cfg.MQTT.Broker.URL, BrokerUser: mfThing.ThingID, BrokerPass: mfThing.ThingKey, - MsgTopic: fmt.Sprintf("channels/%s/messages/test", mfChann.ChannelID), + MsgTopic: getTopic(mfChan.ChannelID, startStamp), MsgSize: cfg.MQTT.Message.Size, MsgCount: cfg.Test.Count, MsgQoS: byte(cfg.MQTT.Message.QoS), @@ -180,7 +204,8 @@ func Benchmark(cfg Config) { CA: caByte, ClientCert: cert, Retain: cfg.MQTT.Message.Retain, - Message: payload, + Message: getPload, + GetSenML: getSenML, } go c.runPublisher(resCh) @@ -191,13 +216,43 @@ func Benchmark(cfg Config) { if cfg.Test.Pubs > 0 { results = make([]*runResults, cfg.Test.Pubs) } + // Wait for publishers to be don + go func() { + for i := 0; i < cfg.Test.Pubs; i++ { + select { + case result := <-resCh: + { + results[i] = result + } + } + } + finishedPub <- true + }() - for i := 0; i < cfg.Test.Pubs; i++ { - results[i] = <-resCh + go func() { + for i := 0; i < cfg.Test.Subs; i++ { + select { + case r := <-resR: + { + for k, v := range *r { + subsResults[k] = v + } + } + } + } + finishedSub <- true + }() + + <-finishedPub + // Send signal to subscribers that all the publishers are done + for i := 0; i < cfg.Test.Subs; i++ { + donePub <- true } + <-finishedSub + totalTime := time.Now().Sub(start) - totals := calculateTotalResults(results, totalTime, &subTimes) + totals := calculateTotalResults(results, totalTime, subsResults) if totals == nil { return } @@ -205,3 +260,93 @@ func Benchmark(cfg Config) { // Print sats printResults(results, totals, cfg.MQTT.Message.Format, cfg.Log.Quiet) } + +func getSenMLTimeStamp() senml.SenMLRecord { + t := (float64)(time.Now().UnixNano()) + timeStamp := senml.SenMLRecord{ + BaseName: "pub-2019-08-31T12:38:25.139715762+02:00-57", + Value: &t, + } + return timeStamp +} + +func buildSenML(sz int, payload string) senml.SenML { + timeStamp := getSenMLTimeStamp() + + tsByte, err := json.Marshal(timeStamp) + if err != nil || len(payload) == 0 { + log.Fatalf("Failed to create test message") + } + + sml := senml.SenMLRecord{} + err = json.Unmarshal([]byte(payload), &sml) + if err != nil { + log.Fatalf("Cannot unmarshal payload") + } + + msgByte, err := json.Marshal(sml) + if err != nil { + log.Fatalf("Failed to create test message") + } + + // How many records to make message long sz bytes + n := (sz-len(tsByte))/len(msgByte) + 1 + if sz < len(tsByte) { + n = 1 + } + + records := make([]senml.SenMLRecord, n) + records[0] = timeStamp + for i := 1; i < n; i++ { + // Timestamp for each record when saving to db + sml.Time = float64(time.Now().UnixNano()) + records[i] = sml + } + + s := senml.SenML{ + Records: records, + } + + return s +} + +func getBytePayload(cid string, time float64, getSenML func() *senml.SenML) ([]byte, error) { + + msg := testMsg{} + msg.ClientID = cid + msg.Sent = time + + tsByte, err := json.Marshal(msg) + if err != nil { + log.Fatalf("Failed to create test message") + } + + // TODO - Need to sort this out + m := 500 - len(tsByte) + if m < 0 { + return tsByte, nil + } + add := make([]byte, m) + msg.Payload = add + + b, err := json.Marshal(msg) + if err != nil { + return nil, err + } + return b, nil +} + +func getSenMLPayload(cid string, time float64, getSenML func() *senml.SenML) ([]byte, error) { + s := *getSenML() + s.Records[0].Value = &time + s.Records[0].BaseName = cid + payload, err := senml.Encode(s, senml.JSON, senml.OutputOptions{}) + if err != nil { + return nil, err + } + return payload, nil +} + +func getTopic(ch string, start time.Time) string { + return fmt.Sprintf("channels/%s/messages/%d/test", ch, start.UnixNano()) +} diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index f8b1fcbc..d118b4e0 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -10,11 +10,13 @@ import ( "encoding/json" "fmt" "log" + "math" "net" "strings" "sync" "time" + "github.com/cisco/senml" mqtt "github.com/eclipse/paho.mqtt.golang" mat "gonum.org/v1/gonum/mat" stat "gonum.org/v1/gonum/stat" @@ -27,7 +29,8 @@ type Client struct { BrokerUser string BrokerPass string MsgTopic string - Message string + Message func(cid string, time float64, f func() *senml.SenML) ([]byte, error) + GetSenML func() *senml.SenML MsgSize int MsgCount int MsgQoS byte @@ -65,28 +68,29 @@ func (c *Client) runPublisher(r chan *runResults) { doneGen := make(chan bool) donePub := make(chan bool) runResults := new(runResults) - - started := time.Now() + Inf := float64(math.Inf(+1)) + var diff float64 // Start generator go c.generate(newMsgs, doneGen) + started := time.Now() // Start publisher go c.publish(newMsgs, pubMsgs, doneGen, donePub) - times := []float64{} - for { select { case m := <-pubMsgs: cid := m.ID if m.Error { runResults.Failures++ + diff = Inf } else { runResults.Successes++ - runResults.ID = cid - times = append(times, float64(m.Delivered.Sub(m.Sent).Nanoseconds()/1000)) // in microseconds + diff = float64(m.Delivered.Sub(m.Sent).Nanoseconds() / 1000) // in microseconds } + runResults.ID = cid + times = append(times, diff) case <-donePub: // Calculate results duration := time.Now().Sub(started) @@ -106,11 +110,8 @@ func (c *Client) runPublisher(r chan *runResults) { } // Subscriber -func (c *Client) runSubscriber(wg *sync.WaitGroup, subTimes *subTimes, done *chan bool) { - defer wg.Done() - - // Start subscriber - c.subscribe(wg, subTimes, done) +func (c *Client) runSubscriber(wg *sync.WaitGroup, tot int, donePub *chan bool, res *chan *map[string](*[]float64)) { + c.subscribe(wg, tot, donePub, res) } func (c *Client) generate(ch chan *message, done chan bool) { @@ -127,36 +128,71 @@ func (c *Client) generate(ch chan *message, done chan bool) { return } -func (c *Client) subscribe(wg *sync.WaitGroup, subTimes *subTimes, done *chan bool) { +func (c *Client) subscribe(wg *sync.WaitGroup, tot int, donePub *chan bool, res *chan *map[string](*[]float64)) { clientID := fmt.Sprintf("sub-%v-%v", time.Now().Format(time.RFC3339Nano), c.ID) c.ID = clientID + subsResults := make(map[string](*[]float64), 1) + i := 1 + a := []float64{} + + go func() { + for { + select { + case <-*donePub: + time.Sleep(2 * time.Second) + subsResults[c.MsgTopic] = &a + *res <- &subsResults + return + } + } + }() onConnected := func(client mqtt.Client) { + wg.Done() if !c.Quiet { log.Printf("Client %v is connected to the broker %v\n", clientID, c.BrokerURL) } } - connLost := func(client mqtt.Client, reason error) { log.Printf("Client %v had lost connection to the broker: %s\n", c.ID, reason.Error()) } - c.connect(onConnected, connLost) + if c.connect(onConnected, connLost) != nil { + wg.Done() + log.Printf("Client %v failed connecting to the broker\n", c.ID) + } token := (*c.mqttClient).Subscribe(c.MsgTopic, c.MsgQoS, func(cl mqtt.Client, msg mqtt.Message) { - mp := messagePayload{} - err := json.Unmarshal(msg.Payload(), &mp) - if err != nil { - log.Printf("Client %s failed to decode message\n", clientID) - } - }) + arrival := float64(time.Now().UnixNano()) + var timeSent float64 + + if c.GetSenML() != nil { + mp, err := senml.Decode(msg.Payload(), senml.JSON) + if err != nil && !c.Quiet { + log.Printf("Failed to decode message %s\n", err.Error()) + } + timeSent = *mp.Records[0].Value + } else { + tst := testMsg{} + json.Unmarshal(msg.Payload(), &tst) + timeSent = tst.Sent + } + + a = append(a, (arrival - timeSent)) + i++ + if i == tot { + subsResults[c.MsgTopic] = &a + *res <- &subsResults + } + + }) token.Wait() } func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan bool) { clientID := fmt.Sprintf("pub-%v-%v", time.Now().Format(time.RFC3339Nano), c.ID) c.ID = clientID - ctr := 0 + ctr := 1 onConnected := func(client mqtt.Client) { if !c.Quiet { log.Printf("Client %v is connected to the broker %v\n", clientID, c.BrokerURL) @@ -166,9 +202,7 @@ func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan case m := <-in: m.Sent = time.Now() m.ID = clientID - m.Payload.Sent = m.Sent - - pload, err := json.Marshal(m.Payload) + pload, err := c.Message(m.ID, float64(m.Sent.UnixNano()), c.GetSenML) if err != nil { log.Printf("Failed to marshal payload - %s", err.Error()) } @@ -202,6 +236,7 @@ func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan if ctr < c.MsgCount { flushMessages := make([]message, c.MsgCount-ctr) for _, m := range flushMessages { + m.Error = true out <- &m } } @@ -209,7 +244,12 @@ func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan } if c.connect(onConnected, connLost) != nil { - out <- &message{} + log.Printf("Failed to connect %s\n", c.ID) + flushMessages := make([]message, c.MsgCount-ctr) + for _, m := range flushMessages { + m.Error = true + out <- &m + } donePub <- true } @@ -269,10 +309,11 @@ func checkConnection(broker string, timeoutSecs int) { host := strings.Trim(s[1], "/") port := s[2] + log.Println("Testing connection...") conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%s", host, port), time.Duration(timeoutSecs)*time.Second) conClose := func() { if conn != nil { - log.Println("Closing connection...") + log.Println("Closing testing connection...") conn.Close() } } diff --git a/tools/mqtt-bench/cmd/main.go b/tools/mqtt-bench/cmd/main.go index 22d50ed3..2c676466 100644 --- a/tools/mqtt-bench/cmd/main.go +++ b/tools/mqtt-bench/cmd/main.go @@ -6,7 +6,7 @@ package main import ( "log" - "github.com/mainflux/mainflux/tools/mqtt-bench" + bench "github.com/mainflux/mainflux/tools/mqtt-bench" "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -45,6 +45,7 @@ Complete documentation is available at https://mainflux.readthedocs.io`, // MQTT Message rootCmd.PersistentFlags().IntVarP(&bconf.MQTT.Message.Size, "size", "z", 100, "Size of message payload bytes") + rootCmd.PersistentFlags().StringVarP(&bconf.MQTT.Message.Payload, "payload", "l", "", "Template message") rootCmd.PersistentFlags().StringVarP(&bconf.MQTT.Message.Format, "format", "f", "text", "Output format: text|json") rootCmd.PersistentFlags().IntVarP(&bconf.MQTT.Message.QoS, "qos", "q", 0, "QoS for published messages, values 0 1 2") rootCmd.PersistentFlags().BoolVarP(&bconf.MQTT.Message.Retain, "retain", "r", false, "Retain mqtt messages") diff --git a/tools/mqtt-bench/results.go b/tools/mqtt-bench/results.go index 86a49e24..f9dfbe24 100644 --- a/tools/mqtt-bench/results.go +++ b/tools/mqtt-bench/results.go @@ -30,7 +30,7 @@ type runResults struct { MsgsPerSec float64 `json:"msgs_per_sec"` } -type subTimes map[string][]float64 +type subsResults map[string](*[]float64) type totalResults struct { Ratio float64 `json:"ratio"` @@ -50,12 +50,11 @@ type totalResults struct { AvgMsgsPerSec float64 `json:"avg_msgs_per_sec"` } -func calculateTotalResults(results []*runResults, totalTime time.Duration, subTimes *subTimes) *totalResults { +func calculateTotalResults(results []*runResults, totalTime time.Duration, sr subsResults) *totalResults { if results == nil || len(results) < 1 { return nil } totals := new(totalResults) - subTimeRunResults := runResults{} msgTimeMeans := make([]float64, len(results)) msgTimeMeansDelivered := make([]float64, len(results)) msgsPerSecs := make([]float64, len(results)) @@ -66,19 +65,6 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, subTi totals.MsgTimeMin = results[0].MsgTimeMin for i, res := range results { - if len(*subTimes) > 0 { - times := mat.NewDense(1, len((*subTimes)[res.ID]), (*subTimes)[res.ID]) - - subTimeRunResults.MsgTimeMin = mat.Min(times) - subTimeRunResults.MsgTimeMax = mat.Max(times) - subTimeRunResults.MsgTimeMean = stat.Mean((*subTimes)[res.ID], nil) - subTimeRunResults.MsgTimeStd = stat.StdDev((*subTimes)[res.ID], nil) - - } - res.MsgDelTimeMin = subTimeRunResults.MsgTimeMin - res.MsgDelTimeMax = subTimeRunResults.MsgTimeMax - res.MsgDelTimeMean = subTimeRunResults.MsgTimeMean - res.MsgDelTimeStd = subTimeRunResults.MsgTimeStd totals.Successes += res.Successes totals.Failures += res.Failures @@ -92,21 +78,29 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, subTi totals.MsgTimeMax = res.MsgTimeMax } - if subTimeRunResults.MsgTimeMin < totals.MsgDelTimeMin { - totals.MsgDelTimeMin = subTimeRunResults.MsgTimeMin + if res.MsgDelTimeMin < totals.MsgDelTimeMin { + totals.MsgDelTimeMin = res.MsgDelTimeMin } - if subTimeRunResults.MsgTimeMax > totals.MsgDelTimeMax { - totals.MsgDelTimeMax = subTimeRunResults.MsgTimeMax + if res.MsgDelTimeMax > totals.MsgDelTimeMax { + totals.MsgDelTimeMax = res.MsgDelTimeMax } - msgTimeMeansDelivered[i] = subTimeRunResults.MsgTimeMean + msgTimeMeansDelivered[i] = res.MsgDelTimeMean msgTimeMeans[i] = res.MsgTimeMean msgsPerSecs[i] = res.MsgsPerSec runTimes[i] = res.RunTime bws[i] = res.MsgsPerSec } + for _, v := range sr { + times := mat.NewDense(1, len(*v), *v) + totals.MsgDelTimeMin = mat.Min(times) / 1000 + totals.MsgDelTimeMax = mat.Max(times) / 1000 + totals.MsgDelTimeMeanAvg = stat.Mean(*v, nil) / 1000 + totals.MsgDelTimeMeanStd = stat.StdDev(*v, nil) / 1000 + } + totals.Ratio = float64(totals.Successes) / float64(totals.Successes+totals.Failures) totals.AvgMsgsPerSec = stat.Mean(msgsPerSecs, nil) totals.AvgRunTime = stat.Mean(runTimes, nil) @@ -142,7 +136,12 @@ func printResults(results []*runResults, totals *totalResults, format string, qu fmt.Printf("Msg time min (us): %.3f\n", res.MsgTimeMin) fmt.Printf("Msg time max (us): %.3f\n", res.MsgTimeMax) fmt.Printf("Msg time mean (us): %.3f\n", res.MsgTimeMean) - fmt.Printf("Msg time std (us): %.3f\n", res.MsgTimeStd) + fmt.Printf("Msg time std (us): %.3f\n\n", res.MsgTimeStd) + + fmt.Printf("Msg del time min (us): %.3f\n", res.MsgDelTimeMin) + fmt.Printf("Msg del time max (us): %.3f\n", res.MsgDelTimeMax) + fmt.Printf("Msg del time mean (us): %.3f\n", res.MsgDelTimeMean) + fmt.Printf("Msg del time std (us): %.3f\n", res.MsgDelTimeStd) fmt.Printf("Bandwidth (msg/sec): %.3f\n\n", res.MsgsPerSec) } @@ -156,6 +155,11 @@ func printResults(results []*runResults, totals *totalResults, format string, qu fmt.Printf("Msg time mean mean (us): %.3f\n", totals.MsgTimeMeanAvg) fmt.Printf("Msg time mean std (us): %.3f\n", totals.MsgTimeMeanStd) + fmt.Printf("Msg del time min (us): %.3f\n", totals.MsgDelTimeMin) + fmt.Printf("Msg del time max (us): %.3f\n", totals.MsgDelTimeMax) + fmt.Printf("Msg del time mean (us): %.3f\n", totals.MsgDelTimeMeanAvg) + fmt.Printf("Msg del time std (us): %.3f\n", totals.MsgDelTimeMeanStd) + fmt.Printf("Average Bandwidth (msg/sec): %.3f\n", totals.AvgMsgsPerSec) fmt.Printf("Total Bandwidth (msg/sec): %.3f\n", totals.TotalMsgsPerSec) } diff --git a/tools/mqtt-bench/templates/config.toml b/tools/mqtt-bench/templates/config.toml index 11895618..fba30879 100644 --- a/tools/mqtt-bench/templates/config.toml +++ b/tools/mqtt-bench/templates/config.toml @@ -7,6 +7,7 @@ format = "text" qos = 2 retain = false + payload = "" [mqtt.tls] mtls = false diff --git a/tools/mqtt-bench/templates/fanin.toml b/tools/mqtt-bench/templates/fanin.toml index 96a307ea..74d45cec 100644 --- a/tools/mqtt-bench/templates/fanin.toml +++ b/tools/mqtt-bench/templates/fanin.toml @@ -7,6 +7,7 @@ format = "text" qos = 2 retain = true + payload = "" [mqtt.tls] mtls = false diff --git a/tools/mqtt-bench/templates/fanout-mtls.toml b/tools/mqtt-bench/templates/fanout-mtls.toml index be50ba79..6725333b 100644 --- a/tools/mqtt-bench/templates/fanout-mtls.toml +++ b/tools/mqtt-bench/templates/fanout-mtls.toml @@ -7,6 +7,7 @@ format = "text" qos = 2 retain = true + payload = "" [mqtt.tls] mtls = true diff --git a/tools/mqtt-bench/templates/fanout.toml b/tools/mqtt-bench/templates/fanout.toml index 4f78879f..a56a2ac3 100644 --- a/tools/mqtt-bench/templates/fanout.toml +++ b/tools/mqtt-bench/templates/fanout.toml @@ -7,6 +7,7 @@ format = "text" qos = 2 retain = true + payload = "" [mqtt.tls] mtls = false diff --git a/tools/mqtt-bench/templates/subscribe-mtls.toml b/tools/mqtt-bench/templates/subscribe-mtls.toml index 8949577e..400a07c9 100644 --- a/tools/mqtt-bench/templates/subscribe-mtls.toml +++ b/tools/mqtt-bench/templates/subscribe-mtls.toml @@ -7,6 +7,7 @@ format = "text" qos = 2 retain = true + payload = "" [mqtt.tls] mtls = true diff --git a/tools/mqtt-bench/templates/subscribe.toml b/tools/mqtt-bench/templates/subscribe.toml index 84ccf114..010c3c2e 100644 --- a/tools/mqtt-bench/templates/subscribe.toml +++ b/tools/mqtt-bench/templates/subscribe.toml @@ -7,6 +7,7 @@ format = "text" qos = 2 retain = true + payload = "" [mqtt.tls] mtls = false diff --git a/writers/postgres/init.go b/writers/postgres/init.go index aa1c9715..b40a49bf 100644 --- a/writers/postgres/init.go +++ b/writers/postgres/init.go @@ -65,7 +65,7 @@ func migrateDB(db *sqlx.DB) error { bool_value BOOL, data_value TEXT, value_sum FLOAT, - time FlOAT, + time FLOAT, update_time FLOAT, link TEXT, PRIMARY KEY (id)