MF-757 - Add support for explicit content type (#771)

* Add format to raw message and content type mapping

Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Add format as part of content type header

Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Add format to content-type header for WS adapter

Add format as part of content-type and add support for
content-type header and query parameter.

Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Add support for format and content-type in MQTT adapter

Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Add format and content-type to CoAP adapter

Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Add documentation for content type and format

Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Add comments to MQTT code

Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Fix SenML JSON CoAP code type

Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Remove format from the adapters

Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Remove format from the docs

Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Remove format field from MQTT adapter

Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Update content type to be optional

Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Add support for CBOR content type in the normalizer

Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Update docs for MQTT content type

Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>
This commit is contained in:
Aleksandar Novaković 2019-07-25 02:22:28 +02:00 committed by Drasko DRASKOVIC
parent 99ced38229
commit 649986b19f
8 changed files with 168 additions and 40 deletions

View File

@ -15,6 +15,7 @@ import (
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
"net/url"
"regexp" "regexp"
"strings" "strings"
"time" "time"
@ -29,7 +30,11 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
const protocol = "coap" const (
protocol = "coap"
senMLJSON gocoap.MediaType = 110
senMLCBOR gocoap.MediaType = 112
)
var ( var (
errBadRequest = errors.New("bad request") errBadRequest = errors.New("bad request")
@ -112,23 +117,31 @@ func subtopic(msg *gocoap.Message) string {
func authorize(msg *gocoap.Message, res *gocoap.Message, cid string) (string, error) { func authorize(msg *gocoap.Message, res *gocoap.Message, cid string) (string, error) {
// Device Key is passed as Uri-Query parameter, which option ID is 15 (0xf). // Device Key is passed as Uri-Query parameter, which option ID is 15 (0xf).
key, err := authKey(msg.Option(gocoap.URIQuery)) query := msg.Option(gocoap.URIQuery)
if err != nil { queryStr, ok := query.(string)
switch err { if !ok {
case errBadOption: res.Code = gocoap.BadRequest
res.Code = gocoap.BadOption return "", errBadRequest
case errBadRequest:
res.Code = gocoap.BadRequest
}
return "", err
} }
params, err := url.ParseQuery(queryStr)
if err != nil {
res.Code = gocoap.BadRequest
return "", errBadRequest
}
auths, ok := params["authorization"]
if !ok || len(auths) != 1 {
res.Code = gocoap.BadRequest
return "", errBadRequest
}
key := auths[0]
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel() defer cancel()
id, err := auth.CanAccess(ctx, &mainflux.AccessReq{Token: key, ChanID: cid}) id, err := auth.CanAccess(ctx, &mainflux.AccessReq{Token: key, ChanID: cid})
if err != nil { if err != nil {
e, ok := status.FromError(err) e, ok := status.FromError(err)
if ok { if ok {
@ -142,6 +155,7 @@ func authorize(msg *gocoap.Message, res *gocoap.Message, cid string) (string, er
} }
res.Code = gocoap.InternalServerError res.Code = gocoap.InternalServerError
} }
return id.GetValue(), nil return id.GetValue(), nil
} }
@ -207,6 +221,11 @@ func receive(svc coap.Service, msg *gocoap.Message) *gocoap.Message {
return res return res
} }
ct, err := contentType(msg)
if err != nil {
ct = mainflux.SenMLJSON
}
publisher, err := authorize(msg, res, chanID) publisher, err := authorize(msg, res, chanID)
if err != nil { if err != nil {
res.Code = gocoap.Forbidden res.Code = gocoap.Forbidden
@ -214,11 +233,12 @@ func receive(svc coap.Service, msg *gocoap.Message) *gocoap.Message {
} }
rawMsg := mainflux.RawMessage{ rawMsg := mainflux.RawMessage{
Channel: chanID, Channel: chanID,
Subtopic: subtopic, Subtopic: subtopic,
Publisher: publisher, Publisher: publisher,
Protocol: protocol, ContentType: ct,
Payload: msg.Payload, Protocol: protocol,
Payload: msg.Payload,
} }
if err := svc.Publish(context.Background(), "", rawMsg); err != nil { if err := svc.Publish(context.Background(), "", rawMsg); err != nil {
@ -316,6 +336,15 @@ func handleMessage(conn *net.UDPConn, addr *net.UDPAddr, o *coap.Observer, msg *
observeVal := buff.Bytes() observeVal := buff.Bytes()
notifyMsg.SetOption(gocoap.Observe, observeVal[len(observeVal)-3:]) notifyMsg.SetOption(gocoap.Observe, observeVal[len(observeVal)-3:])
coapCT := senMLJSON
switch msg.ContentType {
case mainflux.SenMLJSON:
coapCT = senMLJSON
case mainflux.SenMLCBOR:
coapCT = senMLCBOR
}
notifyMsg.SetOption(gocoap.ContentFormat, coapCT)
if err := gocoap.Transmit(conn, addr, notifyMsg); err != nil { if err := gocoap.Transmit(conn, addr, notifyMsg); err != nil {
logger.Warn(fmt.Sprintf("Failed to send message to observer: %s", err)) logger.Warn(fmt.Sprintf("Failed to send message to observer: %s", err))
} }
@ -360,3 +389,20 @@ func ping(svc coap.Service, obsID string, conn *net.UDPConn, addr *net.UDPAddr,
} }
} }
} }
func contentType(msg *gocoap.Message) (string, error) {
ctid, ok := msg.Option(gocoap.ContentFormat).(gocoap.MediaType)
if !ok {
return "", errBadRequest
}
ct := ""
switch ctid {
case senMLJSON:
ct = mainflux.SenMLJSON
case senMLCBOR:
ct = mainflux.SenMLCBOR
}
return ct, nil
}

View File

@ -10,20 +10,22 @@ To publish message over channel, thing should send following request:
curl -s -S -i --cacert docker/ssl/certs/mainflux-server.crt --insecure -X POST -H "Content-Type: application/senml+json" -H "Authorization: <thing_token>" https://localhost/http/channels/<channel_id>/messages -d '[{"bn":"some-base-name:","bt":1.276020076001e+09, "bu":"A","bver":5, "n":"voltage","u":"V","v":120.1}, {"n":"current","t":-5,"v":1.2}, {"n":"current","t":-4,"v":1.3}]' curl -s -S -i --cacert docker/ssl/certs/mainflux-server.crt --insecure -X POST -H "Content-Type: application/senml+json" -H "Authorization: <thing_token>" https://localhost/http/channels/<channel_id>/messages -d '[{"bn":"some-base-name:","bt":1.276020076001e+09, "bu":"A","bver":5, "n":"voltage","u":"V","v":120.1}, {"n":"current","t":-5,"v":1.2}, {"n":"current","t":-4,"v":1.3}]'
``` ```
Note that you should always send array of messages in senML format. Note that if you're going to use senml message format, you should always send
messages as an array.
## WebSocket ## WebSocket
To publish and receive messages over channel using web socket, you should first To publish and receive messages over channel using web socket, you should first
send handshake request to `/channels/<channel_id>/messages` path. Don't forget send handshake request to `/channels/<channel_id>/messages` path. Don't forget
to send `Authorization` header with thing authorization token. to send `Authorization` header with thing authorization token. In order to pass
message content type to WS adapter you can use `Content-Type` header.
If you are not able to send custom headers in your handshake request, send it as If you are not able to send custom headers in your handshake request, send them as
query parameter `authorization`. Then your path should look like this query parameter `authorization` and `content-type`. Then your path should look like
`/channels/<channel_id>/messages?authorization=<thing_auth_key>`. this `/channels/<channel_id>/messages?authorization=<thing_auth_key>&content-type=<content-type>`.
If you are using the docker environment prepend the url with `ws`. So for example If you are using the docker environment prepend the url with `ws`. So for example
`/ws/channels/<channel_id>/messages?authorization=<thing_auth_key>` `/ws/channels/<channel_id>/messages?authorization=<thing_auth_key>&content-type=<content-type>`.
### Basic nodejs example ### Basic nodejs example
@ -34,7 +36,7 @@ const WebSocket = require('ws');
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0' process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'
// cbf02d60-72f2-4180-9f82-2c957db929d1 is an example of a thing_auth_key // cbf02d60-72f2-4180-9f82-2c957db929d1 is an example of a thing_auth_key
const ws = new WebSocket('wss://localhost/ws/channels/1/messages?authorization=cbf02d60-72f2-4180-9f82-2c957db929d1') const ws = new WebSocket('wss://localhost/ws/channels/1/messages?authorization=cbf02d60-72f2-4180-9f82-2c957db929d1&content-type=application%2Fsenml%2Bjson')
ws.on('open', () => { ws.on('open', () => {
ws.send('something') ws.send('something')
@ -65,6 +67,16 @@ To subscribe to channel, thing should call following command:
mosquitto_sub -u <thing_id> -P <thing_key> -t channels/<channel_id>/messages -h localhost mosquitto_sub -u <thing_id> -P <thing_key> -t channels/<channel_id>/messages -h localhost
``` ```
In order to pass content type as part of topic, one should append it to the end
of an existing topic. Content type value should always be prefixed with `/ct/`.
If you want to use standard topic such as `channels/<channel_id>/messages`
with SenML content type, you should use following topic `channels/<channel_id>/messages/ct/application_senml-json`.
If there is no `/ct/` prefix in the subtopic, then content type will have the
default value which is `application/senml+json`. Content type will be removed from
the topic under the hood. You should pass content type only when you're publishing
a message. Characters like `_` and `-` in the content type will be replaced with `/`
and `+` respectively.
If you are using TLS to secure MQTT connection, add `--cafile docker/ssl/certs/ca.crt` If you are using TLS to secure MQTT connection, add `--cafile docker/ssl/certs/ca.crt`
to every command. to every command.
@ -76,7 +88,8 @@ CoAP adapter implements CoAP protocol using underlying UDP and according to [RFC
coap://localhost/channels/<channel_id>/messages?authorization=<thing_auth_key> coap://localhost/channels/<channel_id>/messages?authorization=<thing_auth_key>
``` ```
To send a message, use `POST` request. To subscribe, send `GET` request with Observe option set to 0. There are two ways to unsubscribe: To send a message, use `POST` request. When posting a message you can pass content type in `Content-Format` option.
To subscribe, send `GET` request with Observe option set to 0. There are two ways to unsubscribe:
1) Send `GET` request with Observe option set to 1. 1) Send `GET` request with Observe option set to 1.
2) Forget the token and send `RST` message as a response to `CONF` message received by the server. 2) Forget the token and send `RST` message as a response to `CONF` message received by the server.
@ -84,7 +97,7 @@ The most of the notifications received from the Adapter are non-confirmable. By
> Server must send a notification in a confirmable message instead of a non-confirmable message at least every 24 hours. This prevents a client that went away or is no longer interested from remaining in the list of observers indefinitely. > Server must send a notification in a confirmable message instead of a non-confirmable message at least every 24 hours. This prevents a client that went away or is no longer interested from remaining in the list of observers indefinitely.
CoAP Adapter sends these notifications every 12 hours. To configure this period, please check (adapter documentation)[https://www.github.com/mainflux/mainflux/tree/master/coap/README.md) If the client is no longer interested in receiving notifications, the second scenario described above can be used to unsubscribe CoAP Adapter sends these notifications every 12 hours. To configure this period, please check [adapter documentation](https://www.github.com/mainflux/mainflux/tree/master/coap/README.md) If the client is no longer interested in receiving notifications, the second scenario described above can be used to unsubscribe.
## Subtopics ## Subtopics

View File

@ -110,9 +110,14 @@ func decodeRequest(ctx context.Context, r *http.Request) (interface{}, error) {
return nil, err return nil, err
} }
ct := r.Header.Get("Content-Type")
if ct == "" {
ct = mainflux.SenMLJSON
}
msg := mainflux.RawMessage{ msg := mainflux.RawMessage{
Protocol: protocol, Protocol: protocol,
ContentType: r.Header.Get("Content-Type"), ContentType: ct,
Channel: chanID, Channel: chanID,
Subtopic: subtopic, Subtopic: subtopic,
Payload: payload, Payload: payload,

View File

@ -2,6 +2,14 @@ package mainflux
import "encoding/json" import "encoding/json"
const (
// SenMLJSON represents SenML in JSON format content type.
SenMLJSON = "application/senml+json"
// SenMLCBOR represents SenML in CBOR format content type.
SenMLCBOR = "application/senml+cbor"
)
// Type messageType is introduced to prevent cycle when calling Message // Type messageType is introduced to prevent cycle when calling Message
// MarshalJSON and UnmarshalJSON methods. // MarshalJSON and UnmarshalJSON methods.
type messageType Message type messageType Message

View File

@ -160,7 +160,8 @@ aedes.authorizePublish = function (client, packet, publish) {
isEmpty = function(value) { isEmpty = function(value) {
return value !== ''; return value !== '';
}, },
elements = packet.topic.split('/').slice(baseLength).join('.').split('.').filter(isEmpty), parts = packet.topic.split('/'),
elements = parts.slice(baseLength).join('.').split('.').filter(isEmpty),
baseTopic = 'channel.' + channelId; baseTopic = 'channel.' + channelId;
// Remove empty elements // Remove empty elements
for (var i = 0; i < elements.length; i++) { for (var i = 0; i < elements.length; i++) {
@ -171,14 +172,27 @@ aedes.authorizePublish = function (client, packet, publish) {
return; return;
} }
} }
var channelTopic = elements.length ? baseTopic + '.' + elements.join('.') : baseTopic,
var contentType = 'application/senml+json',
st = elements;
if (elements.length > 1 && elements[elements.length - 2] === 'ct') {
// If there is ct prefix, read and decode content type.
contentType = elements[elements.length - 1].replace('_', '/').replace('-', '+');
st = elements.slice(0, elements.length - 2);
parts = parts.slice(0, parts.length - 2);
}
packet.topic = parts.join('/');
var channelTopic = st.length ? baseTopic + '.' + st.join('.') : baseTopic,
onAuthorize = function (err, res) { onAuthorize = function (err, res) {
var rawMsg; var rawMsg;
if (!err) { if (!err) {
rawMsg = RawMessage.encode({ rawMsg = RawMessage.encode({
publisher: client.thingId, publisher: client.thingId,
channel: channelId, channel: channelId,
subtopic: elements.join('.'), subtopic: st.join('.'),
contentType: contentType,
protocol: 'mqtt', protocol: 'mqtt',
payload: packet.payload payload: packet.payload
}).finish(); }).finish();

View File

@ -14,6 +14,11 @@ import (
"github.com/mainflux/mainflux" "github.com/mainflux/mainflux"
) )
var formats = map[string]senml.Format{
mainflux.SenMLJSON: senml.JSON,
mainflux.SenMLCBOR: senml.CBOR,
}
type normalizer struct{} type normalizer struct{}
// New returns normalizer service implementation. // New returns normalizer service implementation.
@ -22,7 +27,12 @@ func New() Service {
} }
func (n normalizer) Normalize(msg mainflux.RawMessage) (NormalizedData, error) { func (n normalizer) Normalize(msg mainflux.RawMessage) (NormalizedData, error) {
raw, err := senml.Decode(msg.Payload, senml.JSON) format, ok := formats[msg.ContentType]
if !ok {
format = senml.JSON
}
raw, err := senml.Decode(msg.Payload, format)
if err != nil { if err != nil {
return NormalizedData{}, err return NormalizedData{}, err
} }

View File

@ -14,8 +14,12 @@ import (
const version string = "0.9.0" const version string = "0.9.0"
// VersionInfo contains version endpoint response.
type VersionInfo struct { type VersionInfo struct {
// Service contains service name.
Service string `json:"service"` Service string `json:"service"`
// Version contains service current version value.
Version string `json:"version"` Version string `json:"version"`
} }

View File

@ -28,7 +28,9 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
const protocol = "ws" const (
protocol = "ws"
)
var ( var (
errUnauthorizedAccess = errors.New("missing or invalid credentials provided") errUnauthorizedAccess = errors.New("missing or invalid credentials provided")
@ -48,6 +50,11 @@ var (
channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages(/[^?]*)?(\?.*)?$`) channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages(/[^?]*)?(\?.*)?$`)
) )
var contentTypes = map[string]int{
mainflux.SenMLJSON: websocket.TextMessage,
mainflux.SenMLCBOR: websocket.BinaryMessage,
}
// MakeHandler returns http handler with handshake endpoint. // MakeHandler returns http handler with handshake endpoint.
func MakeHandler(svc ws.Service, tc mainflux.ThingsServiceClient, l log.Logger) http.Handler { func MakeHandler(svc ws.Service, tc mainflux.ThingsServiceClient, l log.Logger) http.Handler {
auth = tc auth = tc
@ -77,6 +84,8 @@ func handshake(svc ws.Service) http.HandlerFunc {
} }
} }
ct := contentType(r)
channelParts := channelPartRegExp.FindStringSubmatch(r.RequestURI) channelParts := channelPartRegExp.FindStringSubmatch(r.RequestURI)
if len(channelParts) < 2 { if len(channelParts) < 2 {
logger.Warn(fmt.Sprintf("Empty channel id or malformed url")) logger.Warn(fmt.Sprintf("Empty channel id or malformed url"))
@ -108,7 +117,7 @@ func handshake(svc ws.Service) http.HandlerFunc {
go sub.listen() go sub.listen()
// Start listening for messages from NATS. // Start listening for messages from NATS.
go sub.broadcast(svc) go sub.broadcast(svc, ct)
} }
} }
@ -176,6 +185,19 @@ func authorize(r *http.Request) (subscription, error) {
return sub, nil return sub, nil
} }
func contentType(r *http.Request) string {
ct := r.Header.Get("Content-Type")
if ct == "" {
ctvals := bone.GetQuery(r, "content-type")
if len(ctvals) == 0 {
return mainflux.SenMLJSON
}
ct = ctvals[0]
}
return ct
}
type subscription struct { type subscription struct {
pubID string pubID string
chanID string chanID string
@ -184,7 +206,7 @@ type subscription struct {
channel *ws.Channel channel *ws.Channel
} }
func (sub subscription) broadcast(svc ws.Service) { func (sub subscription) broadcast(svc ws.Service, contentType string) {
for { for {
_, payload, err := sub.conn.ReadMessage() _, payload, err := sub.conn.ReadMessage()
if websocket.IsUnexpectedCloseError(err) { if websocket.IsUnexpectedCloseError(err) {
@ -196,11 +218,12 @@ func (sub subscription) broadcast(svc ws.Service) {
return return
} }
msg := mainflux.RawMessage{ msg := mainflux.RawMessage{
Channel: sub.chanID, Channel: sub.chanID,
Subtopic: sub.subtopic, Subtopic: sub.subtopic,
Publisher: sub.pubID, ContentType: contentType,
Protocol: protocol, Publisher: sub.pubID,
Payload: payload, Protocol: protocol,
Payload: payload,
} }
if err := svc.Publish(context.Background(), "", msg); err != nil { if err := svc.Publish(context.Background(), "", msg); err != nil {
logger.Warn(fmt.Sprintf("Failed to publish message to NATS: %s", err)) logger.Warn(fmt.Sprintf("Failed to publish message to NATS: %s", err))
@ -215,7 +238,12 @@ func (sub subscription) broadcast(svc ws.Service) {
func (sub subscription) listen() { func (sub subscription) listen() {
for msg := range sub.channel.Messages { for msg := range sub.channel.Messages {
if err := sub.conn.WriteMessage(websocket.TextMessage, msg.Payload); err != nil { format, ok := contentTypes[msg.ContentType]
if !ok {
format = websocket.TextMessage
}
if err := sub.conn.WriteMessage(format, msg.Payload); err != nil {
logger.Warn(fmt.Sprintf("Failed to broadcast message to thing: %s", err)) logger.Warn(fmt.Sprintf("Failed to broadcast message to thing: %s", err))
} }
} }