Retry sqlx transactions (#2203)

- Add exponential backoff method to MySQL datastore
- Refactor queries using transactions to use backoff.

Fixes #845
This commit is contained in:
billcobbler 2020-03-10 20:14:02 -05:00 committed by GitHub
parent 53256917a3
commit 44c40e4255
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 218 additions and 265 deletions

1
go.mod
View File

@ -11,6 +11,7 @@ require (
github.com/aws/aws-sdk-go v1.26.8
github.com/beevik/etree v1.1.0
github.com/briandowns/spinner v0.0.0-20170614154858-48dbb65d7bd5
github.com/cenkalti/backoff/v4 v4.0.0
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/e-dard/netbug v0.0.0-20151029172837-e64d308a0b20
github.com/elazarl/go-bindata-assetfs v1.0.0

2
go.sum
View File

@ -41,6 +41,8 @@ github.com/briandowns/spinner v0.0.0-20170614154858-48dbb65d7bd5 h1:osZyZB7J4kE1
github.com/briandowns/spinner v0.0.0-20170614154858-48dbb65d7bd5/go.mod h1:hw/JEQBIE+c/BLI4aKM8UU8v+ZqrD3h7HC27kKt8JQU=
github.com/c-bata/go-prompt v0.2.3 h1:jjCS+QhG/sULBhAaBdjb2PlMRVaKXQgn+4yzaauvs2s=
github.com/c-bata/go-prompt v0.2.3/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
github.com/cenkalti/backoff/v4 v4.0.0 h1:6VeaLF9aI+MAUQ95106HwWzYZgJJpZ4stumjj6RFYAU=
github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/crewjam/saml v0.0.0-20190521120225-344d075952c9/go.mod h1:w5eu+HNtubx+kRpQL6QFT2F3yIFfYVe6+EzOFVU7Hko=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@ -26,10 +26,6 @@ func (ds *Datastore) PendingEmailChange(uid uint, newEmail, token string) error
// ConfirmPendingEmailChange finds email change record, updates user with new email,
// then deletes change record if everything succeeds.
func (ds *Datastore) ConfirmPendingEmailChange(id uint, token string) (newEmail string, err error) {
var (
tx *sqlx.Tx
success bool // indicates all db operations success if true
)
changeRecord := struct {
ID uint
UserID uint `db:"user_id"`
@ -44,42 +40,36 @@ func (ds *Datastore) ConfirmPendingEmailChange(id uint, token string) (newEmail
return "", errors.Wrap(err, "email change")
}
tx, err = ds.db.Beginx()
if err != nil {
return "", errors.Wrap(err, "begin transaction to change email")
}
defer func() {
if success {
if err = tx.Commit(); err == nil {
return // success
}
err = errors.Wrap(err, "commit transaction for email change")
tx.Rollback()
}
}()
query := `
UPDATE users SET
email = ?
WHERE id = ?
err = ds.withRetryTxx(func(tx *sqlx.Tx) error {
query := `
UPDATE users SET
email = ?
WHERE id = ?
`
results, err := tx.Exec(query, changeRecord.NewEmail, changeRecord.UserID)
results, err := tx.Exec(query, changeRecord.NewEmail, changeRecord.UserID)
if err != nil {
return errors.Wrap(err, "updating user's email")
}
rowsAffected, err := results.RowsAffected()
if err != nil {
return errors.Wrap(err, "fetching affected rows updating user's email")
}
if rowsAffected == 0 {
return notFound("User").WithID(changeRecord.UserID)
}
_, err = tx.Exec("DELETE FROM email_changes WHERE id = ?", changeRecord.ID)
if err != nil {
return errors.Wrap(err, "deleting email change")
}
return nil
})
if err != nil {
return "", errors.Wrap(err, "updating user's email")
return "", err
}
rowsAffected, err := results.RowsAffected()
if err != nil {
return "", errors.Wrap(err, "fetching affected rows updating user's email")
}
if rowsAffected == 0 {
return "", notFound("User").WithID(changeRecord.UserID)
}
_, err = tx.Exec("DELETE FROM email_changes WHERE id = ?", changeRecord.ID)
if err != nil {
return "", errors.Wrap(err, "deleting email change")
}
success = true // cause things to be committed in defer func
return changeRecord.NewEmail, nil
return changeRecord.NewEmail, err
}

View File

@ -174,93 +174,79 @@ func (d *Datastore) SaveHost(host *kolide.Host) error {
logger_tls_period = ?
WHERE id = ?
`
tx, err := d.db.Beginx()
if err != nil {
return errors.Wrap(err, "creating transaction")
}
results, err := tx.Exec(sqlStatement,
host.DetailUpdateTime,
host.NodeKey,
host.HostName,
host.UUID,
host.Platform,
host.OsqueryVersion,
host.OSVersion,
host.Uptime,
host.PhysicalMemory,
host.CPUType,
host.CPUSubtype,
host.CPUBrand,
host.CPUPhysicalCores,
host.HardwareVendor,
host.HardwareModel,
host.HardwareVersion,
host.HardwareSerial,
host.ComputerName,
host.PrimaryNetworkInterfaceID,
host.Build,
host.PlatformLike,
host.CodeName,
host.CPULogicalCores,
host.SeenTime,
host.DistributedInterval,
host.ConfigTLSRefresh,
host.LoggerTLSPeriod,
host.ID)
if err != nil {
tx.Rollback()
return errors.Wrap(err, "executing main SQL statement")
}
rowsAffected, err := results.RowsAffected()
if err != nil {
tx.Rollback()
return errors.Wrap(err, "rows affected updating host")
}
if rowsAffected == 0 {
tx.Rollback()
return notFound("Host").WithID(host.ID)
}
host.NetworkInterfaces, err = updateNicsForHost(tx, host)
if err != nil {
tx.Rollback()
return errors.Wrap(err, "updating nics")
}
if err = removedUnusedNics(tx, host); err != nil {
tx.Rollback()
return errors.Wrap(err, "removing unused nics")
}
if needsUpdate := host.ResetPrimaryNetwork(); needsUpdate {
results, err = tx.Exec(
"UPDATE hosts SET primary_ip_id = ? WHERE id = ?",
err := d.withRetryTxx(func(tx *sqlx.Tx) error {
results, err := tx.Exec(sqlStatement,
host.DetailUpdateTime,
host.NodeKey,
host.HostName,
host.UUID,
host.Platform,
host.OsqueryVersion,
host.OSVersion,
host.Uptime,
host.PhysicalMemory,
host.CPUType,
host.CPUSubtype,
host.CPUBrand,
host.CPUPhysicalCores,
host.HardwareVendor,
host.HardwareModel,
host.HardwareVersion,
host.HardwareSerial,
host.ComputerName,
host.PrimaryNetworkInterfaceID,
host.ID,
)
host.Build,
host.PlatformLike,
host.CodeName,
host.CPULogicalCores,
host.SeenTime,
host.DistributedInterval,
host.ConfigTLSRefresh,
host.LoggerTLSPeriod,
host.ID)
if err != nil {
tx.Rollback()
return errors.Wrap(err, "resetting primary network")
return errors.Wrap(err, "executing main SQL statement")
}
rowsAffected, err = results.RowsAffected()
rowsAffected, err := results.RowsAffected()
if err != nil {
tx.Rollback()
return errors.Wrap(err, "rows affected resetting primary network")
return errors.Wrap(err, "rows affected updating host")
}
if rowsAffected == 0 {
tx.Rollback()
return notFound("Host").WithID(host.ID)
}
}
if err = tx.Commit(); err != nil {
tx.Rollback()
return errors.Wrap(err, "committing transaction")
}
return nil
host.NetworkInterfaces, err = updateNicsForHost(tx, host)
if err != nil {
return errors.Wrap(err, "updating nics")
}
if err = removedUnusedNics(tx, host); err != nil {
return errors.Wrap(err, "removing unused nics")
}
if needsUpdate := host.ResetPrimaryNetwork(); needsUpdate {
results, err = tx.Exec(
"UPDATE hosts SET primary_ip_id = ? WHERE id = ?",
host.PrimaryNetworkInterfaceID,
host.ID,
)
if err != nil {
return errors.Wrap(err, "resetting primary network")
}
rowsAffected, err = results.RowsAffected()
if err != nil {
return errors.Wrap(err, "rows affected resetting primary network")
}
if rowsAffected == 0 {
return notFound("Host").WithID(host.ID)
}
}
return nil
})
return err
}
func (d *Datastore) DeleteHost(hid uint) error {

View File

@ -2,7 +2,6 @@ package mysql
import (
"database/sql"
"fmt"
"time"
"github.com/jmoiron/sqlx"
@ -11,25 +10,8 @@ import (
)
func (d *Datastore) ApplyLabelSpecs(specs []*kolide.LabelSpec) (err error) {
tx, err := d.db.Beginx()
if err != nil {
return errors.Wrap(err, "begin ApplyLabelSpecs transaction")
}
defer func() {
if err != nil {
rbErr := tx.Rollback()
// It seems possible that there might be a case in
// which the error we are dealing with here was thrown
// by the call to tx.Commit(), and the docs suggest
// this call would then result in sql.ErrTxDone.
if rbErr != nil && rbErr != sql.ErrTxDone {
panic(fmt.Sprintf("got err '%s' rolling back after err '%s'", rbErr, err))
}
}
}()
sql := `
err = d.withRetryTxx(func(tx *sqlx.Tx) error {
sql := `
INSERT INTO labels (
name,
description,
@ -45,23 +27,25 @@ func (d *Datastore) ApplyLabelSpecs(specs []*kolide.LabelSpec) (err error) {
label_type = VALUES(label_type),
deleted = false
`
stmt, err := tx.Prepare(sql)
if err != nil {
return errors.Wrap(err, "prepare ApplyLabelSpecs insert")
}
for _, s := range specs {
if s.Name == "" {
return errors.New("label name must not be empty")
}
_, err := stmt.Exec(s.Name, s.Description, s.Query, s.Platform, s.LabelType)
stmt, err := tx.Prepare(sql)
if err != nil {
return errors.Wrap(err, "exec ApplyLabelSpecs insert")
return errors.Wrap(err, "prepare ApplyLabelSpecs insert")
}
}
err = tx.Commit()
return errors.Wrap(err, "commit ApplyLabelSpecs transaction")
for _, s := range specs {
if s.Name == "" {
return errors.New("label name must not be empty")
}
_, err := stmt.Exec(s.Name, s.Description, s.Query, s.Platform, s.LabelType)
if err != nil {
return errors.Wrap(err, "exec ApplyLabelSpecs insert")
}
}
return nil
})
return errors.Wrap(err, "ApplyLabelSpecs transaction")
}
func (d *Datastore) GetLabelSpecs() ([]*kolide.LabelSpec, error) {

View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/WatchBeam/clock"
"github.com/cenkalti/backoff/v4"
"github.com/go-kit/kit/log"
"github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
@ -50,6 +51,41 @@ func (d *Datastore) getTransaction(opts []kolide.OptionalArg) dbfunctions {
return result
}
// txFn is a unit of work executed inside a single sqlx transaction. It must
// not commit or roll back tx itself; withRetryTxx does that based on the
// returned error. Because the work may be retried, a txFn can be invoked more
// than once and should tolerate being re-run from the start.
type txFn func(*sqlx.Tx) error

// MySQL server error numbers that indicate lock contention. A transaction
// that fails with one of these has been (or can safely be) rolled back and is
// expected to succeed when attempted again.
const (
	mysqlErrLockWaitTimeout = 1205 // ER_LOCK_WAIT_TIMEOUT
	mysqlErrLockDeadlock    = 1213 // ER_LOCK_DEADLOCK
)

// isRetryableTxError reports whether err (possibly wrapped with pkg/errors)
// is a MySQL lock-contention error for which re-running the whole transaction
// is worthwhile.
func isRetryableTxError(err error) bool {
	if myErr, ok := errors.Cause(err).(*mysql.MySQLError); ok {
		switch myErr.Number {
		case mysqlErrLockWaitTimeout, mysqlErrLockDeadlock:
			return true
		}
	}
	return false
}

// withRetryTxx provides a common way to commit/rollback a txFn wrapped in a
// retry with exponential backoff.
//
// A new transaction is started for every attempt. If fn returns nil the
// transaction is committed; otherwise it is rolled back. Attempts are repeated
// (for at most 5 seconds in total) only when the transaction could not be
// started or when the failure is a MySQL lock error (deadlock / lock wait
// timeout). Any other error -- e.g. a not-found or validation error produced
// by fn -- is deterministic, so it is returned to the caller immediately and
// unmodified instead of being retried until the backoff gives up.
//
// If fn panics, the transaction is rolled back and the panic is re-raised.
func (d *Datastore) withRetryTxx(fn txFn) error {
	operation := func() error {
		tx, err := d.db.Beginx()
		if err != nil {
			// Nothing has been executed yet, so trying again is always safe.
			return errors.Wrap(err, "creating transaction")
		}

		defer func() {
			if p := recover(); p != nil {
				// Best effort: we are already unwinding because of the panic,
				// so a rollback failure cannot be reported any better.
				_ = tx.Rollback()
				panic(p)
			}
		}()

		if err := fn(tx); err != nil {
			// sql.ErrTxDone only means the transaction was already finished,
			// which is not a rollback failure.
			if rbErr := tx.Rollback(); rbErr != nil && rbErr != sql.ErrTxDone {
				// The connection is in an unknown state; do not retry.
				return backoff.Permanent(
					errors.Wrapf(err, "got err '%s' rolling back after err", rbErr),
				)
			}
			if isRetryableTxError(err) {
				return err
			}
			// Return fn's own error untouched so callers see the real cause
			// (and not a misleading "committing transaction" prefix).
			return backoff.Permanent(err)
		}

		if err := tx.Commit(); err != nil {
			err = errors.Wrap(err, "committing transaction")
			if isRetryableTxError(err) {
				return err
			}
			return backoff.Permanent(err)
		}
		return nil
	}

	bo := backoff.NewExponentialBackOff()
	bo.MaxElapsedTime = 5 * time.Second
	return backoff.Retry(operation, bo)
}
// New creates an MySQL datastore.
func New(config config.MysqlConfig, c clock.Clock, opts ...DBOption) (*Datastore, error) {
options := &dbOptions{

View File

@ -10,33 +10,17 @@ import (
)
func (d *Datastore) ApplyPackSpecs(specs []*kolide.PackSpec) (err error) {
tx, err := d.db.Beginx()
if err != nil {
return errors.Wrap(err, "begin ApplyPackSpec transaction")
}
defer func() {
if err != nil {
rbErr := tx.Rollback()
// It seems possible that there might be a case in
// which the error we are dealing with here was thrown
// by the call to tx.Commit(), and the docs suggest
// this call would then result in sql.ErrTxDone.
if rbErr != nil && rbErr != sql.ErrTxDone {
panic(fmt.Sprintf("got err '%s' rolling back after err '%s'", rbErr, err))
err = d.withRetryTxx(func(tx *sqlx.Tx) error {
for _, spec := range specs {
if err := applyPackSpec(tx, spec); err != nil {
return errors.Wrapf(err, "applying pack '%s'", spec.Name)
}
}
}()
for _, spec := range specs {
err = applyPackSpec(tx, spec)
if err != nil {
return errors.Wrapf(err, "applying pack '%s'", spec.Name)
}
}
return nil
})
err = tx.Commit()
return errors.Wrap(err, "commit transaction")
return err
}
func applyPackSpec(tx *sqlx.Tx, spec *kolide.PackSpec) error {
@ -121,44 +105,77 @@ func applyPackSpec(tx *sqlx.Tx, spec *kolide.PackSpec) error {
}
func (d *Datastore) GetPackSpecs() (specs []*kolide.PackSpec, err error) {
tx, err := d.db.Beginx()
if err != nil {
return nil, errors.Wrap(err, "begin GetPackSpecs transaction")
}
err = d.withRetryTxx(func(tx *sqlx.Tx) error {
// Get basic specs
query := "SELECT id, name, description, platform FROM packs"
if err := tx.Select(&specs, query); err != nil {
return errors.Wrap(err, "get packs")
}
defer func() {
if err != nil {
rbErr := tx.Rollback()
// It seems possible that there might be a case in
// which the error we are dealing with here was thrown
// by the call to tx.Commit(), and the docs suggest
// this call would then result in sql.ErrTxDone.
if rbErr != nil && rbErr != sql.ErrTxDone {
panic(fmt.Sprintf("got err '%s' rolling back after err '%s'", rbErr, err))
// Load targets
for _, spec := range specs {
query = `
SELECT l.name
FROM labels l JOIN pack_targets pt
WHERE pack_id = ? AND pt.type = ? AND pt.target_id = l.id
`
if err := tx.Select(&spec.Targets.Labels, query, spec.ID, kolide.TargetLabel); err != nil {
return errors.Wrap(err, "get pack targets")
}
}
}()
// Get basic specs
query := "SELECT id, name, description, platform FROM packs"
if err := tx.Select(&specs, query); err != nil {
return nil, errors.Wrap(err, "get packs")
// Load queries
for _, spec := range specs {
query = `
SELECT
query_name, name, description, ` + "`interval`" + `,
snapshot, removed, shard, platform, version
FROM scheduled_queries
WHERE pack_id = ?
`
if err := tx.Select(&spec.Queries, query, spec.ID); err != nil {
return errors.Wrap(err, "get pack queries")
}
}
return nil
})
if err != nil {
return nil, err
}
// Load targets
for _, spec := range specs {
return specs, nil
}
func (d *Datastore) GetPackSpec(name string) (spec *kolide.PackSpec, err error) {
err = d.withRetryTxx(func(tx *sqlx.Tx) error {
// Get basic spec
var specs []*kolide.PackSpec
query := "SELECT id, name, description, platform FROM packs WHERE name = ?"
if err := tx.Select(&specs, query, name); err != nil {
return errors.Wrap(err, "get packs")
}
if len(specs) == 0 {
return notFound("Pack").WithName(name)
}
if len(specs) > 1 {
return errors.Errorf("expected 1 pack row, got %d", len(specs))
}
spec = specs[0]
// Load targets
query = `
SELECT l.name
FROM labels l JOIN pack_targets pt
WHERE pack_id = ? AND pt.type = ? AND pt.target_id = l.id
`
if err := tx.Select(&spec.Targets.Labels, query, spec.ID, kolide.TargetLabel); err != nil {
return nil, errors.Wrap(err, "get pack targets")
return errors.Wrap(err, "get pack targets")
}
}
// Load queries
for _, spec := range specs {
// Load queries
query = `
SELECT
query_name, name, description, ` + "`interval`" + `,
@ -167,77 +184,14 @@ FROM scheduled_queries
WHERE pack_id = ?
`
if err := tx.Select(&spec.Queries, query, spec.ID); err != nil {
return nil, errors.Wrap(err, "get pack queries")
return errors.Wrap(err, "get pack queries")
}
}
err = tx.Commit()
return nil
})
if err != nil {
return nil, errors.Wrap(err, "commit transaction")
}
return specs, nil
}
func (d *Datastore) GetPackSpec(name string) (spec *kolide.PackSpec, err error) {
tx, err := d.db.Beginx()
if err != nil {
return nil, errors.Wrap(err, "begin GetPackSpecs transaction")
}
defer func() {
if err != nil {
rbErr := tx.Rollback()
// It seems possible that there might be a case in
// which the error we are dealing with here was thrown
// by the call to tx.Commit(), and the docs suggest
// this call would then result in sql.ErrTxDone.
if rbErr != nil && rbErr != sql.ErrTxDone {
panic(fmt.Sprintf("got err '%s' rolling back after err '%s'", rbErr, err))
}
}
}()
// Get basic spec
var specs []*kolide.PackSpec
query := "SELECT id, name, description, platform FROM packs WHERE name = ?"
if err := tx.Select(&specs, query, name); err != nil {
return nil, errors.Wrap(err, "get packs")
}
if len(specs) == 0 {
return nil, notFound("Pack").WithName(name)
}
if len(specs) > 1 {
return nil, errors.Errorf("expected 1 pack row, got %d", len(specs))
}
spec = specs[0]
// Load targets
query = `
SELECT l.name
FROM labels l JOIN pack_targets pt
WHERE pack_id = ? AND pt.type = ? AND pt.target_id = l.id
`
if err := tx.Select(&spec.Targets.Labels, query, spec.ID, kolide.TargetLabel); err != nil {
return nil, errors.Wrap(err, "get pack targets")
}
// Load queries
query = `
SELECT
query_name, name, description, ` + "`interval`" + `,
snapshot, removed, shard, platform, version
FROM scheduled_queries
WHERE pack_id = ?
`
if err := tx.Select(&spec.Queries, query, spec.ID); err != nil {
return nil, errors.Wrap(err, "get pack queries")
}
err = tx.Commit()
if err != nil {
return nil, errors.Wrap(err, "commit transaction")
return nil, err
}
return spec, nil