From 32d01ee6c6bd4a7070be158c7188ede14cb34b60 Mon Sep 17 00:00:00 2001 From: Erik Johnson Date: Thu, 4 Dec 2014 14:29:44 -0600 Subject: [PATCH 1/4] Revert "Pylint cleanup for threadpool" This reverts commit c93ca9aec2106a8dcc1404ae542c5889e8c2df15. --- salt/utils/process.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/salt/utils/process.py b/salt/utils/process.py index b89fe6c96e..734d6a03f7 100644 --- a/salt/utils/process.py +++ b/salt/utils/process.py @@ -158,11 +158,7 @@ class ThreadPool(object): # intentionally not called "apply_async" since we aren't keeping track of # the return at all, if we want to make this API compatible with multiprocessing # threadpool we can in the future, and we won't have to worry about name collision - def fire_async(self, func, args=None, kwargs=None): - if args is None: - args = [] - if kwargs is None: - kwargs = {} + def fire_async(self, func, args=[], kwargs={}): try: self._job_queue.put((func, args, kwargs), False) return True From 82b5567c9248f345e2b388e3d238d51b7a0d9640 Mon Sep 17 00:00:00 2001 From: Erik Johnson Date: Thu, 4 Dec 2014 14:29:56 -0600 Subject: [PATCH 2/4] Revert "Remove some un-used variables" This reverts commit 22ed10108ee1cf3b29af9220d6ba6fa57f532eff. --- salt/utils/event.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/salt/utils/event.py b/salt/utils/event.py index c2de64d897..da36336f71 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -253,7 +253,7 @@ class SaltEvent(object): mtag = raw[0:20].rstrip('|') mdata = raw[20:] else: # new style - mtag, _, mdata = raw.partition(TAGEND) # split tag from data + mtag, sep, mdata = raw.partition(TAGEND) # split tag from data data = serial.loads(mdata) return mtag, data @@ -729,7 +729,7 @@ class ReactWrap(object): cmd = local - def runner(self, _, **kwargs): + def runner(self, fun, **kwargs): ''' Wrap RunnerClient for executing :ref:`runner modules ` ''' @@ -737,7 +737,7 @@ class ReactWrap(object): self.client_cache['runner'] = salt.runner.RunnerClient(self.opts) self.pool.fire_async(self.client_cache['runner'].low, kwargs) - def wheel(self, _, **kwargs): + def wheel(self, fun, **kwargs): ''' Wrap Wheel to enable executing :ref:`wheel modules ` ''' From ba7f08d047afffc2b87950910cac0cdd48b76302 Mon Sep 17 00:00:00 2001 From: Erik Johnson Date: Thu, 4 Dec 2014 14:30:05 -0600 Subject: [PATCH 3/4] Revert "Remove "fire_event" from AsyncClientMixin, since this was only added to remove infinite recusion in the reactor-- which is now not calling this API" This reverts commit 0b19ec7385e68914041f21959f50009a8a6f45a2. --- salt/client/mixins.py | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/salt/client/mixins.py b/salt/client/mixins.py index a16240f6c8..f225362bdc 100644 --- a/salt/client/mixins.py +++ b/salt/client/mixins.py @@ -62,7 +62,7 @@ class AsyncClientMixin(object): client = None tag_prefix = None - def _proc_function(self, fun, low, user, tag, jid): + def _proc_function(self, fun, low, user, tag, jid, fire_event=True): ''' Run this method in a multiprocess target to execute the function in a multiprocess and fire the return data on the event bus @@ -72,13 +72,14 @@ class AsyncClientMixin(object): 'jid': jid, 'user': user, } - event = salt.utils.event.get_event( - 'master', - self.opts['sock_dir'], - self.opts['transport'], - opts=self.opts, - listen=False) - event.fire_event(data, tagify('new', base=tag)) + if fire_event: + event = salt.utils.event.get_event( + 'master', + self.opts['sock_dir'], + self.opts['transport'], + opts=self.opts, + listen=False) + event.fire_event(data, tagify('new', base=tag)) try: data['return'] = self.low(fun, low) @@ -93,12 +94,13 @@ class AsyncClientMixin(object): data['success'] = False data['user'] = user - event.fire_event(data, tagify('ret', base=tag)) - # if we fired an event, make sure to delete the event object. - # This will ensure that we call destroy, which will do the 0MQ linger - del event + if fire_event: + event.fire_event(data, tagify('ret', base=tag)) + # if we fired an event, make sure to delete the event object. + # This will ensure that we call destroy, which will do the 0MQ linger + del event - def async(self, fun, low, user='UNKNOWN'): + def async(self, fun, low, user='UNKNOWN', fire_event=True): ''' Execute the function in a multiprocess and return the event tag to use to watch for the return @@ -108,6 +110,7 @@ class AsyncClientMixin(object): proc = multiprocessing.Process( target=self._proc_function, - args=(fun, low, user, tag, jid)) + args=(fun, low, user, tag, jid), + kwargs={'fire_event': fire_event}) proc.start() return {'tag': tag, 'jid': jid} From 8c7d66d6dc643bfbe4bd7c6e3b64b4630855df8a Mon Sep 17 00:00:00 2001 From: Erik Johnson Date: Thu, 4 Dec 2014 14:31:35 -0600 Subject: [PATCH 4/4] Revert "Historically the recator has just called the "async" method of the runner and wheel clients, but this actually creates daemonized processes. In addition to creating a new daemonized process each event, the number of process it creates is unbounded, meaning that the reactor can easily use all available PIDs on a fairly busy master. In addition, there is no bound on the CPU that these are allowed to use (since they can create ALL the pids). This changes the reactor to create a threadpool for executing its master-side clients (runner/wheel). This threadpool has a configurable number of workers (max parallelism) and hwm (max queue size before dropping events)." This reverts commit 2a8d2a462a5cecad3a4d7e88395f7a871a6bfdd9. Conflicts: salt/utils/event.py --- salt/config.py | 4 --- salt/utils/event.py | 16 +++++------- salt/utils/process.py | 59 ------------------------------------------- 3 files changed, 6 insertions(+), 73 deletions(-) diff --git a/salt/config.py b/salt/config.py index e81be85440..0d74eef68e 100644 --- a/salt/config.py +++ b/salt/config.py @@ -213,8 +213,6 @@ VALID_OPTS = { 'publish_session': int, 'reactor': list, 'reactor_refresh_interval': int, - 'reactor_worker_threads': int, - 'reactor_worker_hwm': int, 'serial': str, 'search': str, 'search_index_interval': int, @@ -526,8 +524,6 @@ DEFAULT_MASTER_OPTS = { 'range_server': 'range:80', 'reactor': [], 'reactor_refresh_interval': 60, - 'reactor_worker_threads': 10, - 'reactor_worker_hwm': 10000, 'serial': 'msgpack', 'state_verbose': True, 'state_output': 'full', diff --git a/salt/utils/event.py b/salt/utils/event.py index da36336f71..bbcb690883 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -73,7 +73,6 @@ import salt.loader import salt.state import salt.utils import salt.utils.cache -import salt.utils.process from salt._compat import string_types log = logging.getLogger(__name__) @@ -699,11 +698,6 @@ class ReactWrap(object): if ReactWrap.client_cache is None: ReactWrap.client_cache = salt.utils.cache.CacheDict(opts['reactor_refresh_interval']) - self.pool = salt.utils.process.ThreadPool( - self.opts['reactor_worker_threads'], # number of workers for runner/wheel - queue_size=self.opts['reactor_worker_hwm'] # queue size for those workers - ) - def run(self, low): ''' Execute the specified function in the specified state by passing the @@ -712,12 +706,14 @@ class ReactWrap(object): l_fun = getattr(self, low['state']) try: f_call = salt.utils.format_call(l_fun, low) - l_fun(*f_call.get('args', ()), **f_call.get('kwargs', {})) + ret = l_fun(*f_call.get('args', ()), **f_call.get('kwargs', {})) except Exception: log.error( 'Failed to execute {0}: {1}\n'.format(low['state'], l_fun), exc_info=True ) + return False + return ret def local(self, *args, **kwargs): ''' @@ -725,7 +721,7 @@ class ReactWrap(object): ''' if 'local' not in self.client_cache: self.client_cache['local'] = salt.client.LocalClient(self.opts['conf_file']) - self.client_cache['local'].cmd_async(*args, **kwargs) + return self.client_cache['local'].cmd_async(*args, **kwargs) cmd = local @@ -735,7 +731,7 @@ class ReactWrap(object): ''' if 'runner' not in self.client_cache: self.client_cache['runner'] = salt.runner.RunnerClient(self.opts) - self.pool.fire_async(self.client_cache['runner'].low, kwargs) + return self.client_cache['runner'].async(fun, kwargs, fire_event=False) def wheel(self, fun, **kwargs): ''' @@ -743,7 +739,7 @@ class ReactWrap(object): ''' if 'wheel' not in self.client_cache: self.client_cache['wheel'] = salt.wheel.Wheel(self.opts) - self.pool.fire_async(self.client_cache['wheel'].low, kwargs) + return self.client_cache['wheel'].async(fun, kwargs, fire_event=False) class StateFire(object): diff --git a/salt/utils/process.py b/salt/utils/process.py index 734d6a03f7..388e9aa663 100644 --- a/salt/utils/process.py +++ b/salt/utils/process.py @@ -8,9 +8,6 @@ import sys import multiprocessing import signal -import threading -import Queue - # Import salt libs import salt.utils @@ -122,62 +119,6 @@ def os_is_running(pid): return False -class ThreadPool(object): - ''' - This is a very VERY basic threadpool implementation - This was made instead of using multiprocessing ThreadPool because - we want to set max queue size and we want to daemonize threads (neither - is exposed in the stdlib version). - - Since there isn't much use for this class as of right now this implementation - Only supports daemonized threads and will *not* return results - - TODO: if this is found to be more generally useful it would be nice to pull - in the majority of code from upstream or from http://bit.ly/1wTeJtM - ''' - def __init__(self, - num_threads=None, - queue_size=0): - # if no count passed, default to number of CPUs - if num_threads is None: - num_threads = multiprocessing.cpu_count() - self.num_threads = num_threads - - # create a task queue of queue_size - self._job_queue = Queue.Queue(queue_size) - - self._workers = [] - - # create worker threads - for idx in xrange(num_threads): - thread = threading.Thread(target=self._thread_target) - thread.daemon = True - thread.start() - self._workers.append(thread) - - # intentionally not called "apply_async" since we aren't keeping track of - # the return at all, if we want to make this API compatible with multiprocessing - # threadpool we can in the future, and we won't have to worry about name collision - def fire_async(self, func, args=[], kwargs={}): - try: - self._job_queue.put((func, args, kwargs), False) - return True - except Queue.Full: - return False - - def _thread_target(self): - while True: - # 1s timeout so that if the parent dies this thread will die after 1s - try: - func, args, kwargs = self._job_queue.get(timeout=1) - except Queue.Empty: - continue - try: - func(*args, **kwargs) - except Exception: - pass - - class ProcessManager(object): ''' A class which will manage processes that should be running