From b723b405f116c33a57ebda2c14ec482bf1e9a733 Mon Sep 17 00:00:00 2001 From: Volker Schwicking Date: Thu, 25 Jul 2013 18:06:52 +0200 Subject: [PATCH] changed timeout from 9999 to self.opts['timeout'] which is the user specified timeout or defaults to 5. With timeout 9999 the loop (almost) never ends if a previously detected minion vanishes/shuts down during the actual batch run. Also added a minion_tracker dict which keeps track of the cmd_iter_no_block-iterators and their minions. - the cmd_iter_no_block-iterators are now cleaned during every run if the have finished (iterating iters[] gets faster). - minions that did not respond within the given timeout are taken care of by removing them from active[] and updating ret{} with empty returns. These two additions enable the main while loop to actually finish before 9999 seconds are over (ret{} and self.minions are eventually the same size) if a minion which was previously detected does not respond properly. This also fixes the problem of unresponsive minions blocking the progression of the batch run. Example: if batch=10 and 10 minions are unresponsive, range(bnum - len(active) ) always evaluates to zero and no more minions are added to next_. --- salt/cli/batch.py | 58 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/salt/cli/batch.py b/salt/cli/batch.py index 8c7326cbc0..62d85311f5 100644 --- a/salt/cli/batch.py +++ b/salt/cli/batch.py @@ -72,7 +72,7 @@ class Batch(object): args = [[], self.opts['fun'], self.opts['arg'], - 99999, + self.opts['timeout'], 'list', ] bnum = self.get_bnum() @@ -80,6 +80,15 @@ class Batch(object): active = [] ret = {} iters = [] + + # the minion tracker keeps track of responses and iterators + # - it removes finished iterators from iters[] + # - if a previously detected minion does not respond, its + # added with an empty answer to ret{} once the timeout is reached + # - unresponsive minions are removed from active[] to make + # sure that the main while loop finishes even with unresp minions + minion_tracker = {} + # Iterate while we still have things to execute while len(ret) < len(self.minions): next_ = [] @@ -91,19 +100,28 @@ class Batch(object): for i in range(bnum - len(active)): if to_run: next_.append(to_run.pop()) + active += next_ args[0] = next_ + if next_: if not self.quiet: print('\nExecuting run on {0}\n'.format(next_)) - iters.append( - self.local.cmd_iter_no_block( - *args, - raw=self.opts.get('raw', False)) - ) + # create a new iterator for this batch of minions + new_iter = self.local.cmd_iter_no_block( + *args, + raw=self.opts.get('raw', False)) + # add it to our iterators and to the minion_tracker + iters.append(new_iter) + minion_tracker[new_iter] = {} + # every iterator added is 'active' and has its set of minions + minion_tracker[new_iter]['minions'] = next_ + minion_tracker[new_iter]['active'] = True + else: time.sleep(0.02) parts = {} + for queue in iters: try: # Gather returns until we get to the bottom @@ -121,8 +139,21 @@ class Batch(object): else: parts.update(part) except StopIteration: - # remove the iter, it is done - pass + # if a iterator is done: + # - set it to inactive + # - add minions that have not responded to parts{} + + # check if the tracker contains the iterator + if( queue in minion_tracker.keys() ): + minion_tracker[queue]['active'] = False + + # add all minions that belong to this iterator and + # that have not responded to parts{} with an empty response + for minion in minion_tracker[queue]['minions']: + if minion not in parts.keys(): + parts[minion] = {} + parts[minion]['ret'] = {} + for minion, data in parts.items(): active.remove(minion) if self.opts.get('raw'): @@ -141,3 +172,14 @@ class Batch(object): data, out, self.opts) + + # remove inactive iterators from the iters list + for queue in minion_tracker.keys(): + # only remove inactive queues + if not minion_tracker[queue]['active'] and queue in iters: + iters.remove(queue) + # also remove the iterator's minions from the active list + for minion in minion_tracker[queue]['minions']: + if minion in active: + active.remove(minion) +