Merge pull request #18741 from terminalmage/revert-pr18264

Revert #18254
This commit is contained in:
Erik Johnson 2014-12-04 14:37:03 -06:00
commit 00ed07411a
4 changed files with 26 additions and 94 deletions

View File

@ -62,7 +62,7 @@ class AsyncClientMixin(object):
client = None client = None
tag_prefix = None tag_prefix = None
def _proc_function(self, fun, low, user, tag, jid): def _proc_function(self, fun, low, user, tag, jid, fire_event=True):
''' '''
Run this method in a multiprocess target to execute the function in a Run this method in a multiprocess target to execute the function in a
multiprocess and fire the return data on the event bus multiprocess and fire the return data on the event bus
@ -72,13 +72,14 @@ class AsyncClientMixin(object):
'jid': jid, 'jid': jid,
'user': user, 'user': user,
} }
event = salt.utils.event.get_event( if fire_event:
'master', event = salt.utils.event.get_event(
self.opts['sock_dir'], 'master',
self.opts['transport'], self.opts['sock_dir'],
opts=self.opts, self.opts['transport'],
listen=False) opts=self.opts,
event.fire_event(data, tagify('new', base=tag)) listen=False)
event.fire_event(data, tagify('new', base=tag))
try: try:
data['return'] = self.low(fun, low) data['return'] = self.low(fun, low)
@ -93,12 +94,13 @@ class AsyncClientMixin(object):
data['success'] = False data['success'] = False
data['user'] = user data['user'] = user
event.fire_event(data, tagify('ret', base=tag)) if fire_event:
# if we fired an event, make sure to delete the event object. event.fire_event(data, tagify('ret', base=tag))
# This will ensure that we call destroy, which will do the 0MQ linger # if we fired an event, make sure to delete the event object.
del event # This will ensure that we call destroy, which will do the 0MQ linger
del event
def async(self, fun, low, user='UNKNOWN'): def async(self, fun, low, user='UNKNOWN', fire_event=True):
''' '''
Execute the function in a multiprocess and return the event tag to use Execute the function in a multiprocess and return the event tag to use
to watch for the return to watch for the return
@ -108,6 +110,7 @@ class AsyncClientMixin(object):
proc = multiprocessing.Process( proc = multiprocessing.Process(
target=self._proc_function, target=self._proc_function,
args=(fun, low, user, tag, jid)) args=(fun, low, user, tag, jid),
kwargs={'fire_event': fire_event})
proc.start() proc.start()
return {'tag': tag, 'jid': jid} return {'tag': tag, 'jid': jid}

View File

