Merge pull request #25832 from DSRCompany/event_subscribe_fix

Event subscribe fix
This commit is contained in:
Mike Place 2015-07-29 09:41:14 -06:00
commit 3849790698
25 changed files with 126 additions and 111 deletions

View File

@ -139,7 +139,7 @@ class LocalClient(object):
self.opts['sock_dir'],
self.opts['transport'],
opts=self.opts,
listen=not self.opts.get('__worker', False))
listen=False)
self.utils = salt.loader.utils(self.opts)
self.functions = salt.loader.minion_mods(self.opts, utils=self.utils)
self.returners = salt.loader.returners(self.opts, self.functions)
@ -978,7 +978,7 @@ class LocalClient(object):
self.opts['sock_dir'],
self.opts['transport'],
opts=self.opts,
listen=not self.opts.get('__worker', False))
listen=False)
# start listening for new events, before firing off the pings
event.connect_pub()
# since this is a new ping, no one has responded yet

View File

@ -46,7 +46,7 @@ class APIClient(object):
Provide a uniform method of accessing the various client interfaces in Salt
in the form of low-data data structures. For example:
'''
def __init__(self, opts=None):
def __init__(self, opts=None, listen=True):
if not opts:
opts = salt.config.client_config(
os.environ.get(
@ -63,7 +63,8 @@ class APIClient(object):
'master',
self.opts['sock_dir'],
self.opts['transport'],
opts=self.opts)
opts=self.opts,
listen=listen)
def run(self, cmd):
'''

View File

@ -146,7 +146,7 @@ class SyncClientMixin(object):
'eauth': 'pam',
})
'''
event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'])
event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'], listen=True)
job = self.master_call(**low)
ret_tag = salt.utils.event.tagify('ret', base=job['tag'])

View File

@ -52,13 +52,15 @@ def start(host, port=5959, tag='salt/engine/logstash'):
if __opts__.get('id').endswith('_master'):
event_bus = salt.utils.event.get_master_event(
__opts__,
__opts__['sock_dir'])
__opts__['sock_dir'],
listen=True)
else:
event_bus = salt.utils.event.get_event(
'minion',
transport=__opts__['transport'],
opts=__opts__,
sock_dir=__opts__['sock_dir'])
sock_dir=__opts__['sock_dir'],
listen=True)
log.debug('Logstash engine started')
while True:

View File

@ -113,7 +113,8 @@ def start(queue, profile=None, tag='salt/engine/sqs'):
if __opts__.get('__role') == 'master':
fire_master = salt.utils.event.get_master_event(
__opts__,
__opts__['sock_dir']).fire_event
__opts__['sock_dir'],
listen=False).fire_event
else:
fire_master = None

View File

@ -21,13 +21,15 @@ def start():
if __opts__['__role'] == 'master':
event_bus = salt.utils.event.get_master_event(
__opts__,
__opts__['sock_dir'])
__opts__['sock_dir'],
listen=True)
else:
event_bus = salt.utils.event.get_event(
'minion',
transport=__opts__['transport'],
opts=__opts__,
sock_dir=__opts__['sock_dir'])
sock_dir=__opts__['sock_dir'],
listen=True)
log.debug('test engine started')
while True:

View File

@ -204,7 +204,7 @@ def minion_mods(
ret.pack['__salt__'] = ret
if notify:
evt = salt.utils.event.get_event('minion', opts=opts)
evt = salt.utils.event.get_event('minion', opts=opts, listen=False)
evt.fire_event({'complete': True}, tag='/salt/minion/minion_mod_complete')
return ret

View File

@ -165,7 +165,7 @@ class Maintenance(multiprocessing.Process):
returners=self.returners)
self.ckminions = salt.utils.minions.CkMinions(self.opts)
# Make Event bus for firing
self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'])
self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'], listen=False)
# Init any values needed by the git ext pillar
self.git_pillar = salt.daemons.masterapi.init_git_pillar(self.opts)
# Set up search object
@ -782,7 +782,7 @@ class AESFuncs(object):
:returns: Instance for handling AES operations
'''
self.opts = opts
self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'])
self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'], listen=False)
self.serial = salt.payload.Serial(opts)
self.ckminions = salt.utils.minions.CkMinions(opts)
# Make a client
@ -1416,7 +1416,7 @@ class ClearFuncs(object):
self.opts = opts
self.key = key
# Create the event manager
self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'])
self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'], listen=False)
# Make a client
self.local = salt.client.get_local_client(self.opts['conf_file'])
# Make an minion checker object

