NOISSUE - Blocking and Async Consumer Interface (#1742)
* Add Async Consumer Support

  * update consumer_async, fix flush interval
  * update influxdb from 1.4.0 to 2.12.3
  * separate tests and update logging and metrics middlewares
  * fix typos and comments
  * fix interfaces and tests
  * fix interface and add docs
  * update Consumer interface godoc
  * update influx-writer logger

Author: aryan <aryangodara03@gmail.com>
Signed-off-by: aryan <aryangodara03@gmail.com>
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Co-authored-by: dusanb94 <dusan.borovcanin@mainflux.com>
This commit is contained in:
parent
ffd67ae154
commit
23bc094ec3
@@ -97,7 +97,7 @@ func main() {

}

-func newService(session *gocql.Session, logger mflog.Logger) consumers.Consumer {
+func newService(session *gocql.Session, logger mflog.Logger) consumers.BlockingConsumer {
    repo := cassandra.New(session)
    repo = api.LoggingMiddleware(repo, logger)
    counter, latency := internal.MakeMetrics("cassandra", "message_writer")

@@ -67,6 +67,7 @@ func main() {
        logger.Fatal(fmt.Sprintf("failed to load InfluxDB client configuration from environment variable : %s", err))
    }
    influxDBConfig.DBUrl = fmt.Sprintf("%s://%s:%s", influxDBConfig.Protocol, influxDBConfig.Host, influxDBConfig.Port)

    repocfg := influxdb.RepoConfig{
        Bucket: influxDBConfig.Bucket,
        Org:    influxDBConfig.Org,

@@ -9,11 +9,9 @@ import (
    "log"
    "os"

    influxdb2 "github.com/influxdata/influxdb-client-go/v2"
    "github.com/mainflux/mainflux/consumers"
    "github.com/mainflux/mainflux/consumers/writers/api"
    "github.com/mainflux/mainflux/consumers/writers/influxdb"
    "github.com/mainflux/mainflux/internal"
    influxDBClient "github.com/mainflux/mainflux/internal/clients/influxdb"
    "github.com/mainflux/mainflux/internal/env"
    "github.com/mainflux/mainflux/internal/server"

@@ -62,6 +60,7 @@ func main() {
        logger.Fatal(fmt.Sprintf("failed to load InfluxDB client configuration from environment variable : %s", err))
    }
    influxDBConfig.DBUrl = fmt.Sprintf("%s://%s:%s", influxDBConfig.Protocol, influxDBConfig.Host, influxDBConfig.Port)

    repocfg := influxdb.RepoConfig{
        Bucket: influxDBConfig.Bucket,
        Org:    influxDBConfig.Org,

@@ -73,7 +72,16 @@ func main() {
    }
    defer client.Close()

-   repo := newService(client, repocfg, logger)
+   repo := influxdb.NewAsync(client, repocfg)
+
+   // Start consuming and logging errors.
+   go func(log mflog.Logger) {
+       for err := range repo.Errors() {
+           if err != nil {
+               log.Error(err.Error())
+           }
+       }
+   }(logger)

    if err := consumers.Start(svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil {
        logger.Fatal(fmt.Sprintf("failed to start InfluxDB writer: %s", err))

@@ -97,11 +105,3 @@ func main() {
        logger.Error(fmt.Sprintf("InfluxDB reader service terminated: %s", err))
    }
}
-
-func newService(client influxdb2.Client, repocfg influxdb.RepoConfig, logger mflog.Logger) consumers.Consumer {
-   repo := influxdb.New(client, repocfg, true)
-   repo = api.LoggingMiddleware(repo, logger)
-   counter, latency := internal.MakeMetrics("influxdb", "message_writer")
-   repo = api.MetricsMiddleware(repo, counter, latency)
-   return repo
-}

@@ -87,7 +87,7 @@ func main() {
    }
}

-func newService(db *mongo.Database, logger mflog.Logger) consumers.Consumer {
+func newService(db *mongo.Database, logger mflog.Logger) consumers.BlockingConsumer {
    repo := mongodb.New(db)
    repo = api.LoggingMiddleware(repo, logger)
    counter, latency := internal.MakeMetrics("mongodb", "message_writer")

@@ -89,7 +89,7 @@ func main() {
    }
}

-func newService(db *sqlx.DB, logger mflog.Logger) consumers.Consumer {
+func newService(db *sqlx.DB, logger mflog.Logger) consumers.BlockingConsumer {
    svc := writerPg.New(db)
    svc = api.LoggingMiddleware(svc, logger)
    counter, latency := internal.MakeMetrics("postgres", "message_writer")

@@ -89,7 +89,7 @@ func main() {
    }
}

-func newService(db *sqlx.DB, logger mflog.Logger) consumers.Consumer {
+func newService(db *sqlx.DB, logger mflog.Logger) consumers.BlockingConsumer {
    svc := timescale.New(db)
    svc = api.LoggingMiddleware(svc, logger)
    counter, latency := internal.MakeMetrics("timescale", "message_writer")
@@ -3,9 +3,26 @@

package consumers

-// Consumer specifies message consuming API.
-type Consumer interface {
-   // Consume method is used to consumed received messages.
-   // A non-nil error is returned to indicate operation failure.
-   Consume(messages interface{}) error
+// AsyncConsumer specifies a non-blocking message-consuming API,
+// which can be used for writing data to the DB, publishing messages
+// to broker, sending notifications, or any other asynchronous job.
+type AsyncConsumer interface {
+   // ConsumeAsync method is used to asynchronously consume received messages.
+   ConsumeAsync(messages interface{})
+
+   // Errors method returns a channel for reading errors which occur during async writes.
+   // Must be called before performing any writes for errors to be collected.
+   // The channel is buffered(1) so it allows only 1 error without blocking if not drained.
+   // The channel may receive nil error to indicate success.
+   Errors() <-chan error
}
+
+// BlockingConsumer specifies a blocking message-consuming API,
+// which can be used for writing data to the DB, publishing messages
+// to broker, sending notifications... BlockingConsumer implementations
+// might also support concurrent use, but consult implementation for more details.
+type BlockingConsumer interface {
+   // ConsumeBlocking method is used to consume received messages synchronously.
+   // A non-nil error is returned to indicate operation failure.
+   ConsumeBlocking(messages interface{}) error
+}
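A minimal sketch of what an implementation of the new AsyncConsumer contract above could look like; the printConsumer type and its behaviour are hypothetical and only illustrate the buffered(1) error channel described in the godoc, they are not part of this change.

package consumers_test

import (
    "fmt"

    "github.com/mainflux/mainflux/consumers"
)

// printConsumer is a hypothetical AsyncConsumer that only prints batches.
type printConsumer struct {
    errCh chan error
}

// newPrintConsumer buffers one error so a single failure can be reported
// without blocking, matching the contract documented above.
func newPrintConsumer() consumers.AsyncConsumer {
    return &printConsumer{errCh: make(chan error, 1)}
}

func (pc *printConsumer) ConsumeAsync(messages interface{}) {
    fmt.Printf("consumed: %v\n", messages)
    pc.errCh <- nil // nil signals success to readers of Errors()
}

func (pc *printConsumer) Errors() <-chan error {
    return pc.errCh
}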
@@ -5,7 +5,6 @@ package consumers

import (
    "fmt"
-   "io/ioutil"
    "os"
    "strings"

@@ -33,7 +32,7 @@ var (
// Start method starts consuming messages received from Message broker.
// This method transforms messages to SenML format before
// using MessageRepository to store them.
-func Start(id string, sub messaging.Subscriber, consumer Consumer, configPath string, logger logger.Logger) error {
+func Start(id string, sub messaging.Subscriber, consumer interface{}, configPath string, logger logger.Logger) error {
    cfg, err := loadConfig(configPath)
    if err != nil {
        logger.Warn(fmt.Sprintf("Failed to load consumer config: %s", err))

@@ -42,14 +41,24 @@ func Start(id string, sub messaging.Subscriber, consumer Consumer, configPath st
    transformer := makeTransformer(cfg.TransformerCfg, logger)

    for _, subject := range cfg.SubscriberCfg.Subjects {
-       if err := sub.Subscribe(id, subject, handle(transformer, consumer)); err != nil {
-           return err
+       switch c := consumer.(type) {
+       case AsyncConsumer:
+           if err := sub.Subscribe(id, subject, handleAsync(transformer, c)); err != nil {
+               return err
+           }
+       case BlockingConsumer:
+           if err := sub.Subscribe(id, subject, handleSync(transformer, c)); err != nil {
+               return err
+           }
+       default:
+           return errors.ErrInvalidQueryParams
        }
+
    }
    return nil
}

-func handle(t transformers.Transformer, c Consumer) handleFunc {
+func handleSync(t transformers.Transformer, sc BlockingConsumer) handleFunc {
    return func(msg *messaging.Message) error {
        m := interface{}(msg)
        var err error

@@ -59,7 +68,23 @@ func handle(t transformers.Transformer, c Consumer) handleFunc {
                return err
            }
        }
-       return c.Consume(m)
+       return sc.ConsumeBlocking(m)
    }
}

+func handleAsync(t transformers.Transformer, ac AsyncConsumer) handleFunc {
+   return func(msg *messaging.Message) error {
+       m := interface{}(msg)
+       var err error
+       if t != nil {
+           m, err = t.Transform(msg)
+           if err != nil {
+               return err
+           }
+       }
+
+       ac.ConsumeAsync(m)
+       return nil
+   }
+}

@@ -100,7 +125,7 @@ func loadConfig(configPath string) (config, error) {
        },
    }

-   data, err := ioutil.ReadFile(configPath)
+   data, err := os.ReadFile(configPath)
    if err != nil {
        return cfg, errors.Wrap(errOpenConfFile, err)
    }
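As a hedged usage sketch (not part of the commit), any type satisfying BlockingConsumer can be handed to Start and will be subscribed through handleSync; the countConsumer type, service name, and config path below are made-up placeholders, and the import paths are assumed from the surrounding code.

package example

import (
    "github.com/mainflux/mainflux/consumers"
    "github.com/mainflux/mainflux/logger"
    "github.com/mainflux/mainflux/pkg/messaging"
)

// countConsumer is a hypothetical BlockingConsumer that only counts batches.
type countConsumer struct {
    count int
}

func (cc *countConsumer) ConsumeBlocking(messages interface{}) error {
    cc.count++
    return nil
}

// startCounting relies on the type switch in Start to pick the blocking handler.
func startCounting(sub messaging.Subscriber, log logger.Logger) error {
    return consumers.Start("count-writer", sub, &countConsumer{}, "/hypothetical/config.toml", log)
}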
@@ -78,7 +78,7 @@ func (lm *loggingMiddleware) RemoveSubscription(ctx context.Context, token, id s
    return lm.svc.RemoveSubscription(ctx, token, id)
}

-func (lm *loggingMiddleware) Consume(msg interface{}) (err error) {
+func (lm *loggingMiddleware) ConsumeBlocking(msg interface{}) (err error) {
    defer func(begin time.Time) {
        message := fmt.Sprintf("Method consume took %s to complete", time.Since(begin))
        if err != nil {

@@ -88,5 +88,5 @@ func (lm *loggingMiddleware) Consume(msg interface{}) (err error) {
        lm.logger.Info(fmt.Sprintf("%s without errors.", message))
    }(time.Now())

-   return lm.svc.Consume(msg)
+   return lm.svc.ConsumeBlocking(msg)
}

@@ -66,11 +66,11 @@ func (ms *metricsMiddleware) RemoveSubscription(ctx context.Context, token, id s
    return ms.svc.RemoveSubscription(ctx, token, id)
}

-func (ms *metricsMiddleware) Consume(msg interface{}) error {
+func (ms *metricsMiddleware) ConsumeBlocking(msg interface{}) error {
    defer func(begin time.Time) {
        ms.counter.With("method", "consume").Add(1)
        ms.latency.With("method", "consume").Observe(time.Since(begin).Seconds())
    }(time.Now())

-   return ms.svc.Consume(msg)
+   return ms.svc.ConsumeBlocking(msg)
}
@@ -18,6 +18,8 @@ var (
    ErrMessage = errors.New("failed to convert to Mainflux message")
)

+var _ consumers.AsyncConsumer = (*notifierService)(nil)
+
// Service represents a notification service.
type Service interface {
    // CreateSubscription persists a subscription.

@@ -33,7 +35,7 @@ type Service interface {
    // RemoveSubscription removes the subscription having the provided identifier.
    RemoveSubscription(ctx context.Context, token, id string) error

-   consumers.Consumer
+   consumers.BlockingConsumer
}

var _ Service = (*notifierService)(nil)

@@ -43,6 +45,7 @@ type notifierService struct {
    subs     SubscriptionsRepository
    idp      mainflux.IDProvider
    notifier Notifier
+   errCh    chan error
    from     string
}

@@ -53,6 +56,7 @@ func New(auth mainflux.AuthServiceClient, subs SubscriptionsRepository, idp main
        subs:     subs,
        idp:      idp,
        notifier: notifier,
+       errCh:    make(chan error, 1),
        from:     from,
    }
}

@@ -95,7 +99,7 @@ func (ns *notifierService) RemoveSubscription(ctx context.Context, token, id str
    return ns.subs.Remove(ctx, id)
}

-func (ns *notifierService) Consume(message interface{}) error {
+func (ns *notifierService) ConsumeBlocking(message interface{}) error {
    msg, ok := message.(*messaging.Message)
    if !ok {
        return ErrMessage

@@ -127,3 +131,39 @@ func (ns *notifierService) Consume(message interface{}) error {

    return nil
}
+
+func (ns *notifierService) ConsumeAsync(message interface{}) {
+   msg, ok := message.(*messaging.Message)
+   if !ok {
+       ns.errCh <- ErrMessage
+       return
+   }
+   topic := msg.Channel
+   if msg.Subtopic != "" {
+       topic = fmt.Sprintf("%s.%s", msg.Channel, msg.Subtopic)
+   }
+   pm := PageMetadata{
+       Topic:  topic,
+       Offset: 0,
+       Limit:  -1,
+   }
+   page, err := ns.subs.RetrieveAll(context.Background(), pm)
+   if err != nil {
+       ns.errCh <- err
+       return
+   }
+
+   var to []string
+   for _, sub := range page.Subscriptions {
+       to = append(to, sub.Contact)
+   }
+   if len(to) > 0 {
+       if err := ns.notifier.Notify(ns.from, to, msg); err != nil {
+           ns.errCh <- errors.Wrap(ErrNotify, err)
+       }
+   }
+}
+
+func (ns *notifierService) Errors() <-chan error {
+   return ns.errCh
+}
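Worth noting as a reading aid: notifierService now satisfies both BlockingConsumer (ConsumeBlocking) and AsyncConsumer (ConsumeAsync plus Errors), and because the type switch in consumers.Start evaluates the AsyncConsumer case first, a consumer implementing both interfaces is subscribed with the asynchronous handler. A small sketch that mirrors that dispatch order (the function is illustrative, not part of the change):

package example

import "github.com/mainflux/mainflux/consumers"

// dispatchKind mirrors the type switch in consumers.Start: a value that
// implements both interfaces is reported as asynchronous because the
// AsyncConsumer case is evaluated first.
func dispatchKind(consumer interface{}) string {
    switch consumer.(type) {
    case consumers.AsyncConsumer:
        return "async"
    case consumers.BlockingConsumer:
        return "blocking"
    default:
        return "unsupported"
    }
}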
@@ -322,7 +322,7 @@ func TestConsume(t *testing.T) {
    }

    for _, tc := range cases {
-       err := svc.Consume(tc.msg)
+       err := svc.ConsumeBlocking(tc.msg)
        assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
    }
}
@@ -13,22 +13,22 @@ import (
    log "github.com/mainflux/mainflux/logger"
)

-var _ consumers.Consumer = (*loggingMiddleware)(nil)
+var _ consumers.BlockingConsumer = (*loggingMiddleware)(nil)

type loggingMiddleware struct {
    logger   log.Logger
-   consumer consumers.Consumer
+   consumer consumers.BlockingConsumer
}

// LoggingMiddleware adds logging facilities to the adapter.
-func LoggingMiddleware(consumer consumers.Consumer, logger log.Logger) consumers.Consumer {
+func LoggingMiddleware(consumer consumers.BlockingConsumer, logger log.Logger) consumers.BlockingConsumer {
    return &loggingMiddleware{
        logger:   logger,
        consumer: consumer,
    }
}

-func (lm *loggingMiddleware) Consume(msgs interface{}) (err error) {
+func (lm *loggingMiddleware) ConsumeBlocking(msgs interface{}) (err error) {
    defer func(begin time.Time) {
        message := fmt.Sprintf("Method consume took %s to complete", time.Since(begin))
        if err != nil {

@@ -38,5 +38,5 @@ func (lm *loggingMiddleware) Consume(msgs interface{}) (err error) {
        lm.logger.Info(fmt.Sprintf("%s without errors.", message))
    }(time.Now())

-   return lm.consumer.Consume(msgs)
+   return lm.consumer.ConsumeBlocking(msgs)
}

@@ -12,17 +12,17 @@ import (
    "github.com/mainflux/mainflux/consumers"
)

-var _ consumers.Consumer = (*metricsMiddleware)(nil)
+var _ consumers.BlockingConsumer = (*metricsMiddleware)(nil)

type metricsMiddleware struct {
    counter  metrics.Counter
    latency  metrics.Histogram
-   consumer consumers.Consumer
+   consumer consumers.BlockingConsumer
}

// MetricsMiddleware returns new message repository
// with Save method wrapped to expose metrics.
-func MetricsMiddleware(consumer consumers.Consumer, counter metrics.Counter, latency metrics.Histogram) consumers.Consumer {
+func MetricsMiddleware(consumer consumers.BlockingConsumer, counter metrics.Counter, latency metrics.Histogram) consumers.BlockingConsumer {
    return &metricsMiddleware{
        counter: counter,
        latency: latency,

@@ -30,10 +30,10 @@ func MetricsMiddleware(consumer consumers.Consumer, counter metrics.Counter, lat
    }
}

-func (mm *metricsMiddleware) Consume(msgs interface{}) error {
+func (mm *metricsMiddleware) ConsumeBlocking(msgs interface{}) error {
    defer func(begin time.Time) {
        mm.counter.With("method", "consume").Add(1)
        mm.latency.With("method", "consume").Observe(time.Since(begin).Seconds())
    }(time.Now())
-   return mm.consumer.Consume(msgs)
+   return mm.consumer.ConsumeBlocking(msgs)
}
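Since both middlewares above keep the BlockingConsumer interface, they compose the same way the newService helpers in the writer commands do; a hedged sketch, assuming the go-kit metrics types used by MetricsMiddleware and treating counter and latency as values produced elsewhere (for example by internal.MakeMetrics):

package example

import (
    "github.com/go-kit/kit/metrics"
    "github.com/mainflux/mainflux/consumers"
    "github.com/mainflux/mainflux/consumers/writers/api"
    "github.com/mainflux/mainflux/logger"
)

// decorate wraps any BlockingConsumer with logging and metrics, mirroring the
// newService helpers shown earlier in this commit.
func decorate(repo consumers.BlockingConsumer, log logger.Logger, counter metrics.Counter, latency metrics.Histogram) consumers.BlockingConsumer {
    repo = api.LoggingMiddleware(repo, log)
    repo = api.MetricsMiddleware(repo, counter, latency)
    return repo
}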
@@ -18,18 +18,18 @@ var (
    errSaveMessage = errors.New("failed to save message to cassandra database")
    errNoTable     = errors.New("table does not exist")
)
-var _ consumers.Consumer = (*cassandraRepository)(nil)
+var _ consumers.BlockingConsumer = (*cassandraRepository)(nil)

type cassandraRepository struct {
    session *gocql.Session
}

// New instantiates Cassandra message repository.
-func New(session *gocql.Session) consumers.Consumer {
+func New(session *gocql.Session) consumers.BlockingConsumer {
    return &cassandraRepository{session}
}

-func (cr *cassandraRepository) Consume(message interface{}) error {
+func (cr *cassandraRepository) ConsumeBlocking(message interface{}) error {
    switch m := message.(type) {
    case mfjson.Messages:
        return cr.saveJSON(m)

@@ -72,7 +72,7 @@ func TestSaveSenml(t *testing.T) {
        msgs = append(msgs, msg)
    }

-   err = repo.Consume(msgs)
+   err = repo.ConsumeBlocking(msgs)
    assert.Nil(t, err, fmt.Sprintf("expected no error, got %s", err))
}

@@ -116,6 +116,6 @@ func TestSaveJSON(t *testing.T) {
        msgs.Data = append(msgs.Data, msg)
    }

-   err = repo.Consume(msgs)
+   err = repo.ConsumeBlocking(msgs)
    assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
}
@@ -22,7 +22,8 @@ const senmlPoints = "messages"

var errSaveMessage = errors.New("failed to save message to influxdb database")

-var _ consumers.Consumer = (*influxRepo)(nil)
+var _ consumers.AsyncConsumer = (*influxRepo)(nil)
+var _ consumers.BlockingConsumer = (*influxRepo)(nil)

type RepoConfig struct {
    Bucket string

@@ -32,19 +33,76 @@ type RepoConfig struct {
type influxRepo struct {
    client           influxdb2.Client
    cfg              RepoConfig
+   errCh            chan error
+   writeAPI         api.WriteAPI
+   writeAPIBlocking api.WriteAPIBlocking
}

-// New returns new InfluxDB writer.
-func New(client influxdb2.Client, config RepoConfig, async bool) consumers.Consumer {
+// NewSync returns new InfluxDB writer.
+func NewSync(client influxdb2.Client, config RepoConfig) consumers.BlockingConsumer {
    return &influxRepo{
        client:           client,
        cfg:              config,
+       writeAPI:         nil,
+       writeAPIBlocking: client.WriteAPIBlocking(config.Org, config.Bucket),
    }
}

-func (repo *influxRepo) Consume(message interface{}) error {
+func NewAsync(client influxdb2.Client, config RepoConfig) consumers.AsyncConsumer {
+   return &influxRepo{
+       client:           client,
+       cfg:              config,
+       errCh:            make(chan error, 1),
+       writeAPI:         client.WriteAPI(config.Org, config.Bucket),
+       writeAPIBlocking: nil,
+   }
+}
+
+func (repo *influxRepo) ConsumeAsync(message interface{}) {
+   var err error
+   var pts []*write.Point
+   switch m := message.(type) {
+   case json.Messages:
+       pts, err = repo.jsonPoints(m)
+   default:
+       pts, err = repo.senmlPoints(m)
+   }
+   if err != nil {
+       repo.errCh <- err
+       return
+   }
+
+   done := make(chan bool)
+   defer close(done)
+
+   go func(done <-chan bool) {
+       for {
+           select {
+           case err := <-repo.writeAPI.Errors():
+               repo.errCh <- err
+           case <-done:
+               repo.errCh <- nil // pass nil error to the error channel
+               return
+           }
+       }
+   }(done)
+
+   for _, pt := range pts {
+       repo.writeAPI.WritePoint(pt)
+   }
+
+   repo.writeAPI.Flush()
+}
+
+func (repo *influxRepo) Errors() <-chan error {
+   if repo.errCh != nil {
+       return repo.errCh
+   }
+
+   return nil
+}
+
+func (repo *influxRepo) ConsumeBlocking(message interface{}) error {
    var err error
    var pts []*write.Point
    switch m := message.(type) {
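A hedged usage sketch of the asynchronous writer defined above, mirroring the wiring in the influxdb-writer main.go and the tests in this commit: build the repo with NewAsync, read Errors() before writing, then hand batches to ConsumeAsync. The client, bucket, organization, and the senml import path are placeholders/assumptions.

package example

import (
    influxdb2 "github.com/influxdata/influxdb-client-go/v2"
    "github.com/mainflux/mainflux/consumers/writers/influxdb"
    "github.com/mainflux/mainflux/logger"
    "github.com/mainflux/mainflux/pkg/transformers/senml"
)

func writeAsync(client influxdb2.Client, log logger.Logger, msgs []senml.Message) {
    repo := influxdb.NewAsync(client, influxdb.RepoConfig{
        Bucket: "mainflux-bucket", // placeholder values
        Org:    "mainflux",
    })

    // Each ConsumeAsync call reports at least one value on Errors()
    // (nil on success), so the channel has to be drained.
    errs := repo.Errors()
    go func() {
        for err := range errs {
            if err != nil {
                log.Error(err.Error())
            }
        }
    }()

    repo.ConsumeAsync(msgs)
}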
@ -117,182 +117,359 @@ func queryDB(fluxQuery string) (int, error) {
|
|||
return rowCount, nil
|
||||
}
|
||||
|
||||
func TestSaveSenml(t *testing.T) {
|
||||
for i := 0; i < 2; i++ {
|
||||
// Testing both async and sync
|
||||
repo := writer.New(client, repoCfg, i == 0)
|
||||
func TestAsyncSaveSenml(t *testing.T) {
|
||||
asyncRepo := writer.NewAsync(client, repoCfg)
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
msgsNum int
|
||||
expectedSize int
|
||||
}{
|
||||
{
|
||||
desc: "save a single message",
|
||||
msgsNum: 1,
|
||||
expectedSize: 1,
|
||||
},
|
||||
{
|
||||
desc: "save a batch of messages",
|
||||
msgsNum: streamsSize,
|
||||
expectedSize: streamsSize,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
err := resetBucket()
|
||||
assert.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err))
|
||||
now := time.Now().UnixNano()
|
||||
var msgs []senml.Message
|
||||
|
||||
chanID, err := idProvider.ID()
|
||||
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s\n", err))
|
||||
pubID, err := idProvider.ID()
|
||||
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s\n", err))
|
||||
for i := 0; i < tc.msgsNum; i++ {
|
||||
msg := senml.Message{
|
||||
Channel: chanID,
|
||||
Publisher: pubID,
|
||||
Protocol: "http",
|
||||
Name: "test name",
|
||||
Unit: "km",
|
||||
UpdateTime: 5456565466,
|
||||
}
|
||||
// Mix possible values as well as value sum.
|
||||
count := i % valueFields
|
||||
switch count {
|
||||
case 0:
|
||||
msg.Subtopic = subtopic
|
||||
msg.Value = &v
|
||||
case 1:
|
||||
msg.BoolValue = &boolV
|
||||
case 2:
|
||||
msg.StringValue = &stringV
|
||||
case 3:
|
||||
msg.DataValue = &dataV
|
||||
case 4:
|
||||
msg.Sum = &sum
|
||||
}
|
||||
|
||||
msg.Time = float64(now)/float64(1e9) - float64(i)
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
|
||||
err = repo.Consume(msgs)
|
||||
assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err))
|
||||
|
||||
count, err := queryDB(rowCountSenml)
|
||||
assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err))
|
||||
assert.Equal(t, tc.expectedSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", tc.expectedSize, count))
|
||||
}
|
||||
cases := []struct {
|
||||
desc string
|
||||
msgsNum int
|
||||
expectedSize int
|
||||
}{
|
||||
{
|
||||
desc: "save a single message",
|
||||
msgsNum: 1,
|
||||
expectedSize: 1,
|
||||
},
|
||||
{
|
||||
desc: "save a batch of messages",
|
||||
msgsNum: streamsSize,
|
||||
expectedSize: streamsSize,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveJSON(t *testing.T) {
|
||||
// Testing both async and sync
|
||||
for i := 0; i < 2; i++ {
|
||||
// Testing both async and sync
|
||||
repo := writer.New(client, repoCfg, i == 0)
|
||||
for _, tc := range cases {
|
||||
err := resetBucket()
|
||||
assert.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err))
|
||||
now := time.Now().UnixNano()
|
||||
var msgs []senml.Message
|
||||
|
||||
chanID, err := idProvider.ID()
|
||||
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
|
||||
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s\n", err))
|
||||
pubID, err := idProvider.ID()
|
||||
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
|
||||
|
||||
msg := json.Message{
|
||||
Channel: chanID,
|
||||
Publisher: pubID,
|
||||
Created: time.Now().UnixNano(),
|
||||
Subtopic: "subtopic/format/some_json",
|
||||
Protocol: "mqtt",
|
||||
Payload: map[string]interface{}{
|
||||
"field_1": 123,
|
||||
"field_2": "value",
|
||||
"field_3": false,
|
||||
"field_4": 12.344,
|
||||
"field_5": map[string]interface{}{
|
||||
"field_1": "value",
|
||||
"field_2": 42,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
invalidKeySepMsg := msg
|
||||
invalidKeySepMsg.Payload = map[string]interface{}{
|
||||
"field_1": 123,
|
||||
"field_2": "value",
|
||||
"field_3": false,
|
||||
"field_4": 12.344,
|
||||
"field_5": map[string]interface{}{
|
||||
"field_1": "value",
|
||||
"field_2": 42,
|
||||
},
|
||||
"field_6/field_7": "value",
|
||||
}
|
||||
invalidKeyNameMsg := msg
|
||||
invalidKeyNameMsg.Payload = map[string]interface{}{
|
||||
"field_1": 123,
|
||||
"field_2": "value",
|
||||
"field_3": false,
|
||||
"field_4": 12.344,
|
||||
"field_5": map[string]interface{}{
|
||||
"field_1": "value",
|
||||
"field_2": 42,
|
||||
},
|
||||
"publisher": "value",
|
||||
}
|
||||
|
||||
now := time.Now().UnixNano()
|
||||
msgs := json.Messages{
|
||||
Format: "some_json",
|
||||
}
|
||||
invalidKeySepMsgs := json.Messages{
|
||||
Format: "some_json",
|
||||
}
|
||||
invalidKeyNameMsgs := json.Messages{
|
||||
Format: "some_json",
|
||||
}
|
||||
|
||||
for i := 0; i < streamsSize; i++ {
|
||||
msg.Created = now
|
||||
msgs.Data = append(msgs.Data, msg)
|
||||
invalidKeySepMsgs.Data = append(invalidKeySepMsgs.Data, invalidKeySepMsg)
|
||||
invalidKeyNameMsgs.Data = append(invalidKeyNameMsgs.Data, invalidKeyNameMsg)
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
msgs json.Messages
|
||||
err error
|
||||
}{
|
||||
{
|
||||
desc: "consume valid json messages",
|
||||
msgs: msgs,
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
desc: "consume invalid json messages containing invalid key separator",
|
||||
msgs: invalidKeySepMsgs,
|
||||
err: json.ErrInvalidKey,
|
||||
},
|
||||
{
|
||||
desc: "consume invalid json messages containing invalid key name",
|
||||
msgs: invalidKeySepMsgs,
|
||||
err: json.ErrInvalidKey,
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
err := resetBucket()
|
||||
assert.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err))
|
||||
|
||||
switch err = repo.Consume(tc.msgs); err {
|
||||
case nil:
|
||||
count, err := queryDB(rowCountJson)
|
||||
assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err))
|
||||
assert.Equal(t, streamsSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", streamsSize, count))
|
||||
default:
|
||||
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s expected %s, got %s", tc.desc, tc.err, err))
|
||||
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s\n", err))
|
||||
for i := 0; i < tc.msgsNum; i++ {
|
||||
msg := senml.Message{
|
||||
Channel: chanID,
|
||||
Publisher: pubID,
|
||||
Protocol: "http",
|
||||
Name: "test name",
|
||||
Unit: "km",
|
||||
UpdateTime: 5456565466,
|
||||
}
|
||||
// Mix possible values as well as value sum.
|
||||
count := i % valueFields
|
||||
switch count {
|
||||
case 0:
|
||||
msg.Subtopic = subtopic
|
||||
msg.Value = &v
|
||||
case 1:
|
||||
msg.BoolValue = &boolV
|
||||
case 2:
|
||||
msg.StringValue = &stringV
|
||||
case 3:
|
||||
msg.DataValue = &dataV
|
||||
case 4:
|
||||
msg.Sum = &sum
|
||||
}
|
||||
|
||||
msg.Time = float64(now)/float64(1e9) - float64(i)
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
|
||||
errs := asyncRepo.Errors()
|
||||
asyncRepo.ConsumeAsync(msgs)
|
||||
err = <-errs
|
||||
assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err))
|
||||
|
||||
count, err := queryDB(rowCountSenml)
|
||||
assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err))
|
||||
assert.Equal(t, tc.expectedSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", tc.expectedSize, count))
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlockingSaveSenml(t *testing.T) {
|
||||
syncRepo := writer.NewSync(client, repoCfg)
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
msgsNum int
|
||||
expectedSize int
|
||||
}{
|
||||
{
|
||||
desc: "save a single message",
|
||||
msgsNum: 1,
|
||||
expectedSize: 1,
|
||||
},
|
||||
{
|
||||
desc: "save a batch of messages",
|
||||
msgsNum: streamsSize,
|
||||
expectedSize: streamsSize,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
err := resetBucket()
|
||||
assert.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err))
|
||||
now := time.Now().UnixNano()
|
||||
var msgs []senml.Message
|
||||
|
||||
chanID, err := idProvider.ID()
|
||||
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s\n", err))
|
||||
pubID, err := idProvider.ID()
|
||||
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s\n", err))
|
||||
for i := 0; i < tc.msgsNum; i++ {
|
||||
msg := senml.Message{
|
||||
Channel: chanID,
|
||||
Publisher: pubID,
|
||||
Protocol: "http",
|
||||
Name: "test name",
|
||||
Unit: "km",
|
||||
UpdateTime: 5456565466,
|
||||
}
|
||||
// Mix possible values as well as value sum.
|
||||
count := i % valueFields
|
||||
switch count {
|
||||
case 0:
|
||||
msg.Subtopic = subtopic
|
||||
msg.Value = &v
|
||||
case 1:
|
||||
msg.BoolValue = &boolV
|
||||
case 2:
|
||||
msg.StringValue = &stringV
|
||||
case 3:
|
||||
msg.DataValue = &dataV
|
||||
case 4:
|
||||
msg.Sum = &sum
|
||||
}
|
||||
|
||||
msg.Time = float64(now)/float64(1e9) - float64(i)
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
|
||||
err = syncRepo.ConsumeBlocking(msgs)
|
||||
assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err))
|
||||
|
||||
count, err := queryDB(rowCountSenml)
|
||||
assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err))
|
||||
assert.Equal(t, tc.expectedSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", tc.expectedSize, count))
|
||||
}
|
||||
}
|
||||
|
||||
func TestAsyncSaveJSON(t *testing.T) {
|
||||
asyncRepo := writer.NewAsync(client, repoCfg)
|
||||
|
||||
chanID, err := idProvider.ID()
|
||||
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
|
||||
pubID, err := idProvider.ID()
|
||||
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
|
||||
|
||||
msg := json.Message{
|
||||
Channel: chanID,
|
||||
Publisher: pubID,
|
||||
Created: time.Now().UnixNano(),
|
||||
Subtopic: "subtopic/format/some_json",
|
||||
Protocol: "mqtt",
|
||||
Payload: map[string]interface{}{
|
||||
"field_1": 123,
|
||||
"field_2": "value",
|
||||
"field_3": false,
|
||||
"field_4": 12.344,
|
||||
"field_5": map[string]interface{}{
|
||||
"field_1": "value",
|
||||
"field_2": 42,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
invalidKeySepMsg := msg
|
||||
invalidKeySepMsg.Payload = map[string]interface{}{
|
||||
"field_1": 123,
|
||||
"field_2": "value",
|
||||
"field_3": false,
|
||||
"field_4": 12.344,
|
||||
"field_5": map[string]interface{}{
|
||||
"field_1": "value",
|
||||
"field_2": 42,
|
||||
},
|
||||
"field_6/field_7": "value",
|
||||
}
|
||||
invalidKeyNameMsg := msg
|
||||
invalidKeyNameMsg.Payload = map[string]interface{}{
|
||||
"field_1": 123,
|
||||
"field_2": "value",
|
||||
"field_3": false,
|
||||
"field_4": 12.344,
|
||||
"field_5": map[string]interface{}{
|
||||
"field_1": "value",
|
||||
"field_2": 42,
|
||||
},
|
||||
"publisher": "value",
|
||||
}
|
||||
|
||||
now := time.Now().UnixNano()
|
||||
msgs := json.Messages{
|
||||
Format: "some_json",
|
||||
}
|
||||
invalidKeySepMsgs := json.Messages{
|
||||
Format: "some_json",
|
||||
}
|
||||
invalidKeyNameMsgs := json.Messages{
|
||||
Format: "some_json",
|
||||
}
|
||||
|
||||
for i := 0; i < streamsSize; i++ {
|
||||
msg.Created = now
|
||||
msgs.Data = append(msgs.Data, msg)
|
||||
invalidKeySepMsgs.Data = append(invalidKeySepMsgs.Data, invalidKeySepMsg)
|
||||
invalidKeyNameMsgs.Data = append(invalidKeyNameMsgs.Data, invalidKeyNameMsg)
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
msgs json.Messages
|
||||
err error
|
||||
}{
|
||||
{
|
||||
desc: "consume valid json messages",
|
||||
msgs: msgs,
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
desc: "consume invalid json messages containing invalid key separator",
|
||||
msgs: invalidKeySepMsgs,
|
||||
err: json.ErrInvalidKey,
|
||||
},
|
||||
{
|
||||
desc: "consume invalid json messages containing invalid key name",
|
||||
msgs: invalidKeySepMsgs,
|
||||
err: json.ErrInvalidKey,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
err := resetBucket()
|
||||
assert.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err))
|
||||
|
||||
asyncRepo.ConsumeAsync(msgs)
|
||||
timer := time.NewTimer(1 * time.Millisecond)
|
||||
select {
|
||||
case err = <-asyncRepo.Errors():
|
||||
case <-timer.C:
|
||||
t.Error("errors channel blocked, nothing returned.")
|
||||
}
|
||||
switch err {
|
||||
case nil:
|
||||
count, err := queryDB(rowCountJson)
|
||||
assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err))
|
||||
assert.Equal(t, streamsSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", streamsSize, count))
|
||||
default:
|
||||
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s expected %s, got %s", tc.desc, tc.err, err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlockingSaveJSON(t *testing.T) {
|
||||
syncRepo := writer.NewSync(client, repoCfg)
|
||||
|
||||
chanID, err := idProvider.ID()
|
||||
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
|
||||
pubID, err := idProvider.ID()
|
||||
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
|
||||
|
||||
msg := json.Message{
|
||||
Channel: chanID,
|
||||
Publisher: pubID,
|
||||
Created: time.Now().UnixNano(),
|
||||
Subtopic: "subtopic/format/some_json",
|
||||
Protocol: "mqtt",
|
||||
Payload: map[string]interface{}{
|
||||
"field_1": 123,
|
||||
"field_2": "value",
|
||||
"field_3": false,
|
||||
"field_4": 12.344,
|
||||
"field_5": map[string]interface{}{
|
||||
"field_1": "value",
|
||||
"field_2": 42,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
invalidKeySepMsg := msg
|
||||
invalidKeySepMsg.Payload = map[string]interface{}{
|
||||
"field_1": 123,
|
||||
"field_2": "value",
|
||||
"field_3": false,
|
||||
"field_4": 12.344,
|
||||
"field_5": map[string]interface{}{
|
||||
"field_1": "value",
|
||||
"field_2": 42,
|
||||
},
|
||||
"field_6/field_7": "value",
|
||||
}
|
||||
invalidKeyNameMsg := msg
|
||||
invalidKeyNameMsg.Payload = map[string]interface{}{
|
||||
"field_1": 123,
|
||||
"field_2": "value",
|
||||
"field_3": false,
|
||||
"field_4": 12.344,
|
||||
"field_5": map[string]interface{}{
|
||||
"field_1": "value",
|
||||
"field_2": 42,
|
||||
},
|
||||
"publisher": "value",
|
||||
}
|
||||
|
||||
now := time.Now().UnixNano()
|
||||
msgs := json.Messages{
|
||||
Format: "some_json",
|
||||
}
|
||||
invalidKeySepMsgs := json.Messages{
|
||||
Format: "some_json",
|
||||
}
|
||||
invalidKeyNameMsgs := json.Messages{
|
||||
Format: "some_json",
|
||||
}
|
||||
|
||||
for i := 0; i < streamsSize; i++ {
|
||||
msg.Created = now
|
||||
msgs.Data = append(msgs.Data, msg)
|
||||
invalidKeySepMsgs.Data = append(invalidKeySepMsgs.Data, invalidKeySepMsg)
|
||||
invalidKeyNameMsgs.Data = append(invalidKeyNameMsgs.Data, invalidKeyNameMsg)
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
msgs json.Messages
|
||||
err error
|
||||
}{
|
||||
{
|
||||
desc: "consume valid json messages",
|
||||
msgs: msgs,
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
desc: "consume invalid json messages containing invalid key separator",
|
||||
msgs: invalidKeySepMsgs,
|
||||
err: json.ErrInvalidKey,
|
||||
},
|
||||
{
|
||||
desc: "consume invalid json messages containing invalid key name",
|
||||
msgs: invalidKeySepMsgs,
|
||||
err: json.ErrInvalidKey,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
err := resetBucket()
|
||||
assert.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err))
|
||||
|
||||
switch err = syncRepo.ConsumeBlocking(tc.msgs); err {
|
||||
case nil:
|
||||
count, err := queryDB(rowCountJson)
|
||||
assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err))
|
||||
assert.Equal(t, streamsSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", streamsSize, count))
|
||||
default:
|
||||
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s expected %s, got %s", tc.desc, tc.err, err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -18,25 +18,24 @@ const senmlCollection string = "messages"

var errSaveMessage = errors.New("failed to save message to mongodb database")

-var _ consumers.Consumer = (*mongoRepo)(nil)
+var _ consumers.BlockingConsumer = (*mongoRepo)(nil)

type mongoRepo struct {
    db *mongo.Database
}

// New returns new MongoDB writer.
-func New(db *mongo.Database) consumers.Consumer {
+func New(db *mongo.Database) consumers.BlockingConsumer {
    return &mongoRepo{db}
}

-func (repo *mongoRepo) Consume(message interface{}) error {
+func (repo *mongoRepo) ConsumeBlocking(message interface{}) error {
    switch m := message.(type) {
    case json.Messages:
        return repo.saveJSON(m)
    default:
        return repo.saveSenml(m)
    }
-
}

func (repo *mongoRepo) saveSenml(messages interface{}) error {

@@ -83,7 +83,7 @@ func TestSaveSenml(t *testing.T) {
        msgs = append(msgs, msg)
    }

-   err = repo.Consume(msgs)
+   err = repo.ConsumeBlocking(msgs)
    require.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err))

    count, err := db.Collection(collection).CountDocuments(context.Background(), bson.D{})

@@ -131,6 +131,6 @@ func TestSaveJSON(t *testing.T) {
        msgs.Data = append(msgs.Data, msg)
    }

-   err = repo.Consume(msgs)
+   err = repo.ConsumeBlocking(msgs)
    assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
}

@@ -25,18 +25,18 @@ var (
    errNoTable = errors.New("relation does not exist")
)

-var _ consumers.Consumer = (*postgresRepo)(nil)
+var _ consumers.BlockingConsumer = (*postgresRepo)(nil)

type postgresRepo struct {
    db *sqlx.DB
}

// New returns new PostgreSQL writer.
-func New(db *sqlx.DB) consumers.Consumer {
+func New(db *sqlx.DB) consumers.BlockingConsumer {
    return &postgresRepo{db: db}
}

-func (pr postgresRepo) Consume(message interface{}) (err error) {
+func (pr postgresRepo) ConsumeBlocking(message interface{}) (err error) {
    switch m := message.(type) {
    case mfjson.Messages:
        return pr.saveJSON(m)

@@ -67,7 +67,7 @@ func TestSaveSenml(t *testing.T) {
        msgs = append(msgs, msg)
    }

-   err = repo.Consume(msgs)
+   err = repo.ConsumeBlocking(msgs)
    assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
}

@@ -107,6 +107,6 @@ func TestSaveJSON(t *testing.T) {
        msgs.Data = append(msgs.Data, msg)
    }

-   err = repo.Consume(msgs)
+   err = repo.ConsumeBlocking(msgs)
    assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
}

@@ -24,18 +24,18 @@ var (
    errNoTable = errors.New("relation does not exist")
)

-var _ consumers.Consumer = (*timescaleRepo)(nil)
+var _ consumers.BlockingConsumer = (*timescaleRepo)(nil)

type timescaleRepo struct {
    db *sqlx.DB
}

// New returns new TimescaleSQL writer.
-func New(db *sqlx.DB) consumers.Consumer {
+func New(db *sqlx.DB) consumers.BlockingConsumer {
    return &timescaleRepo{db: db}
}

-func (tr timescaleRepo) Consume(message interface{}) (err error) {
+func (tr *timescaleRepo) ConsumeBlocking(message interface{}) (err error) {
    switch m := message.(type) {
    case mfjson.Messages:
        return tr.saveJSON(m)

@@ -67,7 +67,7 @@ func TestSaveSenml(t *testing.T) {
        msgs = append(msgs, msg)
    }

-   err = repo.Consume(msgs)
+   err = repo.ConsumeBlocking(msgs)
    assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
}

@@ -107,6 +107,6 @@ func TestSaveJSON(t *testing.T) {
        msgs.Data = append(msgs.Data, msg)
    }

-   err = repo.Consume(msgs)
+   err = repo.ConsumeBlocking(msgs)
    assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
}
@@ -240,6 +240,7 @@ MF_INFLUXDB_PORT=8086
MF_INFLUXDB_HOST=mainflux-influxdb
MF_INFLUXDB_ADMIN_USER=mainflux
MF_INFLUXDB_ADMIN_PASSWORD=mainflux
+MF_INFLUXDB_ADMIN_URL="http://localhost:8086"
MF_INFLUXDB_PROTOCOL=http
MF_INFLUXDB_TIMEOUT=1s
MF_INFLUXDB_ORG=mainflux

@@ -29,6 +29,7 @@ services:
      MF_INFLUXDB_PROTOCOL: ${MF_INFLUXDB_PROTOCOL}
      MF_INFLUXDB_ADMIN_USER: ${MF_INFLUXDB_ADMIN_USER}
      MF_INFLUXDB_ADMIN_PASSWORD: ${MF_INFLUXDB_ADMIN_PASSWORD}
+     MF_INFLUXDB_ADMIN_URL: ${MF_INFLUXDB_ADMIN_URL}
      MF_INFLUX_READER_SERVER_CERT: ${MF_INFLUX_READER_SERVER_CERT}
      MF_INFLUX_READER_SERVER_KEY: ${MF_INFLUX_READER_SERVER_KEY}
      MF_JAEGER_URL: ${MF_JAEGER_URL}

@@ -24,6 +24,7 @@ services:
      DOCKER_INFLUXDB_INIT_MODE: ${MF_INFLUXDB_INIT_MODE}
      DOCKER_INFLUXDB_INIT_USERNAME: ${MF_INFLUXDB_ADMIN_USER}
      DOCKER_INFLUXDB_INIT_PASSWORD: ${MF_INFLUXDB_ADMIN_PASSWORD}
+     DOCKER_INFLUXDB_ADMIN_URL: ${MF_INFLUXDB_ADMIN_URL}
      DOCKER_INFLUXDB_INIT_ORG: ${MF_INFLUXDB_ORG}
      DOCKER_INFLUXDB_INIT_BUCKET: ${MF_INFLUXDB_BUCKET}
      DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: ${MF_INFLUXDB_TOKEN}

go.mod
@@ -21,7 +21,7 @@ require (
    github.com/gorilla/websocket v1.5.0
    github.com/hashicorp/vault/api v1.8.1
    github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f
-   github.com/influxdata/influxdb-client-go/v2 v2.12.2
+   github.com/influxdata/influxdb-client-go/v2 v2.12.3
    github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa
    github.com/jackc/pgtype v1.13.0
    github.com/jackc/pgx/v5 v5.2.0

go.sum
@@ -378,8 +378,8 @@ github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/inconshreveable/mousetrap v1.0.1 h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7Pgzkat/bFNc=
github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
-github.com/influxdata/influxdb-client-go/v2 v2.12.2 h1:uYABKdrEKlYm+++qfKdbgaHKBPmoWR5wpbmj6MBB/2g=
-github.com/influxdata/influxdb-client-go/v2 v2.12.2/go.mod h1:YteV91FiQxRdccyJ2cHvj2f/5sq4y4Njqu1fQzsQCOU=
+github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0IpXeMSkY/uJa/O/vC4=
+github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0=

@@ -15,14 +15,19 @@ var (
)

type Config struct {
-   Protocol string        `env:"PROTOCOL" envDefault:"http"`
-   Host     string        `env:"HOST" envDefault:"localhost"`
-   Port     string        `env:"PORT" envDefault:"8086"`
-   Bucket   string        `env:"BUCKET" envDefault:"mainflux-bucket"`
-   Org      string        `env:"ORG" envDefault:"mainflux"`
-   Token    string        `env:"TOKEN" envDefault:"mainflux-token"`
-   DBUrl    string        `env:"DBURL" envDefault:""`
-   Timeout  time.Duration `env:"TIMEOUT" envDefault:"1s"`
+   Protocol           string        `env:"PROTOCOL" envDefault:"http"`
+   Host               string        `env:"HOST" envDefault:"localhost"`
+   Port               string        `env:"PORT" envDefault:"8086"`
+   Username           string        `env:"ADMIN_USER" envDefault:"mainflux"`
+   Password           string        `env:"ADMIN_PASSWORD" envDefault:"mainflux"`
+   DbName             string        `env:"DB" envDefault:"mainflux"`
+   Bucket             string        `env:"BUCKET" envDefault:"mainflux-bucket"`
+   Org                string        `env:"ORG" envDefault:"mainflux"`
+   Token              string        `env:"TOKEN" envDefault:"mainflux-token"`
+   DBUrl              string        `env:"DBURL" envDefault:""`
+   UserAgent          string        `env:"USER_AGENT" envDefault:"InfluxDBClient"`
+   Timeout            time.Duration `env:"TIMEOUT"` // Influxdb client configuration by default has no timeout duration , this field will not have a fallback default timeout duration. Reference: https://pkg.go.dev/github.com/influxdata/influxdb@v1.10.0/client/v2#HTTPConfig
+   InsecureSkipVerify bool          `env:"INSECURE_SKIP_VERIFY" envDefault:"false"`
}

// Setup load configuration from environment variable, create InfluxDB client and connect to InfluxDB server

@@ -36,7 +41,10 @@ func Setup(envPrefix string, ctx context.Context) (influxdb2.Client, error) {

// Connect create InfluxDB client and connect to InfluxDB server
func Connect(config Config, ctx context.Context) (influxdb2.Client, error) {
-   client := influxdb2.NewClient(config.DBUrl, config.Token)
+   client := influxdb2.NewClientWithOptions(config.DBUrl, config.Token,
+       influxdb2.DefaultOptions().
+           SetUseGZip(true).
+           SetFlushInterval(100))
    ctx, cancel := context.WithTimeout(ctx, config.Timeout)
    defer cancel()
    if _, err := client.Ready(ctx); err != nil {
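For reference, a hedged sketch of how a service could fill this Config and call Connect directly (normally Setup reads the values from MF_INFLUXDB_*-prefixed environment variables, as the .env hunk above suggests); the literal values and the calling package are illustrative only.

package example

import (
    "context"
    "log"
    "time"

    influxdbclient "github.com/mainflux/mainflux/internal/clients/influxdb"
)

func connectInflux(ctx context.Context) {
    // Values normally come from the environment via Setup; they are filled in
    // directly here for illustration.
    cfg := influxdbclient.Config{
        Protocol: "http",
        Host:     "localhost",
        Port:     "8086",
        Org:      "mainflux",
        Bucket:   "mainflux-bucket",
        Token:    "mainflux-token",
        DBUrl:    "http://localhost:8086",
        Timeout:  time.Second,
    }

    client, err := influxdbclient.Connect(cfg, ctx)
    if err != nil {
        log.Fatalf("failed to connect to InfluxDB: %s", err)
    }
    defer client.Close()
}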
@@ -114,7 +114,7 @@ func TestReadSenml(t *testing.T) {
        messages = append(messages, msg)
    }

-   err = writer.Consume(messages)
+   err = writer.ConsumeBlocking(messages)
    require.Nil(t, err, fmt.Sprintf("failed to store message to Cassandra: %s", err))

    reader := creader.New(session)

@@ -450,7 +450,8 @@ func TestReadJSON(t *testing.T) {
        m := toMap(msg)
        msgs1 = append(msgs1, m)
    }
-   err = writer.Consume(messages1)
+
+   err = writer.ConsumeBlocking(messages1)
    require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))

    id2, err := idProvider.ID()

@@ -483,7 +484,8 @@ func TestReadJSON(t *testing.T) {
        m := toMap(msg)
        msgs2 = append(msgs2, m)
    }
-   err = writer.Consume(messages2)
+
+   err = writer.ConsumeBlocking(messages2)
    require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))

    httpMsgs := []map[string]interface{}{}
@@ -50,7 +50,7 @@ var (
)

func TestReadSenml(t *testing.T) {
-   writer := iwriter.New(client, repoCfg, true)
+   asyncWriter := iwriter.NewAsync(client, repoCfg)

    chanID, err := idProvider.ID()
    assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

@@ -110,8 +110,10 @@ func TestReadSenml(t *testing.T) {
        messages = append(messages, msg)
    }

-   err = writer.Consume(messages)
-   require.Nil(t, err, fmt.Sprintf("failed to store message to InfluxDB: %s", err))
+   errs := asyncWriter.Errors()
+   asyncWriter.ConsumeAsync(messages)
+   err = <-errs
+   assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err))

    reader := ireader.New(client, repoCfg)

@@ -407,7 +409,7 @@ func TestReadSenml(t *testing.T) {
}

func TestReadJSON(t *testing.T) {
-   writer := iwriter.New(client, repoCfg, true)
+   asyncWriter := iwriter.NewAsync(client, repoCfg)

    id1, err := idProvider.ID()
    assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

@@ -432,8 +434,11 @@ func TestReadJSON(t *testing.T) {
        m := toMap(m)
        msgs1 = append(msgs1, m)
    }
-   err = writer.Consume(messages1)
-   require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
+
+   errs := asyncWriter.Errors()
+   asyncWriter.ConsumeAsync(messages1)
+   err = <-errs
+   require.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err))

    id2, err := idProvider.ID()
    assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

@@ -460,8 +465,11 @@ func TestReadJSON(t *testing.T) {
        m := toMap(msg)
        msgs2 = append(msgs2, m)
    }
-   err = writer.Consume(messages2)
-   assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
+
+   // Test async
+   asyncWriter.ConsumeAsync(messages2)
+   err = <-errs
+   assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err))

    httpMsgs := []map[string]interface{}{}
    for i := 0; i < msgsNum; i += 2 {
@@ -109,7 +109,7 @@ func TestReadSenml(t *testing.T) {
        }
        messages = append(messages, msg)
    }
-   err = writer.Consume(messages)
+   err = writer.ConsumeBlocking(messages)
    require.Nil(t, err, fmt.Sprintf("failed to store message to MongoDB: %s", err))
    reader := mreader.New(db)

@@ -416,7 +416,8 @@ func TestReadJSON(t *testing.T) {
        m := toMap(msg)
        msgs1 = append(msgs1, m)
    }
-   err = writer.Consume(messages1)
+
+   err = writer.ConsumeBlocking(messages1)
    require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))

    id2, err := idProvider.ID()

@@ -449,7 +450,8 @@ func TestReadJSON(t *testing.T) {
        m := toMap(msg)
        msgs2 = append(msgs2, m)
    }
-   err = writer.Consume(messages2)
+
+   err = writer.ConsumeBlocking(messages2)
    require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))

    httpMsgs := []map[string]interface{}{}
@@ -99,7 +99,7 @@ func TestReadSenml(t *testing.T) {
        messages = append(messages, msg)
    }

-   err = writer.Consume(messages)
+   err = writer.ConsumeBlocking(messages)
    require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))

    reader := preader.New(db)

@@ -408,7 +408,8 @@ func TestReadJSON(t *testing.T) {
        m := toMap(msg)
        msgs1 = append(msgs1, m)
    }
-   err = writer.Consume(messages1)
+
+   err = writer.ConsumeBlocking(messages1)
    require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))

    id2, err := idProvider.ID()

@@ -438,7 +439,8 @@ func TestReadJSON(t *testing.T) {
        m := toMap(msg)
        msgs2 = append(msgs2, m)
    }
-   err = writer.Consume(messages2)
+
+   err = writer.ConsumeBlocking(messages2)
    require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))

    httpMsgs := []map[string]interface{}{}
@@ -98,7 +98,7 @@ func TestReadSenml(t *testing.T) {
        messages = append(messages, msg)
    }

-   err = writer.Consume(messages)
+   err = writer.ConsumeBlocking(messages)
    require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))

    reader := treader.New(db)

@@ -410,7 +410,8 @@ func TestReadJSON(t *testing.T) {
        mapped := toMap(msg)
        msgs1 = append(msgs1, mapped)
    }
-   err = writer.Consume(messages1)
+
+   err = writer.ConsumeBlocking(messages1)
    require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))

    id2, err := idProvider.ID()

@@ -441,7 +442,8 @@ func TestReadJSON(t *testing.T) {
        mapped := toMap(msg)
        msgs2 = append(msgs2, mapped)
    }
-   err = writer.Consume(messages2)
+
+   err = writer.ConsumeBlocking(messages2)
    require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))

    httpMsgs := []map[string]interface{}{}
@@ -1,3 +1,7 @@
+## 2.12.3 [2023-03-29]
+### Bug fixes
+- Update golang.org/x/net from 0.0.0-20210119194325-5f4716e94777 to 0.7.0
+
## 2.12.2 [2023-01-26]
### Bug fixes
- [#368](https://github.com/influxdata/influxdb-client-go/pull/368) Allowing proxy from environment variable

@@ -13,7 +13,7 @@ import (

const (
    // Version defines current version
-   Version = "2.12.2"
+   Version = "2.12.3"
)

func init() {

@@ -270,7 +270,9 @@ github.com/imdario/mergo
# github.com/inconshreveable/mousetrap v1.0.1
## explicit; go 1.18
github.com/inconshreveable/mousetrap
-# github.com/influxdata/influxdb-client-go/v2 v2.12.2
+# github.com/influxdata/influxdb-client-go v1.4.0
+## explicit; go 1.13
+# github.com/influxdata/influxdb-client-go/v2 v2.12.3
## explicit; go 1.17
github.com/influxdata/influxdb-client-go/v2
github.com/influxdata/influxdb-client-go/v2/api