MF-732 - Add Postgres reader (#740)
* NOISSUE - Fix Readers logs Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * MF-732 - Add Postgres reader Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix total count Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Rm commented code Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Add Postgres reader tests Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix editor format Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Change UUID lib Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
This commit is contained in:
parent
3125f0bbc2
commit
5fec0a9e92
2
Makefile
2
Makefile
|
@ -4,7 +4,7 @@
|
|||
## SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
BUILD_DIR = build
|
||||
SERVICES = users things http normalizer ws coap lora influxdb-writer influxdb-reader mongodb-writer mongodb-reader cassandra-writer cassandra-reader postgres-writer cli bootstrap
|
||||
SERVICES = users things http normalizer ws coap lora influxdb-writer influxdb-reader mongodb-writer mongodb-reader cassandra-writer cassandra-reader postgres-writer postgres-reader cli bootstrap
|
||||
DOCKERS = $(addprefix docker_,$(SERVICES))
|
||||
DOCKERS_DEV = $(addprefix docker_dev_,$(SERVICES))
|
||||
CGO_ENABLED ?= 0
|
||||
|
|
|
@ -0,0 +1,187 @@
|
|||
//
|
||||
// Copyright (c) 2019
|
||||
// Mainflux
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/mainflux/mainflux"
|
||||
"github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/readers"
|
||||
"github.com/mainflux/mainflux/readers/api"
|
||||
"github.com/mainflux/mainflux/readers/postgres"
|
||||
thingsapi "github.com/mainflux/mainflux/things/api/grpc"
|
||||
stdprometheus "github.com/prometheus/client_golang/prometheus"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
)
|
||||
|
||||
const (
|
||||
svcName = "postgres-writer"
|
||||
sep = ","
|
||||
|
||||
defThingsURL = "localhost:8183"
|
||||
defLogLevel = "debug"
|
||||
defPort = "9204"
|
||||
defClientTLS = "false"
|
||||
defCACerts = ""
|
||||
defDBHost = "localhost"
|
||||
defDBPort = "5432"
|
||||
defDBUser = "mainflux"
|
||||
defDBPass = "mainflux"
|
||||
defDBName = "messages"
|
||||
defDBSSLMode = "disable"
|
||||
defDBSSLCert = ""
|
||||
defDBSSLKey = ""
|
||||
defDBSSLRootCert = ""
|
||||
|
||||
envThingsURL = "MF_THINGS_URL"
|
||||
envLogLevel = "MF_POSTGRES_READER_LOG_LEVEL"
|
||||
envPort = "MF_POSTGRES_READER_PORT"
|
||||
envClientTLS = "MF_POSTGRES_READER_CLIENT_TLS"
|
||||
envCACerts = "MF_POSTGRES_READER_CA_CERTS"
|
||||
envDBHost = "MF_POSTGRES_READER_DB_HOST"
|
||||
envDBPort = "MF_POSTGRES_READER_DB_PORT"
|
||||
envDBUser = "MF_POSTGRES_READER_DB_USER"
|
||||
envDBPass = "MF_POSTGRES_READER_DB_PASS"
|
||||
envDBName = "MF_POSTGRES_READER_DB_NAME"
|
||||
envDBSSLMode = "MF_POSTGRES_READER_DB_SSL_MODE"
|
||||
envDBSSLCert = "MF_POSTGRES_READER_DB_SSL_CERT"
|
||||
envDBSSLKey = "MF_POSTGRES_READER_DB_SSL_KEY"
|
||||
envDBSSLRootCert = "MF_POSTGRES_READER_DB_SSL_ROOT_CERT"
|
||||
)
|
||||
|
||||
type config struct {
|
||||
thingsURL string
|
||||
logLevel string
|
||||
port string
|
||||
clientTLS bool
|
||||
caCerts string
|
||||
dbConfig postgres.Config
|
||||
}
|
||||
|
||||
func main() {
|
||||
cfg := loadConfig()
|
||||
|
||||
logger, err := logger.New(os.Stdout, cfg.logLevel)
|
||||
if err != nil {
|
||||
log.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
conn := connectToThings(cfg, logger)
|
||||
defer conn.Close()
|
||||
|
||||
tc := thingsapi.NewClient(conn)
|
||||
|
||||
db := connectToDB(cfg.dbConfig, logger)
|
||||
defer db.Close()
|
||||
|
||||
repo := newService(db, logger)
|
||||
|
||||
errs := make(chan error, 2)
|
||||
|
||||
go startHTTPServer(repo, tc, cfg.port, logger, errs)
|
||||
|
||||
go func() {
|
||||
c := make(chan os.Signal)
|
||||
signal.Notify(c, syscall.SIGINT)
|
||||
errs <- fmt.Errorf("%s", <-c)
|
||||
}()
|
||||
|
||||
err = <-errs
|
||||
logger.Error(fmt.Sprintf("Postgres writer service terminated: %s", err))
|
||||
}
|
||||
|
||||
func loadConfig() config {
|
||||
dbConfig := postgres.Config{
|
||||
Host: mainflux.Env(envDBHost, defDBHost),
|
||||
Port: mainflux.Env(envDBPort, defDBPort),
|
||||
User: mainflux.Env(envDBUser, defDBUser),
|
||||
Pass: mainflux.Env(envDBPass, defDBPass),
|
||||
Name: mainflux.Env(envDBName, defDBName),
|
||||
SSLMode: mainflux.Env(envDBSSLMode, defDBSSLMode),
|
||||
SSLCert: mainflux.Env(envDBSSLCert, defDBSSLCert),
|
||||
SSLKey: mainflux.Env(envDBSSLKey, defDBSSLKey),
|
||||
SSLRootCert: mainflux.Env(envDBSSLRootCert, defDBSSLRootCert),
|
||||
}
|
||||
|
||||
return config{
|
||||
thingsURL: mainflux.Env(envThingsURL, defThingsURL),
|
||||
logLevel: mainflux.Env(envLogLevel, defLogLevel),
|
||||
port: mainflux.Env(envPort, defPort),
|
||||
dbConfig: dbConfig,
|
||||
}
|
||||
}
|
||||
|
||||
func connectToDB(dbConfig postgres.Config, logger logger.Logger) *sqlx.DB {
|
||||
db, err := postgres.Connect(dbConfig)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to Postgres: %s", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
return db
|
||||
}
|
||||
|
||||
func connectToThings(cfg config, logger logger.Logger) *grpc.ClientConn {
|
||||
var opts []grpc.DialOption
|
||||
if cfg.clientTLS {
|
||||
if cfg.caCerts != "" {
|
||||
tpc, err := credentials.NewClientTLSFromFile(cfg.caCerts, "")
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to load certs: %s", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
opts = append(opts, grpc.WithTransportCredentials(tpc))
|
||||
}
|
||||
} else {
|
||||
logger.Info("gRPC communication is not encrypted")
|
||||
opts = append(opts, grpc.WithInsecure())
|
||||
}
|
||||
|
||||
conn, err := grpc.Dial(cfg.thingsURL, opts...)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to things service: %s", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
return conn
|
||||
}
|
||||
|
||||
func newService(db *sqlx.DB, logger logger.Logger) readers.MessageRepository {
|
||||
svc := postgres.New(db)
|
||||
svc = api.LoggingMiddleware(svc, logger)
|
||||
svc = api.MetricsMiddleware(
|
||||
svc,
|
||||
kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: "postgres",
|
||||
Subsystem: "message_writer",
|
||||
Name: "request_count",
|
||||
Help: "Number of requests received.",
|
||||
}, []string{"method"}),
|
||||
kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
|
||||
Namespace: "postgres",
|
||||
Subsystem: "message_writer",
|
||||
Name: "request_latency_microseconds",
|
||||
Help: "Total duration of requests in microseconds.",
|
||||
}, []string{"method"}),
|
||||
)
|
||||
|
||||
return svc
|
||||
}
|
||||
|
||||
func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, port string, logger logger.Logger, errs chan error) {
|
||||
p := fmt.Sprintf(":%s", port)
|
||||
logger.Info(fmt.Sprintf("Postgres reader service started, exposed port %s", port))
|
||||
errs <- http.ListenAndServe(p, api.MakeHandler(repo, tc, svcName))
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
###
|
||||
# This docker-compose file contains optional Postgres-reader service for Mainflux platform.
|
||||
# Since this service is optional, this file is dependent of docker-compose.yml file
|
||||
# from <project_root>/docker. In order to run these optional service, execute command:
|
||||
# docker-compose -f docker/docker-compose.yml -f docker/addons/postgres-reader/docker-compose.yml up
|
||||
# from project root.
|
||||
###
|
||||
|
||||
version: "3"
|
||||
|
||||
networks:
|
||||
docker_mainflux-base-net:
|
||||
external: true
|
||||
|
||||
services:
|
||||
postgres-reader:
|
||||
image: mainflux/postgres-reader:latest
|
||||
container_name: mainflux-postgres-reader
|
||||
restart: on-failure
|
||||
environment:
|
||||
MF_THINGS_URL: things:8183
|
||||
MF_POSTGRES_READER_LOG_LEVEL: debug
|
||||
MF_POSTGRES_READER_PORT: 9204
|
||||
MF_POSTGRES_READER_CLIENT_TLS: "false"
|
||||
MF_POSTGRES_READER_CA_CERTS: ""
|
||||
MF_POSTGRES_READER_DB_HOST: postgres
|
||||
MF_POSTGRES_READER_DB_PORT: 5432
|
||||
MF_POSTGRES_READER_DB_USER: mainflux
|
||||
MF_POSTGRES_READER_DB_PASS: mainflux
|
||||
MF_POSTGRES_READER_DB_NAME: messages
|
||||
MF_POSTGRES_READER_DB_SSL_MODE: disable
|
||||
MF_POSTGRES_READER_DB_SSL_CERT: ""
|
||||
MF_POSTGRES_READER_DB_SSL_KEY: ""
|
||||
MF_POSTGRES_READER_DB_SSL_ROOT_CERT: ""
|
||||
ports:
|
||||
- 9204:9204
|
||||
networks:
|
||||
- docker_mainflux-base-net
|
|
@ -0,0 +1,78 @@
|
|||
# Postgres reader
|
||||
|
||||
Postgres reader provides message repository implementation for Postgres.
|
||||
|
||||
## Configuration
|
||||
|
||||
The service is configured using the environment variables presented in the
|
||||
following table. Note that any unset variables will be replaced with their
|
||||
default values.
|
||||
|
||||
| Variable | Description | Default |
|
||||
|-------------------------------------|------------------------------------|-------------|
|
||||
| MF_THINGS_URL | Things service URL | things:8183 |
|
||||
| MF_POSTGRES_READER_LOG_LEVEL | Service log level | debug |
|
||||
| MF_POSTGRES_READER_PORT | Service HTTP port | 9204 |
|
||||
| MF_POSTGRES_READER_CLIENT_TLS | TLS mode flag | false |
|
||||
| MF_POSTGRES_READER_CA_CERTS | Path to trusted CAs in PEM format | "" |
|
||||
| MF_POSTGRES_READER_DB_HOST | Postgres DB host | postgres |
|
||||
| MF_POSTGRES_READER_DB_PORT | Postgres DB port | 5432 |
|
||||
| MF_POSTGRES_READER_DB_USER | Postgres user | mainflux |
|
||||
| MF_POSTGRES_READER_DB_PASS | Postgres password | mainflux |
|
||||
| MF_POSTGRES_READER_DB_NAME | Postgres database name | messages |
|
||||
| MF_POSTGRES_READER_DB_SSL_MODE | Postgres SSL mode | disabled |
|
||||
| MF_POSTGRES_READER_DB_SSL_CERT | Postgres SSL certificate path | "" |
|
||||
| MF_POSTGRES_READER_DB_SSL_KEY | Postgres SSL key | "" |
|
||||
| MF_POSTGRES_READER_DB_SSL_ROOT_CERT | Postgres SSL root certificate path | "" |
|
||||
|
||||
## Deployment
|
||||
|
||||
```yaml
|
||||
postgres-writer:
|
||||
image: mainflux/postgres-writer:[version]
|
||||
container_name: [instance name]
|
||||
depends_on:
|
||||
- postgres
|
||||
- nats
|
||||
restart: on-failure
|
||||
environment:
|
||||
MF_NATS_URL: [NATS instance URL]
|
||||
MF_POSTGRES_READER_LOG_LEVEL: [Service log level]
|
||||
MF_POSTGRES_READER_PORT: [Service HTTP port]
|
||||
MF_POSTGRES_READER_DB_HOST: [Postgres host]
|
||||
MF_POSTGRES_READER_DB_PORT: [Postgres port]
|
||||
MF_POSTGRES_READER_DB_USER: [Postgres user]
|
||||
MF_POSTGRES_READER_DB_PASS: [Postgres password]
|
||||
MF_POSTGRES_READER_DB_NAME: [Postgres database name]
|
||||
MF_POSTGRES_READER_DB_SSL_MODE: [Postgres SSL mode]
|
||||
MF_POSTGRES_READER_DB_SSL_CERT: [Postgres SSL cert]
|
||||
MF_POSTGRES_READER_DB_SSL_KEY: [Postgres SSL key]
|
||||
MF_POSTGRES_READER_DB_SSL_ROOT_CERT: [Postgres SSL Root cert]
|
||||
ports:
|
||||
- 8903:8903
|
||||
networks:
|
||||
- docker_mainflux-base-net
|
||||
```
|
||||
|
||||
To start the service, execute the following shell script:
|
||||
|
||||
```bash
|
||||
# download the latest version of the service
|
||||
go get github.com/mainflux/mainflux
|
||||
|
||||
|
||||
cd $GOPATH/src/github.com/mainflux/mainflux
|
||||
|
||||
# compile the postgres writer
|
||||
make postgres-writer
|
||||
|
||||
# copy binary to bin
|
||||
make install
|
||||
|
||||
# Set the environment variables and run the service
|
||||
MF_THINGS_URL=[Things service URL] MF_POSTGRES_READER_LOG_LEVEL=[Service log level] MF_POSTGRES_READER_PORT=[Service HTTP port] MF_POSTGRES_READER_CLIENT_TLS =[TLS mode flag] MF_POSTGRES_READER_CA_CERTS=[Path to trusted CAs in PEM format] MF_POSTGRES_READER_DB_HOST=[Postgres host] MF_POSTGRES_READER_DB_PORT=[Postgres port] MF_POSTGRES_READER_DB_USER=[Postgres user] MF_POSTGRES_READER_DB_PASS=[Postgres password] MF_POSTGRES_READER_DB_NAME=[Postgres database name] MF_POSTGRES_READER_DB_SSL_MODE=[Postgres SSL mode] MF_POSTGRES_READER_DB_SSL_CERT=[Postgres SSL cert] MF_POSTGRES_READER_DB_SSL_KEY=[Postgres SSL key] MF_POSTGRES_READER_DB_SSL_ROOT_CERT=[Postgres SSL Root cert] $GOBIN/mainflux-postgres-reader
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
Starting service will start consuming normalized messages in SenML format.
|
|
@ -0,0 +1,10 @@
|
|||
//
|
||||
// Copyright (c) 2019
|
||||
// Mainflux
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
// Package postgres contains repository implementations using Postgres as
|
||||
// the underlying database.
|
||||
package postgres
|
|
@ -0,0 +1,83 @@
|
|||
//
|
||||
// Copyright (c) 2019
|
||||
// Mainflux
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
_ "github.com/lib/pq" // required for SQL access
|
||||
migrate "github.com/rubenv/sql-migrate"
|
||||
)
|
||||
|
||||
// Config defines the options that are used when connecting to a PostgreSQL instance
|
||||
type Config struct {
|
||||
Host string
|
||||
Port string
|
||||
User string
|
||||
Pass string
|
||||
Name string
|
||||
SSLMode string
|
||||
SSLCert string
|
||||
SSLKey string
|
||||
SSLRootCert string
|
||||
}
|
||||
|
||||
// Connect creates a connection to the PostgreSQL instance and applies any
|
||||
// unapplied database migrations. A non-nil error is returned to indicate
|
||||
// failure.
|
||||
func Connect(cfg Config) (*sqlx.DB, error) {
|
||||
url := fmt.Sprintf("host=%s port=%s user=%s dbname=%s password=%s sslmode=%s sslcert=%s sslkey=%s sslrootcert=%s", cfg.Host, cfg.Port, cfg.User, cfg.Name, cfg.Pass, cfg.SSLMode, cfg.SSLCert, cfg.SSLKey, cfg.SSLRootCert)
|
||||
|
||||
db, err := sqlx.Open("postgres", url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := migrateDB(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func migrateDB(db *sqlx.DB) error {
|
||||
migrations := &migrate.MemoryMigrationSource{
|
||||
Migrations: []*migrate.Migration{
|
||||
{
|
||||
Id: "messages_1",
|
||||
Up: []string{
|
||||
`CREATE TABLE IF NOT EXISTS messages (
|
||||
id UUID,
|
||||
channel UUID,
|
||||
subtopic VARCHAR(254),
|
||||
publisher UUID,
|
||||
protocol TEXT,
|
||||
name TEXT,
|
||||
unit TEXT,
|
||||
value FLOAT,
|
||||
string_value TEXT,
|
||||
bool_value BOOL,
|
||||
data_value TEXT,
|
||||
value_sum FLOAT,
|
||||
time FlOAT,
|
||||
update_time FLOAT,
|
||||
link TEXT,
|
||||
PRIMARY KEY (id)
|
||||
)`,
|
||||
},
|
||||
Down: []string{
|
||||
"DROP TABLE messages",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
_, err := migrate.Exec(db.DB, "postgres", migrations, migrate.Up)
|
||||
return err
|
||||
}
|
|
@ -0,0 +1,137 @@
|
|||
//
|
||||
// Copyright (c) 2019
|
||||
// Mainflux
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/jmoiron/sqlx" // required for DB access
|
||||
"github.com/mainflux/mainflux"
|
||||
"github.com/mainflux/mainflux/readers"
|
||||
)
|
||||
|
||||
const errInvalid = "invalid_text_representation"
|
||||
|
||||
var errInvalidMessage = errors.New("invalid message representation")
|
||||
|
||||
var _ readers.MessageRepository = (*postgresRepository)(nil)
|
||||
|
||||
type postgresRepository struct {
|
||||
db *sqlx.DB
|
||||
}
|
||||
|
||||
// New returns new PostgreSQL writer.
|
||||
func New(db *sqlx.DB) readers.MessageRepository {
|
||||
return &postgresRepository{
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
func (tr postgresRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) {
|
||||
subtopicQuery := ""
|
||||
if query["subtopic"] != "" {
|
||||
subtopicQuery = `AND subtopic = :subtopic`
|
||||
}
|
||||
q := fmt.Sprintf(`SELECT * FROM messages
|
||||
WHERE channel = :channel %s ORDER BY time DESC
|
||||
LIMIT :limit OFFSET :offset;`, subtopicQuery)
|
||||
|
||||
params := map[string]interface{}{
|
||||
"channel": chanID,
|
||||
"limit": limit,
|
||||
"offset": offset,
|
||||
"subtopic": query["subtopic"],
|
||||
}
|
||||
|
||||
rows, err := tr.db.NamedQuery(q, params)
|
||||
if err != nil {
|
||||
return readers.MessagesPage{}, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
page := readers.MessagesPage{
|
||||
Offset: offset,
|
||||
Limit: limit,
|
||||
Messages: []mainflux.Message{},
|
||||
}
|
||||
for rows.Next() {
|
||||
dbm := dbMessage{Channel: chanID}
|
||||
if err := rows.StructScan(&dbm); err != nil {
|
||||
return readers.MessagesPage{}, err
|
||||
}
|
||||
|
||||
msg, err := toMessage(dbm)
|
||||
if err != nil {
|
||||
return readers.MessagesPage{}, err
|
||||
}
|
||||
|
||||
page.Messages = append(page.Messages, msg)
|
||||
}
|
||||
|
||||
q = `SELECT COUNT(*) FROM messages WHERE channel = $1;`
|
||||
qParams := []interface{}{chanID}
|
||||
|
||||
if query["subtopic"] != "" {
|
||||
q = `SELECT COUNT(*) FROM messages WHERE channel = $1 AND subtopic = $2;`
|
||||
qParams = append(qParams, query["subtopic"])
|
||||
}
|
||||
|
||||
if err := tr.db.QueryRow(q, qParams...).Scan(&page.Total); err != nil {
|
||||
return readers.MessagesPage{}, err
|
||||
}
|
||||
|
||||
return page, nil
|
||||
}
|
||||
|
||||
type dbMessage struct {
|
||||
ID string `db:"id"`
|
||||
Channel string `db:"channel"`
|
||||
Subtopic string `db:"subtopic"`
|
||||
Publisher string `db:"publisher"`
|
||||
Protocol string `db:"protocol"`
|
||||
Name string `db:"name"`
|
||||
Unit string `db:"unit"`
|
||||
FloatValue *float64 `db:"value"`
|
||||
StringValue *string `db:"string_value"`
|
||||
BoolValue *bool `db:"bool_value"`
|
||||
DataValue *string `db:"data_value"`
|
||||
ValueSum *float64 `db:"value_sum"`
|
||||
Time float64 `db:"time"`
|
||||
UpdateTime float64 `db:"update_time"`
|
||||
Link string `db:"link"`
|
||||
}
|
||||
|
||||
func toMessage(dbm dbMessage) (mainflux.Message, error) {
|
||||
msg := mainflux.Message{
|
||||
Channel: dbm.Channel,
|
||||
Subtopic: dbm.Subtopic,
|
||||
Publisher: dbm.Publisher,
|
||||
Protocol: dbm.Protocol,
|
||||
Name: dbm.Name,
|
||||
Unit: dbm.Unit,
|
||||
Time: dbm.Time,
|
||||
UpdateTime: dbm.UpdateTime,
|
||||
Link: dbm.Link,
|
||||
}
|
||||
|
||||
switch {
|
||||
case dbm.FloatValue != nil:
|
||||
msg.Value = &mainflux.Message_FloatValue{FloatValue: *dbm.FloatValue}
|
||||
case dbm.StringValue != nil:
|
||||
msg.Value = &mainflux.Message_StringValue{StringValue: *dbm.StringValue}
|
||||
case dbm.BoolValue != nil:
|
||||
msg.Value = &mainflux.Message_BoolValue{BoolValue: *dbm.BoolValue}
|
||||
case dbm.DataValue != nil:
|
||||
msg.Value = &mainflux.Message_DataValue{DataValue: *dbm.DataValue}
|
||||
case dbm.ValueSum != nil:
|
||||
msg.ValueSum = &mainflux.SumValue{Value: *dbm.ValueSum}
|
||||
}
|
||||
|
||||
return msg, nil
|
||||
}
|
|
@ -0,0 +1,153 @@
|
|||
//
|
||||
// Copyright (c) 2019
|
||||
// Mainflux
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package postgres_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gofrs/uuid"
|
||||
"github.com/mainflux/mainflux"
|
||||
"github.com/mainflux/mainflux/readers"
|
||||
preader "github.com/mainflux/mainflux/readers/postgres"
|
||||
pwriter "github.com/mainflux/mainflux/writers/postgres"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
const (
|
||||
subtopic = "subtopic"
|
||||
msgsNum = 42
|
||||
valueFields = 5
|
||||
)
|
||||
|
||||
func TestMessageReadAll(t *testing.T) {
|
||||
messageRepo := pwriter.New(db)
|
||||
|
||||
chanID, err := uuid.NewV4()
|
||||
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
|
||||
pubID, err := uuid.NewV4()
|
||||
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
|
||||
wrongID, err := uuid.NewV4()
|
||||
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
|
||||
|
||||
msg := mainflux.Message{
|
||||
Channel: chanID.String(),
|
||||
Publisher: pubID.String(),
|
||||
Protocol: "mqtt",
|
||||
}
|
||||
|
||||
messages := []mainflux.Message{}
|
||||
subtopicMsgs := []mainflux.Message{}
|
||||
now := time.Now().Unix()
|
||||
for i := 0; i < msgsNum; i++ {
|
||||
// Mix possible values as well as value sum.
|
||||
count := i % valueFields
|
||||
msg.Subtopic = ""
|
||||
switch count {
|
||||
case 0:
|
||||
msg.Subtopic = subtopic
|
||||
msg.Value = &mainflux.Message_FloatValue{FloatValue: 5}
|
||||
case 1:
|
||||
msg.Value = &mainflux.Message_BoolValue{BoolValue: false}
|
||||
case 2:
|
||||
msg.Value = &mainflux.Message_StringValue{StringValue: "value"}
|
||||
case 3:
|
||||
msg.Value = &mainflux.Message_DataValue{DataValue: "base64data"}
|
||||
case 5:
|
||||
msg.ValueSum = &mainflux.SumValue{Value: 45}
|
||||
}
|
||||
msg.Time = float64(now - int64(i))
|
||||
|
||||
err := messageRepo.Save(msg)
|
||||
assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
|
||||
messages = append(messages, msg)
|
||||
if count == 0 {
|
||||
subtopicMsgs = append(subtopicMsgs, msg)
|
||||
}
|
||||
}
|
||||
|
||||
reader := preader.New(db)
|
||||
|
||||
// Since messages are not saved in natural order,
|
||||
// cases that return subset of messages are only
|
||||
// checking data result set size, but not content.
|
||||
cases := map[string]struct {
|
||||
chanID string
|
||||
offset uint64
|
||||
limit uint64
|
||||
query map[string]string
|
||||
page readers.MessagesPage
|
||||
}{
|
||||
"read message page for existing channel": {
|
||||
chanID: chanID.String(),
|
||||
offset: 0,
|
||||
limit: msgsNum,
|
||||
page: readers.MessagesPage{
|
||||
Total: msgsNum,
|
||||
Offset: 0,
|
||||
Limit: msgsNum,
|
||||
Messages: messages,
|
||||
},
|
||||
},
|
||||
"read message page for non-existent channel": {
|
||||
chanID: wrongID.String(),
|
||||
offset: 0,
|
||||
limit: msgsNum,
|
||||
page: readers.MessagesPage{
|
||||
Total: 0,
|
||||
Offset: 0,
|
||||
Limit: msgsNum,
|
||||
Messages: []mainflux.Message{},
|
||||
},
|
||||
},
|
||||
"read message last page": {
|
||||
chanID: chanID.String(),
|
||||
offset: 40,
|
||||
limit: 5,
|
||||
page: readers.MessagesPage{
|
||||
Total: msgsNum,
|
||||
Offset: 40,
|
||||
Limit: 5,
|
||||
Messages: messages[40:42],
|
||||
},
|
||||
},
|
||||
"read message with non-existent subtopic": {
|
||||
chanID: chanID.String(),
|
||||
offset: 0,
|
||||
limit: msgsNum,
|
||||
query: map[string]string{"subtopic": "not-present"},
|
||||
page: readers.MessagesPage{
|
||||
Total: 0,
|
||||
Offset: 0,
|
||||
Limit: msgsNum,
|
||||
Messages: []mainflux.Message{},
|
||||
},
|
||||
},
|
||||
"read message with subtopic": {
|
||||
chanID: chanID.String(),
|
||||
offset: 0,
|
||||
limit: uint64(len(subtopicMsgs)),
|
||||
query: map[string]string{"subtopic": subtopic},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(subtopicMsgs)),
|
||||
Offset: 0,
|
||||
Limit: uint64(len(subtopicMsgs)),
|
||||
Messages: subtopicMsgs,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for desc, tc := range cases {
|
||||
result, err := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.query)
|
||||
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err))
|
||||
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Messages, result.Messages))
|
||||
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total))
|
||||
}
|
||||
}
|
|
@ -0,0 +1,87 @@
|
|||
//
|
||||
// Copyright (c) 2019
|
||||
// Mainflux
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
// Package postgres_test contains tests for PostgreSQL repository
|
||||
// implementations.
|
||||
package postgres_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/readers/postgres"
|
||||
dockertest "gopkg.in/ory-am/dockertest.v3"
|
||||
)
|
||||
|
||||
const (
|
||||
wrongID = "0"
|
||||
wrongValue = "wrong-value"
|
||||
)
|
||||
|
||||
var (
|
||||
testLog, _ = logger.New(os.Stdout, logger.Info.String())
|
||||
db *sqlx.DB
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
pool, err := dockertest.NewPool("")
|
||||
if err != nil {
|
||||
log.Fatalf("Could not connect to docker: %s", err)
|
||||
}
|
||||
|
||||
cfg := []string{
|
||||
"POSTGRES_USER=test",
|
||||
"POSTGRES_PASSWORD=test",
|
||||
"POSTGRES_DB=test",
|
||||
}
|
||||
container, err := pool.Run("postgres", "10.2-alpine", cfg)
|
||||
if err != nil {
|
||||
log.Fatalf("Could not start container: %s", err)
|
||||
}
|
||||
|
||||
port := container.GetPort("5432/tcp")
|
||||
|
||||
if err = pool.Retry(func() error {
|
||||
url := fmt.Sprintf("host=localhost port=%s user=test dbname=test password=test sslmode=disable", port)
|
||||
db, err = sqlx.Open("postgres", url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return db.Ping()
|
||||
}); err != nil {
|
||||
log.Fatalf("Could not connect to docker: %s", err)
|
||||
}
|
||||
|
||||
dbConfig := postgres.Config{
|
||||
Host: "localhost",
|
||||
Port: port,
|
||||
User: "test",
|
||||
Pass: "test",
|
||||
Name: "test",
|
||||
SSLMode: "disable",
|
||||
SSLCert: "",
|
||||
SSLKey: "",
|
||||
SSLRootCert: "",
|
||||
}
|
||||
|
||||
if db, err = postgres.Connect(dbConfig); err != nil {
|
||||
log.Fatalf("Could not setup test DB connection: %s", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
code := m.Run()
|
||||
|
||||
if err = pool.Purge(container); err != nil {
|
||||
log.Fatalf("Could not purge container: %s", err)
|
||||
}
|
||||
|
||||
os.Exit(code)
|
||||
}
|
Loading…
Reference in New Issue