From 0525a9ab75952eca3de9eade842086a040d6e8b0 Mon Sep 17 00:00:00 2001 From: Sammy Kerata Oina <44265300+SammyOina@users.noreply.github.com> Date: Tue, 22 Aug 2023 21:01:47 +0300 Subject: [PATCH] NOISSUE - Update mProxy handlers for testability (#1889) * update mproxy handler for testability Signed-off-by: SammyOina * remove import replacement Signed-off-by: SammyOina --------- Signed-off-by: SammyOina --- go.mod | 2 +- go.sum | 4 +- mqtt/handler.go | 40 +++++++++---------- mqtt/handler_test.go | 40 ++++++++++++------- mqtt/tracing/doc.go | 6 +-- mqtt/tracing/handler.go | 20 +++++----- .../mainflux/mproxy/pkg/session/handler.go | 10 ++--- .../mainflux/mproxy/pkg/session/stream.go | 16 ++++---- vendor/modules.txt | 2 +- 9 files changed, 76 insertions(+), 64 deletions(-) diff --git a/go.mod b/go.mod index f316c94a..77faf7e9 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/jmoiron/sqlx v1.3.5 github.com/lestrrat-go/jwx/v2 v2.0.11 github.com/mainflux/callhome v0.0.0-20230626140149-b03b1f4c46f2 - github.com/mainflux/mproxy v0.3.0 + github.com/mainflux/mproxy v0.3.1-0.20230822124450-4b4dfe600cc2 github.com/mainflux/senml v1.5.0 github.com/mitchellh/mapstructure v1.5.0 github.com/nats-io/nats.go v1.27.1 diff --git a/go.sum b/go.sum index 6b9e668a..51e89901 100644 --- a/go.sum +++ b/go.sum @@ -438,8 +438,8 @@ github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0V github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mainflux/callhome v0.0.0-20230626140149-b03b1f4c46f2 h1:QN+yhU6Twwwwz8Mu9u12f2TbPsmM/zIvndAhH1dIdWU= github.com/mainflux/callhome v0.0.0-20230626140149-b03b1f4c46f2/go.mod h1:q4cTH8I3Y6kDyocJh5dBppuv4dY9drb/2kVdB6FP124= -github.com/mainflux/mproxy v0.3.0 h1:tU60nu/Bd5lWb7NlJJvW6ulKxeBGJFnp5uOS9j6JXBQ= -github.com/mainflux/mproxy v0.3.0/go.mod h1:nG9MP2YbS8ax26Z8mvJOYohhi3ebYwSlOmePzbfv2ew= +github.com/mainflux/mproxy v0.3.1-0.20230822124450-4b4dfe600cc2 h1:D5Ofrffx/4FWehczvJbmzD8lfcOkxcIS4XZE/fwl4mo= +github.com/mainflux/mproxy v0.3.1-0.20230822124450-4b4dfe600cc2/go.mod h1:nG9MP2YbS8ax26Z8mvJOYohhi3ebYwSlOmePzbfv2ew= github.com/mainflux/senml v1.5.0 h1:GAd1y1eMohfa6sVYcr2iQfVfkkh9l/q7B1TWF5L68xs= github.com/mainflux/senml v1.5.0/go.mod h1:SMX76mM5yenjLVjZOM27+njCGkP+AA64O46nRQiBRlE= github.com/markbates/errx v1.1.0 h1:QDFeR+UP95dO12JgW+tgi2UVfo0V8YBHiUIOaeBPiEI= diff --git a/mqtt/handler.go b/mqtt/handler.go index d6fc024b..fac01858 100644 --- a/mqtt/handler.go +++ b/mqtt/handler.go @@ -139,21 +139,20 @@ func (h *handler) AuthSubscribe(ctx context.Context, topics *[]string) error { } // Connect - after client successfully connected. -func (h *handler) Connect(ctx context.Context) { +func (h *handler) Connect(ctx context.Context) error { s, ok := session.FromContext(ctx) if !ok { - h.logger.Error(errors.Wrap(ErrFailedConnect, ErrClientNotInitialized).Error()) - return + return errors.Wrap(ErrFailedConnect, ErrClientNotInitialized) } h.logger.Info(fmt.Sprintf(LogInfoConnected, s.ID)) + return nil } // Publish - after client successfully published. -func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) { +func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) error { s, ok := session.FromContext(ctx) if !ok { - h.logger.Error(errors.Wrap(ErrFailedPublish, ErrClientNotInitialized).Error()) - return + return errors.Wrap(ErrFailedPublish, ErrClientNotInitialized) } h.logger.Info(fmt.Sprintf(LogInfoPublished, s.ID, *topic)) // Topics are in the format: @@ -161,8 +160,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) { channelParts := channelRegExp.FindStringSubmatch(*topic) if len(channelParts) < 2 { - h.logger.Error(errors.Wrap(ErrFailedPublish, ErrMalformedTopic).Error()) - return + return errors.Wrap(ErrFailedPublish, ErrMalformedTopic) } chanID := channelParts[1] @@ -170,8 +168,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) { subtopic, err := parseSubtopic(subtopic) if err != nil { - h.logger.Error(errors.Wrap(ErrFailedParseSubtopic, err).Error()) - return + return errors.Wrap(ErrFailedParseSubtopic, err) } msg := messaging.Message{ @@ -185,42 +182,43 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) { for _, pub := range h.publishers { if err := pub.Publish(ctx, msg.Channel, &msg); err != nil { - h.logger.Error(errors.Wrap(ErrFailedPublishToMsgBroker, err).Error()) + return errors.Wrap(ErrFailedPublishToMsgBroker, err) } } + return nil } // Subscribe - after client successfully subscribed. -func (h *handler) Subscribe(ctx context.Context, topics *[]string) { +func (h *handler) Subscribe(ctx context.Context, topics *[]string) error { s, ok := session.FromContext(ctx) if !ok { - h.logger.Error(errors.Wrap(ErrFailedSubscribe, ErrClientNotInitialized).Error()) - return + return errors.Wrap(ErrFailedSubscribe, ErrClientNotInitialized) } h.logger.Info(fmt.Sprintf(LogInfoSubscribed, s.ID, strings.Join(*topics, ","))) + return nil } // Unsubscribe - after client unsubscribed. -func (h *handler) Unsubscribe(ctx context.Context, topics *[]string) { +func (h *handler) Unsubscribe(ctx context.Context, topics *[]string) error { s, ok := session.FromContext(ctx) if !ok { - h.logger.Error(errors.Wrap(ErrFailedUnsubscribe, ErrClientNotInitialized).Error()) - return + return errors.Wrap(ErrFailedUnsubscribe, ErrClientNotInitialized) } h.logger.Info(fmt.Sprintf(LogInfoUnsubscribed, s.ID, strings.Join(*topics, ","))) + return nil } // Disconnect - connection with broker or client lost. -func (h *handler) Disconnect(ctx context.Context) { +func (h *handler) Disconnect(ctx context.Context) error { s, ok := session.FromContext(ctx) if !ok { - h.logger.Error(errors.Wrap(ErrFailedDisconnect, ErrClientNotInitialized).Error()) - return + return errors.Wrap(ErrFailedDisconnect, ErrClientNotInitialized) } h.logger.Error(fmt.Sprintf(LogInfoDisconnected, s.ID, s.Password)) if err := h.es.Disconnect(ctx, string(s.Password)); err != nil { - h.logger.Error(errors.Wrap(ErrFailedPublishDisconnectEvent, err).Error()) + return errors.Wrap(ErrFailedPublishDisconnectEvent, err) } + return nil } func (h *handler) authAccess(ctx context.Context, password, topic, action string) error { diff --git a/mqtt/handler_test.go b/mqtt/handler_test.go index b71fd030..993d459a 100644 --- a/mqtt/handler_test.go +++ b/mqtt/handler_test.go @@ -234,17 +234,19 @@ func TestConnect(t *testing.T) { cases := []struct { desc string session *session.Session + err error logMsg string }{ { desc: "connect without active session", session: nil, - logMsg: errors.Wrap(mqtt.ErrFailedConnect, mqtt.ErrClientNotInitialized).Error(), + err: errors.Wrap(mqtt.ErrFailedConnect, mqtt.ErrClientNotInitialized), }, { desc: "connect with active session", session: &sessionClient, logMsg: fmt.Sprintf(mqtt.LogInfoConnected, clientID), + err: nil, }, } @@ -253,8 +255,9 @@ func TestConnect(t *testing.T) { if tc.session != nil { ctx = session.NewContext(ctx, tc.session) } - handler.Connect(ctx) + err := handler.Connect(ctx) assert.Contains(t, logBuffer.String(), tc.logMsg) + assert.Equal(t, tc.err, err) } } @@ -272,13 +275,14 @@ func TestPublish(t *testing.T) { topic string payload []byte logMsg string + err error }{ { desc: "publish without active session", session: nil, topic: topic, payload: payload, - logMsg: mqtt.ErrClientNotInitialized.Error(), + err: errors.Wrap(mqtt.ErrFailedPublish, mqtt.ErrClientNotInitialized), }, { desc: "publish with invalid topic", @@ -286,27 +290,28 @@ func TestPublish(t *testing.T) { topic: invalidTopic, payload: payload, logMsg: fmt.Sprintf(mqtt.LogInfoPublished, clientID, invalidTopic), + err: errors.Wrap(mqtt.ErrFailedPublish, mqtt.ErrMalformedTopic), }, { desc: "publish with invalid channel ID", session: &sessionClient, topic: invalidChannelIDTopic, payload: payload, - logMsg: errors.Wrap(mqtt.ErrFailedPublish, mqtt.ErrMalformedTopic).Error(), + err: errors.Wrap(mqtt.ErrFailedPublish, mqtt.ErrMalformedTopic), }, { desc: "publish with malformed subtopic", session: &sessionClient, topic: malformedSubtopics, payload: payload, - logMsg: mqtt.ErrMalformedSubtopic.Error(), + err: errors.Wrap(mqtt.ErrFailedParseSubtopic, mqtt.ErrMalformedSubtopic), }, { desc: "publish with subtopic containing wrong character", session: &sessionClient, topic: wrongCharSubtopics, payload: payload, - logMsg: mqtt.ErrMalformedSubtopic.Error(), + err: errors.Wrap(mqtt.ErrFailedParseSubtopic, mqtt.ErrMalformedSubtopic), }, { desc: "publish with subtopic", @@ -329,8 +334,9 @@ func TestPublish(t *testing.T) { if tc.session != nil { ctx = session.NewContext(ctx, tc.session) } - handler.Publish(ctx, &tc.topic, &tc.payload) + err := handler.Publish(ctx, &tc.topic, &tc.payload) assert.Contains(t, logBuffer.String(), tc.logMsg) + assert.Equal(t, tc.err, err) } } @@ -343,12 +349,13 @@ func TestSubscribe(t *testing.T) { session *session.Session topic []string logMsg string + err error }{ { desc: "subscribe without active session", session: nil, topic: topics, - logMsg: errors.Wrap(mqtt.ErrFailedSubscribe, mqtt.ErrClientNotInitialized).Error(), + err: errors.Wrap(mqtt.ErrFailedSubscribe, mqtt.ErrClientNotInitialized), }, { desc: "subscribe with valid session and topics", @@ -363,8 +370,9 @@ func TestSubscribe(t *testing.T) { if tc.session != nil { ctx = session.NewContext(ctx, tc.session) } - handler.Subscribe(ctx, &tc.topic) + err := handler.Subscribe(ctx, &tc.topic) assert.Contains(t, logBuffer.String(), tc.logMsg) + assert.Equal(t, tc.err, err) } } @@ -377,12 +385,13 @@ func TestUnsubscribe(t *testing.T) { session *session.Session topic []string logMsg string + err error }{ { desc: "unsubscribe without active session", session: nil, topic: topics, - logMsg: errors.Wrap(mqtt.ErrFailedUnsubscribe, mqtt.ErrClientNotInitialized).Error(), + err: errors.Wrap(mqtt.ErrFailedUnsubscribe, mqtt.ErrClientNotInitialized), }, { desc: "unsubscribe with valid session and topics", @@ -397,8 +406,9 @@ func TestUnsubscribe(t *testing.T) { if tc.session != nil { ctx = session.NewContext(ctx, tc.session) } - handler.Unsubscribe(ctx, &tc.topic) + err := handler.Unsubscribe(ctx, &tc.topic) assert.Contains(t, logBuffer.String(), tc.logMsg) + assert.Equal(t, tc.err, err) } } @@ -411,18 +421,19 @@ func TestDisconnect(t *testing.T) { session *session.Session topic []string logMsg string + err error }{ { desc: "disconnect without active session", session: nil, topic: topics, - logMsg: errors.Wrap(mqtt.ErrFailedDisconnect, mqtt.ErrClientNotInitialized).Error(), + err: errors.Wrap(mqtt.ErrFailedDisconnect, mqtt.ErrClientNotInitialized), }, { desc: "disconnect with valid session", session: &sessionClient, topic: topics, - logMsg: mqtt.ErrClientNotInitialized.Error(), + err: nil, }, } @@ -431,8 +442,9 @@ func TestDisconnect(t *testing.T) { if tc.session != nil { ctx = session.NewContext(ctx, tc.session) } - handler.Disconnect(ctx) + err := handler.Disconnect(ctx) assert.Contains(t, logBuffer.String(), tc.logMsg) + assert.Equal(t, tc.err, err) } } diff --git a/mqtt/tracing/doc.go b/mqtt/tracing/doc.go index fb1b6896..a81666e8 100644 --- a/mqtt/tracing/doc.go +++ b/mqtt/tracing/doc.go @@ -1,11 +1,11 @@ // Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 -// Package tracing provides tracing instrumentation for Mainflux WebSocket adapter service. +// Package tracing provides tracing instrumentation for Mainflux MQTT adapter service. // -// This package provides tracing middleware for Mainflux WebSocket adapter service. +// This package provides tracing middleware for Mainflux MQTT adapter service. // It can be used to trace incoming requests and add tracing capabilities to -// Mainflux WebSocket adapter service. +// Mainflux MQTT adapter service. // // For more details about tracing instrumentation for Mainflux messaging refer // to the documentation at https://docs.mainflux.io/tracing/. diff --git a/mqtt/tracing/handler.go b/mqtt/tracing/handler.go index 6b02c8e8..ce2347e5 100644 --- a/mqtt/tracing/handler.go +++ b/mqtt/tracing/handler.go @@ -81,36 +81,36 @@ func (h *handlerMiddleware) AuthSubscribe(ctx context.Context, topics *[]string) } // Connect traces connect operations. -func (h *handlerMiddleware) Connect(ctx context.Context) { +func (h *handlerMiddleware) Connect(ctx context.Context) error { ctx, span := h.tracer.Start(ctx, connectOP) defer span.End() - h.handler.Connect(ctx) + return h.handler.Connect(ctx) } // Disconnect traces disconnect operations. -func (h *handlerMiddleware) Disconnect(ctx context.Context) { +func (h *handlerMiddleware) Disconnect(ctx context.Context) error { ctx, span := h.tracer.Start(ctx, disconnectOP) defer span.End() - h.handler.Disconnect(ctx) + return h.handler.Disconnect(ctx) } // Publish traces publish operations. -func (h *handlerMiddleware) Publish(ctx context.Context, topic *string, payload *[]byte) { +func (h *handlerMiddleware) Publish(ctx context.Context, topic *string, payload *[]byte) error { ctx, span := h.tracer.Start(ctx, publishOP) defer span.End() - h.handler.Publish(ctx, topic, payload) + return h.handler.Publish(ctx, topic, payload) } // Subscribe traces subscribe operations. -func (h *handlerMiddleware) Subscribe(ctx context.Context, topics *[]string) { +func (h *handlerMiddleware) Subscribe(ctx context.Context, topics *[]string) error { ctx, span := h.tracer.Start(ctx, subscribeOP) defer span.End() - h.handler.Subscribe(ctx, topics) + return h.handler.Subscribe(ctx, topics) } // Unsubscribe traces unsubscribe operations. -func (h *handlerMiddleware) Unsubscribe(ctx context.Context, topics *[]string) { +func (h *handlerMiddleware) Unsubscribe(ctx context.Context, topics *[]string) error { ctx, span := h.tracer.Start(ctx, unsubscribeOP) defer span.End() - h.handler.Unsubscribe(ctx, topics) + return h.handler.Unsubscribe(ctx, topics) } diff --git a/vendor/github.com/mainflux/mproxy/pkg/session/handler.go b/vendor/github.com/mainflux/mproxy/pkg/session/handler.go index de62ca78..a58bc6af 100644 --- a/vendor/github.com/mainflux/mproxy/pkg/session/handler.go +++ b/vendor/github.com/mainflux/mproxy/pkg/session/handler.go @@ -17,17 +17,17 @@ type Handler interface { AuthSubscribe(ctx context.Context, topics *[]string) error // After client successfully connected - Connect(ctx context.Context) + Connect(ctx context.Context) error // After client successfully published - Publish(ctx context.Context, topic *string, payload *[]byte) + Publish(ctx context.Context, topic *string, payload *[]byte) error // After client successfully subscribed - Subscribe(ctx context.Context, topics *[]string) + Subscribe(ctx context.Context, topics *[]string) error // After client unsubscribed - Unsubscribe(ctx context.Context, topics *[]string) + Unsubscribe(ctx context.Context, topics *[]string) error // Disconnect on connection with client lost - Disconnect(ctx context.Context) + Disconnect(ctx context.Context) error } diff --git a/vendor/github.com/mainflux/mproxy/pkg/session/stream.go b/vendor/github.com/mainflux/mproxy/pkg/session/stream.go index 48ca217b..b73b2543 100644 --- a/vendor/github.com/mainflux/mproxy/pkg/session/stream.go +++ b/vendor/github.com/mainflux/mproxy/pkg/session/stream.go @@ -67,7 +67,9 @@ func stream(ctx context.Context, dir direction, r, w net.Conn, h Handler, errs c } if dir == up { - notify(ctx, pkt, h) + if err := notify(ctx, pkt, h); err != nil { + errs <- wrap(ctx, err, dir) + } } } } @@ -101,18 +103,18 @@ func authorize(ctx context.Context, pkt packets.ControlPacket, h Handler) error } } -func notify(ctx context.Context, pkt packets.ControlPacket, h Handler) { +func notify(ctx context.Context, pkt packets.ControlPacket, h Handler) error { switch p := pkt.(type) { case *packets.ConnectPacket: - h.Connect(ctx) + return h.Connect(ctx) case *packets.PublishPacket: - h.Publish(ctx, &p.TopicName, &p.Payload) + return h.Publish(ctx, &p.TopicName, &p.Payload) case *packets.SubscribePacket: - h.Subscribe(ctx, &p.Topics) + return h.Subscribe(ctx, &p.Topics) case *packets.UnsubscribePacket: - h.Unsubscribe(ctx, &p.Topics) + return h.Unsubscribe(ctx, &p.Topics) default: - return + return nil } } diff --git a/vendor/modules.txt b/vendor/modules.txt index 35953fe9..4795f6e4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -430,7 +430,7 @@ github.com/magiconair/properties # github.com/mainflux/callhome v0.0.0-20230626140149-b03b1f4c46f2 ## explicit; go 1.20 github.com/mainflux/callhome/pkg/client -# github.com/mainflux/mproxy v0.3.0 +# github.com/mainflux/mproxy v0.3.1-0.20230822124450-4b4dfe600cc2 ## explicit; go 1.19 github.com/mainflux/mproxy/pkg/logger github.com/mainflux/mproxy/pkg/mqtt