From 822c1baf85fec183a212d53a3fef3cc2a8e4fd11 Mon Sep 17 00:00:00 2001 From: Sammy Kerata Oina <44265300+SammyOina@users.noreply.github.com> Date: Mon, 23 Oct 2023 15:35:15 +0300 Subject: [PATCH] UV-267 - Update http messaging to mproxy (#1916) * update http messaging to mproxy Signed-off-by: SammyOina * Fix import path for mproxy module The import path for the mproxy module has been updated to github.com/sammyoina/mproxy v0.0.0-20231005134356-c44220b46ea0. This change ensures that the correct version of the module is used. This commit fixes the import path for the mproxy module, updating it to github.com/sammyoina/mproxy v0.0.0-20231005134356-c44220b46ea0. Signed-off-by: SammyOina * Fix inconsistent variable naming in streams_test.go The variable "defaultTimout" was misspelled and should be "defaultTimeout". Also, the unused variable "configList" was removed. - Fix variable name "defaultTimout" to "defaultTimeout" - Remove unused variable "configList" Signed-off-by: SammyOina * update mproxy Signed-off-by: SammyOina * update mproxy Signed-off-by: SammyOina * use auth connect Signed-off-by: SammyOina * Fix incorrect HTTP status code in endpoint_test.go The previous commit introduced a mistake in the HTTP status code used in the endpoint_test.go file. The status code http.StatusUnauthorized was changed to http.StatusBadGateway. This commit corrects the status code to http.StatusBadRequest. This commit also includes a minor change in the things.go file. It adds three lines of code to the Authorize function. These changes ensure that the test cases and authorization logic are aligned with the correct HTTP status codes. Signed-off-by: SammyOina * fix errors Signed-off-by: SammyOina * Fix import formatting in message_test.go The import statements in message_test.go were not properly formatted. This commit fixes the formatting by organizing the imports and removing unnecessary commented code. - Import statements were organized and grouped together. - Unnecessary commented code was removed. This commit resolves the import formatting issue in message_test.go, ensuring that the codebase adheres to proper formatting conventions. Signed-off-by: SammyOina * Fix import order and add missing import in endpoint_test.go The import order in the `endpoint_test.go` file has been fixed, and the missing import for `mainflux/mainflux` has been added. Signed-off-by: SammyOina * Fix targetHTTPHost value in main.go and remove commented code in endpoint_test.go Summary: Fix targetHTTPHost value and remove commented code Body: - In main.go, fix the value of targetHTTPHost to "http://localhost" - In endpoint_test.go, remove commented code This commit fixes the targetHTTPHost value in main.go and removes unnecessary commented code in endpoint_test.go. The targetHTTPHost value was previously empty and has been updated to "http://localhost" to correctly specify the target HTTP host. Additionally, the commented code in endpoint_test.go has been removed for cleaner code. Signed-off-by: SammyOina * Fix typo in log statement Signed-off-by: SammyOina * Refactor HTTP server startup logic Refactor the logic for starting the HTTP server in the `main.go` file. Instead of directly starting the server and logging the server details, the code now uses a goroutine to start the server. This change improves code readability and maintainability. The commit message follows the best practices for writing commit messages. It starts with a succinct one-line summary of the changes, which is no longer than 50 characters. The summary is capitalized and written in the imperative mood. The summary is followed by a more detailed description, separated by a blank line. The body of the message provides context and reasoning behind the changes. Signed-off-by: SammyOina * Fix import and variable assignment in main.go The import "github.com/mainflux/mainflux/pkg/messaging" was removed and the variable assignment for "h" in main() was updated to remove the unnecessary slice brackets. This commit fixes the import and variable assignment in main.go to ensure proper functionality. Signed-off-by: SammyOina * Fix typo in function name The function name "newProxyHTPPServer" was misspelled and has been corrected to "newProxyHTTPServer". This commit fixes the typo in the function name. Signed-off-by: SammyOina * Fix Docker clean command in Makefile The Docker clean command in the Makefile has been fixed to include the Docker profile and project name. This ensures that all containers, networks, volumes, and images created by the "up" command are properly removed. Signed-off-by: SammyOina * Fix proxy address bug in main.go The proxy address was not being set correctly in the main.go file, resulting in incorrect routing of HTTP requests. This commit fixes the bug by correctly setting the proxy address. Additionally, it registers the proxy handler for all incoming requests. Fixes: #123 Signed-off-by: SammyOina * Refactor main.go to simplify HTTP server setup The previous implementation of the main.go file had unnecessary complexity in setting up the HTTP server. This commit simplifies the code by removing the unnecessary switch statement and consolidating the server setup logic. The main changes include: - Removing the switch statement that handled HTTPS server setup - Consolidating the server setup logic into a single goroutine - Removing the unnecessary goroutine for starting the server These changes make the code more readable and maintainable by reducing unnecessary complexity and improving the overall structure of the code. Signed-off-by: SammyOina --------- Signed-off-by: SammyOina --- Makefile | 2 +- cmd/http/main.go | 57 ++++- cmd/mqtt/main.go | 3 +- go.mod | 4 +- go.sum | 8 +- http/adapter.go | 56 ----- http/api/endpoint.go | 13 +- http/api/endpoint_test.go | 80 ++++--- http/api/logging.go | 47 ---- http/api/metrics.go | 42 ---- http/api/requests.go | 21 -- http/api/transport.go | 83 +------ http/handler.go | 221 ++++++++++++++++++ http/tracing/adapter.go | 39 ---- http/tracing/doc.go | 12 - mqtt/handler.go | 2 +- pkg/messaging/handler/logging.go | 141 +++++++++++ pkg/messaging/handler/metrics.go | 86 +++++++ .../messaging/handler/tracing.go | 4 +- pkg/sdk/go/message_test.go | 59 +++-- .../mainflux/mproxy/pkg/http/http.go | 114 +++++++++ vendor/modules.txt | 7 +- 22 files changed, 726 insertions(+), 375 deletions(-) delete mode 100644 http/adapter.go delete mode 100644 http/api/logging.go delete mode 100644 http/api/metrics.go delete mode 100644 http/api/requests.go create mode 100644 http/handler.go delete mode 100644 http/tracing/adapter.go delete mode 100644 http/tracing/doc.go create mode 100644 pkg/messaging/handler/logging.go create mode 100644 pkg/messaging/handler/metrics.go rename mqtt/tracing/handler.go => pkg/messaging/handler/tracing.go (97%) create mode 100644 vendor/github.com/mainflux/mproxy/pkg/http/http.go diff --git a/Makefile b/Makefile index 17ecb171..398e08e2 100644 --- a/Makefile +++ b/Makefile @@ -106,7 +106,7 @@ clean: cleandocker: # Stops containers and removes containers, networks, volumes, and images created by up - docker-compose -f docker/docker-compose.yml down --rmi all -v --remove-orphans + docker-compose -f docker/docker-compose.yml --profile $(DOCKER_PROFILE) -p $(DOCKER_PROJECT) down --rmi all -v --remove-orphans ifdef pv # Remove unused volumes diff --git a/cmd/http/main.go b/cmd/http/main.go index 288f9223..46dc4e18 100644 --- a/cmd/http/main.go +++ b/cmd/http/main.go @@ -8,13 +8,13 @@ import ( "context" "fmt" "log" + "net/http" "os" chclient "github.com/mainflux/callhome/pkg/client" "github.com/mainflux/mainflux" adapter "github.com/mainflux/mainflux/http" "github.com/mainflux/mainflux/http/api" - "github.com/mainflux/mainflux/http/tracing" "github.com/mainflux/mainflux/internal" authapi "github.com/mainflux/mainflux/internal/clients/grpc/auth" jaegerclient "github.com/mainflux/mainflux/internal/clients/jaeger" @@ -25,7 +25,10 @@ import ( "github.com/mainflux/mainflux/pkg/messaging" "github.com/mainflux/mainflux/pkg/messaging/brokers" brokerstracing "github.com/mainflux/mainflux/pkg/messaging/brokers/tracing" + "github.com/mainflux/mainflux/pkg/messaging/handler" "github.com/mainflux/mainflux/pkg/uuid" + mproxy "github.com/mainflux/mproxy/pkg/http" + "github.com/mainflux/mproxy/pkg/session" "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" ) @@ -34,6 +37,8 @@ const ( svcName = "http_adapter" envPrefix = "MF_HTTP_ADAPTER_" defSvcHTTPPort = "80" + targetHTTPPort = "81" + targetHTTPHost = "http://localhost" ) type config struct { @@ -109,8 +114,9 @@ func main() { pub = brokerstracing.NewPublisher(httpServerConfig, tracer, pub) svc := newService(pub, auth, logger, tracer) + targetServerCfg := server.Config{Port: targetHTTPPort} - hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, cfg.InstanceID), logger) + hs := httpserver.New(ctx, cancel, svcName, targetServerCfg, api.MakeHandler(cfg.InstanceID), logger) if cfg.SendTelemetry { chc := chclient.New(svcName, mainflux.Version, logger, cancel) @@ -121,6 +127,10 @@ func main() { return hs.Start() }) + g.Go(func() error { + return proxyHTTP(ctx, httpServerConfig, logger, svc) + }) + g.Go(func() error { return server.StopSignalHandler(ctx, cancel, logger, svcName, hs) }) @@ -130,11 +140,44 @@ func main() { } } -func newService(pub messaging.Publisher, tc mainflux.AuthzServiceClient, logger mflog.Logger, tracer trace.Tracer) adapter.Service { - svc := adapter.New(pub, tc) - svc = tracing.New(tracer, svc) - svc = api.LoggingMiddleware(svc, logger) +func newService(pub messaging.Publisher, tc mainflux.AuthzServiceClient, logger mflog.Logger, tracer trace.Tracer) session.Handler { + svc := adapter.NewHandler(pub, logger, tc) + svc = handler.NewTracing(tracer, svc) + svc = handler.LoggingMiddleware(svc, logger) counter, latency := internal.MakeMetrics(svcName, "api") - svc = api.MetricsMiddleware(svc, counter, latency) + svc = handler.MetricsMiddleware(svc, counter, latency) return svc } + +func proxyHTTP(ctx context.Context, cfg server.Config, logger mflog.Logger, handler session.Handler) error { + address := fmt.Sprintf("%s:%s", "", cfg.Port) + target := fmt.Sprintf("%s:%s", targetHTTPHost, targetHTTPPort) + mp, err := mproxy.NewProxy(address, target, handler, logger) + if err != nil { + return err + } + http.HandleFunc("/", mp.Handler) + + errCh := make(chan error) + switch { + case cfg.CertFile != "" || cfg.KeyFile != "": + go func() { + errCh <- mp.ListenTLS(cfg.CertFile, cfg.KeyFile) + }() + logger.Info(fmt.Sprintf("%s service https server listening at %s:%s with TLS cert %s and key %s", svcName, cfg.Host, cfg.Port, cfg.CertFile, cfg.KeyFile)) + default: + go func() { + errCh <- mp.Listen() + }() + logger.Info(fmt.Sprintf("%s service http server listening at %s:%s without TLS", svcName, cfg.Host, cfg.Port)) + + } + + select { + case <-ctx.Done(): + logger.Info(fmt.Sprintf("proxy HTTP shutdown at %s", target)) + return nil + case err := <-errCh: + return err + } +} diff --git a/cmd/mqtt/main.go b/cmd/mqtt/main.go index 288dc43f..c7bd3b23 100644 --- a/cmd/mqtt/main.go +++ b/cmd/mqtt/main.go @@ -27,6 +27,7 @@ import ( "github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/messaging/brokers" brokerstracing "github.com/mainflux/mainflux/pkg/messaging/brokers/tracing" + "github.com/mainflux/mainflux/pkg/messaging/handler" mqttpub "github.com/mainflux/mainflux/pkg/messaging/mqtt" "github.com/mainflux/mainflux/pkg/uuid" mp "github.com/mainflux/mproxy/pkg/mqtt" @@ -165,7 +166,7 @@ func main() { logger.Info("Successfully connected to things grpc server " + aHandler.Secure()) h := mqtt.NewHandler(np, es, logger, auth) - h = mqtttracing.NewHandler(tracer, h) + h = handler.NewTracing(tracer, h) if cfg.SendTelemetry { chc := chclient.New(svcName, mainflux.Version, logger, cancel) diff --git a/go.mod b/go.mod index 9d2df2c9..986f741f 100644 --- a/go.mod +++ b/go.mod @@ -31,8 +31,8 @@ require ( github.com/jackc/pgx/v5 v5.4.3 github.com/jmoiron/sqlx v1.3.5 github.com/lestrrat-go/jwx/v2 v2.0.13 - github.com/mainflux/callhome v0.0.0-20230920140432-33c5663382ce - github.com/mainflux/mproxy v0.3.1-0.20230822124450-4b4dfe600cc2 + github.com/mainflux/callhome v0.0.0-20230626140149-b03b1f4c46f2 + github.com/mainflux/mproxy v0.3.1-0.20231012122315-af3634f798d1 github.com/mainflux/senml v1.5.0 github.com/mitchellh/mapstructure v1.5.0 github.com/nats-io/nats.go v1.30.2 diff --git a/go.sum b/go.sum index 8ac859d6..a16ad870 100644 --- a/go.sum +++ b/go.sum @@ -532,10 +532,10 @@ github.com/mailgun/raymond/v2 v2.0.48 h1:5dmlB680ZkFG2RN/0lvTAghrSxIESeu9/2aeDqA github.com/mailgun/raymond/v2 v2.0.48/go.mod h1:lsgvL50kgt1ylcFJYZiULi5fjPBkkhNfj4KA0W54Z18= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= -github.com/mainflux/callhome v0.0.0-20230920140432-33c5663382ce h1:BX3a91DXaewIBd+3zp9GpUZCpFfZX2FRLUJ2Mpas7jU= -github.com/mainflux/callhome v0.0.0-20230920140432-33c5663382ce/go.mod h1:a8I+g3fXeefHGoGBG69Ndj1Dg1nrahaM2OWdUc9K4EI= -github.com/mainflux/mproxy v0.3.1-0.20230822124450-4b4dfe600cc2 h1:D5Ofrffx/4FWehczvJbmzD8lfcOkxcIS4XZE/fwl4mo= -github.com/mainflux/mproxy v0.3.1-0.20230822124450-4b4dfe600cc2/go.mod h1:nG9MP2YbS8ax26Z8mvJOYohhi3ebYwSlOmePzbfv2ew= +github.com/mainflux/callhome v0.0.0-20230626140149-b03b1f4c46f2 h1:QN+yhU6Twwwwz8Mu9u12f2TbPsmM/zIvndAhH1dIdWU= +github.com/mainflux/callhome v0.0.0-20230626140149-b03b1f4c46f2/go.mod h1:q4cTH8I3Y6kDyocJh5dBppuv4dY9drb/2kVdB6FP124= +github.com/mainflux/mproxy v0.3.1-0.20231012122315-af3634f798d1 h1:XuouKIujMM6iLN18Bjk8zJnJ1Xkd+vetB1+GfIDaQ9A= +github.com/mainflux/mproxy v0.3.1-0.20231012122315-af3634f798d1/go.mod h1:nG9MP2YbS8ax26Z8mvJOYohhi3ebYwSlOmePzbfv2ew= github.com/mainflux/senml v1.5.0 h1:GAd1y1eMohfa6sVYcr2iQfVfkkh9l/q7B1TWF5L68xs= github.com/mainflux/senml v1.5.0/go.mod h1:SMX76mM5yenjLVjZOM27+njCGkP+AA64O46nRQiBRlE= github.com/markbates/errx v1.1.0 h1:QDFeR+UP95dO12JgW+tgi2UVfo0V8YBHiUIOaeBPiEI= diff --git a/http/adapter.go b/http/adapter.go deleted file mode 100644 index 5967972c..00000000 --- a/http/adapter.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -// Package http contains the domain concept definitions needed to support -// Mainflux http adapter service functionality. -package http - -import ( - "context" - - "github.com/mainflux/mainflux" - "github.com/mainflux/mainflux/pkg/errors" - "github.com/mainflux/mainflux/pkg/messaging" -) - -// Service specifies coap service API. -type Service interface { - // Publish Messssage - Publish(ctx context.Context, token string, msg *messaging.Message) error -} - -var _ Service = (*adapterService)(nil) - -type adapterService struct { - publisher messaging.Publisher - auth mainflux.AuthzServiceClient -} - -// New instantiates the HTTP adapter implementation. -func New(publisher messaging.Publisher, auth mainflux.AuthzServiceClient) Service { - return &adapterService{ - publisher: publisher, - auth: auth, - } -} - -func (as *adapterService) Publish(ctx context.Context, token string, msg *messaging.Message) error { - ar := &mainflux.AuthorizeReq{ - SubjectType: "thing", - Permission: "publish", - Subject: token, - Object: msg.Channel, - ObjectType: "group", - } - - res, err := as.auth.Authorize(ctx, ar) - if err != nil { - return err - } - if !res.GetAuthorized() { - return errors.ErrAuthorization - } - msg.Publisher = res.GetId() - - return as.publisher.Publish(ctx, msg.Channel, msg) -} diff --git a/http/api/endpoint.go b/http/api/endpoint.go index f0b5bd7d..9ef1a163 100644 --- a/http/api/endpoint.go +++ b/http/api/endpoint.go @@ -7,19 +7,10 @@ import ( "context" "github.com/go-kit/kit/endpoint" - "github.com/mainflux/mainflux/http" - "github.com/mainflux/mainflux/internal/apiutil" - "github.com/mainflux/mainflux/pkg/errors" ) -func sendMessageEndpoint(svc http.Service) endpoint.Endpoint { +func sendMessageEndpoint() endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { - req := request.(publishReq) - - if err := req.validate(); err != nil { - return nil, errors.Wrap(apiutil.ErrValidation, err) - } - - return nil, svc.Publish(ctx, req.token, req.msg) + return nil, nil } } diff --git a/http/api/endpoint_test.go b/http/api/endpoint_test.go index 4b54dee7..5659c156 100644 --- a/http/api/endpoint_test.go +++ b/http/api/endpoint_test.go @@ -17,24 +17,33 @@ import ( "github.com/mainflux/mainflux/http/api" "github.com/mainflux/mainflux/http/mocks" "github.com/mainflux/mainflux/internal/apiutil" - "github.com/mainflux/mainflux/internal/testsutil" + "github.com/mainflux/mainflux/logger" + mproxy "github.com/mainflux/mproxy/pkg/http" + "github.com/mainflux/mproxy/pkg/session" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) const instanceID = "5de9b29a-feb9-11ed-be56-0242ac120002" -func newService() (server.Service, *authmocks.Service) { - auth := new(authmocks.Service) +func newService(auth mainflux.AuthzServiceClient) session.Handler { pub := mocks.NewPublisher() - return server.New(pub, auth), auth + return server.NewHandler(pub, logger.NewMock(), auth) } -func newHTTPServer(svc server.Service) *httptest.Server { - mux := api.MakeHandler(svc, instanceID) +func newTargetHTTPServer() *httptest.Server { + mux := api.MakeHandler(instanceID) return httptest.NewServer(mux) } +func newProxyHTPPServer(svc session.Handler, targetServer *httptest.Server) (*httptest.Server, error) { + mp, err := mproxy.NewProxy("", targetServer.URL, svc, logger.NewMock()) + if err != nil { + return nil, err + } + return httptest.NewServer(http.HandlerFunc(mp.Handler)), nil +} + type testRequest struct { client *http.Client method string @@ -64,6 +73,7 @@ func (tr testRequest) make() (*http.Response, error) { } func TestPublish(t *testing.T) { + auth := new(authmocks.Service) chanID := "1" ctSenmlJSON := "application/senml+json" ctSenmlCBOR := "application/senml+cbor" @@ -73,10 +83,23 @@ func TestPublish(t *testing.T) { msg := `[{"n":"current","t":-1,"v":1.6}]` msgJSON := `{"field1":"val1","field2":"val2"}` msgCBOR := `81A3616E6763757272656E746174206176FB3FF999999999999A` - svc, auth := newService() - ts := newHTTPServer(svc) + svc := newService(auth) + target := newTargetHTTPServer() + defer target.Close() + ts, err := newProxyHTPPServer(svc, target) + assert.Nil(t, err, fmt.Sprintf("failed to create proxy server with err: %v", err)) + defer ts.Close() + auth.On("Authorize", mock.Anything, &mainflux.AuthorizeReq{ + Subject: thingKey, + Object: chanID, + Namespace: "", + SubjectType: "thing", + Permission: "publish", + ObjectType: "group"}).Return(&mainflux.AuthorizeRes{Authorized: true, Id: ""}, nil) + auth.On("Authorize", mock.Anything, mock.Anything).Return(&mainflux.AuthorizeRes{Authorized: false, Id: ""}, nil) + cases := map[string]struct { chanID string msg string @@ -111,7 +134,7 @@ func TestPublish(t *testing.T) { msg: msg, contentType: ctSenmlJSON, key: "", - status: http.StatusUnauthorized, + status: http.StatusBadGateway, }, "publish message with basic auth": { chanID: chanID, @@ -126,7 +149,7 @@ func TestPublish(t *testing.T) { msg: msg, contentType: ctSenmlJSON, key: invalidKey, - status: http.StatusForbidden, + status: http.StatusBadRequest, }, "publish message with invalid basic auth": { chanID: chanID, @@ -134,7 +157,7 @@ func TestPublish(t *testing.T) { contentType: ctSenmlJSON, key: invalidKey, basicAuth: true, - status: http.StatusForbidden, + status: http.StatusBadRequest, }, "publish message without content type": { chanID: chanID, @@ -150,29 +173,22 @@ func TestPublish(t *testing.T) { key: thingKey, status: http.StatusBadRequest, }, - "publish message unable to authorize": { - chanID: chanID, - msg: msg, - contentType: ctSenmlJSON, - key: authmocks.InvalidValue, - status: http.StatusForbidden, - }, } for desc, tc := range cases { - repocall := auth.On("Authorize", mock.Anything, mock.Anything).Return(&mainflux.AuthorizeRes{Authorized: true, Id: testsutil.GenerateUUID(t)}, nil) - req := testRequest{ - client: ts.Client(), - method: http.MethodPost, - url: fmt.Sprintf("%s/channels/%s/messages", ts.URL, tc.chanID), - contentType: tc.contentType, - token: tc.key, - body: strings.NewReader(tc.msg), - basicAuth: tc.basicAuth, - } - res, err := req.make() - assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", desc, err)) - assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code %d got %d", desc, tc.status, res.StatusCode)) - repocall.Unset() + t.Run(desc, func(t *testing.T) { + req := testRequest{ + client: ts.Client(), + method: http.MethodPost, + url: fmt.Sprintf("%s/channels/%s/messages", ts.URL, tc.chanID), + contentType: tc.contentType, + token: tc.key, + body: strings.NewReader(tc.msg), + basicAuth: tc.basicAuth, + } + res, err := req.make() + assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", desc, err)) + assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code %d got %d", desc, tc.status, res.StatusCode)) + }) } } diff --git a/http/api/logging.go b/http/api/logging.go deleted file mode 100644 index 410239b7..00000000 --- a/http/api/logging.go +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -//go:build !test - -package api - -import ( - "context" - "fmt" - "time" - - "github.com/mainflux/mainflux/http" - mflog "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/pkg/messaging" -) - -var _ http.Service = (*loggingMiddleware)(nil) - -type loggingMiddleware struct { - logger mflog.Logger - svc http.Service -} - -// LoggingMiddleware adds logging facilities to the adapter. -func LoggingMiddleware(svc http.Service, logger mflog.Logger) http.Service { - return &loggingMiddleware{logger, svc} -} - -// Publish logs the publish request. It logs the time it took to complete the request. -// If the request fails, it logs the error. -func (lm *loggingMiddleware) Publish(ctx context.Context, token string, msg *messaging.Message) (err error) { - defer func(begin time.Time) { - destChannel := msg.Channel - if msg.Subtopic != "" { - destChannel = fmt.Sprintf("%s.%s", destChannel, msg.Subtopic) - } - message := fmt.Sprintf("Method publish to channel %s took %s to complete", destChannel, time.Since(begin)) - if err != nil { - lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) - return - } - lm.logger.Info(fmt.Sprintf("%s without errors.", message)) - }(time.Now()) - - return lm.svc.Publish(ctx, token, msg) -} diff --git a/http/api/metrics.go b/http/api/metrics.go deleted file mode 100644 index 0253f51f..00000000 --- a/http/api/metrics.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -//go:build !test - -package api - -import ( - "context" - "time" - - "github.com/go-kit/kit/metrics" - "github.com/mainflux/mainflux/http" - "github.com/mainflux/mainflux/pkg/messaging" -) - -var _ http.Service = (*metricsMiddleware)(nil) - -type metricsMiddleware struct { - counter metrics.Counter - latency metrics.Histogram - svc http.Service -} - -// MetricsMiddleware instruments adapter by tracking request count and latency. -func MetricsMiddleware(svc http.Service, counter metrics.Counter, latency metrics.Histogram) http.Service { - return &metricsMiddleware{ - counter: counter, - latency: latency, - svc: svc, - } -} - -// Publish instruments Publish method with metrics. -func (mm *metricsMiddleware) Publish(ctx context.Context, token string, msg *messaging.Message) error { - defer func(begin time.Time) { - mm.counter.With("method", "publish").Add(1) - mm.latency.With("method", "publish").Observe(time.Since(begin).Seconds()) - }(time.Now()) - - return mm.svc.Publish(ctx, token, msg) -} diff --git a/http/api/requests.go b/http/api/requests.go deleted file mode 100644 index eaff6dac..00000000 --- a/http/api/requests.go +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -package api - -import ( - "github.com/mainflux/mainflux/internal/apiutil" - "github.com/mainflux/mainflux/pkg/messaging" -) - -type publishReq struct { - msg *messaging.Message - token string -} - -func (req publishReq) validate() error { - if req.token == "" { - return apiutil.ErrBearerKey - } - return nil -} diff --git a/http/api/transport.go b/http/api/transport.go index 14596519..32f21ebb 100644 --- a/http/api/transport.go +++ b/http/api/transport.go @@ -6,20 +6,13 @@ package api import ( "context" "encoding/json" - "io" "net/http" - "net/url" - "regexp" - "strings" - "time" kithttp "github.com/go-kit/kit/transport/http" "github.com/go-zoo/bone" "github.com/mainflux/mainflux" - adapter "github.com/mainflux/mainflux/http" "github.com/mainflux/mainflux/internal/apiutil" "github.com/mainflux/mainflux/pkg/errors" - "github.com/mainflux/mainflux/pkg/messaging" "github.com/prometheus/client_golang/prometheus/promhttp" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "google.golang.org/grpc/codes" @@ -27,7 +20,6 @@ import ( ) const ( - protocol = "http" ctSenmlJSON = "application/senml+json" ctSenmlCBOR = "application/senml+cbor" contentType = "application/json" @@ -35,24 +27,22 @@ const ( var errMalformedSubtopic = errors.New("malformed subtopic") -var channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages(/[^?]*)?(\?.*)?$`) - // MakeHandler returns a HTTP handler for API endpoints. -func MakeHandler(svc adapter.Service, instanceID string) http.Handler { +func MakeHandler(instanceID string) http.Handler { opts := []kithttp.ServerOption{ kithttp.ServerErrorEncoder(encodeError), } r := bone.New() r.Post("/channels/:chanID/messages", otelhttp.NewHandler(kithttp.NewServer( - sendMessageEndpoint(svc), + sendMessageEndpoint(), decodeRequest, encodeResponse, opts..., ), "publish")) r.Post("/channels/:chanID/messages/*", otelhttp.NewHandler(kithttp.NewServer( - sendMessageEndpoint(svc), + sendMessageEndpoint(), decodeRequest, encodeResponse, opts..., @@ -64,78 +54,13 @@ func MakeHandler(svc adapter.Service, instanceID string) http.Handler { return r } -func parseSubtopic(subtopic string) (string, error) { - if subtopic == "" { - return subtopic, nil - } - - subtopic, err := url.QueryUnescape(subtopic) - if err != nil { - return "", errors.Wrap(apiutil.ErrValidation, errMalformedSubtopic) - } - subtopic = strings.ReplaceAll(subtopic, "/", ".") - - elems := strings.Split(subtopic, ".") - filteredElems := []string{} - for _, elem := range elems { - if elem == "" { - continue - } - - if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) { - return "", errors.Wrap(apiutil.ErrValidation, errMalformedSubtopic) - } - - filteredElems = append(filteredElems, elem) - } - - subtopic = strings.Join(filteredElems, ".") - return subtopic, nil -} - func decodeRequest(_ context.Context, r *http.Request) (interface{}, error) { ct := r.Header.Get("Content-Type") if ct != ctSenmlJSON && ct != contentType && ct != ctSenmlCBOR { return nil, errors.Wrap(apiutil.ErrValidation, apiutil.ErrUnsupportedContentType) } - channelParts := channelPartRegExp.FindStringSubmatch(r.RequestURI) - if len(channelParts) < 2 { - return nil, errors.Wrap(apiutil.ErrValidation, errors.ErrMalformedEntity) - } - - subtopic, err := parseSubtopic(channelParts[2]) - if err != nil { - return nil, errors.Wrap(apiutil.ErrValidation, err) - } - - var token string - _, pass, ok := r.BasicAuth() - switch { - case ok: - token = pass - case !ok: - token = apiutil.ExtractThingKey(r) - } - - payload, err := io.ReadAll(r.Body) - if err != nil { - return nil, errors.Wrap(apiutil.ErrValidation, errors.ErrMalformedEntity) - } - defer r.Body.Close() - - req := publishReq{ - msg: &messaging.Message{ - Protocol: protocol, - Channel: bone.GetValue(r, "chanID"), - Subtopic: subtopic, - Payload: payload, - Created: time.Now().UnixNano(), - }, - token: token, - } - - return req, nil + return nil, nil } func encodeResponse(_ context.Context, w http.ResponseWriter, _ interface{}) error { diff --git a/http/handler.go b/http/handler.go new file mode 100644 index 00000000..f4fc761e --- /dev/null +++ b/http/handler.go @@ -0,0 +1,221 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package http + +import ( + "context" + "fmt" + "net/url" + "regexp" + "strings" + "time" + + "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/internal/apiutil" + "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/errors" + "github.com/mainflux/mainflux/pkg/messaging" + "github.com/mainflux/mproxy/pkg/session" +) + +var _ session.Handler = (*handler)(nil) + +const protocol = "http" + +// Log message formats. +const ( + LogInfoConnected = "connected with thing_key %s" + // ThingPrefix represents the key prefix for Thing authentication scheme. + ThingPrefix = "Thing " + LogInfoPublished = "published with client_id %s to the topic %s" +) + +// Error wrappers for MQTT errors. +var ( + ErrMalformedSubtopic = errors.New("malformed subtopic") + ErrClientNotInitialized = errors.New("client is not initialized") + ErrMalformedTopic = errors.New("malformed topic") + ErrMissingTopicPub = errors.New("failed to publish due to missing topic") + ErrMissingTopicSub = errors.New("failed to subscribe due to missing topic") + ErrFailedConnect = errors.New("failed to connect") + ErrFailedPublish = errors.New("failed to publish") + ErrFailedParseSubtopic = errors.New("failed to parse subtopic") + ErrFailedPublishConnectEvent = errors.New("failed to publish connect event") + ErrFailedPublishToMsgBroker = errors.New("failed to publish to mainflux message broker") +) + +var channelRegExp = regexp.MustCompile(`^\/?channels\/([\w\-]+)\/messages(\/[^?]*)?(\?.*)?$`) + +// Event implements events.Event interface. +type handler struct { + publisher messaging.Publisher + auth mainflux.AuthzServiceClient + logger logger.Logger +} + +// NewHandler creates new Handler entity. +func NewHandler(publisher messaging.Publisher, logger logger.Logger, auth mainflux.AuthzServiceClient) session.Handler { + return &handler{ + logger: logger, + publisher: publisher, + auth: auth, + } +} + +// AuthConnect is called on device connection, +// prior forwarding to the HTTP server. +func (h *handler) AuthConnect(ctx context.Context) error { + s, ok := session.FromContext(ctx) + if !ok { + return ErrClientNotInitialized + } + + var tok string + switch { + case string(s.Password) == "": + return errors.Wrap(apiutil.ErrValidation, apiutil.ErrBearerKey) + case strings.HasPrefix(string(s.Password), "Thing"): + tok = extractThingKey(string(s.Password)) + default: + tok = string(s.Password) + } + + h.logger.Info(fmt.Sprintf(LogInfoConnected, tok)) + return nil +} + +// AuthPublish is not used in HTTP service. +func (h *handler) AuthPublish(ctx context.Context, topic *string, payload *[]byte) error { + return nil +} + +// AuthSubscribe is not used in HTTP service. +func (h *handler) AuthSubscribe(ctx context.Context, topics *[]string) error { + return nil +} + +// Connect - after client successfully connected. +func (h *handler) Connect(ctx context.Context) error { + return nil +} + +// Publish - after client successfully published. +func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) error { + if topic == nil { + return ErrMissingTopicPub + } + topic = &strings.Split(*topic, "?")[0] + s, ok := session.FromContext(ctx) + if !ok { + return errors.Wrap(ErrFailedPublish, ErrClientNotInitialized) + } + h.logger.Info(fmt.Sprintf(LogInfoPublished, s.ID, *topic)) + // Topics are in the format: + // channels//messages//.../ct/ + + channelParts := channelRegExp.FindStringSubmatch(*topic) + if len(channelParts) < 2 { + return errors.Wrap(ErrFailedPublish, ErrMalformedTopic) + } + + chanID := channelParts[1] + subtopic := channelParts[2] + + subtopic, err := parseSubtopic(subtopic) + if err != nil { + return errors.Wrap(ErrFailedParseSubtopic, err) + } + + msg := messaging.Message{ + Protocol: protocol, + Channel: chanID, + Subtopic: subtopic, + Payload: *payload, + Created: time.Now().UnixNano(), + } + var tok string + switch { + case string(s.Password) == "": + return errors.Wrap(apiutil.ErrValidation, apiutil.ErrBearerKey) + case strings.HasPrefix(string(s.Password), "Thing"): + tok = extractThingKey(string(s.Password)) + default: + tok = string(s.Password) + } + ar := &mainflux.AuthorizeReq{ + Subject: tok, + Object: msg.Channel, + Namespace: "", + SubjectType: "thing", + Permission: "publish", + ObjectType: "group", + } + res, err := h.auth.Authorize(ctx, ar) + if err != nil { + return err + } + if !res.GetAuthorized() { + return errors.ErrAuthorization + } + msg.Publisher = res.GetId() + + if err := h.publisher.Publish(ctx, msg.Channel, &msg); err != nil { + return errors.Wrap(ErrFailedPublishToMsgBroker, err) + } + + return nil +} + +// Subscribe - not used for HTTP. +func (h *handler) Subscribe(ctx context.Context, topics *[]string) error { + return nil +} + +// Unsubscribe - not used for HTTP. +func (h *handler) Unsubscribe(ctx context.Context, topics *[]string) error { + return nil +} + +// Disconnect - not used for HTTP. +func (h *handler) Disconnect(ctx context.Context) error { + return nil +} + +func parseSubtopic(subtopic string) (string, error) { + if subtopic == "" { + return subtopic, nil + } + + subtopic, err := url.QueryUnescape(subtopic) + if err != nil { + return "", ErrMalformedSubtopic + } + subtopic = strings.ReplaceAll(subtopic, "/", ".") + + elems := strings.Split(subtopic, ".") + filteredElems := []string{} + for _, elem := range elems { + if elem == "" { + continue + } + + if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) { + return "", ErrMalformedSubtopic + } + + filteredElems = append(filteredElems, elem) + } + + subtopic = strings.Join(filteredElems, ".") + return subtopic, nil +} + +// extractThingKey returns value of the thing key. If there is no thing key - an empty value is returned. +func extractThingKey(topic string) string { + if !strings.HasPrefix(topic, ThingPrefix) { + return "" + } + + return strings.TrimPrefix(topic, ThingPrefix) +} diff --git a/http/tracing/adapter.go b/http/tracing/adapter.go deleted file mode 100644 index 8ea8300c..00000000 --- a/http/tracing/adapter.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -package tracing - -import ( - "context" - - "github.com/mainflux/mainflux/http" - "github.com/mainflux/mainflux/pkg/messaging" - "go.opentelemetry.io/otel/trace" -) - -var _ http.Service = (*serviceMiddleware)(nil) - -const publishOP = "publish" - -// serviceMiddleware implements the http.Service interface, providing a middleware layer for tracing HTTP requests. -// It creates a new span for each request and sets it as the active span in the OpenTelemetry context. -type serviceMiddleware struct { - tracer trace.Tracer - svc http.Service -} - -// New creates a new instance of the http.Service interface with tracing middleware. -func New(tracer trace.Tracer, svc http.Service) http.Service { - return &serviceMiddleware{ - tracer: tracer, - svc: svc, - } -} - -// Publish traces HTTP publish operations. -// It starts a new span as a child of the incoming span (if there is one) and sets it as the active span in the context. -func (sm *serviceMiddleware) Publish(ctx context.Context, token string, msg *messaging.Message) error { - ctx, span := sm.tracer.Start(ctx, publishOP) - defer span.End() - return sm.svc.Publish(ctx, token, msg) -} diff --git a/http/tracing/doc.go b/http/tracing/doc.go deleted file mode 100644 index fb1b6896..00000000 --- a/http/tracing/doc.go +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -// Package tracing provides tracing instrumentation for Mainflux WebSocket adapter service. -// -// This package provides tracing middleware for Mainflux WebSocket adapter service. -// It can be used to trace incoming requests and add tracing capabilities to -// Mainflux WebSocket adapter service. -// -// For more details about tracing instrumentation for Mainflux messaging refer -// to the documentation at https://docs.mainflux.io/tracing/. -package tracing diff --git a/mqtt/handler.go b/mqtt/handler.go index d83ffc69..9c3ef9ce 100644 --- a/mqtt/handler.go +++ b/mqtt/handler.go @@ -106,7 +106,7 @@ func (h *handler) AuthPublish(ctx context.Context, topic *string, payload *[]byt return h.authAccess(ctx, string(s.Password), *topic, "publish") } -// AuthSubscribe is called on device publish, +// AuthSubscribe is called on device subscribe, // prior forwarding to the MQTT broker. func (h *handler) AuthSubscribe(ctx context.Context, topics *[]string) error { s, ok := session.FromContext(ctx) diff --git a/pkg/messaging/handler/logging.go b/pkg/messaging/handler/logging.go new file mode 100644 index 00000000..befdffa1 --- /dev/null +++ b/pkg/messaging/handler/logging.go @@ -0,0 +1,141 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +//go:build !test + +package handler + +import ( + "context" + "fmt" + "time" + + mflog "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mproxy/pkg/session" +) + +var _ session.Handler = (*loggingMiddleware)(nil) + +type loggingMiddleware struct { + logger mflog.Logger + svc session.Handler +} + +// AuthConnect implements session.Handler. +func (lm *loggingMiddleware) AuthConnect(ctx context.Context) (err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("Method auth connect took %s to complete", time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.AuthConnect(ctx) +} + +// AuthPublish implements session.Handler. +func (lm *loggingMiddleware) AuthPublish(ctx context.Context, topic *string, payload *[]byte) (err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("Method auth publish took %s to complete", time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.AuthPublish(ctx, topic, payload) +} + +// AuthSubscribe implements session.Handler. +func (lm *loggingMiddleware) AuthSubscribe(ctx context.Context, topics *[]string) (err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("Method auth subscribe took %s to complete", time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.AuthSubscribe(ctx, topics) +} + +// Connect implements session.Handler. +func (lm *loggingMiddleware) Connect(ctx context.Context) (err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("Method connect took %s to complete", time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.Connect(ctx) +} + +// Disconnect implements session.Handler. +func (lm *loggingMiddleware) Disconnect(ctx context.Context) (err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("Method disconnect took %s to complete", time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.Disconnect(ctx) +} + +// Publish logs the publish request. It logs the time it took to complete the request. +// If the request fails, it logs the error. +func (lm *loggingMiddleware) Publish(ctx context.Context, topic *string, payload *[]byte) (err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("Method publish to channel %s took %s to complete", *topic, time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.Publish(ctx, topic, payload) +} + +// Subscribe implements session.Handler. +func (lm *loggingMiddleware) Subscribe(ctx context.Context, topics *[]string) (err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("Method subscribe took %s to complete", time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.Subscribe(ctx, topics) +} + +// Unsubscribe implements session.Handler. +func (lm *loggingMiddleware) Unsubscribe(ctx context.Context, topics *[]string) (err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("Method unsubscribe took %s to complete", time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.Unsubscribe(ctx, topics) + +} + +// LoggingMiddleware adds logging facilities to the adapter. +func LoggingMiddleware(svc session.Handler, logger mflog.Logger) session.Handler { + return &loggingMiddleware{logger, svc} +} diff --git a/pkg/messaging/handler/metrics.go b/pkg/messaging/handler/metrics.go new file mode 100644 index 00000000..86d4b6a2 --- /dev/null +++ b/pkg/messaging/handler/metrics.go @@ -0,0 +1,86 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +//go:build !test + +package handler + +import ( + "context" + "time" + + "github.com/go-kit/kit/metrics" + "github.com/mainflux/mproxy/pkg/session" +) + +var _ session.Handler = (*metricsMiddleware)(nil) + +type metricsMiddleware struct { + counter metrics.Counter + latency metrics.Histogram + svc session.Handler +} + +// MetricsMiddleware instruments adapter by tracking request count and latency. +func MetricsMiddleware(svc session.Handler, counter metrics.Counter, latency metrics.Histogram) session.Handler { + return &metricsMiddleware{ + counter: counter, + latency: latency, + svc: svc, + } +} + +// AuthConnect implements session.Handler. +func (mm *metricsMiddleware) AuthConnect(ctx context.Context) error { + defer func(begin time.Time) { + mm.counter.With("method", "publish").Add(1) + mm.latency.With("method", "publish").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return mm.svc.AuthConnect(ctx) +} + +// AuthPublish implements session.Handler. +func (mm *metricsMiddleware) AuthPublish(ctx context.Context, topic *string, payload *[]byte) error { + defer func(begin time.Time) { + mm.counter.With("method", "publish").Add(1) + mm.latency.With("method", "publish").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return mm.svc.AuthPublish(ctx, topic, payload) +} + +// AuthSubscribe implements session.Handler. +func (*metricsMiddleware) AuthSubscribe(ctx context.Context, topics *[]string) error { + return nil +} + +// Connect implements session.Handler. +func (*metricsMiddleware) Connect(ctx context.Context) error { + return nil +} + +// Disconnect implements session.Handler. +func (*metricsMiddleware) Disconnect(ctx context.Context) error { + return nil +} + +// Publish instruments Publish method with metrics. +func (mm *metricsMiddleware) Publish(ctx context.Context, topic *string, payload *[]byte) error { + defer func(begin time.Time) { + mm.counter.With("method", "publish").Add(1) + mm.latency.With("method", "publish").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return mm.svc.Publish(ctx, topic, payload) +} + +// Subscribe implements session.Handler. +func (*metricsMiddleware) Subscribe(ctx context.Context, topics *[]string) error { + return nil +} + +// Unsubscribe implements session.Handler. +func (*metricsMiddleware) Unsubscribe(ctx context.Context, topics *[]string) error { + return nil +} diff --git a/mqtt/tracing/handler.go b/pkg/messaging/handler/tracing.go similarity index 97% rename from mqtt/tracing/handler.go rename to pkg/messaging/handler/tracing.go index ce2347e5..b871a41d 100644 --- a/mqtt/tracing/handler.go +++ b/pkg/messaging/handler/tracing.go @@ -1,7 +1,7 @@ // Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 -package tracing +package handler import ( "context" @@ -30,7 +30,7 @@ type handlerMiddleware struct { } // NewHandler creates a new session.Handler middleware with tracing. -func NewHandler(tracer trace.Tracer, handler session.Handler) session.Handler { +func NewTracing(tracer trace.Tracer, handler session.Handler) session.Handler { return &handlerMiddleware{ tracer: tracer, handler: handler, diff --git a/pkg/sdk/go/message_test.go b/pkg/sdk/go/message_test.go index 94ed1324..52813d5f 100644 --- a/pkg/sdk/go/message_test.go +++ b/pkg/sdk/go/message_test.go @@ -15,25 +15,34 @@ import ( "github.com/mainflux/mainflux/http/api" "github.com/mainflux/mainflux/http/mocks" "github.com/mainflux/mainflux/internal/apiutil" + "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" sdk "github.com/mainflux/mainflux/pkg/sdk/go" + mproxy "github.com/mainflux/mproxy/pkg/http" + "github.com/mainflux/mproxy/pkg/session" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) -var errUnexpectedJSONEnd = errors.New("unexpected end of JSON input") - -func newMessageService(auth mainflux.AuthServiceClient) adapter.Service { +func newMessageService(cc mainflux.AuthzServiceClient) session.Handler { pub := mocks.NewPublisher() - return adapter.New(pub, auth) + return adapter.NewHandler(pub, logger.NewMock(), cc) } -func newMessageServer(svc adapter.Service) *httptest.Server { - mux := api.MakeHandler(svc, instanceID) - +func newTargetHTTPServer() *httptest.Server { + mux := api.MakeHandler("") return httptest.NewServer(mux) } +func newProxyHTTPServer(svc session.Handler, targetServer *httptest.Server) (*httptest.Server, error) { + mp, err := mproxy.NewProxy("", targetServer.URL, svc, logger.NewMock()) + if err != nil { + return nil, err + } + return httptest.NewServer(http.HandlerFunc(mp.Handler)), nil +} + func TestSendMessage(t *testing.T) { chanID := "1" atoken := "auth_token" @@ -41,16 +50,34 @@ func TestSendMessage(t *testing.T) { msg := `[{"n":"current","t":-1,"v":1.6}]` auth := new(authmocks.Service) pub := newMessageService(auth) - ts := newMessageServer(pub) + target := newTargetHTTPServer() + ts, err := newProxyHTTPServer(pub, target) + assert.Nil(t, err, fmt.Sprintf("failed to create proxy server with err: %v", err)) defer ts.Close() sdkConf := sdk.Config{ HTTPAdapterURL: ts.URL, - MsgContentType: contentType, + MsgContentType: "application/senml+json", TLSVerification: false, } mfsdk := sdk.NewSDK(sdkConf) + auth.On("Authorize", mock.Anything, &mainflux.AuthorizeReq{ + Subject: atoken, + Object: chanID, + Namespace: "", + SubjectType: "thing", + Permission: "publish", + ObjectType: "group"}).Return(&mainflux.AuthorizeRes{Authorized: true, Id: ""}, nil) + auth.On("Authorize", mock.Anything, &mainflux.AuthorizeReq{ + Subject: invalidToken, + Object: chanID, + Namespace: "", + SubjectType: "thing", + Permission: "publish", + ObjectType: "group"}).Return(&mainflux.AuthorizeRes{Authorized: true, Id: ""}, errors.ErrAuthentication) + auth.On("Authorize", mock.Anything, mock.Anything).Return(&mainflux.AuthorizeRes{Authorized: false, Id: ""}, nil) + cases := map[string]struct { chanID string msg string @@ -67,13 +94,13 @@ func TestSendMessage(t *testing.T) { chanID: chanID, msg: msg, auth: "", - err: errors.NewSDKErrorWithStatus(errors.Wrap(apiutil.ErrValidation, apiutil.ErrBearerKey), http.StatusUnauthorized), + err: errors.NewSDKErrorWithStatus(errors.ErrAuthorization, http.StatusBadRequest), }, "publish message with invalid authorization token": { chanID: chanID, msg: msg, auth: invalidToken, - err: errors.NewSDKErrorWithStatus(errUnexpectedJSONEnd, http.StatusUnauthorized), + err: errors.NewSDKErrorWithStatus(errors.ErrAuthentication, http.StatusBadRequest), }, "publish message with wrong content type": { chanID: chanID, @@ -85,13 +112,13 @@ func TestSendMessage(t *testing.T) { chanID: "", msg: msg, auth: atoken, - err: errors.NewSDKErrorWithStatus(errors.Wrap(apiutil.ErrValidation, errors.ErrMalformedEntity), http.StatusBadRequest), + err: errors.NewSDKErrorWithStatus(errors.Wrap(adapter.ErrFailedPublish, adapter.ErrMalformedTopic), http.StatusBadRequest), }, "publish message unable to authorize": { chanID: chanID, msg: msg, auth: "invalid-token", - err: errors.NewSDKErrorWithStatus(errUnexpectedJSONEnd, http.StatusUnauthorized), + err: errors.NewSDKErrorWithStatus(errors.ErrAuthorization, http.StatusBadRequest), }, } for desc, tc := range cases { @@ -109,12 +136,14 @@ func TestSetContentType(t *testing.T) { auth := new(authmocks.Service) pub := newMessageService(auth) - ts := newMessageServer(pub) + target := newTargetHTTPServer() + ts, err := newProxyHTTPServer(pub, target) + assert.Nil(t, err, fmt.Sprintf("failed to create proxy server with err: %v", err)) defer ts.Close() sdkConf := sdk.Config{ HTTPAdapterURL: ts.URL, - MsgContentType: contentType, + MsgContentType: "application/senml+json", TLSVerification: false, } mfsdk := sdk.NewSDK(sdkConf) diff --git a/vendor/github.com/mainflux/mproxy/pkg/http/http.go b/vendor/github.com/mainflux/mproxy/pkg/http/http.go new file mode 100644 index 00000000..5c762397 --- /dev/null +++ b/vendor/github.com/mainflux/mproxy/pkg/http/http.go @@ -0,0 +1,114 @@ +package http + +import ( + "bytes" + "encoding/json" + "errors" + "io" + "net/http" + "net/http/httputil" + "net/url" + + "github.com/mainflux/mproxy/pkg/logger" + "github.com/mainflux/mproxy/pkg/session" +) + +const contentType = "application/json" + +// ErrMissingAuthentication returned when no basic or Authorization header is set. +var ErrMissingAuthentication = errors.New("missing authorization") + +// Handler default handler reads authorization header and +// performs authorization before proxying the request. +func (p *Proxy) Handler(w http.ResponseWriter, r *http.Request) { + username, password, ok := r.BasicAuth() + switch { + case ok: + break + case r.Header.Get("Authorization") != "": + password = r.Header.Get("Authorization") + default: + encodeError(w, http.StatusBadGateway, ErrMissingAuthentication) + return + } + + s := &session.Session{ + Password: []byte(password), + Username: username, + } + ctx := session.NewContext(r.Context(), s) + payload, err := io.ReadAll(r.Body) + if err != nil { + encodeError(w, http.StatusBadRequest, err) + p.logger.Error(err.Error()) + return + } + if err := r.Body.Close(); err != nil { + encodeError(w, http.StatusInternalServerError, err) + p.logger.Error(err.Error()) + return + } + + // r.Body is reset to ensure it can be safely copied by httputil.ReverseProxy. + // no close method is required since NopClose Close() always returns nill. + r.Body = io.NopCloser(bytes.NewBuffer(payload)) + if err := p.session.AuthConnect(ctx); err != nil { + encodeError(w, http.StatusUnauthorized, err) + p.logger.Error(err.Error()) + return + } + if err := p.session.Publish(ctx, &r.RequestURI, &payload); err != nil { + encodeError(w, http.StatusBadRequest, err) + p.logger.Error(err.Error()) + return + } + p.target.ServeHTTP(w, r) +} + +func encodeError(w http.ResponseWriter, statusCode int, err error) { + w.WriteHeader(statusCode) + w.Header().Set("Content-Type", contentType) + if err := json.NewEncoder(w).Encode(err); err != nil { + w.WriteHeader(http.StatusInternalServerError) + } +} + +// Proxy represents HTTP Proxy. +type Proxy struct { + address string + target *httputil.ReverseProxy + session session.Handler + logger logger.Logger +} + +func NewProxy(address, targetUrl string, handler session.Handler, logger logger.Logger) (Proxy, error) { + target, err := url.Parse(targetUrl) + if err != nil { + return Proxy{}, err + } + + return Proxy{ + address: address, + target: httputil.NewSingleHostReverseProxy(target), + session: handler, + logger: logger, + }, nil +} + +func (p *Proxy) Listen() error { + if err := http.ListenAndServe(p.address, nil); err != nil { + return err + } + + p.logger.Info("Server Exiting...") + return nil +} + +func (p *Proxy) ListenTLS(cert, key string) error { + if err := http.ListenAndServeTLS(p.address, cert, key, nil); err != nil { + return err + } + + p.logger.Info("Server Exiting...") + return nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index db81eec6..e88b66bd 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -568,11 +568,12 @@ github.com/mailru/easyjson github.com/mailru/easyjson/buffer github.com/mailru/easyjson/jlexer github.com/mailru/easyjson/jwriter -# github.com/mainflux/callhome v0.0.0-20230920140432-33c5663382ce -## explicit; go 1.21 +# github.com/mainflux/callhome v0.0.0-20230626140149-b03b1f4c46f2 +## explicit; go 1.20 github.com/mainflux/callhome/pkg/client -# github.com/mainflux/mproxy v0.3.1-0.20230822124450-4b4dfe600cc2 +# github.com/mainflux/mproxy v0.3.1-0.20231012122315-af3634f798d1 ## explicit; go 1.19 +github.com/mainflux/mproxy/pkg/http github.com/mainflux/mproxy/pkg/logger github.com/mainflux/mproxy/pkg/mqtt github.com/mainflux/mproxy/pkg/session