197 lines
4.3 KiB
Go
197 lines
4.3 KiB
Go
// Copyright (c) Mainflux
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package events
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
|
|
"github.com/mainflux/mainflux/lora"
|
|
"github.com/mainflux/mainflux/pkg/events"
|
|
)
|
|
|
|
const (
|
|
keyType = "lora"
|
|
keyDevEUI = "dev_eui"
|
|
keyAppID = "app_id"
|
|
|
|
thingPrefix = "thing."
|
|
thingCreate = thingPrefix + "create"
|
|
thingUpdate = thingPrefix + "update"
|
|
thingRemove = thingPrefix + "remove"
|
|
thingConnect = thingPrefix + "connect"
|
|
thingDisconnect = thingPrefix + "disconnect"
|
|
|
|
channelPrefix = "channel."
|
|
channelCreate = channelPrefix + "create"
|
|
channelUpdate = channelPrefix + "update"
|
|
channelRemove = channelPrefix + "remove"
|
|
)
|
|
|
|
var (
|
|
errMetadataType = errors.New("field lora is missing in the metadata")
|
|
|
|
errMetadataFormat = errors.New("malformed metadata")
|
|
|
|
errMetadataAppID = errors.New("application ID not found in channel metadatada")
|
|
|
|
errMetadataDevEUI = errors.New("device EUI not found in thing metadatada")
|
|
)
|
|
|
|
type eventHandler struct {
|
|
svc lora.Service
|
|
}
|
|
|
|
// NewEventHandler returns new event store handler.
|
|
func NewEventHandler(svc lora.Service) events.EventHandler {
|
|
return &eventHandler{
|
|
svc: svc,
|
|
}
|
|
}
|
|
|
|
func (es *eventHandler) Handle(ctx context.Context, event events.Event) error {
|
|
msg, err := event.Encode()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
switch msg["operation"] {
|
|
case thingCreate:
|
|
cte, derr := decodeCreateThing(msg)
|
|
if derr != nil {
|
|
err = derr
|
|
break
|
|
}
|
|
err = es.svc.CreateThing(ctx, cte.id, cte.loraDevEUI)
|
|
case thingUpdate:
|
|
ute, derr := decodeCreateThing(msg)
|
|
if derr != nil {
|
|
err = derr
|
|
break
|
|
}
|
|
err = es.svc.CreateThing(ctx, ute.id, ute.loraDevEUI)
|
|
|
|
case channelCreate:
|
|
cce, derr := decodeCreateChannel(msg)
|
|
if derr != nil {
|
|
err = derr
|
|
break
|
|
}
|
|
err = es.svc.CreateChannel(ctx, cce.id, cce.loraAppID)
|
|
case channelUpdate:
|
|
uce, derr := decodeCreateChannel(msg)
|
|
if derr != nil {
|
|
err = derr
|
|
break
|
|
}
|
|
err = es.svc.CreateChannel(ctx, uce.id, uce.loraAppID)
|
|
case thingRemove:
|
|
rte := decodeRemoveThing(msg)
|
|
err = es.svc.RemoveThing(ctx, rte.id)
|
|
case channelRemove:
|
|
rce := decodeRemoveChannel(msg)
|
|
err = es.svc.RemoveChannel(ctx, rce.id)
|
|
case thingConnect:
|
|
tce := decodeConnectionThing(msg)
|
|
err = es.svc.ConnectThing(ctx, tce.chanID, tce.thingID)
|
|
case thingDisconnect:
|
|
tde := decodeConnectionThing(msg)
|
|
err = es.svc.DisconnectThing(ctx, tde.chanID, tde.thingID)
|
|
}
|
|
if err != nil && err != errMetadataType {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func decodeCreateThing(event map[string]interface{}) (createThingEvent, error) {
|
|
strmeta := read(event, "metadata", "{}")
|
|
var metadata map[string]interface{}
|
|
if err := json.Unmarshal([]byte(strmeta), &metadata); err != nil {
|
|
return createThingEvent{}, err
|
|
}
|
|
|
|
cte := createThingEvent{
|
|
id: read(event, "id", ""),
|
|
}
|
|
|
|
m, ok := metadata[keyType]
|
|
if !ok {
|
|
return createThingEvent{}, errMetadataType
|
|
}
|
|
|
|
lm, ok := m.(map[string]interface{})
|
|
if !ok {
|
|
return createThingEvent{}, errMetadataFormat
|
|
}
|
|
|
|
val, ok := lm[keyDevEUI].(string)
|
|
if !ok {
|
|
return createThingEvent{}, errMetadataDevEUI
|
|
}
|
|
|
|
cte.loraDevEUI = val
|
|
return cte, nil
|
|
}
|
|
|
|
func decodeRemoveThing(event map[string]interface{}) removeThingEvent {
|
|
return removeThingEvent{
|
|
id: read(event, "id", ""),
|
|
}
|
|
}
|
|
|
|
func decodeCreateChannel(event map[string]interface{}) (createChannelEvent, error) {
|
|
strmeta := read(event, "metadata", "{}")
|
|
var metadata map[string]interface{}
|
|
if err := json.Unmarshal([]byte(strmeta), &metadata); err != nil {
|
|
return createChannelEvent{}, err
|
|
}
|
|
|
|
cce := createChannelEvent{
|
|
id: read(event, "id", ""),
|
|
}
|
|
|
|
m, ok := metadata[keyType]
|
|
if !ok {
|
|
return createChannelEvent{}, errMetadataType
|
|
}
|
|
|
|
lm, ok := m.(map[string]interface{})
|
|
if !ok {
|
|
return createChannelEvent{}, errMetadataFormat
|
|
}
|
|
|
|
val, ok := lm[keyAppID].(string)
|
|
if !ok {
|
|
return createChannelEvent{}, errMetadataAppID
|
|
}
|
|
|
|
cce.loraAppID = val
|
|
return cce, nil
|
|
}
|
|
|
|
func decodeConnectionThing(event map[string]interface{}) connectionThingEvent {
|
|
return connectionThingEvent{
|
|
chanID: read(event, "chan_id", ""),
|
|
thingID: read(event, "thing_id", ""),
|
|
}
|
|
}
|
|
|
|
func decodeRemoveChannel(event map[string]interface{}) removeChannelEvent {
|
|
return removeChannelEvent{
|
|
id: read(event, "id", ""),
|
|
}
|
|
}
|
|
|
|
func read(event map[string]interface{}, key, def string) string {
|
|
val, ok := event[key].(string)
|
|
if !ok {
|
|
return def
|
|
}
|
|
|
|
return val
|
|
}
|