diff --git a/salt/utils/reactor.py b/salt/utils/reactor.py index 4e3d4ced45..16978630ee 100644 --- a/salt/utils/reactor.py +++ b/salt/utils/reactor.py @@ -14,8 +14,8 @@ import salt.state import salt.utils import salt.utils.cache import salt.utils.event -from salt.ext.six import string_types import salt.utils.process +from salt.ext.six import string_types from salt._compat import string_types log = logging.getLogger(__name__) @@ -30,7 +30,6 @@ class Reactor(multiprocessing.Process, salt.state.Compiler): def __init__(self, opts): multiprocessing.Process.__init__(self) salt.state.Compiler.__init__(self, opts) - self.wrap = ReactWrap(self.opts) local_minion_opts = self.opts.copy() local_minion_opts['file_client'] = 'local' @@ -86,7 +85,7 @@ class Reactor(multiprocessing.Process, salt.state.Compiler): continue if len(ropt) != 1: continue - key = next(iter(ropt.keys())) + key = ropt.iterkeys().next() val = ropt[key] if fnmatch.fnmatch(tag, key): if isinstance(val, string_types): @@ -107,7 +106,9 @@ class Reactor(multiprocessing.Process, salt.state.Compiler): if high: errors = self.verify_high(high) if errors: - return errors + log.error(('Unable to render reactions for event {0} due to ' + 'errors ({1}) in one or more of the sls files ({2})').format(tag, errors, reactors)) + return [] # We'll return nothing since there was an error chunks = self.order_chunks(self.compile_high_data(high)) return chunks @@ -123,10 +124,12 @@ class Reactor(multiprocessing.Process, salt.state.Compiler): Enter into the server loop ''' salt.utils.appendproctitle(self.__class__.__name__) + + # instantiate some classes inside our new process self.event = salt.utils.event.SaltEvent('master', self.opts['sock_dir']) - events = self.event.iter_events(full=True) - self.event.fire_event({}, 'salt/reactor/start') - for data in events: + self.wrap = ReactWrap(self.opts) + + for data in self.event.iter_events(full=True): reactors = self.list_reactors(data['tag']) if not reactors: continue @@ -147,6 +150,11 @@ class ReactWrap(object): if ReactWrap.client_cache is None: ReactWrap.client_cache = salt.utils.cache.CacheDict(opts['reactor_refresh_interval']) + self.pool = salt.utils.process.ThreadPool( + self.opts['reactor_worker_threads'], # number of workers for runner/wheel + queue_size=self.opts['reactor_worker_hwm'] # queue size for those workers + ) + def run(self, low): ''' Execute the specified function in the specified state by passing the @@ -155,14 +163,12 @@ class ReactWrap(object): l_fun = getattr(self, low['state']) try: f_call = salt.utils.format_call(l_fun, low) - ret = l_fun(*f_call.get('args', ()), **f_call.get('kwargs', {})) + l_fun(*f_call.get('args', ()), **f_call.get('kwargs', {})) except Exception: log.error( 'Failed to execute {0}: {1}\n'.format(low['state'], l_fun), exc_info=True ) - return False - return ret def local(self, *args, **kwargs): ''' @@ -170,7 +176,7 @@ class ReactWrap(object): ''' if 'local' not in self.client_cache: self.client_cache['local'] = salt.client.LocalClient(self.opts['conf_file']) - return self.client_cache['local'].cmd_async(*args, **kwargs) + self.client_cache['local'].cmd_async(*args, **kwargs) cmd = local @@ -180,7 +186,7 @@ class ReactWrap(object): ''' if 'runner' not in self.client_cache: self.client_cache['runner'] = salt.runner.RunnerClient(self.opts) - return self.client_cache['runner'].async(fun, kwargs, fire_event=False) + self.pool.fire_async(self.client_cache['runner'].low, args=(fun, kwargs)) def wheel(self, fun, **kwargs): ''' @@ -188,4 +194,4 @@ class ReactWrap(object): ''' if 'wheel' not in self.client_cache: self.client_cache['wheel'] = salt.wheel.Wheel(self.opts) - return self.client_cache['wheel'].async(fun, kwargs, fire_event=False) + self.pool.fire_async(self.client_cache['wheel'].low, args=(fun, kwargs))