View File

@ -68,7 +68,7 @@ def fire_master(data, tag, preload=None):
# Usually, we can send the event via the minion, which is faster
# because it is already authenticated
try:
return salt.utils.event.MinionEvent(__opts__).fire_event(
return salt.utils.event.MinionEvent(__opts__, listen=False).fire_event(
{'data': data, 'tag': tag, 'events': None, 'pretag': None}, 'fire_master')
except Exception:
return False

View File

@ -58,7 +58,7 @@ def _mine_function_available(func):
def _mine_send(load, opts):
eventer = salt.utils.event.MinionEvent(opts)
eventer = salt.utils.event.MinionEvent(opts, listen=False)
event_ret = eventer.fire_event(load, '_minion_mine')
# We need to pause here to allow for the decoupled nature of
# events time to allow the mine to propagate

View File

@ -547,7 +547,7 @@ def refresh_modules(async=True):
# If we're going to block, first setup a listener
ret = __salt__['event.fire']({}, 'module_refresh')
else:
eventer = salt.utils.event.get_event('minion', opts=__opts__)
eventer = salt.utils.event.get_event('minion', opts=__opts__, listen=True)
ret = __salt__['event.fire']({'notify': True}, 'module_refresh')
# Wait for the finish event to fire
log.trace('refresh_modules waiting for module refresh to complete')

View File

@ -1781,7 +1781,8 @@ class Events(object):
'master',
sock_dir=self.opts['sock_dir'],
transport=self.opts['transport'],
opts=self.opts)
opts=self.opts,
listen=True)
stream = event.iter_events(full=True)
yield u'retry: {0}\n'.format(400)
@ -1954,7 +1955,8 @@ class WebsocketEndpoint(object):
'master',
sock_dir=self.opts['sock_dir'],
transport=self.opts['transport'],
opts=self.opts)
opts=self.opts,
listen=True)
stream = event.iter_events(full=True)
SaltInfo = event_processor.SaltInfo(handler)
while True:

View File

@ -260,6 +260,7 @@ class EventListener(object):
opts['sock_dir'],
opts['transport'],
opts=opts,
listen=True,
)
self.event.subscribe() # start listening for events immediately
@ -1608,7 +1609,8 @@ class WebhookSaltAPIHandler(SaltAPIHandler): # pylint: disable=W0223
'master',
self.application.opts['sock_dir'],
self.application.opts['transport'],
opts=self.application.opts)
opts=self.application.opts,
listen=False)
ret = self.event.fire_event({
'post': self.raw_data,

View File

@ -34,7 +34,8 @@ def list_(saltenv='base', test=None):
'master',
__opts__['sock_dir'],
__opts__['transport'],
opts=__opts__)
opts=__opts__,
listen=True)
__jid_event__.fire_event({}, 'salt/reactors/manage/list')
@ -60,7 +61,8 @@ def add(event, reactors, saltenv='base', test=None):
'master',
__opts__['sock_dir'],
__opts__['transport'],
opts=__opts__)
opts=__opts__,
listen=True)
__jid_event__.fire_event({'event': event,
'reactors': reactors},
@ -84,7 +86,8 @@ def delete(event, saltenv='base', test=None):
'master',
__opts__['sock_dir'],
__opts__['transport'],
opts=__opts__)
opts=__opts__,
listen=True)
__jid_event__.fire_event({'event': event}, 'salt/reactors/manage/delete')

View File

@ -264,7 +264,8 @@ def event(tagmatch='*', count=-1, quiet=False, sock_dir=None, pretty=False):
'master',
sock_dir or __opts__['sock_dir'],
__opts__['transport'],
opts=__opts__)
opts=__opts__,
listen=True)
while True:
ret = sevent.get_event(full=True)