@ -213,8 +213,6 @@ VALID_OPTS = {
'publish_session': int, 'publish_session': int,
'reactor': list, 'reactor': list,
'reactor_refresh_interval': int, 'reactor_refresh_interval': int,
'reactor_worker_threads': int,
'reactor_worker_hwm': int,
'serial': str, 'serial': str,
'search': str, 'search': str,
'search_index_interval': int, 'search_index_interval': int,
@ -526,8 +524,6 @@ DEFAULT_MASTER_OPTS = {
'range_server': 'range:80', 'range_server': 'range:80',
'reactor': [], 'reactor': [],
'reactor_refresh_interval': 60, 'reactor_refresh_interval': 60,
'reactor_worker_threads': 10,
'reactor_worker_hwm': 10000,
'serial': 'msgpack', 'serial': 'msgpack',
'state_verbose': True, 'state_verbose': True,
'state_output': 'full', 'state_output': 'full',

View File

@ -73,7 +73,6 @@ import salt.loader
import salt.state import salt.state
import salt.utils import salt.utils
import salt.utils.cache import salt.utils.cache
import salt.utils.process
from salt._compat import string_types from salt._compat import string_types
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -253,7 +252,7 @@ class SaltEvent(object):
mtag = raw[0:20].rstrip('|') mtag = raw[0:20].rstrip('|')
mdata = raw[20:] mdata = raw[20:]
else: # new style else: # new style
mtag, _, mdata = raw.partition(TAGEND) # split tag from data mtag, sep, mdata = raw.partition(TAGEND) # split tag from data
data = serial.loads(mdata) data = serial.loads(mdata)
return mtag, data return mtag, data
@ -699,11 +698,6 @@ class ReactWrap(object):
if ReactWrap.client_cache is None: if ReactWrap.client_cache is None:
ReactWrap.client_cache = salt.utils.cache.CacheDict(opts['reactor_refresh_interval']) ReactWrap.client_cache = salt.utils.cache.CacheDict(opts['reactor_refresh_interval'])
self.pool = salt.utils.process.ThreadPool(
self.opts['reactor_worker_threads'], # number of workers for runner/wheel
queue_size=self.opts['reactor_worker_hwm'] # queue size for those workers
)
def run(self, low): def run(self, low):
''' '''
Execute the specified function in the specified state by passing the Execute the specified function in the specified state by passing the
@ -712,12 +706,14 @@ class ReactWrap(object):
l_fun = getattr(self, low['state']) l_fun = getattr(self, low['state'])
try: try:
f_call = salt.utils.format_call(l_fun, low) f_call = salt.utils.format_call(l_fun, low)
l_fun(*f_call.get('args', ()), **f_call.get('kwargs', {})) ret = l_fun(*f_call.get('args', ()), **f_call.get('kwargs', {}))
except Exception: except Exception:
log.error( log.error(
'Failed to execute {0}: {1}\n'.format(low['state'], l_fun), 'Failed to execute {0}: {1}\n'.format(low['state'], l_fun),
exc_info=True exc_info=True
) )
return False
return ret
def local(self, *args, **kwargs): def local(self, *args, **kwargs):
''' '''
@ -725,25 +721,25 @@ class ReactWrap(object):
''' '''
if 'local' not in self.client_cache: if 'local' not in self.client_cache:
self.client_cache['local'] = salt.client.LocalClient(self.opts['conf_file']) self.client_cache['local'] = salt.client.LocalClient(self.opts['conf_file'])
self.client_cache['local'].cmd_async(*args, **kwargs) return self.client_cache['local'].cmd_async(*args, **kwargs)
cmd = local cmd = local
def runner(self, _, **kwargs): def runner(self, fun, **kwargs):
''' '''
Wrap RunnerClient for executing :ref:`runner modules <all-salt.runners>` Wrap RunnerClient for executing :ref:`runner modules <all-salt.runners>`
''' '''
if 'runner' not in self.client_cache: if 'runner' not in self.client_cache:
self.client_cache['runner'] = salt.runner.RunnerClient(self.opts) self.client_cache['runner'] = salt.runner.RunnerClient(self.opts)
self.pool.fire_async(self.client_cache['runner'].low, kwargs) return self.client_cache['runner'].async(fun, kwargs, fire_event=False)
def wheel(self, _, **kwargs): def wheel(self, fun, **kwargs):
''' '''
Wrap Wheel to enable executing :ref:`wheel modules <all-salt.wheel>` Wrap Wheel to enable executing :ref:`wheel modules <all-salt.wheel>`
''' '''
if 'wheel' not in self.client_cache: if 'wheel' not in self.client_cache:
self.client_cache['wheel'] = salt.wheel.Wheel(self.opts) self.client_cache['wheel'] = salt.wheel.Wheel(self.opts)
self.pool.fire_async(self.client_cache['wheel'].low, kwargs) return self.client_cache['wheel'].async(fun, kwargs, fire_event=False)
class StateFire(object): class StateFire(object):

View File

@ -8,9 +8,6 @@ import sys
import multiprocessing import multiprocessing
import signal import signal
import threading
import Queue
# Import salt libs # Import salt libs
import salt.utils import salt.utils
@ -122,66 +119,6 @@ def os_is_running(pid):
return False return False
class ThreadPool(object):
'''
This is a very VERY basic threadpool implementation
This was made instead of using multiprocessing ThreadPool because
we want to set max queue size and we want to daemonize threads (neither
is exposed in the stdlib version).
Since there isn't much use for this class as of right now this implementation
Only supports daemonized threads and will *not* return results
TODO: if this is found to be more generally useful it would be nice to pull
in the majority of code from upstream or from http://bit.ly/1wTeJtM
'''
def __init__(self,
num_threads=None,
queue_size=0):
# if no count passed, default to number of CPUs
if num_threads is None:
num_threads = multiprocessing.cpu_count()
self.num_threads = num_threads
# create a task queue of queue_size
self._job_queue = Queue.Queue(queue_size)
self._workers = []
# create worker threads
for idx in xrange(num_threads):
thread = threading.Thread(target=self._thread_target)
thread.daemon = True
thread.start()
self._workers.append(thread)
# intentionally not called "apply_async" since we aren't keeping track of
# the return at all, if we want to make this API compatible with multiprocessing
# threadpool we can in the future, and we won't have to worry about name collision
def fire_async(self, func, args=None, kwargs=None):
if args is None:
args = []
if kwargs is None:
kwargs = {}
try:
self._job_queue.put((func, args, kwargs), False)
return True
except Queue.Full:
return False
def _thread_target(self):
while True:
# 1s timeout so that if the parent dies this thread will die after 1s
try:
func, args, kwargs = self._job_queue.get(timeout=1)
except Queue.Empty:
continue
try:
func(*args, **kwargs)
except Exception:
pass
class ProcessManager(object): class ProcessManager(object):
''' '''
A class which will manage processes that should be running A class which will manage processes that should be running