Merge pull request #6311 from felskrone/add_minion_tracking_and_timeout

changed timeout from 9999 to self.opts['timeout'] which is the user spec...
This commit is contained in:
Thomas S Hatch 2013-07-26 09:21:55 -07:00
commit a25f715a82

View File

@ -72,7 +72,7 @@ class Batch(object):
args = [[],
self.opts['fun'],
self.opts['arg'],
99999,
self.opts['timeout'],
'list',
]
bnum = self.get_bnum()
@ -80,6 +80,15 @@ class Batch(object):
active = []
ret = {}
iters = []
# the minion tracker keeps track of responses and iterators
# - it removes finished iterators from iters[]
# - if a previously detected minion does not respond, its
# added with an empty answer to ret{} once the timeout is reached
# - unresponsive minions are removed from active[] to make
# sure that the main while loop finishes even with unresp minions
minion_tracker = {}
# Iterate while we still have things to execute
while len(ret) < len(self.minions):
next_ = []
@ -91,19 +100,28 @@ class Batch(object):
for i in range(bnum - len(active)):
if to_run:
next_.append(to_run.pop())
active += next_
args[0] = next_
if next_:
if not self.quiet:
print('\nExecuting run on {0}\n'.format(next_))
iters.append(
self.local.cmd_iter_no_block(
*args,
raw=self.opts.get('raw', False))
)
# create a new iterator for this batch of minions
new_iter = self.local.cmd_iter_no_block(
*args,
raw=self.opts.get('raw', False))
# add it to our iterators and to the minion_tracker
iters.append(new_iter)
minion_tracker[new_iter] = {}
# every iterator added is 'active' and has its set of minions
minion_tracker[new_iter]['minions'] = next_
minion_tracker[new_iter]['active'] = True
else:
time.sleep(0.02)
parts = {}
for queue in iters:
try:
# Gather returns until we get to the bottom
@ -121,8 +139,21 @@ class Batch(object):
else:
parts.update(part)
except StopIteration:
# remove the iter, it is done
pass
# if a iterator is done:
# - set it to inactive
# - add minions that have not responded to parts{}
# check if the tracker contains the iterator
if( queue in minion_tracker.keys() ):
minion_tracker[queue]['active'] = False
# add all minions that belong to this iterator and
# that have not responded to parts{} with an empty response
for minion in minion_tracker[queue]['minions']:
if minion not in parts.keys():
parts[minion] = {}
parts[minion]['ret'] = {}
for minion, data in parts.items():
active.remove(minion)
if self.opts.get('raw'):
@ -141,3 +172,14 @@ class Batch(object):
data,
out,
self.opts)
# remove inactive iterators from the iters list
for queue in minion_tracker.keys():
# only remove inactive queues
if not minion_tracker[queue]['active'] and queue in iters:
iters.remove(queue)
# also remove the iterator's minions from the active list
for minion in minion_tracker[queue]['minions']:
if minion in active:
active.remove(minion)