View File

@ -492,7 +492,8 @@ def wait_for_event(
'master',
__opts__['sock_dir'],
__opts__['transport'],
opts=__opts__)
opts=__opts__,
listen=True)
del_counter = 0
starttime = time.time()

View File

@ -71,7 +71,7 @@ class AESReqServerMixin(object):
# other things needed for _auth
# Create the event manager
self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'])
self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'], listen=False)
self.auto_key = salt.daemons.masterapi.AutoKey(self.opts)
# only create a con_cache-client if the con_cache is active

View File

@ -2044,7 +2044,7 @@ def check_auth(name, sock_dir=None, queue=None, timeout=300):
This function is called from a multiprocess instance, to wait for a minion
to become available to receive salt commands
'''
event = salt.utils.event.SaltEvent('master', sock_dir)
event = salt.utils.event.SaltEvent('master', sock_dir, listen=True)
starttime = time.mktime(time.localtime())
newtimeout = timeout
log.debug(

View File

@ -50,5 +50,5 @@ def fire_exception(exc, opts, job=None, node='minion'):
'''
if job is None:
job = {}
event = salt.utils.event.SaltEvent(node, opts=opts)
event = salt.utils.event.SaltEvent(node, opts=opts, listen=False)
event.fire_event(pack_exception(exc), '_salt_error')

View File

