Mainflux.mainflux/lora/events/streams.go

197 lines
4.3 KiB
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package events
import (
"context"
"encoding/json"
"errors"
"github.com/mainflux/mainflux/lora"
"github.com/mainflux/mainflux/pkg/events"
)
// Keys looked up inside thing and channel metadata.
const (
	keyType   = "lora"
	keyDevEUI = "dev_eui"
	keyAppID  = "app_id"
)

// Operation names carried by thing events.
const (
	thingPrefix     = "thing."
	thingCreate     = thingPrefix + "create"
	thingUpdate     = thingPrefix + "update"
	thingRemove     = thingPrefix + "remove"
	thingConnect    = thingPrefix + "connect"
	thingDisconnect = thingPrefix + "disconnect"
)

// Operation names carried by channel events.
const (
	channelPrefix = "channel."
	channelCreate = channelPrefix + "create"
	channelUpdate = channelPrefix + "update"
	channelRemove = channelPrefix + "remove"
)
// Errors reported while extracting LoRa settings from event metadata.
var (
	// errMetadataType is returned when the metadata has no "lora" entry.
	errMetadataType = errors.New("field lora is missing in the metadata")
	// errMetadataFormat is returned when the "lora" entry is not a JSON object.
	errMetadataFormat = errors.New("malformed metadata")
	// errMetadataAppID is returned when the "lora" entry of a channel has no
	// string "app_id" field.
	errMetadataAppID = errors.New("application ID not found in channel metadata")
	// errMetadataDevEUI is returned when the "lora" entry of a thing has no
	// string "dev_eui" field.
	errMetadataDevEUI = errors.New("device EUI not found in thing metadata")
)
// eventHandler applies incoming events to a LoRa service.
type eventHandler struct {
// svc is the LoRa service that Handle invokes for each recognized event.
svc lora.Service
}
// NewEventHandler returns an events.EventHandler that applies every handled
// event to the given LoRa service.
func NewEventHandler(svc lora.Service) events.EventHandler {
	handler := new(eventHandler)
	handler.svc = svc
	return handler
}
// Handle applies a single event to the LoRa service.
//
// The event is encoded into a key/value map and dispatched on its
// "operation" field:
//   - thing create/update events are decoded together with their LoRa
//     metadata and passed to CreateThing;
//   - channel create/update events are decoded together with their LoRa
//     metadata and passed to CreateChannel;
//   - remove events are passed to RemoveThing or RemoveChannel;
//   - connect/disconnect events are passed to ConnectThing or DisconnectThing.
//
// Events carrying any other operation are ignored.
//
// Handle returns the encoding, decoding or service error. The one exception
// is errMetadataType: an entity whose metadata has no "lora" section is
// skipped and nil is returned.
func (es *eventHandler) Handle(ctx context.Context, event events.Event) error {
	msg, err := event.Encode()
	if err != nil {
		return err
	}

	switch msg["operation"] {
	case thingCreate, thingUpdate:
		// Updates are applied through the same service call as creations.
		var te createThingEvent
		if te, err = decodeCreateThing(msg); err == nil {
			err = es.svc.CreateThing(ctx, te.id, te.loraDevEUI)
		}
	case channelCreate, channelUpdate:
		// Updates are applied through the same service call as creations.
		var ce createChannelEvent
		if ce, err = decodeCreateChannel(msg); err == nil {
			err = es.svc.CreateChannel(ctx, ce.id, ce.loraAppID)
		}
	case thingRemove:
		rte := decodeRemoveThing(msg)
		err = es.svc.RemoveThing(ctx, rte.id)
	case channelRemove:
		rce := decodeRemoveChannel(msg)
		err = es.svc.RemoveChannel(ctx, rce.id)
	case thingConnect:
		tce := decodeConnectionThing(msg)
		err = es.svc.ConnectThing(ctx, tce.chanID, tce.thingID)
	case thingDisconnect:
		tde := decodeConnectionThing(msg)
		err = es.svc.DisconnectThing(ctx, tde.chanID, tde.thingID)
	}

	// errors.Is also matches a wrapped errMetadataType, so the "no lora
	// section" case stays ignored even if a layer adds context to the error.
	if errors.Is(err, errMetadataType) {
		return nil
	}
	return err
}
// decodeCreateThing builds a createThingEvent from an encoded thing event.
//
// The "metadata" field (treated as "{}" when absent or not a string) is parsed
// as JSON, and the device EUI is read from the "dev_eui" field of its "lora"
// object. It returns the JSON parsing error when the metadata cannot be
// parsed, errMetadataType when there is no "lora" entry, errMetadataFormat
// when that entry is not an object and errMetadataDevEUI when the device EUI
// is missing or not a string. The returned event is empty on error.
func decodeCreateThing(event map[string]interface{}) (createThingEvent, error) {
	var meta map[string]interface{}
	raw := read(event, "metadata", "{}")
	if err := json.Unmarshal([]byte(raw), &meta); err != nil {
		return createThingEvent{}, err
	}

	section, found := meta[keyType]
	if !found {
		return createThingEvent{}, errMetadataType
	}

	switch loraMeta := section.(type) {
	case map[string]interface{}:
		devEUI, isString := loraMeta[keyDevEUI].(string)
		if !isString {
			return createThingEvent{}, errMetadataDevEUI
		}
		return createThingEvent{
			id:         read(event, "id", ""),
			loraDevEUI: devEUI,
		}, nil
	default:
		return createThingEvent{}, errMetadataFormat
	}
}
// decodeRemoveThing builds a removeThingEvent from the "id" field of an
// encoded event; the id is empty when the field is absent or not a string.
func decodeRemoveThing(event map[string]interface{}) removeThingEvent {
	var rte removeThingEvent
	rte.id = read(event, "id", "")
	return rte
}
// decodeCreateChannel builds a createChannelEvent from an encoded channel
// event.
//
// The "metadata" field (treated as "{}" when absent or not a string) is parsed
// as JSON, and the application ID is read from the "app_id" field of its
// "lora" object. It returns the JSON parsing error when the metadata cannot be
// parsed, errMetadataType when there is no "lora" entry, errMetadataFormat
// when that entry is not an object and errMetadataAppID when the application
// ID is missing or not a string. The returned event is empty on error.
func decodeCreateChannel(event map[string]interface{}) (createChannelEvent, error) {
	var none createChannelEvent

	var meta map[string]interface{}
	if err := json.Unmarshal([]byte(read(event, "metadata", "{}")), &meta); err != nil {
		return none, err
	}

	section, present := meta[keyType]
	if !present {
		return none, errMetadataType
	}

	loraMeta, isObject := section.(map[string]interface{})
	if !isObject {
		return none, errMetadataFormat
	}

	appID, isString := loraMeta[keyAppID].(string)
	if !isString {
		return none, errMetadataAppID
	}

	return createChannelEvent{
		id:        read(event, "id", ""),
		loraAppID: appID,
	}, nil
}
// decodeConnectionThing builds a connectionThingEvent from the "chan_id" and
// "thing_id" fields of an encoded event; a field that is absent or not a
// string yields an empty ID.
func decodeConnectionThing(event map[string]interface{}) connectionThingEvent {
	channel := read(event, "chan_id", "")
	thing := read(event, "thing_id", "")
	return connectionThingEvent{chanID: channel, thingID: thing}
}
// decodeRemoveChannel builds a removeChannelEvent from the "id" field of an
// encoded event; the id is empty when the field is absent or not a string.
func decodeRemoveChannel(event map[string]interface{}) removeChannelEvent {
	var rce removeChannelEvent
	rce.id = read(event, "id", "")
	return rce
}
// read returns the string stored under key in event, or def when the key is
// absent or its value is not a string.
func read(event map[string]interface{}, key, def string) string {
	if s, isString := event[key].(string); isString {
		return s
	}
	return def
}