// Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 package bootstrap import ( "context" "crypto/aes" "crypto/cipher" "encoding/hex" "time" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/pkg/errors" mfsdk "github.com/mainflux/mainflux/pkg/sdk/go" ) var ( // ErrThings indicates failure to communicate with Mainflux Things service. // It can be due to networking error or invalid/unauthenticated request. ErrThings = errors.New("failed to receive response from Things service") // ErrExternalKey indicates a non-existent bootstrap configuration for given external key. ErrExternalKey = errors.New("failed to get bootstrap configuration for given external key") // ErrExternalKeySecure indicates error in getting bootstrap configuration for given encrypted external key. ErrExternalKeySecure = errors.New("failed to get bootstrap configuration for given encrypted external key") // ErrBootstrap indicates error in getting bootstrap configuration. ErrBootstrap = errors.New("failed to read bootstrap configuration") errAddBootstrap = errors.New("failed to add bootstrap configuration") errUpdateConnections = errors.New("failed to update connections") errRemoveBootstrap = errors.New("failed to remove bootstrap configuration") errChangeState = errors.New("failed to change state of bootstrap configuration") errUpdateChannel = errors.New("failed to update channel") errRemoveConfig = errors.New("failed to remove bootstrap configuration") errRemoveChannel = errors.New("failed to remove channel") errCreateThing = errors.New("failed to create thing") errDisconnectThing = errors.New("failed to disconnect thing") errCheckChannels = errors.New("failed to check if channels exists") errConnectionChannels = errors.New("failed to check channels connections") errThingNotFound = errors.New("failed to find thing") errUpdateCert = errors.New("failed to update cert") ) var _ Service = (*bootstrapService)(nil) // Service specifies an API that must be fulfilled by the domain service // implementation, and all of its decorators (e.g. logging & metrics). type Service interface { // Add adds new Thing Config to the user identified by the provided token. Add(ctx context.Context, token string, cfg Config) (Config, error) // View returns Thing Config with given ID belonging to the user identified by the given token. View(ctx context.Context, token, id string) (Config, error) // Update updates editable fields of the provided Config. Update(ctx context.Context, token string, cfg Config) error // UpdateCert updates an existing Config certificate and token. // A non-nil error is returned to indicate operation failure. UpdateCert(ctx context.Context, token, thingID, clientCert, clientKey, caCert string) (Config, error) // UpdateConnections updates list of Channels related to given Config. UpdateConnections(ctx context.Context, token, id string, connections []string) error // List returns subset of Configs with given search params that belong to the // user identified by the given token. List(ctx context.Context, token string, filter Filter, offset, limit uint64) (ConfigsPage, error) // Remove removes Config with specified token that belongs to the user identified by the given token. Remove(ctx context.Context, token, id string) error // Bootstrap returns Config to the Thing with provided external ID using external key. Bootstrap(ctx context.Context, externalKey, externalID string, secure bool) (Config, error) // ChangeState changes state of the Thing with given ID and owner. ChangeState(ctx context.Context, token, id string, state State) error // Methods RemoveConfig, UpdateChannel, and RemoveChannel are used as // handlers for events. That's why these methods surpass ownership check. // UpdateChannelHandler updates Channel with data received from an event. UpdateChannelHandler(ctx context.Context, channel Channel) error // RemoveConfigHandler removes Configuration with id received from an event. RemoveConfigHandler(ctx context.Context, id string) error // RemoveChannelHandler removes Channel with id received from an event. RemoveChannelHandler(ctx context.Context, id string) error // DisconnectHandler changes state of the Config when connect/disconnect event occurs. DisconnectThingHandler(ctx context.Context, channelID, thingID string) error } // ConfigReader is used to parse Config into format which will be encoded // as a JSON and consumed from the client side. The purpose of this interface // is to provide convenient way to generate custom configuration response // based on the specific Config which will be consumed by the client. type ConfigReader interface { ReadConfig(Config, bool) (interface{}, error) } type bootstrapService struct { auth mainflux.AuthServiceClient configs ConfigRepository sdk mfsdk.SDK encKey []byte } // New returns new Bootstrap service. func New(auth mainflux.AuthServiceClient, configs ConfigRepository, sdk mfsdk.SDK, encKey []byte) Service { return &bootstrapService{ configs: configs, sdk: sdk, auth: auth, encKey: encKey, } } func (bs bootstrapService) Add(ctx context.Context, token string, cfg Config) (Config, error) { owner, err := bs.identify(ctx, token) if err != nil { return Config{}, err } toConnect := bs.toIDList(cfg.Channels) // Check if channels exist. This is the way to prevent fetching channels that already exist. existing, err := bs.configs.ListExisting(ctx, owner, toConnect) if err != nil { return Config{}, errors.Wrap(errCheckChannels, err) } cfg.Channels, err = bs.connectionChannels(toConnect, bs.toIDList(existing), token) if err != nil { return Config{}, errors.Wrap(errConnectionChannels, err) } id := cfg.ThingID mfThing, err := bs.thing(id, token) if err != nil { return Config{}, errors.Wrap(errThingNotFound, err) } cfg.ThingID = mfThing.ID cfg.Owner = owner cfg.State = Inactive cfg.ThingKey = mfThing.Credentials.Secret saved, err := bs.configs.Save(ctx, cfg, toConnect) if err != nil { if id == "" { if _, errT := bs.sdk.DisableThing(cfg.ThingID, token); errT != nil { err = errors.Wrap(err, errT) } } return Config{}, errors.Wrap(errAddBootstrap, err) } cfg.ThingID = saved cfg.Channels = append(cfg.Channels, existing...) return cfg, nil } func (bs bootstrapService) View(ctx context.Context, token, id string) (Config, error) { owner, err := bs.identify(ctx, token) if err != nil { return Config{}, err } return bs.configs.RetrieveByID(ctx, owner, id) } func (bs bootstrapService) Update(ctx context.Context, token string, cfg Config) error { owner, err := bs.identify(ctx, token) if err != nil { return err } cfg.Owner = owner return bs.configs.Update(ctx, cfg) } func (bs bootstrapService) UpdateCert(ctx context.Context, token, thingID, clientCert, clientKey, caCert string) (Config, error) { owner, err := bs.identify(ctx, token) if err != nil { return Config{}, err } cfg, err := bs.configs.UpdateCert(ctx, owner, thingID, clientCert, clientKey, caCert) if err != nil { return Config{}, errors.Wrap(errUpdateCert, err) } return cfg, nil } func (bs bootstrapService) UpdateConnections(ctx context.Context, token, id string, connections []string) error { owner, err := bs.identify(ctx, token) if err != nil { return err } cfg, err := bs.configs.RetrieveByID(ctx, owner, id) if err != nil { return errors.Wrap(errUpdateConnections, err) } add, remove := bs.updateList(cfg, connections) // Check if channels exist. This is the way to prevent fetching channels that already exist. existing, err := bs.configs.ListExisting(ctx, owner, connections) if err != nil { return errors.Wrap(errUpdateConnections, err) } channels, err := bs.connectionChannels(connections, bs.toIDList(existing), token) if err != nil { return errors.Wrap(errUpdateConnections, err) } cfg.Channels = channels var connect, disconnect []string if cfg.State == Active { connect = add disconnect = remove } for _, c := range disconnect { if err := bs.sdk.DisconnectThing(id, c, token); err != nil { if errors.Contains(err, errors.ErrNotFound) { continue } return ErrThings } } for _, c := range connect { conIDs := mfsdk.Connection{ ChannelID: c, ThingID: id, } if err := bs.sdk.Connect(conIDs, token); err != nil { return ErrThings } } return bs.configs.UpdateConnections(ctx, owner, id, channels, connections) } func (bs bootstrapService) List(ctx context.Context, token string, filter Filter, offset, limit uint64) (ConfigsPage, error) { owner, err := bs.identify(ctx, token) if err != nil { return ConfigsPage{}, err } return bs.configs.RetrieveAll(ctx, owner, filter, offset, limit), nil } func (bs bootstrapService) Remove(ctx context.Context, token, id string) error { owner, err := bs.identify(ctx, token) if err != nil { return err } if err := bs.configs.Remove(ctx, owner, id); err != nil { return errors.Wrap(errRemoveBootstrap, err) } return nil } func (bs bootstrapService) Bootstrap(ctx context.Context, externalKey, externalID string, secure bool) (Config, error) { cfg, err := bs.configs.RetrieveByExternalID(ctx, externalID) if err != nil { return cfg, errors.Wrap(ErrBootstrap, err) } if secure { dec, err := bs.dec(externalKey) if err != nil { return Config{}, errors.Wrap(ErrExternalKeySecure, err) } externalKey = dec } if cfg.ExternalKey != externalKey { return Config{}, ErrExternalKey } return cfg, nil } func (bs bootstrapService) ChangeState(ctx context.Context, token, id string, state State) error { owner, err := bs.identify(ctx, token) if err != nil { return err } cfg, err := bs.configs.RetrieveByID(ctx, owner, id) if err != nil { return errors.Wrap(errChangeState, err) } if cfg.State == state { return nil } switch state { case Active: for _, c := range cfg.Channels { conIDs := mfsdk.Connection{ ChannelID: c.ID, ThingID: cfg.ThingID, } if err := bs.sdk.Connect(conIDs, token); err != nil { return ErrThings } } case Inactive: for _, c := range cfg.Channels { if err := bs.sdk.DisconnectThing(cfg.ThingID, c.ID, token); err != nil { if errors.Contains(err, errors.ErrNotFound) { continue } return ErrThings } } } if err := bs.configs.ChangeState(ctx, owner, id, state); err != nil { return errors.Wrap(errChangeState, err) } return nil } func (bs bootstrapService) UpdateChannelHandler(ctx context.Context, channel Channel) error { if err := bs.configs.UpdateChannel(ctx, channel); err != nil { return errors.Wrap(errUpdateChannel, err) } return nil } func (bs bootstrapService) RemoveConfigHandler(ctx context.Context, id string) error { if err := bs.configs.RemoveThing(ctx, id); err != nil { return errors.Wrap(errRemoveConfig, err) } return nil } func (bs bootstrapService) RemoveChannelHandler(ctx context.Context, id string) error { if err := bs.configs.RemoveChannel(ctx, id); err != nil { return errors.Wrap(errRemoveChannel, err) } return nil } func (bs bootstrapService) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error { if err := bs.configs.DisconnectThing(ctx, channelID, thingID); err != nil { return errors.Wrap(errDisconnectThing, err) } return nil } func (bs bootstrapService) identify(ctx context.Context, token string) (string, error) { ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() res, err := bs.auth.Identify(ctx, &mainflux.IdentityReq{Token: token}) if err != nil { return "", errors.ErrAuthentication } return res.GetId(), nil } // Method thing retrieves Mainflux Thing creating one if an empty ID is passed. func (bs bootstrapService) thing(id, token string) (mfsdk.Thing, error) { var thing mfsdk.Thing var err error var sdkErr errors.SDKError thing.ID = id if id == "" { thing, sdkErr = bs.sdk.CreateThing(mfsdk.Thing{}, token) if err != nil { return mfsdk.Thing{}, errors.Wrap(errCreateThing, errors.New(sdkErr.Err().Msg())) } } thing, sdkErr = bs.sdk.Thing(thing.ID, token) if sdkErr != nil { err = errors.New(sdkErr.Error()) if id != "" { if _, sdkErr2 := bs.sdk.DisableThing(thing.ID, token); sdkErr2 != nil { err = errors.Wrap(errors.New(sdkErr.Msg()), errors.New(sdkErr2.Msg())) } } return mfsdk.Thing{}, errors.Wrap(ErrThings, err) } return thing, nil } func (bs bootstrapService) connectionChannels(channels, existing []string, token string) ([]Channel, error) { add := make(map[string]bool, len(channels)) for _, ch := range channels { add[ch] = true } for _, ch := range existing { if add[ch] { delete(add, ch) } } var ret []Channel for id := range add { ch, err := bs.sdk.Channel(id, token) if err != nil { return nil, errors.Wrap(errors.ErrMalformedEntity, err) } ret = append(ret, Channel{ ID: ch.ID, Name: ch.Name, Metadata: ch.Metadata, }) } return ret, nil } // Method updateList accepts config and channel IDs and returns three lists: // 1) IDs of Channels to be added // 2) IDs of Channels to be removed // 3) IDs of common Channels for these two configs. func (bs bootstrapService) updateList(cfg Config, connections []string) (add, remove []string) { disconnect := make(map[string]bool, len(cfg.Channels)) for _, c := range cfg.Channels { disconnect[c.ID] = true } for _, c := range connections { if disconnect[c] { // Don't disconnect common elements. delete(disconnect, c) continue } // Connect new elements. add = append(add, c) } for v := range disconnect { remove = append(remove, v) } return } func (bs bootstrapService) toIDList(channels []Channel) []string { var ret []string for _, ch := range channels { ret = append(ret, ch.ID) } return ret } func (bs bootstrapService) dec(in string) (string, error) { ciphertext, err := hex.DecodeString(in) if err != nil { return "", err } block, err := aes.NewCipher(bs.encKey) if err != nil { return "", err } if len(ciphertext) < aes.BlockSize { return "", err } iv := ciphertext[:aes.BlockSize] ciphertext = ciphertext[aes.BlockSize:] stream := cipher.NewCFBDecrypter(block, iv) stream.XORKeyStream(ciphertext, ciphertext) return string(ciphertext), nil }