@ -122,8 +122,8 @@ def get_event(node, sock_dir=None, transport='zeromq', opts=None, listen=True):
# TODO: AIO core is separate from transport
if transport in ('zeromq', 'tcp'):
if node == 'master':
return MasterEvent(sock_dir, opts)
return SaltEvent(node, sock_dir, opts)
return MasterEvent(sock_dir, opts, listen=listen)
return SaltEvent(node, sock_dir, opts, listen=listen)
elif transport == 'raet':
import salt.utils.raetevent
return salt.utils.raetevent.RAETEvent(node,
@ -138,7 +138,7 @@ def get_master_event(opts, sock_dir, listen=True):
'''
# TODO: AIO core is separate from transport
if opts['transport'] in ('zeromq', 'tcp'):
return MasterEvent(sock_dir, opts)
return MasterEvent(sock_dir, opts, listen=listen)
elif opts['transport'] == 'raet':
import salt.utils.raetevent
return salt.utils.raetevent.MasterEvent(
@ -172,7 +172,7 @@ class SaltEvent(object):
RAET compatible
The base class used to manage salt events
'''
def __init__(self, node, sock_dir=None, opts=None):
def __init__(self, node, sock_dir=None, opts=None, listen=True):
self.serial = salt.payload.Serial({'serial': 'msgpack'})
self.context = zmq.Context()
self.poller = zmq.Poller()
@ -186,7 +186,8 @@ class SaltEvent(object):
if salt.utils.is_windows() and not hasattr(opts, 'ipc_mode'):
opts['ipc_mode'] = 'tcp'
self.puburi, self.pulluri = self.__load_uri(sock_dir, node)
self.subscribe()
if listen:
self.subscribe()
self.pending_events = []
self.__load_cache_regex()
@ -606,8 +607,8 @@ class MasterEvent(SaltEvent):
RAET compatible
Create a master event management object
'''
def __init__(self, sock_dir, opts=None):
super(MasterEvent, self).__init__('master', sock_dir, opts)
def __init__(self, sock_dir, opts=None, listen=True):
super(MasterEvent, self).__init__('master', sock_dir, opts, listen=listen)
class LocalClientEvent(MasterEvent):
@ -640,9 +641,9 @@ class MinionEvent(SaltEvent):
RAET compatible
Create a master event management object
'''
def __init__(self, opts):
def __init__(self, opts, listen=True):
super(MinionEvent, self).__init__(
'minion', sock_dir=opts.get('sock_dir', None), opts=opts)
'minion', sock_dir=opts.get('sock_dir', None), opts=opts, listen=listen)
class AsyncEventPublisher(object):
@ -904,7 +905,7 @@ class EventReturn(multiprocessing.Process):
signal.signal(signal.SIGTERM, self.sig_stop)
salt.utils.appendproctitle(self.__class__.__name__)
self.event = get_event('master', opts=self.opts)
self.event = get_event('master', opts=self.opts, listen=True)
events = self.event.iter_events(full=True)
self.event.fire_event({}, 'salt/event_listen/start')
try:
@ -947,7 +948,6 @@ class StateFire(object):
'''
def __init__(self, opts, auth=None):
self.opts = opts
self.event = SaltEvent(opts, 'minion')
if not auth:
self.auth = salt.crypt.SAuth(self.opts)
else:

View File

@ -40,16 +40,15 @@ class RAETEvent(object):
'''
self.node = node # application kind see kinds.APPL_KIND_NAMES
self.sock_dir = sock_dir
self.listen = listen
if opts is None:
opts = {}
self.opts = opts
self.stack = None
self.ryn = 'manor' # remote yard name
self.connected = False
self.__prep_stack()
self.__prep_stack(listen)
def __prep_stack(self):
def __prep_stack(self, listen):
'''
Prepare the stack objects
'''
@ -59,7 +58,8 @@ class RAETEvent(object):
else:
self.stack = transport.jobber_stack = self._setup_stack(ryn=self.ryn)
log.debug("RAETEvent Using Jobber Stack at = {0}\n".format(self.stack.ha))
self.connect_pub()
if listen:
self.subscribe()
def _setup_stack(self, ryn='manor'):
kind = self.opts.get('__role', '') # opts optional for master
@ -112,7 +112,8 @@ class RAETEvent(object):
'''
Included for compat with zeromq events, not required
'''
return
if not self.connected:
self.connect_pub()
def unsubscribe(self, tag=None):
'''
@ -124,16 +125,15 @@ class RAETEvent(object):
'''
Establish the publish connection
'''
if not self.connected and self.listen:
try:
route = {'dst': (None, self.ryn, 'event_req'),
'src': (None, self.stack.local.name, None)}
msg = {'route': route}
self.stack.transmit(msg, self.stack.nameRemotes[self.ryn].uid)
self.stack.serviceAll()
self.connected = True
except Exception:
pass
try:
route = {'dst': (None, self.ryn, 'event_req'),
'src': (None, self.stack.local.name, None)}
msg = {'route': route}
self.stack.transmit(msg, self.stack.nameRemotes[self.ryn].uid)
self.stack.serviceAll()
self.connected = True
except Exception:
pass
def connect_pull(self, timeout=1000):
'''
@ -156,7 +156,8 @@ class RAETEvent(object):
IF wait is 0 then block forever.
'''
self.connect_pub()
if not self.connected:
self.connect_pub()
start = time.time()
while True:
self.stack.serviceAll()
@ -180,7 +181,8 @@ class RAETEvent(object):
'''
Get the raw event msg without blocking or any other niceties
'''
self.connect_pub()
if not self.connected:
self.connect_pub()
self.stack.serviceAll()
if self.stack.rxMsgs:
msg, sender = self.stack.rxMsgs.popleft()
@ -204,7 +206,6 @@ class RAETEvent(object):
Send a single event into the publisher with paylod dict "data" and event
identifier "tag"
'''
self.connect_pub()
# Timeout is retained for compat with zeromq events
if not str(tag): # no empty tags allowed
raise ValueError('Empty tag.')
@ -272,18 +273,17 @@ class PresenceEvent(MasterEvent):
'''
Establish the publish connection
'''
if not self.connected and self.listen:
try:
route = {'dst': (None, self.ryn, 'presence_req'),
'src': (None, self.stack.local.name, None)}
msg = {'route': route}
if self.state:
msg['data'] = {'state': self.state}
self.stack.transmit(msg, self.stack.nameRemotes[self.ryn].uid)
self.stack.serviceAll()
self.connected = True
except Exception:
pass
try:
route = {'dst': (None, self.ryn, 'presence_req'),
'src': (None, self.stack.local.name, None)}
msg = {'route': route}
if self.state:
msg['data'] = {'state': self.state}
self.stack.transmit(msg, self.stack.nameRemotes[self.ryn].uid)
self.stack.serviceAll()
self.connected = True
except Exception:
pass
class StatsEvent(MasterEvent):
@ -297,13 +297,12 @@ class StatsEvent(MasterEvent):
'''
Establish the publish connection
'''
if not self.connected and self.listen:
try:
route = {'dst': (self.estate, None, 'stats_req'),
'src': (None, self.stack.local.name, None)}
msg = {'route': route, 'tag': self.tag}
self.stack.transmit(msg, self.stack.nameRemotes[self.ryn].uid)
self.stack.serviceAll()
self.connected = True
except Exception:
pass
try:
route = {'dst': (self.estate, None, 'stats_req'),
'src': (None, self.stack.local.name, None)}
msg = {'route': route, 'tag': self.tag}
self.stack.transmit(msg, self.stack.nameRemotes[self.ryn].uid)
self.stack.serviceAll()
self.connected = True
except Exception:
pass

