mirror of
https://github.com/empayre/fleet.git
synced 2024-11-06 00:45:19 +00:00
Fix edge case of AppConfig changes getting lost in cached mysql. (#15352)
This commit is contained in:
parent
1609c0fcb5
commit
0b5eedb801
1
changes/issue-14714-fix-cached-appconfig-race
Normal file
1
changes/issue-14714-fix-cached-appconfig-race
Normal file
@ -0,0 +1 @@
|
||||
* Fixed an edge-case where the caching of data could lead to some organization settings changes being lost when running with multiple Fleet instances.
|
@ -387,15 +387,15 @@ func TestApplyAppConfig(t *testing.T) {
|
||||
}
|
||||
|
||||
defaultAgentOpts := json.RawMessage(`{"config":{"foo":"bar"}}`)
|
||||
savedAppConfig := &fleet.AppConfig{
|
||||
OrgInfo: fleet.OrgInfo{OrgName: "Fleet"},
|
||||
ServerSettings: fleet.ServerSettings{ServerURL: "https://example.org"},
|
||||
AgentOptions: &defaultAgentOpts,
|
||||
}
|
||||
ds.AppConfigFunc = func(ctx context.Context) (*fleet.AppConfig, error) {
|
||||
return &fleet.AppConfig{
|
||||
OrgInfo: fleet.OrgInfo{OrgName: "Fleet"},
|
||||
ServerSettings: fleet.ServerSettings{ServerURL: "https://example.org"},
|
||||
AgentOptions: &defaultAgentOpts,
|
||||
}, nil
|
||||
return savedAppConfig, nil
|
||||
}
|
||||
|
||||
var savedAppConfig *fleet.AppConfig
|
||||
ds.SaveAppConfigFunc = func(ctx context.Context, config *fleet.AppConfig) error {
|
||||
savedAppConfig = config
|
||||
return nil
|
||||
|
@ -6,7 +6,10 @@ import (
|
||||
|
||||
type key int
|
||||
|
||||
const requirePrimaryKey key = 0
|
||||
const (
|
||||
requirePrimaryKey key = 0
|
||||
bypassCachedMysqlKey key = 1
|
||||
)
|
||||
|
||||
// RequirePrimary returns a new context that indicates to the database layer if
|
||||
// the primary instance must always be used instead of the replica, even for
|
||||
@ -21,3 +24,18 @@ func IsPrimaryRequired(ctx context.Context) bool {
|
||||
v, _ := ctx.Value(requirePrimaryKey).(bool)
|
||||
return v
|
||||
}
|
||||
|
||||
// BypassCachedMysql returns a new context that indicates to the mysql cache
|
||||
// layer if the cache should be bypassed. This is required when reading data
|
||||
// with the intention of writing it back with changes, to avoid reading stale
|
||||
// data from the cache.
|
||||
func BypassCachedMysql(ctx context.Context, bypass bool) context.Context {
|
||||
return context.WithValue(ctx, bypassCachedMysqlKey, bypass)
|
||||
}
|
||||
|
||||
// IsCachedMysqlBypassed returns true if the context indicates that the mysql
|
||||
// cache must be bypassed, false otherwise.
|
||||
func IsCachedMysqlBypassed(ctx context.Context) bool {
|
||||
v, _ := ctx.Value(bypassCachedMysqlKey).(bool)
|
||||
return v
|
||||
}
|
||||
|
@ -25,3 +25,22 @@ func TestIsPrimaryRequired(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsCachedMysqlBypassed(t *testing.T) {
|
||||
cases := []struct {
|
||||
desc string
|
||||
ctx context.Context
|
||||
want bool
|
||||
}{
|
||||
{"not set", context.Background(), false},
|
||||
{"set to true", BypassCachedMysql(context.Background(), true), true},
|
||||
{"set to false", BypassCachedMysql(context.Background(), false), false},
|
||||
{"set to true then false", BypassCachedMysql(BypassCachedMysql(context.Background(), true), false), false},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.desc, func(t *testing.T) {
|
||||
got := IsCachedMysqlBypassed(c.ctx)
|
||||
require.Equal(t, c.want, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package cached_mysql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -39,14 +40,15 @@ func BenchmarkCacheGetCustomClone(b *testing.B) {
|
||||
}
|
||||
|
||||
func benchmarkCacheGet(b *testing.B, v any) {
|
||||
ctx := context.Background()
|
||||
c := &cloneCache{cache.New(time.Minute, time.Minute)}
|
||||
c.Set("k", v, cache.DefaultExpiration)
|
||||
c.Set(ctx, "k", v, cache.DefaultExpiration)
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
var ok bool
|
||||
for i := 0; i < b.N; i++ {
|
||||
Result, ok = c.Get("k")
|
||||
Result, ok = c.Get(ctx, "k")
|
||||
if !ok {
|
||||
b.Fatal("expected ok")
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/fleetdm/fleet/v4/server/contexts/ctxdb"
|
||||
"github.com/fleetdm/fleet/v4/server/fleet"
|
||||
"github.com/jinzhu/copier"
|
||||
"github.com/patrickmn/go-cache"
|
||||
@ -84,7 +85,12 @@ type cloneCache struct {
|
||||
*cache.Cache
|
||||
}
|
||||
|
||||
func (c *cloneCache) Get(k string) (interface{}, bool) {
|
||||
func (c *cloneCache) Get(ctx context.Context, k string) (interface{}, bool) {
|
||||
if ctxdb.IsCachedMysqlBypassed(ctx) {
|
||||
// cache miss if the caller explicitly asked to bypass the cache
|
||||
return nil, false
|
||||
}
|
||||
|
||||
x, found := c.Cache.Get(k)
|
||||
if !found {
|
||||
return nil, false
|
||||
@ -98,10 +104,13 @@ func (c *cloneCache) Get(k string) (interface{}, bool) {
|
||||
return clone, true
|
||||
}
|
||||
|
||||
func (c *cloneCache) Set(k string, x interface{}, d time.Duration) {
|
||||
func (c *cloneCache) Set(ctx context.Context, k string, x interface{}, d time.Duration) {
|
||||
clone, err := clone(x)
|
||||
if err != nil {
|
||||
// Unfortunately, we can't return an error here. Skip caching it if clone fails.
|
||||
// Unfortunately, we can't return an error here. Skip caching it if clone
|
||||
// fails, but ensure that we clear any existing cached item for this key,
|
||||
// as the call to Set indicates the cache is now stale.
|
||||
c.Cache.Delete(k)
|
||||
return
|
||||
}
|
||||
|
||||
@ -113,6 +122,7 @@ type cachedMysql struct {
|
||||
|
||||
c *cloneCache
|
||||
|
||||
appConfigExp time.Duration
|
||||
packsExp time.Duration
|
||||
scheduledQueriesExp time.Duration
|
||||
teamAgentOptionsExp time.Duration
|
||||
@ -124,6 +134,12 @@ type cachedMysql struct {
|
||||
|
||||
type Option func(*cachedMysql)
|
||||
|
||||
func WithAppConfigExpiration(d time.Duration) Option {
|
||||
return func(o *cachedMysql) {
|
||||
o.appConfigExp = d
|
||||
}
|
||||
}
|
||||
|
||||
func WithPacksExpiration(d time.Duration) Option {
|
||||
return func(o *cachedMysql) {
|
||||
o.packsExp = d
|
||||
@ -158,6 +174,7 @@ func New(ds fleet.Datastore, opts ...Option) fleet.Datastore {
|
||||
c := &cachedMysql{
|
||||
Datastore: ds,
|
||||
c: &cloneCache{cache.New(5*time.Minute, 10*time.Minute)},
|
||||
appConfigExp: defaultAppConfigExpiration,
|
||||
packsExp: defaultPacksExpiration,
|
||||
scheduledQueriesExp: defaultScheduledQueriesExpiration,
|
||||
teamAgentOptionsExp: defaultTeamAgentOptionsExpiration,
|
||||
@ -178,13 +195,13 @@ func (ds *cachedMysql) NewAppConfig(ctx context.Context, info *fleet.AppConfig)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ds.c.Set(appConfigKey, ac, defaultAppConfigExpiration)
|
||||
ds.c.Set(ctx, appConfigKey, ac, ds.appConfigExp)
|
||||
|
||||
return ac, nil
|
||||
}
|
||||
|
||||
func (ds *cachedMysql) AppConfig(ctx context.Context) (*fleet.AppConfig, error) {
|
||||
if x, found := ds.c.Get(appConfigKey); found {
|
||||
if x, found := ds.c.Get(ctx, appConfigKey); found {
|
||||
ac, ok := x.(*fleet.AppConfig)
|
||||
if ok {
|
||||
return ac, nil
|
||||
@ -196,7 +213,7 @@ func (ds *cachedMysql) AppConfig(ctx context.Context) (*fleet.AppConfig, error)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ds.c.Set(appConfigKey, ac, defaultAppConfigExpiration)
|
||||
ds.c.Set(ctx, appConfigKey, ac, ds.appConfigExp)
|
||||
|
||||
return ac, nil
|
||||
}
|
||||
@ -207,14 +224,14 @@ func (ds *cachedMysql) SaveAppConfig(ctx context.Context, info *fleet.AppConfig)
|
||||
return err
|
||||
}
|
||||
|
||||
ds.c.Set(appConfigKey, info, defaultAppConfigExpiration)
|
||||
ds.c.Set(ctx, appConfigKey, info, ds.appConfigExp)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ds *cachedMysql) ListPacksForHost(ctx context.Context, hid uint) ([]*fleet.Pack, error) {
|
||||
key := fmt.Sprintf(packsHostKey, hid)
|
||||
if x, found := ds.c.Get(key); found {
|
||||
if x, found := ds.c.Get(ctx, key); found {
|
||||
cachedPacks, ok := x.([]*fleet.Pack)
|
||||
if ok {
|
||||
return cachedPacks, nil
|
||||
@ -226,14 +243,14 @@ func (ds *cachedMysql) ListPacksForHost(ctx context.Context, hid uint) ([]*fleet
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ds.c.Set(key, packs, ds.packsExp)
|
||||
ds.c.Set(ctx, key, packs, ds.packsExp)
|
||||
|
||||
return packs, nil
|
||||
}
|
||||
|
||||
func (ds *cachedMysql) ListScheduledQueriesInPack(ctx context.Context, packID uint) (fleet.ScheduledQueryList, error) {
|
||||
key := fmt.Sprintf(scheduledQueriesKey, packID)
|
||||
if x, found := ds.c.Get(key); found {
|
||||
if x, found := ds.c.Get(ctx, key); found {
|
||||
scheduledQueries, ok := x.(fleet.ScheduledQueryList)
|
||||
if ok {
|
||||
return scheduledQueries, nil
|
||||
@ -245,14 +262,14 @@ func (ds *cachedMysql) ListScheduledQueriesInPack(ctx context.Context, packID ui
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ds.c.Set(key, scheduledQueries, ds.scheduledQueriesExp)
|
||||
ds.c.Set(ctx, key, scheduledQueries, ds.scheduledQueriesExp)
|
||||
|
||||
return scheduledQueries, nil
|
||||
}
|
||||
|
||||
func (ds *cachedMysql) TeamAgentOptions(ctx context.Context, teamID uint) (*json.RawMessage, error) {
|
||||
key := fmt.Sprintf(teamAgentOptionsKey, teamID)
|
||||
if x, found := ds.c.Get(key); found {
|
||||
if x, found := ds.c.Get(ctx, key); found {
|
||||
if agentOptions, ok := x.(*json.RawMessage); ok {
|
||||
return agentOptions, nil
|
||||
}
|
||||
@ -263,14 +280,14 @@ func (ds *cachedMysql) TeamAgentOptions(ctx context.Context, teamID uint) (*json
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ds.c.Set(key, agentOptions, ds.teamAgentOptionsExp)
|
||||
ds.c.Set(ctx, key, agentOptions, ds.teamAgentOptionsExp)
|
||||
|
||||
return agentOptions, nil
|
||||
}
|
||||
|
||||
func (ds *cachedMysql) TeamFeatures(ctx context.Context, teamID uint) (*fleet.Features, error) {
|
||||
key := fmt.Sprintf(teamFeaturesKey, teamID)
|
||||
if x, found := ds.c.Get(key); found {
|
||||
if x, found := ds.c.Get(ctx, key); found {
|
||||
if features, ok := x.(*fleet.Features); ok {
|
||||
return features, nil
|
||||
}
|
||||
@ -281,14 +298,14 @@ func (ds *cachedMysql) TeamFeatures(ctx context.Context, teamID uint) (*fleet.Fe
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ds.c.Set(key, features, ds.teamFeaturesExp)
|
||||
ds.c.Set(ctx, key, features, ds.teamFeaturesExp)
|
||||
|
||||
return features, nil
|
||||
}
|
||||
|
||||
func (ds *cachedMysql) TeamMDMConfig(ctx context.Context, teamID uint) (*fleet.TeamMDM, error) {
|
||||
key := fmt.Sprintf(teamMDMConfigKey, teamID)
|
||||
if x, found := ds.c.Get(key); found {
|
||||
if x, found := ds.c.Get(ctx, key); found {
|
||||
if cfg, ok := x.(*fleet.TeamMDM); ok {
|
||||
return cfg, nil
|
||||
}
|
||||
@ -299,7 +316,7 @@ func (ds *cachedMysql) TeamMDMConfig(ctx context.Context, teamID uint) (*fleet.T
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ds.c.Set(key, cfg, ds.teamMDMConfigExp)
|
||||
ds.c.Set(ctx, key, cfg, ds.teamMDMConfigExp)
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
@ -314,9 +331,9 @@ func (ds *cachedMysql) SaveTeam(ctx context.Context, team *fleet.Team) (*fleet.T
|
||||
featuresKey := fmt.Sprintf(teamFeaturesKey, team.ID)
|
||||
mdmConfigKey := fmt.Sprintf(teamMDMConfigKey, team.ID)
|
||||
|
||||
ds.c.Set(agentOptionsKey, team.Config.AgentOptions, ds.teamAgentOptionsExp)
|
||||
ds.c.Set(featuresKey, &team.Config.Features, ds.teamFeaturesExp)
|
||||
ds.c.Set(mdmConfigKey, &team.Config.MDM, ds.teamMDMConfigExp)
|
||||
ds.c.Set(ctx, agentOptionsKey, team.Config.AgentOptions, ds.teamAgentOptionsExp)
|
||||
ds.c.Set(ctx, featuresKey, &team.Config.Features, ds.teamFeaturesExp)
|
||||
ds.c.Set(ctx, mdmConfigKey, &team.Config.MDM, ds.teamMDMConfigExp)
|
||||
|
||||
return team, nil
|
||||
}
|
||||
@ -345,7 +362,7 @@ func (ds *cachedMysql) QueryByName(ctx context.Context, teamID *uint, name strin
|
||||
}
|
||||
key := fmt.Sprintf(queryByNameKey, teamID_, name)
|
||||
|
||||
if x, found := ds.c.Get(key); found {
|
||||
if x, found := ds.c.Get(ctx, key); found {
|
||||
if query, ok := x.(*fleet.Query); ok {
|
||||
return query, nil
|
||||
}
|
||||
@ -356,7 +373,7 @@ func (ds *cachedMysql) QueryByName(ctx context.Context, teamID *uint, name strin
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ds.c.Set(key, query, ds.queryByNameExp)
|
||||
ds.c.Set(ctx, key, query, ds.queryByNameExp)
|
||||
|
||||
return query, nil
|
||||
}
|
||||
@ -364,7 +381,7 @@ func (ds *cachedMysql) QueryByName(ctx context.Context, teamID *uint, name strin
|
||||
func (ds *cachedMysql) ResultCountForQuery(ctx context.Context, queryID uint) (int, error) {
|
||||
key := fmt.Sprintf(queryResultsCountKey, queryID)
|
||||
|
||||
if x, found := ds.c.Get(key); found {
|
||||
if x, found := ds.c.Get(ctx, key); found {
|
||||
if count, ok := x.(int); ok {
|
||||
return count, nil
|
||||
}
|
||||
@ -375,7 +392,7 @@ func (ds *cachedMysql) ResultCountForQuery(ctx context.Context, queryID uint) (i
|
||||
return 0, err
|
||||
}
|
||||
|
||||
ds.c.Set(key, count, ds.queryResultsCountExp)
|
||||
ds.c.Set(ctx, key, count, ds.queryResultsCountExp)
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/fleetdm/fleet/v4/pkg/optjson"
|
||||
"github.com/fleetdm/fleet/v4/server/contexts/ctxdb"
|
||||
"github.com/fleetdm/fleet/v4/server/fleet"
|
||||
"github.com/fleetdm/fleet/v4/server/mock"
|
||||
"github.com/fleetdm/fleet/v4/server/ptr"
|
||||
@ -204,6 +205,82 @@ func TestCachedAppConfig(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestBypassAppConfig(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
mockedDS := new(mock.Store)
|
||||
ds := New(mockedDS, WithAppConfigExpiration(time.Minute))
|
||||
|
||||
var appConfigSet *fleet.AppConfig
|
||||
mockedDS.NewAppConfigFunc = func(ctx context.Context, info *fleet.AppConfig) (*fleet.AppConfig, error) {
|
||||
appConfigSet = info
|
||||
return info, nil
|
||||
}
|
||||
mockedDS.AppConfigFunc = func(ctx context.Context) (*fleet.AppConfig, error) {
|
||||
return appConfigSet, nil
|
||||
}
|
||||
mockedDS.SaveAppConfigFunc = func(ctx context.Context, info *fleet.AppConfig) error {
|
||||
appConfigSet = info
|
||||
return nil
|
||||
}
|
||||
|
||||
// calling NewAppConfig initializes the cache
|
||||
_, err := ds.NewAppConfig(context.Background(), &fleet.AppConfig{
|
||||
OrgInfo: fleet.OrgInfo{
|
||||
OrgName: "A",
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// used the cached value
|
||||
ac, err := ds.AppConfig(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "A", ac.OrgInfo.OrgName)
|
||||
require.False(t, mockedDS.AppConfigFuncInvoked)
|
||||
|
||||
// update and save it, calls the DB
|
||||
ac.OrgInfo.OrgName = "B"
|
||||
err = ds.SaveAppConfig(ctx, ac)
|
||||
require.NoError(t, err)
|
||||
require.True(t, mockedDS.SaveAppConfigFuncInvoked)
|
||||
mockedDS.SaveAppConfigFuncInvoked = false
|
||||
|
||||
// read it back, uses the cache
|
||||
ac, err = ds.AppConfig(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "B", ac.OrgInfo.OrgName)
|
||||
require.False(t, mockedDS.AppConfigFuncInvoked)
|
||||
|
||||
// simulate a database change from another process, not via the cached_mysql store
|
||||
ac.OrgInfo.OrgName = "C"
|
||||
err = mockedDS.SaveAppConfig(ctx, ac)
|
||||
require.NoError(t, err)
|
||||
|
||||
// reading it via the store uses the old cached value
|
||||
ac, err = ds.AppConfig(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "B", ac.OrgInfo.OrgName)
|
||||
require.False(t, mockedDS.AppConfigFuncInvoked)
|
||||
|
||||
// force-bypassing the cache gets the updated value
|
||||
ctx = ctxdb.BypassCachedMysql(ctx, true)
|
||||
ac, err = ds.AppConfig(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "C", ac.OrgInfo.OrgName)
|
||||
require.True(t, mockedDS.AppConfigFuncInvoked)
|
||||
mockedDS.AppConfigFuncInvoked = false
|
||||
|
||||
// bypassing the cache to read AppConfig did update the cache, so if we don't
|
||||
// bypass it anymore, it now gets the updated value from the cache
|
||||
ctx = ctxdb.BypassCachedMysql(ctx, false)
|
||||
ac, err = ds.AppConfig(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "C", ac.OrgInfo.OrgName)
|
||||
require.False(t, mockedDS.AppConfigFuncInvoked)
|
||||
}
|
||||
|
||||
func TestCachedPacksforHost(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"github.com/fleetdm/fleet/v4/server/authz"
|
||||
"github.com/fleetdm/fleet/v4/server/config"
|
||||
authz_ctx "github.com/fleetdm/fleet/v4/server/contexts/authz"
|
||||
"github.com/fleetdm/fleet/v4/server/contexts/ctxdb"
|
||||
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
|
||||
"github.com/fleetdm/fleet/v4/server/contexts/license"
|
||||
"github.com/fleetdm/fleet/v4/server/contexts/viewer"
|
||||
@ -262,12 +263,20 @@ func (svc *Service) ModifyAppConfig(ctx context.Context, p []byte, applyOpts fle
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// we need the config from the datastore because API tokens are obfuscated at the service layer
|
||||
// we will retrieve the obfuscated config before we return
|
||||
// we need the config from the datastore because API tokens are obfuscated at
|
||||
// the service layer we will retrieve the obfuscated config before we return.
|
||||
// We bypass the mysql cache because this is a read that will be followed by
|
||||
// modifications and a save, so we need up-to-date data.
|
||||
ctx = ctxdb.BypassCachedMysql(ctx, true)
|
||||
appConfig, err := svc.ds.AppConfig(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// the rest of the calls can use the cache safely (we read the AppConfig
|
||||
// again before returning, either after a dry-run or after saving the
|
||||
// AppConfig, in which case the cache will be up-to-date and safe to use).
|
||||
ctx = ctxdb.BypassCachedMysql(ctx, false)
|
||||
|
||||
oldAppConfig := appConfig.Copy()
|
||||
|
||||
// We do not use svc.License(ctx) to allow roles (like GitOps) write but not read access to AppConfig.
|
||||
|
Loading…
Reference in New Issue
Block a user