mirror of
https://github.com/empayre/fleet.git
synced 2024-11-06 08:55:24 +00:00
Release schedule lock when triggered run spans schedule interval (#10240)
This commit is contained in:
parent
82e81a7b06
commit
21c6733c1b
1
changes/bugfix-trigger-release-lock
Normal file
1
changes/bugfix-trigger-release-lock
Normal file
@ -0,0 +1 @@
|
||||
- Fixed a bug where `fleetctl trigger` doesn't release the schedule lock when the triggered run spans the regularly scheduled interval. This can prevent a second Fleet instance from using `fleetctl trigger` until the lock expires. This issue occurs infrequently under normal use. When it does occur, it resolves on its own in time; however, it may last up one full interval.
|
@ -225,7 +225,6 @@ func (s *Schedule) Start() {
|
||||
s.setIntervalStartedAt(newStart)
|
||||
schedTicker.Reset(s.getRemainingInterval(newStart))
|
||||
level.Debug(s.logger).Log("waiting", fmt.Sprintf("triggered run spanned schedule interval, new wait %v", s.getRemainingInterval(newStart)))
|
||||
continue
|
||||
}
|
||||
|
||||
cancelHold()
|
||||
|
@ -446,6 +446,69 @@ func TestScheduleHoldLock(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTriggerReleaseLock(t *testing.T) {
|
||||
ctx, cancelFn := context.WithCancel(context.Background())
|
||||
defer cancelFn()
|
||||
|
||||
name := "test_trigger_release_lock"
|
||||
instanceID := "test_instance"
|
||||
schedInterval := 2 * time.Second
|
||||
jobRuntime := 2200 * time.Millisecond
|
||||
|
||||
locker := SetupMockLocker(name, instanceID, time.Now().Truncate(1*time.Second))
|
||||
err := locker.AddChannels(t, "unlocked")
|
||||
require.NoError(t, err)
|
||||
seedStats := fleet.CronStats{
|
||||
ID: 1,
|
||||
StatsType: fleet.CronStatsTypeScheduled,
|
||||
Name: name,
|
||||
Instance: instanceID,
|
||||
CreatedAt: time.Now().Truncate(1 * time.Second),
|
||||
UpdatedAt: time.Now().Truncate(1 * time.Second),
|
||||
|
||||
Status: fleet.CronStatsStatusCompleted,
|
||||
}
|
||||
statsStore := SetUpMockStatsStore(name, seedStats)
|
||||
|
||||
jobsRun := uint32(0)
|
||||
s := New(
|
||||
ctx, name, instanceID, schedInterval, locker, statsStore,
|
||||
WithJob("test_job", func(ctx context.Context) error {
|
||||
time.Sleep(jobRuntime)
|
||||
atomic.AddUint32(&jobsRun, 1)
|
||||
return nil
|
||||
}),
|
||||
)
|
||||
s.Start()
|
||||
|
||||
<-time.After(1 * time.Second)
|
||||
_, err = s.Trigger()
|
||||
require.NoError(t, err)
|
||||
|
||||
select {
|
||||
case <-time.After(4 * schedInterval):
|
||||
t.Errorf("timeout")
|
||||
t.FailNow()
|
||||
case <-locker.Unlocked:
|
||||
stats, err := statsStore.GetLatestCronStats(ctx, name)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, stats, 2)
|
||||
|
||||
statsByType := make(map[fleet.CronStatsType]fleet.CronStats)
|
||||
for _, s := range stats {
|
||||
statsByType[s.StatsType] = s
|
||||
}
|
||||
require.Len(t, statsByType, 2)
|
||||
require.Contains(t, statsByType, fleet.CronStatsTypeTriggered)
|
||||
require.Contains(t, statsByType, fleet.CronStatsTypeScheduled)
|
||||
|
||||
require.Equal(t, fleet.CronStatsStatusCompleted, statsByType[fleet.CronStatsTypeTriggered].Status)
|
||||
require.Equal(t, seedStats, statsByType[fleet.CronStatsTypeScheduled])
|
||||
}
|
||||
|
||||
require.True(t, locker.expiresAt.Before(time.Now()))
|
||||
}
|
||||
|
||||
func TestMultipleScheduleInstancesConfigChangesDS(t *testing.T) {
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
defer cancelFunc()
|
||||
|
@ -88,6 +88,7 @@ func (ml *MockLock) Unlock(ctx context.Context, name string, owner string) error
|
||||
if ml.Unlocked != nil {
|
||||
ml.Unlocked <- struct{}{}
|
||||
}
|
||||
ml.expiresAt = time.Now()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user