MF-311 - Implement basic InfluxDB reader (#365)

* Add InfluxDB reader

Summary:
- Add basic reader features
- Update Makefile

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Raise test coverage

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update README.md and docker composition

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix docker-compose.yml

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove exposed ports

Ports are already exposed by mapping, so no need to explicity expose them.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
This commit is contained in:
Dušan Borovčanin 2018-08-25 12:48:03 +02:00 committed by Nikola Marčetić
parent 86f0d8e0d6
commit 507bd2ee84
10 changed files with 495 additions and 7 deletions

View File

@ -1,5 +1,5 @@
BUILD_DIR = build
SERVICES = users things http normalizer ws influxdb mongodb-writer mongodb-reader cassandra-writer cassandra-reader cli
SERVICES = users things http normalizer ws influxdb-writer influxdb-reader mongodb-writer mongodb-reader cassandra-writer cassandra-reader cli
DOCKERS = $(addprefix docker_,$(SERVICES))
CGO_ENABLED ?= 0
GOOS ?= linux

141
cmd/influxdb-reader/main.go Normal file
View File

@ -0,0 +1,141 @@
package main
import (
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
influxdata "github.com/influxdata/influxdb/client/v2"
"github.com/mainflux/mainflux"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/readers"
"github.com/mainflux/mainflux/readers/api"
"github.com/mainflux/mainflux/readers/influxdb"
thingsapi "github.com/mainflux/mainflux/things/api/grpc"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
)
const (
defThingsURL = "localhost:8181"
defPort = "8180"
defDBName = "mainflux"
defDBHost = "localhost"
defDBPort = "8086"
defDBUser = "mainflux"
defDBPass = "mainflux"
envThingsURL = "MF_THINGS_URL"
envPort = "MF_INFLUX_READER_PORT"
envDBName = "MF_INFLUX_READER_DB_NAME"
envDBHost = "MF_INFLUX_READER_DB_HOST"
envDBPort = "MF_INFLUX_READER_DB_PORT"
envDBUser = "MF_INFLUX_READER_DB_USER"
envDBPass = "MF_INFLUX_READER_DB_PASS"
)
type config struct {
ThingsURL string
Port string
DBName string
DBHost string
DBPort string
DBUser string
DBPass string
}
func main() {
cfg, clientCfg := loadConfigs()
logger := log.New(os.Stdout)
conn := connectToThings(cfg.ThingsURL, logger)
defer conn.Close()
tc := thingsapi.NewClient(conn)
client, err := influxdata.NewHTTPClient(clientCfg)
if err != nil {
logger.Error(fmt.Sprintf("Failed to create InfluxDB client: %s", err))
os.Exit(1)
}
defer client.Close()
repo, err := influxdb.New(client, cfg.DBName)
if err != nil {
logger.Error(fmt.Sprintf("Failed to create InfluxDB writer: %s", err))
os.Exit(1)
}
errs := make(chan error, 2)
go func() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT)
errs <- fmt.Errorf("%s", <-c)
}()
go startHTTPServer(repo, tc, cfg.Port, logger, errs)
err = <-errs
logger.Error(fmt.Sprintf("InfluxDB writer service terminated: %s", err))
}
func loadConfigs() (config, influxdata.HTTPConfig) {
cfg := config{
ThingsURL: mainflux.Env(envThingsURL, defThingsURL),
Port: mainflux.Env(envPort, defPort),
DBName: mainflux.Env(envDBName, defDBName),
DBHost: mainflux.Env(envDBHost, defDBHost),
DBPort: mainflux.Env(envDBPort, defDBPort),
DBUser: mainflux.Env(envDBUser, defDBUser),
DBPass: mainflux.Env(envDBPass, defDBPass),
}
clientCfg := influxdata.HTTPConfig{
Addr: fmt.Sprintf("http://%s:%s", cfg.DBHost, cfg.DBPort),
Username: cfg.DBUser,
Password: cfg.DBPass,
}
return cfg, clientCfg
}
func connectToThings(url string, logger log.Logger) *grpc.ClientConn {
conn, err := grpc.Dial(url, grpc.WithInsecure())
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to things service: %s", err))
os.Exit(1)
}
return conn
}
func newService(client influxdata.Client, logger log.Logger) readers.MessageRepository {
repo, _ := influxdb.New(client, "mainflux")
repo = api.LoggingMiddleware(repo, logger)
repo = api.MetricsMiddleware(
repo,
kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: "influxdb",
Subsystem: "message_reader",
Name: "request_count",
Help: "Number of requests received.",
}, []string{"method"}),
kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
Namespace: "influxdb",
Subsystem: "message_reader",
Name: "request_latency_microseconds",
Help: "Total duration of requests in microseconds.",
}, []string{"method"}),
)
return repo
}
func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, port string, logger log.Logger, errs chan error) {
p := fmt.Sprintf(":%s", port)
logger.Info(fmt.Sprintf("InfluxDB reader service started, exposed port %s", port))
errs <- http.ListenAndServe(p, api.MakeHandler(repo, tc, "influxdb-reader"))
}

