mirror of
https://github.com/valitydev/salt.git
synced 2024-11-06 16:45:27 +00:00
Merge pull request #13974 from rallytime/schedule_unit_tests
Some unit tests for salt.utils.schedule
This commit is contained in:
commit
89bf75fe84
72
tests/unit/utils/schedule_test.py
Normal file
72
tests/unit/utils/schedule_test.py
Normal file
@ -0,0 +1,72 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
'''
|
||||
:codeauthor: :email:`Nicole Thomas <nicole@satlstack.com>`
|
||||
'''
|
||||
|
||||
# Import Salt Libs
|
||||
from salt.utils.schedule import Schedule
|
||||
|
||||
# Import Salt Testing Libs
|
||||
from salttesting import TestCase
|
||||
from salttesting.mock import MagicMock, patch
|
||||
from salttesting.helpers import ensure_in_syspath
|
||||
|
||||
ensure_in_syspath('../../')
|
||||
|
||||
|
||||
class ScheduleTestCase(TestCase):
|
||||
'''
|
||||
Unit tests for salt.utils.schedule module
|
||||
'''
|
||||
|
||||
def setUp(self):
|
||||
with patch('salt.utils.schedule.clean_proc_dir', MagicMock(return_value=None)):
|
||||
self.schedule = Schedule({}, {}, returners={})
|
||||
|
||||
# delete_job tests
|
||||
|
||||
def test_delete_job_exists(self):
|
||||
'''
|
||||
Tests ensuring the job exists and deleting it
|
||||
'''
|
||||
self.schedule.opts = {'schedule': {'foo': 'bar'}, 'pillar': ''}
|
||||
self.schedule.delete_job('foo')
|
||||
self.assertNotIn('foo', self.schedule.opts)
|
||||
|
||||
def test_delete_job_in_pillar(self):
|
||||
'''
|
||||
Tests deleting job in pillar
|
||||
'''
|
||||
self.schedule.opts = {'pillar': {'schedule': {'foo': 'bar'}}, 'schedule': ''}
|
||||
self.schedule.delete_job('foo')
|
||||
self.assertNotIn('foo', self.schedule.opts)
|
||||
|
||||
def test_delete_job_intervals(self):
|
||||
'''
|
||||
Tests removing job from intervals
|
||||
'''
|
||||
self.schedule.opts = {'pillar': '', 'schedule': ''}
|
||||
self.schedule.intervals = {'foo': 'bar'}
|
||||
self.schedule.delete_job('foo')
|
||||
self.assertNotIn('foo', self.schedule.intervals)
|
||||
|
||||
# add_job tests
|
||||
|
||||
def test_add_job_data_not_dict(self):
|
||||
'''
|
||||
Tests if data is a dictionary
|
||||
'''
|
||||
data = 'foo'
|
||||
self.assertRaises(ValueError, Schedule.add_job, self.schedule, data)
|
||||
|
||||
def test_add_job_multiple_jobs(self):
|
||||
'''
|
||||
Tests if more than one job is scheduled at a time
|
||||
'''
|
||||
data = {'key1': 'value1', 'key2': 'value2'}
|
||||
self.assertRaises(ValueError, Schedule.add_job, self.schedule, data)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
from integration import run_tests
|
||||
run_tests(ScheduleTestCase, needs_daemon=False)
|
Loading…
Reference in New Issue
Block a user