Although the PR says it was merged, it wasn't and the reactor is currently broken because of the partial merge of the previous refactor
This commit is contained in:
Thomas Jackson 2014-12-23 07:45:27 -08:00
parent ddc7930a9b
commit 328f39a52c

View File

@ -14,8 +14,8 @@ import salt.state
import salt.utils
import salt.utils.cache
import salt.utils.event
from salt.ext.six import string_types
import salt.utils.process
from salt.ext.six import string_types
from salt._compat import string_types
log = logging.getLogger(__name__)
@ -30,7 +30,6 @@ class Reactor(multiprocessing.Process, salt.state.Compiler):
def __init__(self, opts):
multiprocessing.Process.__init__(self)
salt.state.Compiler.__init__(self, opts)
self.wrap = ReactWrap(self.opts)
local_minion_opts = self.opts.copy()
local_minion_opts['file_client'] = 'local'
@ -86,7 +85,7 @@ class Reactor(multiprocessing.Process, salt.state.Compiler):
continue
if len(ropt) != 1:
continue
key = next(iter(ropt.keys()))
key = ropt.iterkeys().next()
val = ropt[key]
if fnmatch.fnmatch(tag, key):
if isinstance(val, string_types):
@ -107,7 +106,9 @@ class Reactor(multiprocessing.Process, salt.state.Compiler):
if high:
errors = self.verify_high(high)
if errors:
return errors
log.error(('Unable to render reactions for event {0} due to '
'errors ({1}) in one or more of the sls files ({2})').format(tag, errors, reactors))
return [] # We'll return nothing since there was an error
chunks = self.order_chunks(self.compile_high_data(high))
return chunks
@ -123,10 +124,12 @@ class Reactor(multiprocessing.Process, salt.state.Compiler):
Enter into the server loop
'''
salt.utils.appendproctitle(self.__class__.__name__)
# instantiate some classes inside our new process
self.event = salt.utils.event.SaltEvent('master', self.opts['sock_dir'])
events = self.event.iter_events(full=True)
self.event.fire_event({}, 'salt/reactor/start')
for data in events:
self.wrap = ReactWrap(self.opts)
for data in self.event.iter_events(full=True):
reactors = self.list_reactors(data['tag'])
if not reactors:
continue
@ -147,6 +150,11 @@ class ReactWrap(object):
if ReactWrap.client_cache is None:
ReactWrap.client_cache = salt.utils.cache.CacheDict(opts['reactor_refresh_interval'])
self.pool = salt.utils.process.ThreadPool(
self.opts['reactor_worker_threads'], # number of workers for runner/wheel
queue_size=self.opts['reactor_worker_hwm'] # queue size for those workers
)
def run(self, low):
'''
Execute the specified function in the specified state by passing the
@ -155,14 +163,12 @@ class ReactWrap(object):
l_fun = getattr(self, low['state'])
try:
f_call = salt.utils.format_call(l_fun, low)
ret = l_fun(*f_call.get('args', ()), **f_call.get('kwargs', {}))
l_fun(*f_call.get('args', ()), **f_call.get('kwargs', {}))
except Exception:
log.error(
'Failed to execute {0}: {1}\n'.format(low['state'], l_fun),
exc_info=True
)
return False
return ret
def local(self, *args, **kwargs):
'''
@ -170,7 +176,7 @@ class ReactWrap(object):
'''
if 'local' not in self.client_cache:
self.client_cache['local'] = salt.client.LocalClient(self.opts['conf_file'])
return self.client_cache['local'].cmd_async(*args, **kwargs)
self.client_cache['local'].cmd_async(*args, **kwargs)
cmd = local
@ -180,7 +186,7 @@ class ReactWrap(object):
'''
if 'runner' not in self.client_cache:
self.client_cache['runner'] = salt.runner.RunnerClient(self.opts)
return self.client_cache['runner'].async(fun, kwargs, fire_event=False)
self.pool.fire_async(self.client_cache['runner'].low, args=(fun, kwargs))
def wheel(self, fun, **kwargs):
'''
@ -188,4 +194,4 @@ class ReactWrap(object):
'''
if 'wheel' not in self.client_cache:
self.client_cache['wheel'] = salt.wheel.Wheel(self.opts)
return self.client_cache['wheel'].async(fun, kwargs, fire_event=False)
self.pool.fire_async(self.client_cache['wheel'].low, args=(fun, kwargs))