View File

@ -0,0 +1,30 @@
###
# This docker-compose file contains optional InfluxDB-reader service for the Mainflux
# platform. Since this service is optional, this file is dependent on the docker-compose.yml
# file from <project_root>/docker/. In order to run InfluxDB-reader service, core services,
# as well as the network from the core composition, should be already running.
###
version: "3"
networks:
docker_mainflux-base-net:
external: true
services:
influxdb-reader:
image: mainflux/influxdb-reader:latest
container_name: mainflux-influxdb-reader
restart: on-failure
environment:
MF_THINGS_URL: things:8183
MF_INFLUX_READER_PORT: 8905
MF_INFLUX_READER_DB_NAME: mainflux
MF_INFLUX_READER_DB_HOST: mainflux-influxdb
MF_INFLUX_READER_DB_PORT: 8086
MF_INFLUX_READER_DB_USER: mainflux
MF_INFLUX_READER_DB_PASS: mainflux
ports:
- 8905:8905
networks:
- docker_mainflux-base-net

View File

@ -1,9 +1,8 @@
###
# This docker-compose file contains optional InfluxDB, InfluxDB-writer and Grafana services
# for Mainflux platform. Since these are optional, this file is dependent of docker-compose file
# from <project_root>/docker. In order to run these optional service, execute command:
# docker-compose -f docker/docker-compose.yml -f docker/addons/influxdb/docker-compose.yml up
# from project root.
# for the Mainflux platform. Since this services are optional, this file is dependent on the
# docker-compose.yml file from <project_root>/docker/. In order to run these services,
# core services, as well as the network from the core composition, should be already running.
###
version: "3"
@ -26,11 +25,10 @@ services:
- docker_mainflux-base-net
influxdb-writer:
image: mainflux/influxdb:latest
image: mainflux/influxdb-writer:latest
container_name: mainflux-influxdb-writer
depends_on:
- influxdb
- nats
expose:
- 8900
restart: on-failure

View File

