Merge pull request #49104 from garethgreenaway/49082_scheduler_seconds_minute_hours_etc_fix

[fluorine] Fixes to scheduler for jobs with seconds, minutes, etc.
This commit is contained in:
Mike Place 2018-08-17 18:06:08 +02:00 committed by GitHub
commit bfa7775d63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 300 additions and 28 deletions

View File

@ -1311,6 +1311,12 @@ class Schedule(object):
else:
data['run'] = True
def _chop_ms(dt):
'''
Remove the microseconds from a datetime object
'''
return dt - datetime.timedelta(microseconds=dt.microsecond)
schedule = self._get_schedule()
if not isinstance(schedule, dict):
raise ValueError('Schedule must be of type dict.')
@ -1336,6 +1342,7 @@ class Schedule(object):
# Clear these out between runs
for item in ['_continue',
'_error',
'_skipped',
'_skip_reason']:
if item in data:
del data[item]
@ -1431,7 +1438,7 @@ class Schedule(object):
if '_error' in data and data['_error']:
continue
seconds = int((data['_next_fire_time'] - now).total_seconds())
seconds = int((_chop_ms(data['_next_fire_time']) - _chop_ms(now)).total_seconds())
# If there is no job specific splay available,
# grab the global which defaults to None.
@ -1564,45 +1571,54 @@ class Schedule(object):
'by {0} seconds)'.format(abs(seconds))
try:
# Job is disabled, continue
if 'enabled' in data and not data['enabled']:
log.debug('Job: %s is disabled', job_name)
data['_skip_reason'] = 'disabled'
data['_skipped_time'] = now
data['_skipped'] = True
continue
if run:
# Job is disabled, continue
if 'enabled' in data and not data['enabled']:
log.debug('Job: %s is disabled', job_name)
data['_skip_reason'] = 'disabled'
data['_skipped_time'] = now
data['_skipped'] = True
continue
if 'jid_include' not in data or data['jid_include']:
data['jid_include'] = True
log.debug('schedule: Job %s was scheduled with jid_include, '
'adding to cache (jid_include defaults to True)',
job_name)
if 'maxrunning' in data:
log.debug('schedule: Job %s was scheduled with a max '
'number of %s', job_name, data['maxrunning'])
else:
log.info('schedule: maxrunning parameter was not specified for '
'job %s, defaulting to 1.', job_name)
data['maxrunning'] = 1
if 'jid_include' not in data or data['jid_include']:
data['jid_include'] = True
log.debug('schedule: Job %s was scheduled with jid_include, '
'adding to cache (jid_include defaults to True)',
job_name)
if 'maxrunning' in data:
log.debug('schedule: Job %s was scheduled with a max '
'number of %s', job_name, data['maxrunning'])
else:
log.info('schedule: maxrunning parameter was not specified for '
'job %s, defaulting to 1.', job_name)
data['maxrunning'] = 1
if not self.standalone:
data['run'] = run
data = self._check_max_running(func,
data,
self.opts,
now)
run = data['run']
if not self.standalone:
data['run'] = run
data = self._check_max_running(func,
data,
self.opts,
now)
run = data['run']
# Check run again, just in case _check_max_running
# set run to False
if run:
log.info('Running scheduled job: %s%s', job_name, miss_msg)
self._run_job(func, data)
finally:
# Only set _last_run if the job ran
if run:
data['_last_run'] = now
data['_splay'] = None
if '_seconds' in data:
data['_next_fire_time'] = now + datetime.timedelta(seconds=data['_seconds'])
if self.standalone:
data['_next_fire_time'] = now + datetime.timedelta(seconds=data['_seconds'])
elif '_skipped' in data and data['_skipped']:
data['_next_fire_time'] = now + datetime.timedelta(seconds=data['_seconds'])
elif run:
data['_next_fire_time'] = now + datetime.timedelta(seconds=data['_seconds'])
def _run_job(self, func, data):
job_dry_run = data.get('dry_run', False)

View File

