Mainflux.mainflux/bootstrap/postgres/configs.go

746 lines
21 KiB
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package postgres
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jmoiron/sqlx"
"github.com/mainflux/mainflux/bootstrap"
"github.com/mainflux/mainflux/internal/postgres"
mflog "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/clients"
"github.com/mainflux/mainflux/pkg/errors"
)
// Repository-specific errors. Each one is used in this file as the outer
// error passed to errors.Wrap when the corresponding persistence step fails.
var (
// errSaveChannels wraps a failure to insert channels while saving a config.
errSaveChannels = errors.New("failed to insert channels to database")
// errSaveConnections wraps a failure to insert connections while saving a config.
errSaveConnections = errors.New("failed to insert connections to database")
// errUpdateChannels wraps a failure of the UPDATE issued by UpdateChannel.
errUpdateChannels = errors.New("failed to update channels in bootstrap configuration database")
// errRemoveChannels wraps a failure of the DELETE issued by RemoveChannel.
errRemoveChannels = errors.New("failed to remove channels from bootstrap configuration in database")
// errDisconnectThing wraps a failure of the UPDATE issued by DisconnectThing.
errDisconnectThing = errors.New("failed to disconnect thing in bootstrap configuration in database")
)
// cleanupQuery deletes every row from channels that has no row in connections
// whose channel_id equals the channel's mainflux_channel, i.e. channels that
// are no longer connected to any config.
const cleanupQuery = `DELETE FROM channels ch WHERE NOT EXISTS (
SELECT channel_id FROM connections c WHERE ch.mainflux_channel = c.channel_id);`
// Compile-time assertion that *configRepository implements
// bootstrap.ConfigRepository.
var _ bootstrap.ConfigRepository = (*configRepository)(nil)
// configRepository is the PostgreSQL implementation of
// bootstrap.ConfigRepository.
type configRepository struct {
// db is the handle every query and transaction in this file goes through.
db postgres.Database
// log receives the error and warning messages emitted by the methods.
log mflog.Logger
}
// NewConfigRepository builds the PostgreSQL-backed bootstrap.ConfigRepository.
// db is the database handle used for all statements and log is the logger the
// repository reports failures to.
func NewConfigRepository(db postgres.Database, log mflog.Logger) bootstrap.ConfigRepository {
	repo := configRepository{
		db:  db,
		log: log,
	}
	return &repo
}
// Save stores cfg, the channels listed in cfg.Channels and one connection per
// ID in chsConnIDs inside a single transaction, and returns cfg.ThingID on
// success.
//
// Any failure rolls the transaction back. A unique-constraint violation on
// the config insert is reported as errors.ErrConflict wrapped in
// errors.ErrCreateEntity; channel and connection failures are wrapped in
// errSaveChannels and errSaveConnections respectively.
func (cr configRepository) Save(ctx context.Context, cfg bootstrap.Config, chsConnIDs []string) (string, error) {
	q := `INSERT INTO configs (mainflux_thing, owner, name, client_cert, client_key, ca_cert, mainflux_key, external_id, external_key, content, state)
VALUES (:mainflux_thing, :owner, :name, :client_cert, :client_key, :ca_cert, :mainflux_key, :external_id, :external_key, :content, :state)`

	tx, err := cr.db.BeginTxx(ctx, nil)
	if err != nil {
		return "", errors.Wrap(errors.ErrCreateEntity, err)
	}

	dbcfg := toDBConfig(cfg)

	// Use the context-aware variant so cancellation/deadlines of ctx apply to
	// the insert as well as to the transaction itself.
	if _, err := tx.NamedExecContext(ctx, q, dbcfg); err != nil {
		e := err
		if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == pgerrcode.UniqueViolation {
			e = errors.ErrConflict
		}

		cr.rollback("Failed to insert a Config", tx)
		return "", errors.Wrap(errors.ErrCreateEntity, e)
	}

	if err := insertChannels(ctx, cfg.Owner, cfg.Channels, tx); err != nil {
		cr.rollback("Failed to insert Channels", tx)
		return "", errors.Wrap(errSaveChannels, err)
	}

	if err := insertConnections(ctx, cfg, chsConnIDs, tx); err != nil {
		cr.rollback("Failed to insert connections", tx)
		return "", errors.Wrap(errSaveConnections, err)
	}

	if err := tx.Commit(); err != nil {
		cr.rollback("Failed to commit Config save", tx)
		// Wrap like every other failure path of Save so callers can match on
		// errors.ErrCreateEntity.
		return "", errors.Wrap(errors.ErrCreateEntity, err)
	}

	return cfg.ThingID, nil
}
// RetrieveByID loads the config identified by thing ID id and owned by owner,
// together with the channels it is connected to.
//
// It returns errors.ErrNotFound (wrapped) when no such config exists and
// errors.ErrViewEntity (wrapped) when a query, scan or channel
// deserialization fails.
func (cr configRepository) RetrieveByID(ctx context.Context, owner, id string) (bootstrap.Config, error) {
	q := `SELECT mainflux_thing, mainflux_key, external_id, external_key, name, content, state, client_cert, ca_cert
FROM configs
WHERE mainflux_thing = :mainflux_thing AND owner = :owner`

	dbcfg := dbConfig{
		MFThing: id,
		Owner:   owner,
	}
	row, err := cr.db.NamedQueryContext(ctx, q, dbcfg)
	if err != nil {
		if err == sql.ErrNoRows {
			return bootstrap.Config{}, errors.Wrap(errors.ErrNotFound, err)
		}

		return bootstrap.Config{}, errors.Wrap(errors.ErrViewEntity, err)
	}
	// Guarantees the result set is released on every return path.
	defer row.Close()

	if ok := row.Next(); !ok {
		return bootstrap.Config{}, errors.Wrap(errors.ErrNotFound, row.Err())
	}

	if err := row.StructScan(&dbcfg); err != nil {
		return bootstrap.Config{}, errors.Wrap(errors.ErrViewEntity, err)
	}

	// Release the first result set before issuing the second query so its
	// connection is not held while the channels are fetched.
	if err := row.Close(); err != nil {
		return bootstrap.Config{}, errors.Wrap(errors.ErrViewEntity, err)
	}

	q = `SELECT mainflux_channel, name, metadata FROM channels ch
INNER JOIN connections conn
ON ch.mainflux_channel = conn.channel_id AND ch.owner = conn.config_owner
WHERE conn.config_id = :mainflux_thing AND conn.config_owner = :owner`

	rows, err := cr.db.NamedQueryContext(ctx, q, dbcfg)
	if err != nil {
		cr.log.Error(fmt.Sprintf("Failed to retrieve connected due to %s", err))
		return bootstrap.Config{}, errors.Wrap(errors.ErrViewEntity, err)
	}
	defer rows.Close()

	chans := []bootstrap.Channel{}
	for rows.Next() {
		dbch := dbChannel{}
		if err := rows.StructScan(&dbch); err != nil {
			cr.log.Error(fmt.Sprintf("Failed to read connected thing due to %s", err))
			return bootstrap.Config{}, errors.Wrap(errors.ErrViewEntity, err)
		}
		// The query does not select the channel owner; it equals the config
		// owner because of the join condition.
		dbch.Owner = nullString(dbcfg.Owner)

		ch, err := toChannel(dbch)
		if err != nil {
			return bootstrap.Config{}, errors.Wrap(errors.ErrViewEntity, err)
		}
		chans = append(chans, ch)
	}
	// Next returning false can also mean the iteration failed midway.
	if err := rows.Err(); err != nil {
		cr.log.Error(fmt.Sprintf("Failed to read connected thing due to %s", err))
		return bootstrap.Config{}, errors.Wrap(errors.ErrViewEntity, err)
	}

	cfg := toConfig(dbcfg)
	cfg.Channels = chans

	return cfg, nil
}
// RetrieveAll returns one page of the configs owned by owner that match
// filter, ordered by mainflux_thing, skipping offset rows and returning at
// most limit rows. Total in the returned page is the number of all matching
// configs.
//
// The method has no error return: every failure is logged and reported as an
// empty bootstrap.ConfigsPage.
func (cr configRepository) RetrieveAll(ctx context.Context, owner string, filter bootstrap.Filter, offset, limit uint64) bootstrap.ConfigsPage {
	search, params := cr.retrieveAll(owner, filter)
	n := len(params)

	// LIMIT and OFFSET take the two placeholders following the filter params.
	q := `SELECT mainflux_thing, mainflux_key, external_id, external_key, name, content, state
FROM configs %s ORDER BY mainflux_thing LIMIT $%d OFFSET $%d`
	q = fmt.Sprintf(q, search, n+1, n+2)

	rows, err := cr.db.QueryContext(ctx, q, append(params, limit, offset)...)
	if err != nil {
		cr.log.Error(fmt.Sprintf("Failed to retrieve configs due to %s", err))
		return bootstrap.ConfigsPage{}
	}
	defer rows.Close()

	// name and content are nullable columns, so they are scanned through
	// sql.NullString and copied into the config afterwards.
	var name, content sql.NullString
	configs := []bootstrap.Config{}

	for rows.Next() {
		c := bootstrap.Config{Owner: owner}
		if err := rows.Scan(&c.ThingID, &c.ThingKey, &c.ExternalID, &c.ExternalKey, &name, &content, &c.State); err != nil {
			cr.log.Error(fmt.Sprintf("Failed to read retrieved config due to %s", err))
			return bootstrap.ConfigsPage{}
		}

		c.Name = name.String
		c.Content = content.String
		configs = append(configs, c)
	}
	// Without this check an iteration failure would silently produce a
	// truncated page.
	if err := rows.Err(); err != nil {
		cr.log.Error(fmt.Sprintf("Failed to read retrieved config due to %s", err))
		return bootstrap.ConfigsPage{}
	}

	q = fmt.Sprintf(`SELECT COUNT(*) FROM configs %s`, search)

	var total uint64
	if err := cr.db.QueryRowxContext(ctx, q, params...).Scan(&total); err != nil {
		cr.log.Error(fmt.Sprintf("Failed to count configs due to %s", err))
		return bootstrap.ConfigsPage{}
	}

	return bootstrap.ConfigsPage{
		Total:   total,
		Limit:   limit,
		Offset:  offset,
		Configs: configs,
	}
}
// RetrieveByExternalID loads the config whose external_id equals externalID,
// together with the channels it is connected to.
//
// It returns errors.ErrNotFound (wrapped) when no such config exists and
// errors.ErrViewEntity (wrapped) when a query, scan or channel
// deserialization fails.
func (cr configRepository) RetrieveByExternalID(ctx context.Context, externalID string) (bootstrap.Config, error) {
	q := `SELECT mainflux_thing, mainflux_key, external_key, owner, name, client_cert, client_key, ca_cert, content, state
FROM configs
WHERE external_id = :external_id`
	dbcfg := dbConfig{
		ExternalID: externalID,
	}

	row, err := cr.db.NamedQueryContext(ctx, q, dbcfg)
	if err != nil {
		if err == sql.ErrNoRows {
			return bootstrap.Config{}, errors.Wrap(errors.ErrNotFound, err)
		}
		return bootstrap.Config{}, errors.Wrap(errors.ErrViewEntity, err)
	}
	// Guarantees the result set is released on every return path.
	defer row.Close()

	if ok := row.Next(); !ok {
		return bootstrap.Config{}, errors.Wrap(errors.ErrNotFound, row.Err())
	}

	if err := row.StructScan(&dbcfg); err != nil {
		return bootstrap.Config{}, errors.Wrap(errors.ErrViewEntity, err)
	}

	// Release the first result set before issuing the second query so its
	// connection is not held while the channels are fetched.
	if err := row.Close(); err != nil {
		return bootstrap.Config{}, errors.Wrap(errors.ErrViewEntity, err)
	}

	q = `SELECT mainflux_channel, name, metadata FROM channels ch
INNER JOIN connections conn
ON ch.mainflux_channel = conn.channel_id AND ch.owner = conn.config_owner
WHERE conn.config_id = :mainflux_thing AND conn.config_owner = :owner`

	rows, err := cr.db.NamedQueryContext(ctx, q, dbcfg)
	if err != nil {
		cr.log.Error(fmt.Sprintf("Failed to retrieve connected due to %s", err))
		return bootstrap.Config{}, errors.Wrap(errors.ErrViewEntity, err)
	}
	defer rows.Close()

	channels := []bootstrap.Channel{}
	for rows.Next() {
		dbch := dbChannel{}
		if err := rows.StructScan(&dbch); err != nil {
			cr.log.Error(fmt.Sprintf("Failed to read connected thing due to %s", err))
			return bootstrap.Config{}, errors.Wrap(errors.ErrViewEntity, err)
		}

		ch, err := toChannel(dbch)
		if err != nil {
			cr.log.Error(fmt.Sprintf("Failed to deserialize channel due to %s", err))
			return bootstrap.Config{}, errors.Wrap(errors.ErrViewEntity, err)
		}

		channels = append(channels, ch)
	}
	// Next returning false can also mean the iteration failed midway.
	if err := rows.Err(); err != nil {
		cr.log.Error(fmt.Sprintf("Failed to read connected thing due to %s", err))
		return bootstrap.Config{}, errors.Wrap(errors.ErrViewEntity, err)
	}

	cfg := toConfig(dbcfg)
	cfg.Channels = channels

	return cfg, nil
}
// Update overwrites the name and content of the config identified by
// cfg.ThingID and cfg.Owner. It returns errors.ErrNotFound when no row was
// affected and wraps execution failures in errors.ErrUpdateEntity.
func (cr configRepository) Update(ctx context.Context, cfg bootstrap.Config) error {
	const query = `UPDATE configs SET name = :name, content = :content WHERE mainflux_thing = :mainflux_thing AND owner = :owner `

	record := dbConfig{
		MFThing: cfg.ThingID,
		Owner:   cfg.Owner,
		Name:    nullString(cfg.Name),
		Content: nullString(cfg.Content),
	}

	result, err := cr.db.NamedExecContext(ctx, query, record)
	if err != nil {
		return errors.Wrap(errors.ErrUpdateEntity, err)
	}

	affected, err := result.RowsAffected()
	switch {
	case err != nil:
		return errors.Wrap(errors.ErrUpdateEntity, err)
	case affected == 0:
		// No row matched the thing ID / owner pair.
		return errors.ErrNotFound
	}

	return nil
}
// UpdateCert stores a new client certificate, client key and CA certificate
// on the config identified by thingID and owner, and returns the config built
// from the columns the UPDATE returns (thing ID and the three certificate
// fields). It yields errors.ErrNotFound (wrapped) when no row was updated.
func (cr configRepository) UpdateCert(ctx context.Context, owner, thingID, clientCert, clientKey, caCert string) (bootstrap.Config, error) {
	const query = `UPDATE configs SET client_cert = :client_cert, client_key = :client_key, ca_cert = :ca_cert WHERE mainflux_thing = :mainflux_thing AND owner = :owner
RETURNING mainflux_thing, client_cert, client_key, ca_cert`

	record := dbConfig{
		MFThing:    thingID,
		Owner:      owner,
		ClientCert: nullString(clientCert),
		ClientKey:  nullString(clientKey),
		CaCert:     nullString(caCert),
	}

	rows, err := cr.db.NamedQueryContext(ctx, query, record)
	if err != nil {
		return bootstrap.Config{}, errors.Wrap(errors.ErrUpdateEntity, err)
	}
	defer rows.Close()

	// An empty result set means the UPDATE matched nothing.
	if !rows.Next() {
		return bootstrap.Config{}, errors.Wrap(errors.ErrNotFound, rows.Err())
	}

	if scanErr := rows.StructScan(&record); scanErr != nil {
		return bootstrap.Config{}, scanErr
	}

	return toConfig(record), nil
}
// UpdateConnections inserts channels and replaces the connections of the
// config identified by id and owner with connections, all inside a single
// transaction.
//
// A foreign-key violation while updating connections is reported as
// errors.ErrNotFound; every other failure is wrapped in
// errors.ErrUpdateEntity. The transaction is rolled back on every failure.
func (cr configRepository) UpdateConnections(ctx context.Context, owner, id string, channels []bootstrap.Channel, connections []string) error {
	tx, err := cr.db.BeginTxx(ctx, nil)
	if err != nil {
		return errors.Wrap(errors.ErrUpdateEntity, err)
	}

	if err := insertChannels(ctx, owner, channels, tx); err != nil {
		cr.rollback("Failed to insert Channels during the update", tx)
		return errors.Wrap(errors.ErrUpdateEntity, err)
	}

	if err := updateConnections(ctx, owner, id, connections, tx); err != nil {
		// Roll back before classifying the error so the foreign-key branch
		// does not leave the transaction open.
		cr.rollback("Failed to update connections during the update", tx)
		if e, ok := err.(*pgconn.PgError); ok && e.Code == pgerrcode.ForeignKeyViolation {
			return errors.ErrNotFound
		}
		return errors.Wrap(errors.ErrUpdateEntity, err)
	}

	if err := tx.Commit(); err != nil {
		cr.rollback("Failed to commit Config update", tx)
		return errors.Wrap(errors.ErrUpdateEntity, err)
	}

	return nil
}
// Remove deletes the config identified by id and owner and then runs
// cleanupQuery. A failing delete is wrapped in errors.ErrRemoveEntity; a
// failing cleanup is only logged as a warning.
func (cr configRepository) Remove(ctx context.Context, owner, id string) error {
	target := dbConfig{
		Owner:   owner,
		MFThing: id,
	}

	_, err := cr.db.NamedExecContext(ctx, `DELETE FROM configs WHERE mainflux_thing = :mainflux_thing AND owner = :owner`, target)
	if err != nil {
		return errors.Wrap(errors.ErrRemoveEntity, err)
	}

	// Best effort: the config is already gone, so a cleanup failure does not
	// fail the call.
	_, cleanupErr := cr.db.ExecContext(ctx, cleanupQuery)
	if cleanupErr != nil {
		cr.log.Warn("Failed to clean dangling channels after removal")
	}

	return nil
}
// ChangeState sets the state column of the config identified by id and owner.
// It returns errors.ErrNotFound when no row was affected and wraps execution
// failures in errors.ErrUpdateEntity.
func (cr configRepository) ChangeState(ctx context.Context, owner, id string, state bootstrap.State) error {
	const query = `UPDATE configs SET state = :state WHERE mainflux_thing = :mainflux_thing AND owner = :owner;`

	record := dbConfig{
		Owner:   owner,
		MFThing: id,
		State:   state,
	}

	result, err := cr.db.NamedExecContext(ctx, query, record)
	if err != nil {
		return errors.Wrap(errors.ErrUpdateEntity, err)
	}

	affected, err := result.RowsAffected()
	switch {
	case err != nil:
		return errors.Wrap(errors.ErrUpdateEntity, err)
	case affected == 0:
		// No row matched the thing ID / owner pair.
		return errors.ErrNotFound
	}

	return nil
}
// ListExisting returns the channels owned by owner whose IDs are contained in
// ids. An empty ids slice yields a nil slice without touching the database.
//
// Query and scan failures are wrapped in errors.ErrViewEntity; a channel that
// cannot be deserialized yields the error returned by toChannel.
func (cr configRepository) ListExisting(ctx context.Context, owner string, ids []string) ([]bootstrap.Channel, error) {
	var channels []bootstrap.Channel
	if len(ids) == 0 {
		return channels, nil
	}

	var chans pgtype.TextArray
	if err := chans.Set(ids); err != nil {
		return []bootstrap.Channel{}, err
	}

	q := "SELECT mainflux_channel, name, metadata FROM channels WHERE owner = $1 AND mainflux_channel = ANY ($2)"
	rows, err := cr.db.QueryxContext(ctx, q, owner, chans)
	if err != nil {
		return []bootstrap.Channel{}, errors.Wrap(errors.ErrViewEntity, err)
	}
	// Release the result set on every path, including the early returns
	// inside the loop below.
	defer rows.Close()

	for rows.Next() {
		var dbch dbChannel
		if err := rows.StructScan(&dbch); err != nil {
			cr.log.Error(fmt.Sprintf("Failed to read retrieved channels due to %s", err))
			return []bootstrap.Channel{}, errors.Wrap(errors.ErrViewEntity, err)
		}

		ch, err := toChannel(dbch)
		if err != nil {
			cr.log.Error(fmt.Sprintf("Failed to deserialize channel due to %s", err))
			return []bootstrap.Channel{}, err
		}

		channels = append(channels, ch)
	}
	// Next returning false can also mean the iteration failed midway.
	if err := rows.Err(); err != nil {
		cr.log.Error(fmt.Sprintf("Failed to read retrieved channels due to %s", err))
		return []bootstrap.Channel{}, errors.Wrap(errors.ErrViewEntity, err)
	}

	return channels, nil
}
// RemoveThing deletes the config whose thing ID is id, regardless of owner,
// and then runs cleanupQuery. The cleanup is executed even when the delete
// failed; its own failure is only logged. A failed delete is reported wrapped
// in errors.ErrRemoveEntity.
func (cr configRepository) RemoveThing(ctx context.Context, id string) error {
	_, removeErr := cr.db.ExecContext(ctx, `DELETE FROM configs WHERE mainflux_thing = $1`, id)

	// Runs before the delete result is inspected, as the delete error is
	// reported only after the cleanup attempt.
	_, cleanupErr := cr.db.ExecContext(ctx, cleanupQuery)
	if cleanupErr != nil {
		cr.log.Warn("Failed to clean dangling channels after removal")
	}

	if removeErr == nil {
		return nil
	}
	return errors.Wrap(errors.ErrRemoveEntity, removeErr)
}
// UpdateChannel writes the name, metadata, updated_at and updated_by of c to
// the channel row whose mainflux_channel equals c.ID. Conversion failures are
// wrapped in errors.ErrUpdateEntity, execution failures in errUpdateChannels.
func (cr configRepository) UpdateChannel(ctx context.Context, c bootstrap.Channel) error {
	// The owner is not part of the statement, so it is left empty here.
	record, convErr := toDBChannel("", c)
	if convErr != nil {
		return errors.Wrap(errors.ErrUpdateEntity, convErr)
	}

	const query = `UPDATE channels SET name = :name, metadata = :metadata, updated_at = :updated_at, updated_by = :updated_by
WHERE mainflux_channel = :mainflux_channel`

	_, execErr := cr.db.NamedExecContext(ctx, query, record)
	if execErr != nil {
		return errors.Wrap(errUpdateChannels, execErr)
	}

	return nil
}
// RemoveChannel deletes the channel row whose mainflux_channel equals id.
// An execution failure is wrapped in errRemoveChannels.
func (cr configRepository) RemoveChannel(ctx context.Context, id string) error {
	const query = `DELETE FROM channels WHERE mainflux_channel = $1`

	_, err := cr.db.ExecContext(ctx, query, id)
	if err == nil {
		return nil
	}
	return errors.Wrap(errRemoveChannels, err)
}
// DisconnectThing sets the state of the config whose thing ID is thingID to
// bootstrap.Inactive, provided a connection between that config and
// channelID exists. An execution failure is wrapped in errDisconnectThing.
func (cr configRepository) DisconnectThing(ctx context.Context, channelID, thingID string) error {
	// The update must be restricted to the config itself: the EXISTS
	// subquery alone is not correlated with the configs table, so without
	// the mainflux_thing predicate every config row would be updated
	// whenever the connection exists.
	q := `UPDATE configs SET state = $1 WHERE mainflux_thing = $2 AND EXISTS (
SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)`
	if _, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, thingID, channelID); err != nil {
		return errors.Wrap(errDisconnectThing, err)
	}
	return nil
}
// retrieveAll builds the WHERE clause and the positional arguments used to
// list the configs of owner. Each filter.FullMatch entry becomes an equality
// condition and each filter.PartialMatch entry a LOWER(column) LIKE
// condition; all conditions are joined with AND after the owner check.
//
// NOTE(review): filter keys are interpolated into the SQL text as column
// names (only the values are bound as parameters) — confirm callers restrict
// the keys to known columns.
func (cr configRepository) retrieveAll(owner string, filter bootstrap.Filter) (string, []interface{}) {
	args := []interface{}{owner}

	var conditions strings.Builder
	// $1 is taken by owner, so filter placeholders start at $2.
	placeholder := 2

	for column, value := range filter.FullMatch {
		fmt.Fprintf(&conditions, " AND %s = $%d", column, placeholder)
		args = append(args, value)
		placeholder++
	}

	for column, value := range filter.PartialMatch {
		fmt.Fprintf(&conditions, " AND LOWER(%s) LIKE '%%' || $%d || '%%'", column, placeholder)
		args = append(args, value)
		placeholder++
	}

	return "WHERE owner = $1 " + conditions.String(), args
}
// rollback rolls tx back and, if the rollback itself fails, logs the failure
// together with content, the caller's description of the operation that
// triggered the rollback.
func (cr configRepository) rollback(content string, tx *sqlx.Tx) {
	if err := tx.Rollback(); err != nil {
		// Include content so the log entry identifies which operation was
		// being undone.
		cr.log.Error(fmt.Sprintf("%s: failed to rollback due to %s", content, err))
	}
}
// insertChannels inserts channels, owned by owner, within tx. It is a no-op
// for an empty slice. A unique-constraint violation is reported as
// errors.ErrConflict; any other failure is returned as is.
func insertChannels(ctx context.Context, owner string, channels []bootstrap.Channel, tx *sqlx.Tx) error {
	if len(channels) == 0 {
		return nil
	}

	chans := make([]dbChannel, 0, len(channels))
	for _, ch := range channels {
		dbch, err := toDBChannel(owner, ch)
		if err != nil {
			return err
		}
		chans = append(chans, dbch)
	}

	q := `INSERT INTO channels (mainflux_channel, owner, name, metadata, parent_id, description, created_at, updated_at, updated_by, status)
VALUES (:mainflux_channel, :owner, :name, :metadata, :parent_id, :description, :created_at, :updated_at, :updated_by, :status)`

	// Context-aware variant so cancellation/deadlines of ctx are honored.
	if _, err := tx.NamedExecContext(ctx, q, chans); err != nil {
		if pqErr, ok := err.(*pgconn.PgError); ok && pqErr.Code == pgerrcode.UniqueViolation {
			return errors.ErrConflict
		}
		return err
	}

	return nil
}
// insertConnections inserts, within tx, one connection row per channel ID in
// connections, linking it to the config cfg.ThingID. Both the config owner
// and the channel owner are set to cfg.Owner. It is a no-op for an empty
// slice.
func insertConnections(ctx context.Context, cfg bootstrap.Config, connections []string, tx *sqlx.Tx) error {
	if len(connections) == 0 {
		return nil
	}

	q := `INSERT INTO connections (config_id, channel_id, config_owner, channel_owner)
VALUES (:config_id, :channel_id, :config_owner, :channel_owner)`

	conns := make([]dbConnection, 0, len(connections))
	for _, channelID := range connections {
		conns = append(conns, dbConnection{
			Config:       cfg.ThingID,
			Channel:      channelID,
			ConfigOwner:  cfg.Owner,
			ChannelOwner: cfg.Owner,
		})
	}

	// Context-aware variant so cancellation/deadlines of ctx are honored.
	_, err := tx.NamedExecContext(ctx, q, conns)
	return err
}
// updateConnections replaces, within tx, the connections of the config
// identified by id and owner: it deletes existing connection rows, inserts
// one row per channel ID in connections and, if the delete removed anything,
// runs cleanupQuery. It is a no-op for an empty connections slice.
func updateConnections(ctx context.Context, owner, id string, connections []string, tx *sqlx.Tx) error {
	if len(connections) == 0 {
		return nil
	}

	// NOTE(review): $3 is bound as one array value, so `NOT IN ($3)` likely
	// does not filter per element and the delete may remove every connection
	// of the config before they are re-inserted below. Switching to
	// `<> ALL ($3)` would also require making the insert tolerate existing
	// rows — confirm the intended semantics before changing the SQL.
	q := `DELETE FROM connections
WHERE config_id = $1 AND config_owner = $2 AND channel_owner = $2
AND channel_id NOT IN ($3)`

	var channelIDs pgtype.TextArray
	if err := channelIDs.Set(connections); err != nil {
		return err
	}

	// Context-aware variants so cancellation/deadlines of ctx are honored.
	res, err := tx.ExecContext(ctx, q, id, owner, channelIDs)
	if err != nil {
		return err
	}

	cnt, err := res.RowsAffected()
	if err != nil {
		return err
	}

	q = `INSERT INTO connections (config_id, channel_id, config_owner, channel_owner)
VALUES (:config_id, :channel_id, :config_owner, :channel_owner)`

	conns := make([]dbConnection, 0, len(connections))
	for _, channelID := range connections {
		conns = append(conns, dbConnection{
			Config:       id,
			Channel:      channelID,
			ConfigOwner:  owner,
			ChannelOwner: owner,
		})
	}

	if _, err := tx.NamedExecContext(ctx, q, conns); err != nil {
		return err
	}

	// Nothing was deleted, so no channel can have become dangling.
	if cnt == 0 {
		return nil
	}

	_, err = tx.ExecContext(ctx, cleanupQuery)
	return err
}
// nullString converts s into a sql.NullString: the empty string maps to the
// zero (invalid, i.e. NULL) value and any other string to a valid value
// holding s.
func nullString(s string) sql.NullString {
	var out sql.NullString
	if s != "" {
		out.String, out.Valid = s, true
	}
	return out
}
// nullTime converts t into a sql.NullTime: a zero time maps to the zero
// (invalid, i.e. NULL) value and any other time to a valid value holding t.
func nullTime(t time.Time) sql.NullTime {
	var out sql.NullTime
	if !t.IsZero() {
		out.Time, out.Valid = t, true
	}
	return out
}
// dbConfig is the row representation of a bootstrap.Config in the configs
// table; the db tags name the columns. Fields typed sql.NullString are
// written through nullString, so an empty value is stored as NULL.
type dbConfig struct {
// MFThing is the thing ID (bootstrap.Config.ThingID).
MFThing string `db:"mainflux_thing"`
Owner string `db:"owner"`
Name sql.NullString `db:"name"`
ClientCert sql.NullString `db:"client_cert"`
ClientKey sql.NullString `db:"client_key"`
CaCert sql.NullString `db:"ca_cert"`
// MFKey is the thing key (bootstrap.Config.ThingKey).
MFKey string `db:"mainflux_key"`
ExternalID string `db:"external_id"`
ExternalKey string `db:"external_key"`
Content sql.NullString `db:"content"`
State bootstrap.State `db:"state"`
}
// toDBConfig maps a bootstrap.Config onto its database row representation.
// Name, certificates, client key and content go through nullString, so empty
// values become NULL.
func toDBConfig(cfg bootstrap.Config) dbConfig {
	var row dbConfig

	// Mandatory columns are copied verbatim.
	row.MFThing = cfg.ThingID
	row.MFKey = cfg.ThingKey
	row.Owner = cfg.Owner
	row.ExternalID = cfg.ExternalID
	row.ExternalKey = cfg.ExternalKey
	row.State = cfg.State

	// Nullable columns.
	row.Name = nullString(cfg.Name)
	row.Content = nullString(cfg.Content)
	row.ClientCert = nullString(cfg.ClientCert)
	row.ClientKey = nullString(cfg.ClientKey)
	row.CaCert = nullString(cfg.CACert)

	return row
}
// toConfig maps a database row onto a bootstrap.Config. Nullable columns are
// copied only when they hold a valid (non-NULL) value; otherwise the
// corresponding field keeps its zero value. Channels are not populated here.
func toConfig(dbcfg dbConfig) bootstrap.Config {
	cfg := bootstrap.Config{
		ThingID:     dbcfg.MFThing,
		Owner:       dbcfg.Owner,
		ThingKey:    dbcfg.MFKey,
		ExternalID:  dbcfg.ExternalID,
		ExternalKey: dbcfg.ExternalKey,
		State:       dbcfg.State,
	}

	// Pair every nullable column with the field it populates.
	nullable := []struct {
		dst *string
		src sql.NullString
	}{
		{&cfg.Name, dbcfg.Name},
		{&cfg.Content, dbcfg.Content},
		{&cfg.ClientCert, dbcfg.ClientCert},
		{&cfg.ClientKey, dbcfg.ClientKey},
		{&cfg.CACert, dbcfg.CaCert},
	}
	for _, col := range nullable {
		if col.src.Valid {
			*col.dst = col.src.String
		}
	}

	return cfg
}
// dbChannel is the row representation of a bootstrap.Channel in the channels
// table; the db tags name the columns.
type dbChannel struct {
ID string `db:"mainflux_channel"`
Name sql.NullString `db:"name"`
Owner sql.NullString `db:"owner"`
// Metadata holds the channel metadata as a JSON document: toDBChannel
// fills it with json.Marshal output and toChannel decodes it with
// json.Unmarshal.
Metadata string `db:"metadata"`
Parent sql.NullString `db:"parent_id,omitempty"`
Description string `db:"description,omitempty"`
CreatedAt time.Time `db:"created_at"`
UpdatedAt sql.NullTime `db:"updated_at,omitempty"`
UpdatedBy sql.NullString `db:"updated_by,omitempty"`
Status clients.Status `db:"status"`
}
// toDBChannel maps ch, owned by owner, onto its database row representation.
// The channel metadata is serialized to JSON; a serialization failure is
// wrapped in errors.ErrMalformedEntity and an empty dbChannel is returned.
func toDBChannel(owner string, ch bootstrap.Channel) (dbChannel, error) {
	encoded, err := json.Marshal(ch.Metadata)
	if err != nil {
		return dbChannel{}, errors.Wrap(errors.ErrMalformedEntity, err)
	}

	row := dbChannel{
		ID:          ch.ID,
		Owner:       nullString(owner),
		Name:        nullString(ch.Name),
		Metadata:    string(encoded),
		Parent:      nullString(ch.Parent),
		Description: ch.Description,
		Status:      ch.Status,
		CreatedAt:   ch.CreatedAt,
		UpdatedAt:   nullTime(ch.UpdatedAt),
		UpdatedBy:   nullString(ch.UpdatedBy),
	}
	return row, nil
}
// toChannel maps a database row onto a bootstrap.Channel. Nullable columns
// are copied only when they hold a valid (non-NULL) value. The metadata
// column is decoded from JSON; a decoding failure is wrapped in
// errors.ErrMalformedEntity and an empty channel is returned.
func toChannel(dbch dbChannel) (bootstrap.Channel, error) {
	ch := bootstrap.Channel{
		ID:          dbch.ID,
		Description: dbch.Description,
		CreatedAt:   dbch.CreatedAt,
		Status:      dbch.Status,
	}

	// Pair every nullable text column with the field it populates.
	nullable := []struct {
		dst *string
		src sql.NullString
	}{
		{&ch.Name, dbch.Name},
		{&ch.Owner, dbch.Owner},
		{&ch.Parent, dbch.Parent},
		{&ch.UpdatedBy, dbch.UpdatedBy},
	}
	for _, col := range nullable {
		if col.src.Valid {
			*col.dst = col.src.String
		}
	}

	if dbch.UpdatedAt.Valid {
		ch.UpdatedAt = dbch.UpdatedAt.Time
	}

	decodeErr := json.Unmarshal([]byte(dbch.Metadata), &ch.Metadata)
	if decodeErr != nil {
		return bootstrap.Channel{}, errors.Wrap(errors.ErrMalformedEntity, decodeErr)
	}

	return ch, nil
}
// dbConnection is the row representation of a config-to-channel link in the
// connections table; the db tags name the columns.
type dbConnection struct {
// Config is the thing ID of the connected config.
Config string `db:"config_id"`
// Channel is the ID of the connected channel.
Channel string `db:"channel_id"`
ConfigOwner string `db:"config_owner"`
ChannelOwner string `db:"channel_owner"`
}