From 44c40e425502a1da98a8c3158c81cf120398f5b9 Mon Sep 17 00:00:00 2001 From: billcobbler Date: Tue, 10 Mar 2020 20:14:02 -0500 Subject: [PATCH] Retry sqlx transactions (#2203) - Add exponential backoff method to MySQL datastore - Refactor queries using transaction to use backoff. Fixes #845 --- go.mod | 1 + go.sum | 2 + server/datastore/mysql/email_changes.go | 66 ++++----- server/datastore/mysql/hosts.go | 142 +++++++++--------- server/datastore/mysql/labels.go | 52 +++---- server/datastore/mysql/mysql.go | 36 +++++ server/datastore/mysql/packs.go | 184 +++++++++--------------- 7 files changed, 218 insertions(+), 265 deletions(-) diff --git a/go.mod b/go.mod index a5f2d6208..165bf7f56 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/aws/aws-sdk-go v1.26.8 github.com/beevik/etree v1.1.0 github.com/briandowns/spinner v0.0.0-20170614154858-48dbb65d7bd5 + github.com/cenkalti/backoff/v4 v4.0.0 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/e-dard/netbug v0.0.0-20151029172837-e64d308a0b20 github.com/elazarl/go-bindata-assetfs v1.0.0 diff --git a/go.sum b/go.sum index b1ab38f82..51dcc19c4 100644 --- a/go.sum +++ b/go.sum @@ -41,6 +41,8 @@ github.com/briandowns/spinner v0.0.0-20170614154858-48dbb65d7bd5 h1:osZyZB7J4kE1 github.com/briandowns/spinner v0.0.0-20170614154858-48dbb65d7bd5/go.mod h1:hw/JEQBIE+c/BLI4aKM8UU8v+ZqrD3h7HC27kKt8JQU= github.com/c-bata/go-prompt v0.2.3 h1:jjCS+QhG/sULBhAaBdjb2PlMRVaKXQgn+4yzaauvs2s= github.com/c-bata/go-prompt v0.2.3/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= +github.com/cenkalti/backoff/v4 v4.0.0 h1:6VeaLF9aI+MAUQ95106HwWzYZgJJpZ4stumjj6RFYAU= +github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/crewjam/saml v0.0.0-20190521120225-344d075952c9/go.mod h1:w5eu+HNtubx+kRpQL6QFT2F3yIFfYVe6+EzOFVU7Hko= github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/server/datastore/mysql/email_changes.go b/server/datastore/mysql/email_changes.go index 2d246d1b3..408fa20c4 100644 --- a/server/datastore/mysql/email_changes.go +++ b/server/datastore/mysql/email_changes.go @@ -26,10 +26,6 @@ func (ds *Datastore) PendingEmailChange(uid uint, newEmail, token string) error // ConfirmPendingEmailChange finds email change record, updates user with new email, // then deletes change record if everything succeeds. func (ds *Datastore) ConfirmPendingEmailChange(id uint, token string) (newEmail string, err error) { - var ( - tx *sqlx.Tx - success bool // indicates all db operations success if true - ) changeRecord := struct { ID uint UserID uint `db:"user_id"` @@ -44,42 +40,36 @@ func (ds *Datastore) ConfirmPendingEmailChange(id uint, token string) (newEmail return "", errors.Wrap(err, "email change") } - tx, err = ds.db.Beginx() - if err != nil { - return "", errors.Wrap(err, "begin transaction to change email") - } - defer func() { - if success { - if err = tx.Commit(); err == nil { - return // success - } - err = errors.Wrap(err, "commit transaction for email change") - tx.Rollback() - } - }() - - query := ` - UPDATE users SET - email = ? - WHERE id = ? + err = ds.withRetryTxx(func(tx *sqlx.Tx) error { + query := ` + UPDATE users SET + email = ? + WHERE id = ? 
` - results, err := tx.Exec(query, changeRecord.NewEmail, changeRecord.UserID) + results, err := tx.Exec(query, changeRecord.NewEmail, changeRecord.UserID) + if err != nil { + return errors.Wrap(err, "updating user's email") + } + + rowsAffected, err := results.RowsAffected() + if err != nil { + return errors.Wrap(err, "fetching affected rows updating user's email") + } + if rowsAffected == 0 { + return notFound("User").WithID(changeRecord.UserID) + } + + _, err = tx.Exec("DELETE FROM email_changes WHERE id = ?", changeRecord.ID) + if err != nil { + return errors.Wrap(err, "deleting email change") + } + + return nil + }) + if err != nil { - return "", errors.Wrap(err, "updating user's email") + return "", err } - rowsAffected, err := results.RowsAffected() - if err != nil { - return "", errors.Wrap(err, "fetching affected rows updating user's email") - } - if rowsAffected == 0 { - return "", notFound("User").WithID(changeRecord.UserID) - } - - _, err = tx.Exec("DELETE FROM email_changes WHERE id = ?", changeRecord.ID) - if err != nil { - return "", errors.Wrap(err, "deleting email change") - } - success = true // cause things to be committed in defer func - return changeRecord.NewEmail, nil + return changeRecord.NewEmail, err } diff --git a/server/datastore/mysql/hosts.go b/server/datastore/mysql/hosts.go index bfa24ecfd..30e6fefaf 100644 --- a/server/datastore/mysql/hosts.go +++ b/server/datastore/mysql/hosts.go @@ -174,93 +174,79 @@ func (d *Datastore) SaveHost(host *kolide.Host) error { logger_tls_period = ? WHERE id = ? 
` - - tx, err := d.db.Beginx() - if err != nil { - return errors.Wrap(err, "creating transaction") - } - - results, err := tx.Exec(sqlStatement, - host.DetailUpdateTime, - host.NodeKey, - host.HostName, - host.UUID, - host.Platform, - host.OsqueryVersion, - host.OSVersion, - host.Uptime, - host.PhysicalMemory, - host.CPUType, - host.CPUSubtype, - host.CPUBrand, - host.CPUPhysicalCores, - host.HardwareVendor, - host.HardwareModel, - host.HardwareVersion, - host.HardwareSerial, - host.ComputerName, - host.PrimaryNetworkInterfaceID, - host.Build, - host.PlatformLike, - host.CodeName, - host.CPULogicalCores, - host.SeenTime, - host.DistributedInterval, - host.ConfigTLSRefresh, - host.LoggerTLSPeriod, - host.ID) - if err != nil { - tx.Rollback() - return errors.Wrap(err, "executing main SQL statement") - } - rowsAffected, err := results.RowsAffected() - if err != nil { - tx.Rollback() - return errors.Wrap(err, "rows affected updating host") - } - if rowsAffected == 0 { - tx.Rollback() - return notFound("Host").WithID(host.ID) - } - - host.NetworkInterfaces, err = updateNicsForHost(tx, host) - if err != nil { - tx.Rollback() - return errors.Wrap(err, "updating nics") - } - - if err = removedUnusedNics(tx, host); err != nil { - tx.Rollback() - return errors.Wrap(err, "removing unused nics") - } - - if needsUpdate := host.ResetPrimaryNetwork(); needsUpdate { - results, err = tx.Exec( - "UPDATE hosts SET primary_ip_id = ? 
WHERE id = ?", + err := d.withRetryTxx(func(tx *sqlx.Tx) error { + results, err := tx.Exec(sqlStatement, + host.DetailUpdateTime, + host.NodeKey, + host.HostName, + host.UUID, + host.Platform, + host.OsqueryVersion, + host.OSVersion, + host.Uptime, + host.PhysicalMemory, + host.CPUType, + host.CPUSubtype, + host.CPUBrand, + host.CPUPhysicalCores, + host.HardwareVendor, + host.HardwareModel, + host.HardwareVersion, + host.HardwareSerial, + host.ComputerName, host.PrimaryNetworkInterfaceID, - host.ID, - ) - + host.Build, + host.PlatformLike, + host.CodeName, + host.CPULogicalCores, + host.SeenTime, + host.DistributedInterval, + host.ConfigTLSRefresh, + host.LoggerTLSPeriod, + host.ID) if err != nil { - tx.Rollback() - return errors.Wrap(err, "resetting primary network") + return errors.Wrap(err, "executing main SQL statement") } - rowsAffected, err = results.RowsAffected() + rowsAffected, err := results.RowsAffected() if err != nil { - tx.Rollback() - return errors.Wrap(err, "rows affected resetting primary network") + return errors.Wrap(err, "rows affected updating host") } if rowsAffected == 0 { - tx.Rollback() return notFound("Host").WithID(host.ID) } - } - if err = tx.Commit(); err != nil { - tx.Rollback() - return errors.Wrap(err, "committing transaction") - } - return nil + host.NetworkInterfaces, err = updateNicsForHost(tx, host) + if err != nil { + return errors.Wrap(err, "updating nics") + } + + if err = removedUnusedNics(tx, host); err != nil { + return errors.Wrap(err, "removing unused nics") + } + + if needsUpdate := host.ResetPrimaryNetwork(); needsUpdate { + results, err = tx.Exec( + "UPDATE hosts SET primary_ip_id = ? 
WHERE id = ?", + host.PrimaryNetworkInterfaceID, + host.ID, + ) + + if err != nil { + return errors.Wrap(err, "resetting primary network") + } + rowsAffected, err = results.RowsAffected() + if err != nil { + return errors.Wrap(err, "rows affected resetting primary network") + } + if rowsAffected == 0 { + return notFound("Host").WithID(host.ID) + } + } + + return nil + }) + + return err } func (d *Datastore) DeleteHost(hid uint) error { diff --git a/server/datastore/mysql/labels.go b/server/datastore/mysql/labels.go index 29ae7e9c3..0d7eabaea 100644 --- a/server/datastore/mysql/labels.go +++ b/server/datastore/mysql/labels.go @@ -2,7 +2,6 @@ package mysql import ( "database/sql" - "fmt" "time" "github.com/jmoiron/sqlx" @@ -11,25 +10,8 @@ import ( ) func (d *Datastore) ApplyLabelSpecs(specs []*kolide.LabelSpec) (err error) { - tx, err := d.db.Beginx() - if err != nil { - return errors.Wrap(err, "begin ApplyLabelSpecs transaction") - } - - defer func() { - if err != nil { - rbErr := tx.Rollback() - // It seems possible that there might be a case in - // which the error we are dealing with here was thrown - // by the call to tx.Commit(), and the docs suggest - // this call would then result in sql.ErrTxDone. 
- if rbErr != nil && rbErr != sql.ErrTxDone { - panic(fmt.Sprintf("got err '%s' rolling back after err '%s'", rbErr, err)) - } - } - }() - - sql := ` + err = d.withRetryTxx(func(tx *sqlx.Tx) error { + sql := ` INSERT INTO labels ( name, description, @@ -45,23 +27,25 @@ func (d *Datastore) ApplyLabelSpecs(specs []*kolide.LabelSpec) (err error) { label_type = VALUES(label_type), deleted = false ` - stmt, err := tx.Prepare(sql) - if err != nil { - return errors.Wrap(err, "prepare ApplyLabelSpecs insert") - } - - for _, s := range specs { - if s.Name == "" { - return errors.New("label name must not be empty") - } - _, err := stmt.Exec(s.Name, s.Description, s.Query, s.Platform, s.LabelType) + stmt, err := tx.Prepare(sql) if err != nil { - return errors.Wrap(err, "exec ApplyLabelSpecs insert") + return errors.Wrap(err, "prepare ApplyLabelSpecs insert") } - } - err = tx.Commit() - return errors.Wrap(err, "commit ApplyLabelSpecs transaction") + for _, s := range specs { + if s.Name == "" { + return errors.New("label name must not be empty") + } + _, err := stmt.Exec(s.Name, s.Description, s.Query, s.Platform, s.LabelType) + if err != nil { + return errors.Wrap(err, "exec ApplyLabelSpecs insert") + } + } + + return nil + }) + + return errors.Wrap(err, "ApplyLabelSpecs transaction") } func (d *Datastore) GetLabelSpecs() ([]*kolide.LabelSpec, error) { diff --git a/server/datastore/mysql/mysql.go b/server/datastore/mysql/mysql.go index d2a0f64ad..ab66795d2 100644 --- a/server/datastore/mysql/mysql.go +++ b/server/datastore/mysql/mysql.go @@ -10,6 +10,7 @@ import ( "time" "github.com/WatchBeam/clock" + "github.com/cenkalti/backoff/v4" "github.com/go-kit/kit/log" "github.com/go-sql-driver/mysql" "github.com/jmoiron/sqlx" @@ -50,6 +51,41 @@ func (d *Datastore) getTransaction(opts []kolide.OptionalArg) dbfunctions { return result } +type txFn func(*sqlx.Tx) error + +// withRetryTxx provides a common way to commit/rollback a txFn wrapped in a retry with exponential backoff +func 
(d *Datastore) withRetryTxx(fn txFn) (err error) { + operation := func() error { + tx, err := d.db.Beginx() + if err != nil { + return errors.Wrap(err, "creating transaction") + } + + defer func() { + if p := recover(); p != nil { + tx.Rollback() + panic(p) + } + }() + + err = fn(tx) + if err != nil { + rbErr := tx.Rollback() + if rbErr != nil && rbErr != sql.ErrTxDone { + panic(fmt.Sprintf("got err '%s' rolling back after err '%s'", rbErr, err)) + } + } else { + err = tx.Commit() + } + + return errors.Wrap(err, "committing transaction") + } + + bo := backoff.NewExponentialBackOff() + bo.MaxElapsedTime = 5 * time.Second + return backoff.Retry(operation, bo) +} + // New creates an MySQL datastore. func New(config config.MysqlConfig, c clock.Clock, opts ...DBOption) (*Datastore, error) { options := &dbOptions{ diff --git a/server/datastore/mysql/packs.go b/server/datastore/mysql/packs.go index eb318a261..c53052bac 100644 --- a/server/datastore/mysql/packs.go +++ b/server/datastore/mysql/packs.go @@ -10,33 +10,17 @@ import ( ) func (d *Datastore) ApplyPackSpecs(specs []*kolide.PackSpec) (err error) { - tx, err := d.db.Beginx() - if err != nil { - return errors.Wrap(err, "begin ApplyPackSpec transaction") - } - - defer func() { - if err != nil { - rbErr := tx.Rollback() - // It seems possible that there might be a case in - // which the error we are dealing with here was thrown - // by the call to tx.Commit(), and the docs suggest - // this call would then result in sql.ErrTxDone. 
- if rbErr != nil && rbErr != sql.ErrTxDone { - panic(fmt.Sprintf("got err '%s' rolling back after err '%s'", rbErr, err)) + err = d.withRetryTxx(func(tx *sqlx.Tx) error { + for _, spec := range specs { + if err := applyPackSpec(tx, spec); err != nil { + return errors.Wrapf(err, "applying pack '%s'", spec.Name) } } - }() - for _, spec := range specs { - err = applyPackSpec(tx, spec) - if err != nil { - return errors.Wrapf(err, "applying pack '%s'", spec.Name) - } - } + return nil + }) - err = tx.Commit() - return errors.Wrap(err, "commit transaction") + return err } func applyPackSpec(tx *sqlx.Tx, spec *kolide.PackSpec) error { @@ -121,44 +105,77 @@ func applyPackSpec(tx *sqlx.Tx, spec *kolide.PackSpec) error { } func (d *Datastore) GetPackSpecs() (specs []*kolide.PackSpec, err error) { - tx, err := d.db.Beginx() - if err != nil { - return nil, errors.Wrap(err, "begin GetPackSpecs transaction") - } + err = d.withRetryTxx(func(tx *sqlx.Tx) error { + // Get basic specs + query := "SELECT id, name, description, platform FROM packs" + if err := tx.Select(&specs, query); err != nil { + return errors.Wrap(err, "get packs") + } - defer func() { - if err != nil { - rbErr := tx.Rollback() - // It seems possible that there might be a case in - // which the error we are dealing with here was thrown - // by the call to tx.Commit(), and the docs suggest - // this call would then result in sql.ErrTxDone. - if rbErr != nil && rbErr != sql.ErrTxDone { - panic(fmt.Sprintf("got err '%s' rolling back after err '%s'", rbErr, err)) + // Load targets + for _, spec := range specs { + query = ` +SELECT l.name +FROM labels l JOIN pack_targets pt +WHERE pack_id = ? AND pt.type = ? 
AND pt.target_id = l.id +` + if err := tx.Select(&spec.Targets.Labels, query, spec.ID, kolide.TargetLabel); err != nil { + return errors.Wrap(err, "get pack targets") } } - }() - // Get basic specs - query := "SELECT id, name, description, platform FROM packs" - if err := tx.Select(&specs, query); err != nil { - return nil, errors.Wrap(err, "get packs") + // Load queries + for _, spec := range specs { + query = ` +SELECT +query_name, name, description, ` + "`interval`" + `, +snapshot, removed, shard, platform, version +FROM scheduled_queries +WHERE pack_id = ? +` + if err := tx.Select(&spec.Queries, query, spec.ID); err != nil { + return errors.Wrap(err, "get pack queries") + } + } + + return nil + }) + + if err != nil { + return nil, err } - // Load targets - for _, spec := range specs { + return specs, nil +} + +func (d *Datastore) GetPackSpec(name string) (spec *kolide.PackSpec, err error) { + err = d.withRetryTxx(func(tx *sqlx.Tx) error { + // Get basic spec + var specs []*kolide.PackSpec + query := "SELECT id, name, description, platform FROM packs WHERE name = ?" + if err := tx.Select(&specs, query, name); err != nil { + return errors.Wrap(err, "get packs") + } + if len(specs) == 0 { + return notFound("Pack").WithName(name) + } + if len(specs) > 1 { + return errors.Errorf("expected 1 pack row, got %d", len(specs)) + } + + spec = specs[0] + + // Load targets query = ` SELECT l.name FROM labels l JOIN pack_targets pt WHERE pack_id = ? AND pt.type = ? AND pt.target_id = l.id ` if err := tx.Select(&spec.Targets.Labels, query, spec.ID, kolide.TargetLabel); err != nil { - return nil, errors.Wrap(err, "get pack targets") + return errors.Wrap(err, "get pack targets") } - } - // Load queries - for _, spec := range specs { + // Load queries query = ` SELECT query_name, name, description, ` + "`interval`" + `, @@ -167,77 +184,14 @@ FROM scheduled_queries WHERE pack_id = ? 
` if err := tx.Select(&spec.Queries, query, spec.ID); err != nil { - return nil, errors.Wrap(err, "get pack queries") + return errors.Wrap(err, "get pack queries") } - } - err = tx.Commit() + return nil + }) + if err != nil { - return nil, errors.Wrap(err, "commit transaction") - } - - return specs, nil -} - -func (d *Datastore) GetPackSpec(name string) (spec *kolide.PackSpec, err error) { - tx, err := d.db.Beginx() - if err != nil { - return nil, errors.Wrap(err, "begin GetPackSpecs transaction") - } - - defer func() { - if err != nil { - rbErr := tx.Rollback() - // It seems possible that there might be a case in - // which the error we are dealing with here was thrown - // by the call to tx.Commit(), and the docs suggest - // this call would then result in sql.ErrTxDone. - if rbErr != nil && rbErr != sql.ErrTxDone { - panic(fmt.Sprintf("got err '%s' rolling back after err '%s'", rbErr, err)) - } - } - }() - - // Get basic spec - var specs []*kolide.PackSpec - query := "SELECT id, name, description, platform FROM packs WHERE name = ?" - if err := tx.Select(&specs, query, name); err != nil { - return nil, errors.Wrap(err, "get packs") - } - if len(specs) == 0 { - return nil, notFound("Pack").WithName(name) - } - if len(specs) > 1 { - return nil, errors.Errorf("expected 1 pack row, got %d", len(specs)) - } - - spec = specs[0] - - // Load targets - query = ` -SELECT l.name -FROM labels l JOIN pack_targets pt -WHERE pack_id = ? AND pt.type = ? AND pt.target_id = l.id -` - if err := tx.Select(&spec.Targets.Labels, query, spec.ID, kolide.TargetLabel); err != nil { - return nil, errors.Wrap(err, "get pack targets") - } - - // Load queries - query = ` -SELECT -query_name, name, description, ` + "`interval`" + `, -snapshot, removed, shard, platform, version -FROM scheduled_queries -WHERE pack_id = ? 
-` - if err := tx.Select(&spec.Queries, query, spec.ID); err != nil { - return nil, errors.Wrap(err, "get pack queries") - } - - err = tx.Commit() - if err != nil { - return nil, errors.Wrap(err, "commit transaction") + return nil, err } return spec, nil