View File

@ -361,7 +361,7 @@ class Schedule(object):
schedule = self.opts['pillar']['schedule']
# Fire the complete event back along with updated list of schedule
evt = salt.utils.event.get_event('minion', opts=self.opts)
evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False)
evt.fire_event({'complete': True, 'schedule': schedule},
tag='/salt/minion/minion_schedule_delete_complete')
@ -397,7 +397,7 @@ class Schedule(object):
self.opts['schedule'].update(data)
# Fire the complete event back along with updated list of schedule
evt = salt.utils.event.get_event('minion', opts=self.opts)
evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False)
evt.fire_event({'complete': True, 'schedule': self.opts['schedule']},
tag='/salt/minion/minion_schedule_add_complete')
@ -416,7 +416,7 @@ class Schedule(object):
schedule = self.opts['schedule']
# Fire the complete event back along with updated list of schedule
evt = salt.utils.event.get_event('minion', opts=self.opts)
evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False)
evt.fire_event({'complete': True, 'schedule': schedule},
tag='/salt/minion/minion_schedule_enabled_job_complete')
@ -437,7 +437,7 @@ class Schedule(object):
schedule = self.opts['schedule']
# Fire the complete event back along with updated list of schedule
evt = salt.utils.event.get_event('minion', opts=self.opts)
evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False)
evt.fire_event({'complete': True, 'schedule': schedule},
tag='/salt/minion/minion_schedule_disabled_job_complete')
@ -507,7 +507,7 @@ class Schedule(object):
self.opts['schedule']['enabled'] = True
# Fire the complete event back along with updated list of schedule
evt = salt.utils.event.get_event('minion', opts=self.opts)
evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False)
evt.fire_event({'complete': True, 'schedule': self.opts['schedule']},
tag='/salt/minion/minion_schedule_enabled_complete')
@ -518,7 +518,7 @@ class Schedule(object):
self.opts['schedule']['enabled'] = False
# Fire the complete event back along with updated list of schedule
evt = salt.utils.event.get_event('minion', opts=self.opts)
evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False)
evt.fire_event({'complete': True, 'schedule': self.opts['schedule']},
tag='/salt/minion/minion_schedule_disabled_complete')
@ -554,7 +554,7 @@ class Schedule(object):
schedule.update(self.opts['pillar']['schedule'])
# Fire the complete event back along with the list of schedule
evt = salt.utils.event.get_event('minion', opts=self.opts)
evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False)
evt.fire_event({'complete': True, 'schedule': schedule},
tag='/salt/minion/minion_schedule_list_complete')
@ -565,7 +565,7 @@ class Schedule(object):
self.persist()
# Fire the complete event back along with the list of schedule
evt = salt.utils.event.get_event('minion', opts=self.opts)
evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False)
evt.fire_event({'complete': True},
tag='/salt/minion/minion_schedule_saved')

View File

@ -107,7 +107,8 @@ def listen(opts):
opts['node'],
sock_dir=opts['sock_dir'],
transport=opts['transport'],
opts=opts
opts=opts,
listen=True
)
check_access_and_print_warning(opts['sock_dir'])
print(event.puburi)

View File

