salt/tests/unit/runners/test_queue.py

70 lines
2.1 KiB
Python
Raw Normal View History

2017-02-28 19:30:43 +00:00
# -*- coding: utf-8 -*-
'''
unit tests for the queue runner
'''
# Import Python Libs
2017-12-15 01:21:00 +00:00
from __future__ import absolute_import, print_function, unicode_literals
2017-03-22 16:42:17 +00:00
import os
2017-02-28 19:30:43 +00:00
# Import Salt Testing Libs
2017-03-22 16:42:17 +00:00
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.paths import TMP
from tests.support.unit import skipIf, TestCase
from tests.support.mock import (
2017-02-28 19:30:43 +00:00
NO_MOCK,
NO_MOCK_REASON,
MagicMock,
patch
)
# Import Salt Libs
import salt.runners.queue as queue_mod
2017-02-28 19:30:43 +00:00
2017-02-28 20:53:02 +00:00
2017-02-28 19:30:43 +00:00
@skipIf(NO_MOCK, NO_MOCK_REASON)
class QueueTest(TestCase, LoaderModuleMockMixin):
    '''
    Validate the queue runner
    '''

    def setup_loader_modules(self):
        '''
        Return the per-module globals consumed by LoaderModuleMockMixin.

        Only ``__opts__`` is supplied for ``queue_mod``: a socket directory
        under the test TMP path and the ``zeromq`` transport.
        NOTE(review): presumably the mixin patches these dunders onto
        ``queue_mod`` for every test -- confirm against the mixin.
        '''
        return {
            queue_mod: {
                '__opts__': {
                    'sock_dir': os.path.join(TMP, 'queue-runner-sock-dir'),
                    'transport': 'zeromq'
                }
            }
        }

    def test_insert_runner(self):
        '''
        insert_runner must forward exactly one call to ``queue_mod.insert``
        carrying the queue name, the runner payload and the backend.
        '''
        queue_insert = MagicMock(return_value=True)
        with patch.object(queue_mod, 'insert', queue_insert):
            queue_mod.insert_runner('test.stdout_print', queue='salt')

        # No args/kwargs were passed to insert_runner, so the queued item is
        # expected to carry empty containers for both.
        expected_call = {
            'queue': 'salt',
            'items': {
                'fun': 'test.stdout_print',
                'args': [],
                'kwargs': {},
            },
            'backend': 'pgjsonb',
        }
        queue_insert.assert_called_once_with(**expected_call)

    def test_process_runner(self):
        '''
        process_runner must pop a single runner item from the queue and
        invoke the popped function once with no arguments.
        '''
        # Item handed back by the mocked ``pop``; same shape as the payload
        # checked in test_insert_runner.
        ret = [{
            'fun': 'test.stdout_print',
            'args': [],
            'kwargs': {},
        }]

        queue_pop = MagicMock(return_value=ret)
        test_stdout_print = MagicMock(return_value=True)
        with patch.dict(queue_mod.__salt__, {'test.stdout_print': test_stdout_print}):
            with patch.object(queue_mod, 'pop', queue_pop):
                queue_mod.process_runner(queue='salt')
            # The original asserted this pop call twice with identical
            # arguments; a single assertion gives the same coverage.
            queue_pop.assert_called_once_with(is_runner=True, queue='salt', quantity=1, backend='pgjsonb')
            test_stdout_print.assert_called_once_with()