487 lines
14 KiB
Go
487 lines
14 KiB
Go
// Copyright (c) Mainflux
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package bootstrap
|
|
|
|
import (
|
|
"context"
|
|
"crypto/aes"
|
|
"crypto/cipher"
|
|
"encoding/hex"
|
|
"time"
|
|
|
|
"github.com/mainflux/mainflux"
|
|
"github.com/mainflux/mainflux/pkg/errors"
|
|
mfsdk "github.com/mainflux/mainflux/pkg/sdk/go"
|
|
)
|
|
|
|
var (
|
|
// ErrThings indicates failure to communicate with Mainflux Things service.
|
|
// It can be due to networking error or invalid/unauthenticated request.
|
|
ErrThings = errors.New("failed to receive response from Things service")
|
|
|
|
// ErrExternalKey indicates a non-existent bootstrap configuration for given external key.
|
|
ErrExternalKey = errors.New("failed to get bootstrap configuration for given external key")
|
|
|
|
// ErrExternalKeySecure indicates error in getting bootstrap configuration for given encrypted external key.
|
|
ErrExternalKeySecure = errors.New("failed to get bootstrap configuration for given encrypted external key")
|
|
|
|
// ErrBootstrap indicates error in getting bootstrap configuration.
|
|
ErrBootstrap = errors.New("failed to read bootstrap configuration")
|
|
|
|
errAddBootstrap = errors.New("failed to add bootstrap configuration")
|
|
errUpdateConnections = errors.New("failed to update connections")
|
|
errRemoveBootstrap = errors.New("failed to remove bootstrap configuration")
|
|
errChangeState = errors.New("failed to change state of bootstrap configuration")
|
|
errUpdateChannel = errors.New("failed to update channel")
|
|
errRemoveConfig = errors.New("failed to remove bootstrap configuration")
|
|
errRemoveChannel = errors.New("failed to remove channel")
|
|
errCreateThing = errors.New("failed to create thing")
|
|
errDisconnectThing = errors.New("failed to disconnect thing")
|
|
errCheckChannels = errors.New("failed to check if channels exists")
|
|
errConnectionChannels = errors.New("failed to check channels connections")
|
|
errThingNotFound = errors.New("failed to find thing")
|
|
errUpdateCert = errors.New("failed to update cert")
|
|
)
|
|
|
|
var _ Service = (*bootstrapService)(nil)
|
|
|
|
// Service specifies an API that must be fulfilled by the domain service
|
|
// implementation, and all of its decorators (e.g. logging & metrics).
|
|
type Service interface {
|
|
// Add adds new Thing Config to the user identified by the provided token.
|
|
Add(ctx context.Context, token string, cfg Config) (Config, error)
|
|
|
|
// View returns Thing Config with given ID belonging to the user identified by the given token.
|
|
View(ctx context.Context, token, id string) (Config, error)
|
|
|
|
// Update updates editable fields of the provided Config.
|
|
Update(ctx context.Context, token string, cfg Config) error
|
|
|
|
// UpdateCert updates an existing Config certificate and token.
|
|
// A non-nil error is returned to indicate operation failure.
|
|
UpdateCert(ctx context.Context, token, thingID, clientCert, clientKey, caCert string) (Config, error)
|
|
|
|
// UpdateConnections updates list of Channels related to given Config.
|
|
UpdateConnections(ctx context.Context, token, id string, connections []string) error
|
|
|
|
// List returns subset of Configs with given search params that belong to the
|
|
// user identified by the given token.
|
|
List(ctx context.Context, token string, filter Filter, offset, limit uint64) (ConfigsPage, error)
|
|
|
|
// Remove removes Config with specified token that belongs to the user identified by the given token.
|
|
Remove(ctx context.Context, token, id string) error
|
|
|
|
// Bootstrap returns Config to the Thing with provided external ID using external key.
|
|
Bootstrap(ctx context.Context, externalKey, externalID string, secure bool) (Config, error)
|
|
|
|
// ChangeState changes state of the Thing with given ID and owner.
|
|
ChangeState(ctx context.Context, token, id string, state State) error
|
|
|
|
// Methods RemoveConfig, UpdateChannel, and RemoveChannel are used as
|
|
// handlers for events. That's why these methods surpass ownership check.
|
|
|
|
// UpdateChannelHandler updates Channel with data received from an event.
|
|
UpdateChannelHandler(ctx context.Context, channel Channel) error
|
|
|
|
// RemoveConfigHandler removes Configuration with id received from an event.
|
|
RemoveConfigHandler(ctx context.Context, id string) error
|
|
|
|
// RemoveChannelHandler removes Channel with id received from an event.
|
|
RemoveChannelHandler(ctx context.Context, id string) error
|
|
|
|
// DisconnectHandler changes state of the Config when connect/disconnect event occurs.
|
|
DisconnectThingHandler(ctx context.Context, channelID, thingID string) error
|
|
}
|
|
|
|
// ConfigReader is used to parse Config into format which will be encoded
|
|
// as a JSON and consumed from the client side. The purpose of this interface
|
|
// is to provide convenient way to generate custom configuration response
|
|
// based on the specific Config which will be consumed by the client.
|
|
type ConfigReader interface {
|
|
ReadConfig(Config, bool) (interface{}, error)
|
|
}
|
|
|
|
type bootstrapService struct {
|
|
auth mainflux.AuthServiceClient
|
|
configs ConfigRepository
|
|
sdk mfsdk.SDK
|
|
encKey []byte
|
|
}
|
|
|
|
// New returns new Bootstrap service.
|
|
func New(auth mainflux.AuthServiceClient, configs ConfigRepository, sdk mfsdk.SDK, encKey []byte) Service {
|
|
return &bootstrapService{
|
|
configs: configs,
|
|
sdk: sdk,
|
|
auth: auth,
|
|
encKey: encKey,
|
|
}
|
|
}
|
|
|
|
func (bs bootstrapService) Add(ctx context.Context, token string, cfg Config) (Config, error) {
|
|
owner, err := bs.identify(ctx, token)
|
|
if err != nil {
|
|
return Config{}, err
|
|
}
|
|
|
|
toConnect := bs.toIDList(cfg.Channels)
|
|
|
|
// Check if channels exist. This is the way to prevent fetching channels that already exist.
|
|
existing, err := bs.configs.ListExisting(ctx, owner, toConnect)
|
|
if err != nil {
|
|
return Config{}, errors.Wrap(errCheckChannels, err)
|
|
}
|
|
|
|
cfg.Channels, err = bs.connectionChannels(toConnect, bs.toIDList(existing), token)
|
|
|
|
if err != nil {
|
|
return Config{}, errors.Wrap(errConnectionChannels, err)
|
|
}
|
|
|
|
id := cfg.ThingID
|
|
mfThing, err := bs.thing(id, token)
|
|
if err != nil {
|
|
return Config{}, errors.Wrap(errThingNotFound, err)
|
|
}
|
|
|
|
cfg.ThingID = mfThing.ID
|
|
cfg.Owner = owner
|
|
cfg.State = Inactive
|
|
cfg.ThingKey = mfThing.Credentials.Secret
|
|
|
|
saved, err := bs.configs.Save(ctx, cfg, toConnect)
|
|
if err != nil {
|
|
if id == "" {
|
|
if _, errT := bs.sdk.DisableThing(cfg.ThingID, token); errT != nil {
|
|
err = errors.Wrap(err, errT)
|
|
}
|
|
}
|
|
return Config{}, errors.Wrap(errAddBootstrap, err)
|
|
}
|
|
|
|
cfg.ThingID = saved
|
|
cfg.Channels = append(cfg.Channels, existing...)
|
|
|
|
return cfg, nil
|
|
}
|
|
|
|
func (bs bootstrapService) View(ctx context.Context, token, id string) (Config, error) {
|
|
owner, err := bs.identify(ctx, token)
|
|
if err != nil {
|
|
return Config{}, err
|
|
}
|
|
|
|
return bs.configs.RetrieveByID(ctx, owner, id)
|
|
}
|
|
|
|
func (bs bootstrapService) Update(ctx context.Context, token string, cfg Config) error {
|
|
owner, err := bs.identify(ctx, token)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cfg.Owner = owner
|
|
|
|
return bs.configs.Update(ctx, cfg)
|
|
}
|
|
|
|
func (bs bootstrapService) UpdateCert(ctx context.Context, token, thingID, clientCert, clientKey, caCert string) (Config, error) {
|
|
owner, err := bs.identify(ctx, token)
|
|
if err != nil {
|
|
return Config{}, err
|
|
}
|
|
cfg, err := bs.configs.UpdateCert(ctx, owner, thingID, clientCert, clientKey, caCert)
|
|
if err != nil {
|
|
return Config{}, errors.Wrap(errUpdateCert, err)
|
|
}
|
|
return cfg, nil
|
|
}
|
|
|
|
func (bs bootstrapService) UpdateConnections(ctx context.Context, token, id string, connections []string) error {
|
|
owner, err := bs.identify(ctx, token)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cfg, err := bs.configs.RetrieveByID(ctx, owner, id)
|
|
if err != nil {
|
|
return errors.Wrap(errUpdateConnections, err)
|
|
}
|
|
|
|
add, remove := bs.updateList(cfg, connections)
|
|
|
|
// Check if channels exist. This is the way to prevent fetching channels that already exist.
|
|
existing, err := bs.configs.ListExisting(ctx, owner, connections)
|
|
if err != nil {
|
|
return errors.Wrap(errUpdateConnections, err)
|
|
}
|
|
|
|
channels, err := bs.connectionChannels(connections, bs.toIDList(existing), token)
|
|
if err != nil {
|
|
return errors.Wrap(errUpdateConnections, err)
|
|
}
|
|
|
|
cfg.Channels = channels
|
|
var connect, disconnect []string
|
|
|
|
if cfg.State == Active {
|
|
connect = add
|
|
disconnect = remove
|
|
}
|
|
|
|
for _, c := range disconnect {
|
|
if err := bs.sdk.DisconnectThing(id, c, token); err != nil {
|
|
if errors.Contains(err, errors.ErrNotFound) {
|
|
continue
|
|
}
|
|
return ErrThings
|
|
}
|
|
}
|
|
|
|
for _, c := range connect {
|
|
conIDs := mfsdk.Connection{
|
|
ChannelID: c,
|
|
ThingID: id,
|
|
}
|
|
if err := bs.sdk.Connect(conIDs, token); err != nil {
|
|
return ErrThings
|
|
}
|
|
}
|
|
|
|
return bs.configs.UpdateConnections(ctx, owner, id, channels, connections)
|
|
}
|
|
|
|
func (bs bootstrapService) List(ctx context.Context, token string, filter Filter, offset, limit uint64) (ConfigsPage, error) {
|
|
owner, err := bs.identify(ctx, token)
|
|
if err != nil {
|
|
return ConfigsPage{}, err
|
|
}
|
|
|
|
return bs.configs.RetrieveAll(ctx, owner, filter, offset, limit), nil
|
|
}
|
|
|
|
func (bs bootstrapService) Remove(ctx context.Context, token, id string) error {
|
|
owner, err := bs.identify(ctx, token)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := bs.configs.Remove(ctx, owner, id); err != nil {
|
|
return errors.Wrap(errRemoveBootstrap, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (bs bootstrapService) Bootstrap(ctx context.Context, externalKey, externalID string, secure bool) (Config, error) {
|
|
cfg, err := bs.configs.RetrieveByExternalID(ctx, externalID)
|
|
if err != nil {
|
|
return cfg, errors.Wrap(ErrBootstrap, err)
|
|
}
|
|
if secure {
|
|
dec, err := bs.dec(externalKey)
|
|
if err != nil {
|
|
return Config{}, errors.Wrap(ErrExternalKeySecure, err)
|
|
}
|
|
externalKey = dec
|
|
}
|
|
if cfg.ExternalKey != externalKey {
|
|
return Config{}, ErrExternalKey
|
|
}
|
|
|
|
return cfg, nil
|
|
}
|
|
|
|
func (bs bootstrapService) ChangeState(ctx context.Context, token, id string, state State) error {
|
|
owner, err := bs.identify(ctx, token)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cfg, err := bs.configs.RetrieveByID(ctx, owner, id)
|
|
if err != nil {
|
|
return errors.Wrap(errChangeState, err)
|
|
}
|
|
|
|
if cfg.State == state {
|
|
return nil
|
|
}
|
|
|
|
switch state {
|
|
case Active:
|
|
for _, c := range cfg.Channels {
|
|
conIDs := mfsdk.Connection{
|
|
ChannelID: c.ID,
|
|
ThingID: cfg.ThingID,
|
|
}
|
|
if err := bs.sdk.Connect(conIDs, token); err != nil {
|
|
return ErrThings
|
|
}
|
|
}
|
|
case Inactive:
|
|
for _, c := range cfg.Channels {
|
|
if err := bs.sdk.DisconnectThing(cfg.ThingID, c.ID, token); err != nil {
|
|
if errors.Contains(err, errors.ErrNotFound) {
|
|
continue
|
|
}
|
|
return ErrThings
|
|
}
|
|
}
|
|
}
|
|
if err := bs.configs.ChangeState(ctx, owner, id, state); err != nil {
|
|
return errors.Wrap(errChangeState, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (bs bootstrapService) UpdateChannelHandler(ctx context.Context, channel Channel) error {
|
|
if err := bs.configs.UpdateChannel(ctx, channel); err != nil {
|
|
return errors.Wrap(errUpdateChannel, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (bs bootstrapService) RemoveConfigHandler(ctx context.Context, id string) error {
|
|
if err := bs.configs.RemoveThing(ctx, id); err != nil {
|
|
return errors.Wrap(errRemoveConfig, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (bs bootstrapService) RemoveChannelHandler(ctx context.Context, id string) error {
|
|
if err := bs.configs.RemoveChannel(ctx, id); err != nil {
|
|
return errors.Wrap(errRemoveChannel, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (bs bootstrapService) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error {
|
|
if err := bs.configs.DisconnectThing(ctx, channelID, thingID); err != nil {
|
|
return errors.Wrap(errDisconnectThing, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (bs bootstrapService) identify(ctx context.Context, token string) (string, error) {
|
|
ctx, cancel := context.WithTimeout(ctx, time.Second)
|
|
defer cancel()
|
|
|
|
res, err := bs.auth.Identify(ctx, &mainflux.IdentityReq{Token: token})
|
|
if err != nil {
|
|
return "", errors.ErrAuthentication
|
|
}
|
|
|
|
return res.GetId(), nil
|
|
}
|
|
|
|
// Method thing retrieves Mainflux Thing creating one if an empty ID is passed.
|
|
func (bs bootstrapService) thing(id, token string) (mfsdk.Thing, error) {
|
|
var thing mfsdk.Thing
|
|
var err error
|
|
var sdkErr errors.SDKError
|
|
|
|
thing.ID = id
|
|
if id == "" {
|
|
thing, sdkErr = bs.sdk.CreateThing(mfsdk.Thing{}, token)
|
|
if err != nil {
|
|
return mfsdk.Thing{}, errors.Wrap(errCreateThing, errors.New(sdkErr.Err().Msg()))
|
|
}
|
|
}
|
|
|
|
thing, sdkErr = bs.sdk.Thing(thing.ID, token)
|
|
if sdkErr != nil {
|
|
err = errors.New(sdkErr.Error())
|
|
if id != "" {
|
|
if _, sdkErr2 := bs.sdk.DisableThing(thing.ID, token); sdkErr2 != nil {
|
|
err = errors.Wrap(errors.New(sdkErr.Msg()), errors.New(sdkErr2.Msg()))
|
|
}
|
|
}
|
|
return mfsdk.Thing{}, errors.Wrap(ErrThings, err)
|
|
}
|
|
|
|
return thing, nil
|
|
}
|
|
|
|
func (bs bootstrapService) connectionChannels(channels, existing []string, token string) ([]Channel, error) {
|
|
add := make(map[string]bool, len(channels))
|
|
for _, ch := range channels {
|
|
add[ch] = true
|
|
}
|
|
|
|
for _, ch := range existing {
|
|
if add[ch] {
|
|
delete(add, ch)
|
|
}
|
|
}
|
|
|
|
var ret []Channel
|
|
for id := range add {
|
|
ch, err := bs.sdk.Channel(id, token)
|
|
if err != nil {
|
|
return nil, errors.Wrap(errors.ErrMalformedEntity, err)
|
|
}
|
|
|
|
ret = append(ret, Channel{
|
|
ID: ch.ID,
|
|
Name: ch.Name,
|
|
Metadata: ch.Metadata,
|
|
})
|
|
}
|
|
|
|
return ret, nil
|
|
}
|
|
|
|
// Method updateList accepts config and channel IDs and returns three lists:
|
|
// 1) IDs of Channels to be added
|
|
// 2) IDs of Channels to be removed
|
|
// 3) IDs of common Channels for these two configs.
|
|
func (bs bootstrapService) updateList(cfg Config, connections []string) (add, remove []string) {
|
|
disconnect := make(map[string]bool, len(cfg.Channels))
|
|
for _, c := range cfg.Channels {
|
|
disconnect[c.ID] = true
|
|
}
|
|
|
|
for _, c := range connections {
|
|
if disconnect[c] {
|
|
// Don't disconnect common elements.
|
|
delete(disconnect, c)
|
|
continue
|
|
}
|
|
// Connect new elements.
|
|
add = append(add, c)
|
|
}
|
|
|
|
for v := range disconnect {
|
|
remove = append(remove, v)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (bs bootstrapService) toIDList(channels []Channel) []string {
|
|
var ret []string
|
|
for _, ch := range channels {
|
|
ret = append(ret, ch.ID)
|
|
}
|
|
|
|
return ret
|
|
}
|
|
|
|
func (bs bootstrapService) dec(in string) (string, error) {
|
|
ciphertext, err := hex.DecodeString(in)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
block, err := aes.NewCipher(bs.encKey)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if len(ciphertext) < aes.BlockSize {
|
|
return "", err
|
|
}
|
|
iv := ciphertext[:aes.BlockSize]
|
|
ciphertext = ciphertext[aes.BlockSize:]
|
|
stream := cipher.NewCFBDecrypter(block, iv)
|
|
stream.XORKeyStream(ciphertext, ciphertext)
|
|
return string(ciphertext), nil
|
|
}
|