MF-1055 - rollback/release transaction on error (#1166)
* fix error message Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * release transaction on error Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * reove println Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * move defer func up Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * refactor errors handling Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * refactor errors handling Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * move defer Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>
This commit is contained in:
parent
16ac44f377
commit
f482df9033
|
@ -20,7 +20,8 @@ var (
|
|||
// ErrInvalidMessage indicates that service received message that
|
||||
// doesn't fit required format.
|
||||
ErrInvalidMessage = errors.New("invalid message representation")
|
||||
errSaveMessage = errors.New("faled to save message to postgress database")
|
||||
errSaveMessage = errors.New("failed to save message to postgres database")
|
||||
errTransRollback = errors.New("failed to rollback transaction")
|
||||
)
|
||||
|
||||
var _ writers.MessageRepository = (*postgresRepo)(nil)
|
||||
|
@ -34,7 +35,7 @@ func New(db *sqlx.DB) writers.MessageRepository {
|
|||
return &postgresRepo{db: db}
|
||||
}
|
||||
|
||||
func (pr postgresRepo) Save(messages ...senml.Message) error {
|
||||
func (pr postgresRepo) Save(messages ...senml.Message) (err error) {
|
||||
q := `INSERT INTO messages (id, channel, subtopic, publisher, protocol,
|
||||
name, unit, value, string_value, bool_value, data_value, sum,
|
||||
time, update_time)
|
||||
|
@ -46,6 +47,19 @@ func (pr postgresRepo) Save(messages ...senml.Message) error {
|
|||
if err != nil {
|
||||
return errors.Wrap(errSaveMessage, err)
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
if txErr := tx.Rollback(); txErr != nil {
|
||||
err = errors.Wrap(err, errors.Wrap(errTransRollback, txErr))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if err = tx.Commit(); err != nil {
|
||||
err = errors.Wrap(errSaveMessage, err)
|
||||
}
|
||||
return
|
||||
}()
|
||||
|
||||
for _, msg := range messages {
|
||||
dbth, err := toDBMessage(msg)
|
||||
|
@ -65,10 +79,7 @@ func (pr postgresRepo) Save(messages ...senml.Message) error {
|
|||
return errors.Wrap(errSaveMessage, err)
|
||||
}
|
||||
}
|
||||
if err := tx.Commit(); err != nil {
|
||||
return errors.Wrap(errSaveMessage, err)
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
type dbMessage struct {
|
||||
|
|
Loading…
Reference in New Issue