* Test both master and minion events firing
 * Allow targeting the minions in `tests.integration.ModuleCase.run_function()`
 * The above item allows to also test events firing using `tcp` as `ipc_mode` which is being used by the tests `sub_minion`
This commit is contained in:
Pedro Algarvio 2012-09-01 07:29:06 +01:00
parent bc988be9bb
commit accb15d6ee
4 changed files with 145 additions and 22 deletions

View File

@ -52,23 +52,23 @@ class SaltEvent(object):
sock_dir,
'master_event_pull.ipc'
))
return puburi, pulluri
if kwargs.get('ipc_mode', '') == 'tcp':
puburi = 'tcp://127.0.0.1:{0}'.format(
kwargs.get('tcp_pub_port', 4510)
)
pulluri = 'tcp://127.0.0.1:{0}'.format(
kwargs.get('tcp_pull_port', 4511)
)
else:
puburi = 'ipc://{0}'.format(os.path.join(
sock_dir,
'minion_event_{0}_pub.ipc'.format(kwargs.get('id', ''))
))
pulluri = 'ipc://{0}'.format(os.path.join(
sock_dir,
'minion_event_{0}_pull.ipc'.format(kwargs.get('id', ''))
))
if kwargs.get('ipc_mode', '') == 'tcp':
puburi = 'tcp://127.0.0.1:{0}'.format(
kwargs.get('tcp_pub_port', 4510)
)
pulluri = 'tcp://127.0.0.1:{0}'.format(
kwargs.get('tcp_pull_port', 4511)
)
else:
puburi = 'ipc://{0}'.format(os.path.join(
sock_dir,
'minion_event_{0}_pub.ipc'.format(kwargs.get('id', ''))
))
pulluri = 'ipc://{0}'.format(os.path.join(
sock_dir,
'minion_event_{0}_pull.ipc'.format(kwargs.get('id', ''))
))
log.debug(
'{0} PUB socket URI: {1}'.format(self.__class__.__name__, puburi)
)

View File

@ -240,21 +240,22 @@ class ModuleCase(TestCase):
os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf', 'master')
)
def run_function(self, function, arg=(), **kwargs):
def run_function(self, function, arg=(), minion_tgt='minion', **kwargs):
'''
Run a single salt function and condition the return down to match the
behavior of the raw function call
'''
know_to_return_none = ('file.chown', 'file.chgrp')
orig = self.client.cmd(
'minion', function, arg, timeout=100, kwarg=kwargs
minion_tgt, function, arg, timeout=100, kwarg=kwargs
)
if orig['minion'] is None and function not in know_to_return_none:
if orig[minion_tgt] is None and function not in know_to_return_none:
self.skipTest(
'WARNING(SHOULD NOT HAPPEN #1935): Failed to get \'{0}\' from '
'the minion'.format(function)
'the minion \'{1}\''.format(function, minion_tgt)
)
return orig['minion']
return orig[minion_tgt]
def state_result(self, ret):
'''
@ -278,11 +279,20 @@ class ModuleCase(TestCase):
)
@property
def master_opts(self):
def sub_minion_opts(self):
'''
Return the options used for the minion
'''
return salt.config.minion_config(
os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf', 'sub_minion')
)
@property
def master_opts(self):
'''
Return the options used for the minion
'''
return salt.config.master_config(
os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf', 'master')
)

View File

@ -20,3 +20,6 @@ integration.test: True
grains:
test_grain: spam
role: sub
ipc_mode: tcp

View File

@ -0,0 +1,110 @@
# -*- coding: utf-8 -*-
'''
tests.integration.modules.event
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
:copyright: © 2012 UfSoft.org - :email:`Pedro Algarvio (pedro@algarvio.me)`
:license: Apache 2.0, see LICENSE for more details.
'''
import time
import threading
from Queue import Queue, Empty
import integration
from salt.utils import event
class EventModuleTest(integration.ModuleCase):
def test_event_fire_master(self):
events = Queue()
def get_event(events):
me = event.MasterEvent(self.master_opts['sock_dir'])
events.put_nowait(
me.get_event(wait=5, tag='salttest', full=False)
)
threading.Thread(target=get_event, args=(events,)).start()
time.sleep(1) # Allow multiprocessing.Process to start
ret = self.run_function(
'event.fire_master',
['event.fire_master: just test it!!!!', 'salttest']
)
self.assertTrue(ret)
eventfired = events.get(block=True, timeout=1)
self.assertIn('event.fire_master: just test it!!!!', eventfired)
ret = self.run_function(
'event.fire_master',
['event.fire_master: just test it!!!!', 'salttest-miss']
)
self.assertTrue(ret)
with self.assertRaises(Empty):
eventfired = events.get(block=True, timeout=1)
def test_event_fire(self):
events = Queue()
def get_event(events):
me = event.MinionEvent(**self.minion_opts)
events.put_nowait(
me.get_event(wait=5, tag='salttest', full=False)
)
threading.Thread(target=get_event, args=(events,)).start()
time.sleep(1) # Allow multiprocessing.Process to start
ret = self.run_function(
'event.fire', ['event.fire: just test it!!!!', 'salttest']
)
self.assertTrue(ret)
eventfired = events.get(block=True, timeout=1)
self.assertIn('event.fire: just test it!!!!', eventfired)
ret = self.run_function(
'event.fire', ['event.fire: just test it!!!!', 'salttest-miss']
)
self.assertTrue(ret)
with self.assertRaises(Empty):
eventfired = events.get(block=True, timeout=1)
def test_event_fire_ipc_mode_tcp(self):
events = Queue()
def get_event(events):
me = event.MinionEvent(**self.sub_minion_opts)
events.put_nowait(
me.get_event(wait=5, tag='salttest', full=False)
)
threading.Thread(target=get_event, args=(events,)).start()
time.sleep(1) # Allow multiprocessing.Process to start
ret = self.run_function(
'event.fire', ['event.fire: just test it!!!!', 'salttest'],
minion_tgt='sub_minion'
)
self.assertTrue(ret)
eventfired = events.get(block=True, timeout=1)
self.assertIn('event.fire: just test it!!!!', eventfired)
ret = self.run_function(
'event.fire', ['event.fire: just test it!!!!', 'salttest-miss'],
minion_tgt='sub_minion'
)
self.assertTrue(ret)
with self.assertRaises(Empty):
eventfired = events.get(block=True, timeout=1)
if __name__ == '__main__':
from integration import run_tests
run_tests(EventModuleTest)