NOISSUE - Add measuring time from pub to sub (#839)
* refactor code Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * connect each thing with each channel Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * reverting - structure fields must be exported Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * reverting - structure fields must be exported Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * revert some names Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * move meausuring time start Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * add pub-to-sub delivery time measure Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * add pub-to-sub delivery time measure Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * add pub-to-sub delivery time measure Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * add pub-to-sub delivery time measure Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * improve sync between pub and sub Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * improve sync between pub and sub Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * improve sync between pub and sub Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * improve sync between pub and sub Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * improve sync between pub and sub Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * improve sync between pub and sub Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * add random payload Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * revert changes for config.toml Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * add random payload Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * remove printfs Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * add logging Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * add payload Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * add payload Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * rename variable Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * add payload Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * small changes Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * refactor sync Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * refactor sync Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * refactor results Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * change sync and result collecting for sub Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * fix comments Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>
This commit is contained in:
parent
115d94bd1b
commit
97327ab05f
|
@ -5,6 +5,7 @@ package bench
|
|||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
|
@ -14,6 +15,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/BurntSushi/toml"
|
||||
"github.com/cisco/senml"
|
||||
)
|
||||
|
||||
// Keep struct names exported, otherwise Viper unmarshaling won't work
|
||||
|
@ -22,10 +24,11 @@ type mqttBrokerConfig struct {
|
|||
}
|
||||
|
||||
type mqttMessageConfig struct {
|
||||
Size int `toml:"size" mapstructure:"size"`
|
||||
Format string `toml:"format" mapstructure:"format"`
|
||||
QoS int `toml:"qos" mapstructure:"qos"`
|
||||
Retain bool `toml:"retain" mapstructure:"retain"`
|
||||
Size int `toml:"size" mapstructure:"size"`
|
||||
Payload string `toml:"payload" mapstructure:"payload"`
|
||||
Format string `toml:"format" mapstructure:"format"`
|
||||
QoS int `toml:"qos" mapstructure:"qos"`
|
||||
Retain bool `toml:"retain" mapstructure:"retain"`
|
||||
}
|
||||
|
||||
type mqttTLSConfig struct {
|
||||
|
@ -70,6 +73,12 @@ type mainflux struct {
|
|||
Channels []mfChannel `toml:"channels" mapstructure:"channels"`
|
||||
}
|
||||
|
||||
type testMsg struct {
|
||||
ClientID string
|
||||
Sent float64
|
||||
Payload []byte
|
||||
}
|
||||
|
||||
// Config struct holds benchmark configuration
|
||||
type Config struct {
|
||||
MQTT mqttConfig `toml:"mqtt" mapstructure:"mqtt"`
|
||||
|
@ -90,7 +99,7 @@ func Benchmark(cfg Config) {
|
|||
var err error
|
||||
|
||||
checkConnection(cfg.MQTT.Broker.URL, 1)
|
||||
subTimes := make(subTimes)
|
||||
var subsResults map[string](*[]float64)
|
||||
var caByte []byte
|
||||
if cfg.MQTT.TLS.MTLS {
|
||||
caFile, err := os.Open(cfg.MQTT.TLS.CA)
|
||||
|
@ -98,26 +107,40 @@ func Benchmark(cfg Config) {
|
|||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
|
||||
caByte, _ = ioutil.ReadAll(caFile)
|
||||
}
|
||||
|
||||
payload := string(make([]byte, cfg.MQTT.Message.Size))
|
||||
|
||||
mf := mainflux{}
|
||||
if _, err := toml.DecodeFile(cfg.Mf.ConnFile, &mf); err != nil {
|
||||
log.Fatalf("Cannot load Mainflux connections config %s \nuse tools/provision to create file", cfg.Mf.ConnFile)
|
||||
}
|
||||
|
||||
resCh := make(chan *runResults)
|
||||
done := make(chan bool)
|
||||
donePub := make(chan bool)
|
||||
finishedPub := make(chan bool)
|
||||
finishedSub := make(chan bool)
|
||||
|
||||
resR := make(chan *map[string](*[]float64))
|
||||
startStamp := time.Now()
|
||||
|
||||
n := len(mf.Channels)
|
||||
var cert tls.Certificate
|
||||
|
||||
var msg *senml.SenML
|
||||
getPload := getBytePayload
|
||||
|
||||
if len(cfg.MQTT.Message.Payload) > 0 {
|
||||
m := buildSenML(cfg.MQTT.Message.Size, cfg.MQTT.Message.Payload)
|
||||
msg = &m
|
||||
getPload = getSenMLPayload
|
||||
}
|
||||
|
||||
getSenML := func() *senml.SenML {
|
||||
return msg
|
||||
}
|
||||
// Subscribers
|
||||
for i := 0; i < cfg.Test.Subs; i++ {
|
||||
mfChann := mf.Channels[i%n]
|
||||
mfChan := mf.Channels[i%n]
|
||||
mfThing := mf.Things[i%n]
|
||||
|
||||
if cfg.MQTT.TLS.MTLS {
|
||||
|
@ -132,7 +155,7 @@ func Benchmark(cfg Config) {
|
|||
BrokerURL: cfg.MQTT.Broker.URL,
|
||||
BrokerUser: mfThing.ThingID,
|
||||
BrokerPass: mfThing.ThingKey,
|
||||
MsgTopic: fmt.Sprintf("channels/%s/messages/test", mfChann.ChannelID),
|
||||
MsgTopic: getTopic(mfChan.ChannelID, startStamp),
|
||||
MsgSize: cfg.MQTT.Message.Size,
|
||||
MsgCount: cfg.Test.Count,
|
||||
MsgQoS: byte(cfg.MQTT.Message.QoS),
|
||||
|
@ -142,20 +165,21 @@ func Benchmark(cfg Config) {
|
|||
CA: caByte,
|
||||
ClientCert: cert,
|
||||
Retain: cfg.MQTT.Message.Retain,
|
||||
Message: payload,
|
||||
GetSenML: getSenML,
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
|
||||
go c.runSubscriber(&wg, &subTimes, &done)
|
||||
go c.runSubscriber(&wg, cfg.Test.Count*cfg.Test.Pubs, &donePub, &resR)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
start := time.Now()
|
||||
// Publishers
|
||||
start := time.Now()
|
||||
|
||||
for i := 0; i < cfg.Test.Pubs; i++ {
|
||||
mfChann := mf.Channels[i%n]
|
||||
mfChan := mf.Channels[i%n]
|
||||
mfThing := mf.Things[i%n]
|
||||
|
||||
if cfg.MQTT.TLS.MTLS {
|
||||
|
@ -170,7 +194,7 @@ func Benchmark(cfg Config) {
|
|||
BrokerURL: cfg.MQTT.Broker.URL,
|
||||
BrokerUser: mfThing.ThingID,
|
||||
BrokerPass: mfThing.ThingKey,
|
||||
MsgTopic: fmt.Sprintf("channels/%s/messages/test", mfChann.ChannelID),
|
||||
MsgTopic: getTopic(mfChan.ChannelID, startStamp),
|
||||
MsgSize: cfg.MQTT.Message.Size,
|
||||
MsgCount: cfg.Test.Count,
|
||||
MsgQoS: byte(cfg.MQTT.Message.QoS),
|
||||
|
@ -180,7 +204,8 @@ func Benchmark(cfg Config) {
|
|||
CA: caByte,
|
||||
ClientCert: cert,
|
||||
Retain: cfg.MQTT.Message.Retain,
|
||||
Message: payload,
|
||||
Message: getPload,
|
||||
GetSenML: getSenML,
|
||||
}
|
||||
|
||||
go c.runPublisher(resCh)
|
||||
|
@ -191,13 +216,43 @@ func Benchmark(cfg Config) {
|
|||
if cfg.Test.Pubs > 0 {
|
||||
results = make([]*runResults, cfg.Test.Pubs)
|
||||
}
|
||||
// Wait for publishers to be don
|
||||
go func() {
|
||||
for i := 0; i < cfg.Test.Pubs; i++ {
|
||||
select {
|
||||
case result := <-resCh:
|
||||
{
|
||||
results[i] = result
|
||||
}
|
||||
}
|
||||
}
|
||||
finishedPub <- true
|
||||
}()
|
||||
|
||||
for i := 0; i < cfg.Test.Pubs; i++ {
|
||||
results[i] = <-resCh
|
||||
go func() {
|
||||
for i := 0; i < cfg.Test.Subs; i++ {
|
||||
select {
|
||||
case r := <-resR:
|
||||
{
|
||||
for k, v := range *r {
|
||||
subsResults[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
finishedSub <- true
|
||||
}()
|
||||
|
||||
<-finishedPub
|
||||
// Send signal to subscribers that all the publishers are done
|
||||
for i := 0; i < cfg.Test.Subs; i++ {
|
||||
donePub <- true
|
||||
}
|
||||
|
||||
<-finishedSub
|
||||
|
||||
totalTime := time.Now().Sub(start)
|
||||
totals := calculateTotalResults(results, totalTime, &subTimes)
|
||||
totals := calculateTotalResults(results, totalTime, subsResults)
|
||||
if totals == nil {
|
||||
return
|
||||
}
|
||||
|
@ -205,3 +260,93 @@ func Benchmark(cfg Config) {
|
|||
// Print sats
|
||||
printResults(results, totals, cfg.MQTT.Message.Format, cfg.Log.Quiet)
|
||||
}
|
||||
|
||||
func getSenMLTimeStamp() senml.SenMLRecord {
|
||||
t := (float64)(time.Now().UnixNano())
|
||||
timeStamp := senml.SenMLRecord{
|
||||
BaseName: "pub-2019-08-31T12:38:25.139715762+02:00-57",
|
||||
Value: &t,
|
||||
}
|
||||
return timeStamp
|
||||
}
|
||||
|
||||
func buildSenML(sz int, payload string) senml.SenML {
|
||||
timeStamp := getSenMLTimeStamp()
|
||||
|
||||
tsByte, err := json.Marshal(timeStamp)
|
||||
if err != nil || len(payload) == 0 {
|
||||
log.Fatalf("Failed to create test message")
|
||||
}
|
||||
|
||||
sml := senml.SenMLRecord{}
|
||||
err = json.Unmarshal([]byte(payload), &sml)
|
||||
if err != nil {
|
||||
log.Fatalf("Cannot unmarshal payload")
|
||||
}
|
||||
|
||||
msgByte, err := json.Marshal(sml)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create test message")
|
||||
}
|
||||
|
||||
// How many records to make message long sz bytes
|
||||
n := (sz-len(tsByte))/len(msgByte) + 1
|
||||
if sz < len(tsByte) {
|
||||
n = 1
|
||||
}
|
||||
|
||||
records := make([]senml.SenMLRecord, n)
|
||||
records[0] = timeStamp
|
||||
for i := 1; i < n; i++ {
|
||||
// Timestamp for each record when saving to db
|
||||
sml.Time = float64(time.Now().UnixNano())
|
||||
records[i] = sml
|
||||
}
|
||||
|
||||
s := senml.SenML{
|
||||
Records: records,
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func getBytePayload(cid string, time float64, getSenML func() *senml.SenML) ([]byte, error) {
|
||||
|
||||
msg := testMsg{}
|
||||
msg.ClientID = cid
|
||||
msg.Sent = time
|
||||
|
||||
tsByte, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create test message")
|
||||
}
|
||||
|
||||
// TODO - Need to sort this out
|
||||
m := 500 - len(tsByte)
|
||||
if m < 0 {
|
||||
return tsByte, nil
|
||||
}
|
||||
add := make([]byte, m)
|
||||
msg.Payload = add
|
||||
|
||||
b, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
func getSenMLPayload(cid string, time float64, getSenML func() *senml.SenML) ([]byte, error) {
|
||||
s := *getSenML()
|
||||
s.Records[0].Value = &time
|
||||
s.Records[0].BaseName = cid
|
||||
payload, err := senml.Encode(s, senml.JSON, senml.OutputOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return payload, nil
|
||||
}
|
||||
|
||||
func getTopic(ch string, start time.Time) string {
|
||||
return fmt.Sprintf("channels/%s/messages/%d/test", ch, start.UnixNano())
|
||||
}
|
||||
|
|
|
@ -10,11 +10,13 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cisco/senml"
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
mat "gonum.org/v1/gonum/mat"
|
||||
stat "gonum.org/v1/gonum/stat"
|
||||
|
@ -27,7 +29,8 @@ type Client struct {
|
|||
BrokerUser string
|
||||
BrokerPass string
|
||||
MsgTopic string
|
||||
Message string
|
||||
Message func(cid string, time float64, f func() *senml.SenML) ([]byte, error)
|
||||
GetSenML func() *senml.SenML
|
||||
MsgSize int
|
||||
MsgCount int
|
||||
MsgQoS byte
|
||||
|
@ -65,28 +68,29 @@ func (c *Client) runPublisher(r chan *runResults) {
|
|||
doneGen := make(chan bool)
|
||||
donePub := make(chan bool)
|
||||
runResults := new(runResults)
|
||||
|
||||
started := time.Now()
|
||||
Inf := float64(math.Inf(+1))
|
||||
var diff float64
|
||||
|
||||
// Start generator
|
||||
go c.generate(newMsgs, doneGen)
|
||||
|
||||
started := time.Now()
|
||||
// Start publisher
|
||||
go c.publish(newMsgs, pubMsgs, doneGen, donePub)
|
||||
|
||||
times := []float64{}
|
||||
|
||||
for {
|
||||
select {
|
||||
case m := <-pubMsgs:
|
||||
cid := m.ID
|
||||
if m.Error {
|
||||
runResults.Failures++
|
||||
diff = Inf
|
||||
} else {
|
||||
runResults.Successes++
|
||||
runResults.ID = cid
|
||||
times = append(times, float64(m.Delivered.Sub(m.Sent).Nanoseconds()/1000)) // in microseconds
|
||||
diff = float64(m.Delivered.Sub(m.Sent).Nanoseconds() / 1000) // in microseconds
|
||||
}
|
||||
runResults.ID = cid
|
||||
times = append(times, diff)
|
||||
case <-donePub:
|
||||
// Calculate results
|
||||
duration := time.Now().Sub(started)
|
||||
|
@ -106,11 +110,8 @@ func (c *Client) runPublisher(r chan *runResults) {
|
|||
}
|
||||
|
||||
// Subscriber
|
||||
func (c *Client) runSubscriber(wg *sync.WaitGroup, subTimes *subTimes, done *chan bool) {
|
||||
defer wg.Done()
|
||||
|
||||
// Start subscriber
|
||||
c.subscribe(wg, subTimes, done)
|
||||
func (c *Client) runSubscriber(wg *sync.WaitGroup, tot int, donePub *chan bool, res *chan *map[string](*[]float64)) {
|
||||
c.subscribe(wg, tot, donePub, res)
|
||||
}
|
||||
|
||||
func (c *Client) generate(ch chan *message, done chan bool) {
|
||||
|
@ -127,36 +128,71 @@ func (c *Client) generate(ch chan *message, done chan bool) {
|
|||
return
|
||||
}
|
||||
|
||||
func (c *Client) subscribe(wg *sync.WaitGroup, subTimes *subTimes, done *chan bool) {
|
||||
func (c *Client) subscribe(wg *sync.WaitGroup, tot int, donePub *chan bool, res *chan *map[string](*[]float64)) {
|
||||
clientID := fmt.Sprintf("sub-%v-%v", time.Now().Format(time.RFC3339Nano), c.ID)
|
||||
c.ID = clientID
|
||||
subsResults := make(map[string](*[]float64), 1)
|
||||
i := 1
|
||||
a := []float64{}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-*donePub:
|
||||
time.Sleep(2 * time.Second)
|
||||
subsResults[c.MsgTopic] = &a
|
||||
*res <- &subsResults
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
onConnected := func(client mqtt.Client) {
|
||||
wg.Done()
|
||||
if !c.Quiet {
|
||||
log.Printf("Client %v is connected to the broker %v\n", clientID, c.BrokerURL)
|
||||
}
|
||||
}
|
||||
|
||||
connLost := func(client mqtt.Client, reason error) {
|
||||
log.Printf("Client %v had lost connection to the broker: %s\n", c.ID, reason.Error())
|
||||
}
|
||||
c.connect(onConnected, connLost)
|
||||
if c.connect(onConnected, connLost) != nil {
|
||||
wg.Done()
|
||||
log.Printf("Client %v failed connecting to the broker\n", c.ID)
|
||||
}
|
||||
|
||||
token := (*c.mqttClient).Subscribe(c.MsgTopic, c.MsgQoS, func(cl mqtt.Client, msg mqtt.Message) {
|
||||
mp := messagePayload{}
|
||||
err := json.Unmarshal(msg.Payload(), &mp)
|
||||
if err != nil {
|
||||
log.Printf("Client %s failed to decode message\n", clientID)
|
||||
}
|
||||
})
|
||||
|
||||
arrival := float64(time.Now().UnixNano())
|
||||
var timeSent float64
|
||||
|
||||
if c.GetSenML() != nil {
|
||||
mp, err := senml.Decode(msg.Payload(), senml.JSON)
|
||||
if err != nil && !c.Quiet {
|
||||
log.Printf("Failed to decode message %s\n", err.Error())
|
||||
}
|
||||
timeSent = *mp.Records[0].Value
|
||||
} else {
|
||||
tst := testMsg{}
|
||||
json.Unmarshal(msg.Payload(), &tst)
|
||||
timeSent = tst.Sent
|
||||
}
|
||||
|
||||
a = append(a, (arrival - timeSent))
|
||||
i++
|
||||
if i == tot {
|
||||
subsResults[c.MsgTopic] = &a
|
||||
*res <- &subsResults
|
||||
}
|
||||
|
||||
})
|
||||
token.Wait()
|
||||
}
|
||||
|
||||
func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan bool) {
|
||||
clientID := fmt.Sprintf("pub-%v-%v", time.Now().Format(time.RFC3339Nano), c.ID)
|
||||
c.ID = clientID
|
||||
ctr := 0
|
||||
ctr := 1
|
||||
onConnected := func(client mqtt.Client) {
|
||||
if !c.Quiet {
|
||||
log.Printf("Client %v is connected to the broker %v\n", clientID, c.BrokerURL)
|
||||
|
@ -166,9 +202,7 @@ func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan
|
|||
case m := <-in:
|
||||
m.Sent = time.Now()
|
||||
m.ID = clientID
|
||||
m.Payload.Sent = m.Sent
|
||||
|
||||
pload, err := json.Marshal(m.Payload)
|
||||
pload, err := c.Message(m.ID, float64(m.Sent.UnixNano()), c.GetSenML)
|
||||
if err != nil {
|
||||
log.Printf("Failed to marshal payload - %s", err.Error())
|
||||
}
|
||||
|
@ -202,6 +236,7 @@ func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan
|
|||
if ctr < c.MsgCount {
|
||||
flushMessages := make([]message, c.MsgCount-ctr)
|
||||
for _, m := range flushMessages {
|
||||
m.Error = true
|
||||
out <- &m
|
||||
}
|
||||
}
|
||||
|
@ -209,7 +244,12 @@ func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan
|
|||
}
|
||||
|
||||
if c.connect(onConnected, connLost) != nil {
|
||||
out <- &message{}
|
||||
log.Printf("Failed to connect %s\n", c.ID)
|
||||
flushMessages := make([]message, c.MsgCount-ctr)
|
||||
for _, m := range flushMessages {
|
||||
m.Error = true
|
||||
out <- &m
|
||||
}
|
||||
donePub <- true
|
||||
}
|
||||
|
||||
|
@ -269,10 +309,11 @@ func checkConnection(broker string, timeoutSecs int) {
|
|||
host := strings.Trim(s[1], "/")
|
||||
port := s[2]
|
||||
|
||||
log.Println("Testing connection...")
|
||||
conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%s", host, port), time.Duration(timeoutSecs)*time.Second)
|
||||
conClose := func() {
|
||||
if conn != nil {
|
||||
log.Println("Closing connection...")
|
||||
log.Println("Closing testing connection...")
|
||||
conn.Close()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ package main
|
|||
import (
|
||||
"log"
|
||||
|
||||
"github.com/mainflux/mainflux/tools/mqtt-bench"
|
||||
bench "github.com/mainflux/mainflux/tools/mqtt-bench"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
@ -45,6 +45,7 @@ Complete documentation is available at https://mainflux.readthedocs.io`,
|
|||
|
||||
// MQTT Message
|
||||
rootCmd.PersistentFlags().IntVarP(&bconf.MQTT.Message.Size, "size", "z", 100, "Size of message payload bytes")
|
||||
rootCmd.PersistentFlags().StringVarP(&bconf.MQTT.Message.Payload, "payload", "l", "", "Template message")
|
||||
rootCmd.PersistentFlags().StringVarP(&bconf.MQTT.Message.Format, "format", "f", "text", "Output format: text|json")
|
||||
rootCmd.PersistentFlags().IntVarP(&bconf.MQTT.Message.QoS, "qos", "q", 0, "QoS for published messages, values 0 1 2")
|
||||
rootCmd.PersistentFlags().BoolVarP(&bconf.MQTT.Message.Retain, "retain", "r", false, "Retain mqtt messages")
|
||||
|
|
|
@ -30,7 +30,7 @@ type runResults struct {
|
|||
MsgsPerSec float64 `json:"msgs_per_sec"`
|
||||
}
|
||||
|
||||
type subTimes map[string][]float64
|
||||
type subsResults map[string](*[]float64)
|
||||
|
||||
type totalResults struct {
|
||||
Ratio float64 `json:"ratio"`
|
||||
|
@ -50,12 +50,11 @@ type totalResults struct {
|
|||
AvgMsgsPerSec float64 `json:"avg_msgs_per_sec"`
|
||||
}
|
||||
|
||||
func calculateTotalResults(results []*runResults, totalTime time.Duration, subTimes *subTimes) *totalResults {
|
||||
func calculateTotalResults(results []*runResults, totalTime time.Duration, sr subsResults) *totalResults {
|
||||
if results == nil || len(results) < 1 {
|
||||
return nil
|
||||
}
|
||||
totals := new(totalResults)
|
||||
subTimeRunResults := runResults{}
|
||||
msgTimeMeans := make([]float64, len(results))
|
||||
msgTimeMeansDelivered := make([]float64, len(results))
|
||||
msgsPerSecs := make([]float64, len(results))
|
||||
|
@ -66,19 +65,6 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, subTi
|
|||
|
||||
totals.MsgTimeMin = results[0].MsgTimeMin
|
||||
for i, res := range results {
|
||||
if len(*subTimes) > 0 {
|
||||
times := mat.NewDense(1, len((*subTimes)[res.ID]), (*subTimes)[res.ID])
|
||||
|
||||
subTimeRunResults.MsgTimeMin = mat.Min(times)
|
||||
subTimeRunResults.MsgTimeMax = mat.Max(times)
|
||||
subTimeRunResults.MsgTimeMean = stat.Mean((*subTimes)[res.ID], nil)
|
||||
subTimeRunResults.MsgTimeStd = stat.StdDev((*subTimes)[res.ID], nil)
|
||||
|
||||
}
|
||||
res.MsgDelTimeMin = subTimeRunResults.MsgTimeMin
|
||||
res.MsgDelTimeMax = subTimeRunResults.MsgTimeMax
|
||||
res.MsgDelTimeMean = subTimeRunResults.MsgTimeMean
|
||||
res.MsgDelTimeStd = subTimeRunResults.MsgTimeStd
|
||||
|
||||
totals.Successes += res.Successes
|
||||
totals.Failures += res.Failures
|
||||
|
@ -92,21 +78,29 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, subTi
|
|||
totals.MsgTimeMax = res.MsgTimeMax
|
||||
}
|
||||
|
||||
if subTimeRunResults.MsgTimeMin < totals.MsgDelTimeMin {
|
||||
totals.MsgDelTimeMin = subTimeRunResults.MsgTimeMin
|
||||
if res.MsgDelTimeMin < totals.MsgDelTimeMin {
|
||||
totals.MsgDelTimeMin = res.MsgDelTimeMin
|
||||
}
|
||||
|
||||
if subTimeRunResults.MsgTimeMax > totals.MsgDelTimeMax {
|
||||
totals.MsgDelTimeMax = subTimeRunResults.MsgTimeMax
|
||||
if res.MsgDelTimeMax > totals.MsgDelTimeMax {
|
||||
totals.MsgDelTimeMax = res.MsgDelTimeMax
|
||||
}
|
||||
|
||||
msgTimeMeansDelivered[i] = subTimeRunResults.MsgTimeMean
|
||||
msgTimeMeansDelivered[i] = res.MsgDelTimeMean
|
||||
msgTimeMeans[i] = res.MsgTimeMean
|
||||
msgsPerSecs[i] = res.MsgsPerSec
|
||||
runTimes[i] = res.RunTime
|
||||
bws[i] = res.MsgsPerSec
|
||||
}
|
||||
|
||||
for _, v := range sr {
|
||||
times := mat.NewDense(1, len(*v), *v)
|
||||
totals.MsgDelTimeMin = mat.Min(times) / 1000
|
||||
totals.MsgDelTimeMax = mat.Max(times) / 1000
|
||||
totals.MsgDelTimeMeanAvg = stat.Mean(*v, nil) / 1000
|
||||
totals.MsgDelTimeMeanStd = stat.StdDev(*v, nil) / 1000
|
||||
}
|
||||
|
||||
totals.Ratio = float64(totals.Successes) / float64(totals.Successes+totals.Failures)
|
||||
totals.AvgMsgsPerSec = stat.Mean(msgsPerSecs, nil)
|
||||
totals.AvgRunTime = stat.Mean(runTimes, nil)
|
||||
|
@ -142,7 +136,12 @@ func printResults(results []*runResults, totals *totalResults, format string, qu
|
|||
fmt.Printf("Msg time min (us): %.3f\n", res.MsgTimeMin)
|
||||
fmt.Printf("Msg time max (us): %.3f\n", res.MsgTimeMax)
|
||||
fmt.Printf("Msg time mean (us): %.3f\n", res.MsgTimeMean)
|
||||
fmt.Printf("Msg time std (us): %.3f\n", res.MsgTimeStd)
|
||||
fmt.Printf("Msg time std (us): %.3f\n\n", res.MsgTimeStd)
|
||||
|
||||
fmt.Printf("Msg del time min (us): %.3f\n", res.MsgDelTimeMin)
|
||||
fmt.Printf("Msg del time max (us): %.3f\n", res.MsgDelTimeMax)
|
||||
fmt.Printf("Msg del time mean (us): %.3f\n", res.MsgDelTimeMean)
|
||||
fmt.Printf("Msg del time std (us): %.3f\n", res.MsgDelTimeStd)
|
||||
|
||||
fmt.Printf("Bandwidth (msg/sec): %.3f\n\n", res.MsgsPerSec)
|
||||
}
|
||||
|
@ -156,6 +155,11 @@ func printResults(results []*runResults, totals *totalResults, format string, qu
|
|||
fmt.Printf("Msg time mean mean (us): %.3f\n", totals.MsgTimeMeanAvg)
|
||||
fmt.Printf("Msg time mean std (us): %.3f\n", totals.MsgTimeMeanStd)
|
||||
|
||||
fmt.Printf("Msg del time min (us): %.3f\n", totals.MsgDelTimeMin)
|
||||
fmt.Printf("Msg del time max (us): %.3f\n", totals.MsgDelTimeMax)
|
||||
fmt.Printf("Msg del time mean (us): %.3f\n", totals.MsgDelTimeMeanAvg)
|
||||
fmt.Printf("Msg del time std (us): %.3f\n", totals.MsgDelTimeMeanStd)
|
||||
|
||||
fmt.Printf("Average Bandwidth (msg/sec): %.3f\n", totals.AvgMsgsPerSec)
|
||||
fmt.Printf("Total Bandwidth (msg/sec): %.3f\n", totals.TotalMsgsPerSec)
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
format = "text"
|
||||
qos = 2
|
||||
retain = false
|
||||
payload = ""
|
||||
|
||||
[mqtt.tls]
|
||||
mtls = false
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
format = "text"
|
||||
qos = 2
|
||||
retain = true
|
||||
payload = ""
|
||||
|
||||
[mqtt.tls]
|
||||
mtls = false
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
format = "text"
|
||||
qos = 2
|
||||
retain = true
|
||||
payload = ""
|
||||
|
||||
[mqtt.tls]
|
||||
mtls = true
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
format = "text"
|
||||
qos = 2
|
||||
retain = true
|
||||
payload = ""
|
||||
|
||||
[mqtt.tls]
|
||||
mtls = false
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
format = "text"
|
||||
qos = 2
|
||||
retain = true
|
||||
payload = ""
|
||||
|
||||
[mqtt.tls]
|
||||
mtls = true
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
format = "text"
|
||||
qos = 2
|
||||
retain = true
|
||||
payload = ""
|
||||
|
||||
[mqtt.tls]
|
||||
mtls = false
|
||||
|
|
|
@ -65,7 +65,7 @@ func migrateDB(db *sqlx.DB) error {
|
|||
bool_value BOOL,
|
||||
data_value TEXT,
|
||||
value_sum FLOAT,
|
||||
time FlOAT,
|
||||
time FLOAT,
|
||||
update_time FLOAT,
|
||||
link TEXT,
|
||||
PRIMARY KEY (id)
|
||||
|
|
Loading…
Reference in New Issue