From 649986b19faad871678d27a061e55cf4775bcac5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksandar=20Novakovi=C4=87?= Date: Thu, 25 Jul 2019 02:22:28 +0200 Subject: [PATCH] MF-757 - Add support for explicit content type (#771) * Add format to raw message and content type mapping Signed-off-by: Aleksandar Novakovic * Add format as part of content type header Signed-off-by: Aleksandar Novakovic * Add format to content-type header for WS adapter Add format as part of content-type and add support for content-type header and query parameter. Signed-off-by: Aleksandar Novakovic * Add support for format and content-type in MQTT adapter Signed-off-by: Aleksandar Novakovic * Add format and content-type to CoAP adapter Signed-off-by: Aleksandar Novakovic * Add documentation for content type and format Signed-off-by: Aleksandar Novakovic * Add comments to MQTT code Signed-off-by: Aleksandar Novakovic * Fix SenML JSON CoAP code type Signed-off-by: Aleksandar Novakovic * Remove format from the adapters Signed-off-by: Aleksandar Novakovic * Remove format from the docs Signed-off-by: Aleksandar Novakovic * Remove format field from MQTT adapter Signed-off-by: Aleksandar Novakovic * Update content type to be optional Signed-off-by: Aleksandar Novakovic * Add support for CBOR content type in the normalizer Signed-off-by: Aleksandar Novakovic * Update docs for MQTT content type Signed-off-by: Aleksandar Novakovic --- coap/api/transport.go | 80 +++++++++++++++++++++++++++++++--------- docs/messaging.md | 31 +++++++++++----- http/api/transport.go | 7 +++- message.go | 8 ++++ mqtt/mqtt.js | 20 ++++++++-- normalizer/normalizer.go | 12 +++++- version.go | 4 ++ ws/api/transport.go | 46 ++++++++++++++++++----- 8 files changed, 168 insertions(+), 40 deletions(-) diff --git a/coap/api/transport.go b/coap/api/transport.go index 65126b69..37db8d2f 100644 --- a/coap/api/transport.go +++ b/coap/api/transport.go @@ -15,6 +15,7 @@ import ( "fmt" "net" "net/http" + "net/url" "regexp" "strings" "time" @@ -29,7 +30,11 @@ import ( "google.golang.org/grpc/status" ) -const protocol = "coap" +const ( + protocol = "coap" + senMLJSON gocoap.MediaType = 110 + senMLCBOR gocoap.MediaType = 112 +) var ( errBadRequest = errors.New("bad request") @@ -112,23 +117,31 @@ func subtopic(msg *gocoap.Message) string { func authorize(msg *gocoap.Message, res *gocoap.Message, cid string) (string, error) { // Device Key is passed as Uri-Query parameter, which option ID is 15 (0xf). - key, err := authKey(msg.Option(gocoap.URIQuery)) - if err != nil { - switch err { - case errBadOption: - res.Code = gocoap.BadOption - case errBadRequest: - res.Code = gocoap.BadRequest - } - - return "", err + query := msg.Option(gocoap.URIQuery) + queryStr, ok := query.(string) + if !ok { + res.Code = gocoap.BadRequest + return "", errBadRequest } + params, err := url.ParseQuery(queryStr) + if err != nil { + res.Code = gocoap.BadRequest + return "", errBadRequest + } + + auths, ok := params["authorization"] + if !ok || len(auths) != 1 { + res.Code = gocoap.BadRequest + return "", errBadRequest + } + + key := auths[0] + ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() id, err := auth.CanAccess(ctx, &mainflux.AccessReq{Token: key, ChanID: cid}) - if err != nil { e, ok := status.FromError(err) if ok { @@ -142,6 +155,7 @@ func authorize(msg *gocoap.Message, res *gocoap.Message, cid string) (string, er } res.Code = gocoap.InternalServerError } + return id.GetValue(), nil } @@ -207,6 +221,11 @@ func receive(svc coap.Service, msg *gocoap.Message) *gocoap.Message { return res } + ct, err := contentType(msg) + if err != nil { + ct = mainflux.SenMLJSON + } + publisher, err := authorize(msg, res, chanID) if err != nil { res.Code = gocoap.Forbidden @@ -214,11 +233,12 @@ func receive(svc coap.Service, msg *gocoap.Message) *gocoap.Message { } rawMsg := mainflux.RawMessage{ - Channel: chanID, - Subtopic: subtopic, - Publisher: publisher, - Protocol: protocol, - Payload: msg.Payload, + Channel: chanID, + Subtopic: subtopic, + Publisher: publisher, + ContentType: ct, + Protocol: protocol, + Payload: msg.Payload, } if err := svc.Publish(context.Background(), "", rawMsg); err != nil { @@ -316,6 +336,15 @@ func handleMessage(conn *net.UDPConn, addr *net.UDPAddr, o *coap.Observer, msg * observeVal := buff.Bytes() notifyMsg.SetOption(gocoap.Observe, observeVal[len(observeVal)-3:]) + coapCT := senMLJSON + switch msg.ContentType { + case mainflux.SenMLJSON: + coapCT = senMLJSON + case mainflux.SenMLCBOR: + coapCT = senMLCBOR + } + notifyMsg.SetOption(gocoap.ContentFormat, coapCT) + if err := gocoap.Transmit(conn, addr, notifyMsg); err != nil { logger.Warn(fmt.Sprintf("Failed to send message to observer: %s", err)) } @@ -360,3 +389,20 @@ func ping(svc coap.Service, obsID string, conn *net.UDPConn, addr *net.UDPAddr, } } } + +func contentType(msg *gocoap.Message) (string, error) { + ctid, ok := msg.Option(gocoap.ContentFormat).(gocoap.MediaType) + if !ok { + return "", errBadRequest + } + + ct := "" + switch ctid { + case senMLJSON: + ct = mainflux.SenMLJSON + case senMLCBOR: + ct = mainflux.SenMLCBOR + } + + return ct, nil +} diff --git a/docs/messaging.md b/docs/messaging.md index 0522310e..f5badff4 100644 --- a/docs/messaging.md +++ b/docs/messaging.md @@ -10,20 +10,22 @@ To publish message over channel, thing should send following request: curl -s -S -i --cacert docker/ssl/certs/mainflux-server.crt --insecure -X POST -H "Content-Type: application/senml+json" -H "Authorization: " https://localhost/http/channels//messages -d '[{"bn":"some-base-name:","bt":1.276020076001e+09, "bu":"A","bver":5, "n":"voltage","u":"V","v":120.1}, {"n":"current","t":-5,"v":1.2}, {"n":"current","t":-4,"v":1.3}]' ``` -Note that you should always send array of messages in senML format. +Note that if you're going to use senml message format, you should always send +messages as an array. ## WebSocket To publish and receive messages over channel using web socket, you should first send handshake request to `/channels//messages` path. Don't forget -to send `Authorization` header with thing authorization token. +to send `Authorization` header with thing authorization token. In order to pass +message content type to WS adapter you can use `Content-Type` header. -If you are not able to send custom headers in your handshake request, send it as -query parameter `authorization`. Then your path should look like this -`/channels//messages?authorization=`. +If you are not able to send custom headers in your handshake request, send them as +query parameter `authorization` and `content-type`. Then your path should look like +this `/channels//messages?authorization=&content-type=`. If you are using the docker environment prepend the url with `ws`. So for example -`/ws/channels//messages?authorization=` +`/ws/channels//messages?authorization=&content-type=`. ### Basic nodejs example @@ -34,7 +36,7 @@ const WebSocket = require('ws'); process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0' // cbf02d60-72f2-4180-9f82-2c957db929d1 is an example of a thing_auth_key -const ws = new WebSocket('wss://localhost/ws/channels/1/messages?authorization=cbf02d60-72f2-4180-9f82-2c957db929d1') +const ws = new WebSocket('wss://localhost/ws/channels/1/messages?authorization=cbf02d60-72f2-4180-9f82-2c957db929d1&content-type=application%2Fsenml%2Bjson') ws.on('open', () => { ws.send('something') @@ -65,6 +67,16 @@ To subscribe to channel, thing should call following command: mosquitto_sub -u -P -t channels//messages -h localhost ``` +In order to pass content type as part of topic, one should append it to the end +of an existing topic. Content type value should always be prefixed with `/ct/`. +If you want to use standard topic such as `channels//messages` +with SenML content type, you should use following topic `channels//messages/ct/application_senml-json`. +If there is no `/ct/` prefix in the subtopic, then content type will have the +default value which is `application/senml+json`. Content type will be removed from +the topic under the hood. You should pass content type only when you're publishing +a message. Characters like `_` and `-` in the content type will be replaced with `/` +and `+` respectively. + If you are using TLS to secure MQTT connection, add `--cafile docker/ssl/certs/ca.crt` to every command. @@ -76,7 +88,8 @@ CoAP adapter implements CoAP protocol using underlying UDP and according to [RFC coap://localhost/channels//messages?authorization= ``` -To send a message, use `POST` request. To subscribe, send `GET` request with Observe option set to 0. There are two ways to unsubscribe: +To send a message, use `POST` request. When posting a message you can pass content type in `Content-Format` option. +To subscribe, send `GET` request with Observe option set to 0. There are two ways to unsubscribe: 1) Send `GET` request with Observe option set to 1. 2) Forget the token and send `RST` message as a response to `CONF` message received by the server. @@ -84,7 +97,7 @@ The most of the notifications received from the Adapter are non-confirmable. By > Server must send a notification in a confirmable message instead of a non-confirmable message at least every 24 hours. This prevents a client that went away or is no longer interested from remaining in the list of observers indefinitely. -CoAP Adapter sends these notifications every 12 hours. To configure this period, please check (adapter documentation)[https://www.github.com/mainflux/mainflux/tree/master/coap/README.md) If the client is no longer interested in receiving notifications, the second scenario described above can be used to unsubscribe +CoAP Adapter sends these notifications every 12 hours. To configure this period, please check [adapter documentation](https://www.github.com/mainflux/mainflux/tree/master/coap/README.md) If the client is no longer interested in receiving notifications, the second scenario described above can be used to unsubscribe. ## Subtopics diff --git a/http/api/transport.go b/http/api/transport.go index c1679a8d..2fd22165 100644 --- a/http/api/transport.go +++ b/http/api/transport.go @@ -110,9 +110,14 @@ func decodeRequest(ctx context.Context, r *http.Request) (interface{}, error) { return nil, err } + ct := r.Header.Get("Content-Type") + if ct == "" { + ct = mainflux.SenMLJSON + } + msg := mainflux.RawMessage{ Protocol: protocol, - ContentType: r.Header.Get("Content-Type"), + ContentType: ct, Channel: chanID, Subtopic: subtopic, Payload: payload, diff --git a/message.go b/message.go index ec0deb1b..2efa64a5 100644 --- a/message.go +++ b/message.go @@ -2,6 +2,14 @@ package mainflux import "encoding/json" +const ( + // SenMLJSON represents SenML in JSON format content type. + SenMLJSON = "application/senml+json" + + // SenMLCBOR represents SenML in CBOR format content type. + SenMLCBOR = "application/senml+cbor" +) + // Type messageType is introduced to prevent cycle when calling Message // MarshalJSON and UnmarshalJSON methods. type messageType Message diff --git a/mqtt/mqtt.js b/mqtt/mqtt.js index bdfc672e..6ab45e76 100644 --- a/mqtt/mqtt.js +++ b/mqtt/mqtt.js @@ -160,7 +160,8 @@ aedes.authorizePublish = function (client, packet, publish) { isEmpty = function(value) { return value !== ''; }, - elements = packet.topic.split('/').slice(baseLength).join('.').split('.').filter(isEmpty), + parts = packet.topic.split('/'), + elements = parts.slice(baseLength).join('.').split('.').filter(isEmpty), baseTopic = 'channel.' + channelId; // Remove empty elements for (var i = 0; i < elements.length; i++) { @@ -171,14 +172,27 @@ aedes.authorizePublish = function (client, packet, publish) { return; } } - var channelTopic = elements.length ? baseTopic + '.' + elements.join('.') : baseTopic, + + var contentType = 'application/senml+json', + st = elements; + + if (elements.length > 1 && elements[elements.length - 2] === 'ct') { + // If there is ct prefix, read and decode content type. + contentType = elements[elements.length - 1].replace('_', '/').replace('-', '+'); + st = elements.slice(0, elements.length - 2); + parts = parts.slice(0, parts.length - 2); + } + packet.topic = parts.join('/'); + + var channelTopic = st.length ? baseTopic + '.' + st.join('.') : baseTopic, onAuthorize = function (err, res) { var rawMsg; if (!err) { rawMsg = RawMessage.encode({ publisher: client.thingId, channel: channelId, - subtopic: elements.join('.'), + subtopic: st.join('.'), + contentType: contentType, protocol: 'mqtt', payload: packet.payload }).finish(); diff --git a/normalizer/normalizer.go b/normalizer/normalizer.go index 0df09db9..6bcaa17f 100644 --- a/normalizer/normalizer.go +++ b/normalizer/normalizer.go @@ -14,6 +14,11 @@ import ( "github.com/mainflux/mainflux" ) +var formats = map[string]senml.Format{ + mainflux.SenMLJSON: senml.JSON, + mainflux.SenMLCBOR: senml.CBOR, +} + type normalizer struct{} // New returns normalizer service implementation. @@ -22,7 +27,12 @@ func New() Service { } func (n normalizer) Normalize(msg mainflux.RawMessage) (NormalizedData, error) { - raw, err := senml.Decode(msg.Payload, senml.JSON) + format, ok := formats[msg.ContentType] + if !ok { + format = senml.JSON + } + + raw, err := senml.Decode(msg.Payload, format) if err != nil { return NormalizedData{}, err } diff --git a/version.go b/version.go index 403abccf..cae0dbbe 100644 --- a/version.go +++ b/version.go @@ -14,8 +14,12 @@ import ( const version string = "0.9.0" +// VersionInfo contains version endpoint response. type VersionInfo struct { + // Service contains service name. Service string `json:"service"` + + // Version contains service current version value. Version string `json:"version"` } diff --git a/ws/api/transport.go b/ws/api/transport.go index c9c3b85a..38ceb7ec 100644 --- a/ws/api/transport.go +++ b/ws/api/transport.go @@ -28,7 +28,9 @@ import ( "google.golang.org/grpc/status" ) -const protocol = "ws" +const ( + protocol = "ws" +) var ( errUnauthorizedAccess = errors.New("missing or invalid credentials provided") @@ -48,6 +50,11 @@ var ( channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages(/[^?]*)?(\?.*)?$`) ) +var contentTypes = map[string]int{ + mainflux.SenMLJSON: websocket.TextMessage, + mainflux.SenMLCBOR: websocket.BinaryMessage, +} + // MakeHandler returns http handler with handshake endpoint. func MakeHandler(svc ws.Service, tc mainflux.ThingsServiceClient, l log.Logger) http.Handler { auth = tc @@ -77,6 +84,8 @@ func handshake(svc ws.Service) http.HandlerFunc { } } + ct := contentType(r) + channelParts := channelPartRegExp.FindStringSubmatch(r.RequestURI) if len(channelParts) < 2 { logger.Warn(fmt.Sprintf("Empty channel id or malformed url")) @@ -108,7 +117,7 @@ func handshake(svc ws.Service) http.HandlerFunc { go sub.listen() // Start listening for messages from NATS. - go sub.broadcast(svc) + go sub.broadcast(svc, ct) } } @@ -176,6 +185,19 @@ func authorize(r *http.Request) (subscription, error) { return sub, nil } +func contentType(r *http.Request) string { + ct := r.Header.Get("Content-Type") + if ct == "" { + ctvals := bone.GetQuery(r, "content-type") + if len(ctvals) == 0 { + return mainflux.SenMLJSON + } + ct = ctvals[0] + } + + return ct +} + type subscription struct { pubID string chanID string @@ -184,7 +206,7 @@ type subscription struct { channel *ws.Channel } -func (sub subscription) broadcast(svc ws.Service) { +func (sub subscription) broadcast(svc ws.Service, contentType string) { for { _, payload, err := sub.conn.ReadMessage() if websocket.IsUnexpectedCloseError(err) { @@ -196,11 +218,12 @@ func (sub subscription) broadcast(svc ws.Service) { return } msg := mainflux.RawMessage{ - Channel: sub.chanID, - Subtopic: sub.subtopic, - Publisher: sub.pubID, - Protocol: protocol, - Payload: payload, + Channel: sub.chanID, + Subtopic: sub.subtopic, + ContentType: contentType, + Publisher: sub.pubID, + Protocol: protocol, + Payload: payload, } if err := svc.Publish(context.Background(), "", msg); err != nil { logger.Warn(fmt.Sprintf("Failed to publish message to NATS: %s", err)) @@ -215,7 +238,12 @@ func (sub subscription) broadcast(svc ws.Service) { func (sub subscription) listen() { for msg := range sub.channel.Messages { - if err := sub.conn.WriteMessage(websocket.TextMessage, msg.Payload); err != nil { + format, ok := contentTypes[msg.ContentType] + if !ok { + format = websocket.TextMessage + } + + if err := sub.conn.WriteMessage(format, msg.Payload); err != nil { logger.Warn(fmt.Sprintf("Failed to broadcast message to thing: %s", err)) } }