@ -0,0 +1,75 @@
# InfluxDB reader
InfluxDB reader provides message repository implementation for InfluxDB.
## Configuration
The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.
| Variable | Description | Default |
|---------------------------|-----------------------------------|-----------------------|
| MF_INFLUX_READER_PORT | Service HTTP port | 8180 |
| MF_INFLUX_READER_DB_NAME | InfluxDB database name | mainflux |
| MF_INFLUX_READER_DB_HOST | InfluxDB host | localhost |
| MF_INFLUX_READER_DB_PORT | Default port of InfluxDB database | 8086 |
| MF_INFLUX_READER_DB_USER | Default user of InfluxDB database | mainflux |
| MF_INFLUX_READER_DB_PASS | Default password of InfluxDB user | mainflux |
## Deployment
```yaml
version: "2"
influxdb-reader:
image: mainflux/influxdb-reader:[version]
container_name: [instance name]
restart: on-failure
environment:
MF_THINGS_URL: [Things service URL]
MF_INFLUX_READER_PORT: [Service HTTP port]
MF_INFLUX_READER_DB_NAME: [InfluxDB name]
MF_INFLUX_READER_DB_HOST: [InfluxDB host]
MF_INFLUX_READER_DB_PORT: [InfluxDB port]
MF_INFLUX_READER_DB_USER: [InfluxDB admin user]
MF_INFLUX_READER_DB_PASS: [InfluxDB admin password]
ports:
- [host machine port]:[configured HTTP port]
```
To start the service, execute the following shell script:
```bash
# download the latest version of the service
go get github.com/mainflux/mainflux
cd $GOPATH/src/github.com/mainflux/mainflux
# compile the influxdb-reader
make influxdb-reader
# copy binary to bin
make install
# Set the environment variables and run the service
MF_THINGS_URL=[Things service URL] MF_INFLUX_READER_PORT=[Service HTTP port] MF_INFLUX_READER_DB_NAME=[InfluxDB database name] MF_INFLUX_READER_DB_HOST=[InfluxDB database host] MF_INFLUX_READER_DB_PORT=[InfluxDB database port] MF_INFLUX_READER_DB_USER=[InfluxDB admin user] MF_INFLUX_READER_DB_PASS=[InfluxDB admin password] $GOBIN/mainflux-influxdb
```
### Using docker-compose
This service can be deployed using docker containers. Docker compose file is
available in `<project_root>/docker/addons/influxdb-reader/docker-compose.yml`.
In order to run all Mainflux core services, as well as mentioned optional ones,
execute following command:
```bash
docker-compose -f docker/docker-compose.yml up -d
docker-compose -f docker/addons/influxdb-reader/docker-compose.yml up -d
```
## Usage
Service exposes HTTP API for fetching messages.
[doc]: ../swagger.yml

3
readers/influxdb/doc.go Normal file
View File

@ -0,0 +1,3 @@
// Package influxdb contains the domain concept definitions needed to
// support Mainflux InfluxDB reader service functionality.
package influxdb

View File

@ -0,0 +1,102 @@
package influxdb
import (
"encoding/json"
"fmt"
"reflect"
"strconv"
"github.com/mainflux/mainflux/readers"
influxdata "github.com/influxdata/influxdb/client/v2"
"github.com/mainflux/mainflux"
)
const maxLimit = 100
var _ readers.MessageRepository = (*influxRepository)(nil)
type influxRepository struct {
database string
client influxdata.Client
}
type fields map[string]interface{}
type tags map[string]string
// New returns new InfluxDB reader.
func New(client influxdata.Client, database string) (readers.MessageRepository, error) {
return &influxRepository{database, client}, nil
}
func (repo *influxRepository) ReadAll(chanID, offset, limit uint64) []mainflux.Message {
if limit > maxLimit {
limit = maxLimit
}
cmd := fmt.Sprintf(`SELECT * from messages WHERE Channel='%d' LIMIT %d OFFSET %d`, chanID, limit, offset)
q := influxdata.Query{
Command: cmd,
Database: repo.database,
}
ret := []mainflux.Message{}
resp, err := repo.client.Query(q)
if err != nil || resp.Error() != nil {
return ret
}
if len(resp.Results) < 1 || len(resp.Results[0].Series) < 1 {
return ret
}
result := resp.Results[0].Series[0]
for _, v := range result.Values {
ret = append(ret, genMessage(result.Columns, v))
}
return ret
}
// GenMessage and parseFloat are util methods. Since InfluxDB client returns
// results in some proprietary from, this obscure message conversion is needed
// to return actual []mainflux.Message from the query result.
func parseFloat(value interface{}) float64 {
switch value.(type) {
case string:
ret, _ := strconv.ParseFloat(value.(string), 64)
return ret
case json.Number:
ret, _ := strconv.ParseFloat((value.(json.Number)).String(), 64)
return ret
}
return 0
}
func genMessage(names []string, fields []interface{}) mainflux.Message {
m := mainflux.Message{}
v := reflect.ValueOf(&m).Elem()
for i, name := range names {
msgField := v.FieldByName(name)
if !msgField.IsValid() {
continue
}
f := msgField.Interface()
switch f.(type) {
case string:
if s, ok := fields[i].(string); ok {
msgField.SetString(s)
}
case uint64:
u, _ := strconv.ParseUint(fields[i].(string), 10, 64)
msgField.SetUint(u)
case float64:
msgField.SetFloat(parseFloat(fields[i]))
case bool:
if b, ok := fields[i].(bool); ok {
msgField.SetBool(b)
}
}
}
return m
}

