Mainflux.mainflux/consumers/messages.go

160 lines
3.9 KiB
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package consumers
import (
"context"
"fmt"
"os"
"strings"
"github.com/mainflux/mainflux/internal/apiutil"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/pkg/messaging/brokers"
"github.com/mainflux/mainflux/pkg/transformers"
"github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/pelletier/go-toml"
)
const (
defContentType = "application/senml+json"
defFormat = "senml"
)
var (
errOpenConfFile = errors.New("unable to open configuration file")
errParseConfFile = errors.New("unable to parse configuration file")
)
// Start method starts consuming messages received from Message broker.
// This method transforms messages to SenML format before
// using MessageRepository to store them.
func Start(ctx context.Context, id string, sub messaging.Subscriber, consumer interface{}, configPath string, logger logger.Logger) error {
cfg, err := loadConfig(configPath)
if err != nil {
logger.Warn(fmt.Sprintf("Failed to load consumer config: %s", err))
}
transformer := makeTransformer(cfg.TransformerCfg, logger)
for _, subject := range cfg.SubscriberCfg.Subjects {
subCfg := messaging.SubscriberConfig{
ID: id,
Topic: subject,
DeliveryPolicy: messaging.DeliverAllPolicy,
}
switch c := consumer.(type) {
case AsyncConsumer:
subCfg.Handler = handleAsync(ctx, transformer, c)
if err := sub.Subscribe(ctx, subCfg); err != nil {
return err
}
case BlockingConsumer:
subCfg.Handler = handleSync(ctx, transformer, c)
if err := sub.Subscribe(ctx, subCfg); err != nil {
return err
}
default:
return apiutil.ErrInvalidQueryParams
}
}
return nil
}
func handleSync(ctx context.Context, t transformers.Transformer, sc BlockingConsumer) handleFunc {
return func(msg *messaging.Message) error {
m := interface{}(msg)
var err error
if t != nil {
m, err = t.Transform(msg)
if err != nil {
return err
}
}
return sc.ConsumeBlocking(ctx, m)
}
}
func handleAsync(ctx context.Context, t transformers.Transformer, ac AsyncConsumer) handleFunc {
return func(msg *messaging.Message) error {
m := interface{}(msg)
var err error
if t != nil {
m, err = t.Transform(msg)
if err != nil {
return err
}
}
ac.ConsumeAsync(ctx, m)
return nil
}
}
type handleFunc func(msg *messaging.Message) error
func (h handleFunc) Handle(msg *messaging.Message) error {
return h(msg)
}
func (h handleFunc) Cancel() error {
return nil
}
type subscriberConfig struct {
Subjects []string `toml:"subjects"`
}
type transformerConfig struct {
Format string `toml:"format"`
ContentType string `toml:"content_type"`
TimeFields []json.TimeField `toml:"time_fields"`
}
type config struct {
SubscriberCfg subscriberConfig `toml:"subscriber"`
TransformerCfg transformerConfig `toml:"transformer"`
}
func loadConfig(configPath string) (config, error) {
cfg := config{
SubscriberCfg: subscriberConfig{
Subjects: []string{brokers.SubjectAllChannels},
},
TransformerCfg: transformerConfig{
Format: defFormat,
ContentType: defContentType,
},
}
data, err := os.ReadFile(configPath)
if err != nil {
return cfg, errors.Wrap(errOpenConfFile, err)
}
if err := toml.Unmarshal(data, &cfg); err != nil {
return cfg, errors.Wrap(errParseConfFile, err)
}
return cfg, nil
}
func makeTransformer(cfg transformerConfig, logger logger.Logger) transformers.Transformer {
switch strings.ToUpper(cfg.Format) {
case "SENML":
logger.Info("Using SenML transformer")
return senml.New(cfg.ContentType)
case "JSON":
logger.Info("Using JSON transformer")
return json.New(cfg.TimeFields)
default:
logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s", cfg.Format))
os.Exit(1)
return nil
}
}