MF-1623 - Bring back WebSocket Adapter (#1625)

Signed-off-by: aryan <aryangodara03@gmail.com>
This commit is contained in:
Aryan Godara 2022-09-16 17:37:12 +05:30 committed by GitHub
parent 2e0299119d
commit 8a4cc12cd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 2134 additions and 30 deletions

View File

@ -3,7 +3,7 @@
MF_DOCKER_IMAGE_NAME_PREFIX ?= mainflux
BUILD_DIR = build
SERVICES = users things http coap lora influxdb-writer influxdb-reader mongodb-writer \
SERVICES = users things http coap ws lora influxdb-writer influxdb-reader mongodb-writer \
mongodb-reader cassandra-writer cassandra-reader postgres-writer postgres-reader timescale-writer timescale-reader cli \
bootstrap opcua auth twins mqtt provision certs smtp-notifier smpp-notifier
DOCKERS = $(addprefix docker_,$(SERVICES))

41
api/openapi/websocket.yml Normal file
View File

@ -0,0 +1,41 @@
openapi: 3.0.1
info:
title: Mainflux ws adapter
description: WebSocket API for sending messages through communication channels.
version: "1.0.0"
paths:
/channels/{id}/messages:
post:
summary: Sends message to the communication channel
description: |
Sends message to the communication channel. Messages can be sent as
JSON formatted SenML or as blob.
tags:
- messages
parameters:
- $ref: "#/components/parameters/ID"
requestBody:
$ref: "#/components/requestBodies/MessageReq"
responses:
"202":
description: Message is accepted for processing.
"400":
description: Message discarded due to its malformed content.
"401":
description: Missing or invalid access token provided.
"404":
description: Message discarded due to invalid channel id.
"415":
description: Message discarded due to invalid or missing content type.
'500':
$ref: "#/components/responses/ServiceError"
/health:
get:
summary: Retrieves service health check info.
tags:
- health
responses:
'200':
$ref: "#/components/responses/HealthRes"
'500':
$ref: "#/components/responses/ServiceError"

232
cmd/ws/main.go Normal file
View File

@ -0,0 +1,232 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"strconv"
"time"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux"
"golang.org/x/sync/errgroup"
logger "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/pkg/messaging/brokers"
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc"
adapter "github.com/mainflux/mainflux/ws"
"github.com/mainflux/mainflux/ws/api"
opentracing "github.com/opentracing/opentracing-go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
jconfig "github.com/uber/jaeger-client-go/config"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)
const (
stopWaitTime = 5 * time.Second
defPort = "8190"
defBrokerURL = "nats://localhost:4222"
defLogLevel = "error"
defClientTLS = "false"
defCACerts = ""
defJaegerURL = ""
defThingsAuthURL = "localhost:8183"
defThingsAuthTimeout = "1s"
envPort = "MF_WS_ADAPTER_PORT"
envBrokerURL = "MF_BROKER_URL"
envLogLevel = "MF_WS_ADAPTER_LOG_LEVEL"
envClientTLS = "MF_WS_ADAPTER_CLIENT_TLS"
envCACerts = "MF_WS_ADAPTER_CA_CERTS"
envJaegerURL = "MF_JAEGER_URL"
envThingsAuthURL = "MF_THINGS_AUTH_GRPC_URL"
envThingsTimeout = "MF_THINGS_AUTH_GRPC_TIMEOUT"
)
type config struct {
port string
brokerURL string
logLevel string
clientTLS bool
caCerts string
jaegerURL string
thingsAuthURL string
thingsAuthTimeout time.Duration
}
func main() {
cfg := loadConfig()
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
logger, err := logger.New(os.Stdout, cfg.logLevel)
if err != nil {
log.Fatalf(err.Error())
}
conn := connectToThings(cfg, logger)
defer conn.Close()
thingsTracer, thingsCloser := initJaeger("things", cfg.jaegerURL, logger)
defer thingsCloser.Close()
tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)
nps, err := brokers.NewPubSub(cfg.brokerURL, "", logger)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to message broker: %s", err))
os.Exit(1)
}
defer nps.Close()
svc := newService(tc, nps, logger)
g.Go(func() error {
return startWSServer(ctx, cfg, svc, logger)
})
g.Go(func() error {
if sig := errors.SignalHandler(ctx); sig != nil {
cancel()
logger.Info(fmt.Sprintf("WS adapter service shutdown by signal: %s", sig))
}
return nil
})
if err := g.Wait(); err != nil {
logger.Error(fmt.Sprintf("WS adapter service terminated: %s", err))
}
}
func loadConfig() config {
tls, err := strconv.ParseBool(mainflux.Env(envClientTLS, defClientTLS))
if err != nil {
log.Fatalf("Invalid value passed for %s\n", envClientTLS)
}
authTimeout, err := time.ParseDuration(mainflux.Env(envThingsTimeout, defThingsAuthTimeout))
if err != nil {
log.Fatalf("Invalid %s value: %s", envThingsTimeout, err.Error())
}
return config{
brokerURL: mainflux.Env(envBrokerURL, defBrokerURL),
port: mainflux.Env(envPort, defPort),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
clientTLS: tls,
caCerts: mainflux.Env(envCACerts, defCACerts),
jaegerURL: mainflux.Env(envJaegerURL, defJaegerURL),
thingsAuthURL: mainflux.Env(envThingsAuthURL, defThingsAuthURL),
thingsAuthTimeout: authTimeout,
}
}
func connectToThings(cfg config, logger logger.Logger) *grpc.ClientConn {
var opts []grpc.DialOption
if cfg.clientTLS {
if cfg.caCerts != "" {
tpc, err := credentials.NewClientTLSFromFile(cfg.caCerts, "")
if err != nil {
logger.Error(fmt.Sprintf("Failed to load certs: %s", err))
os.Exit(1)
}
opts = append(opts, grpc.WithTransportCredentials(tpc))
}
} else {
logger.Info("gRPC communication is not encrypted")
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
conn, err := grpc.Dial(cfg.thingsAuthURL, opts...)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to things service: %s", err))
os.Exit(1)
}
return conn
}
func initJaeger(svcName, url string, logger logger.Logger) (opentracing.Tracer, io.Closer) {
if url == "" {
return opentracing.NoopTracer{}, ioutil.NopCloser(nil)
}
tracer, closer, err := jconfig.Configuration{
ServiceName: svcName,
Sampler: &jconfig.SamplerConfig{
Type: "const",
Param: 1,
},
Reporter: &jconfig.ReporterConfig{
LocalAgentHostPort: url,
LogSpans: true,
},
}.NewTracer()
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger client: %s", err))
os.Exit(1)
}
return tracer, closer
}
func newService(tc mainflux.ThingsServiceClient, nps messaging.PubSub, logger logger.Logger) adapter.Service {
svc := adapter.New(tc, nps)
svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
svc,
kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: "ws_adapter",
Subsystem: "api",
Name: "reqeust_count",
Help: "Number of requests received",
}, []string{"method"}),
kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
Namespace: "ws_adapter",
Subsystem: "api",
Name: "request_latency_microsecond",
Help: "Total duration of requests in microseconds",
}, []string{"method"}),
)
return svc
}
func startWSServer(ctx context.Context, cfg config, svc adapter.Service, l logger.Logger) error {
p := fmt.Sprintf(":%s", cfg.port)
errCh := make(chan error, 2)
server := &http.Server{Addr: p, Handler: api.MakeHandler(svc, l)}
l.Info(fmt.Sprintf("WS adapter service started, exposed port %s", cfg.port))
go func() {
errCh <- server.ListenAndServe()
}()
select {
case <-ctx.Done():
ctxShutdown, cancelShutdown := context.WithTimeout(context.Background(), stopWaitTime)
defer cancelShutdown()
if err := server.Shutdown(ctxShutdown); err != nil {
l.Error(fmt.Sprintf("WS adapter service error occurred during shutdown at %s: %s", p, err))
return fmt.Errorf("WS adapter service error occurred during shutdown at %s: %w", p, err)
}
l.Info(fmt.Sprintf("WS adapter service shutdown at %s", p))
return nil
case err := <-errCh:
return err
}
}