@ -29,7 +29,7 @@ class EventModuleTest(integration.ModuleCase):
events = Queue()
def get_event(events):
me = event.MasterEvent(self.master_opts['sock_dir'])
me = event.MasterEvent(self.master_opts['sock_dir'], listen=True)
events.put_nowait(
me.get_event(wait=10, tag='salttest', full=False)
)
@ -62,7 +62,7 @@ class EventModuleTest(integration.ModuleCase):
events = Queue()
def get_event(events):
me = event.MinionEvent(self.minion_opts)
me = event.MinionEvent(self.minion_opts, listen=True)
events.put_nowait(
me.get_event(wait=10, tag='salttest', full=False)
)
@ -91,7 +91,7 @@ class EventModuleTest(integration.ModuleCase):
events = Queue()
def get_event(events):
me = event.MinionEvent(self.sub_minion_opts)
me = event.MinionEvent(self.sub_minion_opts, listen=True)
events.put_nowait(
me.get_event(wait=10, tag='salttest', full=False)
)

View File

@ -65,7 +65,7 @@ class EventSender(Process):
self.wait = wait
def run(self):
me = event.MasterEvent(SOCK_DIR)
me = event.MasterEvent(SOCK_DIR, listen=False)
time.sleep(self.wait)
me.fire_event(self.data, self.tag)
# Wait a few seconds before tearing down the zmq context
@ -99,7 +99,7 @@ class TestSaltEvent(TestCase):
self.assertEqual(data[key], evt[key], msg)
def test_master_event(self):
me = event.MasterEvent(SOCK_DIR)
me = event.MasterEvent(SOCK_DIR, listen=False)
self.assertEqual(
me.puburi, 'ipc://{0}'.format(
os.path.join(SOCK_DIR, 'master_event_pub.ipc')
@ -115,7 +115,7 @@ class TestSaltEvent(TestCase):
def test_minion_event(self):
opts = dict(id='foo', sock_dir=SOCK_DIR)
id_hash = hashlib.md5(opts['id']).hexdigest()[:10]
me = event.MinionEvent(opts)
me = event.MinionEvent(opts, listen=False)
self.assertEqual(
me.puburi,
'ipc://{0}'.format(
@ -135,12 +135,12 @@ class TestSaltEvent(TestCase):
def test_minion_event_tcp_ipc_mode(self):
opts = dict(id='foo', ipc_mode='tcp')
me = event.MinionEvent(opts)
me = event.MinionEvent(opts, listen=False)
self.assertEqual(me.puburi, 'tcp://127.0.0.1:4510')
self.assertEqual(me.pulluri, 'tcp://127.0.0.1:4511')
def test_minion_event_no_id(self):
me = event.MinionEvent(dict(sock_dir=SOCK_DIR))
me = event.MinionEvent(dict(sock_dir=SOCK_DIR), listen=False)
id_hash = hashlib.md5('').hexdigest()[:10]
self.assertEqual(
me.puburi,
@ -162,7 +162,7 @@ class TestSaltEvent(TestCase):
def test_event_subscription(self):
'''Test a single event is received'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me = event.MasterEvent(SOCK_DIR, listen=True)
me.subscribe()
me.fire_event({'data': 'foo1'}, 'evt1')
evt1 = me.get_event(tag='evt1')
@ -171,7 +171,7 @@ class TestSaltEvent(TestCase):
def test_event_timeout(self):
'''Test no event is received if the timeout is reached'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me = event.MasterEvent(SOCK_DIR, listen=True)
me.subscribe()
me.fire_event({'data': 'foo1'}, 'evt1')
evt1 = me.get_event(tag='evt1')
@ -182,7 +182,7 @@ class TestSaltEvent(TestCase):
def test_event_no_timeout(self):
'''Test no wait timeout, we should block forever, until we get one '''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me = event.MasterEvent(SOCK_DIR, listen=True)
me.subscribe()
me.fire_event({'data': 'foo1'}, 'evt1')
me.fire_event({'data': 'foo2'}, 'evt2')
@ -192,7 +192,7 @@ class TestSaltEvent(TestCase):
def test_event_subscription_matching(self):
'''Test a subscription startswith matching'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me = event.MasterEvent(SOCK_DIR, listen=True)
me.subscribe()
me.fire_event({'data': 'foo1'}, 'evt1')
evt1 = me.get_event(tag='evt1')
@ -201,7 +201,7 @@ class TestSaltEvent(TestCase):
def test_event_subscription_matching_all(self):
'''Test a subscription matching'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me = event.MasterEvent(SOCK_DIR, listen=True)
me.subscribe()
me.fire_event({'data': 'foo1'}, 'evt1')
evt1 = me.get_event(tag='')
@ -210,7 +210,7 @@ class TestSaltEvent(TestCase):
def test_event_not_subscribed(self):
'''Test get event ignores non-subscribed events'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me = event.MasterEvent(SOCK_DIR, listen=True)
me.subscribe()
with eventsender_process({'data': 'foo1'}, 'evt1', 5):
me.fire_event({'data': 'foo1'}, 'evt2')
@ -220,7 +220,7 @@ class TestSaltEvent(TestCase):
def test_event_multiple_subscriptions(self):
'''Test multiple subscriptions do not interfere'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me = event.MasterEvent(SOCK_DIR, listen=True)
me.subscribe()
with eventsender_process({'data': 'foo1'}, 'evt1', 5):
me.fire_event({'data': 'foo1'}, 'evt2')
@ -230,9 +230,9 @@ class TestSaltEvent(TestCase):
def test_event_multiple_clients(self):
'''Test event is received by multiple clients'''
with eventpublisher_process():
me1 = event.MasterEvent(SOCK_DIR)
me1 = event.MasterEvent(SOCK_DIR, listen=True)
me1.subscribe()
me2 = event.MasterEvent(SOCK_DIR)
me2 = event.MasterEvent(SOCK_DIR, listen=True)
me2.subscribe()
me1.fire_event({'data': 'foo1'}, 'evt1')
evt1 = me1.get_event(tag='evt1')
@ -245,7 +245,7 @@ class TestSaltEvent(TestCase):
def test_event_nested_subs(self):
'''Test nested event subscriptions do not drop events, issue #8580'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me = event.MasterEvent(SOCK_DIR, listen=True)
me.subscribe()
me.fire_event({'data': 'foo1'}, 'evt1')
me.fire_event({'data': 'foo2'}, 'evt2')
@ -272,7 +272,7 @@ class TestSaltEvent(TestCase):
'''Test nested event subscriptions do not drop events, get event for all tags'''
# Show why not to call get_event(tag='')
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me = event.MasterEvent(SOCK_DIR, listen=True)
me.subscribe()
me.fire_event({'data': 'foo1'}, 'evt1')
me.fire_event({'data': 'foo2'}, 'evt2')
@ -284,7 +284,7 @@ class TestSaltEvent(TestCase):
def test_event_many(self):
'''Test a large number of events, one at a time'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me = event.MasterEvent(SOCK_DIR, listen=True)
me.subscribe()
for i in range(500):
me.fire_event({'data': '{0}'.format(i)}, 'testevents')
@ -294,7 +294,7 @@ class TestSaltEvent(TestCase):
def test_event_many_backlog(self):
'''Test a large number of events, send all then recv all'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me = event.MasterEvent(SOCK_DIR, listen=True)
me.subscribe()
# Must not exceed zmq HWM
for i in range(500):
@ -308,7 +308,7 @@ class TestSaltEvent(TestCase):
def test_send_master_event(self):
'''Tests that sending an event through fire_master generates expected event'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me = event.MasterEvent(SOCK_DIR, listen=True)
me.subscribe()
data = {'data': 'foo1'}
me.fire_master(data, 'test_master')
@ -335,7 +335,7 @@ class TestAsyncEventPublisher(AsyncTestCase):
def test_event_subscription(self):
'''Test a single event is received'''
me = event.MinionEvent({'sock_dir': SOCK_DIR})
me = event.MinionEvent({'sock_dir': SOCK_DIR}, listen=True)
me.fire_event({'data': 'foo1'}, 'evt1')
self.wait()
evt1 = me.get_event(tag='evt1')