Refactor webhooks cron to new schedule package (#7840)

gillespi314 2022-09-20 14:26:36 -05:00 committed by GitHub
parent 325adad941
commit 34688f531a
10 changed files with 132 additions and 126 deletions
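Replaces the hand-rolled cronWebhooks loop with startAutomationsSchedule, which registers the host status webhook and the failing policies automation as jobs on a single "automations" schedule built on the schedule package. The triggers now load the app config themselves on each run instead of receiving a snapshot from the caller, the per-webhook lock keys give way to the schedule's own lock, and the schedule package's Locker interface gains an Unlock method.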

View File

@@ -357,114 +357,67 @@ func checkNVDVulnerabilities(
 	return vulns
 }
-func cronWebhooks(
+func startAutomationsSchedule(
 	ctx context.Context,
+	instanceID string,
 	ds fleet.Datastore,
 	logger kitlog.Logger,
-	identifier string,
-	failingPoliciesSet fleet.FailingPolicySet,
 	intervalReload time.Duration,
-) {
+	failingPoliciesSet fleet.FailingPolicySet,
+) (*schedule.Schedule, error) {
+	const defaultAutomationsInterval = 24 * time.Hour
 	appConfig, err := ds.AppConfig(ctx)
 	if err != nil {
-		level.Error(logger).Log("config", "couldn't read app config", "err", err)
-		return
+		return nil, fmt.Errorf("getting app config: %w", err)
 	}
-	interval := appConfig.WebhookSettings.Interval.ValueOr(24 * time.Hour)
-	level.Debug(logger).Log("interval", interval.String())
-	ticker := time.NewTicker(interval)
-	start := time.Now()
-	for {
-		level.Debug(logger).Log("waiting", "on ticker")
-		select {
-		case <-ticker.C:
-			level.Debug(logger).Log("waiting", "done")
-		case <-ctx.Done():
-			level.Debug(logger).Log("exit", "done with cron.")
-			return
-		case <-time.After(intervalReload):
-			// Reload interval and check if it has been reduced.
+	s := schedule.New(
+		// TODO(sarah): Reconfigure settings so automations interval doesn't reside under webhook settings
+		ctx, "automations", instanceID, appConfig.WebhookSettings.Interval.ValueOr(defaultAutomationsInterval), ds,
+		schedule.WithLogger(kitlog.With(logger, "cron", "automations")),
+		schedule.WithConfigReloadInterval(intervalReload, func(ctx context.Context) (time.Duration, error) {
 			appConfig, err := ds.AppConfig(ctx)
 			if err != nil {
-				level.Error(logger).Log("config", "couldn't read app config", "err", err)
-				continue
+				return 0, err
 			}
-			if currInterval := appConfig.WebhookSettings.Interval.ValueOr(24 * time.Hour); time.Since(start) < currInterval {
-				continue
-			}
-		}
-		// Reread app config to be able to read latest data used by the webhook
-		// and update the ticker for the next run.
-		appConfig, err = ds.AppConfig(ctx)
-		if err != nil {
-			errHandler(ctx, logger, "couldn't read app config", err)
-		} else {
-			ticker.Reset(appConfig.WebhookSettings.Interval.ValueOr(24 * time.Hour))
-			start = time.Now()
-		}
-		// We set the db lock durations to match the intervalReload.
-		maybeTriggerHostStatus(ctx, ds, logger, identifier, appConfig, intervalReload)
-		maybeTriggerFailingPoliciesAutomation(ctx, ds, logger, identifier, appConfig, intervalReload, failingPoliciesSet)
-		level.Debug(logger).Log("loop", "done")
-	}
+			newInterval := appConfig.WebhookSettings.Interval.ValueOr(defaultAutomationsInterval)
+			return newInterval, nil
+		}),
+		schedule.WithJob(
+			"host_status_webhook",
+			func(ctx context.Context) error {
+				return webhooks.TriggerHostStatusWebhook(
+					ctx, ds, kitlog.With(logger, "automation", "host_status"),
+				)
+			},
+		),
+		schedule.WithJob(
+			"failing_policies_automation",
+			func(ctx context.Context) error {
+				return triggerFailingPoliciesAutomation(ctx, ds, kitlog.With(logger, "automation", "failing_policies"), failingPoliciesSet)
+			},
+		),
+	)
+	s.Start()
+	return s, nil
 }
-func maybeTriggerHostStatus(
+func triggerFailingPoliciesAutomation(
 	ctx context.Context,
 	ds fleet.Datastore,
 	logger kitlog.Logger,
-	identifier string,
-	appConfig *fleet.AppConfig,
-	lockDuration time.Duration,
-) {
-	logger = kitlog.With(logger, "cron", lockKeyWebhooksHostStatus)
-	if locked, err := ds.Lock(ctx, lockKeyWebhooksHostStatus, identifier, lockDuration); err != nil {
-		level.Error(logger).Log("msg", "Error acquiring lock", "err", err)
-		return
-	} else if !locked {
-		level.Debug(logger).Log("msg", "Not the leader. Skipping...")
-		return
-	}
-	if err := webhooks.TriggerHostStatusWebhook(
-		ctx, ds, kitlog.With(logger, "webhook", "host_status"), appConfig,
-	); err != nil {
-		errHandler(ctx, logger, "triggering host status webhook", err)
-	}
-}
-func maybeTriggerFailingPoliciesAutomation(
-	ctx context.Context,
-	ds fleet.Datastore,
-	logger kitlog.Logger,
-	identifier string,
-	appConfig *fleet.AppConfig,
-	lockDuration time.Duration,
 	failingPoliciesSet fleet.FailingPolicySet,
-) {
-	logger = kitlog.With(logger, "cron", lockKeyWebhooksFailingPolicies)
-	if locked, err := ds.Lock(ctx, lockKeyWebhooksFailingPolicies, identifier, lockDuration); err != nil {
-		level.Error(logger).Log("msg", "Error acquiring lock", "err", err)
-		return
-	} else if !locked {
-		level.Debug(logger).Log("msg", "Not the leader. Skipping...")
-		return
+) error {
+	appConfig, err := ds.AppConfig(ctx)
+	if err != nil {
+		return fmt.Errorf("getting app config: %w", err)
 	}
 	serverURL, err := url.Parse(appConfig.ServerSettings.ServerURL)
 	if err != nil {
-		errHandler(ctx, logger, "parsing appConfig.ServerSettings.ServerURL", err)
-		return
+		return fmt.Errorf("parsing appConfig.ServerSettings.ServerURL: %w", err)
 	}
 	logger = kitlog.With(logger, "webhook", "failing_policies")
-	err = policies.TriggerFailingPoliciesAutomation(ctx, ds, logger, appConfig, failingPoliciesSet, func(policy *fleet.Policy, cfg policies.FailingPolicyAutomationConfig) error {
+	err = policies.TriggerFailingPoliciesAutomation(ctx, ds, logger, failingPoliciesSet, func(policy *fleet.Policy, cfg policies.FailingPolicyAutomationConfig) error {
 		switch cfg.AutomationType {
 		case policies.FailingPolicyWebhook:
 			return webhooks.SendFailingPoliciesBatchedPOSTs(
@@ -497,8 +450,10 @@ func maybeTriggerFailingPoliciesAutomation(
 		return nil
 	})
 	if err != nil {
-		errHandler(ctx, logger, "triggering failing policies automation", err)
+		return fmt.Errorf("triggering failing policies automation: %w", err)
 	}
+	return nil
 }
 func cronWorker(
@@ -649,7 +604,7 @@ func startCleanupsAndAggregationSchedule(
 		schedule.WithLogger(kitlog.With(logger, "cron", "cleanups_then_aggregation")),
 		// Run cleanup jobs first.
 		schedule.WithJob(
-			"distributed_query_campaings",
+			"distributed_query_campaigns",
 			func(ctx context.Context) error {
 				_, err := ds.CleanupDistributedQueryCampaigns(ctx, time.Now())
 				return err
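The core of the refactor above: instead of each cron hand-rolling a ticker, an interval-reload timer, and its own lock bookkeeping, jobs are registered on a named schedule that owns all of that. The standalone sketch below shows the shape of that loop. Here `job` and `run` are illustrative stand-ins, not the schedule package's actual types; the real API (schedule.New, WithConfigReloadInterval, WithJob) appears in the hunk above.

package main

import (
	"context"
	"log"
	"time"
)

// job mirrors schedule.WithJob: a named task run on every tick.
type job struct {
	name string
	fn   func(context.Context) error
}

// run mirrors the loop the schedule package owns: tick on the schedule
// interval, and on a separate reload interval re-read the configured
// interval (as WithConfigReloadInterval does) so interval changes take
// effect without a server restart.
func run(ctx context.Context, interval, reloadEvery time.Duration,
	reloadInterval func(context.Context) (time.Duration, error), jobs []job,
) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	reload := time.NewTicker(reloadEvery)
	defer reload.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-reload.C:
			if next, err := reloadInterval(ctx); err == nil && next != interval {
				interval = next
				ticker.Reset(interval) // shrink or grow the cadence in place
			}
		case <-ticker.C:
			for _, j := range jobs {
				if err := j.fn(ctx); err != nil {
					log.Printf("job %s: %v", j.name, err) // one failing job doesn't stop the others
				}
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	run(ctx, time.Second, 2*time.Second,
		func(context.Context) (time.Duration, error) { return time.Second, nil },
		[]job{{name: "demo", fn: func(context.Context) error { log.Println("tick"); return nil }}},
	)
}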

View File

@@ -381,7 +381,7 @@ the way that the Fleet server works.
 	}
 	ctx, cancelFunc := context.WithCancel(context.Background())
-	defer cancelFunc()
+	defer cancelFunc() // TODO(sarah); Handle release of locks in graceful shutdown
 	eh := errorstore.NewHandler(ctx, redisPool, logger, config.Logging.ErrorRetentionPeriod)
 	ctx = ctxerr.NewContext(ctx, eh)
 	svc, err := service.NewService(ctx, ds, task, resultStore, logger, osqueryLogger, config, mailService, clock.C, ssoSessionStore, liveQueryStore, carveStore, installerStore, *license, failingPolicySet, geoIP, redisWrapperDS)
@@ -400,8 +400,8 @@ the way that the Fleet server works.
 	if err != nil {
 		initFatal(errors.New("Error generating random instance identifier"), "")
 	}
-	runCrons(ctx, ds, task, kitlog.With(logger, "component", "crons"), config, license, failingPolicySet, instanceID)
-	if err := startSchedules(ctx, ds, logger, config, license, redisWrapperDS, instanceID); err != nil {
+	runCrons(ctx, ds, task, kitlog.With(logger, "component", "crons"), config, license, instanceID)
+	if err := startSchedules(ctx, ds, logger, config, license, redisWrapperDS, failingPolicySet, instanceID); err != nil {
 		initFatal(err, "failed to register schedules")
 	}
@@ -653,9 +653,7 @@ func basicAuthHandler(username, password string, next http.Handler) http.Handler
 }
 const (
-	lockKeyWebhooksHostStatus      = "webhooks" // keeping this name for backwards compatibility.
-	lockKeyWebhooksFailingPolicies = "webhooks:global_failing_policies"
-	lockKeyWorker                  = "worker"
+	lockKeyWorker = "worker"
 )
 // runCrons runs cron jobs not yet ported to use the schedule package (startSchedules)
@@ -666,13 +664,11 @@
 	logger kitlog.Logger,
 	config configpkg.FleetConfig,
 	license *fleet.LicenseInfo,
-	failingPoliciesSet fleet.FailingPolicySet,
 	ourIdentifier string,
 ) {
 	// StartCollectors starts a goroutine per collector, using ctx to cancel.
 	task.StartCollectors(ctx, kitlog.With(logger, "cron", "async_task"))
-	go cronWebhooks(ctx, ds, kitlog.With(logger, "cron", "webhooks"), ourIdentifier, failingPoliciesSet, 1*time.Hour)
 	go cronWorker(ctx, ds, kitlog.With(logger, "cron", "worker"), ourIdentifier)
 }
@@ -683,12 +679,15 @@ func startSchedules(
 	config config.FleetConfig,
 	license *fleet.LicenseInfo,
 	enrollHostLimiter fleet.EnrollHostLimiter,
+	failingPoliciesSet fleet.FailingPolicySet,
 	instanceID string,
 ) error {
 	startCleanupsAndAggregationSchedule(ctx, instanceID, ds, logger, enrollHostLimiter)
 	startSendStatsSchedule(ctx, instanceID, ds, config, license, logger)
 	startVulnerabilitiesSchedule(ctx, instanceID, ds, logger, &config.Vulnerabilities, license)
+	if _, err := startAutomationsSchedule(ctx, instanceID, ds, logger, 5*time.Minute, failingPoliciesSet); err != nil {
+		return err
+	}
 	return nil
 }

View File

@@ -129,7 +129,7 @@ func TestMaybeSendStatisticsSkipsIfNotConfigured(t *testing.T) {
 	assert.False(t, called)
 }
-func TestCronWebhooks(t *testing.T) {
+func TestAutomationsSchedule(t *testing.T) {
 	ds := new(mock.Store)
 	endpointCalled := int32(0)
@@ -180,7 +180,7 @@
 	defer cancelFunc()
 	failingPoliciesSet := service.NewMemFailingPolicySet()
-	go cronWebhooks(ctx, ds, kitlog.With(kitlog.NewNopLogger(), "cron", "webhooks"), "1234", failingPoliciesSet, 5*time.Minute)
+	startAutomationsSchedule(ctx, "test_instance", ds, kitlog.NewNopLogger(), 5*time.Minute, failingPoliciesSet)
 	<-calledOnce
 	time.Sleep(1 * time.Second)
@@ -309,17 +309,28 @@ func TestCronVulnerabilitiesSkipMkdirIfDisabled(t *testing.T) {
 	}, 24*time.Second, 12*time.Second)
 }
-// TestCronWebhooksLockDuration tests that the Lock method is being called
-// for the current webhook crons and that their duration is always one hour (see #3584).
-func TestCronWebhooksLockDuration(t *testing.T) {
+// TestCronAutomationsLockDuration tests that the Lock method is being called
+// for the current automation crons and that their duration is equal to the current
+// schedule interval.
+func TestAutomationsScheduleLockDuration(t *testing.T) {
 	ds := new(mock.Store)
+	expectedInterval := 1 * time.Second
+	intitalConfigLoaded := make(chan struct{}, 1)
 	ds.AppConfigFunc = func(ctx context.Context) (*fleet.AppConfig, error) {
-		return &fleet.AppConfig{
+		ac := fleet.AppConfig{
 			WebhookSettings: fleet.WebhookSettings{
-				Interval: fleet.Duration{Duration: 1 * time.Second},
+				Interval: fleet.Duration{Duration: 1 * time.Hour},
 			},
-		}, nil
+		}
+		select {
+		case <-intitalConfigLoaded:
+			ac.WebhookSettings.Interval = fleet.Duration{Duration: expectedInterval}
+		default:
+			// initial config
+			close(intitalConfigLoaded)
+		}
+		return &ac, nil
 	}
 	hostStatus := make(chan struct{})
 	hostStatusClosed := false
@@ -327,16 +338,15 @@ func TestCronWebhooksLockDuration(t *testing.T) {
 	failingPoliciesClosed := false
 	unknownName := false
 	ds.LockFunc = func(ctx context.Context, name string, owner string, expiration time.Duration) (bool, error) {
-		if expiration != 1*time.Hour {
+		if expiration != expectedInterval {
 			return false, nil
 		}
 		switch name {
-		case lockKeyWebhooksHostStatus:
+		case "automations":
 			if !hostStatusClosed {
 				close(hostStatus)
 				hostStatusClosed = true
 			}
-		case lockKeyWebhooksFailingPolicies:
 			if !failingPoliciesClosed {
 				close(failingPolicies)
 				failingPoliciesClosed = true
@@ -346,11 +356,14 @@ func TestCronWebhooksLockDuration(t *testing.T) {
 		}
 		return true, nil
 	}
+	ds.UnlockFunc = func(context.Context, string, string) error {
+		return nil
+	}
 	ctx, cancelFunc := context.WithCancel(context.Background())
 	defer cancelFunc()
-	go cronWebhooks(ctx, ds, kitlog.NewNopLogger(), "1234", service.NewMemFailingPolicySet(), 1*time.Hour)
+	startAutomationsSchedule(ctx, "test_instance", ds, kitlog.NewNopLogger(), 1*time.Second, service.NewMemFailingPolicySet())
 
 	select {
 	case <-failingPolicies:
@@ -365,7 +378,7 @@ func TestCronWebhooksLockDuration(t *testing.T) {
 	require.False(t, unknownName)
 }
-func TestCronWebhooksIntervalChange(t *testing.T) {
+func TestAutomationsScheduleIntervalChange(t *testing.T) {
 	ds := new(mock.Store)
 	interval := struct {
@@ -402,16 +415,22 @@ func TestCronWebhooksIntervalChange(t *testing.T) {
 		}
 		return true, nil
 	}
+	ds.UnlockFunc = func(context.Context, string, string) error {
+		return nil
+	}
 	ctx, cancelFunc := context.WithCancel(context.Background())
 	defer cancelFunc()
-	go cronWebhooks(ctx, ds, kitlog.NewNopLogger(), "1234", service.NewMemFailingPolicySet(), 200*time.Millisecond)
+	startAutomationsSchedule(ctx, "test_instance", ds, kitlog.NewNopLogger(), 200*time.Millisecond, service.NewMemFailingPolicySet())
 
-	select {
-	case <-configLoaded:
-	case <-time.After(5 * time.Second):
-		t.Fatal("timeout: initial config load")
+	// wait for config to be called once by startAutomationsSchedule and again by configReloadFunc
+	for c := 0; c < 2; c++ {
+		select {
+		case <-configLoaded:
+		case <-time.After(5 * time.Second):
+			t.Fatal("timeout: initial config load")
+		}
 	}
 	interval.Lock()

View File

@@ -41,10 +41,13 @@ func TriggerFailingPoliciesAutomation(
 	ctx context.Context,
 	ds fleet.Datastore,
 	logger kitlog.Logger,
-	appConfig *fleet.AppConfig,
 	failingPoliciesSet fleet.FailingPolicySet,
 	sendFunc func(*fleet.Policy, FailingPolicyAutomationConfig) error,
 ) error {
+	appConfig, err := ds.AppConfig(ctx)
+	if err != nil {
+		return ctxerr.Wrap(ctx, err, "getting app config")
+	}
 	// build the global automation configuration
 	var globalCfg FailingPolicyAutomationConfig
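The dropped appConfig parameter is the point of this hunk: the old cron passed in a config snapshot that could go stale between runs, while fetching it inside the trigger means every scheduled run acts on current settings. A minimal sketch of the pattern, with `Config` and `ConfigSource` as hypothetical stand-ins for fleet.AppConfig and the Datastore:

package main

import (
	"context"
	"fmt"
)

// Config and ConfigSource are hypothetical stand-ins for fleet.AppConfig
// and the Datastore's AppConfig method.
type Config struct {
	Enabled    bool
	WebhookURL string
}

type ConfigSource interface {
	AppConfig(ctx context.Context) (*Config, error)
}

// trigger loads config at run time rather than taking a snapshot from the
// caller, so each scheduled run sees the current settings.
func trigger(ctx context.Context, src ConfigSource) error {
	cfg, err := src.AppConfig(ctx)
	if err != nil {
		return fmt.Errorf("getting app config: %w", err)
	}
	if !cfg.Enabled {
		return nil // automation disabled; nothing to do this run
	}
	fmt.Println("would send webhook to", cfg.WebhookURL)
	return nil
}

type staticSource struct{ cfg Config }

func (s staticSource) AppConfig(context.Context) (*Config, error) { return &s.cfg, nil }

func main() {
	src := staticSource{cfg: Config{Enabled: true, WebhookURL: "https://example.com/hook"}}
	if err := trigger(context.Background(), src); err != nil {
		fmt.Println("trigger:", err)
	}
}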

View File

@@ -140,6 +140,10 @@ func TestTriggerFailingPolicies(t *testing.T) {
 		},
 	}
+	ds.AppConfigFunc = func(ctx context.Context) (*fleet.AppConfig, error) {
+		return ac, nil
+	}
 	// add a failing policy host for every known policy
 	failingPolicySet := service.NewMemFailingPolicySet()
 	for polID := range pols {
@@ -161,7 +165,7 @@
 		automation FailingPolicyAutomationType
 	}
 	var triggerCalls []policyAutomation
-	err = TriggerFailingPoliciesAutomation(context.Background(), ds, kitlog.NewNopLogger(), ac, failingPolicySet, func(pol *fleet.Policy, cfg FailingPolicyAutomationConfig) error {
+	err = TriggerFailingPoliciesAutomation(context.Background(), ds, kitlog.NewNopLogger(), failingPolicySet, func(pol *fleet.Policy, cfg FailingPolicyAutomationConfig) error {
 		triggerCalls = append(triggerCalls, policyAutomation{pol.ID, cfg.AutomationType})
 		hosts, err := failingPolicySet.ListHosts(pol.ID)
@@ -207,7 +211,7 @@
 	// policy sets should be empty (no host to process).
 	var countHosts int
 	triggerCalls = triggerCalls[:0]
-	err = TriggerFailingPoliciesAutomation(context.Background(), ds, kitlog.NewNopLogger(), ac, failingPolicySet, func(pol *fleet.Policy, cfg FailingPolicyAutomationConfig) error {
+	err = TriggerFailingPoliciesAutomation(context.Background(), ds, kitlog.NewNopLogger(), failingPolicySet, func(pol *fleet.Policy, cfg FailingPolicyAutomationConfig) error {
 		hosts, err := failingPolicySet.ListHosts(pol.ID)
 		require.NoError(t, err)
 		countHosts += len(hosts)

View File

@@ -59,6 +59,7 @@ type Job struct {
 // Locker allows a Schedule to acquire a lock before running jobs.
 type Locker interface {
 	Lock(ctx context.Context, scheduleName string, scheduleInstanceID string, expiration time.Duration) (bool, error)
+	Unlock(ctx context.Context, scheduleName string, scheduleInstanceID string) error
 }
 // Option allows configuring a Schedule.
@@ -211,6 +212,7 @@ func (s *Schedule) Start() {
 	for {
 		select {
 		case <-s.ctx.Done():
+			level.Info(s.logger).Log("msg", "done")
 			return
 		case <-configTicker.C:
 			level.Debug(s.logger).Log("msg", "config reload check")
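The Locker change above adds Unlock alongside Lock, so a schedule can release leadership instead of waiting for the lock expiration. A toy single-process Locker that satisfies the interface, for illustration only (Fleet's real locker is the datastore-backed lock shared across server instances):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// memLocker grants the named lock to one instance at a time and honors
// the expiration, mimicking the datastore-backed lock in-process.
type memLocker struct {
	mu     sync.Mutex
	owner  string
	expiry time.Time
}

func (m *memLocker) Lock(_ context.Context, _, instanceID string, expiration time.Duration) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	now := time.Now()
	if m.owner != "" && m.owner != instanceID && now.Before(m.expiry) {
		return false, nil // another instance currently holds the lock
	}
	m.owner, m.expiry = instanceID, now.Add(expiration)
	return true, nil
}

func (m *memLocker) Unlock(_ context.Context, _, instanceID string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.owner == instanceID {
		m.owner, m.expiry = "", time.Time{}
	}
	return nil
}

func main() {
	var l memLocker
	a, _ := l.Lock(context.Background(), "automations", "instance-a", time.Minute)
	b, _ := l.Lock(context.Background(), "automations", "instance-b", time.Minute)
	fmt.Println(a, b) // true false: instance-a is the leader
	_ = l.Unlock(context.Background(), "automations", "instance-a")
}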

View File

@@ -18,6 +18,10 @@ func (nopLocker) Lock(context.Context, string, string, time.Duration) (bool, error) {
 	return true, nil
 }
+func (nopLocker) Unlock(context.Context, string, string) error {
+	return nil
+}
 func TestNewSchedule(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
@@ -54,6 +58,10 @@ func (l *counterLocker) Lock(context.Context, string, string, time.Duration) (bool, error) {
 	return true, nil
 }
+func (l *counterLocker) Unlock(context.Context, string, string) error {
+	return nil
+}
 func TestScheduleLocker(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())

View File

@@ -73,6 +73,10 @@ func TestTriggerFailingPoliciesWebhookBasic(t *testing.T) {
 		},
 	}
+	ds.AppConfigFunc = func(context.Context) (*fleet.AppConfig, error) {
+		return ac, nil
+	}
 	failingPolicySet := service.NewMemFailingPolicySet()
 	err := failingPolicySet.AddHost(policyID1, fleet.PolicySetHost{
 		ID: 1,
@@ -86,7 +90,7 @@
 	require.NoError(t, err)
 	mockClock := time.Now()
-	err = policies.TriggerFailingPoliciesAutomation(context.Background(), ds, kitlog.NewNopLogger(), ac, failingPolicySet, func(pol *fleet.Policy, cfg policies.FailingPolicyAutomationConfig) error {
+	err = policies.TriggerFailingPoliciesAutomation(context.Background(), ds, kitlog.NewNopLogger(), failingPolicySet, func(pol *fleet.Policy, cfg policies.FailingPolicyAutomationConfig) error {
 		serverURL, err := url.Parse(ac.ServerSettings.ServerURL)
 		if err != nil {
 			return err
@@ -137,7 +141,7 @@
 	requestBody = ""
-	err = policies.TriggerFailingPoliciesAutomation(context.Background(), ds, kitlog.NewNopLogger(), ac, failingPolicySet, func(pol *fleet.Policy, cfg policies.FailingPolicyAutomationConfig) error {
+	err = policies.TriggerFailingPoliciesAutomation(context.Background(), ds, kitlog.NewNopLogger(), failingPolicySet, func(pol *fleet.Policy, cfg policies.FailingPolicyAutomationConfig) error {
 		serverURL, err := url.Parse(ac.ServerSettings.ServerURL)
 		if err != nil {
 			return err
@@ -245,6 +249,10 @@ func TestTriggerFailingPoliciesWebhookTeam(t *testing.T) {
 		},
 	}
+	ds.AppConfigFunc = func(context.Context) (*fleet.AppConfig, error) {
+		return ac, nil
+	}
 	failingPolicySet := service.NewMemFailingPolicySet()
 	err := failingPolicySet.AddHost(1, fleet.PolicySetHost{
 		ID: 1,
@@ -258,7 +266,7 @@
 	require.NoError(t, err)
 	now := time.Now()
-	err = policies.TriggerFailingPoliciesAutomation(context.Background(), ds, kitlog.NewNopLogger(), ac, failingPolicySet, func(pol *fleet.Policy, cfg policies.FailingPolicyAutomationConfig) error {
+	err = policies.TriggerFailingPoliciesAutomation(context.Background(), ds, kitlog.NewNopLogger(), failingPolicySet, func(pol *fleet.Policy, cfg policies.FailingPolicyAutomationConfig) error {
 		serverURL, err := url.Parse(ac.ServerSettings.ServerURL)
 		if err != nil {
 			return err
@@ -307,7 +315,7 @@
 	webhookBody = ""
-	err = policies.TriggerFailingPoliciesAutomation(context.Background(), ds, kitlog.NewNopLogger(), ac, failingPolicySet, func(pol *fleet.Policy, cfg policies.FailingPolicyAutomationConfig) error {
+	err = policies.TriggerFailingPoliciesAutomation(context.Background(), ds, kitlog.NewNopLogger(), failingPolicySet, func(pol *fleet.Policy, cfg policies.FailingPolicyAutomationConfig) error {
 		serverURL, err := url.Parse(ac.ServerSettings.ServerURL)
 		if err != nil {
 			return err

View File

@@ -15,8 +15,12 @@ func TriggerHostStatusWebhook(
 	ctx context.Context,
 	ds fleet.Datastore,
 	logger kitlog.Logger,
-	appConfig *fleet.AppConfig,
 ) error {
+	appConfig, err := ds.AppConfig(ctx)
+	if err != nil {
+		return ctxerr.Wrap(ctx, err, "getting app config")
+	}
 	if !appConfig.WebhookSettings.HostStatusWebhook.Enable {
 		return nil
 	}

View File

@@ -37,12 +37,16 @@ func TestTriggerHostStatusWebhook(t *testing.T) {
 		},
 	}
+	ds.AppConfigFunc = func(context.Context) (*fleet.AppConfig, error) {
+		return ac, nil
+	}
 	ds.TotalAndUnseenHostsSinceFunc = func(ctx context.Context, daysCount int) (int, int, error) {
 		assert.Equal(t, 2, daysCount)
 		return 10, 6, nil
 	}
-	require.NoError(t, TriggerHostStatusWebhook(context.Background(), ds, kitlog.NewNopLogger(), ac))
+	require.NoError(t, TriggerHostStatusWebhook(context.Background(), ds, kitlog.NewNopLogger()))
 	assert.Equal(
 		t,
 		`{"data":{"days_unseen":2,"total_hosts":10,"unseen_hosts":6},"text":"More than 60.00% of your hosts have not checked into Fleet for more than 2 days. You've been sent this message because the Host status webhook is enabled in your Fleet instance."}`,
@@ -55,6 +59,6 @@ func TestTriggerHostStatusWebhook(t *testing.T) {
 		return 10, 1, nil
 	}
-	require.NoError(t, TriggerHostStatusWebhook(context.Background(), ds, kitlog.NewNopLogger(), ac))
+	require.NoError(t, TriggerHostStatusWebhook(context.Background(), ds, kitlog.NewNopLogger()))
 	assert.Equal(t, "", requestBody)
 }