@ -9,6 +9,7 @@ import os
import random
import dateutil.parser as dateutil_parser
import datetime
# Import Salt Testing libs
from tests.support.case import ModuleCase
@ -587,3 +588,216 @@ class SchedulerEvalTest(ModuleCase, SaltReturnAssertsMixin):
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_splay')
self.assertEqual(ret['_last_run'], run_time)
def test_eval_seconds(self):
'''
verify that scheduled job run mutiple times with seconds
'''
job = {
'schedule': {
'job_eval_seconds': {
'function': 'test.ping',
'seconds': '30',
}
}
}
# Add job to schedule
self.schedule.opts.update(job)
# eval at 2:00pm to prime, simulate minion start up.
run_time = dateutil_parser.parse('11/29/2017 2:00pm')
next_run_time = run_time + datetime.timedelta(seconds=30)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_seconds')
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 2:00:01pm, will not run.
run_time = dateutil_parser.parse('11/29/2017 2:00:01pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_seconds')
self.assertNotIn('_last_run', ret)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 2:00:30pm, will run.
run_time = dateutil_parser.parse('11/29/2017 2:00:30pm')
next_run_time = run_time + datetime.timedelta(seconds=30)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_seconds')
self.assertEqual(ret['_last_run'], run_time)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 2:01:00pm, will run.
run_time = dateutil_parser.parse('11/29/2017 2:01:00pm')
next_run_time = run_time + datetime.timedelta(seconds=30)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_seconds')
self.assertEqual(ret['_last_run'], run_time)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 2:01:30pm, will run.
run_time = dateutil_parser.parse('11/29/2017 2:01:30pm')
next_run_time = run_time + datetime.timedelta(seconds=30)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_seconds')
self.assertEqual(ret['_last_run'], run_time)
self.assertEqual(ret['_next_fire_time'], next_run_time)
def test_eval_minutes(self):
'''
verify that scheduled job run mutiple times with minutes
'''
job = {
'schedule': {
'job_eval_minutes': {
'function': 'test.ping',
'minutes': '30',
}
}
}
# Add job to schedule
self.schedule.opts.update(job)
# eval at 2:00pm to prime, simulate minion start up.
run_time = dateutil_parser.parse('11/29/2017 2:00pm')
next_run_time = run_time + datetime.timedelta(minutes=30)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_minutes')
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 2:00:01pm, will not run.
run_time = dateutil_parser.parse('11/29/2017 2:00:01pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_minutes')
self.assertNotIn('_last_run', ret)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 2:30:00pm, will run.
run_time = dateutil_parser.parse('11/29/2017 2:30:00pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_minutes')
self.assertEqual(ret['_last_run'], run_time)
# eval at 3:00:00pm, will run.
run_time = dateutil_parser.parse('11/29/2017 3:00:00pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_minutes')
self.assertEqual(ret['_last_run'], run_time)
# eval at 3:30:00pm, will run.
run_time = dateutil_parser.parse('11/29/2017 3:30:00pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_minutes')
self.assertEqual(ret['_last_run'], run_time)
def test_eval_hours(self):
'''
verify that scheduled job run mutiple times with hours
'''
job = {
'schedule': {
'job_eval_hours': {
'function': 'test.ping',
'hours': '2',
}
}
}
# Add job to schedule
self.schedule.opts.update(job)
# eval at 2:00pm to prime, simulate minion start up.
run_time = dateutil_parser.parse('11/29/2017 2:00pm')
next_run_time = run_time + datetime.timedelta(hours=2)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_hours')
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 2:00:01pm, will not run.
run_time = dateutil_parser.parse('11/29/2017 2:00:01pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_hours')
self.assertNotIn('_last_run', ret)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 4:00:00pm, will run.
run_time = dateutil_parser.parse('11/29/2017 4:00:00pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_hours')
self.assertEqual(ret['_last_run'], run_time)
# eval at 6:00:00pm, will run.
run_time = dateutil_parser.parse('11/29/2017 6:00:00pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_hours')
self.assertEqual(ret['_last_run'], run_time)
# eval at 8:00:00pm, will run.
run_time = dateutil_parser.parse('11/29/2017 8:00:00pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_hours')
self.assertEqual(ret['_last_run'], run_time)
def test_eval_days(self):
'''
verify that scheduled job run mutiple times with days
'''
job = {
'schedule': {
'job_eval_days': {
'function': 'test.ping',
'days': '2',
}
}
}
# Add job to schedule
self.schedule.opts.update(job)
# eval at 11/23/2017 2:00pm to prime, simulate minion start up.
run_time = dateutil_parser.parse('11/23/2017 2:00pm')
next_run_time = run_time + datetime.timedelta(days=2)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_days')
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 11/25/2017 2:00:00pm, will run.
run_time = dateutil_parser.parse('11/25/2017 2:00:00pm')
next_run_time = run_time + datetime.timedelta(days=2)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_days')
self.assertEqual(ret['_last_run'], run_time)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 11/26/2017 2:00:00pm, will not run.
run_time = dateutil_parser.parse('11/26/2017 2:00:00pm')
last_run_time = run_time - datetime.timedelta(days=1)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_days')
self.assertEqual(ret['_last_run'], last_run_time)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 11/27/2017 2:00:00pm, will run.
run_time = dateutil_parser.parse('11/27/2017 2:00:00pm')
next_run_time = run_time + datetime.timedelta(days=2)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_days')
self.assertEqual(ret['_last_run'], run_time)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 11/28/2017 2:00:00pm, will not run.
run_time = dateutil_parser.parse('11/28/2017 2:00:00pm')
last_run_time = run_time - datetime.timedelta(days=1)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_days')
self.assertEqual(ret['_last_run'], last_run_time)
self.assertEqual(ret['_next_fire_time'], next_run_time)
# eval at 11/29/2017 2:00:00pm, will run.
run_time = dateutil_parser.parse('11/29/2017 2:00:00pm')
next_run_time = run_time + datetime.timedelta(days=2)
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job_eval_days')
self.assertEqual(ret['_last_run'], run_time)
self.assertEqual(ret['_next_fire_time'], next_run_time)

View File

@ -254,3 +254,45 @@ class SchedulerSkipTest(ModuleCase, SaltReturnAssertsMixin):
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job1')
self.assertEqual(ret['_last_run'], run_time)
def test_run_seconds_skip(self):
'''
verify that scheduled job is skipped during the specified range
'''
job = {
'schedule': {
'job1': {
'function': 'test.ping',
'seconds': '10',
}
}
}
# Add job to schedule
self.schedule.opts.update(job)
# eval at 2:00pm, to prime the scheduler
run_time = dateutil_parser.parse('11/29/2017 2:00pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job1')
# eval at 2:00:10pm
run_time = dateutil_parser.parse('11/29/2017 2:00:10pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job1')
# Skip at 2:00:20pm
run_time = dateutil_parser.parse('11/29/2017 2:00:20pm')
self.schedule.skip_job('job1', {'time': run_time.strftime('%Y-%m-%dT%H:%M:%S'),
'time_fmt': '%Y-%m-%dT%H:%M:%S'})
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job1')
self.assertIn('_next_fire_time', ret)
self.assertEqual(ret['_skip_reason'], 'skip_explicit')
self.assertEqual(ret['_skipped_time'], run_time)
# Run at 2:00:30pm
run_time = dateutil_parser.parse('11/29/2017 2:00:30pm')
self.schedule.eval(now=run_time)
ret = self.schedule.job_status('job1')
self.assertIn('_last_run', ret)