View File

@ -121,6 +121,10 @@ MF_DOCKER_VERNEMQ_LOG__CONSOLE__LEVEL=error
MF_COAP_ADAPTER_LOG_LEVEL=debug
MF_COAP_ADAPTER_PORT=5683
### WS
MF_WS_ADAPTER_LOG_LEVEL=debug
MF_WS_ADAPTER_PORT=8190
## Addons Services
### Bootstrap
MF_BOOTSTRAP_LOG_LEVEL=debug

View File

@ -94,6 +94,7 @@ services:
- users
- mqtt-adapter
- http-adapter
- ws-adapter
broker:
extends:
@ -346,3 +347,22 @@ services:
- ${MF_COAP_ADAPTER_PORT}:${MF_COAP_ADAPTER_PORT}/tcp
networks:
- mainflux-base-net
ws-adapter:
image: mainflux/ws:${MF_RELEASE_TAG}
container_name: mainflux-ws
depends_on:
- things
- broker
restart: on-failure
environment:
MF_WS_ADAPTER_LOG_LEVEL: ${MF_WS_ADAPTER_LOG_LEVEL}
MF_WS_ADAPTER_PORT: ${MF_WS_ADAPTER_PORT}
MF_BROKER_URL: ${MF_NATS_URL}
MF_JAEGER_URL: ${MF_JAEGER_URL}
MF_THINGS_AUTH_GRPC_URL: ${MF_THINGS_AUTH_GRPC_URL}
MF_THINGS_AUTH_GRPC_TIMEOUT: ${MF_THINGS_AUTH_GRPC_TIMEOUT}
ports:
- ${MF_WS_ADAPTER_PORT}:${MF_WS_ADAPTER_PORT}
networks:
- mainflux-base-net

View File

@ -110,6 +110,13 @@ http {
include snippets/ws-upgrade.conf;
proxy_pass http://mqtt_ws_cluster;
}
# Proxy pass to mainflux-ws-adapter
location /ws/ {
include snippets/proxy-headers.conf;
include snippets/ws-upgrade.conf;
proxy_pass http://ws-adapter:${MF_WS_ADAPTER_PORT}/;
}
}
}

View File

@ -121,6 +121,14 @@ http {
include snippets/ws-upgrade.conf;
proxy_pass http://mqtt_ws_cluster;
}
# Proxy pass to mainflux-ws-adapter
location /ws/ {
include snippets/verify-ssl-client.conf;
include snippets/proxy-headers.conf;
include snippets/ws-upgrade.conf;
proxy_pass http://ws-adapter:${MF_WS_ADAPTER_PORT}/;
}
}
}

View File

@ -138,6 +138,18 @@ function setKey(r) {
return '';
}
if (r.uri.startsWith('/ws') && (!auth || !auth.length)) {
var a;
for (a in r.args) {
if (a == 'authorization' && r.args[a] === clientKey) {
return clientKey
}
}
r.error('Authorization param does not match certificate')
return '';
}
return clientKey;
}

58
go.mod
View File

