From b18c9e79dcb94dd59b37be2fe175f7f24ab3a4a4 Mon Sep 17 00:00:00 2001 From: Manuel Imperiale Date: Tue, 14 Dec 2021 11:17:23 +0100 Subject: [PATCH] NOISSUE - Add timestamp transformation rules for specifc JSON fields (#1514) * NOISSUE - Add timestamp transformation rules for specifc JSON fields Signed-off-by: Manuel Imperiale * Add tests and defaults Signed-off-by: Manuel Imperiale * Fix reviews Signed-off-by: Manuel Imperiale * Fix Created Signed-off-by: Manuel Imperiale * Fix READMEs Signed-off-by: Manuel Imperiale * Fix Config file Signed-off-by: Manuel Imperiale * Improve the timestamp Config structure Signed-off-by: Manuel Imperiale * Fix typos Signed-off-by: Manuel Imperiale * Fix smpp Signed-off-by: Manuel Imperiale * Fix reviews Signed-off-by: Manuel Imperiale * Rm duplicated Unmarshal Signed-off-by: Manuel Imperiale * Extract the error var Signed-off-by: Manuel Imperiale * Fix review Signed-off-by: Manuel Imperiale * Return error if time field transformation fails Signed-off-by: Manuel Imperiale --- cmd/cassandra-writer/main.go | 85 ++++------ cmd/influxdb-writer/main.go | 102 +++++------- cmd/mongodb-writer/main.go | 86 ++++------ cmd/postgres-writer/main.go | 50 ++---- cmd/smpp-notifier/main.go | 2 +- cmd/smtp-notifier/main.go | 2 +- consumers/messages.go | 72 +++++++-- consumers/notifiers/smpp/notifier.go | 4 +- consumers/writers/cassandra/README.md | 30 ++-- consumers/writers/influxdb/README.md | 27 ++-- consumers/writers/mongodb/README.md | 24 ++- consumers/writers/postgres/README.md | 38 ++--- docker/.env | 8 - docker/addons/cassandra-writer/config.toml | 15 +- .../cassandra-writer/docker-compose.yml | 1 - docker/addons/influxdb-writer/config.toml | 15 +- .../addons/influxdb-writer/docker-compose.yml | 1 - docker/addons/mongodb-writer/config.toml | 15 +- .../addons/mongodb-writer/docker-compose.yml | 1 - docker/addons/postgres-writer/config.toml | 15 +- .../addons/postgres-writer/docker-compose.yml | 1 - docker/addons/smpp-notifier/config.toml | 4 +- docker/addons/smtp-notifier/config.toml | 4 +- pkg/transformers/json/time.go | 152 ++++++++++++++++++ pkg/transformers/json/transformer.go | 78 +++++++-- pkg/transformers/json/transformer_test.go | 95 ++++++++++- 26 files changed, 582 insertions(+), 345 deletions(-) create mode 100644 pkg/transformers/json/time.go diff --git a/cmd/cassandra-writer/main.go b/cmd/cassandra-writer/main.go index ccd31f24..ffa3844a 100644 --- a/cmd/cassandra-writer/main.go +++ b/cmd/cassandra-writer/main.go @@ -21,9 +21,6 @@ import ( "github.com/mainflux/mainflux/consumers/writers/cassandra" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/nats" - "github.com/mainflux/mainflux/pkg/transformers" - "github.com/mainflux/mainflux/pkg/transformers/json" - "github.com/mainflux/mainflux/pkg/transformers/senml" stdprometheus "github.com/prometheus/client_golang/prometheus" ) @@ -31,39 +28,33 @@ const ( svcName = "cassandra-writer" sep = "," - defNatsURL = "nats://localhost:4222" - defLogLevel = "error" - defPort = "8180" - defCluster = "127.0.0.1" - defKeyspace = "mainflux" - defDBUser = "mainflux" - defDBPass = "mainflux" - defDBPort = "9042" - defConfigPath = "/config.toml" - defContentType = "application/senml+json" - defTransformer = "senml" + defNatsURL = "nats://localhost:4222" + defLogLevel = "error" + defPort = "8180" + defCluster = "127.0.0.1" + defKeyspace = "mainflux" + defDBUser = "mainflux" + defDBPass = "mainflux" + defDBPort = "9042" + defConfigPath = "/config.toml" - envNatsURL = "MF_NATS_URL" - envLogLevel = "MF_CASSANDRA_WRITER_LOG_LEVEL" - envPort = "MF_CASSANDRA_WRITER_PORT" - envCluster = "MF_CASSANDRA_WRITER_DB_CLUSTER" - envKeyspace = "MF_CASSANDRA_WRITER_DB_KEYSPACE" - envDBUser = "MF_CASSANDRA_WRITER_DB_USER" - envDBPass = "MF_CASSANDRA_WRITER_DB_PASS" - envDBPort = "MF_CASSANDRA_WRITER_DB_PORT" - envConfigPath = "MF_CASSANDRA_WRITER_CONFIG_PATH" - envContentType = "MF_CASSANDRA_WRITER_CONTENT_TYPE" - envTransformer = "MF_CASSANDRA_WRITER_TRANSFORMER" + envNatsURL = "MF_NATS_URL" + envLogLevel = "MF_CASSANDRA_WRITER_LOG_LEVEL" + envPort = "MF_CASSANDRA_WRITER_PORT" + envCluster = "MF_CASSANDRA_WRITER_DB_CLUSTER" + envKeyspace = "MF_CASSANDRA_WRITER_DB_KEYSPACE" + envDBUser = "MF_CASSANDRA_WRITER_DB_USER" + envDBPass = "MF_CASSANDRA_WRITER_DB_PASS" + envDBPort = "MF_CASSANDRA_WRITER_DB_PORT" + envConfigPath = "MF_CASSANDRA_WRITER_CONFIG_PATH" ) type config struct { - natsURL string - logLevel string - port string - configPath string - contentType string - transformer string - dbCfg cassandra.DBConfig + natsURL string + logLevel string + port string + configPath string + dbCfg cassandra.DBConfig } func main() { @@ -85,9 +76,8 @@ func main() { defer session.Close() repo := newService(session, logger) - t := makeTransformer(cfg, logger) - if err := consumers.Start(pubSub, repo, t, cfg.configPath, logger); err != nil { + if err := consumers.Start(pubSub, repo, cfg.configPath, logger); err != nil { logger.Error(fmt.Sprintf("Failed to create Cassandra writer: %s", err)) } @@ -120,13 +110,11 @@ func loadConfig() config { } return config{ - natsURL: mainflux.Env(envNatsURL, defNatsURL), - logLevel: mainflux.Env(envLogLevel, defLogLevel), - port: mainflux.Env(envPort, defPort), - configPath: mainflux.Env(envConfigPath, defConfigPath), - contentType: mainflux.Env(envContentType, defContentType), - transformer: mainflux.Env(envTransformer, defTransformer), - dbCfg: dbCfg, + natsURL: mainflux.Env(envNatsURL, defNatsURL), + logLevel: mainflux.Env(envLogLevel, defLogLevel), + port: mainflux.Env(envPort, defPort), + configPath: mainflux.Env(envConfigPath, defConfigPath), + dbCfg: dbCfg, } } @@ -162,21 +150,6 @@ func newService(session *gocql.Session, logger logger.Logger) consumers.Consumer return repo } -func makeTransformer(cfg config, logger logger.Logger) transformers.Transformer { - switch strings.ToUpper(cfg.transformer) { - case "SENML": - logger.Info("Using SenML transformer") - return senml.New(cfg.contentType) - case "JSON": - logger.Info("Using JSON transformer") - return json.New() - default: - logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s", cfg.transformer)) - os.Exit(1) - return nil - } -} - func startHTTPServer(port string, errs chan error, logger logger.Logger) { p := fmt.Sprintf(":%s", port) logger.Info(fmt.Sprintf("Cassandra writer service started, exposed port %s", port)) diff --git a/cmd/influxdb-writer/main.go b/cmd/influxdb-writer/main.go index e664799b..0a43f4a1 100644 --- a/cmd/influxdb-writer/main.go +++ b/cmd/influxdb-writer/main.go @@ -9,7 +9,6 @@ import ( "net/http" "os" "os/signal" - "strings" "syscall" kitprometheus "github.com/go-kit/kit/metrics/prometheus" @@ -20,52 +19,43 @@ import ( "github.com/mainflux/mainflux/consumers/writers/influxdb" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/nats" - "github.com/mainflux/mainflux/pkg/transformers" - "github.com/mainflux/mainflux/pkg/transformers/json" - "github.com/mainflux/mainflux/pkg/transformers/senml" stdprometheus "github.com/prometheus/client_golang/prometheus" ) const ( svcName = "influxdb-writer" - defNatsURL = "nats://localhost:4222" - defLogLevel = "error" - defPort = "8180" - defDB = "mainflux" - defDBHost = "localhost" - defDBPort = "8086" - defDBUser = "mainflux" - defDBPass = "mainflux" - defConfigPath = "/config.toml" - defContentType = "application/senml+json" - defTransformer = "senml" + defNatsURL = "nats://localhost:4222" + defLogLevel = "error" + defPort = "8180" + defDB = "mainflux" + defDBHost = "localhost" + defDBPort = "8086" + defDBUser = "mainflux" + defDBPass = "mainflux" + defConfigPath = "/config.toml" - envNatsURL = "MF_NATS_URL" - envLogLevel = "MF_INFLUX_WRITER_LOG_LEVEL" - envPort = "MF_INFLUX_WRITER_PORT" - envDB = "MF_INFLUXDB_DB" - envDBHost = "MF_INFLUX_WRITER_DB_HOST" - envDBPort = "MF_INFLUXDB_PORT" - envDBUser = "MF_INFLUXDB_ADMIN_USER" - envDBPass = "MF_INFLUXDB_ADMIN_PASSWORD" - envConfigPath = "MF_INFLUX_WRITER_CONFIG_PATH" - envContentType = "MF_INFLUX_WRITER_CONTENT_TYPE" - envTransformer = "MF_INFLUX_WRITER_TRANSFORMER" + envNatsURL = "MF_NATS_URL" + envLogLevel = "MF_INFLUX_WRITER_LOG_LEVEL" + envPort = "MF_INFLUX_WRITER_PORT" + envDB = "MF_INFLUXDB_DB" + envDBHost = "MF_INFLUX_WRITER_DB_HOST" + envDBPort = "MF_INFLUXDB_PORT" + envDBUser = "MF_INFLUXDB_ADMIN_USER" + envDBPass = "MF_INFLUXDB_ADMIN_PASSWORD" + envConfigPath = "MF_INFLUX_WRITER_CONFIG_PATH" ) type config struct { - natsURL string - logLevel string - port string - dbName string - dbHost string - dbPort string - dbUser string - dbPass string - configPath string - contentType string - transformer string + natsURL string + logLevel string + port string + dbName string + dbHost string + dbPort string + dbUser string + dbPass string + configPath string } func main() { @@ -95,9 +85,8 @@ func main() { counter, latency := makeMetrics() repo = api.LoggingMiddleware(repo, logger) repo = api.MetricsMiddleware(repo, counter, latency) - t := makeTransformer(cfg, logger) - if err := consumers.Start(pubSub, repo, t, cfg.configPath, logger); err != nil { + if err := consumers.Start(pubSub, repo, cfg.configPath, logger); err != nil { logger.Error(fmt.Sprintf("Failed to start InfluxDB writer: %s", err)) os.Exit(1) } @@ -117,17 +106,15 @@ func main() { func loadConfigs() (config, influxdata.HTTPConfig) { cfg := config{ - natsURL: mainflux.Env(envNatsURL, defNatsURL), - logLevel: mainflux.Env(envLogLevel, defLogLevel), - port: mainflux.Env(envPort, defPort), - dbName: mainflux.Env(envDB, defDB), - dbHost: mainflux.Env(envDBHost, defDBHost), - dbPort: mainflux.Env(envDBPort, defDBPort), - dbUser: mainflux.Env(envDBUser, defDBUser), - dbPass: mainflux.Env(envDBPass, defDBPass), - configPath: mainflux.Env(envConfigPath, defConfigPath), - contentType: mainflux.Env(envContentType, defContentType), - transformer: mainflux.Env(envTransformer, defTransformer), + natsURL: mainflux.Env(envNatsURL, defNatsURL), + logLevel: mainflux.Env(envLogLevel, defLogLevel), + port: mainflux.Env(envPort, defPort), + dbName: mainflux.Env(envDB, defDB), + dbHost: mainflux.Env(envDBHost, defDBHost), + dbPort: mainflux.Env(envDBPort, defDBPort), + dbUser: mainflux.Env(envDBUser, defDBUser), + dbPass: mainflux.Env(envDBPass, defDBPass), + configPath: mainflux.Env(envConfigPath, defConfigPath), } clientCfg := influxdata.HTTPConfig{ @@ -157,21 +144,6 @@ func makeMetrics() (*kitprometheus.Counter, *kitprometheus.Summary) { return counter, latency } -func makeTransformer(cfg config, logger logger.Logger) transformers.Transformer { - switch strings.ToUpper(cfg.transformer) { - case "SENML": - logger.Info("Using SenML transformer") - return senml.New(cfg.contentType) - case "JSON": - logger.Info("Using JSON transformer") - return json.New() - default: - logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s", cfg.transformer)) - os.Exit(1) - return nil - } -} - func startHTTPService(port string, logger logger.Logger, errs chan error) { p := fmt.Sprintf(":%s", port) logger.Info(fmt.Sprintf("InfluxDB writer service started, exposed port %s", p)) diff --git a/cmd/mongodb-writer/main.go b/cmd/mongodb-writer/main.go index 28c1ae79..1dad9f1e 100644 --- a/cmd/mongodb-writer/main.go +++ b/cmd/mongodb-writer/main.go @@ -10,7 +10,6 @@ import ( "net/http" "os" "os/signal" - "strings" "syscall" kitprometheus "github.com/go-kit/kit/metrics/prometheus" @@ -20,9 +19,6 @@ import ( "github.com/mainflux/mainflux/consumers/writers/mongodb" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/nats" - "github.com/mainflux/mainflux/pkg/transformers" - "github.com/mainflux/mainflux/pkg/transformers/json" - "github.com/mainflux/mainflux/pkg/transformers/senml" stdprometheus "github.com/prometheus/client_golang/prometheus" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" @@ -31,37 +27,31 @@ import ( const ( svcName = "mongodb-writer" - defLogLevel = "error" - defNatsURL = "nats://localhost:4222" - defPort = "8180" - defDB = "mainflux" - defDBHost = "localhost" - defDBPort = "27017" - defConfigPath = "/config.toml" - defContentType = "application/senml+json" - defTransformer = "senml" + defLogLevel = "error" + defNatsURL = "nats://localhost:4222" + defPort = "8180" + defDB = "mainflux" + defDBHost = "localhost" + defDBPort = "27017" + defConfigPath = "/config.toml" - envNatsURL = "MF_NATS_URL" - envLogLevel = "MF_MONGO_WRITER_LOG_LEVEL" - envPort = "MF_MONGO_WRITER_PORT" - envDB = "MF_MONGO_WRITER_DB" - envDBHost = "MF_MONGO_WRITER_DB_HOST" - envDBPort = "MF_MONGO_WRITER_DB_PORT" - envConfigPath = "MF_MONGO_WRITER_CONFIG_PATH" - envContentType = "MF_MONGO_WRITER_CONTENT_TYPE" - envTransformer = "MF_MONGO_WRITER_TRANSFORMER" + envNatsURL = "MF_NATS_URL" + envLogLevel = "MF_MONGO_WRITER_LOG_LEVEL" + envPort = "MF_MONGO_WRITER_PORT" + envDB = "MF_MONGO_WRITER_DB" + envDBHost = "MF_MONGO_WRITER_DB_HOST" + envDBPort = "MF_MONGO_WRITER_DB_PORT" + envConfigPath = "MF_MONGO_WRITER_CONFIG_PATH" ) type config struct { - natsURL string - logLevel string - port string - dbName string - dbHost string - dbPort string - configPath string - contentType string - transformer string + natsURL string + logLevel string + port string + dbName string + dbHost string + dbPort string + configPath string } func main() { @@ -92,9 +82,8 @@ func main() { counter, latency := makeMetrics() repo = api.LoggingMiddleware(repo, logger) repo = api.MetricsMiddleware(repo, counter, latency) - t := makeTransformer(cfg, logger) - if err := consumers.Start(pubSub, repo, t, cfg.configPath, logger); err != nil { + if err := consumers.Start(pubSub, repo, cfg.configPath, logger); err != nil { logger.Error(fmt.Sprintf("Failed to start MongoDB writer: %s", err)) os.Exit(1) } @@ -114,15 +103,13 @@ func main() { func loadConfigs() config { return config{ - natsURL: mainflux.Env(envNatsURL, defNatsURL), - logLevel: mainflux.Env(envLogLevel, defLogLevel), - port: mainflux.Env(envPort, defPort), - dbName: mainflux.Env(envDB, defDB), - dbHost: mainflux.Env(envDBHost, defDBHost), - dbPort: mainflux.Env(envDBPort, defDBPort), - configPath: mainflux.Env(envConfigPath, defConfigPath), - contentType: mainflux.Env(envContentType, defContentType), - transformer: mainflux.Env(envTransformer, defTransformer), + natsURL: mainflux.Env(envNatsURL, defNatsURL), + logLevel: mainflux.Env(envLogLevel, defLogLevel), + port: mainflux.Env(envPort, defPort), + dbName: mainflux.Env(envDB, defDB), + dbHost: mainflux.Env(envDBHost, defDBHost), + dbPort: mainflux.Env(envDBPort, defDBPort), + configPath: mainflux.Env(envConfigPath, defConfigPath), } } @@ -144,21 +131,6 @@ func makeMetrics() (*kitprometheus.Counter, *kitprometheus.Summary) { return counter, latency } -func makeTransformer(cfg config, logger logger.Logger) transformers.Transformer { - switch strings.ToUpper(cfg.transformer) { - case "SENML": - logger.Info("Using SenML transformer") - return senml.New(cfg.contentType) - case "JSON": - logger.Info("Using JSON transformer") - return json.New() - default: - logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s", cfg.transformer)) - os.Exit(1) - return nil - } -} - func startHTTPService(port string, logger logger.Logger, errs chan error) { p := fmt.Sprintf(":%s", port) logger.Info(fmt.Sprintf("Mongodb writer service started, exposed port %s", p)) diff --git a/cmd/postgres-writer/main.go b/cmd/postgres-writer/main.go index 4e883a5c..49d37263 100644 --- a/cmd/postgres-writer/main.go +++ b/cmd/postgres-writer/main.go @@ -9,7 +9,6 @@ import ( "net/http" "os" "os/signal" - "strings" "syscall" kitprometheus "github.com/go-kit/kit/metrics/prometheus" @@ -20,9 +19,6 @@ import ( "github.com/mainflux/mainflux/consumers/writers/postgres" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/nats" - "github.com/mainflux/mainflux/pkg/transformers" - "github.com/mainflux/mainflux/pkg/transformers/json" - "github.com/mainflux/mainflux/pkg/transformers/senml" stdprometheus "github.com/prometheus/client_golang/prometheus" ) @@ -43,8 +39,6 @@ const ( defDBSSLKey = "" defDBSSLRootCert = "" defConfigPath = "/config.toml" - defContentType = "application/senml+json" - defTransformer = "senml" envNatsURL = "MF_NATS_URL" envLogLevel = "MF_POSTGRES_WRITER_LOG_LEVEL" @@ -59,18 +53,14 @@ const ( envDBSSLKey = "MF_POSTGRES_WRITER_DB_SSL_KEY" envDBSSLRootCert = "MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT" envConfigPath = "MF_POSTGRES_WRITER_CONFIG_PATH" - envContentType = "MF_POSTGRES_WRITER_CONTENT_TYPE" - envTransformer = "MF_POSTGRES_WRITER_TRANSFORMER" ) type config struct { - natsURL string - logLevel string - port string - configPath string - contentType string - transformer string - dbConfig postgres.Config + natsURL string + logLevel string + port string + configPath string + dbConfig postgres.Config } func main() { @@ -92,9 +82,8 @@ func main() { defer db.Close() repo := newService(db, logger) - t := makeTransformer(cfg, logger) - if err = consumers.Start(pubSub, repo, t, cfg.configPath, logger); err != nil { + if err = consumers.Start(pubSub, repo, cfg.configPath, logger); err != nil { logger.Error(fmt.Sprintf("Failed to create Postgres writer: %s", err)) } @@ -126,13 +115,11 @@ func loadConfig() config { } return config{ - natsURL: mainflux.Env(envNatsURL, defNatsURL), - logLevel: mainflux.Env(envLogLevel, defLogLevel), - port: mainflux.Env(envPort, defPort), - configPath: mainflux.Env(envConfigPath, defConfigPath), - contentType: mainflux.Env(envContentType, defContentType), - transformer: mainflux.Env(envTransformer, defTransformer), - dbConfig: dbConfig, + natsURL: mainflux.Env(envNatsURL, defNatsURL), + logLevel: mainflux.Env(envLogLevel, defLogLevel), + port: mainflux.Env(envPort, defPort), + configPath: mainflux.Env(envConfigPath, defConfigPath), + dbConfig: dbConfig, } } @@ -167,21 +154,6 @@ func newService(db *sqlx.DB, logger logger.Logger) consumers.Consumer { return svc } -func makeTransformer(cfg config, logger logger.Logger) transformers.Transformer { - switch strings.ToUpper(cfg.transformer) { - case "SENML": - logger.Info("Using SenML transformer") - return senml.New(cfg.contentType) - case "JSON": - logger.Info("Using JSON transformer") - return json.New() - default: - logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s", cfg.transformer)) - os.Exit(1) - return nil - } -} - func startHTTPServer(port string, errs chan error, logger logger.Logger) { p := fmt.Sprintf(":%s", port) logger.Info(fmt.Sprintf("Postgres writer service started, exposed port %s", port)) diff --git a/cmd/smpp-notifier/main.go b/cmd/smpp-notifier/main.go index cb764ffd..8ce1fba9 100644 --- a/cmd/smpp-notifier/main.go +++ b/cmd/smpp-notifier/main.go @@ -154,7 +154,7 @@ func main() { svc := newService(db, dbTracer, auth, cfg, logger) errs := make(chan error, 2) - if err = consumers.Start(pubSub, svc, nil, cfg.configPath, logger); err != nil { + if err = consumers.Start(pubSub, svc, cfg.configPath, logger); err != nil { logger.Error(fmt.Sprintf("Failed to create Postgres writer: %s", err)) } diff --git a/cmd/smtp-notifier/main.go b/cmd/smtp-notifier/main.go index 8007ad5b..50053e97 100644 --- a/cmd/smtp-notifier/main.go +++ b/cmd/smtp-notifier/main.go @@ -154,7 +154,7 @@ func main() { svc := newService(db, dbTracer, auth, cfg, logger) errs := make(chan error, 2) - if err = consumers.Start(pubSub, svc, nil, cfg.configPath, logger); err != nil { + if err = consumers.Start(pubSub, svc, cfg.configPath, logger); err != nil { logger.Error(fmt.Sprintf("Failed to create Postgres writer: %s", err)) } diff --git a/consumers/messages.go b/consumers/messages.go index e30cf081..2a4b85fe 100644 --- a/consumers/messages.go +++ b/consumers/messages.go @@ -6,6 +6,8 @@ package consumers import ( "fmt" "io/ioutil" + "os" + "strings" "github.com/pelletier/go-toml" @@ -14,6 +16,13 @@ import ( "github.com/mainflux/mainflux/pkg/messaging" pubsub "github.com/mainflux/mainflux/pkg/messaging/nats" "github.com/mainflux/mainflux/pkg/transformers" + "github.com/mainflux/mainflux/pkg/transformers/json" + "github.com/mainflux/mainflux/pkg/transformers/senml" +) + +const ( + defContentType = "application/senml+json" + defFormat = "senml" ) var ( @@ -24,13 +33,15 @@ var ( // Start method starts consuming messages received from NATS. // This method transforms messages to SenML format before // using MessageRepository to store them. -func Start(sub messaging.Subscriber, consumer Consumer, transformer transformers.Transformer, subjectsCfgPath string, logger logger.Logger) error { - subjects, err := loadSubjectsConfig(subjectsCfgPath) +func Start(sub messaging.Subscriber, consumer Consumer, configPath string, logger logger.Logger) error { + cfg, err := loadConfig(configPath) if err != nil { - logger.Warn(fmt.Sprintf("Failed to load subjects: %s", err)) + logger.Warn(fmt.Sprintf("Failed to load consumer config: %s", err)) } - for _, subject := range subjects { + transformer := makeTransformer(cfg.TransformerCfg, logger) + + for _, subject := range cfg.SubscriberCfg.Subjects { if err := sub.Subscribe(subject, handler(transformer, consumer)); err != nil { return err } @@ -52,24 +63,55 @@ func handler(t transformers.Transformer, c Consumer) messaging.MessageHandler { } } -type filterConfig struct { - Filter []string `toml:"filter"` +type subscriberConfig struct { + Subjects []string `toml:"subjects"` } -type subjectsConfig struct { - Subjects filterConfig `toml:"subjects"` +type transformerConfig struct { + Format string `toml:"format"` + ContentType string `toml:"content_type"` + TimeFields []json.TimeField `toml:"time_fields"` } -func loadSubjectsConfig(subjectsConfigPath string) ([]string, error) { - data, err := ioutil.ReadFile(subjectsConfigPath) +type config struct { + SubscriberCfg subscriberConfig `toml:"subscriber"` + TransformerCfg transformerConfig `toml:"transformer"` +} + +func loadConfig(configPath string) (config, error) { + cfg := config{ + SubscriberCfg: subscriberConfig{ + Subjects: []string{pubsub.SubjectAllChannels}, + }, + TransformerCfg: transformerConfig{ + Format: defFormat, + ContentType: defContentType, + }, + } + + data, err := ioutil.ReadFile(configPath) if err != nil { - return []string{pubsub.SubjectAllChannels}, errors.Wrap(errOpenConfFile, err) + return cfg, errors.Wrap(errOpenConfFile, err) } - var subjectsCfg subjectsConfig - if err := toml.Unmarshal(data, &subjectsCfg); err != nil { - return []string{pubsub.SubjectAllChannels}, errors.Wrap(errParseConfFile, err) + if err := toml.Unmarshal(data, &cfg); err != nil { + return cfg, errors.Wrap(errParseConfFile, err) } - return subjectsCfg.Subjects.Filter, nil + return cfg, nil +} + +func makeTransformer(cfg transformerConfig, logger logger.Logger) transformers.Transformer { + switch strings.ToUpper(cfg.Format) { + case "SENML": + logger.Info("Using SenML transformer") + return senml.New(cfg.ContentType) + case "JSON": + logger.Info("Using JSON transformer") + return json.New(cfg.TimeFields) + default: + logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s", cfg.Format)) + os.Exit(1) + return nil + } } diff --git a/consumers/notifiers/smpp/notifier.go b/consumers/notifiers/smpp/notifier.go index 74fca912..36e6c2c2 100644 --- a/consumers/notifiers/smpp/notifier.go +++ b/consumers/notifiers/smpp/notifier.go @@ -19,7 +19,7 @@ var _ notifiers.Notifier = (*notifier)(nil) type notifier struct { transmitter *smpp.Transmitter - tranformer transformers.Transformer + transformer transformers.Transformer sourceAddrTON uint8 sourceAddrNPI uint8 destAddrTON uint8 @@ -38,7 +38,7 @@ func New(cfg Config) notifiers.Notifier { t.Bind() ret := ¬ifier{ transmitter: t, - tranformer: json.New(), + transformer: json.New([]json.TimeField{}), sourceAddrTON: cfg.SourceAddrTON, destAddrTON: cfg.DestAddrTON, sourceAddrNPI: cfg.SourceAddrNPI, diff --git a/consumers/writers/cassandra/README.md b/consumers/writers/cassandra/README.md index 27fc32ed..3ff9dc7f 100644 --- a/consumers/writers/cassandra/README.md +++ b/consumers/writers/cassandra/README.md @@ -8,23 +8,20 @@ The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -| -------------------------------- | --------------------------------------------------------- | ---------------------- | -| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | -| MF_CASSANDRA_WRITER_LOG_LEVEL | Log level for Cassandra writer (debug, info, warn, error) | error | -| MF_CASSANDRA_WRITER_PORT | Service HTTP port | 8180 | -| MF_CASSANDRA_WRITER_DB_CLUSTER | Cassandra cluster comma separated addresses | 127.0.0.1 | -| MF_CASSANDRA_WRITER_DB_KEYSPACE | Cassandra keyspace name | mainflux | -| MF_CASSANDRA_WRITER_DB_USER | Cassandra DB username | | -| MF_CASSANDRA_WRITER_DB_PASS | Cassandra DB password | | -| MF_CASSANDRA_WRITER_DB_PORT | Cassandra DB port | 9042 | -| MF_CASSANDRA_WRITER_CONFIG_PATH | Configuration file path with NATS subjects list | /config.toml | -| MF_CASSANDRA_WRITER_CONTENT_TYPE | Message payload Content Type | application/senml+json | -| MF_CASSANDRA_WRITER_TRANSFORMER | Message transformer type | senml | +| Variable | Description | Default | +| -------------------------------- | ----------------------------------------------------------------------- | --------------------- | +| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | +| MF_CASSANDRA_WRITER_LOG_LEVEL | Log level for Cassandra writer (debug, info, warn, error) | error | +| MF_CASSANDRA_WRITER_PORT | Service HTTP port | 8180 | +| MF_CASSANDRA_WRITER_DB_CLUSTER | Cassandra cluster comma separated addresses | 127.0.0.1 | +| MF_CASSANDRA_WRITER_DB_KEYSPACE | Cassandra keyspace name | mainflux | +| MF_CASSANDRA_WRITER_DB_USER | Cassandra DB username | | +| MF_CASSANDRA_WRITER_DB_PASS | Cassandra DB password | | +| MF_CASSANDRA_WRITER_DB_PORT | Cassandra DB port | 9042 | +| MF_CASSANDRA_WRITER_CONFIG_PATH | Config file path with NATS subjects list, payload type and content-type | /config.toml | ## Deployment -The service itself is distributed as Docker container. Check the [`cassandra-writer`](https://github.com/mainflux/mainflux/blob/master/docker/addons/cassandra-writer/docker-compose.yml#L30-L49) service section in -docker-compose to see how service is deployed. +The service itself is distributed as Docker container. Check the [`cassandra-writer`](https://github.com/mainflux/mainflux/blob/master/docker/addons/cassandra-writer/docker-compose.yml#L30-L49) service section in docker-compose to see how service is deployed. To start the service, execute the following shell script: @@ -49,8 +46,7 @@ MF_CASSANDRA_WRITER_DB_KEYSPACE=[Cassandra keyspace name] \ MF_CASSANDRA_READER_DB_USER=[Cassandra DB username] \ MF_CASSANDRA_READER_DB_PASS=[Cassandra DB password] \ MF_CASSANDRA_READER_DB_PORT=[Cassandra DB port] \ -MF_CASSANDRA_WRITER_CONFIG_PATH=[Configuration file path with NATS subjects list] \ -MF_CASSANDRA_WRITER_TRANSFORMER=[Message transformer type] \ +MF_CASSANDRA_WRITER_CONFIG_PATH=[Config file path with NATS subjects list, payload type and content-type] \ $GOBIN/mainflux-cassandra-writer ``` diff --git a/consumers/writers/influxdb/README.md b/consumers/writers/influxdb/README.md index 29d5a47f..4f62f1e6 100644 --- a/consumers/writers/influxdb/README.md +++ b/consumers/writers/influxdb/README.md @@ -8,19 +8,17 @@ The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -| ----------------------------- | -------------------------------------------------------- | ---------------------- | -| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | -| MF_INFLUX_WRITER_LOG_LEVEL | Log level for InfluxDB writer (debug, info, warn, error) | error | -| MF_INFLUX_WRITER_PORT | Service HTTP port | 8180 | -| MF_INFLUX_WRITER_DB_HOST | InfluxDB host | localhost | -| MF_INFLUXDB_PORT | Default port of InfluxDB database | 8086 | -| MF_INFLUXDB_ADMIN_USER | Default user of InfluxDB database | mainflux | -| MF_INFLUXDB_ADMIN_PASSWORD | Default password of InfluxDB user | mainflux | -| MF_INFLUXDB_DB | InfluxDB database name | mainflux | -| MF_INFLUX_WRITER_CONFIG_PATH | Configuration file path with NATS subjects list | /configs.toml | -| MF_INFLUX_WRITER_CONTENT_TYPE | Message payload Content Type | application/senml+json | -| MF_INFLUX_WRITER_TRANSFORMER | Message transformer type | senml | +| Variable | Description | Default | +| ----------------------------- | ----------------------------------------------------------------------- | ---------------------- | +| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | +| MF_INFLUX_WRITER_LOG_LEVEL | Log level for InfluxDB writer (debug, info, warn, error) | error | +| MF_INFLUX_WRITER_PORT | Service HTTP port | 8180 | +| MF_INFLUX_WRITER_DB_HOST | InfluxDB host | localhost | +| MF_INFLUXDB_PORT | Default port of InfluxDB database | 8086 | +| MF_INFLUXDB_ADMIN_USER | Default user of InfluxDB database | mainflux | +| MF_INFLUXDB_ADMIN_PASSWORD | Default password of InfluxDB user | mainflux | +| MF_INFLUXDB_DB | InfluxDB database name | mainflux | +| MF_INFLUX_WRITER_CONFIG_PATH | Config file path with NATS subjects list, payload type and content-type | /configs.toml | ## Deployment @@ -49,8 +47,7 @@ MF_INFLUX_WRITER_DB_HOST=[InfluxDB database host] \ MF_INFLUXDB_PORT=[InfluxDB database port] \ MF_INFLUXDB_ADMIN_USER=[InfluxDB admin user] \ MF_INFLUXDB_ADMIN_PASSWORD=[InfluxDB admin password] \ -MF_INFLUX_WRITER_CONFIG_PATH=[Configuration file path with filters list] \ -MF_POSTGRES_WRITER_TRANSFORMER=[Message transformer type] \ +MF_INFLUX_WRITER_CONFIG_PATH=[Config file path with NATS subjects list, payload type and content-type] \ $GOBIN/mainflux-influxdb ``` diff --git a/consumers/writers/mongodb/README.md b/consumers/writers/mongodb/README.md index 3cb4d5a9..6c46eb72 100644 --- a/consumers/writers/mongodb/README.md +++ b/consumers/writers/mongodb/README.md @@ -8,22 +8,19 @@ The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -| ---------------------------- | ----------------------------------------------- | ---------------------- | -| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | -| MF_MONGO_WRITER_LOG_LEVEL | Log level for MongoDB writer | error | -| MF_MONGO_WRITER_PORT | Service HTTP port | 8180 | -| MF_MONGO_WRITER_DB | Default MongoDB database name | messages | -| MF_MONGO_WRITER_DB_HOST | Default MongoDB database host | localhost | -| MF_MONGO_WRITER_DB_PORT | Default MongoDB database port | 27017 | -| MF_MONGO_WRITER_CONFIG_PATH | Configuration file path with NATS subjects list | /config.toml | -| MF_MONGO_WRITER_CONTENT_TYPE | Message payload Content Type | application/senml+json | -| MF_MONGO_WRITER_TRANSFORMER | Message transformer type | senml | +| Variable | Description | Default | +| ---------------------------- | ----------------------------------------------------------------------- | ---------------------- | +| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | +| MF_MONGO_WRITER_LOG_LEVEL | Log level for MongoDB writer | error | +| MF_MONGO_WRITER_PORT | Service HTTP port | 8180 | +| MF_MONGO_WRITER_DB | Default MongoDB database name | messages | +| MF_MONGO_WRITER_DB_HOST | Default MongoDB database host | localhost | +| MF_MONGO_WRITER_DB_PORT | Default MongoDB database port | 27017 | +| MF_MONGO_WRITER_CONFIG_PATH | Config file path with NATS subjects list, payload type and content-type | /config.toml | ## Deployment -The service itself is distributed as Docker container. Check the [`mongodb-writer`](https://github.com/mainflux/mainflux/blob/master/docker/addons/mongodb-writer/docker-compose.yml#L36-L55) service section in -docker-compose to see how service is deployed. +The service itself is distributed as Docker container. Check the [`mongodb-writer`](https://github.com/mainflux/mainflux/blob/master/docker/addons/mongodb-writer/docker-compose.yml#L36-L55) service section in docker-compose to see how service is deployed. To start the service, execute the following shell script: @@ -47,7 +44,6 @@ MF_MONGO_WRITER_DB=[MongoDB database name] \ MF_MONGO_WRITER_DB_HOST=[MongoDB database host] \ MF_MONGO_WRITER_DB_PORT=[MongoDB database port] \ MF_MONGO_WRITER_CONFIG_PATH=[Configuration file path with NATS subjects list] \ -MF_MONGO_WRITER_TRANSFORMER=[Transformer type to be used] \ $GOBIN/mainflux-mongodb-writer ``` diff --git a/consumers/writers/postgres/README.md b/consumers/writers/postgres/README.md index 5a0854fd..a1f4957d 100644 --- a/consumers/writers/postgres/README.md +++ b/consumers/writers/postgres/README.md @@ -8,28 +8,25 @@ The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -| ----------------------------------- | ----------------------------------------------- | ---------------------- | -| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | -| MF_POSTGRES_WRITER_LOG_LEVEL | Service log level | error | -| MF_POSTGRES_WRITER_PORT | Service HTTP port | 9104 | -| MF_POSTGRES_WRITER_DB_HOST | Postgres DB host | postgres | -| MF_POSTGRES_WRITER_DB_PORT | Postgres DB port | 5432 | -| MF_POSTGRES_WRITER_DB_USER | Postgres user | mainflux | -| MF_POSTGRES_WRITER_DB_PASS | Postgres password | mainflux | -| MF_POSTGRES_WRITER_DB | Postgres database name | messages | -| MF_POSTGRES_WRITER_DB_SSL_MODE | Postgres SSL mode | disabled | -| MF_POSTGRES_WRITER_DB_SSL_CERT | Postgres SSL certificate path | "" | -| MF_POSTGRES_WRITER_DB_SSL_KEY | Postgres SSL key | "" | -| MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT | Postgres SSL root certificate path | "" | -| MF_POSTGRES_WRITER_CONFIG_PATH | Configuration file path with NATS subjects list | /config.toml | -| MF_POSTGRES_WRITER_CONTENT_TYPE | Message payload Content Type | application/senml+json | -| MF_POSTGRES_WRITER_TRANSFORMER | Message transformer type | senml | +| Variable | Description | Default | +| ----------------------------------- | ----------------------------------------------------------------------- | ---------------------- | +| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | +| MF_POSTGRES_WRITER_LOG_LEVEL | Service log level | error | +| MF_POSTGRES_WRITER_PORT | Service HTTP port | 9104 | +| MF_POSTGRES_WRITER_DB_HOST | Postgres DB host | postgres | +| MF_POSTGRES_WRITER_DB_PORT | Postgres DB port | 5432 | +| MF_POSTGRES_WRITER_DB_USER | Postgres user | mainflux | +| MF_POSTGRES_WRITER_DB_PASS | Postgres password | mainflux | +| MF_POSTGRES_WRITER_DB | Postgres database name | messages | +| MF_POSTGRES_WRITER_DB_SSL_MODE | Postgres SSL mode | disabled | +| MF_POSTGRES_WRITER_DB_SSL_CERT | Postgres SSL certificate path | "" | +| MF_POSTGRES_WRITER_DB_SSL_KEY | Postgres SSL key | "" | +| MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT | Postgres SSL root certificate path | "" | +| MF_POSTGRES_WRITER_CONFIG_PATH | Config file path with NATS subjects list, payload type and content-type | /config.toml | ## Deployment -The service itself is distributed as Docker container. Check the [`postgres-writer`](https://github.com/mainflux/mainflux/blob/master/docker/addons/postgres-writer/docker-compose.yml#L34-L59) service section in -docker-compose to see how service is deployed. +The service itself is distributed as Docker container. Check the [`postgres-writer`](https://github.com/mainflux/mainflux/blob/master/docker/addons/postgres-writer/docker-compose.yml#L34-L59) service section in docker-compose to see how service is deployed. To start the service, execute the following shell script: @@ -58,8 +55,7 @@ MF_POSTGRES_WRITER_DB_SSL_MODE=[Postgres SSL mode] \ MF_POSTGRES_WRITER_DB_SSL_CERT=[Postgres SSL cert] \ MF_POSTGRES_WRITER_DB_SSL_KEY=[Postgres SSL key] \ MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT=[Postgres SSL Root cert] \ -MF_POSTGRES_WRITER_CONFIG_PATH=[Configuration file path with NATS subjects list] \ -MF_POSTGRES_WRITER_TRANSFORMER=[Message transformer type] \ +MF_POSTGRES_WRITER_CONFIG_PATH=[Config file path with NATS subjects list, payload type and content-type] \ $GOBIN/mainflux-postgres-writer ``` diff --git a/docker/.env b/docker/.env index 6b17eb10..3f932310 100644 --- a/docker/.env +++ b/docker/.env @@ -206,8 +206,6 @@ MF_CASSANDRA_WRITER_PORT=8902 MF_CASSANDRA_WRITER_DB_PORT=9042 MF_CASSANDRA_WRITER_DB_CLUSTER=mainflux-cassandra MF_CASSANDRA_WRITER_DB_KEYSPACE=mainflux -MF_CASSANDRA_WRITER_CONTENT_TYPE=application/senml+json -MF_CASSANDRA_WRITER_TRANSFORMER=senml ### Cassandra Reader MF_CASSANDRA_READER_LOG_LEVEL=debug @@ -231,8 +229,6 @@ MF_INFLUX_WRITER_PORT=8900 MF_INFLUX_WRITER_BATCH_SIZE=5000 MF_INFLUX_WRITER_BATCH_TIMEOUT=5 MF_INFLUX_WRITER_GRAFANA_PORT=3001 -MF_INFLUX_WRITER_CONTENT_TYPE=application/senml+json -MF_INFLUX_WRITER_TRANSFORMER=senml ### InfluxDB Reader MF_INFLUX_READER_LOG_LEVEL=debug @@ -245,8 +241,6 @@ MF_MONGO_WRITER_LOG_LEVEL=debug MF_MONGO_WRITER_PORT=8901 MF_MONGO_WRITER_DB=mainflux MF_MONGO_WRITER_DB_PORT=27017 -MF_MONGO_WRITER_CONTENT_TYPE=application/senml+json -MF_MONGO_WRITER_TRANSFORMER=senml ### MongoDB Reader MF_MONGO_READER_LOG_LEVEL=debug @@ -267,8 +261,6 @@ MF_POSTGRES_WRITER_DB_SSL_MODE=disable MF_POSTGRES_WRITER_DB_SSL_CERT="" MF_POSTGRES_WRITER_DB_SSL_KEY="" MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT="" -MF_POSTGRES_WRITER_CONTENT_TYPE=application/senml+json -MF_POSTGRES_WRITER_TRANSFORMER=senml ### Postgres Reader MF_POSTGRES_READER_LOG_LEVEL=debug diff --git a/docker/addons/cassandra-writer/config.toml b/docker/addons/cassandra-writer/config.toml index fa1b3569..07cb5f46 100644 --- a/docker/addons/cassandra-writer/config.toml +++ b/docker/addons/cassandra-writer/config.toml @@ -1,5 +1,16 @@ # To listen all messsage broker subjects use default value "channels.>". # To subscribe to specific subjects use values starting by "channels." and # followed by a subtopic (e.g ["channels..sub.topic.x", ...]). -[subjects] -filter = ["channels.>"] +[subscriber] +subjects = ["channels.>"] + +[transformer] +# SenML or JSON +format = "senml" +# Used if format is SenML +content_type = "application/senml+json" +# Used as timestamp fields if format is JSON +time_fields = [{ field_name = "seconds_key", field_format = "unix", location = "UTC"}, + { field_name = "millis_key", field_format = "unix_ms", location = "UTC"}, + { field_name = "micros_key", field_format = "unix_us", location = "UTC"}, + { field_name = "nanos_key", field_format = "unix_ns", location = "UTC"}] diff --git a/docker/addons/cassandra-writer/docker-compose.yml b/docker/addons/cassandra-writer/docker-compose.yml index f333042f..97b3b589 100644 --- a/docker/addons/cassandra-writer/docker-compose.yml +++ b/docker/addons/cassandra-writer/docker-compose.yml @@ -40,7 +40,6 @@ services: MF_CASSANDRA_WRITER_DB_PORT: ${MF_CASSANDRA_WRITER_DB_PORT} MF_CASSANDRA_WRITER_DB_CLUSTER: ${MF_CASSANDRA_WRITER_DB_CLUSTER} MF_CASSANDRA_WRITER_DB_KEYSPACE: ${MF_CASSANDRA_WRITER_DB_KEYSPACE} - MF_CASSANDRA_WRITER_TRANSFORMER: ${MF_CASSANDRA_WRITER_TRANSFORMER} ports: - ${MF_CASSANDRA_WRITER_PORT}:${MF_CASSANDRA_WRITER_PORT} networks: diff --git a/docker/addons/influxdb-writer/config.toml b/docker/addons/influxdb-writer/config.toml index fa1b3569..07cb5f46 100644 --- a/docker/addons/influxdb-writer/config.toml +++ b/docker/addons/influxdb-writer/config.toml @@ -1,5 +1,16 @@ # To listen all messsage broker subjects use default value "channels.>". # To subscribe to specific subjects use values starting by "channels." and # followed by a subtopic (e.g ["channels..sub.topic.x", ...]). -[subjects] -filter = ["channels.>"] +[subscriber] +subjects = ["channels.>"] + +[transformer] +# SenML or JSON +format = "senml" +# Used if format is SenML +content_type = "application/senml+json" +# Used as timestamp fields if format is JSON +time_fields = [{ field_name = "seconds_key", field_format = "unix", location = "UTC"}, + { field_name = "millis_key", field_format = "unix_ms", location = "UTC"}, + { field_name = "micros_key", field_format = "unix_us", location = "UTC"}, + { field_name = "nanos_key", field_format = "unix_ns", location = "UTC"}] diff --git a/docker/addons/influxdb-writer/docker-compose.yml b/docker/addons/influxdb-writer/docker-compose.yml index 50c8ee40..5a721713 100644 --- a/docker/addons/influxdb-writer/docker-compose.yml +++ b/docker/addons/influxdb-writer/docker-compose.yml @@ -50,7 +50,6 @@ services: MF_INFLUXDB_PORT: ${MF_INFLUXDB_PORT} MF_INFLUXDB_ADMIN_USER: ${MF_INFLUXDB_ADMIN_USER} MF_INFLUXDB_ADMIN_PASSWORD: ${MF_INFLUXDB_ADMIN_PASSWORD} - MF_INFLUX_WRITER_TRANSFORMER: ${MF_INFLUX_WRITER_TRANSFORMER} ports: - ${MF_INFLUX_WRITER_PORT}:${MF_INFLUX_WRITER_PORT} networks: diff --git a/docker/addons/mongodb-writer/config.toml b/docker/addons/mongodb-writer/config.toml index fa1b3569..07cb5f46 100644 --- a/docker/addons/mongodb-writer/config.toml +++ b/docker/addons/mongodb-writer/config.toml @@ -1,5 +1,16 @@ # To listen all messsage broker subjects use default value "channels.>". # To subscribe to specific subjects use values starting by "channels." and # followed by a subtopic (e.g ["channels..sub.topic.x", ...]). -[subjects] -filter = ["channels.>"] +[subscriber] +subjects = ["channels.>"] + +[transformer] +# SenML or JSON +format = "senml" +# Used if format is SenML +content_type = "application/senml+json" +# Used as timestamp fields if format is JSON +time_fields = [{ field_name = "seconds_key", field_format = "unix", location = "UTC"}, + { field_name = "millis_key", field_format = "unix_ms", location = "UTC"}, + { field_name = "micros_key", field_format = "unix_us", location = "UTC"}, + { field_name = "nanos_key", field_format = "unix_ns", location = "UTC"}] diff --git a/docker/addons/mongodb-writer/docker-compose.yml b/docker/addons/mongodb-writer/docker-compose.yml index f85c1058..16f293fe 100644 --- a/docker/addons/mongodb-writer/docker-compose.yml +++ b/docker/addons/mongodb-writer/docker-compose.yml @@ -46,7 +46,6 @@ services: MF_MONGO_WRITER_DB: ${MF_MONGO_WRITER_DB} MF_MONGO_WRITER_DB_HOST: mongodb MF_MONGO_WRITER_DB_PORT: ${MF_MONGO_WRITER_DB_PORT} - MF_MONGO_WRITER_TRANSFORMER: ${MF_MONGO_WRITER_TRANSFORMER} ports: - ${MF_MONGO_WRITER_PORT}:${MF_MONGO_WRITER_PORT} networks: diff --git a/docker/addons/postgres-writer/config.toml b/docker/addons/postgres-writer/config.toml index fa1b3569..07cb5f46 100644 --- a/docker/addons/postgres-writer/config.toml +++ b/docker/addons/postgres-writer/config.toml @@ -1,5 +1,16 @@ # To listen all messsage broker subjects use default value "channels.>". # To subscribe to specific subjects use values starting by "channels." and # followed by a subtopic (e.g ["channels..sub.topic.x", ...]). -[subjects] -filter = ["channels.>"] +[subscriber] +subjects = ["channels.>"] + +[transformer] +# SenML or JSON +format = "senml" +# Used if format is SenML +content_type = "application/senml+json" +# Used as timestamp fields if format is JSON +time_fields = [{ field_name = "seconds_key", field_format = "unix", location = "UTC"}, + { field_name = "millis_key", field_format = "unix_ms", location = "UTC"}, + { field_name = "micros_key", field_format = "unix_us", location = "UTC"}, + { field_name = "nanos_key", field_format = "unix_ns", location = "UTC"}] diff --git a/docker/addons/postgres-writer/docker-compose.yml b/docker/addons/postgres-writer/docker-compose.yml index 17a80af2..e46ba145 100644 --- a/docker/addons/postgres-writer/docker-compose.yml +++ b/docker/addons/postgres-writer/docker-compose.yml @@ -50,7 +50,6 @@ services: MF_POSTGRES_WRITER_DB_SSL_CERT: ${MF_POSTGRES_WRITER_DB_SSL_CERT} MF_POSTGRES_WRITER_DB_SSL_KEY: ${MF_POSTGRES_WRITER_DB_SSL_KEY} MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT: ${MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT} - MF_POSTGRES_WRITER_TRANSFORMER: ${MF_POSTGRES_WRITER_TRANSFORMER} ports: - ${MF_POSTGRES_WRITER_PORT}:${MF_POSTGRES_WRITER_PORT} networks: diff --git a/docker/addons/smpp-notifier/config.toml b/docker/addons/smpp-notifier/config.toml index fa1b3569..cab67b02 100644 --- a/docker/addons/smpp-notifier/config.toml +++ b/docker/addons/smpp-notifier/config.toml @@ -1,5 +1,5 @@ # To listen all messsage broker subjects use default value "channels.>". # To subscribe to specific subjects use values starting by "channels." and # followed by a subtopic (e.g ["channels..sub.topic.x", ...]). -[subjects] -filter = ["channels.>"] +[subscriber] +subjects = ["channels.>"] diff --git a/docker/addons/smtp-notifier/config.toml b/docker/addons/smtp-notifier/config.toml index fa1b3569..cab67b02 100644 --- a/docker/addons/smtp-notifier/config.toml +++ b/docker/addons/smtp-notifier/config.toml @@ -1,5 +1,5 @@ # To listen all messsage broker subjects use default value "channels.>". # To subscribe to specific subjects use values starting by "channels." and # followed by a subtopic (e.g ["channels..sub.topic.x", ...]). -[subjects] -filter = ["channels.>"] +[subscriber] +subjects = ["channels.>"] diff --git a/pkg/transformers/json/time.go b/pkg/transformers/json/time.go new file mode 100644 index 00000000..fa49c905 --- /dev/null +++ b/pkg/transformers/json/time.go @@ -0,0 +1,152 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package json + +import ( + "math" + "strconv" + "strings" + "time" + + "github.com/mainflux/mainflux/pkg/errors" +) + +var errUnsupportedFormat = errors.New("unsupported time format") + +func parseTimestamp(format string, timestamp interface{}, location string) (time.Time, error) { + switch format { + case "unix", "unix_ms", "unix_us", "unix_ns": + return parseUnix(format, timestamp) + default: + if location == "" { + location = "UTC" + } + return parseTime(format, timestamp, location) + } +} + +func parseUnix(format string, timestamp interface{}) (time.Time, error) { + integer, fractional, err := parseComponents(timestamp) + if err != nil { + return time.Unix(0, 0), err + } + + switch strings.ToLower(format) { + case "unix": + return time.Unix(integer, fractional).UTC(), nil + case "unix_ms": + return time.Unix(0, integer*1e6).UTC(), nil + case "unix_us": + return time.Unix(0, integer*1e3).UTC(), nil + case "unix_ns": + return time.Unix(0, integer).UTC(), nil + default: + return time.Unix(0, 0), errUnsupportedFormat + } +} + +func parseComponents(timestamp interface{}) (int64, int64, error) { + switch ts := timestamp.(type) { + case string: + parts := strings.SplitN(ts, ".", 2) + if len(parts) == 2 { + return parseUnixTimeComponents(parts[0], parts[1]) + } + + parts = strings.SplitN(ts, ",", 2) + if len(parts) == 2 { + return parseUnixTimeComponents(parts[0], parts[1]) + } + + integer, err := strconv.ParseInt(ts, 10, 64) + if err != nil { + return 0, 0, err + } + return integer, 0, nil + case int8: + return int64(ts), 0, nil + case int16: + return int64(ts), 0, nil + case int32: + return int64(ts), 0, nil + case int64: + return ts, 0, nil + case uint8: + return int64(ts), 0, nil + case uint16: + return int64(ts), 0, nil + case uint32: + return int64(ts), 0, nil + case uint64: + return int64(ts), 0, nil + case float32: + integer, fractional := math.Modf(float64(ts)) + return int64(integer), int64(fractional * 1e9), nil + case float64: + integer, fractional := math.Modf(ts) + return int64(integer), int64(fractional * 1e9), nil + default: + return 0, 0, errUnsupportedFormat + } +} + +func parseUnixTimeComponents(first, second string) (int64, int64, error) { + integer, err := strconv.ParseInt(first, 10, 64) + if err != nil { + return 0, 0, err + } + + // Convert to nanoseconds, dropping any greater precision. + buf := []byte("000000000") + copy(buf, second) + + fractional, err := strconv.ParseInt(string(buf), 10, 64) + if err != nil { + return 0, 0, err + } + return integer, fractional, nil +} + +func parseTime(format string, timestamp interface{}, location string) (time.Time, error) { + switch ts := timestamp.(type) { + case string: + loc, err := time.LoadLocation(location) + if err != nil { + return time.Unix(0, 0), err + } + switch strings.ToLower(format) { + case "ansic": + format = time.ANSIC + case "unixdate": + format = time.UnixDate + case "rubydate": + format = time.RubyDate + case "rfc822": + format = time.RFC822 + case "rfc822z": + format = time.RFC822Z + case "rfc850": + format = time.RFC850 + case "rfc1123": + format = time.RFC1123 + case "rfc1123z": + format = time.RFC1123Z + case "rfc3339": + format = time.RFC3339 + case "rfc3339nano": + format = time.RFC3339Nano + case "stamp": + format = time.Stamp + case "stampmilli": + format = time.StampMilli + case "stampmicro": + format = time.StampMicro + case "stampnano": + format = time.StampNano + } + return time.ParseInLocation(format, ts, loc) + default: + return time.Unix(0, 0), errUnsupportedFormat + } +} diff --git a/pkg/transformers/json/transformer.go b/pkg/transformers/json/transformer.go index ce047660..cf2042d7 100644 --- a/pkg/transformers/json/transformer.go +++ b/pkg/transformers/json/transformer.go @@ -14,30 +14,41 @@ import ( const sep = "/" -var keys = [...]string{"publisher", "protocol", "channel", "subtopic"} - var ( + keys = [...]string{"publisher", "protocol", "channel", "subtopic"} + // ErrTransform represents an error during parsing message. - ErrTransform = errors.New("unable to parse JSON object") - ErrInvalidKey = errors.New("invalid object key") + ErrTransform = errors.New("unable to parse JSON object") + // ErrInvalidKey represents the use of a reserved message field. + ErrInvalidKey = errors.New("invalid object key") + // ErrInvalidTimeField represents the use an invalid time field. + ErrInvalidTimeField = errors.New("invalid time field") + errUnknownFormat = errors.New("unknown format of JSON message") errInvalidFormat = errors.New("invalid JSON object") errInvalidNestedJSON = errors.New("invalid nested JSON object") ) -type funcTransformer func(messaging.Message) (interface{}, error) +// TimeField represents the message fields to use as timestamp +type TimeField struct { + FieldName string `toml:"field_name"` + FieldFormat string `toml:"field_format"` + Location string `toml:"location"` +} + +type transformerService struct { + timeFields []TimeField +} // New returns a new JSON transformer. -func New() transformers.Transformer { - return funcTransformer(transformer) +func New(tfs []TimeField) transformers.Transformer { + return &transformerService{ + timeFields: tfs, + } } // Transform transforms Mainflux message to a list of JSON messages. -func (fh funcTransformer) Transform(msg messaging.Message) (interface{}, error) { - return fh(msg) -} - -func transformer(msg messaging.Message) (interface{}, error) { +func (ts *transformerService) Transform(msg messaging.Message) (interface{}, error) { ret := Message{ Publisher: msg.Publisher, Created: msg.Created, @@ -45,21 +56,35 @@ func transformer(msg messaging.Message) (interface{}, error) { Channel: msg.Channel, Subtopic: msg.Subtopic, } + if ret.Subtopic == "" { return nil, errors.Wrap(ErrTransform, errUnknownFormat) } + subs := strings.Split(ret.Subtopic, ".") if len(subs) == 0 { return nil, errors.Wrap(ErrTransform, errUnknownFormat) } + format := subs[len(subs)-1] var payload interface{} if err := json.Unmarshal(msg.Payload, &payload); err != nil { return nil, errors.Wrap(ErrTransform, err) } + switch p := payload.(type) { case map[string]interface{}: ret.Payload = p + + // Apply timestamp transformation rules depending on key/unit pairs + ts, err := ts.transformTimeField(p) + if err != nil { + return nil, errors.Wrap(ErrInvalidTimeField, err) + } + if ts != 0 { + ret.Created = ts + } + return Messages{[]Message{ret}, format}, nil case []interface{}: res := []Message{} @@ -70,6 +95,16 @@ func transformer(msg messaging.Message) (interface{}, error) { return nil, errors.Wrap(ErrTransform, errInvalidNestedJSON) } newMsg := ret + + // Apply timestamp transformation rules depending on key/unit pairs + ts, err := ts.transformTimeField(v) + if err != nil { + return nil, errors.Wrap(ErrInvalidTimeField, err) + } + if ts != 0 { + ret.Created = ts + } + newMsg.Payload = v res = append(res, newMsg) } @@ -140,3 +175,22 @@ func flatten(prefix string, m, m1 map[string]interface{}) (map[string]interface{ } return m, nil } + +func (ts *transformerService) transformTimeField(payload map[string]interface{}) (int64, error) { + if len(ts.timeFields) == 0 { + return 0, nil + } + + for _, tf := range ts.timeFields { + if val, ok := payload[tf.FieldName]; ok { + t, err := parseTimestamp(tf.FieldFormat, val, tf.Location) + if err != nil { + return 0, err + } + + return t.UnixNano(), nil + } + } + + return 0, nil +} diff --git a/pkg/transformers/json/transformer_test.go b/pkg/transformers/json/transformer_test.go index 80acdac6..b2ce5d05 100644 --- a/pkg/transformers/json/transformer_test.go +++ b/pkg/transformers/json/transformer_test.go @@ -15,14 +15,26 @@ import ( ) const ( - validPayload = `{"key1": "val1", "key2": 123, "key3": "val3", "key4": {"key5": "val5"}}` - listPayload = `[{"key1": "val1", "key2": 123, "keylist3": "val3", "key4": {"key5": "val5"}}, {"key1": "val1", "key2": 123, "key3": "val3", "key4": {"key5": "val5"}}]` - invalidPayload = `{"key1": }` + validPayload = `{"key1": "val1", "key2": 123, "key3": "val3", "key4": {"key5": "val5"}}` + tsPayload = `{"custom_ts_key": "1638310819", "key1": "val1", "key2": 123, "key3": "val3", "key4": {"key5": "val5"}}` + microsPayload = `{"custom_ts_micro_key": "1638310819000000", "key1": "val1", "key2": 123, "key3": "val3", "key4": {"key5": "val5"}}` + invalidTsPayload = `{"custom_ts_key": "abc", "key1": "val1", "key2": 123, "key3": "val3", "key4": {"key5": "val5"}}` + listPayload = `[{"key1": "val1", "key2": 123, "keylist3": "val3", "key4": {"key5": "val5"}}, {"key1": "val1", "key2": 123, "key3": "val3", "key4": {"key5": "val5"}}]` + invalidPayload = `{"key1": }` ) func TestTransformJSON(t *testing.T) { now := time.Now().Unix() - tr := json.New() + ts := []json.TimeField{ + { + FieldName: "custom_ts_key", + FieldFormat: "unix", + }, { + FieldName: "custom_ts_micro_key", + FieldFormat: "unix_us", + }, + } + tr := json.New(ts) msg := messaging.Message{ Channel: "channel-1", Subtopic: "subtopic-1", @@ -37,6 +49,18 @@ func TestTransformJSON(t *testing.T) { listMsg := msg listMsg.Payload = []byte(listPayload) + tsMsg := msg + tsMsg.Payload = []byte(tsPayload) + + microsMsg := msg + microsMsg.Payload = []byte(microsPayload) + + invalidFmt := msg + invalidFmt.Subtopic = "" + + invalidTimeField := msg + invalidTimeField.Payload = []byte(invalidTsPayload) + jsonMsgs := json.Messages{ Data: []json.Message{ { @@ -58,8 +82,49 @@ func TestTransformJSON(t *testing.T) { Format: msg.Subtopic, } - invalidFmt := msg - invalidFmt.Subtopic = "" + jsonTsMsgs := json.Messages{ + Data: []json.Message{ + { + Channel: msg.Channel, + Subtopic: msg.Subtopic, + Publisher: msg.Publisher, + Protocol: msg.Protocol, + Created: int64(1638310819000000000), + Payload: map[string]interface{}{ + "custom_ts_key": "1638310819", + "key1": "val1", + "key2": float64(123), + "key3": "val3", + "key4": map[string]interface{}{ + "key5": "val5", + }, + }, + }, + }, + Format: msg.Subtopic, + } + + jsonMicrosMsgs := json.Messages{ + Data: []json.Message{ + { + Channel: msg.Channel, + Subtopic: msg.Subtopic, + Publisher: msg.Publisher, + Protocol: msg.Protocol, + Created: int64(1638310819000000000), + Payload: map[string]interface{}{ + "custom_ts_micro_key": "1638310819000000", + "key1": "val1", + "key2": float64(123), + "key3": "val3", + "key4": map[string]interface{}{ + "key5": "val5", + }, + }, + }, + }, + Format: msg.Subtopic, + } listJSON := json.Messages{ Data: []json.Message{ @@ -127,6 +192,24 @@ func TestTransformJSON(t *testing.T) { json: nil, err: json.ErrTransform, }, + { + desc: "test transform JSON with timestamp transformation", + msg: tsMsg, + json: jsonTsMsgs, + err: nil, + }, + { + desc: "test transform JSON with timestamp transformation in micros", + msg: microsMsg, + json: jsonMicrosMsgs, + err: nil, + }, + { + desc: "test transform JSON with invalid timestamp transformation in micros", + msg: invalidTimeField, + json: nil, + err: json.ErrInvalidTimeField, + }, } for _, tc := range cases {