View File

@ -0,0 +1,92 @@
package influxdb_test
import (
"fmt"
"os"
"testing"
influxdata "github.com/influxdata/influxdb/client/v2"
"github.com/mainflux/mainflux"
reader "github.com/mainflux/mainflux/readers/influxdb"
writer "github.com/mainflux/mainflux/writers/influxdb"
log "github.com/mainflux/mainflux/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
testDB = "test"
chanID = 1
numOfMessages = 101
)
var (
port string
client influxdata.Client
clientCfg = influxdata.HTTPConfig{
Username: "test",
Password: "test",
}
msg = mainflux.Message{
Channel: chanID,
Publisher: 1,
Protocol: "mqtt",
}
testLog = log.New(os.Stdout)
)
func TestReadAll(t *testing.T) {
client, err := influxdata.NewHTTPClient(clientCfg)
require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB client expected to succeed: %s.\n", err))
writer, err := writer.New(client, testDB)
require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB writer expected to succeed: %s.\n", err))
messages := []mainflux.Message{}
for i := 0; i < numOfMessages; i++ {
err := writer.Save(msg)
require.Nil(t, err, fmt.Sprintf("failed to store message to InfluxDB: %s", err))
messages = append(messages, msg)
}
reader, err := reader.New(client, testDB)
require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB reader expected to succeed: %s.\n", err))
cases := map[string]struct {
chanID uint64
offset uint64
limit uint64
messages []mainflux.Message
}{
"read message page for existing channel": {
chanID: chanID,
offset: 0,
limit: 10,
messages: messages[0:10],
},
"read message page for too large limit": {
chanID: chanID,
offset: 0,
limit: 101,
messages: messages[0:100],
},
"read message page for non-existent channel": {
chanID: 2,
offset: 0,
limit: 10,
messages: []mainflux.Message{},
},
"read message last page": {
chanID: chanID,
offset: 95,
limit: 10,
messages: messages[95:101],
},
}
for desc, tc := range cases {
result := reader.ReadAll(tc.chanID, tc.offset, tc.limit)
assert.ElementsMatch(t, tc.messages, result, fmt.Sprintf("%s: expected %v got %v", desc, tc.messages, result))
}
}

View File

@ -0,0 +1,47 @@
package influxdb_test
import (
"fmt"
"os"
"testing"
"time"
influxdb "github.com/influxdata/influxdb/client/v2"
dockertest "gopkg.in/ory-am/dockertest.v3"
)
func TestMain(m *testing.M) {
pool, err := dockertest.NewPool("")
if err != nil {
testLog.Error(fmt.Sprintf("Could not connect to docker: %s", err))
}
cfg := []string{
"INFLUXDB_USER=test",
"INFLUXDB_USER_PASSWORD=test",
"INFLUXDB_DB=test",
}
container, err := pool.Run("influxdb", "1.5.2-alpine", cfg)
if err != nil {
testLog.Error(fmt.Sprintf("Could not start container: %s", err))
}
port = container.GetPort("8086/tcp")
clientCfg.Addr = fmt.Sprintf("http://localhost:%s", port)
if err := pool.Retry(func() error {
client, err = influxdb.NewHTTPClient(clientCfg)
_, _, err = client.Ping(5 * time.Millisecond)
return err
}); err != nil {
testLog.Error(fmt.Sprintf("Could not connect to docker: %s", err))
}
code := m.Run()
if err := pool.Purge(container); err != nil {
testLog.Error(fmt.Sprintf("Could not purge container: %s", err))
}
os.Exit(code)
}