Properly wait on returns in saltnado

This was broken because the behavior was to simply check the ckminions
and wait for only those returns to complete. This works assuming
ckminions is accurate (which there are many cases where it isn't, such
as syndics).

_disbatch_local's waiting on returns needs to match LocalClient's
behavior (namely that in get_iter_returns). This means we are allowed to
return when (1) we have waitged the min_wait_time (0 if not a syndic)
(2) no minions are running the job (3) all minions we saw running it are
done running the job. The only method allowed for earlier termination is
if the gather_job_timeout is exceeded.

Fixes #42659
This commit is contained in:
Thomas Jackson 2018-06-18 13:12:06 -07:00 committed by rallytime
parent 95ef006e00
commit d0a98534a9
No known key found for this signature in database
GPG Key ID: E8F1A4B90D0DEA19

View File

@ -923,9 +923,11 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
# Generate jid before triggering a job to subscribe all returns from minions
chunk['jid'] = salt.utils.jid.gen_jid()
# Subscribe returns from minions before firing a job
minions = set(self.ckminions.check_minions(chunk['tgt'], chunk.get('tgt_type', 'glob')))
future_minion_map = self.subscribe_minion_returns(chunk['jid'], minions)
# start listening for the event before we fire the job to avoid races
events = [
self.application.event_listener.get_event(self, tag='salt/job/'+chunk['jid']),
self.application.event_listener.get_event(self, tag='syndic/job/'+chunk['jid']),
]
f_call = self._format_call_run_job_async(chunk)
# fire a job off
@ -937,88 +939,91 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
# if the job didn't publish, lets not wait around for nothing
# TODO: set header??
if 'jid' not in pub_data:
for future in future_minion_map:
for future in events:
try:
future.set_result(None)
except Exception:
pass
raise tornado.gen.Return('No minions matched the target. No command was sent, no jid was assigned.')
# Map of minion_id -> returned for all minions we think we need to wait on
minions = {m: False for m in pub_data['minions']}
# minimum time required for return to complete. By default no waiting, if
# we are a syndic then we must wait syndic_wait at a minimum
min_wait_time = Future().set_result(True)
# wait syndic a while to avoid missing published events
if self.application.opts['order_masters']:
yield tornado.gen.sleep(self.application.opts['syndic_wait'])
min_wait_time = tornado.gen.sleep(self.application.opts['syndic_wait'])
# To ensure job_not_running and all_return are terminated by each other, communicate using a future
is_finished = Future()
is_finished = tornado.gen.sleep(self.application.opts['gather_job_timeout'])
job_not_running_future = self.job_not_running(pub_data['jid'],
# ping until the job is not running, while doing so, if we see new minions returning
# that they are running the job, add them to the list
tornado.ioloop.IOLoop.current().spawn_callback(self.job_not_running, pub_data['jid'],
chunk['tgt'],
f_call['kwargs']['tgt_type'],
minions,
is_finished)
minion_returns_future = self.sanitize_minion_returns(future_minion_map, pub_data['minions'], is_finished)
yield job_not_running_future
raise tornado.gen.Return((yield minion_returns_future))
def subscribe_minion_returns(self, jid, minions):
# Subscribe each minion event
future_minion_map = {}
for minion in minions:
tag = tagify([jid, 'ret', minion], 'job')
minion_future = self.application.event_listener.get_event(self,
tag=tag,
matcher=EventListener.exact_matcher)
future_minion_map[minion_future] = minion
return future_minion_map
@tornado.gen.coroutine
def sanitize_minion_returns(self, future_minion_map, minions, is_finished):
'''
Return a future which will complete once all returns are completed
(according to minions), or one of the passed in "finish_chunk_ret_future" completes
'''
if minions is None:
minions = []
# Remove redundant minions
redundant_minion_futures = [future for future in future_minion_map.keys() if future_minion_map[future] not in minions]
for redundant_minion_future in redundant_minion_futures:
try:
redundant_minion_future.set_result(None)
except Exception:
pass
del future_minion_map[redundant_minion_future]
def more_todo():
'''Check if there are any more minions we are waiting on returns from
'''
return any(x is False for x in six.itervalues(minions))
# here we want to follow the behavior of LocalClient.get_iter_returns
# namely we want to wait at least syndic_wait (assuming we are a syndic)
# and that there are no more jobs running on minions. We are allowed to exit
# early if gather_job_timeout has been exceeded
chunk_ret = {}
while True:
f = yield Any(list(future_minion_map.keys()) + [is_finished])
to_wait = events+[is_finished]
if not min_wait_time.done():
to_wait += [min_wait_time]
def cancel_inflight_futures():
for event in to_wait:
if not event.done():
event.set_result(None)
f = yield Any(to_wait)
try:
# When finished entire routine, cleanup other futures and return result
if f is is_finished:
for event in future_minion_map.keys():
if not event.done():
event.set_result(None)
cancel_inflight_futures()
raise tornado.gen.Return(chunk_ret)
elif f is min_wait_time:
if not more_todo():
cancel_inflight_futures()
raise tornado.gen.Return(chunk_ret)
continue
f_result = f.result()
chunk_ret[f_result['data']['id']] = f_result['data']['return']
# if this is a start, then we need to add it to the pile
if f_result['tag'].endswith('/new'):
for minion_id in f_result['data']['minions']:
if minion_id not in minions:
minions[minion_id] = False
else:
chunk_ret[f_result['data']['id']] = f_result['data']['return']
# clear finished event future
minions[f_result['data']['id']] = True
# if there are no more minions to wait for, then we are done
if not more_todo() and min_wait_time.done():
cancel_inflight_futures()
raise tornado.gen.Return(chunk_ret)
except TimeoutException:
pass
# clear finished event future
try:
minions.remove(future_minion_map[f])
del future_minion_map[f]
except ValueError:
pass
if not minions:
if not is_finished.done():
is_finished.set_result(True)
raise tornado.gen.Return(chunk_ret)
if f == events[0]:
events[0] = self.application.event_listener.get_event(self, tag='salt/job/'+chunk['jid'])
else:
events[1] = self.application.event_listener.get_event(self, tag='syndic/job/'+chunk['jid'])
@tornado.gen.coroutine
def job_not_running(self, jid, tgt, tgt_type, is_finished):
def job_not_running(self, jid, tgt, tgt_type, minions, is_finished):
'''
Return a future which will complete once jid (passed in) is no longer
running on tgt
@ -1044,8 +1049,6 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
event = f.result()
except TimeoutException:
if not minion_running:
if not is_finished.done():
is_finished.set_result(True)
raise tornado.gen.Return(True)
else:
ping_pub_data = yield self.saltclients['local'](tgt,
@ -1059,6 +1062,8 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
# Minions can return, we want to see if the job is running...
if event['data'].get('return', {}) == {}:
continue
if event['data']['id'] not in minions:
minions[event['data']['id']] = False
minion_running = True
@tornado.gen.coroutine