From 35eb777c124295d216de6a003f912feea7338ccb Mon Sep 17 00:00:00 2001 From: Thomas S Hatch Date: Thu, 1 Mar 2012 17:16:12 -0700 Subject: [PATCH] Add initial --batch option Yes, there has been some disagreement on the name, I care a lot less about why people dislike --batch and care a lot more about arguments for something else to call it. --- salt/cli/__init__.py | 13 ++++- salt/cli/batch.py | 117 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+), 1 deletion(-) create mode 100644 salt/cli/batch.py diff --git a/salt/cli/__init__.py b/salt/cli/__init__.py index 945be87cf4..6cdcca9060 100644 --- a/salt/cli/__init__.py +++ b/salt/cli/__init__.py @@ -11,6 +11,7 @@ import sys import salt.cli.caller import salt.cli.cp import salt.cli.key +import salt.cli.batch import salt.client import salt.output import salt.runner @@ -50,6 +51,13 @@ class SaltCMD(object): dest='iter_', action='store_true', help='Return the data from minions as the data is returned') + parser.add_option('-b', + '--batch', + default='', + dest='batch', + help=('Execute the salt job in batch mode, pass either the ' + 'number of minions to batch at a time, or the ' + 'percentage of minions to have running')) parser.add_option('-E', '--pcre', default=False, @@ -163,6 +171,7 @@ class SaltCMD(object): if not options.timeout is None: opts['timeout'] = int(options.timeout) opts['iter'] = options.iter_ + opts['batch'] = options.batch opts['pcre'] = options.pcre opts['list'] = options.list_ opts['grain'] = options.grain @@ -242,7 +251,9 @@ class SaltCMD(object): print 'Return data for job {0}:'.format(jid) printout(ret[jid]) print '' - + elif self.opts['batch']: + batch = salt.cli.batch.Batch(self.opts) + batch.run() else: if not 'timeout' in self.opts: self.opts['timeout'] = local.opts['timeout'] diff --git a/salt/cli/batch.py b/salt/cli/batch.py new file mode 100644 index 0000000000..f535ad39f1 --- /dev/null +++ b/salt/cli/batch.py @@ -0,0 +1,117 @@ +''' +Execute batch runs +''' +# Import Python libs +import time +import copy + +# Import Salt libs +import salt.client + +class Batch(object): + ''' + Manage the execution of batch runs + ''' + def __init__(self, opts): + self.opts = opts + self.local = salt.client.LocalClient(opts['conf_file']) + self.minions = self.__gather_minions() + + def __gather_minions(self): + ''' + Return a list of minions to use for the batch run + ''' + args = [self.opts['tgt'], + 'test.ping', + [], + 1, + ] + if self.opts['pcre']: + args.append('pcre') + elif self.opts['list']: + args.append('list') + elif self.opts['grain']: + args.append('grain') + elif self.opts['grain_pcre']: + args.append('grain_pcre') + elif self.opts['exsel']: + args.append('exsel') + elif self.opts['nodegroup']: + args.append('nodegroup') + elif self.opts['compound']: + args.append('compound') + else: + args.append('glob') + + fret = [] + for ret in self.local.cmd_iter(*args): + for minion in ret: + print '{0} Detected for this batch run'.format(minion) + fret.append(minion) + return sorted(fret) + + def get_bnum(self): + ''' + Return the active number of minions to maintain + ''' + if self.opts['batch'].startswith('%'): + return int(float(self.opts['batch']) * snum) + return int(self.opts['batch']) + + def run(self): + ''' + Execute the batch run + ''' + args = [[], + self.opts['fun'], + self.opts['arg'], + 9999999999, + 'list', + ] + bnum = self.get_bnum() + to_run = copy.deepcopy(self.minions) + active = [] + ret = {} + iters = [] + # Itterate while we still have things to execute + while len(ret) < len(self.minions): + next_ = [] + if len(to_run) <= bnum and not active: + # last bit of them, add them all to next iterator + while to_run: + next_.append(to_run.pop()) + else: + for ind in range(bnum - len(active)): + if to_run: + next_.append(to_run.pop()) + active += next_ + args[0] = next_ + if next_: + print '\nExecuting run on {0}\n'.format(next_) + iters.append( + self.local.cmd_iter_no_block(*args)) + else: + time.sleep(0.02) + parts = {} + for queue in iters: + try: + # Gather returns until we get to the bottom + ncnt = 0 + while True: + part = queue.next() + if part is None: + time.sleep(0.01) + ncnt += 1 + if ncnt > 5: + break + continue + print part + parts.update(part) + except StopIteration: + # remove the iter, it is done + pass + for minion, data in parts.items(): + active.remove(minion) + ret[minion] = data['ret'] + #print minion + #print ret[minion]