Don't fire events for runner/wheel execution in the reactor

This creates more or less infinite loops if you have an entry in the reactor conf that acts on all events.

Yo dawg, I heard you liked events... :)
This commit is contained in:
Thomas Jackson 2014-09-17 17:34:51 -07:00
parent 14124526ce
commit 74b20cf4a1
2 changed files with 15 additions and 12 deletions

View File

@ -63,22 +63,23 @@ class AsyncClientMixin(object):
client = None
tag_prefix = None
def _proc_function(self, fun, low, user, tag, jid):
def _proc_function(self, fun, low, user, tag, jid, fire_event=True):
'''
Run this method in a multiprocess target to execute the function in a
multiprocess and fire the return data on the event bus
'''
salt.utils.daemonize()
event = salt.utils.event.get_event(
'master',
self.opts['sock_dir'],
self.opts['transport'],
listen=False)
data = {'fun': '{0}.{1}'.format(self.client, fun),
'jid': jid,
'user': user,
}
event.fire_event(data, tagify('new', base=tag))
if fire_event:
event = salt.utils.event.get_event(
'master',
self.opts['sock_dir'],
self.opts['transport'],
listen=False)
event.fire_event(data, tagify('new', base=tag))
try:
data['return'] = self.low(fun, low)
@ -92,11 +93,12 @@ class AsyncClientMixin(object):
)
data['success'] = False
data['user'] = user
event.fire_event(data, tagify('ret', base=tag))
if fire_event:
event.fire_event(data, tagify('ret', base=tag))
# this is a workaround because process reaping is defeating 0MQ linger
time.sleep(2.0) # delay so 0MQ event gets out before runner process reaped
def async(self, fun, low, user='UNKNOWN'):
def async(self, fun, low, user='UNKNOWN', fire_event=True):
'''
Execute the function in a multiprocess and return the event tag to use
to watch for the return
@ -106,6 +108,7 @@ class AsyncClientMixin(object):
proc = multiprocessing.Process(
target=self._proc_function,
args=(fun, low, user, tag, jid))
args=(fun, low, user, tag, jid),
kwargs={'fire_event': fire_event})
proc.start()
return {'tag': tag, 'jid': jid}

View File

@ -727,7 +727,7 @@ class ReactWrap(object):
'''
if 'runner' not in self.client_cache:
self.client_cache['runner'] = salt.runner.RunnerClient(self.opts)
return self.client_cache['runner'].async(fun, kwargs)
return self.client_cache['runner'].async(fun, kwargs, fire_event=False)
def wheel(self, fun, **kwargs):
'''
@ -735,7 +735,7 @@ class ReactWrap(object):
'''
if 'wheel' not in self.client_cache:
self.client_cache['wheel'] = salt.wheel.Wheel(self.opts)
return self.client_cache['wheel'].call_func(fun, **kwargs)
return self.client_cache['wheel'].async(fun, kwargs, fire_event=False)
class StateFire(object):