From 5e59781ae84f75406d105f593ddf294501dbbdc9 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Tue, 23 Dec 2014 08:22:02 -0800 Subject: [PATCH] Remove local multiprocessing, and use the aync client mixin --- salt/runner.py | 106 ++++++++++--------------------------------------- 1 file changed, 21 insertions(+), 85 deletions(-) diff --git a/salt/runner.py b/salt/runner.py index ad35de11fc..c844f41191 100644 --- a/salt/runner.py +++ b/salt/runner.py @@ -126,73 +126,7 @@ class RunnerClient(mixins.SyncClientMixin, mixins.AsyncClientMixin, object): args, kwargs = salt.minion.load_args_and_kwargs( self.functions[fun], arglist, pub_data ) - fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) - jid = self.returners[fstr]() - log.debug('Runner starting with jid {0}'.format(jid)) - self.event.fire_event({'runner_job': fun}, tagify([jid, 'new'], 'job')) - target = RunnerClient._thread_return - data = {'fun': fun, 'jid': jid, 'args': args, 'kwargs': kwargs} - args = (self, self.opts, data) - ret = jid - if self.opts.get('async', False): - process = multiprocessing.Process( - target=target, args=args - ) - process.start() - else: - ret = target(*args) - return ret - - @classmethod - def _thread_return(cls, instance, opts, data): - ''' - The multiprocessing process calls back here - to stream returns - ''' - # Runners modules runtime injection: - # - the progress event system with the correct jid - # - Provide JID if the runner wants to access it directly - done = {} - if opts.get('async', False): - progress = salt.utils.event.get_runner_event(opts, data['jid'], listen=False).fire_progress - else: - progress = _progress_print - for func_name, func in instance.functions.items(): - if func.__module__ in done: - continue - mod = sys.modules[func.__module__] - mod.__jid__ = data['jid'] - mod.__progress__ = progress - done[func.__module__] = mod - ret = instance.functions[data['fun']](*data['args'], **data['kwargs']) - # Sleep for just a moment to let any progress events return - time.sleep(0.1) - ret_load = {'return': ret, 'fun': data['fun'], 'fun_args': data['args']} - # Don't use the invoking processes' event socket because it could be closed down by the time we arrive here. - # Create another, for safety's sake. - master_event = salt.utils.event.get_master_event(opts, opts['sock_dir'], listen=False) - master_event.fire_event(ret_load, tagify([data['jid'], 'return'], 'runner')) - master_event.destroy() - try: - fstr = '{0}.save_runner_load'.format(opts['master_job_cache']) - instance.returners[fstr](data['jid'], ret_load) - except KeyError: - log.debug( - 'The specified returner used for the master job cache ' - '"{0}" does not have a save_runner_load function! The results ' - 'of this runner execution will not be stored.'.format( - opts['master_job_cache'] - ) - ) - except Exception: - log.critical( - 'The specified returner threw a stack trace:\n', - exc_info=True - ) - if opts.get('async', False): - return data['jid'] - else: - return ret + return self.functions[fun](*args, **kwargs) def master_call(self, **kwargs): ''' @@ -300,18 +234,24 @@ class Runner(RunnerClient): else: try: # Run the runner! - jid = super(Runner, self).cmd( - self.opts['fun'], self.opts['arg'], self.opts) if self.opts.get('async', False): + low = {'fun': self.opts['fun'], + 'args': self.opts['arg'], + 'kwargs': self.opts} + async_pub = super(Runner, self).async( + self.opts['fun'], low) log.info('Running in async mode. Results of this execution may ' 'be collected by attaching to the master event bus or ' - 'by examing the master job cache, if configured.') - rets = self.get_runner_returns(jid) + 'by examing the master job cache, if configured. ' + 'This execution is under tag {0}'.format(async_pub['tag'])) + rets = self.get_runner_returns(async_pub['tag']) else: - rets = [jid] + ret = super(Runner, self).cmd( + self.opts['fun'], self.opts['arg'], self.opts) + rets = [ret] # Gather the returns - for ret in rets: - if not self.opts.get('quiet', False): + if not self.opts.get('quiet', False): + for ret in rets: if isinstance(ret, dict) and 'outputter' in ret and ret['outputter'] is not None: print(self.outputters[ret['outputter']](ret['data'])) else: @@ -324,7 +264,7 @@ class Runner(RunnerClient): log.debug('Runner return: {0}'.format(ret)) return ret - def get_runner_returns(self, jid, timeout=None): + def get_runner_returns(self, tag, timeout=None): ''' Gather the return data from the event system, break hard when timeout is reached. @@ -336,8 +276,7 @@ class Runner(RunnerClient): last_progress_timestamp = time.time() while True: - raw = self.event.get_event(timeout, full=True) - time.sleep(0.1) + raw = self.event.get_event(timeout, tag=tag, full=True) # If we saw no events in the event bus timeout # OR # we have reached the total timeout @@ -348,12 +287,11 @@ class Runner(RunnerClient): # Timeout reached break try: - if not raw['tag'].split('/')[1] == 'runner' and raw['tag'].split('/')[2] == jid: - continue - elif raw['tag'].split('/')[3] == 'progress' and raw['tag'].split('/')[2] == jid: + tag_parts = raw['tag'].split('/') + if tag_parts[3] == 'progress': last_progress_timestamp = time.time() yield {'data': raw['data']['data'], 'outputter': raw['data']['outputter']} - elif raw['tag'].split('/')[3] == 'return' and raw['tag'].split('/')[2] == jid: + elif tag_parts[3] == 'ret': yield raw['data']['return'] break # Handle a findjob that might have been kicked off under the covers @@ -362,7 +300,5 @@ class Runner(RunnerClient): continue except (IndexError, KeyError): continue - - -def _progress_print(text, *args, **kwargs): - print(text) + # if you get to the end, sleep + time.sleep(0.1)