NOISSUE - Add timestamp transformation rules for specific JSON fields (#1514)

* NOISSUE - Add timestamp transformation rules for specific JSON fields

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Add tests and defaults

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix reviews

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix Created

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix READMEs

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix Config file

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Improve the timestamp Config structure

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix typos

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix smpp

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix reviews

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Rm duplicated Unmarshal

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Extract the error var

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix review

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Return error if time field transformation fails

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
This commit is contained in:
Manuel Imperiale 2021-12-14 11:17:23 +01:00 committed by GitHub
parent 7e9ab453d4
commit b18c9e79dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 582 additions and 345 deletions

View File

@ -21,9 +21,6 @@ import (
"github.com/mainflux/mainflux/consumers/writers/cassandra"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/nats"
"github.com/mainflux/mainflux/pkg/transformers"
"github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
@ -31,39 +28,33 @@ const (
svcName = "cassandra-writer"
sep = ","
defNatsURL = "nats://localhost:4222"
defLogLevel = "error"
defPort = "8180"
defCluster = "127.0.0.1"
defKeyspace = "mainflux"
defDBUser = "mainflux"
defDBPass = "mainflux"
defDBPort = "9042"
defConfigPath = "/config.toml"
defContentType = "application/senml+json"
defTransformer = "senml"
defNatsURL = "nats://localhost:4222"
defLogLevel = "error"
defPort = "8180"
defCluster = "127.0.0.1"
defKeyspace = "mainflux"
defDBUser = "mainflux"
defDBPass = "mainflux"
defDBPort = "9042"
defConfigPath = "/config.toml"
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_CASSANDRA_WRITER_LOG_LEVEL"
envPort = "MF_CASSANDRA_WRITER_PORT"
envCluster = "MF_CASSANDRA_WRITER_DB_CLUSTER"
envKeyspace = "MF_CASSANDRA_WRITER_DB_KEYSPACE"
envDBUser = "MF_CASSANDRA_WRITER_DB_USER"
envDBPass = "MF_CASSANDRA_WRITER_DB_PASS"
envDBPort = "MF_CASSANDRA_WRITER_DB_PORT"
envConfigPath = "MF_CASSANDRA_WRITER_CONFIG_PATH"
envContentType = "MF_CASSANDRA_WRITER_CONTENT_TYPE"
envTransformer = "MF_CASSANDRA_WRITER_TRANSFORMER"
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_CASSANDRA_WRITER_LOG_LEVEL"
envPort = "MF_CASSANDRA_WRITER_PORT"
envCluster = "MF_CASSANDRA_WRITER_DB_CLUSTER"
envKeyspace = "MF_CASSANDRA_WRITER_DB_KEYSPACE"
envDBUser = "MF_CASSANDRA_WRITER_DB_USER"
envDBPass = "MF_CASSANDRA_WRITER_DB_PASS"
envDBPort = "MF_CASSANDRA_WRITER_DB_PORT"
envConfigPath = "MF_CASSANDRA_WRITER_CONFIG_PATH"
)
type config struct {
natsURL string
logLevel string
port string
configPath string
contentType string
transformer string
dbCfg cassandra.DBConfig
natsURL string
logLevel string
port string
configPath string
dbCfg cassandra.DBConfig
}
func main() {
@ -85,9 +76,8 @@ func main() {
defer session.Close()
repo := newService(session, logger)
t := makeTransformer(cfg, logger)
if err := consumers.Start(pubSub, repo, t, cfg.configPath, logger); err != nil {
if err := consumers.Start(pubSub, repo, cfg.configPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to create Cassandra writer: %s", err))
}
@ -120,13 +110,11 @@ func loadConfig() config {
}
return config{
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
configPath: mainflux.Env(envConfigPath, defConfigPath),
contentType: mainflux.Env(envContentType, defContentType),
transformer: mainflux.Env(envTransformer, defTransformer),
dbCfg: dbCfg,
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
configPath: mainflux.Env(envConfigPath, defConfigPath),
dbCfg: dbCfg,
}
}
@ -162,21 +150,6 @@ func newService(session *gocql.Session, logger logger.Logger) consumers.Consumer
return repo
}
// makeTransformer builds the message transformer selected by cfg.transformer.
// Supported values (matched case-insensitively) are "senml", which yields a
// SenML transformer bound to cfg.contentType, and "json". Any other value is
// logged as an error and terminates the process with exit status 1.
func makeTransformer(cfg config, logger logger.Logger) transformers.Transformer {
	format := strings.ToUpper(cfg.transformer)
	if format == "SENML" {
		logger.Info("Using SenML transformer")
		return senml.New(cfg.contentType)
	}
	if format == "JSON" {
		logger.Info("Using JSON transformer")
		return json.New()
	}
	// Unknown transformer type: there is nothing sensible to return, so exit.
	// The trailing return only satisfies the compiler; it is never reached.
	logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s", cfg.transformer))
	os.Exit(1)
	return nil
}
func startHTTPServer(port string, errs chan error, logger logger.Logger) {
p := fmt.Sprintf(":%s", port)
logger.Info(fmt.Sprintf("Cassandra writer service started, exposed port %s", port))

View File

@ -9,7 +9,6 @@ import (
"net/http"
"os"
"os/signal"
"strings"
"syscall"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
@ -20,52 +19,43 @@ import (
"github.com/mainflux/mainflux/consumers/writers/influxdb"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/nats"
"github.com/mainflux/mainflux/pkg/transformers"
"github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
const (
svcName = "influxdb-writer"
defNatsURL = "nats://localhost:4222"
defLogLevel = "error"
defPort = "8180"
defDB = "mainflux"
defDBHost = "localhost"
defDBPort = "8086"
defDBUser = "mainflux"
defDBPass = "mainflux"
defConfigPath = "/config.toml"
defContentType = "application/senml+json"
defTransformer = "senml"
defNatsURL = "nats://localhost:4222"
defLogLevel = "error"
defPort = "8180"
defDB = "mainflux"
defDBHost = "localhost"
defDBPort = "8086"
defDBUser = "mainflux"
defDBPass = "mainflux"
defConfigPath = "/config.toml"
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_INFLUX_WRITER_LOG_LEVEL"
envPort = "MF_INFLUX_WRITER_PORT"
envDB = "MF_INFLUXDB_DB"
envDBHost = "MF_INFLUX_WRITER_DB_HOST"
envDBPort = "MF_INFLUXDB_PORT"
envDBUser = "MF_INFLUXDB_ADMIN_USER"
envDBPass = "MF_INFLUXDB_ADMIN_PASSWORD"
envConfigPath = "MF_INFLUX_WRITER_CONFIG_PATH"
envContentType = "MF_INFLUX_WRITER_CONTENT_TYPE"
envTransformer = "MF_INFLUX_WRITER_TRANSFORMER"
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_INFLUX_WRITER_LOG_LEVEL"
envPort = "MF_INFLUX_WRITER_PORT"
envDB = "MF_INFLUXDB_DB"
envDBHost = "MF_INFLUX_WRITER_DB_HOST"
envDBPort = "MF_INFLUXDB_PORT"
envDBUser = "MF_INFLUXDB_ADMIN_USER"
envDBPass = "MF_INFLUXDB_ADMIN_PASSWORD"
envConfigPath = "MF_INFLUX_WRITER_CONFIG_PATH"
)
type config struct {
natsURL string
logLevel string
port string
dbName string
dbHost string
dbPort string
dbUser string
dbPass string
configPath string
contentType string
transformer string
natsURL string
logLevel string
port string
dbName string
dbHost string
dbPort string
dbUser string
dbPass string
configPath string
}
func main() {
@ -95,9 +85,8 @@ func main() {
counter, latency := makeMetrics()
repo = api.LoggingMiddleware(repo, logger)
repo = api.MetricsMiddleware(repo, counter, latency)
t := makeTransformer(cfg, logger)
if err := consumers.Start(pubSub, repo, t, cfg.configPath, logger); err != nil {
if err := consumers.Start(pubSub, repo, cfg.configPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start InfluxDB writer: %s", err))
os.Exit(1)
}
@ -117,17 +106,15 @@ func main() {
func loadConfigs() (config, influxdata.HTTPConfig) {
cfg := config{
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbName: mainflux.Env(envDB, defDB),
dbHost: mainflux.Env(envDBHost, defDBHost),
dbPort: mainflux.Env(envDBPort, defDBPort),
dbUser: mainflux.Env(envDBUser, defDBUser),
dbPass: mainflux.Env(envDBPass, defDBPass),
configPath: mainflux.Env(envConfigPath, defConfigPath),
contentType: mainflux.Env(envContentType, defContentType),
transformer: mainflux.Env(envTransformer, defTransformer),
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbName: mainflux.Env(envDB, defDB),
dbHost: mainflux.Env(envDBHost, defDBHost),
dbPort: mainflux.Env(envDBPort, defDBPort),
dbUser: mainflux.Env(envDBUser, defDBUser),
dbPass: mainflux.Env(envDBPass, defDBPass),
configPath: mainflux.Env(envConfigPath, defConfigPath),
}
clientCfg := influxdata.HTTPConfig{
@ -157,21 +144,6 @@ func makeMetrics() (*kitprometheus.Counter, *kitprometheus.Summary) {
return counter, latency
}
// makeTransformer returns the message transformer selected by
// cfg.transformer ("senml" or "json", compared case-insensitively).
// For "senml" the transformer is bound to cfg.contentType. Any other
// value is fatal: the error is logged and the process exits with
// status 1, so the function never returns in that case.
func makeTransformer(cfg config, logger logger.Logger) transformers.Transformer {
switch strings.ToUpper(cfg.transformer) {
case "SENML":
logger.Info("Using SenML transformer")
return senml.New(cfg.contentType)
case "JSON":
logger.Info("Using JSON transformer")
return json.New()
default:
logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s", cfg.transformer))
os.Exit(1)
// Unreachable after os.Exit; present only to satisfy the compiler.
return nil
}
}
func startHTTPService(port string, logger logger.Logger, errs chan error) {
p := fmt.Sprintf(":%s", port)
logger.Info(fmt.Sprintf("InfluxDB writer service started, exposed port %s", p))

View File

@ -10,7 +10,6 @@ import (
"net/http"
"os"
"os/signal"
"strings"
"syscall"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
@ -20,9 +19,6 @@ import (
"github.com/mainflux/mainflux/consumers/writers/mongodb"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/nats"
"github.com/mainflux/mainflux/pkg/transformers"
"github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
@ -31,37 +27,31 @@ import (
const (
svcName = "mongodb-writer"
defLogLevel = "error"
defNatsURL = "nats://localhost:4222"
defPort = "8180"
defDB = "mainflux"
defDBHost = "localhost"
defDBPort = "27017"
defConfigPath = "/config.toml"
defContentType = "application/senml+json"
defTransformer = "senml"
defLogLevel = "error"
defNatsURL = "nats://localhost:4222"
defPort = "8180"
defDB = "mainflux"
defDBHost = "localhost"
defDBPort = "27017"
defConfigPath = "/config.toml"
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_MONGO_WRITER_LOG_LEVEL"
envPort = "MF_MONGO_WRITER_PORT"
envDB = "MF_MONGO_WRITER_DB"
envDBHost = "MF_MONGO_WRITER_DB_HOST"
envDBPort = "MF_MONGO_WRITER_DB_PORT"
envConfigPath = "MF_MONGO_WRITER_CONFIG_PATH"
envContentType = "MF_MONGO_WRITER_CONTENT_TYPE"
envTransformer = "MF_MONGO_WRITER_TRANSFORMER"
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_MONGO_WRITER_LOG_LEVEL"
envPort = "MF_MONGO_WRITER_PORT"
envDB = "MF_MONGO_WRITER_DB"
envDBHost = "MF_MONGO_WRITER_DB_HOST"
envDBPort = "MF_MONGO_WRITER_DB_PORT"
envConfigPath = "MF_MONGO_WRITER_CONFIG_PATH"
)
type config struct {
natsURL string
logLevel string
port string
dbName string
dbHost string
dbPort string
configPath string
contentType string
transformer string
natsURL string
logLevel string
port string
dbName string
dbHost string
dbPort string
configPath string
}
func main() {
@ -92,9 +82,8 @@ func main() {
counter, latency := makeMetrics()
repo = api.LoggingMiddleware(repo, logger)
repo = api.MetricsMiddleware(repo, counter, latency)
t := makeTransformer(cfg, logger)
if err := consumers.Start(pubSub, repo, t, cfg.configPath, logger); err != nil {
if err := consumers.Start(pubSub, repo, cfg.configPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start MongoDB writer: %s", err))
os.Exit(1)
}
@ -114,15 +103,13 @@ func main() {
func loadConfigs() config {
return config{
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbName: mainflux.Env(envDB, defDB),
dbHost: mainflux.Env(envDBHost, defDBHost),
dbPort: mainflux.Env(envDBPort, defDBPort),
configPath: mainflux.Env(envConfigPath, defConfigPath),
contentType: mainflux.Env(envContentType, defContentType),
transformer: mainflux.Env(envTransformer, defTransformer),
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbName: mainflux.Env(envDB, defDB),
dbHost: mainflux.Env(envDBHost, defDBHost),
dbPort: mainflux.Env(envDBPort, defDBPort),
configPath: mainflux.Env(envConfigPath, defConfigPath),
}
}
@ -144,21 +131,6 @@ func makeMetrics() (*kitprometheus.Counter, *kitprometheus.Summary) {
return counter, latency
}
// makeTransformer returns the message transformer selected by
// cfg.transformer ("senml" or "json", compared case-insensitively).
// For "senml" the transformer is bound to cfg.contentType. Any other
// value is fatal: the error is logged and the process exits with
// status 1, so the function never returns in that case.
func makeTransformer(cfg config, logger logger.Logger) transformers.Transformer {
switch strings.ToUpper(cfg.transformer) {
case "SENML":
logger.Info("Using SenML transformer")
return senml.New(cfg.contentType)
case "JSON":
logger.Info("Using JSON transformer")
return json.New()
default:
logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s", cfg.transformer))
os.Exit(1)
// Unreachable after os.Exit; present only to satisfy the compiler.
return nil
}
}
func startHTTPService(port string, logger logger.Logger, errs chan error) {
p := fmt.Sprintf(":%s", port)
logger.Info(fmt.Sprintf("Mongodb writer service started, exposed port %s", p))

View File

@ -9,7 +9,6 @@ import (
"net/http"
"os"
"os/signal"
"strings"
"syscall"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
@ -20,9 +19,6 @@ import (
"github.com/mainflux/mainflux/consumers/writers/postgres"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/nats"
"github.com/mainflux/mainflux/pkg/transformers"
"github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
@ -43,8 +39,6 @@ const (
defDBSSLKey = ""
defDBSSLRootCert = ""
defConfigPath = "/config.toml"
defContentType = "application/senml+json"
defTransformer = "senml"
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_POSTGRES_WRITER_LOG_LEVEL"
@ -59,18 +53,14 @@ const (
envDBSSLKey = "MF_POSTGRES_WRITER_DB_SSL_KEY"
envDBSSLRootCert = "MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT"
envConfigPath = "MF_POSTGRES_WRITER_CONFIG_PATH"
envContentType = "MF_POSTGRES_WRITER_CONTENT_TYPE"
envTransformer = "MF_POSTGRES_WRITER_TRANSFORMER"
)
type config struct {
natsURL string
logLevel string
port string
configPath string
contentType string
transformer string
dbConfig postgres.Config
natsURL string
logLevel string
port string
configPath string
dbConfig postgres.Config
}
func main() {
@ -92,9 +82,8 @@ func main() {
defer db.Close()
repo := newService(db, logger)
t := makeTransformer(cfg, logger)
if err = consumers.Start(pubSub, repo, t, cfg.configPath, logger); err != nil {
if err = consumers.Start(pubSub, repo, cfg.configPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to create Postgres writer: %s", err))
}
@ -126,13 +115,11 @@ func loadConfig() config {
}
return config{
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
configPath: mainflux.Env(envConfigPath, defConfigPath),
contentType: mainflux.Env(envContentType, defContentType),
transformer: mainflux.Env(envTransformer, defTransformer),
dbConfig: dbConfig,
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
configPath: mainflux.Env(envConfigPath, defConfigPath),
dbConfig: dbConfig,
}
}
@ -167,21 +154,6 @@ func newService(db *sqlx.DB, logger logger.Logger) consumers.Consumer {
return svc
}
// makeTransformer returns the message transformer selected by
// cfg.transformer ("senml" or "json", compared case-insensitively).
// For "senml" the transformer is bound to cfg.contentType. Any other
// value is fatal: the error is logged and the process exits with
// status 1, so the function never returns in that case.
func makeTransformer(cfg config, logger logger.Logger) transformers.Transformer {
switch strings.ToUpper(cfg.transformer) {
case "SENML":
logger.Info("Using SenML transformer")
return senml.New(cfg.contentType)
case "JSON":
logger.Info("Using JSON transformer")
return json.New()
default:
logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s", cfg.transformer))
os.Exit(1)
// Unreachable after os.Exit; present only to satisfy the compiler.
return nil
}
}
func startHTTPServer(port string, errs chan error, logger logger.Logger) {
p := fmt.Sprintf(":%s", port)
logger.Info(fmt.Sprintf("Postgres writer service started, exposed port %s", port))

View File

@ -154,7 +154,7 @@ func main() {
svc := newService(db, dbTracer, auth, cfg, logger)
errs := make(chan error, 2)
if err = consumers.Start(pubSub, svc, nil, cfg.configPath, logger); err != nil {
if err = consumers.Start(pubSub, svc, cfg.configPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to create Postgres writer: %s", err))
}

View File

@ -154,7 +154,7 @@ func main() {
svc := newService(db, dbTracer, auth, cfg, logger)
errs := make(chan error, 2)
if err = consumers.Start(pubSub, svc, nil, cfg.configPath, logger); err != nil {
if err = consumers.Start(pubSub, svc, cfg.configPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to create Postgres writer: %s", err))
}

View File

@ -6,6 +6,8 @@ package consumers
import (
"fmt"
"io/ioutil"
"os"
"strings"
"github.com/pelletier/go-toml"
@ -14,6 +16,13 @@ import (
"github.com/mainflux/mainflux/pkg/messaging"
pubsub "github.com/mainflux/mainflux/pkg/messaging/nats"
"github.com/mainflux/mainflux/pkg/transformers"
"github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
)
const (
defContentType = "application/senml+json"
defFormat = "senml"
)
var (
@ -24,13 +33,15 @@ var (
// Start method starts consuming messages received from NATS.
// This method transforms messages to SenML format before
// using MessageRepository to store them.
func Start(sub messaging.Subscriber, consumer Consumer, transformer transformers.Transformer, subjectsCfgPath string, logger logger.Logger) error {
subjects, err := loadSubjectsConfig(subjectsCfgPath)
func Start(sub messaging.Subscriber, consumer Consumer, configPath string, logger logger.Logger) error {
cfg, err := loadConfig(configPath)
if err != nil {
logger.Warn(fmt.Sprintf("Failed to load subjects: %s", err))
logger.Warn(fmt.Sprintf("Failed to load consumer config: %s", err))
}
for _, subject := range subjects {
transformer := makeTransformer(cfg.TransformerCfg, logger)
for _, subject := range cfg.SubscriberCfg.Subjects {
if err := sub.Subscribe(subject, handler(transformer, consumer)); err != nil {
return err
}
@ -52,24 +63,55 @@ func handler(t transformers.Transformer, c Consumer) messaging.MessageHandler {
}
}
type filterConfig struct {
Filter []string `toml:"filter"`
type subscriberConfig struct {
Subjects []string `toml:"subjects"`
}
type subjectsConfig struct {
Subjects filterConfig `toml:"subjects"`
type transformerConfig struct {
Format string `toml:"format"`
ContentType string `toml:"content_type"`
TimeFields []json.TimeField `toml:"time_fields"`
}
func loadSubjectsConfig(subjectsConfigPath string) ([]string, error) {
data, err := ioutil.ReadFile(subjectsConfigPath)
type config struct {
SubscriberCfg subscriberConfig `toml:"subscriber"`
TransformerCfg transformerConfig `toml:"transformer"`
}
func loadConfig(configPath string) (config, error) {
cfg := config{
SubscriberCfg: subscriberConfig{
Subjects: []string{pubsub.SubjectAllChannels},
},
TransformerCfg: transformerConfig{
Format: defFormat,
ContentType: defContentType,
},
}
data, err := ioutil.ReadFile(configPath)
if err != nil {
return []string{pubsub.SubjectAllChannels}, errors.Wrap(errOpenConfFile, err)
return cfg, errors.Wrap(errOpenConfFile, err)
}
var subjectsCfg subjectsConfig
if err := toml.Unmarshal(data, &subjectsCfg); err != nil {
return []string{pubsub.SubjectAllChannels}, errors.Wrap(errParseConfFile, err)
if err := toml.Unmarshal(data, &cfg); err != nil {
return cfg, errors.Wrap(errParseConfFile, err)
}
return subjectsCfg.Subjects.Filter, nil
return cfg, nil
}
// makeTransformer instantiates the transformer described by cfg. The format
// is matched case-insensitively: "senml" yields a SenML transformer bound to
// cfg.ContentType, while "json" yields a JSON transformer configured with
// cfg.TimeFields. Any other format is fatal: it is logged as an error and
// the process exits with status 1.
func makeTransformer(cfg transformerConfig, logger logger.Logger) transformers.Transformer {
	format := strings.ToUpper(cfg.Format)
	if format == "SENML" {
		logger.Info("Using SenML transformer")
		return senml.New(cfg.ContentType)
	}
	if format == "JSON" {
		logger.Info("Using JSON transformer")
		return json.New(cfg.TimeFields)
	}
	// No usable transformer for this format: log and terminate. The final
	// return is unreachable and exists only to satisfy the compiler.
	logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s", cfg.Format))
	os.Exit(1)
	return nil
}

View File

@ -19,7 +19,7 @@ var _ notifiers.Notifier = (*notifier)(nil)
type notifier struct {
transmitter *smpp.Transmitter
tranformer transformers.Transformer
transformer transformers.Transformer
sourceAddrTON uint8
sourceAddrNPI uint8
destAddrTON uint8
@ -38,7 +38,7 @@ func New(cfg Config) notifiers.Notifier {
t.Bind()
ret := &notifier{
transmitter: t,
tranformer: json.New(),
transformer: json.New([]json.TimeField{}),
sourceAddrTON: cfg.SourceAddrTON,
destAddrTON: cfg.DestAddrTON,
sourceAddrNPI: cfg.SourceAddrNPI,

View File

@ -8,23 +8,20 @@ The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.
| Variable | Description | Default |
| -------------------------------- | --------------------------------------------------------- | ---------------------- |
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_CASSANDRA_WRITER_LOG_LEVEL | Log level for Cassandra writer (debug, info, warn, error) | error |
| MF_CASSANDRA_WRITER_PORT | Service HTTP port | 8180 |
| MF_CASSANDRA_WRITER_DB_CLUSTER | Cassandra cluster comma separated addresses | 127.0.0.1 |
| MF_CASSANDRA_WRITER_DB_KEYSPACE | Cassandra keyspace name | mainflux |
| MF_CASSANDRA_WRITER_DB_USER | Cassandra DB username | |
| MF_CASSANDRA_WRITER_DB_PASS | Cassandra DB password | |
| MF_CASSANDRA_WRITER_DB_PORT | Cassandra DB port | 9042 |
| MF_CASSANDRA_WRITER_CONFIG_PATH | Configuration file path with NATS subjects list | /config.toml |
| MF_CASSANDRA_WRITER_CONTENT_TYPE | Message payload Content Type | application/senml+json |
| MF_CASSANDRA_WRITER_TRANSFORMER | Message transformer type | senml |
| Variable | Description | Default |
| -------------------------------- | ----------------------------------------------------------------------- | --------------------- |
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_CASSANDRA_WRITER_LOG_LEVEL | Log level for Cassandra writer (debug, info, warn, error) | error |
| MF_CASSANDRA_WRITER_PORT | Service HTTP port | 8180 |
| MF_CASSANDRA_WRITER_DB_CLUSTER | Cassandra cluster comma separated addresses | 127.0.0.1 |
| MF_CASSANDRA_WRITER_DB_KEYSPACE | Cassandra keyspace name | mainflux |
| MF_CASSANDRA_WRITER_DB_USER | Cassandra DB username | |
| MF_CASSANDRA_WRITER_DB_PASS | Cassandra DB password | |
| MF_CASSANDRA_WRITER_DB_PORT | Cassandra DB port | 9042 |
| MF_CASSANDRA_WRITER_CONFIG_PATH | Config file path with NATS subjects list, payload type and content-type | /config.toml |
## Deployment
The service itself is distributed as Docker container. Check the [`cassandra-writer`](https://github.com/mainflux/mainflux/blob/master/docker/addons/cassandra-writer/docker-compose.yml#L30-L49) service section in
docker-compose to see how service is deployed.
The service itself is distributed as Docker container. Check the [`cassandra-writer`](https://github.com/mainflux/mainflux/blob/master/docker/addons/cassandra-writer/docker-compose.yml#L30-L49) service section in docker-compose to see how service is deployed.
To start the service, execute the following shell script:
@ -49,8 +46,7 @@ MF_CASSANDRA_WRITER_DB_KEYSPACE=[Cassandra keyspace name] \
MF_CASSANDRA_READER_DB_USER=[Cassandra DB username] \
MF_CASSANDRA_READER_DB_PASS=[Cassandra DB password] \
MF_CASSANDRA_READER_DB_PORT=[Cassandra DB port] \
MF_CASSANDRA_WRITER_CONFIG_PATH=[Configuration file path with NATS subjects list] \
MF_CASSANDRA_WRITER_TRANSFORMER=[Message transformer type] \
MF_CASSANDRA_WRITER_CONFIG_PATH=[Config file path with NATS subjects list, payload type and content-type] \
$GOBIN/mainflux-cassandra-writer
```

View File

@ -8,19 +8,17 @@ The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.
| Variable | Description | Default |
| ----------------------------- | -------------------------------------------------------- | ---------------------- |
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_INFLUX_WRITER_LOG_LEVEL | Log level for InfluxDB writer (debug, info, warn, error) | error |
| MF_INFLUX_WRITER_PORT | Service HTTP port | 8180 |
| MF_INFLUX_WRITER_DB_HOST | InfluxDB host | localhost |
| MF_INFLUXDB_PORT | Default port of InfluxDB database | 8086 |
| MF_INFLUXDB_ADMIN_USER | Default user of InfluxDB database | mainflux |
| MF_INFLUXDB_ADMIN_PASSWORD | Default password of InfluxDB user | mainflux |
| MF_INFLUXDB_DB | InfluxDB database name | mainflux |
| MF_INFLUX_WRITER_CONFIG_PATH | Configuration file path with NATS subjects list | /configs.toml |
| MF_INFLUX_WRITER_CONTENT_TYPE | Message payload Content Type | application/senml+json |
| MF_INFLUX_WRITER_TRANSFORMER | Message transformer type | senml |
| Variable | Description | Default |
| ----------------------------- | ----------------------------------------------------------------------- | ---------------------- |
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_INFLUX_WRITER_LOG_LEVEL | Log level for InfluxDB writer (debug, info, warn, error) | error |
| MF_INFLUX_WRITER_PORT | Service HTTP port | 8180 |
| MF_INFLUX_WRITER_DB_HOST | InfluxDB host | localhost |
| MF_INFLUXDB_PORT | Default port of InfluxDB database | 8086 |
| MF_INFLUXDB_ADMIN_USER | Default user of InfluxDB database | mainflux |
| MF_INFLUXDB_ADMIN_PASSWORD | Default password of InfluxDB user | mainflux |
| MF_INFLUXDB_DB | InfluxDB database name | mainflux |
| MF_INFLUX_WRITER_CONFIG_PATH | Config file path with NATS subjects list, payload type and content-type | /configs.toml |
## Deployment
@ -49,8 +47,7 @@ MF_INFLUX_WRITER_DB_HOST=[InfluxDB database host] \
MF_INFLUXDB_PORT=[InfluxDB database port] \
MF_INFLUXDB_ADMIN_USER=[InfluxDB admin user] \
MF_INFLUXDB_ADMIN_PASSWORD=[InfluxDB admin password] \
MF_INFLUX_WRITER_CONFIG_PATH=[Configuration file path with filters list] \
MF_POSTGRES_WRITER_TRANSFORMER=[Message transformer type] \
MF_INFLUX_WRITER_CONFIG_PATH=[Config file path with NATS subjects list, payload type and content-type] \
$GOBIN/mainflux-influxdb
```

View File

@ -8,22 +8,19 @@ The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.
| Variable | Description | Default |
| ---------------------------- | ----------------------------------------------- | ---------------------- |
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_MONGO_WRITER_LOG_LEVEL | Log level for MongoDB writer | error |
| MF_MONGO_WRITER_PORT | Service HTTP port | 8180 |
| MF_MONGO_WRITER_DB | Default MongoDB database name | messages |
| MF_MONGO_WRITER_DB_HOST | Default MongoDB database host | localhost |
| MF_MONGO_WRITER_DB_PORT | Default MongoDB database port | 27017 |
| MF_MONGO_WRITER_CONFIG_PATH | Configuration file path with NATS subjects list | /config.toml |
| MF_MONGO_WRITER_CONTENT_TYPE | Message payload Content Type | application/senml+json |
| MF_MONGO_WRITER_TRANSFORMER | Message transformer type | senml |
| Variable | Description | Default |
| ---------------------------- | ----------------------------------------------------------------------- | ---------------------- |
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_MONGO_WRITER_LOG_LEVEL | Log level for MongoDB writer | error |
| MF_MONGO_WRITER_PORT | Service HTTP port | 8180 |
| MF_MONGO_WRITER_DB | Default MongoDB database name | messages |
| MF_MONGO_WRITER_DB_HOST | Default MongoDB database host | localhost |
| MF_MONGO_WRITER_DB_PORT | Default MongoDB database port | 27017 |
| MF_MONGO_WRITER_CONFIG_PATH | Config file path with NATS subjects list, payload type and content-type | /config.toml |
## Deployment
The service itself is distributed as Docker container. Check the [`mongodb-writer`](https://github.com/mainflux/mainflux/blob/master/docker/addons/mongodb-writer/docker-compose.yml#L36-L55) service section in
docker-compose to see how service is deployed.
The service itself is distributed as Docker container. Check the [`mongodb-writer`](https://github.com/mainflux/mainflux/blob/master/docker/addons/mongodb-writer/docker-compose.yml#L36-L55) service section in docker-compose to see how service is deployed.
To start the service, execute the following shell script:
@ -47,7 +44,6 @@ MF_MONGO_WRITER_DB=[MongoDB database name] \
MF_MONGO_WRITER_DB_HOST=[MongoDB database host] \
MF_MONGO_WRITER_DB_PORT=[MongoDB database port] \
MF_MONGO_WRITER_CONFIG_PATH=[Configuration file path with NATS subjects list] \
MF_MONGO_WRITER_TRANSFORMER=[Transformer type to be used] \
$GOBIN/mainflux-mongodb-writer
```

View File

@ -8,28 +8,25 @@ The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.
| Variable | Description | Default |
| ----------------------------------- | ----------------------------------------------- | ---------------------- |
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_POSTGRES_WRITER_LOG_LEVEL | Service log level | error |
| MF_POSTGRES_WRITER_PORT | Service HTTP port | 9104 |
| MF_POSTGRES_WRITER_DB_HOST | Postgres DB host | postgres |
| MF_POSTGRES_WRITER_DB_PORT | Postgres DB port | 5432 |
| MF_POSTGRES_WRITER_DB_USER | Postgres user | mainflux |
| MF_POSTGRES_WRITER_DB_PASS | Postgres password | mainflux |
| MF_POSTGRES_WRITER_DB | Postgres database name | messages |
| MF_POSTGRES_WRITER_DB_SSL_MODE | Postgres SSL mode | disabled |
| MF_POSTGRES_WRITER_DB_SSL_CERT | Postgres SSL certificate path | "" |
| MF_POSTGRES_WRITER_DB_SSL_KEY | Postgres SSL key | "" |
| MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT | Postgres SSL root certificate path | "" |
| MF_POSTGRES_WRITER_CONFIG_PATH | Configuration file path with NATS subjects list | /config.toml |
| MF_POSTGRES_WRITER_CONTENT_TYPE | Message payload Content Type | application/senml+json |
| MF_POSTGRES_WRITER_TRANSFORMER | Message transformer type | senml |
| Variable | Description | Default |
| ----------------------------------- | ----------------------------------------------------------------------- | ---------------------- |
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_POSTGRES_WRITER_LOG_LEVEL | Service log level | error |
| MF_POSTGRES_WRITER_PORT | Service HTTP port | 9104 |
| MF_POSTGRES_WRITER_DB_HOST | Postgres DB host | postgres |
| MF_POSTGRES_WRITER_DB_PORT | Postgres DB port | 5432 |
| MF_POSTGRES_WRITER_DB_USER | Postgres user | mainflux |
| MF_POSTGRES_WRITER_DB_PASS | Postgres password | mainflux |
| MF_POSTGRES_WRITER_DB | Postgres database name | messages |
| MF_POSTGRES_WRITER_DB_SSL_MODE | Postgres SSL mode | disabled |
| MF_POSTGRES_WRITER_DB_SSL_CERT | Postgres SSL certificate path | "" |
| MF_POSTGRES_WRITER_DB_SSL_KEY | Postgres SSL key | "" |
| MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT | Postgres SSL root certificate path | "" |
| MF_POSTGRES_WRITER_CONFIG_PATH | Config file path with NATS subjects list, payload type and content-type | /config.toml |
## Deployment
The service itself is distributed as Docker container. Check the [`postgres-writer`](https://github.com/mainflux/mainflux/blob/master/docker/addons/postgres-writer/docker-compose.yml#L34-L59) service section in
docker-compose to see how service is deployed.
The service itself is distributed as Docker container. Check the [`postgres-writer`](https://github.com/mainflux/mainflux/blob/master/docker/addons/postgres-writer/docker-compose.yml#L34-L59) service section in docker-compose to see how service is deployed.
To start the service, execute the following shell script:
@ -58,8 +55,7 @@ MF_POSTGRES_WRITER_DB_SSL_MODE=[Postgres SSL mode] \
MF_POSTGRES_WRITER_DB_SSL_CERT=[Postgres SSL cert] \
MF_POSTGRES_WRITER_DB_SSL_KEY=[Postgres SSL key] \
MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT=[Postgres SSL Root cert] \
MF_POSTGRES_WRITER_CONFIG_PATH=[Configuration file path with NATS subjects list] \
MF_POSTGRES_WRITER_TRANSFORMER=[Message transformer type] \
MF_POSTGRES_WRITER_CONFIG_PATH=[Config file path with NATS subjects list, payload type and content-type] \
$GOBIN/mainflux-postgres-writer
```

View File

@ -206,8 +206,6 @@ MF_CASSANDRA_WRITER_PORT=8902
MF_CASSANDRA_WRITER_DB_PORT=9042
MF_CASSANDRA_WRITER_DB_CLUSTER=mainflux-cassandra
MF_CASSANDRA_WRITER_DB_KEYSPACE=mainflux
MF_CASSANDRA_WRITER_CONTENT_TYPE=application/senml+json
MF_CASSANDRA_WRITER_TRANSFORMER=senml
### Cassandra Reader
MF_CASSANDRA_READER_LOG_LEVEL=debug
@ -231,8 +229,6 @@ MF_INFLUX_WRITER_PORT=8900
MF_INFLUX_WRITER_BATCH_SIZE=5000
MF_INFLUX_WRITER_BATCH_TIMEOUT=5
MF_INFLUX_WRITER_GRAFANA_PORT=3001
MF_INFLUX_WRITER_CONTENT_TYPE=application/senml+json
MF_INFLUX_WRITER_TRANSFORMER=senml
### InfluxDB Reader
MF_INFLUX_READER_LOG_LEVEL=debug
@ -245,8 +241,6 @@ MF_MONGO_WRITER_LOG_LEVEL=debug
MF_MONGO_WRITER_PORT=8901
MF_MONGO_WRITER_DB=mainflux
MF_MONGO_WRITER_DB_PORT=27017
MF_MONGO_WRITER_CONTENT_TYPE=application/senml+json
MF_MONGO_WRITER_TRANSFORMER=senml
### MongoDB Reader
MF_MONGO_READER_LOG_LEVEL=debug
@ -267,8 +261,6 @@ MF_POSTGRES_WRITER_DB_SSL_MODE=disable
MF_POSTGRES_WRITER_DB_SSL_CERT=""
MF_POSTGRES_WRITER_DB_SSL_KEY=""
MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT=""
MF_POSTGRES_WRITER_CONTENT_TYPE=application/senml+json
MF_POSTGRES_WRITER_TRANSFORMER=senml
### Postgres Reader
MF_POSTGRES_READER_LOG_LEVEL=debug

View File

@ -1,5 +1,16 @@
# To listen to all message broker subjects, use the default value "channels.>".
# To subscribe to specific subjects, use values starting with "channels." and
# followed by a subtopic (e.g. ["channels.<channel_id>.sub.topic.x", ...]).
[subjects]
filter = ["channels.>"]
[subscriber]
subjects = ["channels.>"]
[transformer]
# SenML or JSON
format = "senml"
# Used if format is SenML
content_type = "application/senml+json"
# Used as timestamp fields if format is JSON
time_fields = [{ field_name = "seconds_key", field_format = "unix", location = "UTC"},
{ field_name = "millis_key", field_format = "unix_ms", location = "UTC"},
{ field_name = "micros_key", field_format = "unix_us", location = "UTC"},
{ field_name = "nanos_key", field_format = "unix_ns", location = "UTC"}]

View File

@ -40,7 +40,6 @@ services:
MF_CASSANDRA_WRITER_DB_PORT: ${MF_CASSANDRA_WRITER_DB_PORT}
MF_CASSANDRA_WRITER_DB_CLUSTER: ${MF_CASSANDRA_WRITER_DB_CLUSTER}
MF_CASSANDRA_WRITER_DB_KEYSPACE: ${MF_CASSANDRA_WRITER_DB_KEYSPACE}
MF_CASSANDRA_WRITER_TRANSFORMER: ${MF_CASSANDRA_WRITER_TRANSFORMER}
ports:
- ${MF_CASSANDRA_WRITER_PORT}:${MF_CASSANDRA_WRITER_PORT}
networks:

View File

@ -1,5 +1,16 @@
# To listen to all message broker subjects, use the default value "channels.>".
# To subscribe to specific subjects, use values starting with "channels." and
# followed by a subtopic (e.g. ["channels.<channel_id>.sub.topic.x", ...]).
[subjects]
filter = ["channels.>"]
[subscriber]
subjects = ["channels.>"]
[transformer]
# SenML or JSON
format = "senml"
# Used if format is SenML
content_type = "application/senml+json"
# Used as timestamp fields if format is JSON
time_fields = [{ field_name = "seconds_key", field_format = "unix", location = "UTC"},
{ field_name = "millis_key", field_format = "unix_ms", location = "UTC"},
{ field_name = "micros_key", field_format = "unix_us", location = "UTC"},
{ field_name = "nanos_key", field_format = "unix_ns", location = "UTC"}]

View File

@ -50,7 +50,6 @@ services:
MF_INFLUXDB_PORT: ${MF_INFLUXDB_PORT}
MF_INFLUXDB_ADMIN_USER: ${MF_INFLUXDB_ADMIN_USER}
MF_INFLUXDB_ADMIN_PASSWORD: ${MF_INFLUXDB_ADMIN_PASSWORD}
MF_INFLUX_WRITER_TRANSFORMER: ${MF_INFLUX_WRITER_TRANSFORMER}
ports:
- ${MF_INFLUX_WRITER_PORT}:${MF_INFLUX_WRITER_PORT}
networks:

View File

@ -1,5 +1,16 @@
# To listen to all message broker subjects, use the default value "channels.>".
# To subscribe to specific subjects, use values starting with "channels." and
# followed by a subtopic (e.g. ["channels.<channel_id>.sub.topic.x", ...]).
[subjects]
filter = ["channels.>"]
[subscriber]
subjects = ["channels.>"]
[transformer]
# SenML or JSON
format = "senml"
# Used if format is SenML
content_type = "application/senml+json"
# Used as timestamp fields if format is JSON
time_fields = [{ field_name = "seconds_key", field_format = "unix", location = "UTC"},
{ field_name = "millis_key", field_format = "unix_ms", location = "UTC"},
{ field_name = "micros_key", field_format = "unix_us", location = "UTC"},
{ field_name = "nanos_key", field_format = "unix_ns", location = "UTC"}]

View File

@ -46,7 +46,6 @@ services:
MF_MONGO_WRITER_DB: ${MF_MONGO_WRITER_DB}
MF_MONGO_WRITER_DB_HOST: mongodb
MF_MONGO_WRITER_DB_PORT: ${MF_MONGO_WRITER_DB_PORT}
MF_MONGO_WRITER_TRANSFORMER: ${MF_MONGO_WRITER_TRANSFORMER}
ports:
- ${MF_MONGO_WRITER_PORT}:${MF_MONGO_WRITER_PORT}
networks:

View File

@ -1,5 +1,16 @@
# To listen to all message broker subjects, use the default value "channels.>".
# To subscribe to specific subjects, use values starting with "channels." and
# followed by a subtopic (e.g. ["channels.<channel_id>.sub.topic.x", ...]).
[subjects]
filter = ["channels.>"]
[subscriber]
subjects = ["channels.>"]
[transformer]
# SenML or JSON
format = "senml"
# Used if format is SenML
content_type = "application/senml+json"
# Used as timestamp fields if format is JSON
time_fields = [{ field_name = "seconds_key", field_format = "unix", location = "UTC"},
{ field_name = "millis_key", field_format = "unix_ms", location = "UTC"},
{ field_name = "micros_key", field_format = "unix_us", location = "UTC"},
{ field_name = "nanos_key", field_format = "unix_ns", location = "UTC"}]

View File

@ -50,7 +50,6 @@ services:
MF_POSTGRES_WRITER_DB_SSL_CERT: ${MF_POSTGRES_WRITER_DB_SSL_CERT}
MF_POSTGRES_WRITER_DB_SSL_KEY: ${MF_POSTGRES_WRITER_DB_SSL_KEY}
MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT: ${MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT}
MF_POSTGRES_WRITER_TRANSFORMER: ${MF_POSTGRES_WRITER_TRANSFORMER}
ports:
- ${MF_POSTGRES_WRITER_PORT}:${MF_POSTGRES_WRITER_PORT}
networks:

View File

@ -1,5 +1,5 @@
# To listen to all message broker subjects, use the default value "channels.>".
# To subscribe to specific subjects, use values starting with "channels." and
# followed by a subtopic (e.g. ["channels.<channel_id>.sub.topic.x", ...]).
[subjects]
filter = ["channels.>"]
[subscriber]
subjects = ["channels.>"]

View File

@ -1,5 +1,5 @@
# To listen to all message broker subjects, use the default value "channels.>".
# To subscribe to specific subjects, use values starting with "channels." and
# followed by a subtopic (e.g. ["channels.<channel_id>.sub.topic.x", ...]).
[subjects]
filter = ["channels.>"]
[subscriber]
subjects = ["channels.>"]

View File

@ -0,0 +1,152 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package json
import (
"math"
"strconv"
"strings"
"time"
"github.com/mainflux/mainflux/pkg/errors"
)
// errUnsupportedFormat is returned when a timestamp value or its declared
// format cannot be handled by the parsers below.
var errUnsupportedFormat = errors.New("unsupported time format")

// parseTimestamp converts a raw JSON field value into a time.Time using the
// configured format. The formats "unix", "unix_ms", "unix_us" and "unix_ns"
// are interpreted as numeric epoch timestamps in the corresponding unit; any
// other format is treated as a named reference-time layout (e.g. "rfc3339")
// and parsed in the given location. An empty location defaults to UTC.
// Format matching is case-insensitive.
func parseTimestamp(format string, timestamp interface{}, location string) (time.Time, error) {
	// Lowercase once so dispatch agrees with parseUnix/parseTime, which
	// also match case-insensitively (previously "UNIX" would incorrectly
	// fall through to the layout parser).
	switch strings.ToLower(format) {
	case "unix", "unix_ms", "unix_us", "unix_ns":
		return parseUnix(format, timestamp)
	default:
		if location == "" {
			location = "UTC"
		}
		return parseTime(format, timestamp, location)
	}
}

// parseUnix interprets timestamp as an epoch value in the unit implied by
// format. For "unix" a fractional part (e.g. "1638310819.5") is preserved as
// nanoseconds; for the sub-second units the fractional part, if any, is
// dropped. The result is always expressed in UTC.
func parseUnix(format string, timestamp interface{}) (time.Time, error) {
	integer, fractional, err := parseComponents(timestamp)
	if err != nil {
		return time.Unix(0, 0), err
	}
	switch strings.ToLower(format) {
	case "unix":
		return time.Unix(integer, fractional).UTC(), nil
	case "unix_ms":
		return time.Unix(0, integer*1e6).UTC(), nil
	case "unix_us":
		return time.Unix(0, integer*1e3).UTC(), nil
	case "unix_ns":
		return time.Unix(0, integer).UTC(), nil
	default:
		return time.Unix(0, 0), errUnsupportedFormat
	}
}

// parseComponents splits a numeric or string timestamp into its integer part
// and a fractional part expressed in nanoseconds. Strings may use either "."
// or "," as the decimal separator. Unsupported value types yield
// errUnsupportedFormat.
func parseComponents(timestamp interface{}) (int64, int64, error) {
	switch ts := timestamp.(type) {
	case string:
		parts := strings.SplitN(ts, ".", 2)
		if len(parts) == 2 {
			return parseUnixTimeComponents(parts[0], parts[1])
		}
		parts = strings.SplitN(ts, ",", 2)
		if len(parts) == 2 {
			return parseUnixTimeComponents(parts[0], parts[1])
		}
		integer, err := strconv.ParseInt(ts, 10, 64)
		if err != nil {
			return 0, 0, err
		}
		return integer, 0, nil
	case int8:
		return int64(ts), 0, nil
	case int16:
		return int64(ts), 0, nil
	case int32:
		return int64(ts), 0, nil
	case int64:
		return ts, 0, nil
	case uint8:
		return int64(ts), 0, nil
	case uint16:
		return int64(ts), 0, nil
	case uint32:
		return int64(ts), 0, nil
	case uint64:
		// NOTE(review): values above math.MaxInt64 wrap negative here;
		// such timestamps are far beyond any realistic epoch.
		return int64(ts), 0, nil
	case float32:
		integer, fractional := math.Modf(float64(ts))
		return int64(integer), int64(fractional * 1e9), nil
	case float64:
		integer, fractional := math.Modf(ts)
		return int64(integer), int64(fractional * 1e9), nil
	default:
		return 0, 0, errUnsupportedFormat
	}
}

// parseUnixTimeComponents parses the two halves of a decimal timestamp
// string: first is the integer (seconds) part and second the fractional
// part, which is right-padded to nanosecond precision (extra digits beyond
// nine are dropped).
func parseUnixTimeComponents(first, second string) (int64, int64, error) {
	integer, err := strconv.ParseInt(first, 10, 64)
	if err != nil {
		return 0, 0, err
	}
	// Convert to nanoseconds, dropping any greater precision.
	buf := []byte("000000000")
	copy(buf, second)
	fractional, err := strconv.ParseInt(string(buf), 10, 64)
	if err != nil {
		return 0, 0, err
	}
	return integer, fractional, nil
}

// parseTime parses a string timestamp using a named layout (case-insensitive
// aliases for the time package's reference layouts) in the given location.
// An unrecognized format name is passed through verbatim as a custom layout;
// non-string values yield errUnsupportedFormat.
func parseTime(format string, timestamp interface{}, location string) (time.Time, error) {
	switch ts := timestamp.(type) {
	case string:
		loc, err := time.LoadLocation(location)
		if err != nil {
			return time.Unix(0, 0), err
		}
		switch strings.ToLower(format) {
		case "ansic":
			format = time.ANSIC
		case "unixdate":
			format = time.UnixDate
		case "rubydate":
			format = time.RubyDate
		case "rfc822":
			format = time.RFC822
		case "rfc822z":
			format = time.RFC822Z
		case "rfc850":
			format = time.RFC850
		case "rfc1123":
			format = time.RFC1123
		case "rfc1123z":
			format = time.RFC1123Z
		case "rfc3339":
			format = time.RFC3339
		case "rfc3339nano":
			format = time.RFC3339Nano
		case "stamp":
			format = time.Stamp
		case "stampmilli":
			format = time.StampMilli
		case "stampmicro":
			format = time.StampMicro
		case "stampnano":
			format = time.StampNano
		}
		return time.ParseInLocation(format, ts, loc)
	default:
		return time.Unix(0, 0), errUnsupportedFormat
	}
}

View File

@ -14,30 +14,41 @@ import (
const sep = "/"
var keys = [...]string{"publisher", "protocol", "channel", "subtopic"}
var (
keys = [...]string{"publisher", "protocol", "channel", "subtopic"}
// ErrTransform represents an error during parsing message.
ErrTransform = errors.New("unable to parse JSON object")
ErrInvalidKey = errors.New("invalid object key")
ErrTransform = errors.New("unable to parse JSON object")
// ErrInvalidKey represents the use of a reserved message field.
ErrInvalidKey = errors.New("invalid object key")
// ErrInvalidTimeField represents the use an invalid time field.
ErrInvalidTimeField = errors.New("invalid time field")
errUnknownFormat = errors.New("unknown format of JSON message")
errInvalidFormat = errors.New("invalid JSON object")
errInvalidNestedJSON = errors.New("invalid nested JSON object")
)
type funcTransformer func(messaging.Message) (interface{}, error)
// TimeField represents the message fields to use as timestamp
type TimeField struct {
FieldName string `toml:"field_name"`
FieldFormat string `toml:"field_format"`
Location string `toml:"location"`
}
type transformerService struct {
timeFields []TimeField
}
// New returns a new JSON transformer.
func New() transformers.Transformer {
return funcTransformer(transformer)
func New(tfs []TimeField) transformers.Transformer {
return &transformerService{
timeFields: tfs,
}
}
// Transform transforms Mainflux message to a list of JSON messages.
func (fh funcTransformer) Transform(msg messaging.Message) (interface{}, error) {
return fh(msg)
}
func transformer(msg messaging.Message) (interface{}, error) {
func (ts *transformerService) Transform(msg messaging.Message) (interface{}, error) {
ret := Message{
Publisher: msg.Publisher,
Created: msg.Created,
@ -45,21 +56,35 @@ func transformer(msg messaging.Message) (interface{}, error) {
Channel: msg.Channel,
Subtopic: msg.Subtopic,
}
if ret.Subtopic == "" {
return nil, errors.Wrap(ErrTransform, errUnknownFormat)
}
subs := strings.Split(ret.Subtopic, ".")
if len(subs) == 0 {
return nil, errors.Wrap(ErrTransform, errUnknownFormat)
}
format := subs[len(subs)-1]
var payload interface{}
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
return nil, errors.Wrap(ErrTransform, err)
}
switch p := payload.(type) {
case map[string]interface{}:
ret.Payload = p
// Apply timestamp transformation rules depending on key/unit pairs
ts, err := ts.transformTimeField(p)
if err != nil {
return nil, errors.Wrap(ErrInvalidTimeField, err)
}
if ts != 0 {
ret.Created = ts
}
return Messages{[]Message{ret}, format}, nil
case []interface{}:
res := []Message{}
@ -70,6 +95,16 @@ func transformer(msg messaging.Message) (interface{}, error) {
return nil, errors.Wrap(ErrTransform, errInvalidNestedJSON)
}
newMsg := ret
// Apply timestamp transformation rules depending on key/unit pairs
ts, err := ts.transformTimeField(v)
if err != nil {
return nil, errors.Wrap(ErrInvalidTimeField, err)
}
if ts != 0 {
ret.Created = ts
}
newMsg.Payload = v
res = append(res, newMsg)
}
@ -140,3 +175,22 @@ func flatten(prefix string, m, m1 map[string]interface{}) (map[string]interface{
}
return m, nil
}
// transformTimeField scans the payload for the first configured time field
// and, if present, parses its value into a Unix-nanosecond timestamp.
// It returns 0 with a nil error when no time fields are configured or none
// of the configured field names appears in the payload; callers treat 0 as
// "no override". Only the first matching field (in configuration order) is
// used. A field that is present but fails to parse returns a non-nil error.
func (ts *transformerService) transformTimeField(payload map[string]interface{}) (int64, error) {
	if len(ts.timeFields) == 0 {
		return 0, nil
	}
	for _, tf := range ts.timeFields {
		if val, ok := payload[tf.FieldName]; ok {
			// Parse with the per-field format and location from the
			// [transformer] config section.
			t, err := parseTimestamp(tf.FieldFormat, val, tf.Location)
			if err != nil {
				return 0, err
			}
			return t.UnixNano(), nil
		}
	}
	return 0, nil
}

View File

@ -15,14 +15,26 @@ import (
)
const (
validPayload = `{"key1": "val1", "key2": 123, "key3": "val3", "key4": {"key5": "val5"}}`
listPayload = `[{"key1": "val1", "key2": 123, "keylist3": "val3", "key4": {"key5": "val5"}}, {"key1": "val1", "key2": 123, "key3": "val3", "key4": {"key5": "val5"}}]`
invalidPayload = `{"key1": }`
validPayload = `{"key1": "val1", "key2": 123, "key3": "val3", "key4": {"key5": "val5"}}`
tsPayload = `{"custom_ts_key": "1638310819", "key1": "val1", "key2": 123, "key3": "val3", "key4": {"key5": "val5"}}`
microsPayload = `{"custom_ts_micro_key": "1638310819000000", "key1": "val1", "key2": 123, "key3": "val3", "key4": {"key5": "val5"}}`
invalidTsPayload = `{"custom_ts_key": "abc", "key1": "val1", "key2": 123, "key3": "val3", "key4": {"key5": "val5"}}`
listPayload = `[{"key1": "val1", "key2": 123, "keylist3": "val3", "key4": {"key5": "val5"}}, {"key1": "val1", "key2": 123, "key3": "val3", "key4": {"key5": "val5"}}]`
invalidPayload = `{"key1": }`
)
func TestTransformJSON(t *testing.T) {
now := time.Now().Unix()
tr := json.New()
ts := []json.TimeField{
{
FieldName: "custom_ts_key",
FieldFormat: "unix",
}, {
FieldName: "custom_ts_micro_key",
FieldFormat: "unix_us",
},
}
tr := json.New(ts)
msg := messaging.Message{
Channel: "channel-1",
Subtopic: "subtopic-1",
@ -37,6 +49,18 @@ func TestTransformJSON(t *testing.T) {
listMsg := msg
listMsg.Payload = []byte(listPayload)
tsMsg := msg
tsMsg.Payload = []byte(tsPayload)
microsMsg := msg
microsMsg.Payload = []byte(microsPayload)
invalidFmt := msg
invalidFmt.Subtopic = ""
invalidTimeField := msg
invalidTimeField.Payload = []byte(invalidTsPayload)
jsonMsgs := json.Messages{
Data: []json.Message{
{
@ -58,8 +82,49 @@ func TestTransformJSON(t *testing.T) {
Format: msg.Subtopic,
}
invalidFmt := msg
invalidFmt.Subtopic = ""
jsonTsMsgs := json.Messages{
Data: []json.Message{
{
Channel: msg.Channel,
Subtopic: msg.Subtopic,
Publisher: msg.Publisher,
Protocol: msg.Protocol,
Created: int64(1638310819000000000),
Payload: map[string]interface{}{
"custom_ts_key": "1638310819",
"key1": "val1",
"key2": float64(123),
"key3": "val3",
"key4": map[string]interface{}{
"key5": "val5",
},
},
},
},
Format: msg.Subtopic,
}
jsonMicrosMsgs := json.Messages{
Data: []json.Message{
{
Channel: msg.Channel,
Subtopic: msg.Subtopic,
Publisher: msg.Publisher,
Protocol: msg.Protocol,
Created: int64(1638310819000000000),
Payload: map[string]interface{}{
"custom_ts_micro_key": "1638310819000000",
"key1": "val1",
"key2": float64(123),
"key3": "val3",
"key4": map[string]interface{}{
"key5": "val5",
},
},
},
},
Format: msg.Subtopic,
}
listJSON := json.Messages{
Data: []json.Message{
@ -127,6 +192,24 @@ func TestTransformJSON(t *testing.T) {
json: nil,
err: json.ErrTransform,
},
{
desc: "test transform JSON with timestamp transformation",
msg: tsMsg,
json: jsonTsMsgs,
err: nil,
},
{
desc: "test transform JSON with timestamp transformation in micros",
msg: microsMsg,
json: jsonMicrosMsgs,
err: nil,
},
{
desc: "test transform JSON with invalid timestamp transformation in micros",
msg: invalidTimeField,
json: nil,
err: json.ErrInvalidTimeField,
},
}
for _, tc := range cases {