Remove local multiprocessing, and use the async client mixin

This commit is contained in:
Thomas Jackson 2014-12-23 08:22:02 -08:00
parent 0e935486de
commit 5e59781ae8

View File

@ -126,73 +126,7 @@ class RunnerClient(mixins.SyncClientMixin, mixins.AsyncClientMixin, object):
args, kwargs = salt.minion.load_args_and_kwargs(
self.functions[fun], arglist, pub_data
)
fstr = '{0}.prep_jid'.format(self.opts['master_job_cache'])
jid = self.returners[fstr]()
log.debug('Runner starting with jid {0}'.format(jid))
self.event.fire_event({'runner_job': fun}, tagify([jid, 'new'], 'job'))
target = RunnerClient._thread_return
data = {'fun': fun, 'jid': jid, 'args': args, 'kwargs': kwargs}
args = (self, self.opts, data)
ret = jid
if self.opts.get('async', False):
process = multiprocessing.Process(
target=target, args=args
)
process.start()
else:
ret = target(*args)
return ret
@classmethod
def _thread_return(cls, instance, opts, data):
'''
The multiprocessing process calls back here
to stream returns
'''
# Runners modules runtime injection:
# - the progress event system with the correct jid
# - Provide JID if the runner wants to access it directly
done = {}
if opts.get('async', False):
progress = salt.utils.event.get_runner_event(opts, data['jid'], listen=False).fire_progress
else:
progress = _progress_print
for func_name, func in instance.functions.items():
if func.__module__ in done:
continue
mod = sys.modules[func.__module__]
mod.__jid__ = data['jid']
mod.__progress__ = progress
done[func.__module__] = mod
ret = instance.functions[data['fun']](*data['args'], **data['kwargs'])
# Sleep for just a moment to let any progress events return
time.sleep(0.1)
ret_load = {'return': ret, 'fun': data['fun'], 'fun_args': data['args']}
# Don't use the invoking processes' event socket because it could be closed down by the time we arrive here.
# Create another, for safety's sake.
master_event = salt.utils.event.get_master_event(opts, opts['sock_dir'], listen=False)
master_event.fire_event(ret_load, tagify([data['jid'], 'return'], 'runner'))
master_event.destroy()
try:
fstr = '{0}.save_runner_load'.format(opts['master_job_cache'])
instance.returners[fstr](data['jid'], ret_load)
except KeyError:
log.debug(
'The specified returner used for the master job cache '
'"{0}" does not have a save_runner_load function! The results '
'of this runner execution will not be stored.'.format(
opts['master_job_cache']
)
)
except Exception:
log.critical(
'The specified returner threw a stack trace:\n',
exc_info=True
)
if opts.get('async', False):
return data['jid']
else:
return ret
return self.functions[fun](*args, **kwargs)
def master_call(self, **kwargs):
'''
@ -300,18 +234,24 @@ class Runner(RunnerClient):
else:
try:
# Run the runner!
jid = super(Runner, self).cmd(
self.opts['fun'], self.opts['arg'], self.opts)
if self.opts.get('async', False):
low = {'fun': self.opts['fun'],
'args': self.opts['arg'],
'kwargs': self.opts}
async_pub = super(Runner, self).async(
self.opts['fun'], low)
log.info('Running in async mode. Results of this execution may '
'be collected by attaching to the master event bus or '
'by examing the master job cache, if configured.')
rets = self.get_runner_returns(jid)
'by examing the master job cache, if configured. '
'This execution is under tag {0}'.format(async_pub['tag']))
rets = self.get_runner_returns(async_pub['tag'])
else:
rets = [jid]
ret = super(Runner, self).cmd(
self.opts['fun'], self.opts['arg'], self.opts)
rets = [ret]
# Gather the returns
for ret in rets:
if not self.opts.get('quiet', False):
if not self.opts.get('quiet', False):
for ret in rets:
if isinstance(ret, dict) and 'outputter' in ret and ret['outputter'] is not None:
print(self.outputters[ret['outputter']](ret['data']))
else:
@ -324,7 +264,7 @@ class Runner(RunnerClient):
log.debug('Runner return: {0}'.format(ret))
return ret
def get_runner_returns(self, jid, timeout=None):
def get_runner_returns(self, tag, timeout=None):
'''
Gather the return data from the event system, break hard when timeout
is reached.
@ -336,8 +276,7 @@ class Runner(RunnerClient):
last_progress_timestamp = time.time()
while True:
raw = self.event.get_event(timeout, full=True)
time.sleep(0.1)
raw = self.event.get_event(timeout, tag=tag, full=True)
# If we saw no events in the event bus timeout
# OR
# we have reached the total timeout
@ -348,12 +287,11 @@ class Runner(RunnerClient):
# Timeout reached
break
try:
if not raw['tag'].split('/')[1] == 'runner' and raw['tag'].split('/')[2] == jid:
continue
elif raw['tag'].split('/')[3] == 'progress' and raw['tag'].split('/')[2] == jid:
tag_parts = raw['tag'].split('/')
if tag_parts[3] == 'progress':
last_progress_timestamp = time.time()
yield {'data': raw['data']['data'], 'outputter': raw['data']['outputter']}
elif raw['tag'].split('/')[3] == 'return' and raw['tag'].split('/')[2] == jid:
elif tag_parts[3] == 'ret':
yield raw['data']['return']
break
# Handle a findjob that might have been kicked off under the covers
@ -362,7 +300,5 @@ class Runner(RunnerClient):
continue
except (IndexError, KeyError):
continue
def _progress_print(text, *args, **kwargs):
print(text)
# if you get to the end, sleep
time.sleep(0.1)