diff --git a/cmd/cassandra-writer/main.go b/cmd/cassandra-writer/main.go index 9bab5e48..d2d4c9ef 100644 --- a/cmd/cassandra-writer/main.go +++ b/cmd/cassandra-writer/main.go @@ -97,7 +97,7 @@ func main() { } -func newService(session *gocql.Session, logger mflog.Logger) consumers.Consumer { +func newService(session *gocql.Session, logger mflog.Logger) consumers.BlockingConsumer { repo := cassandra.New(session) repo = api.LoggingMiddleware(repo, logger) counter, latency := internal.MakeMetrics("cassandra", "message_writer") diff --git a/cmd/influxdb-reader/main.go b/cmd/influxdb-reader/main.go index faf6bf87..e51129a8 100644 --- a/cmd/influxdb-reader/main.go +++ b/cmd/influxdb-reader/main.go @@ -67,6 +67,7 @@ func main() { logger.Fatal(fmt.Sprintf("failed to load InfluxDB client configuration from environment variable : %s", err)) } influxDBConfig.DBUrl = fmt.Sprintf("%s://%s:%s", influxDBConfig.Protocol, influxDBConfig.Host, influxDBConfig.Port) + repocfg := influxdb.RepoConfig{ Bucket: influxDBConfig.Bucket, Org: influxDBConfig.Org, diff --git a/cmd/influxdb-writer/main.go b/cmd/influxdb-writer/main.go index 6fdd89b4..6e559c2c 100644 --- a/cmd/influxdb-writer/main.go +++ b/cmd/influxdb-writer/main.go @@ -9,11 +9,9 @@ import ( "log" "os" - influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/mainflux/mainflux/consumers" "github.com/mainflux/mainflux/consumers/writers/api" "github.com/mainflux/mainflux/consumers/writers/influxdb" - "github.com/mainflux/mainflux/internal" influxDBClient "github.com/mainflux/mainflux/internal/clients/influxdb" "github.com/mainflux/mainflux/internal/env" "github.com/mainflux/mainflux/internal/server" @@ -62,6 +60,7 @@ func main() { logger.Fatal(fmt.Sprintf("failed to load InfluxDB client configuration from environment variable : %s", err)) } influxDBConfig.DBUrl = fmt.Sprintf("%s://%s:%s", influxDBConfig.Protocol, influxDBConfig.Host, influxDBConfig.Port) + repocfg := influxdb.RepoConfig{ Bucket: influxDBConfig.Bucket, Org: influxDBConfig.Org, @@ -73,7 +72,16 @@ func main() { } defer client.Close() - repo := newService(client, repocfg, logger) + repo := influxdb.NewAsync(client, repocfg) + + // Start consuming and logging errors. + go func(log mflog.Logger) { + for err := range repo.Errors() { + if err != nil { + log.Error(err.Error()) + } + } + }(logger) if err := consumers.Start(svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil { logger.Fatal(fmt.Sprintf("failed to start InfluxDB writer: %s", err)) @@ -97,11 +105,3 @@ func main() { logger.Error(fmt.Sprintf("InfluxDB reader service terminated: %s", err)) } } - -func newService(client influxdb2.Client, repocfg influxdb.RepoConfig, logger mflog.Logger) consumers.Consumer { - repo := influxdb.New(client, repocfg, true) - repo = api.LoggingMiddleware(repo, logger) - counter, latency := internal.MakeMetrics("influxdb", "message_writer") - repo = api.MetricsMiddleware(repo, counter, latency) - return repo -} diff --git a/cmd/mongodb-writer/main.go b/cmd/mongodb-writer/main.go index c8225696..64319984 100644 --- a/cmd/mongodb-writer/main.go +++ b/cmd/mongodb-writer/main.go @@ -87,7 +87,7 @@ func main() { } } -func newService(db *mongo.Database, logger mflog.Logger) consumers.Consumer { +func newService(db *mongo.Database, logger mflog.Logger) consumers.BlockingConsumer { repo := mongodb.New(db) repo = api.LoggingMiddleware(repo, logger) counter, latency := internal.MakeMetrics("mongodb", "message_writer") diff --git a/cmd/postgres-writer/main.go b/cmd/postgres-writer/main.go index 6449c7ec..0b6f436a 100644 --- a/cmd/postgres-writer/main.go +++ b/cmd/postgres-writer/main.go @@ -89,7 +89,7 @@ func main() { } } -func newService(db *sqlx.DB, logger mflog.Logger) consumers.Consumer { +func newService(db *sqlx.DB, logger mflog.Logger) consumers.BlockingConsumer { svc := writerPg.New(db) svc = api.LoggingMiddleware(svc, logger) counter, latency := internal.MakeMetrics("postgres", "message_writer") diff --git a/cmd/timescale-writer/main.go b/cmd/timescale-writer/main.go index 808a4839..9a6296bf 100644 --- a/cmd/timescale-writer/main.go +++ b/cmd/timescale-writer/main.go @@ -89,7 +89,7 @@ func main() { } } -func newService(db *sqlx.DB, logger mflog.Logger) consumers.Consumer { +func newService(db *sqlx.DB, logger mflog.Logger) consumers.BlockingConsumer { svc := timescale.New(db) svc = api.LoggingMiddleware(svc, logger) counter, latency := internal.MakeMetrics("timescale", "message_writer") diff --git a/consumers/consumer.go b/consumers/consumer.go index 00a359f6..78ca8750 100644 --- a/consumers/consumer.go +++ b/consumers/consumer.go @@ -3,9 +3,26 @@ package consumers -// Consumer specifies message consuming API. -type Consumer interface { - // Consume method is used to consumed received messages. - // A non-nil error is returned to indicate operation failure. - Consume(messages interface{}) error +// AsyncConsumer specifies a non-blocking message-consuming API, +// which can be used for writing data to the DB, publishing messages +// to broker, sending notifications, or any other asynchronous job. +type AsyncConsumer interface { + // ConsumeAsync method is used to asynchronously consume received messages. + ConsumeAsync(messages interface{}) + + // Errors method returns a channel for reading errors which occur during async writes. + // Must be called before performing any writes for errors to be collected. + // The channel is buffered(1) so it allows only 1 error without blocking if not drained. + // The channel may receive nil error to indicate success. + Errors() <-chan error +} + +// BlockingConsumer specifies a blocking message-consuming API, +// which can be used for writing data to the DB, publishing messages +// to broker, sending notifications... BlockingConsumer implementations +// might also support concurrent use, but consult implementation for more details. +type BlockingConsumer interface { + // ConsumeBlocking method is used to consume received messages synchronously. + // A non-nil error is returned to indicate operation failure. + ConsumeBlocking(messages interface{}) error } diff --git a/consumers/messages.go b/consumers/messages.go index 07248dae..277973ac 100644 --- a/consumers/messages.go +++ b/consumers/messages.go @@ -5,7 +5,6 @@ package consumers import ( "fmt" - "io/ioutil" "os" "strings" @@ -33,7 +32,7 @@ var ( // Start method starts consuming messages received from Message broker. // This method transforms messages to SenML format before // using MessageRepository to store them. -func Start(id string, sub messaging.Subscriber, consumer Consumer, configPath string, logger logger.Logger) error { +func Start(id string, sub messaging.Subscriber, consumer interface{}, configPath string, logger logger.Logger) error { cfg, err := loadConfig(configPath) if err != nil { logger.Warn(fmt.Sprintf("Failed to load consumer config: %s", err)) @@ -42,14 +41,24 @@ func Start(id string, sub messaging.Subscriber, consumer Consumer, configPath st transformer := makeTransformer(cfg.TransformerCfg, logger) for _, subject := range cfg.SubscriberCfg.Subjects { - if err := sub.Subscribe(id, subject, handle(transformer, consumer)); err != nil { - return err + switch c := consumer.(type) { + case AsyncConsumer: + if err := sub.Subscribe(id, subject, handleAsync(transformer, c)); err != nil { + return err + } + case BlockingConsumer: + if err := sub.Subscribe(id, subject, handleSync(transformer, c)); err != nil { + return err + } + default: + return errors.ErrInvalidQueryParams } + } return nil } -func handle(t transformers.Transformer, c Consumer) handleFunc { +func handleSync(t transformers.Transformer, sc BlockingConsumer) handleFunc { return func(msg *messaging.Message) error { m := interface{}(msg) var err error @@ -59,7 +68,23 @@ func handle(t transformers.Transformer, c Consumer) handleFunc { return err } } - return c.Consume(m) + return sc.ConsumeBlocking(m) + } +} + +func handleAsync(t transformers.Transformer, ac AsyncConsumer) handleFunc { + return func(msg *messaging.Message) error { + m := interface{}(msg) + var err error + if t != nil { + m, err = t.Transform(msg) + if err != nil { + return err + } + } + + ac.ConsumeAsync(m) + return nil } } @@ -100,7 +125,7 @@ func loadConfig(configPath string) (config, error) { }, } - data, err := ioutil.ReadFile(configPath) + data, err := os.ReadFile(configPath) if err != nil { return cfg, errors.Wrap(errOpenConfFile, err) } diff --git a/consumers/notifiers/api/logging.go b/consumers/notifiers/api/logging.go index 2ced56f7..935e4ef8 100644 --- a/consumers/notifiers/api/logging.go +++ b/consumers/notifiers/api/logging.go @@ -78,7 +78,7 @@ func (lm *loggingMiddleware) RemoveSubscription(ctx context.Context, token, id s return lm.svc.RemoveSubscription(ctx, token, id) } -func (lm *loggingMiddleware) Consume(msg interface{}) (err error) { +func (lm *loggingMiddleware) ConsumeBlocking(msg interface{}) (err error) { defer func(begin time.Time) { message := fmt.Sprintf("Method consume took %s to complete", time.Since(begin)) if err != nil { @@ -88,5 +88,5 @@ func (lm *loggingMiddleware) Consume(msg interface{}) (err error) { lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) - return lm.svc.Consume(msg) + return lm.svc.ConsumeBlocking(msg) } diff --git a/consumers/notifiers/api/metrics.go b/consumers/notifiers/api/metrics.go index 0462bca4..efaca7e2 100644 --- a/consumers/notifiers/api/metrics.go +++ b/consumers/notifiers/api/metrics.go @@ -66,11 +66,11 @@ func (ms *metricsMiddleware) RemoveSubscription(ctx context.Context, token, id s return ms.svc.RemoveSubscription(ctx, token, id) } -func (ms *metricsMiddleware) Consume(msg interface{}) error { +func (ms *metricsMiddleware) ConsumeBlocking(msg interface{}) error { defer func(begin time.Time) { ms.counter.With("method", "consume").Add(1) ms.latency.With("method", "consume").Observe(time.Since(begin).Seconds()) }(time.Now()) - return ms.svc.Consume(msg) + return ms.svc.ConsumeBlocking(msg) } diff --git a/consumers/notifiers/service.go b/consumers/notifiers/service.go index c40fb004..32598d96 100644 --- a/consumers/notifiers/service.go +++ b/consumers/notifiers/service.go @@ -18,6 +18,8 @@ var ( ErrMessage = errors.New("failed to convert to Mainflux message") ) +var _ consumers.AsyncConsumer = (*notifierService)(nil) + // Service reprents a notification service. type Service interface { // CreateSubscription persists a subscription. @@ -33,7 +35,7 @@ type Service interface { // RemoveSubscription removes the subscription having the provided identifier. RemoveSubscription(ctx context.Context, token, id string) error - consumers.Consumer + consumers.BlockingConsumer } var _ Service = (*notifierService)(nil) @@ -43,6 +45,7 @@ type notifierService struct { subs SubscriptionsRepository idp mainflux.IDProvider notifier Notifier + errCh chan error from string } @@ -53,6 +56,7 @@ func New(auth mainflux.AuthServiceClient, subs SubscriptionsRepository, idp main subs: subs, idp: idp, notifier: notifier, + errCh: make(chan error, 1), from: from, } } @@ -95,7 +99,7 @@ func (ns *notifierService) RemoveSubscription(ctx context.Context, token, id str return ns.subs.Remove(ctx, id) } -func (ns *notifierService) Consume(message interface{}) error { +func (ns *notifierService) ConsumeBlocking(message interface{}) error { msg, ok := message.(*messaging.Message) if !ok { return ErrMessage @@ -127,3 +131,39 @@ func (ns *notifierService) Consume(message interface{}) error { return nil } + +func (ns *notifierService) ConsumeAsync(message interface{}) { + msg, ok := message.(*messaging.Message) + if !ok { + ns.errCh <- ErrMessage + return + } + topic := msg.Channel + if msg.Subtopic != "" { + topic = fmt.Sprintf("%s.%s", msg.Channel, msg.Subtopic) + } + pm := PageMetadata{ + Topic: topic, + Offset: 0, + Limit: -1, + } + page, err := ns.subs.RetrieveAll(context.Background(), pm) + if err != nil { + ns.errCh <- err + return + } + + var to []string + for _, sub := range page.Subscriptions { + to = append(to, sub.Contact) + } + if len(to) > 0 { + if err := ns.notifier.Notify(ns.from, to, msg); err != nil { + ns.errCh <- errors.Wrap(ErrNotify, err) + } + } +} + +func (ns *notifierService) Errors() <-chan error { + return ns.errCh +} diff --git a/consumers/notifiers/service_test.go b/consumers/notifiers/service_test.go index 9a786205..fff44d64 100644 --- a/consumers/notifiers/service_test.go +++ b/consumers/notifiers/service_test.go @@ -322,7 +322,7 @@ func TestConsume(t *testing.T) { } for _, tc := range cases { - err := svc.Consume(tc.msg) + err := svc.ConsumeBlocking(tc.msg) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) } } diff --git a/consumers/writers/api/logging.go b/consumers/writers/api/logging.go index 39bddb07..20a3296a 100644 --- a/consumers/writers/api/logging.go +++ b/consumers/writers/api/logging.go @@ -13,22 +13,22 @@ import ( log "github.com/mainflux/mainflux/logger" ) -var _ consumers.Consumer = (*loggingMiddleware)(nil) +var _ consumers.BlockingConsumer = (*loggingMiddleware)(nil) type loggingMiddleware struct { logger log.Logger - consumer consumers.Consumer + consumer consumers.BlockingConsumer } // LoggingMiddleware adds logging facilities to the adapter. -func LoggingMiddleware(consumer consumers.Consumer, logger log.Logger) consumers.Consumer { +func LoggingMiddleware(consumer consumers.BlockingConsumer, logger log.Logger) consumers.BlockingConsumer { return &loggingMiddleware{ logger: logger, consumer: consumer, } } -func (lm *loggingMiddleware) Consume(msgs interface{}) (err error) { +func (lm *loggingMiddleware) ConsumeBlocking(msgs interface{}) (err error) { defer func(begin time.Time) { message := fmt.Sprintf("Method consume took %s to complete", time.Since(begin)) if err != nil { @@ -38,5 +38,5 @@ func (lm *loggingMiddleware) Consume(msgs interface{}) (err error) { lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) - return lm.consumer.Consume(msgs) + return lm.consumer.ConsumeBlocking(msgs) } diff --git a/consumers/writers/api/metrics.go b/consumers/writers/api/metrics.go index cbc09c5d..505d980f 100644 --- a/consumers/writers/api/metrics.go +++ b/consumers/writers/api/metrics.go @@ -12,17 +12,17 @@ import ( "github.com/mainflux/mainflux/consumers" ) -var _ consumers.Consumer = (*metricsMiddleware)(nil) +var _ consumers.BlockingConsumer = (*metricsMiddleware)(nil) type metricsMiddleware struct { counter metrics.Counter latency metrics.Histogram - consumer consumers.Consumer + consumer consumers.BlockingConsumer } // MetricsMiddleware returns new message repository // with Save method wrapped to expose metrics. -func MetricsMiddleware(consumer consumers.Consumer, counter metrics.Counter, latency metrics.Histogram) consumers.Consumer { +func MetricsMiddleware(consumer consumers.BlockingConsumer, counter metrics.Counter, latency metrics.Histogram) consumers.BlockingConsumer { return &metricsMiddleware{ counter: counter, latency: latency, @@ -30,10 +30,10 @@ func MetricsMiddleware(consumer consumers.Consumer, counter metrics.Counter, lat } } -func (mm *metricsMiddleware) Consume(msgs interface{}) error { +func (mm *metricsMiddleware) ConsumeBlocking(msgs interface{}) error { defer func(begin time.Time) { mm.counter.With("method", "consume").Add(1) mm.latency.With("method", "consume").Observe(time.Since(begin).Seconds()) }(time.Now()) - return mm.consumer.Consume(msgs) + return mm.consumer.ConsumeBlocking(msgs) } diff --git a/consumers/writers/cassandra/consumer.go b/consumers/writers/cassandra/consumer.go index da8451bb..37263a7f 100644 --- a/consumers/writers/cassandra/consumer.go +++ b/consumers/writers/cassandra/consumer.go @@ -18,18 +18,18 @@ var ( errSaveMessage = errors.New("failed to save message to cassandra database") errNoTable = errors.New("table does not exist") ) -var _ consumers.Consumer = (*cassandraRepository)(nil) +var _ consumers.BlockingConsumer = (*cassandraRepository)(nil) type cassandraRepository struct { session *gocql.Session } // New instantiates Cassandra message repository. -func New(session *gocql.Session) consumers.Consumer { +func New(session *gocql.Session) consumers.BlockingConsumer { return &cassandraRepository{session} } -func (cr *cassandraRepository) Consume(message interface{}) error { +func (cr *cassandraRepository) ConsumeBlocking(message interface{}) error { switch m := message.(type) { case mfjson.Messages: return cr.saveJSON(m) diff --git a/consumers/writers/cassandra/consumer_test.go b/consumers/writers/cassandra/consumer_test.go index 91f7ca8f..7561fdf6 100644 --- a/consumers/writers/cassandra/consumer_test.go +++ b/consumers/writers/cassandra/consumer_test.go @@ -72,7 +72,7 @@ func TestSaveSenml(t *testing.T) { msgs = append(msgs, msg) } - err = repo.Consume(msgs) + err = repo.ConsumeBlocking(msgs) assert.Nil(t, err, fmt.Sprintf("expected no error, got %s", err)) } @@ -116,6 +116,6 @@ func TestSaveJSON(t *testing.T) { msgs.Data = append(msgs.Data, msg) } - err = repo.Consume(msgs) + err = repo.ConsumeBlocking(msgs) assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) } diff --git a/consumers/writers/influxdb/consumer.go b/consumers/writers/influxdb/consumer.go index b80de920..c787d722 100644 --- a/consumers/writers/influxdb/consumer.go +++ b/consumers/writers/influxdb/consumer.go @@ -22,7 +22,8 @@ const senmlPoints = "messages" var errSaveMessage = errors.New("failed to save message to influxdb database") -var _ consumers.Consumer = (*influxRepo)(nil) +var _ consumers.AsyncConsumer = (*influxRepo)(nil) +var _ consumers.BlockingConsumer = (*influxRepo)(nil) type RepoConfig struct { Bucket string @@ -32,19 +33,76 @@ type RepoConfig struct { type influxRepo struct { client influxdb2.Client cfg RepoConfig + errCh chan error + writeAPI api.WriteAPI writeAPIBlocking api.WriteAPIBlocking } -// New returns new InfluxDB writer. -func New(client influxdb2.Client, config RepoConfig, async bool) consumers.Consumer { +// NewSync returns new InfluxDB writer. +func NewSync(client influxdb2.Client, config RepoConfig) consumers.BlockingConsumer { return &influxRepo{ client: client, cfg: config, + writeAPI: nil, writeAPIBlocking: client.WriteAPIBlocking(config.Org, config.Bucket), } } -func (repo *influxRepo) Consume(message interface{}) error { +func NewAsync(client influxdb2.Client, config RepoConfig) consumers.AsyncConsumer { + return &influxRepo{ + client: client, + cfg: config, + errCh: make(chan error, 1), + writeAPI: client.WriteAPI(config.Org, config.Bucket), + writeAPIBlocking: nil, + } +} + +func (repo *influxRepo) ConsumeAsync(message interface{}) { + var err error + var pts []*write.Point + switch m := message.(type) { + case json.Messages: + pts, err = repo.jsonPoints(m) + default: + pts, err = repo.senmlPoints(m) + } + if err != nil { + repo.errCh <- err + return + } + + done := make(chan bool) + defer close(done) + + go func(done <-chan bool) { + for { + select { + case err := <-repo.writeAPI.Errors(): + repo.errCh <- err + case <-done: + repo.errCh <- nil // pass nil error to the error channel + return + } + } + }(done) + + for _, pt := range pts { + repo.writeAPI.WritePoint(pt) + } + + repo.writeAPI.Flush() +} + +func (repo *influxRepo) Errors() <-chan error { + if repo.errCh != nil { + return repo.errCh + } + + return nil +} + +func (repo *influxRepo) ConsumeBlocking(message interface{}) error { var err error var pts []*write.Point switch m := message.(type) { diff --git a/consumers/writers/influxdb/consumer_test.go b/consumers/writers/influxdb/consumer_test.go index 5b7371c5..2e505332 100644 --- a/consumers/writers/influxdb/consumer_test.go +++ b/consumers/writers/influxdb/consumer_test.go @@ -117,182 +117,359 @@ func queryDB(fluxQuery string) (int, error) { return rowCount, nil } -func TestSaveSenml(t *testing.T) { - for i := 0; i < 2; i++ { - // Testing both async and sync - repo := writer.New(client, repoCfg, i == 0) +func TestAsyncSaveSenml(t *testing.T) { + asyncRepo := writer.NewAsync(client, repoCfg) - cases := []struct { - desc string - msgsNum int - expectedSize int - }{ - { - desc: "save a single message", - msgsNum: 1, - expectedSize: 1, - }, - { - desc: "save a batch of messages", - msgsNum: streamsSize, - expectedSize: streamsSize, - }, - } - - for _, tc := range cases { - err := resetBucket() - assert.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err)) - now := time.Now().UnixNano() - var msgs []senml.Message - - chanID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s\n", err)) - pubID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s\n", err)) - for i := 0; i < tc.msgsNum; i++ { - msg := senml.Message{ - Channel: chanID, - Publisher: pubID, - Protocol: "http", - Name: "test name", - Unit: "km", - UpdateTime: 5456565466, - } - // Mix possible values as well as value sum. - count := i % valueFields - switch count { - case 0: - msg.Subtopic = subtopic - msg.Value = &v - case 1: - msg.BoolValue = &boolV - case 2: - msg.StringValue = &stringV - case 3: - msg.DataValue = &dataV - case 4: - msg.Sum = &sum - } - - msg.Time = float64(now)/float64(1e9) - float64(i) - msgs = append(msgs, msg) - } - - err = repo.Consume(msgs) - assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) - - count, err := queryDB(rowCountSenml) - assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err)) - assert.Equal(t, tc.expectedSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", tc.expectedSize, count)) - } + cases := []struct { + desc string + msgsNum int + expectedSize int + }{ + { + desc: "save a single message", + msgsNum: 1, + expectedSize: 1, + }, + { + desc: "save a batch of messages", + msgsNum: streamsSize, + expectedSize: streamsSize, + }, } -} -func TestSaveJSON(t *testing.T) { - // Testing both async and sync - for i := 0; i < 2; i++ { - // Testing both async and sync - repo := writer.New(client, repoCfg, i == 0) + for _, tc := range cases { + err := resetBucket() + assert.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err)) + now := time.Now().UnixNano() + var msgs []senml.Message chanID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s\n", err)) pubID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - - msg := json.Message{ - Channel: chanID, - Publisher: pubID, - Created: time.Now().UnixNano(), - Subtopic: "subtopic/format/some_json", - Protocol: "mqtt", - Payload: map[string]interface{}{ - "field_1": 123, - "field_2": "value", - "field_3": false, - "field_4": 12.344, - "field_5": map[string]interface{}{ - "field_1": "value", - "field_2": 42, - }, - }, - } - - invalidKeySepMsg := msg - invalidKeySepMsg.Payload = map[string]interface{}{ - "field_1": 123, - "field_2": "value", - "field_3": false, - "field_4": 12.344, - "field_5": map[string]interface{}{ - "field_1": "value", - "field_2": 42, - }, - "field_6/field_7": "value", - } - invalidKeyNameMsg := msg - invalidKeyNameMsg.Payload = map[string]interface{}{ - "field_1": 123, - "field_2": "value", - "field_3": false, - "field_4": 12.344, - "field_5": map[string]interface{}{ - "field_1": "value", - "field_2": 42, - }, - "publisher": "value", - } - - now := time.Now().UnixNano() - msgs := json.Messages{ - Format: "some_json", - } - invalidKeySepMsgs := json.Messages{ - Format: "some_json", - } - invalidKeyNameMsgs := json.Messages{ - Format: "some_json", - } - - for i := 0; i < streamsSize; i++ { - msg.Created = now - msgs.Data = append(msgs.Data, msg) - invalidKeySepMsgs.Data = append(invalidKeySepMsgs.Data, invalidKeySepMsg) - invalidKeyNameMsgs.Data = append(invalidKeyNameMsgs.Data, invalidKeyNameMsg) - } - - cases := []struct { - desc string - msgs json.Messages - err error - }{ - { - desc: "consume valid json messages", - msgs: msgs, - err: nil, - }, - { - desc: "consume invalid json messages containing invalid key separator", - msgs: invalidKeySepMsgs, - err: json.ErrInvalidKey, - }, - { - desc: "consume invalid json messages containing invalid key name", - msgs: invalidKeySepMsgs, - err: json.ErrInvalidKey, - }, - } - for _, tc := range cases { - err := resetBucket() - assert.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err)) - - switch err = repo.Consume(tc.msgs); err { - case nil: - count, err := queryDB(rowCountJson) - assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err)) - assert.Equal(t, streamsSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", streamsSize, count)) - default: - assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s expected %s, got %s", tc.desc, tc.err, err)) + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s\n", err)) + for i := 0; i < tc.msgsNum; i++ { + msg := senml.Message{ + Channel: chanID, + Publisher: pubID, + Protocol: "http", + Name: "test name", + Unit: "km", + UpdateTime: 5456565466, } + // Mix possible values as well as value sum. + count := i % valueFields + switch count { + case 0: + msg.Subtopic = subtopic + msg.Value = &v + case 1: + msg.BoolValue = &boolV + case 2: + msg.StringValue = &stringV + case 3: + msg.DataValue = &dataV + case 4: + msg.Sum = &sum + } + + msg.Time = float64(now)/float64(1e9) - float64(i) + msgs = append(msgs, msg) + } + + errs := asyncRepo.Errors() + asyncRepo.ConsumeAsync(msgs) + err = <-errs + assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) + + count, err := queryDB(rowCountSenml) + assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err)) + assert.Equal(t, tc.expectedSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", tc.expectedSize, count)) + } +} + +func TestBlockingSaveSenml(t *testing.T) { + syncRepo := writer.NewSync(client, repoCfg) + + cases := []struct { + desc string + msgsNum int + expectedSize int + }{ + { + desc: "save a single message", + msgsNum: 1, + expectedSize: 1, + }, + { + desc: "save a batch of messages", + msgsNum: streamsSize, + expectedSize: streamsSize, + }, + } + + for _, tc := range cases { + err := resetBucket() + assert.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err)) + now := time.Now().UnixNano() + var msgs []senml.Message + + chanID, err := idProvider.ID() + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s\n", err)) + pubID, err := idProvider.ID() + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s\n", err)) + for i := 0; i < tc.msgsNum; i++ { + msg := senml.Message{ + Channel: chanID, + Publisher: pubID, + Protocol: "http", + Name: "test name", + Unit: "km", + UpdateTime: 5456565466, + } + // Mix possible values as well as value sum. + count := i % valueFields + switch count { + case 0: + msg.Subtopic = subtopic + msg.Value = &v + case 1: + msg.BoolValue = &boolV + case 2: + msg.StringValue = &stringV + case 3: + msg.DataValue = &dataV + case 4: + msg.Sum = &sum + } + + msg.Time = float64(now)/float64(1e9) - float64(i) + msgs = append(msgs, msg) + } + + err = syncRepo.ConsumeBlocking(msgs) + assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) + + count, err := queryDB(rowCountSenml) + assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err)) + assert.Equal(t, tc.expectedSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", tc.expectedSize, count)) + } +} + +func TestAsyncSaveJSON(t *testing.T) { + asyncRepo := writer.NewAsync(client, repoCfg) + + chanID, err := idProvider.ID() + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + pubID, err := idProvider.ID() + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + msg := json.Message{ + Channel: chanID, + Publisher: pubID, + Created: time.Now().UnixNano(), + Subtopic: "subtopic/format/some_json", + Protocol: "mqtt", + Payload: map[string]interface{}{ + "field_1": 123, + "field_2": "value", + "field_3": false, + "field_4": 12.344, + "field_5": map[string]interface{}{ + "field_1": "value", + "field_2": 42, + }, + }, + } + + invalidKeySepMsg := msg + invalidKeySepMsg.Payload = map[string]interface{}{ + "field_1": 123, + "field_2": "value", + "field_3": false, + "field_4": 12.344, + "field_5": map[string]interface{}{ + "field_1": "value", + "field_2": 42, + }, + "field_6/field_7": "value", + } + invalidKeyNameMsg := msg + invalidKeyNameMsg.Payload = map[string]interface{}{ + "field_1": 123, + "field_2": "value", + "field_3": false, + "field_4": 12.344, + "field_5": map[string]interface{}{ + "field_1": "value", + "field_2": 42, + }, + "publisher": "value", + } + + now := time.Now().UnixNano() + msgs := json.Messages{ + Format: "some_json", + } + invalidKeySepMsgs := json.Messages{ + Format: "some_json", + } + invalidKeyNameMsgs := json.Messages{ + Format: "some_json", + } + + for i := 0; i < streamsSize; i++ { + msg.Created = now + msgs.Data = append(msgs.Data, msg) + invalidKeySepMsgs.Data = append(invalidKeySepMsgs.Data, invalidKeySepMsg) + invalidKeyNameMsgs.Data = append(invalidKeyNameMsgs.Data, invalidKeyNameMsg) + } + + cases := []struct { + desc string + msgs json.Messages + err error + }{ + { + desc: "consume valid json messages", + msgs: msgs, + err: nil, + }, + { + desc: "consume invalid json messages containing invalid key separator", + msgs: invalidKeySepMsgs, + err: json.ErrInvalidKey, + }, + { + desc: "consume invalid json messages containing invalid key name", + msgs: invalidKeySepMsgs, + err: json.ErrInvalidKey, + }, + } + + for _, tc := range cases { + err := resetBucket() + assert.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err)) + + asyncRepo.ConsumeAsync(msgs) + timer := time.NewTimer(1 * time.Millisecond) + select { + case err = <-asyncRepo.Errors(): + case <-timer.C: + t.Error("errors channel blocked, nothing returned.") + } + switch err { + case nil: + count, err := queryDB(rowCountJson) + assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err)) + assert.Equal(t, streamsSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", streamsSize, count)) + default: + assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s expected %s, got %s", tc.desc, tc.err, err)) + } + } +} + +func TestBlockingSaveJSON(t *testing.T) { + syncRepo := writer.NewSync(client, repoCfg) + + chanID, err := idProvider.ID() + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + pubID, err := idProvider.ID() + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + msg := json.Message{ + Channel: chanID, + Publisher: pubID, + Created: time.Now().UnixNano(), + Subtopic: "subtopic/format/some_json", + Protocol: "mqtt", + Payload: map[string]interface{}{ + "field_1": 123, + "field_2": "value", + "field_3": false, + "field_4": 12.344, + "field_5": map[string]interface{}{ + "field_1": "value", + "field_2": 42, + }, + }, + } + + invalidKeySepMsg := msg + invalidKeySepMsg.Payload = map[string]interface{}{ + "field_1": 123, + "field_2": "value", + "field_3": false, + "field_4": 12.344, + "field_5": map[string]interface{}{ + "field_1": "value", + "field_2": 42, + }, + "field_6/field_7": "value", + } + invalidKeyNameMsg := msg + invalidKeyNameMsg.Payload = map[string]interface{}{ + "field_1": 123, + "field_2": "value", + "field_3": false, + "field_4": 12.344, + "field_5": map[string]interface{}{ + "field_1": "value", + "field_2": 42, + }, + "publisher": "value", + } + + now := time.Now().UnixNano() + msgs := json.Messages{ + Format: "some_json", + } + invalidKeySepMsgs := json.Messages{ + Format: "some_json", + } + invalidKeyNameMsgs := json.Messages{ + Format: "some_json", + } + + for i := 0; i < streamsSize; i++ { + msg.Created = now + msgs.Data = append(msgs.Data, msg) + invalidKeySepMsgs.Data = append(invalidKeySepMsgs.Data, invalidKeySepMsg) + invalidKeyNameMsgs.Data = append(invalidKeyNameMsgs.Data, invalidKeyNameMsg) + } + + cases := []struct { + desc string + msgs json.Messages + err error + }{ + { + desc: "consume valid json messages", + msgs: msgs, + err: nil, + }, + { + desc: "consume invalid json messages containing invalid key separator", + msgs: invalidKeySepMsgs, + err: json.ErrInvalidKey, + }, + { + desc: "consume invalid json messages containing invalid key name", + msgs: invalidKeySepMsgs, + err: json.ErrInvalidKey, + }, + } + + for _, tc := range cases { + err := resetBucket() + assert.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err)) + + switch err = syncRepo.ConsumeBlocking(tc.msgs); err { + case nil: + count, err := queryDB(rowCountJson) + assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err)) + assert.Equal(t, streamsSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", streamsSize, count)) + default: + assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s expected %s, got %s", tc.desc, tc.err, err)) } } } diff --git a/consumers/writers/mongodb/consumer.go b/consumers/writers/mongodb/consumer.go index 9cf73f54..3ac291d6 100644 --- a/consumers/writers/mongodb/consumer.go +++ b/consumers/writers/mongodb/consumer.go @@ -18,25 +18,24 @@ const senmlCollection string = "messages" var errSaveMessage = errors.New("failed to save message to mongodb database") -var _ consumers.Consumer = (*mongoRepo)(nil) +var _ consumers.BlockingConsumer = (*mongoRepo)(nil) type mongoRepo struct { db *mongo.Database } // New returns new MongoDB writer. -func New(db *mongo.Database) consumers.Consumer { +func New(db *mongo.Database) consumers.BlockingConsumer { return &mongoRepo{db} } -func (repo *mongoRepo) Consume(message interface{}) error { +func (repo *mongoRepo) ConsumeBlocking(message interface{}) error { switch m := message.(type) { case json.Messages: return repo.saveJSON(m) default: return repo.saveSenml(m) } - } func (repo *mongoRepo) saveSenml(messages interface{}) error { diff --git a/consumers/writers/mongodb/consumer_test.go b/consumers/writers/mongodb/consumer_test.go index 6ca6fd20..31d2f097 100644 --- a/consumers/writers/mongodb/consumer_test.go +++ b/consumers/writers/mongodb/consumer_test.go @@ -83,7 +83,7 @@ func TestSaveSenml(t *testing.T) { msgs = append(msgs, msg) } - err = repo.Consume(msgs) + err = repo.ConsumeBlocking(msgs) require.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) count, err := db.Collection(collection).CountDocuments(context.Background(), bson.D{}) @@ -131,6 +131,6 @@ func TestSaveJSON(t *testing.T) { msgs.Data = append(msgs.Data, msg) } - err = repo.Consume(msgs) + err = repo.ConsumeBlocking(msgs) assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) } diff --git a/consumers/writers/postgres/consumer.go b/consumers/writers/postgres/consumer.go index 4d804fcc..1512cb93 100644 --- a/consumers/writers/postgres/consumer.go +++ b/consumers/writers/postgres/consumer.go @@ -25,18 +25,18 @@ var ( errNoTable = errors.New("relation does not exist") ) -var _ consumers.Consumer = (*postgresRepo)(nil) +var _ consumers.BlockingConsumer = (*postgresRepo)(nil) type postgresRepo struct { db *sqlx.DB } // New returns new PostgreSQL writer. -func New(db *sqlx.DB) consumers.Consumer { +func New(db *sqlx.DB) consumers.BlockingConsumer { return &postgresRepo{db: db} } -func (pr postgresRepo) Consume(message interface{}) (err error) { +func (pr postgresRepo) ConsumeBlocking(message interface{}) (err error) { switch m := message.(type) { case mfjson.Messages: return pr.saveJSON(m) diff --git a/consumers/writers/postgres/consumer_test.go b/consumers/writers/postgres/consumer_test.go index e45b7379..16eed9bf 100644 --- a/consumers/writers/postgres/consumer_test.go +++ b/consumers/writers/postgres/consumer_test.go @@ -67,7 +67,7 @@ func TestSaveSenml(t *testing.T) { msgs = append(msgs, msg) } - err = repo.Consume(msgs) + err = repo.ConsumeBlocking(msgs) assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) } @@ -107,6 +107,6 @@ func TestSaveJSON(t *testing.T) { msgs.Data = append(msgs.Data, msg) } - err = repo.Consume(msgs) + err = repo.ConsumeBlocking(msgs) assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) } diff --git a/consumers/writers/timescale/consumer.go b/consumers/writers/timescale/consumer.go index c44efb08..6e652ce7 100644 --- a/consumers/writers/timescale/consumer.go +++ b/consumers/writers/timescale/consumer.go @@ -24,18 +24,18 @@ var ( errNoTable = errors.New("relation does not exist") ) -var _ consumers.Consumer = (*timescaleRepo)(nil) +var _ consumers.BlockingConsumer = (*timescaleRepo)(nil) type timescaleRepo struct { db *sqlx.DB } // New returns new TimescaleSQL writer. -func New(db *sqlx.DB) consumers.Consumer { +func New(db *sqlx.DB) consumers.BlockingConsumer { return ×caleRepo{db: db} } -func (tr timescaleRepo) Consume(message interface{}) (err error) { +func (tr *timescaleRepo) ConsumeBlocking(message interface{}) (err error) { switch m := message.(type) { case mfjson.Messages: return tr.saveJSON(m) diff --git a/consumers/writers/timescale/consumer_test.go b/consumers/writers/timescale/consumer_test.go index 29b8f4ac..66598794 100644 --- a/consumers/writers/timescale/consumer_test.go +++ b/consumers/writers/timescale/consumer_test.go @@ -67,7 +67,7 @@ func TestSaveSenml(t *testing.T) { msgs = append(msgs, msg) } - err = repo.Consume(msgs) + err = repo.ConsumeBlocking(msgs) assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) } @@ -107,6 +107,6 @@ func TestSaveJSON(t *testing.T) { msgs.Data = append(msgs.Data, msg) } - err = repo.Consume(msgs) + err = repo.ConsumeBlocking(msgs) assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) } diff --git a/docker/.env b/docker/.env index f84571c3..45138fb6 100644 --- a/docker/.env +++ b/docker/.env @@ -240,6 +240,7 @@ MF_INFLUXDB_PORT=8086 MF_INFLUXDB_HOST=mainflux-influxdb MF_INFLUXDB_ADMIN_USER=mainflux MF_INFLUXDB_ADMIN_PASSWORD=mainflux +MF_INFLUXDB_ADMIN_URL="http://localhost:8086" MF_INFLUXDB_PROTOCOL=http MF_INFLUXDB_TIMEOUT=1s MF_INFLUXDB_ORG=mainflux diff --git a/docker/addons/influxdb-reader/docker-compose.yml b/docker/addons/influxdb-reader/docker-compose.yml index 65c36842..b6659b06 100644 --- a/docker/addons/influxdb-reader/docker-compose.yml +++ b/docker/addons/influxdb-reader/docker-compose.yml @@ -29,6 +29,7 @@ services: MF_INFLUXDB_PROTOCOL: ${MF_INFLUXDB_PROTOCOL} MF_INFLUXDB_ADMIN_USER: ${MF_INFLUXDB_ADMIN_USER} MF_INFLUXDB_ADMIN_PASSWORD: ${MF_INFLUXDB_ADMIN_PASSWORD} + MF_INFLUXDB_ADMIN_URL: ${MF_INFLUXDB_ADMIN_URL} MF_INFLUX_READER_SERVER_CERT: ${MF_INFLUX_READER_SERVER_CERT} MF_INFLUX_READER_SERVER_KEY: ${MF_INFLUX_READER_SERVER_KEY} MF_JAEGER_URL: ${MF_JAEGER_URL} diff --git a/docker/addons/influxdb-writer/docker-compose.yml b/docker/addons/influxdb-writer/docker-compose.yml index 1bf9c863..6c16c5a1 100644 --- a/docker/addons/influxdb-writer/docker-compose.yml +++ b/docker/addons/influxdb-writer/docker-compose.yml @@ -24,6 +24,7 @@ services: DOCKER_INFLUXDB_INIT_MODE: ${MF_INFLUXDB_INIT_MODE} DOCKER_INFLUXDB_INIT_USERNAME: ${MF_INFLUXDB_ADMIN_USER} DOCKER_INFLUXDB_INIT_PASSWORD: ${MF_INFLUXDB_ADMIN_PASSWORD} + DOCKER_INFLUXDB_ADMIN_URL: ${MF_INFLUXDB_ADMIN_URL} DOCKER_INFLUXDB_INIT_ORG: ${MF_INFLUXDB_ORG} DOCKER_INFLUXDB_INIT_BUCKET: ${MF_INFLUXDB_BUCKET} DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: ${MF_INFLUXDB_TOKEN} diff --git a/go.mod b/go.mod index 23c0103a..17a5a96a 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/hashicorp/vault/api v1.8.1 github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f - github.com/influxdata/influxdb-client-go/v2 v2.12.2 + github.com/influxdata/influxdb-client-go/v2 v2.12.3 github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa github.com/jackc/pgtype v1.13.0 github.com/jackc/pgx/v5 v5.2.0 diff --git a/go.sum b/go.sum index 235c8c82..d3256d14 100644 --- a/go.sum +++ b/go.sum @@ -378,8 +378,8 @@ github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/inconshreveable/mousetrap v1.0.1 h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7Pgzkat/bFNc= github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= -github.com/influxdata/influxdb-client-go/v2 v2.12.2 h1:uYABKdrEKlYm+++qfKdbgaHKBPmoWR5wpbmj6MBB/2g= -github.com/influxdata/influxdb-client-go/v2 v2.12.2/go.mod h1:YteV91FiQxRdccyJ2cHvj2f/5sq4y4Njqu1fQzsQCOU= +github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0IpXeMSkY/uJa/O/vC4= +github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0= diff --git a/internal/clients/influxdb/influxdb.go b/internal/clients/influxdb/influxdb.go index 827d72db..dcea8ece 100644 --- a/internal/clients/influxdb/influxdb.go +++ b/internal/clients/influxdb/influxdb.go @@ -15,14 +15,19 @@ var ( ) type Config struct { - Protocol string `env:"PROTOCOL" envDefault:"http"` - Host string `env:"HOST" envDefault:"localhost"` - Port string `env:"PORT" envDefault:"8086"` - Bucket string `env:"BUCKET" envDefault:"mainflux-bucket"` - Org string `env:"ORG" envDefault:"mainflux"` - Token string `env:"TOKEN" envDefault:"mainflux-token"` - DBUrl string `env:"DBURL" envDefault:""` - Timeout time.Duration `env:"TIMEOUT" envDefault:"1s"` + Protocol string `env:"PROTOCOL" envDefault:"http"` + Host string `env:"HOST" envDefault:"localhost"` + Port string `env:"PORT" envDefault:"8086"` + Username string `env:"ADMIN_USER" envDefault:"mainflux"` + Password string `env:"ADMIN_PASSWORD" envDefault:"mainflux"` + DbName string `env:"DB" envDefault:"mainflux"` + Bucket string `env:"BUCKET" envDefault:"mainflux-bucket"` + Org string `env:"ORG" envDefault:"mainflux"` + Token string `env:"TOKEN" envDefault:"mainflux-token"` + DBUrl string `env:"DBURL" envDefault:""` + UserAgent string `env:"USER_AGENT" envDefault:"InfluxDBClient"` + Timeout time.Duration `env:"TIMEOUT"` // Influxdb client configuration by default has no timeout duration , this field will not have a fallback default timeout duration. Reference: https://pkg.go.dev/github.com/influxdata/influxdb@v1.10.0/client/v2#HTTPConfig + InsecureSkipVerify bool `env:"INSECURE_SKIP_VERIFY" envDefault:"false"` } // Setup load configuration from environment variable, create InfluxDB client and connect to InfluxDB server @@ -36,7 +41,10 @@ func Setup(envPrefix string, ctx context.Context) (influxdb2.Client, error) { // Connect create InfluxDB client and connect to InfluxDB server func Connect(config Config, ctx context.Context) (influxdb2.Client, error) { - client := influxdb2.NewClient(config.DBUrl, config.Token) + client := influxdb2.NewClientWithOptions(config.DBUrl, config.Token, + influxdb2.DefaultOptions(). + SetUseGZip(true). + SetFlushInterval(100)) ctx, cancel := context.WithTimeout(ctx, config.Timeout) defer cancel() if _, err := client.Ready(ctx); err != nil { diff --git a/readers/cassandra/messages_test.go b/readers/cassandra/messages_test.go index fb55fb00..8a768fa0 100644 --- a/readers/cassandra/messages_test.go +++ b/readers/cassandra/messages_test.go @@ -114,7 +114,7 @@ func TestReadSenml(t *testing.T) { messages = append(messages, msg) } - err = writer.Consume(messages) + err = writer.ConsumeBlocking(messages) require.Nil(t, err, fmt.Sprintf("failed to store message to Cassandra: %s", err)) reader := creader.New(session) @@ -450,7 +450,8 @@ func TestReadJSON(t *testing.T) { m := toMap(msg) msgs1 = append(msgs1, m) } - err = writer.Consume(messages1) + + err = writer.ConsumeBlocking(messages1) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) id2, err := idProvider.ID() @@ -483,7 +484,8 @@ func TestReadJSON(t *testing.T) { m := toMap(msg) msgs2 = append(msgs2, m) } - err = writer.Consume(messages2) + + err = writer.ConsumeBlocking(messages2) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) httpMsgs := []map[string]interface{}{} diff --git a/readers/influxdb/messages_test.go b/readers/influxdb/messages_test.go index 09f52911..a55b0ce8 100644 --- a/readers/influxdb/messages_test.go +++ b/readers/influxdb/messages_test.go @@ -50,7 +50,7 @@ var ( ) func TestReadSenml(t *testing.T) { - writer := iwriter.New(client, repoCfg, true) + asyncWriter := iwriter.NewAsync(client, repoCfg) chanID, err := idProvider.ID() assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) @@ -110,8 +110,10 @@ func TestReadSenml(t *testing.T) { messages = append(messages, msg) } - err = writer.Consume(messages) - require.Nil(t, err, fmt.Sprintf("failed to store message to InfluxDB: %s", err)) + errs := asyncWriter.Errors() + asyncWriter.ConsumeAsync(messages) + err = <-errs + assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) reader := ireader.New(client, repoCfg) @@ -407,7 +409,7 @@ func TestReadSenml(t *testing.T) { } func TestReadJSON(t *testing.T) { - writer := iwriter.New(client, repoCfg, true) + asyncWriter := iwriter.NewAsync(client, repoCfg) id1, err := idProvider.ID() assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) @@ -432,8 +434,11 @@ func TestReadJSON(t *testing.T) { m := toMap(m) msgs1 = append(msgs1, m) } - err = writer.Consume(messages1) - require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) + + errs := asyncWriter.Errors() + asyncWriter.ConsumeAsync(messages1) + err = <-errs + require.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) id2, err := idProvider.ID() assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) @@ -460,8 +465,11 @@ func TestReadJSON(t *testing.T) { m := toMap(msg) msgs2 = append(msgs2, m) } - err = writer.Consume(messages2) - assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) + + // Test async + asyncWriter.ConsumeAsync(messages2) + err = <-errs + assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) httpMsgs := []map[string]interface{}{} for i := 0; i < msgsNum; i += 2 { diff --git a/readers/mongodb/messages_test.go b/readers/mongodb/messages_test.go index 89c9327e..67dbf57e 100644 --- a/readers/mongodb/messages_test.go +++ b/readers/mongodb/messages_test.go @@ -109,7 +109,7 @@ func TestReadSenml(t *testing.T) { } messages = append(messages, msg) } - err = writer.Consume(messages) + err = writer.ConsumeBlocking(messages) require.Nil(t, err, fmt.Sprintf("failed to store message to MongoDB: %s", err)) reader := mreader.New(db) @@ -416,7 +416,8 @@ func TestReadJSON(t *testing.T) { m := toMap(msg) msgs1 = append(msgs1, m) } - err = writer.Consume(messages1) + + err = writer.ConsumeBlocking(messages1) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) id2, err := idProvider.ID() @@ -449,7 +450,8 @@ func TestReadJSON(t *testing.T) { m := toMap(msg) msgs2 = append(msgs2, m) } - err = writer.Consume(messages2) + + err = writer.ConsumeBlocking(messages2) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) httpMsgs := []map[string]interface{}{} diff --git a/readers/postgres/messages_test.go b/readers/postgres/messages_test.go index 55535d60..440f3f48 100644 --- a/readers/postgres/messages_test.go +++ b/readers/postgres/messages_test.go @@ -99,7 +99,7 @@ func TestReadSenml(t *testing.T) { messages = append(messages, msg) } - err = writer.Consume(messages) + err = writer.ConsumeBlocking(messages) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) reader := preader.New(db) @@ -408,7 +408,8 @@ func TestReadJSON(t *testing.T) { m := toMap(msg) msgs1 = append(msgs1, m) } - err = writer.Consume(messages1) + + err = writer.ConsumeBlocking(messages1) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) id2, err := idProvider.ID() @@ -438,7 +439,8 @@ func TestReadJSON(t *testing.T) { m := toMap(msg) msgs2 = append(msgs2, m) } - err = writer.Consume(messages2) + + err = writer.ConsumeBlocking(messages2) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) httpMsgs := []map[string]interface{}{} diff --git a/readers/timescale/messages_test.go b/readers/timescale/messages_test.go index ec4086f8..1a92131d 100644 --- a/readers/timescale/messages_test.go +++ b/readers/timescale/messages_test.go @@ -98,7 +98,7 @@ func TestReadSenml(t *testing.T) { messages = append(messages, msg) } - err = writer.Consume(messages) + err = writer.ConsumeBlocking(messages) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) reader := treader.New(db) @@ -410,7 +410,8 @@ func TestReadJSON(t *testing.T) { mapped := toMap(msg) msgs1 = append(msgs1, mapped) } - err = writer.Consume(messages1) + + err = writer.ConsumeBlocking(messages1) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) id2, err := idProvider.ID() @@ -441,7 +442,8 @@ func TestReadJSON(t *testing.T) { mapped := toMap(msg) msgs2 = append(msgs2, mapped) } - err = writer.Consume(messages2) + + err = writer.ConsumeBlocking(messages2) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) httpMsgs := []map[string]interface{}{} diff --git a/vendor/github.com/influxdata/influxdb-client-go/v2/CHANGELOG.md b/vendor/github.com/influxdata/influxdb-client-go/v2/CHANGELOG.md index 5cabee14..93b8fc6f 100644 --- a/vendor/github.com/influxdata/influxdb-client-go/v2/CHANGELOG.md +++ b/vendor/github.com/influxdata/influxdb-client-go/v2/CHANGELOG.md @@ -1,3 +1,7 @@ +## 2.12.3 [2023-03-29] +### Bug fixes +- Update golang.org/x/net from 0.0.0-20210119194325-5f4716e94777 to 0.7.0 + ## 2.12.2 [2023-01-26] ### Bug fixes - [#368](https://github.com/influxdata/influxdb-client-go/pull/368) Allowing proxy from environment variable diff --git a/vendor/github.com/influxdata/influxdb-client-go/v2/version.go b/vendor/github.com/influxdata/influxdb-client-go/v2/version.go index 2cfddcae..92f315e4 100644 --- a/vendor/github.com/influxdata/influxdb-client-go/v2/version.go +++ b/vendor/github.com/influxdata/influxdb-client-go/v2/version.go @@ -13,7 +13,7 @@ import ( const ( // Version defines current version - Version = "2.12.2" + Version = "2.12.3" ) func init() { diff --git a/vendor/modules.txt b/vendor/modules.txt index 73565379..b83f14a3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -270,7 +270,9 @@ github.com/imdario/mergo # github.com/inconshreveable/mousetrap v1.0.1 ## explicit; go 1.18 github.com/inconshreveable/mousetrap -# github.com/influxdata/influxdb-client-go/v2 v2.12.2 +# github.com/influxdata/influxdb-client-go v1.4.0 +## explicit; go 1.13 +# github.com/influxdata/influxdb-client-go/v2 v2.12.3 ## explicit; go 1.17 github.com/influxdata/influxdb-client-go/v2 github.com/influxdata/influxdb-client-go/v2/api