@ -30,10 +30,10 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/ory/dockertest/v3 v3.8.1
github.com/ory/keto/proto/ory/keto/acl/v1alpha1 v0.0.0-20210616104402-80e043246cf9
github.com/rabbitmq/amqp091-go v1.3.0
github.com/pelletier/go-toml v1.9.5
github.com/plgd-dev/go-coap/v2 v2.5.0
github.com/prometheus/client_golang v1.12.1
github.com/rabbitmq/amqp091-go v1.3.0
github.com/rubenv/sql-migrate v1.1.1
github.com/spf13/cobra v1.4.0
github.com/spf13/viper v1.11.0
@ -55,24 +55,18 @@ require (
github.com/armon/go-metrics v0.3.10 // indirect
github.com/armon/go-radix v1.0.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/containerd/continuity v0.2.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/docker/cli v20.10.12+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/dsnet/golib/memfile v1.0.0 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/fxamacker/cbor/v2 v2.4.0 // indirect
github.com/go-gorp/gorp/v3 v3.0.2 // indirect
github.com/go-kit/log v0.2.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
@ -80,39 +74,26 @@ require (
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-plugin v1.4.3 // indirect
github.com/hashicorp/go-retryablehttp v0.7.0 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/go-secure-stdlib/mlock v0.1.2 // indirect
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.2 // indirect
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/go-version v1.4.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/vault/sdk v0.3.0 // indirect
github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.14.2 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/go-testing-interface v1.14.1 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/oklog/run v1.1.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opencontainers/runc v1.1.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pion/dtls/v2 v2.1.2 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/transport v0.13.0 // indirect
github.com/pion/udp v0.1.1 // indirect
@ -128,27 +109,50 @@ require (
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.0 // indirect
github.com/xdg-go/stringprep v1.0.2 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect
google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
require (
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/containerd/continuity v0.2.2 // indirect
github.com/docker/cli v20.10.12+incompatible // indirect
github.com/fxamacker/cbor/v2 v2.4.0 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/go-retryablehttp v0.7.0 // indirect
github.com/hashicorp/go-secure-stdlib/mlock v0.1.2 // indirect
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.2 // indirect
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 // indirect
github.com/hashicorp/go-version v1.4.0 // indirect
github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect
github.com/klauspost/compress v1.14.2 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-testing-interface v1.14.1 // indirect
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
github.com/opencontainers/runc v1.1.0 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pion/dtls/v2 v2.1.2 // indirect
github.com/xdg-go/scram v1.1.0 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
)

548
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -50,6 +50,11 @@ MF_THINGS_LOG_LEVEL=info MF_THINGS_HTTP_PORT=8182 MF_THINGS_AUTH_GRPC_PORT=8183
###
MF_HTTP_ADAPTER_LOG_LEVEL=info MF_HTTP_ADAPTER_PORT=8185 MF_THINGS_AUTH_GRPC_URL=localhost:8183 $BUILD_DIR/mainflux-http &
###
# WS
###
MF_WS_ADAPTER_LOG_LEVEL=info MF_WS_ADAPTER_PORT=8190 MF_THINGS_AUTH_GRPC_URL=localhost:8183 $BUILD_DIR/mainflux-ws &
###
# MQTT
###

2
vendor/modules.txt vendored
View File

@ -523,6 +523,8 @@ github.com/spf13/viper/internal/encoding/javaproperties
github.com/spf13/viper/internal/encoding/json
github.com/spf13/viper/internal/encoding/toml
github.com/spf13/viper/internal/encoding/yaml
# github.com/stretchr/objx v0.2.0
## explicit; go 1.12
# github.com/stretchr/testify v1.7.1
## explicit; go 1.13
github.com/stretchr/testify/assert

56
ws/README.md Normal file
View File

@ -0,0 +1,56 @@
# WebSocket adapter
WebSocket adapter provides an [WebSocket](https://en.wikipedia.org/wiki/WebSocket#:~:text=WebSocket%20is%20a%20computer%20communications,protocol%20is%20known%20as%20WebSockets.) API for sending and receiving messages through the platform.
## Configuration
The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.
| Variable | Description | Default |
|------------------------------|-----------------------------------------------------|-----------------------|
| MF_WS_ADAPTER_PORT | Service WS port | 8190 |
| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
| MF_WS_ADAPTER_LOG_LEVEL | Log level for the WS Adapter | error |
| MF_WS_ADAPTER_CLIENT_TLS | Flag that indicates if TLS should be turned on | false |
| MF_WS_ADAPTER_CA_CERTS | Path to trusted CAs in PEM format | |
| MF_JAEGER_URL | Jaeger server URL | localhost:6831 |
| MF_THINGS_AUTH_GRPC_URL | Things service Auth gRPC URL | localhost:8181 |
| MF_THINGS_AUTH_GRPC_TIMEOUT | Things service Auth gRPC request timeout in seconds | 1s |
## Deployment
The service is distributed as Docker container. Check the [`ws-adapter`](https://github.com/mainflux/mainflux/blob/master/docker/docker-compose.yml#L350-L368) service section in docker-compose to see how the service is deployed.
Running this service outside of container requires working instance of the message broker service.
To start the service outside of the container, execute the following shell script:
```bash
# download the latest version of the service
git clone https://github.com/mainflux/mainflux
cd mainflux
# compile the ws
make ws
# copy binary to bin
make install
# set the environment variables and run the service
MF_Broker_URL=[Message broker instance URL] \
MF_WS_ADAPTER_PORT=[Service WS port] \
MF_WS_ADAPTER_LOG_LEVEL=[WS adapter log level] \
MF_WS_ADAPTER_CLIENT_TLS=[Flag that indicates if TLS should be turned on] \
MF_WS_ADAPTER_CA_CERTS=[Path to trusted CAs in PEM format] \
MF_JAEGER_URL=[Jaeger server URL] \
MF_THINGS_AUTH_GRPC_URL=[Things service Auth gRPC URL] \
MF_THINGS_AUTH_GRPC_TIMEOUT=[Things service Auth gRPC request timeout in seconds] \
$GOBIN/mainflux-ws
```
## Usage
For more information about service capabilities and its usage, please check out
the [WebSocket paragraph](https://mainflux.readthedocs.io/en/latest/messaging/#websocket) in the Getting Started guide.

150
ws/adapter.go Normal file
View File

@ -0,0 +1,150 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
// Package ws contains the domain concept definitions needed to support
// Mainflux ws adapter service functionality
package ws
import (
"context"
"fmt"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging"
)
const (
chansPrefix = "channels"
)
var (
// ErrFailedMessagePublish indicates that message publishing failed.
ErrFailedMessagePublish = errors.New("failed to publish message")
// ErrFailedSubscription indicates that client couldn't subscribe to specified channel
ErrFailedSubscription = errors.New("failed to subscribe to a channel")
// ErrFailedUnsubscribe indicates that client couldn't unsubscribe from specified channel
ErrFailedUnsubscribe = errors.New("failed to unsubscribe from a channel")
// ErrFailedConnection indicates that service couldn't connect to message broker.
ErrFailedConnection = errors.New("failed to connect to message broker")
// ErrInvalidConnection indicates that client couldn't subscribe to message broker
ErrInvalidConnection = errors.New("nats: invalid connection")
// ErrUnauthorizedAccess indicates that client provided missing or invalid credentials
ErrUnauthorizedAccess = errors.New("missing or invalid credentials provided")
// ErrEmptyTopic indicate absence of thingKey in the request
ErrEmptyTopic = errors.New("empty topic")
// ErrEmptyID indicate absence of channelID in the request
ErrEmptyID = errors.New("empty id")
)
// Service specifies web socket service API.
type Service interface {
// Publish Message
Publish(ctx context.Context, thingKey string, msg messaging.Message) error
// Subscribes to a channel with specified id.
Subscribe(ctx context.Context, thingKey, chanID, subtopic string, client *Client) error
// Unsubscribe method is used to stop observing resource.
Unsubscribe(ctx context.Context, thingKey, chanID, subtopic string) error
}
var _ Service = (*adapterService)(nil)
type adapterService struct {
auth mainflux.ThingsServiceClient
pubsub messaging.PubSub
}
// New instantiates the WS adapter implementation
func New(auth mainflux.ThingsServiceClient, pubsub messaging.PubSub) Service {
return &adapterService{
auth: auth,
pubsub: pubsub,
}
}
// Publish publishes the message using the broker
func (svc *adapterService) Publish(ctx context.Context, thingKey string, msg messaging.Message) error {
thid, err := svc.authorize(ctx, thingKey, msg.GetChannel())
if err != nil {
return ErrUnauthorizedAccess
}
if len(msg.Payload) == 0 {
return ErrFailedMessagePublish
}
msg.Publisher = thid.GetValue()
if err := svc.pubsub.Publish(msg.GetChannel(), msg); err != nil {
return ErrFailedMessagePublish
}
return nil
}
// Subscribe subscribes the thingKey and channelID to the topic
func (svc *adapterService) Subscribe(ctx context.Context, thingKey, chanID, subtopic string, c *Client) error {
if chanID == "" || thingKey == "" {
return ErrUnauthorizedAccess
}
thid, err := svc.authorize(ctx, thingKey, chanID)
if err != nil {
return ErrUnauthorizedAccess
}
c.id = thid.GetValue()
subject := fmt.Sprintf("%s.%s", chansPrefix, chanID)
if subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, subtopic)
}
if err := svc.pubsub.Subscribe(thid.GetValue(), subject, c); err != nil {
return ErrFailedSubscription
}
return nil
}
// Subscribe subscribes the thingKey and channelID to the topic
func (svc *adapterService) Unsubscribe(ctx context.Context, thingKey, chanID, subtopic string) error {
if chanID == "" || thingKey == "" {
return ErrUnauthorizedAccess
}
thid, err := svc.authorize(ctx, thingKey, chanID)
if err != nil {
return ErrUnauthorizedAccess
}
subject := fmt.Sprintf("%s.%s", chansPrefix, chanID)
if subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, subtopic)
}
return svc.pubsub.Unsubscribe(thid.GetValue(), subject)
}
func (svc *adapterService) authorize(ctx context.Context, thingKey, chanID string) (*mainflux.ThingID, error) {
ar := &mainflux.AccessByKeyReq{
Token: thingKey,
ChanID: chanID,
}
thid, err := svc.auth.CanAccessByKey(ctx, ar)
if err != nil {
return nil, errors.Wrap(errors.ErrAuthorization, err)
}
return thid, nil
}

240
ws/adapter_test.go Normal file
View File

@ -0,0 +1,240 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package ws_test
import (
"context"
"fmt"
"testing"
"github.com/mainflux/mainflux"
httpmock "github.com/mainflux/mainflux/http/mocks"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/ws"
"github.com/mainflux/mainflux/ws/mocks"
"github.com/stretchr/testify/assert"
)
const (
chanID = "1"
id = "1"
thingKey = "thing_key"
subTopic = "subtopic"
protocol = "ws"
)
var msg = messaging.Message{
Channel: chanID,
Publisher: id,
Subtopic: "",
Protocol: protocol,
Payload: []byte(`[{"n":"current","t":-5,"v":1.2}]`),
}
func newService(cc mainflux.ThingsServiceClient) (ws.Service, mocks.MockPubSub) {
pubsub := mocks.NewPubSub()
return ws.New(cc, pubsub), pubsub
}
func TestPublish(t *testing.T) {
thingsClient := httpmock.NewThingsClient(map[string]string{thingKey: chanID})
svc, _ := newService(thingsClient)
cases := []struct {
desc string
thingKey string
msg messaging.Message
err error
}{
{
desc: "publish a valid message with valid thingKey",
thingKey: thingKey,
msg: msg,
err: nil,
},
{
desc: "publish a valid message with empty thingKey",
thingKey: "",
msg: msg,
err: ws.ErrUnauthorizedAccess,
},
{
desc: "publish a valid message with invalid thingKey",
thingKey: "invalid",
msg: msg,
err: ws.ErrUnauthorizedAccess,
},
{
desc: "publish an empty message with valid thingKey",
thingKey: thingKey,
msg: messaging.Message{},
err: ws.ErrFailedMessagePublish,
},
{
desc: "publish an empty message with empty thingKey",
thingKey: "",
msg: messaging.Message{},
err: ws.ErrUnauthorizedAccess,
},
{
desc: "publish an empty message with invalid thingKey",
thingKey: "invalid",
msg: messaging.Message{},
err: ws.ErrUnauthorizedAccess,
},
}
for _, tc := range cases {
err := svc.Publish(context.Background(), tc.thingKey, tc.msg)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
}
}
func TestSubscribe(t *testing.T) {
thingsClient := httpmock.NewThingsClient(map[string]string{thingKey: chanID})
svc, pubsub := newService(thingsClient)
c := ws.NewClient(nil)
cases := []struct {
desc string
thingKey string
chanID string
subtopic string
fail bool
err error
}{
{
desc: "subscribe to channel with valid thingKey, chanID, subtopic",
thingKey: thingKey,
chanID: chanID,
subtopic: subTopic,
fail: false,
err: nil,
},
{
desc: "subscribe again to channel with valid thingKey, chanID, subtopic",
thingKey: thingKey,
chanID: chanID,
subtopic: subTopic,
fail: false,
err: nil,
},
{
desc: "subscribe to channel with subscribe set to fail",
thingKey: thingKey,
chanID: chanID,
subtopic: subTopic,
fail: true,
err: ws.ErrFailedSubscription,
},
{
desc: "subscribe to channel with invalid chanID and invalid thingKey",
thingKey: "invalid",
chanID: "0",
subtopic: subTopic,
fail: false,
err: ws.ErrUnauthorizedAccess,
},
{
desc: "subscribe to channel with empty channel",
thingKey: thingKey,
chanID: "",
subtopic: subTopic,
fail: false,
err: ws.ErrUnauthorizedAccess,
},
{
desc: "subscribe to channel with empty thingKey",
thingKey: "",
chanID: chanID,
subtopic: subTopic,
fail: false,
err: ws.ErrUnauthorizedAccess,
},
{
desc: "subscribe to channel with empty thingKey and empty channel",
thingKey: "",
chanID: "",
subtopic: subTopic,
fail: false,
err: ws.ErrUnauthorizedAccess,
},
}
for _, tc := range cases {
pubsub.SetFail(tc.fail)
err := svc.Subscribe(context.Background(), tc.thingKey, tc.chanID, tc.subtopic, c)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
}
}
func TestUnsubscribe(t *testing.T) {
thingsClient := httpmock.NewThingsClient(map[string]string{thingKey: chanID})
svc, pubsub := newService(thingsClient)
cases := []struct {
desc string
thingKey string
chanID string
subtopic string
fail bool
err error
}{
{
desc: "unsubscribe from channel with valid thingKey, chanID, subtopic",
thingKey: thingKey,
chanID: chanID,
subtopic: subTopic,
fail: false,
err: nil,
},
{
desc: "unsubscribe from channel with valid thingKey, chanID, and empty subtopic",
thingKey: thingKey,
chanID: chanID,
subtopic: "",
fail: false,
err: nil,
},
{
desc: "unsubscribe from channel with unsubscribe set to fail",
thingKey: thingKey,
chanID: chanID,
subtopic: subTopic,
fail: true,
err: ws.ErrFailedUnsubscribe,
},
{
desc: "unsubscribe from channel with empty channel",
thingKey: thingKey,
chanID: "",
subtopic: subTopic,
fail: false,
err: ws.ErrUnauthorizedAccess,
},
{
desc: "unsubscribe from channel with empty thingKey",
thingKey: "",
chanID: chanID,
subtopic: subTopic,
fail: false,
err: ws.ErrUnauthorizedAccess,
},
{
desc: "unsubscribe from channel with empty thingKey and empty channel",
thingKey: "",
chanID: "",
subtopic: subTopic,
fail: false,
err: ws.ErrUnauthorizedAccess,
},
}
for _, tc := range cases {
pubsub.SetFail(tc.fail)
err := svc.Unsubscribe(context.Background(), tc.thingKey, tc.chanID, tc.subtopic)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
}
}

6
ws/api/doc.go Normal file
View File

@ -0,0 +1,6 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
// Package api contains API-related concerns: endpoint definitions, middlewares
// and all resource representations.
package api

189
ws/api/endpoint_test.go Normal file
View File

@ -0,0 +1,189 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package api_test
import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"github.com/gorilla/websocket"
"github.com/mainflux/mainflux"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/ws"
httpmock "github.com/mainflux/mainflux/http/mocks"
"github.com/mainflux/mainflux/ws/api"
"github.com/mainflux/mainflux/ws/mocks"
"github.com/stretchr/testify/assert"
)
const (
chanID = "30315311-56ba-484d-b500-c1e08305511f"
id = "1"
thingKey = "c02ff576-ccd5-40f6-ba5f-c85377aad529"
protocol = "ws"
)
var msg = []byte(`[{"n":"current","t":-1,"v":1.6}]`)
func newService(cc mainflux.ThingsServiceClient) (ws.Service, mocks.MockPubSub) {
pubsub := mocks.NewPubSub()
return ws.New(cc, pubsub), pubsub
}
func newHTTPServer(svc ws.Service) *httptest.Server {
logger := log.NewMock()
mux := api.MakeHandler(svc, logger)
return httptest.NewServer(mux)
}
func makeURL(tsURL, chanID, subtopic, thingKey string, header bool) (string, error) {
u, _ := url.Parse(tsURL)
u.Scheme = protocol
if chanID == "0" || chanID == "" {
if header {
return fmt.Sprintf("%s/channels/%s/messages", u, chanID), fmt.Errorf("invalid channel id")
}
return fmt.Sprintf("%s/channels/%s/messages?authorization=%s", u, chanID, thingKey), fmt.Errorf("invalid channel id")
}
subtopicPart := ""
if subtopic != "" {
subtopicPart = fmt.Sprintf("/%s", subtopic)
}
if header {
return fmt.Sprintf("%s/channels/%s/messages%s", u, chanID, subtopicPart), nil
}
return fmt.Sprintf("%s/channels/%s/messages%s?authorization=%s", u, chanID, subtopicPart, thingKey), nil
}
func handshake(tsURL, chanID, subtopic, thingKey string, addHeader bool) (*websocket.Conn, *http.Response, error) {
header := http.Header{}
if addHeader {
header.Add("Authorization", thingKey)
}
url, _ := makeURL(tsURL, chanID, subtopic, thingKey, addHeader)
conn, res, errRet := websocket.DefaultDialer.Dial(url, header)
return conn, res, errRet
}
func TestHandshake(t *testing.T) {
thingsClient := httpmock.NewThingsClient(map[string]string{thingKey: chanID})
svc, _ := newService(thingsClient)
ts := newHTTPServer(svc)
defer ts.Close()
cases := []struct {
desc string
chanID string
subtopic string
header bool
thingKey string
status int
err error
msg []byte
}{
{
desc: "connect and send message",
chanID: id,
subtopic: "",
header: true,
thingKey: thingKey,
status: http.StatusSwitchingProtocols,
msg: msg,
},
{
desc: "connect and send message with thingKey as query parameter",
chanID: id,
subtopic: "",
header: false,
thingKey: thingKey,
status: http.StatusSwitchingProtocols,
msg: msg,
},
{
desc: "connect and send message that cannot be published",
chanID: id,
subtopic: "",
header: true,
thingKey: thingKey,
status: http.StatusSwitchingProtocols,
msg: []byte{},
},
{
desc: "connect and send message to subtopic",
chanID: id,
subtopic: "subtopic",
header: true,
thingKey: thingKey,
status: http.StatusSwitchingProtocols,
msg: msg,
},
{
desc: "connect and send message to nested subtopic",
chanID: id,
subtopic: "subtopic/nested",
header: true,
thingKey: thingKey,
status: http.StatusSwitchingProtocols,
msg: msg,
},
{
desc: "connect and send message to all subtopics",
chanID: id,
subtopic: ">",
header: true,
thingKey: thingKey,
status: http.StatusSwitchingProtocols,
msg: msg,
},
{
desc: "connect to empty channel",
chanID: "",
subtopic: "",
header: true,
thingKey: thingKey,
status: http.StatusBadRequest,
msg: []byte{},
},
{
desc: "connect with empty thingKey",
chanID: id,
subtopic: "",
header: true,
thingKey: "",
status: http.StatusForbidden,
msg: []byte{},
},
{
desc: "connect and send message to subtopic with invalid name",
chanID: id,
subtopic: "sub/a*b/topic",
header: true,
thingKey: thingKey,
status: http.StatusBadRequest,
msg: msg,
},
}
for _, tc := range cases {
conn, res, err := handshake(ts.URL, tc.chanID, tc.subtopic, tc.thingKey, tc.header)
assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code '%d' got '%d'\n", tc.desc, tc.status, res.StatusCode))
if tc.status == http.StatusSwitchingProtocols {
assert.Nil(t, err, fmt.Sprintf("%s: got unexpected error %s\n", tc.desc, err))
err = conn.WriteMessage(websocket.TextMessage, tc.msg)
assert.Nil(t, err, fmt.Sprintf("%s: got unexpected error %s\n", tc.desc, err))
}
}
}

170
ws/api/endpoints.go Normal file
View File

@ -0,0 +1,170 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package api
import (
"context"
"fmt"
"net/http"
"net/url"
"regexp"
"strings"
"time"
"github.com/go-zoo/bone"
"github.com/gorilla/websocket"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/ws"
)
var channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages(/[^?]*)?(\?.*)?$`)
func handshake(svc ws.Service) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
req, err := decodeRequest(r)
if err != nil {
encodeError(w, err)
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logger.Warn(fmt.Sprintf("Failed to upgrade connection to websocket: %s", err.Error()))
return
}
req.conn = conn
client := ws.NewClient(conn)
if err := svc.Subscribe(context.Background(), req.thingKey, req.chanID, req.subtopic, client); err != nil {
req.conn.Close()
return
}
logger.Debug(fmt.Sprintf("Successfully upgraded communication to WS on channel %s", req.chanID))
msgs := make(chan []byte)
// Listen for messages received from the chan messages, and publish them to broker
go process(svc, req, msgs)
go listen(conn, msgs)
}
}
func decodeRequest(r *http.Request) (connReq, error) {
authKey := r.Header.Get("Authorization")
if authKey == "" {
authKeys := bone.GetQuery(r, "authorization")
if len(authKeys) == 0 {
logger.Debug("Missing authorization key.")
return connReq{}, errUnauthorizedAccess
}
authKey = authKeys[0]
}
chanID := bone.GetValue(r, "id")
req := connReq{
thingKey: authKey,
chanID: chanID,
}
channelParts := channelPartRegExp.FindStringSubmatch(r.RequestURI)
if len(channelParts) < 2 {
logger.Warn("Empty channel id or malformed url")
return connReq{}, errors.ErrMalformedEntity
}
subtopic, err := parseSubTopic(channelParts[2])
if err != nil {
return connReq{}, err
}
req.subtopic = subtopic
return req, nil
}
func parseSubTopic(subtopic string) (string, error) {
if subtopic == "" {
return subtopic, nil
}
subtopic, err := url.QueryUnescape(subtopic)
if err != nil {
return "", errMalformedSubtopic
}
subtopic = strings.Replace(subtopic, "/", ".", -1)
elems := strings.Split(subtopic, ".")
filteredElems := []string{}
for _, elem := range elems {
if elem == "" {
continue
}
if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
return "", errMalformedSubtopic
}
filteredElems = append(filteredElems, elem)
}
subtopic = strings.Join(filteredElems, ".")
return subtopic, nil
}
func listen(conn *websocket.Conn, msgs chan<- []byte) {
for {
// Listen for message from the client, and push them to the msgs channel
_, payload, err := conn.ReadMessage()
if websocket.IsUnexpectedCloseError(err) {
logger.Debug(fmt.Sprintf("Closing WS connection: %s", err.Error()))
close(msgs)
return
}
if err != nil {
logger.Warn(fmt.Sprintf("Failed to read message: %s", err.Error()))
close(msgs)
return
}
msgs <- payload
}
}
func process(svc ws.Service, req connReq, msgs <-chan []byte) {
for msg := range msgs {
m := messaging.Message{
Channel: req.chanID,
Subtopic: req.subtopic,
Protocol: "websocket",
Payload: msg,
Created: time.Now().UnixNano(),
}
svc.Publish(context.Background(), req.thingKey, m)
}
if err := svc.Unsubscribe(context.Background(), req.thingKey, req.chanID, req.subtopic); err != nil {
req.conn.Close()
}
}
func encodeError(w http.ResponseWriter, err error) {
statusCode := http.StatusUnauthorized
switch err {
case ws.ErrEmptyID, ws.ErrEmptyTopic:
statusCode = http.StatusBadRequest
case errUnauthorizedAccess:
statusCode = http.StatusForbidden
case errMalformedSubtopic, errors.ErrMalformedEntity:
statusCode = http.StatusBadRequest
default:
statusCode = http.StatusNotFound
}
logger.Warn(fmt.Sprintf("Failed to authorize: %s", err.Error()))
w.WriteHeader(statusCode)
}

79
ws/api/logging.go Normal file
View File

@ -0,0 +1,79 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
// go:build !test
package api
import (
"context"
"fmt"
"time"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/ws"
)
var _ ws.Service = (*loggingMiddleware)(nil)
type loggingMiddleware struct {
logger log.Logger
svc ws.Service
}
// LoggingMiddleware adds logging facilities to the adapter
func LoggingMiddleware(svc ws.Service, logger log.Logger) ws.Service {
return &loggingMiddleware{logger, svc}
}
func (lm *loggingMiddleware) Publish(ctx context.Context, thingKey string, msg messaging.Message) (err error) {
defer func(begin time.Time) {
destChannel := msg.GetChannel()
if msg.Subtopic != "" {
destChannel = fmt.Sprintf("%s.%s", destChannel, msg.Subtopic)
}
message := fmt.Sprintf("Method publish to %s took %s to complete", destChannel, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.Publish(ctx, thingKey, msg)
}
func (lm *loggingMiddleware) Subscribe(ctx context.Context, thingKey, chanID, subtopic string, c *ws.Client) (err error) {
defer func(begin time.Time) {
destChannel := chanID
if subtopic != "" {
destChannel = fmt.Sprintf("%s.%s", destChannel, subtopic)
}
message := fmt.Sprintf("Method subscribe to channel %s took %s to complete", destChannel, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.Subscribe(ctx, thingKey, chanID, subtopic, c)
}
func (lm *loggingMiddleware) Unsubscribe(ctx context.Context, thingKey, chanID, subtopic string) (err error) {
defer func(begin time.Time) {
destChannel := chanID
if subtopic != "" {
destChannel = fmt.Sprintf("%s.%s", destChannel, subtopic)
}
message := fmt.Sprintf("Method unsubscribe from channel %s took %s to complete", destChannel, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.Unsubscribe(ctx, thingKey, chanID, subtopic)
}

59
ws/api/metrics.go Normal file
View File

@ -0,0 +1,59 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
//go:build !test
package api
import (
"context"
"time"
"github.com/go-kit/kit/metrics"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/ws"
)
var _ ws.Service = (*metricsMiddleware)(nil)
type metricsMiddleware struct {
counter metrics.Counter
latency metrics.Histogram
svc ws.Service
}
// MetricsMiddleware instruments adapter by tracking request count and latency
func MetricsMiddleware(svc ws.Service, counter metrics.Counter, latency metrics.Histogram) ws.Service {
return &metricsMiddleware{
counter: counter,
latency: latency,
svc: svc,
}
}
func (mm *metricsMiddleware) Publish(ctx context.Context, thingKey string, msg messaging.Message) error {
defer func(begin time.Time) {
mm.counter.With("method", "publish").Add(1)
mm.latency.With("method", "publish").Observe(time.Since(begin).Seconds())
}(time.Now())
return mm.svc.Publish(ctx, thingKey, msg)
}
func (mm *metricsMiddleware) Subscribe(ctx context.Context, thingKey, chanID, subtopic string, c *ws.Client) error {
defer func(begin time.Time) {
mm.counter.With("method", "subscribe").Add(1)
mm.latency.With("method", "subscribe").Observe(time.Since(begin).Seconds())
}(time.Now())
return mm.svc.Subscribe(ctx, thingKey, chanID, subtopic, c)
}
func (mm *metricsMiddleware) Unsubscribe(ctx context.Context, thingKey, chanID, subtopic string) error {
defer func(begin time.Time) {
mm.counter.With("method", "unsubscribe").Add(1)
mm.latency.With("method", "unsubscribe").Observe(time.Since(begin).Seconds())
}(time.Now())
return mm.svc.Unsubscribe(ctx, thingKey, chanID, subtopic)
}

13
ws/api/requests.go Normal file
View File

@ -0,0 +1,13 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package api
import "github.com/gorilla/websocket"
type connReq struct {
thingKey string
chanID string
subtopic string
conn *websocket.Conn
}

48
ws/api/transport.go Normal file
View File

@ -0,0 +1,48 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package api
import (
"errors"
"net/http"
"github.com/go-zoo/bone"
"github.com/gorilla/websocket"
"github.com/mainflux/mainflux"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/ws"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
const (
protocol = "ws"
readwriteBufferSize = 1024
)
var (
errUnauthorizedAccess = errors.New("missing or invalid credentials provided")
errMalformedSubtopic = errors.New("malformed subtopic")
)
var (
upgrader = websocket.Upgrader{
ReadBufferSize: readwriteBufferSize,
WriteBufferSize: readwriteBufferSize,
CheckOrigin: func(r *http.Request) bool { return true },
}
logger log.Logger
)
// MakeHandler returns http handler with handshake endpoint.
func MakeHandler(svc ws.Service, l log.Logger) http.Handler {
logger = l
mux := bone.New()
mux.GetFunc("/channels/:id/messages", handshake(svc))
mux.GetFunc("/channels/:id/messages/*", handshake(svc))
mux.GetFunc("/version", mainflux.Health(protocol))
mux.Handle("/metrics", promhttp.Handler())
return mux
}

40
ws/client.go Normal file
View File

@ -0,0 +1,40 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package ws
import (
"github.com/gorilla/websocket"
"github.com/mainflux/mainflux/pkg/messaging"
)
// Client handles messaging and websocket connection
type Client struct {
conn *websocket.Conn
id string
}
// NewClient returns a new Client object
func NewClient(c *websocket.Conn) *Client {
return &Client{
conn: c,
id: "",
}
}
// Cancel handles the websocket connection after unsubscribing
func (c *Client) Cancel() error {
if c.conn == nil {
return nil
}
return c.conn.Close()
}
// Handle handles the sending and receiving of messages via the broker
func (c *Client) Handle(msg messaging.Message) error {
// To prevent publisher from receiving its own published message
if msg.GetPublisher() == c.id {
return nil
}
return c.conn.WriteMessage(websocket.TextMessage, msg.Payload)
}

101
ws/client_test.go Normal file
View File

@ -0,0 +1,101 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package ws_test
import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/gorilla/websocket"
"github.com/mainflux/mainflux/ws"
"github.com/stretchr/testify/assert"
)
const expectedCount = uint64(1)
var (
msgChan = make(chan []byte)
c *ws.Client
count uint64
upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
}
)
func handler(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer conn.Close()
for {
_, message, err := conn.ReadMessage()
if err != nil {
break
}
atomic.AddUint64(&count, 1)
msgChan <- message
}
}
func TestHandle(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(handler))
defer s.Close()
// Convert http://127.0.0.1 to ws://127.0.0.1
u := strings.Replace(s.URL, "http", "ws", 1)
// Connect to the server
wsConn, _, err := websocket.DefaultDialer.Dial(u, nil)
if err != nil {
t.Fatalf("%v", err)
}
defer wsConn.Close()
c = ws.NewClient(wsConn)
cases := []struct {
desc string
publisher string
expectedPayload []byte
expectMsg bool
}{
{
desc: "handling with different id from ws.Client",
publisher: msg.Publisher,
expectedPayload: msg.Payload,
expectMsg: true,
},
{
desc: "handling with same id as ws.Client (empty by default) drops message",
publisher: "",
expectedPayload: []byte{},
expectMsg: false,
},
}
for _, tc := range cases {
msg.Publisher = tc.publisher
err = c.Handle(msg)
assert.Nil(t, err, fmt.Sprintf("expected nil error from handle, got: %s", err))
receivedMsg := []byte{}
switch tc.expectMsg {
case true:
rec := <-msgChan // Wait for the message to be received.
receivedMsg = rec
case false:
time.Sleep(100 * time.Millisecond) // Give time to server to process c.Handle call.
}
assert.Equal(t, tc.expectedPayload, receivedMsg, fmt.Sprintf("%s: expected %+v, got %+v", tc.desc, msg, receivedMsg))
}
c := atomic.LoadUint64(&count)
assert.Equal(t, expectedCount, c, fmt.Sprintf("expected message count %d, got %d", expectedCount, c))
}

74
ws/mocks/pubsub.go Normal file
View File

@ -0,0 +1,74 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package mocks
import (
"encoding/json"
"fmt"
"github.com/gorilla/websocket"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/ws"
)
var _ messaging.PubSub = (*mockPubSub)(nil)
type MockPubSub interface {
Publish(string, messaging.Message) error
Subscribe(string, string, messaging.MessageHandler) error
Unsubscribe(string, string) error
SetFail(bool)
SetConn(*websocket.Conn)
Close() error
}
type mockPubSub struct {
fail bool
conn *websocket.Conn
}
// NewPubSub returns mock message publisher-subscriber
func NewPubSub() MockPubSub {
return &mockPubSub{false, nil}
}
func (pubsub *mockPubSub) Publish(s string, msg messaging.Message) error {
if pubsub.conn != nil {
data, err := json.Marshal(msg)
if err != nil {
fmt.Println("can't marshall")
return ws.ErrFailedMessagePublish
}
return pubsub.conn.WriteMessage(websocket.BinaryMessage, data)
}
if pubsub.fail {
return ws.ErrFailedMessagePublish
}
return nil
}
func (pubsub *mockPubSub) Subscribe(string, string, messaging.MessageHandler) error {
if pubsub.fail {
return ws.ErrFailedSubscription
}
return nil
}
func (pubsub *mockPubSub) Unsubscribe(string, string) error {
if pubsub.fail {
return ws.ErrFailedUnsubscribe
}
return nil
}
func (pubsub *mockPubSub) SetFail(fail bool) {
pubsub.fail = fail
}
func (pubsub *mockPubSub) SetConn(c *websocket.Conn) {
pubsub.conn = c
}
func (pubsub *mockPubSub) Close() error {
return nil
}