diff --git a/Makefile b/Makefile index 0fd81aa7..5583a15e 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ ## SPDX-License-Identifier: Apache-2.0 BUILD_DIR = build -SERVICES = users things http normalizer ws coap lora influxdb-writer influxdb-reader mongodb-writer mongodb-reader cassandra-writer cassandra-reader postgres-writer cli bootstrap +SERVICES = users things http normalizer ws coap lora influxdb-writer influxdb-reader mongodb-writer mongodb-reader cassandra-writer cassandra-reader postgres-writer postgres-reader cli bootstrap DOCKERS = $(addprefix docker_,$(SERVICES)) DOCKERS_DEV = $(addprefix docker_dev_,$(SERVICES)) CGO_ENABLED ?= 0 diff --git a/cmd/postgres-reader/main.go b/cmd/postgres-reader/main.go new file mode 100644 index 00000000..91eee748 --- /dev/null +++ b/cmd/postgres-reader/main.go @@ -0,0 +1,187 @@ +// +// Copyright (c) 2019 +// Mainflux +// +// SPDX-License-Identifier: Apache-2.0 +// + +package main + +import ( + "fmt" + "log" + "net/http" + "os" + "os/signal" + "syscall" + + kitprometheus "github.com/go-kit/kit/metrics/prometheus" + "github.com/jmoiron/sqlx" + "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/readers" + "github.com/mainflux/mainflux/readers/api" + "github.com/mainflux/mainflux/readers/postgres" + thingsapi "github.com/mainflux/mainflux/things/api/grpc" + stdprometheus "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +const ( + svcName = "postgres-writer" + sep = "," + + defThingsURL = "localhost:8183" + defLogLevel = "debug" + defPort = "9204" + defClientTLS = "false" + defCACerts = "" + defDBHost = "localhost" + defDBPort = "5432" + defDBUser = "mainflux" + defDBPass = "mainflux" + defDBName = "messages" + defDBSSLMode = "disable" + defDBSSLCert = "" + defDBSSLKey = "" + defDBSSLRootCert = "" + + envThingsURL = "MF_THINGS_URL" + envLogLevel = "MF_POSTGRES_READER_LOG_LEVEL" + envPort = "MF_POSTGRES_READER_PORT" + envClientTLS = "MF_POSTGRES_READER_CLIENT_TLS" + envCACerts = "MF_POSTGRES_READER_CA_CERTS" + envDBHost = "MF_POSTGRES_READER_DB_HOST" + envDBPort = "MF_POSTGRES_READER_DB_PORT" + envDBUser = "MF_POSTGRES_READER_DB_USER" + envDBPass = "MF_POSTGRES_READER_DB_PASS" + envDBName = "MF_POSTGRES_READER_DB_NAME" + envDBSSLMode = "MF_POSTGRES_READER_DB_SSL_MODE" + envDBSSLCert = "MF_POSTGRES_READER_DB_SSL_CERT" + envDBSSLKey = "MF_POSTGRES_READER_DB_SSL_KEY" + envDBSSLRootCert = "MF_POSTGRES_READER_DB_SSL_ROOT_CERT" +) + +type config struct { + thingsURL string + logLevel string + port string + clientTLS bool + caCerts string + dbConfig postgres.Config +} + +func main() { + cfg := loadConfig() + + logger, err := logger.New(os.Stdout, cfg.logLevel) + if err != nil { + log.Fatalf(err.Error()) + } + + conn := connectToThings(cfg, logger) + defer conn.Close() + + tc := thingsapi.NewClient(conn) + + db := connectToDB(cfg.dbConfig, logger) + defer db.Close() + + repo := newService(db, logger) + + errs := make(chan error, 2) + + go startHTTPServer(repo, tc, cfg.port, logger, errs) + + go func() { + c := make(chan os.Signal) + signal.Notify(c, syscall.SIGINT) + errs <- fmt.Errorf("%s", <-c) + }() + + err = <-errs + logger.Error(fmt.Sprintf("Postgres writer service terminated: %s", err)) +} + +func loadConfig() config { + dbConfig := postgres.Config{ + Host: mainflux.Env(envDBHost, defDBHost), + Port: mainflux.Env(envDBPort, defDBPort), + User: mainflux.Env(envDBUser, defDBUser), + Pass: mainflux.Env(envDBPass, defDBPass), + Name: mainflux.Env(envDBName, defDBName), + SSLMode: mainflux.Env(envDBSSLMode, defDBSSLMode), + SSLCert: mainflux.Env(envDBSSLCert, defDBSSLCert), + SSLKey: mainflux.Env(envDBSSLKey, defDBSSLKey), + SSLRootCert: mainflux.Env(envDBSSLRootCert, defDBSSLRootCert), + } + + return config{ + thingsURL: mainflux.Env(envThingsURL, defThingsURL), + logLevel: mainflux.Env(envLogLevel, defLogLevel), + port: mainflux.Env(envPort, defPort), + dbConfig: dbConfig, + } +} + +func connectToDB(dbConfig postgres.Config, logger logger.Logger) *sqlx.DB { + db, err := postgres.Connect(dbConfig) + if err != nil { + logger.Error(fmt.Sprintf("Failed to connect to Postgres: %s", err)) + os.Exit(1) + } + return db +} + +func connectToThings(cfg config, logger logger.Logger) *grpc.ClientConn { + var opts []grpc.DialOption + if cfg.clientTLS { + if cfg.caCerts != "" { + tpc, err := credentials.NewClientTLSFromFile(cfg.caCerts, "") + if err != nil { + logger.Error(fmt.Sprintf("Failed to load certs: %s", err)) + os.Exit(1) + } + opts = append(opts, grpc.WithTransportCredentials(tpc)) + } + } else { + logger.Info("gRPC communication is not encrypted") + opts = append(opts, grpc.WithInsecure()) + } + + conn, err := grpc.Dial(cfg.thingsURL, opts...) + if err != nil { + logger.Error(fmt.Sprintf("Failed to connect to things service: %s", err)) + os.Exit(1) + } + return conn +} + +func newService(db *sqlx.DB, logger logger.Logger) readers.MessageRepository { + svc := postgres.New(db) + svc = api.LoggingMiddleware(svc, logger) + svc = api.MetricsMiddleware( + svc, + kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: "postgres", + Subsystem: "message_writer", + Name: "request_count", + Help: "Number of requests received.", + }, []string{"method"}), + kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ + Namespace: "postgres", + Subsystem: "message_writer", + Name: "request_latency_microseconds", + Help: "Total duration of requests in microseconds.", + }, []string{"method"}), + ) + + return svc +} + +func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, port string, logger logger.Logger, errs chan error) { + p := fmt.Sprintf(":%s", port) + logger.Info(fmt.Sprintf("Postgres reader service started, exposed port %s", port)) + errs <- http.ListenAndServe(p, api.MakeHandler(repo, tc, svcName)) +} diff --git a/docker/addons/postgres-reader/docker-compose.yml b/docker/addons/postgres-reader/docker-compose.yml new file mode 100644 index 00000000..15c2046e --- /dev/null +++ b/docker/addons/postgres-reader/docker-compose.yml @@ -0,0 +1,38 @@ +### +# This docker-compose file contains optional Postgres-reader service for Mainflux platform. +# Since this service is optional, this file is dependent of docker-compose.yml file +# from /docker. In order to run these optional service, execute command: +# docker-compose -f docker/docker-compose.yml -f docker/addons/postgres-reader/docker-compose.yml up +# from project root. +### + +version: "3" + +networks: + docker_mainflux-base-net: + external: true + +services: + postgres-reader: + image: mainflux/postgres-reader:latest + container_name: mainflux-postgres-reader + restart: on-failure + environment: + MF_THINGS_URL: things:8183 + MF_POSTGRES_READER_LOG_LEVEL: debug + MF_POSTGRES_READER_PORT: 9204 + MF_POSTGRES_READER_CLIENT_TLS: "false" + MF_POSTGRES_READER_CA_CERTS: "" + MF_POSTGRES_READER_DB_HOST: postgres + MF_POSTGRES_READER_DB_PORT: 5432 + MF_POSTGRES_READER_DB_USER: mainflux + MF_POSTGRES_READER_DB_PASS: mainflux + MF_POSTGRES_READER_DB_NAME: messages + MF_POSTGRES_READER_DB_SSL_MODE: disable + MF_POSTGRES_READER_DB_SSL_CERT: "" + MF_POSTGRES_READER_DB_SSL_KEY: "" + MF_POSTGRES_READER_DB_SSL_ROOT_CERT: "" + ports: + - 9204:9204 + networks: + - docker_mainflux-base-net diff --git a/readers/postgres/README.md b/readers/postgres/README.md new file mode 100644 index 00000000..a5c8aeac --- /dev/null +++ b/readers/postgres/README.md @@ -0,0 +1,78 @@ +# Postgres reader + +Postgres reader provides message repository implementation for Postgres. + +## Configuration + +The service is configured using the environment variables presented in the +following table. Note that any unset variables will be replaced with their +default values. + +| Variable | Description | Default | +|-------------------------------------|------------------------------------|-------------| +| MF_THINGS_URL | Things service URL | things:8183 | +| MF_POSTGRES_READER_LOG_LEVEL | Service log level | debug | +| MF_POSTGRES_READER_PORT | Service HTTP port | 9204 | +| MF_POSTGRES_READER_CLIENT_TLS | TLS mode flag | false | +| MF_POSTGRES_READER_CA_CERTS | Path to trusted CAs in PEM format | "" | +| MF_POSTGRES_READER_DB_HOST | Postgres DB host | postgres | +| MF_POSTGRES_READER_DB_PORT | Postgres DB port | 5432 | +| MF_POSTGRES_READER_DB_USER | Postgres user | mainflux | +| MF_POSTGRES_READER_DB_PASS | Postgres password | mainflux | +| MF_POSTGRES_READER_DB_NAME | Postgres database name | messages | +| MF_POSTGRES_READER_DB_SSL_MODE | Postgres SSL mode | disabled | +| MF_POSTGRES_READER_DB_SSL_CERT | Postgres SSL certificate path | "" | +| MF_POSTGRES_READER_DB_SSL_KEY | Postgres SSL key | "" | +| MF_POSTGRES_READER_DB_SSL_ROOT_CERT | Postgres SSL root certificate path | "" | + +## Deployment + +```yaml + postgres-writer: + image: mainflux/postgres-writer:[version] + container_name: [instance name] + depends_on: + - postgres + - nats + restart: on-failure + environment: + MF_NATS_URL: [NATS instance URL] + MF_POSTGRES_READER_LOG_LEVEL: [Service log level] + MF_POSTGRES_READER_PORT: [Service HTTP port] + MF_POSTGRES_READER_DB_HOST: [Postgres host] + MF_POSTGRES_READER_DB_PORT: [Postgres port] + MF_POSTGRES_READER_DB_USER: [Postgres user] + MF_POSTGRES_READER_DB_PASS: [Postgres password] + MF_POSTGRES_READER_DB_NAME: [Postgres database name] + MF_POSTGRES_READER_DB_SSL_MODE: [Postgres SSL mode] + MF_POSTGRES_READER_DB_SSL_CERT: [Postgres SSL cert] + MF_POSTGRES_READER_DB_SSL_KEY: [Postgres SSL key] + MF_POSTGRES_READER_DB_SSL_ROOT_CERT: [Postgres SSL Root cert] + ports: + - 8903:8903 + networks: + - docker_mainflux-base-net +``` + +To start the service, execute the following shell script: + +```bash +# download the latest version of the service +go get github.com/mainflux/mainflux + + +cd $GOPATH/src/github.com/mainflux/mainflux + +# compile the postgres writer +make postgres-writer + +# copy binary to bin +make install + +# Set the environment variables and run the service +MF_THINGS_URL=[Things service URL] MF_POSTGRES_READER_LOG_LEVEL=[Service log level] MF_POSTGRES_READER_PORT=[Service HTTP port] MF_POSTGRES_READER_CLIENT_TLS =[TLS mode flag] MF_POSTGRES_READER_CA_CERTS=[Path to trusted CAs in PEM format] MF_POSTGRES_READER_DB_HOST=[Postgres host] MF_POSTGRES_READER_DB_PORT=[Postgres port] MF_POSTGRES_READER_DB_USER=[Postgres user] MF_POSTGRES_READER_DB_PASS=[Postgres password] MF_POSTGRES_READER_DB_NAME=[Postgres database name] MF_POSTGRES_READER_DB_SSL_MODE=[Postgres SSL mode] MF_POSTGRES_READER_DB_SSL_CERT=[Postgres SSL cert] MF_POSTGRES_READER_DB_SSL_KEY=[Postgres SSL key] MF_POSTGRES_READER_DB_SSL_ROOT_CERT=[Postgres SSL Root cert] $GOBIN/mainflux-postgres-reader +``` + +## Usage + +Starting service will start consuming normalized messages in SenML format. diff --git a/readers/postgres/doc.go b/readers/postgres/doc.go new file mode 100644 index 00000000..2a5bf89e --- /dev/null +++ b/readers/postgres/doc.go @@ -0,0 +1,10 @@ +// +// Copyright (c) 2019 +// Mainflux +// +// SPDX-License-Identifier: Apache-2.0 +// + +// Package postgres contains repository implementations using Postgres as +// the underlying database. +package postgres diff --git a/readers/postgres/init.go b/readers/postgres/init.go new file mode 100644 index 00000000..aa1c9715 --- /dev/null +++ b/readers/postgres/init.go @@ -0,0 +1,83 @@ +// +// Copyright (c) 2019 +// Mainflux +// +// SPDX-License-Identifier: Apache-2.0 +// + +package postgres + +import ( + "fmt" + + "github.com/jmoiron/sqlx" + _ "github.com/lib/pq" // required for SQL access + migrate "github.com/rubenv/sql-migrate" +) + +// Config defines the options that are used when connecting to a PostgreSQL instance +type Config struct { + Host string + Port string + User string + Pass string + Name string + SSLMode string + SSLCert string + SSLKey string + SSLRootCert string +} + +// Connect creates a connection to the PostgreSQL instance and applies any +// unapplied database migrations. A non-nil error is returned to indicate +// failure. +func Connect(cfg Config) (*sqlx.DB, error) { + url := fmt.Sprintf("host=%s port=%s user=%s dbname=%s password=%s sslmode=%s sslcert=%s sslkey=%s sslrootcert=%s", cfg.Host, cfg.Port, cfg.User, cfg.Name, cfg.Pass, cfg.SSLMode, cfg.SSLCert, cfg.SSLKey, cfg.SSLRootCert) + + db, err := sqlx.Open("postgres", url) + if err != nil { + return nil, err + } + + if err := migrateDB(db); err != nil { + return nil, err + } + + return db, nil +} + +func migrateDB(db *sqlx.DB) error { + migrations := &migrate.MemoryMigrationSource{ + Migrations: []*migrate.Migration{ + { + Id: "messages_1", + Up: []string{ + `CREATE TABLE IF NOT EXISTS messages ( + id UUID, + channel UUID, + subtopic VARCHAR(254), + publisher UUID, + protocol TEXT, + name TEXT, + unit TEXT, + value FLOAT, + string_value TEXT, + bool_value BOOL, + data_value TEXT, + value_sum FLOAT, + time FlOAT, + update_time FLOAT, + link TEXT, + PRIMARY KEY (id) + )`, + }, + Down: []string{ + "DROP TABLE messages", + }, + }, + }, + } + + _, err := migrate.Exec(db.DB, "postgres", migrations, migrate.Up) + return err +} diff --git a/readers/postgres/messages.go b/readers/postgres/messages.go new file mode 100644 index 00000000..825f66c3 --- /dev/null +++ b/readers/postgres/messages.go @@ -0,0 +1,137 @@ +// +// Copyright (c) 2019 +// Mainflux +// +// SPDX-License-Identifier: Apache-2.0 +// + +package postgres + +import ( + "errors" + "fmt" + + "github.com/jmoiron/sqlx" // required for DB access + "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/readers" +) + +const errInvalid = "invalid_text_representation" + +var errInvalidMessage = errors.New("invalid message representation") + +var _ readers.MessageRepository = (*postgresRepository)(nil) + +type postgresRepository struct { + db *sqlx.DB +} + +// New returns new PostgreSQL writer. +func New(db *sqlx.DB) readers.MessageRepository { + return &postgresRepository{ + db: db, + } +} + +func (tr postgresRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) { + subtopicQuery := "" + if query["subtopic"] != "" { + subtopicQuery = `AND subtopic = :subtopic` + } + q := fmt.Sprintf(`SELECT * FROM messages + WHERE channel = :channel %s ORDER BY time DESC + LIMIT :limit OFFSET :offset;`, subtopicQuery) + + params := map[string]interface{}{ + "channel": chanID, + "limit": limit, + "offset": offset, + "subtopic": query["subtopic"], + } + + rows, err := tr.db.NamedQuery(q, params) + if err != nil { + return readers.MessagesPage{}, err + } + defer rows.Close() + + page := readers.MessagesPage{ + Offset: offset, + Limit: limit, + Messages: []mainflux.Message{}, + } + for rows.Next() { + dbm := dbMessage{Channel: chanID} + if err := rows.StructScan(&dbm); err != nil { + return readers.MessagesPage{}, err + } + + msg, err := toMessage(dbm) + if err != nil { + return readers.MessagesPage{}, err + } + + page.Messages = append(page.Messages, msg) + } + + q = `SELECT COUNT(*) FROM messages WHERE channel = $1;` + qParams := []interface{}{chanID} + + if query["subtopic"] != "" { + q = `SELECT COUNT(*) FROM messages WHERE channel = $1 AND subtopic = $2;` + qParams = append(qParams, query["subtopic"]) + } + + if err := tr.db.QueryRow(q, qParams...).Scan(&page.Total); err != nil { + return readers.MessagesPage{}, err + } + + return page, nil +} + +type dbMessage struct { + ID string `db:"id"` + Channel string `db:"channel"` + Subtopic string `db:"subtopic"` + Publisher string `db:"publisher"` + Protocol string `db:"protocol"` + Name string `db:"name"` + Unit string `db:"unit"` + FloatValue *float64 `db:"value"` + StringValue *string `db:"string_value"` + BoolValue *bool `db:"bool_value"` + DataValue *string `db:"data_value"` + ValueSum *float64 `db:"value_sum"` + Time float64 `db:"time"` + UpdateTime float64 `db:"update_time"` + Link string `db:"link"` +} + +func toMessage(dbm dbMessage) (mainflux.Message, error) { + msg := mainflux.Message{ + Channel: dbm.Channel, + Subtopic: dbm.Subtopic, + Publisher: dbm.Publisher, + Protocol: dbm.Protocol, + Name: dbm.Name, + Unit: dbm.Unit, + Time: dbm.Time, + UpdateTime: dbm.UpdateTime, + Link: dbm.Link, + } + + switch { + case dbm.FloatValue != nil: + msg.Value = &mainflux.Message_FloatValue{FloatValue: *dbm.FloatValue} + case dbm.StringValue != nil: + msg.Value = &mainflux.Message_StringValue{StringValue: *dbm.StringValue} + case dbm.BoolValue != nil: + msg.Value = &mainflux.Message_BoolValue{BoolValue: *dbm.BoolValue} + case dbm.DataValue != nil: + msg.Value = &mainflux.Message_DataValue{DataValue: *dbm.DataValue} + case dbm.ValueSum != nil: + msg.ValueSum = &mainflux.SumValue{Value: *dbm.ValueSum} + } + + return msg, nil +} diff --git a/readers/postgres/messages_test.go b/readers/postgres/messages_test.go new file mode 100644 index 00000000..b3760021 --- /dev/null +++ b/readers/postgres/messages_test.go @@ -0,0 +1,153 @@ +// +// Copyright (c) 2019 +// Mainflux +// +// SPDX-License-Identifier: Apache-2.0 +// + +package postgres_test + +import ( + "fmt" + "testing" + "time" + + "github.com/gofrs/uuid" + "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/readers" + preader "github.com/mainflux/mainflux/readers/postgres" + pwriter "github.com/mainflux/mainflux/writers/postgres" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + subtopic = "subtopic" + msgsNum = 42 + valueFields = 5 +) + +func TestMessageReadAll(t *testing.T) { + messageRepo := pwriter.New(db) + + chanID, err := uuid.NewV4() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + pubID, err := uuid.NewV4() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + wrongID, err := uuid.NewV4() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + msg := mainflux.Message{ + Channel: chanID.String(), + Publisher: pubID.String(), + Protocol: "mqtt", + } + + messages := []mainflux.Message{} + subtopicMsgs := []mainflux.Message{} + now := time.Now().Unix() + for i := 0; i < msgsNum; i++ { + // Mix possible values as well as value sum. + count := i % valueFields + msg.Subtopic = "" + switch count { + case 0: + msg.Subtopic = subtopic + msg.Value = &mainflux.Message_FloatValue{FloatValue: 5} + case 1: + msg.Value = &mainflux.Message_BoolValue{BoolValue: false} + case 2: + msg.Value = &mainflux.Message_StringValue{StringValue: "value"} + case 3: + msg.Value = &mainflux.Message_DataValue{DataValue: "base64data"} + case 5: + msg.ValueSum = &mainflux.SumValue{Value: 45} + } + msg.Time = float64(now - int64(i)) + + err := messageRepo.Save(msg) + assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) + messages = append(messages, msg) + if count == 0 { + subtopicMsgs = append(subtopicMsgs, msg) + } + } + + reader := preader.New(db) + + // Since messages are not saved in natural order, + // cases that return subset of messages are only + // checking data result set size, but not content. + cases := map[string]struct { + chanID string + offset uint64 + limit uint64 + query map[string]string + page readers.MessagesPage + }{ + "read message page for existing channel": { + chanID: chanID.String(), + offset: 0, + limit: msgsNum, + page: readers.MessagesPage{ + Total: msgsNum, + Offset: 0, + Limit: msgsNum, + Messages: messages, + }, + }, + "read message page for non-existent channel": { + chanID: wrongID.String(), + offset: 0, + limit: msgsNum, + page: readers.MessagesPage{ + Total: 0, + Offset: 0, + Limit: msgsNum, + Messages: []mainflux.Message{}, + }, + }, + "read message last page": { + chanID: chanID.String(), + offset: 40, + limit: 5, + page: readers.MessagesPage{ + Total: msgsNum, + Offset: 40, + Limit: 5, + Messages: messages[40:42], + }, + }, + "read message with non-existent subtopic": { + chanID: chanID.String(), + offset: 0, + limit: msgsNum, + query: map[string]string{"subtopic": "not-present"}, + page: readers.MessagesPage{ + Total: 0, + Offset: 0, + Limit: msgsNum, + Messages: []mainflux.Message{}, + }, + }, + "read message with subtopic": { + chanID: chanID.String(), + offset: 0, + limit: uint64(len(subtopicMsgs)), + query: map[string]string{"subtopic": subtopic}, + page: readers.MessagesPage{ + Total: uint64(len(subtopicMsgs)), + Offset: 0, + Limit: uint64(len(subtopicMsgs)), + Messages: subtopicMsgs, + }, + }, + } + + for desc, tc := range cases { + result, err := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.query) + assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err)) + assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Messages, result.Messages)) + assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total)) + } +} diff --git a/readers/postgres/setup_test.go b/readers/postgres/setup_test.go new file mode 100644 index 00000000..62dcfa1d --- /dev/null +++ b/readers/postgres/setup_test.go @@ -0,0 +1,87 @@ +// +// Copyright (c) 2019 +// Mainflux +// +// SPDX-License-Identifier: Apache-2.0 +// + +// Package postgres_test contains tests for PostgreSQL repository +// implementations. +package postgres_test + +import ( + "fmt" + "log" + "os" + "testing" + + "github.com/jmoiron/sqlx" + "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/readers/postgres" + dockertest "gopkg.in/ory-am/dockertest.v3" +) + +const ( + wrongID = "0" + wrongValue = "wrong-value" +) + +var ( + testLog, _ = logger.New(os.Stdout, logger.Info.String()) + db *sqlx.DB +) + +func TestMain(m *testing.M) { + pool, err := dockertest.NewPool("") + if err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } + + cfg := []string{ + "POSTGRES_USER=test", + "POSTGRES_PASSWORD=test", + "POSTGRES_DB=test", + } + container, err := pool.Run("postgres", "10.2-alpine", cfg) + if err != nil { + log.Fatalf("Could not start container: %s", err) + } + + port := container.GetPort("5432/tcp") + + if err = pool.Retry(func() error { + url := fmt.Sprintf("host=localhost port=%s user=test dbname=test password=test sslmode=disable", port) + db, err = sqlx.Open("postgres", url) + if err != nil { + return err + } + return db.Ping() + }); err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } + + dbConfig := postgres.Config{ + Host: "localhost", + Port: port, + User: "test", + Pass: "test", + Name: "test", + SSLMode: "disable", + SSLCert: "", + SSLKey: "", + SSLRootCert: "", + } + + if db, err = postgres.Connect(dbConfig); err != nil { + log.Fatalf("Could not setup test DB connection: %s", err) + } + defer db.Close() + + code := m.Run() + + if err = pool.Purge(container); err != nil { + log.Fatalf("Could not purge container: %s", err) + } + + os.Exit(code) +}