MF-715 - Conflict on updating connection with a valid list of channels (#716)
* Add check if Channels already exist Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update streams tests to use time offset Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
This commit is contained in:
parent
2f17a05b3e
commit
d9d9da49a8
|
@ -171,6 +171,7 @@ func (crm *configRepositoryMock) UpdateConnections(key, id string, channels []bo
|
|||
if !ok {
|
||||
return bootstrap.ErrNotFound
|
||||
}
|
||||
|
||||
for _, ch := range channels {
|
||||
crm.channels[ch.ID] = ch
|
||||
}
|
||||
|
|
|
@ -331,14 +331,17 @@ func (cr configRepository) ChangeState(key, id string, state bootstrap.State) er
|
|||
}
|
||||
|
||||
func (cr configRepository) ListExisting(key string, ids []string) ([]bootstrap.Channel, error) {
|
||||
q := "SELECT mainflux_channel, name, metadata FROM channels WHERE owner = $1 AND mainflux_channel = ANY ($2)"
|
||||
var channels []bootstrap.Channel
|
||||
if len(ids) == 0 {
|
||||
return channels, nil
|
||||
}
|
||||
|
||||
q := "SELECT mainflux_channel, name, metadata FROM channels WHERE owner = $1 AND mainflux_channel = ANY ($2)"
|
||||
rows, err := cr.db.Queryx(q, key, pq.Array(ids))
|
||||
if err != nil {
|
||||
return []bootstrap.Channel{}, err
|
||||
}
|
||||
|
||||
var channels []bootstrap.Channel
|
||||
for rows.Next() {
|
||||
var dbch dbChannel
|
||||
if err := rows.StructScan(&dbch); err != nil {
|
||||
|
@ -545,8 +548,7 @@ func updateConnections(key, id string, connections []string, tx *sqlx.Tx) error
|
|||
}
|
||||
|
||||
q = `INSERT INTO connections (config_id, channel_id, config_owner, channel_owner)
|
||||
VALUES (:config_id, :channel_id, :config_owner, :channel_owner)
|
||||
ON CONFLICT (config_id, config_owner, channel_id, channel_owner) DO NOTHING`
|
||||
VALUES (:config_id, :channel_id, :config_owner, :channel_owner)`
|
||||
|
||||
conns := []dbConnection{}
|
||||
for _, conn := range connections {
|
||||
|
|
|
@ -339,7 +339,7 @@ func TestUpdateConnections(t *testing.T) {
|
|||
c.ExternalID = id
|
||||
c.ExternalKey = id
|
||||
c.MFChannels = []bootstrap.Channel{}
|
||||
_, err = repo.Save(c, []string{channels[0]})
|
||||
c2, err := repo.Save(c, []string{channels[0]})
|
||||
require.Nil(t, err, fmt.Sprintf("Saving a config expected to succeed: %s.\n", err))
|
||||
|
||||
cases := []struct {
|
||||
|
@ -366,6 +366,14 @@ func TestUpdateConnections(t *testing.T) {
|
|||
connections: []string{channels[1]},
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
desc: "update connections with existing channels",
|
||||
key: config.Owner,
|
||||
id: c2,
|
||||
channels: nil,
|
||||
connections: channels,
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
desc: "update connections no channels",
|
||||
key: config.Owner,
|
||||
|
|
|
@ -29,12 +29,13 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
streamID = "mainflux.bootstrap"
|
||||
email = "user@example.com"
|
||||
validToken = "validToken"
|
||||
unknownID = "1"
|
||||
unknownKey = "2"
|
||||
channelsNum = 3
|
||||
streamID = "mainflux.bootstrap"
|
||||
email = "user@example.com"
|
||||
validToken = "validToken"
|
||||
unknownID = "1"
|
||||
unknownKey = "2"
|
||||
channelsNum = 3
|
||||
defaultTimout = 5
|
||||
|
||||
configPrefix = "config."
|
||||
configCreate = configPrefix + "create"
|
||||
|
@ -125,7 +126,7 @@ func TestAdd(t *testing.T) {
|
|||
"channels": strings.Join(channels, ", "),
|
||||
"external_id": config.ExternalID,
|
||||
"content": config.Content,
|
||||
"timestamp": strconv.FormatInt(time.Now().Unix(), 10),
|
||||
"timestamp": time.Now().Unix(),
|
||||
"operation": configCreate,
|
||||
},
|
||||
},
|
||||
|
@ -156,7 +157,7 @@ func TestAdd(t *testing.T) {
|
|||
lastID = msg.ID
|
||||
}
|
||||
|
||||
assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event))
|
||||
test(t, tc.event, event, tc.desc)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -217,7 +218,7 @@ func TestUpdate(t *testing.T) {
|
|||
"thing_id": modified.MFThing,
|
||||
"name": modified.Name,
|
||||
"content": modified.Content,
|
||||
"timestamp": strconv.FormatInt(time.Now().Unix(), 10),
|
||||
"timestamp": time.Now().Unix(),
|
||||
"operation": configUpdate,
|
||||
},
|
||||
},
|
||||
|
@ -248,7 +249,7 @@ func TestUpdate(t *testing.T) {
|
|||
lastID = msg.ID
|
||||
}
|
||||
|
||||
assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event))
|
||||
test(t, tc.event, event, tc.desc)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -281,7 +282,7 @@ func TestUpdateConnections(t *testing.T) {
|
|||
event: map[string]interface{}{
|
||||
"thing_id": saved.MFThing,
|
||||
"channels": "2",
|
||||
"timestamp": strconv.FormatInt(time.Now().Unix(), 10),
|
||||
"timestamp": time.Now().Unix(),
|
||||
"operation": thingUpdateConnections,
|
||||
},
|
||||
},
|
||||
|
@ -313,7 +314,7 @@ func TestUpdateConnections(t *testing.T) {
|
|||
lastID = msg.ID
|
||||
}
|
||||
|
||||
assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event))
|
||||
test(t, tc.event, event, tc.desc)
|
||||
}
|
||||
}
|
||||
func TestList(t *testing.T) {
|
||||
|
@ -363,7 +364,7 @@ func TestRemove(t *testing.T) {
|
|||
err: nil,
|
||||
event: map[string]interface{}{
|
||||
"thing_id": saved.MFThing,
|
||||
"timestamp": strconv.FormatInt(time.Now().Unix(), 10),
|
||||
"timestamp": time.Now().Unix(),
|
||||
"operation": configRemove,
|
||||
},
|
||||
},
|
||||
|
@ -394,7 +395,7 @@ func TestRemove(t *testing.T) {
|
|||
lastID = msg.ID
|
||||
}
|
||||
|
||||
assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event))
|
||||
test(t, tc.event, event, tc.desc)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -427,7 +428,7 @@ func TestBootstrap(t *testing.T) {
|
|||
event: map[string]interface{}{
|
||||
"external_id": saved.ExternalID,
|
||||
"success": "1",
|
||||
"timestamp": strconv.FormatInt(time.Now().Unix(), 10),
|
||||
"timestamp": time.Now().Unix(),
|
||||
"operation": thingBootstrap,
|
||||
},
|
||||
},
|
||||
|
@ -439,7 +440,7 @@ func TestBootstrap(t *testing.T) {
|
|||
event: map[string]interface{}{
|
||||
"external_id": saved.ExternalID,
|
||||
"success": "0",
|
||||
"timestamp": strconv.FormatInt(time.Now().Unix(), 10),
|
||||
"timestamp": time.Now().Unix(),
|
||||
"operation": thingBootstrap,
|
||||
},
|
||||
},
|
||||
|
@ -463,7 +464,7 @@ func TestBootstrap(t *testing.T) {
|
|||
lastID = msg.ID
|
||||
}
|
||||
|
||||
assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event))
|
||||
test(t, tc.event, event, tc.desc)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -498,7 +499,7 @@ func TestChangeState(t *testing.T) {
|
|||
event: map[string]interface{}{
|
||||
"thing_id": saved.MFThing,
|
||||
"state": bootstrap.Active.String(),
|
||||
"timestamp": strconv.FormatInt(time.Now().Unix(), 10),
|
||||
"timestamp": time.Now().Unix(),
|
||||
"operation": thingStateChange,
|
||||
},
|
||||
},
|
||||
|
@ -530,6 +531,19 @@ func TestChangeState(t *testing.T) {
|
|||
lastID = msg.ID
|
||||
}
|
||||
|
||||
assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event))
|
||||
test(t, tc.event, event, tc.desc)
|
||||
}
|
||||
}
|
||||
|
||||
func test(t *testing.T, expected, actual map[string]interface{}, description string) {
|
||||
if expected != nil && actual != nil {
|
||||
ts1 := expected["timestamp"].(int64)
|
||||
ts2, err := strconv.ParseInt(actual["timestamp"].(string), 10, 64)
|
||||
require.Nil(t, err, fmt.Sprintf("%s: expected to get a valid timestamp, got %s", description, err))
|
||||
val := ts1 == ts2 || ts2 <= ts1+defaultTimout
|
||||
assert.True(t, val, fmt.Sprintf("%s: timestamp is not in valid range", description))
|
||||
delete(expected, "timestamp")
|
||||
delete(actual, "timestamp")
|
||||
assert.Equal(t, expected, actual, fmt.Sprintf("%s: expected %v got %v\n", description, expected, actual))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -117,7 +117,7 @@ func (bs bootstrapService) Add(key string, cfg Config) (Config, error) {
|
|||
|
||||
toConnect := bs.toIDList(cfg.MFChannels)
|
||||
|
||||
// Check if channels exist. This is the way to prevent invalid configuration to be saved.
|
||||
// Check if channels exist. This is the way to prevent fetching channels that already exist.
|
||||
existing, err := bs.configs.ListExisting(owner, toConnect)
|
||||
if err != nil {
|
||||
return Config{}, err
|
||||
|
@ -187,7 +187,14 @@ func (bs bootstrapService) UpdateConnections(key, id string, connections []strin
|
|||
}
|
||||
|
||||
add, remove := bs.updateList(cfg, connections)
|
||||
channels, err := bs.updateChannels(add, key)
|
||||
|
||||
// Check if channels exist. This is the way to prevent fetching channels that already exist.
|
||||
existing, err := bs.configs.ListExisting(owner, connections)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
channels, err := bs.connectionChannels(connections, bs.toIDList(existing), key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -347,6 +354,35 @@ func (bs bootstrapService) thing(key, id string) (mfsdk.Thing, error) {
|
|||
return thing, nil
|
||||
}
|
||||
|
||||
func (bs bootstrapService) connectionChannels(channels, existing []string, key string) ([]Channel, error) {
|
||||
add := make(map[string]bool, len(channels))
|
||||
for _, ch := range channels {
|
||||
add[ch] = true
|
||||
}
|
||||
|
||||
for _, ch := range existing {
|
||||
if add[ch] == true {
|
||||
delete(add, ch)
|
||||
}
|
||||
}
|
||||
|
||||
var ret []Channel
|
||||
for id := range add {
|
||||
ch, err := bs.sdk.Channel(id, key)
|
||||
if err != nil {
|
||||
return nil, ErrMalformedEntity
|
||||
}
|
||||
|
||||
ret = append(ret, Channel{
|
||||
ID: ch.ID,
|
||||
Name: ch.Name,
|
||||
Metadata: ch.Metadata,
|
||||
})
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// Method updateList accepts config and channel IDs and returns three lists:
|
||||
// 1) IDs of Channels to be added
|
||||
// 2) IDs of Channels to be removed
|
||||
|
@ -383,50 +419,3 @@ func (bs bootstrapService) toIDList(channels []Channel) []string {
|
|||
|
||||
return ret
|
||||
}
|
||||
|
||||
func (bs bootstrapService) updateChannels(add []string, key string) ([]Channel, error) {
|
||||
var ret []Channel
|
||||
for _, id := range add {
|
||||
ch, err := bs.sdk.Channel(id, key)
|
||||
if err != nil {
|
||||
return []Channel{}, ErrMalformedEntity
|
||||
}
|
||||
|
||||
ret = append(ret, Channel{
|
||||
ID: ch.ID,
|
||||
Name: ch.Name,
|
||||
Metadata: ch.Metadata,
|
||||
})
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (bs bootstrapService) connectionChannels(channels, existing []string, key string) ([]Channel, error) {
|
||||
add := make(map[string]bool, len(channels))
|
||||
for _, ch := range channels {
|
||||
add[ch] = true
|
||||
}
|
||||
|
||||
for _, ch := range existing {
|
||||
if add[ch] == true {
|
||||
delete(add, ch)
|
||||
}
|
||||
}
|
||||
|
||||
var ret []Channel
|
||||
for id := range add {
|
||||
ch, err := bs.sdk.Channel(id, key)
|
||||
if err != nil {
|
||||
return nil, ErrMalformedEntity
|
||||
}
|
||||
|
||||
ret = append(ret, Channel{
|
||||
ID: ch.ID,
|
||||
Name: ch.Name,
|
||||
Metadata: ch.Metadata,
|
||||
})
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue