Rework of more_time feature

In the past this feature has basically checked returns, pinged minions if they were still running the job, then returning once the job was no longer run remotely. This has caused some issues with race conditions if the minion or master are busy (and the time between minion done, and master recieved gets large).

This changes the behavior of the "wait" mechanism. The intent of this feature is to wait for the job return from all minions the job went to. To accomplish this we fire the job, then check the returns. If the initial returns are what we wanted/expected we break. If it isn't we will ping all targets and see if its still running. If more minions return, we merge them into the list of minions we are waiting for. We will timeout the job once all minions have stopped running the job and TIMEOUT seconds have passed (since the slowest minion)

Conflicts:

	salt/client/__init__.py
This commit is contained in:
Thomas Jackson 2014-09-24 16:15:36 -07:00
parent 8519139382
commit 295afba5f8

View File

@ -24,6 +24,8 @@ import os
import time
import copy
import logging
import zmq
import errno
from datetime import datetime
from salt._compat import string_types
@ -186,7 +188,7 @@ class LocalClient(object):
# Looks like the timeout is invalid, use config
return self.opts['timeout']
def gather_job_info(self, jid, tgt, tgt_type, minions, **kwargs):
def gather_job_info(self, jid, tgt, tgt_type):
'''
Return the information about a given job
'''
@ -200,15 +202,8 @@ class LocalClient(object):
timeout=timeout,
)
if not pub_data:
return pub_data
return pub_data
minions.update(pub_data['minions'])
return self.get_returns(pub_data['jid'],
minions,
self._get_timeout(timeout),
pending_tags=[jid])
def _check_pub_data(self, pub_data):
'''
@ -778,6 +773,29 @@ class LocalClient(object):
if len(found.intersection(minions)) >= len(minions):
raise StopIteration()
# TODO: tests!!
def get_returns_no_block(
self,
jid,
event=None):
'''
Raw funciton to just return events of jid excluding timeout logic
Yield either the raw event data or None
'''
if event is None:
event = self.event
while True:
try:
raw = event.get_event_noblock()
if raw['tag'].startswith(jid):
yield raw
except zmq.ZMQError as ex:
if ex.errno == errno.EAGAIN or ex.errno == errno.EINTR:
yield None
else:
raise
def get_iter_returns(
self,
jid,
@ -802,6 +820,10 @@ class LocalClient(object):
timeout = self.opts['timeout']
start = int(time.time())
timeout_at = start + timeout
# timeouts per minion, id_ -> timeout time
minion_timeouts = {}
found = set()
# Check to see if the jid is real, if not return the empty dict
if not self.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}:
@ -817,49 +839,38 @@ class LocalClient(object):
jid, minions, datetime.fromtimestamp(timeout_at).time()
)
)
# iterator for this job's return
ret_iter = self.get_returns_no_block(jid)
# iterator for the info of this job
jinfo_iter = None
jinfo_timeout = None
while True:
# Process events until timeout is reached or all minions have returned
time_left = timeout_at - int(time.time())
# Wait 0 == forever, use a minimum of 1s
wait = max(1, time_left)
raw = None
# Look for events if we haven't yet found all the minions or if we are still waiting for
# the syndics to report on how many minions they have forwarded the command to
if (len(found.intersection(minions)) < len(minions) or
(self.opts['order_masters'] and syndic_wait < self.opts.get('syndic_wait', 1))):
raw = self.event.get_event(wait, jid)
if raw is not None:
for raw in ret_iter:
# if we got None, then there were no events
if raw is None:
break
if 'minions' in raw.get('data', {}):
minions.update(raw['data']['minions'])
continue
if 'syndic' in raw:
minions.update(raw['syndic'])
continue
if 'return' not in raw:
if 'return' not in raw['data']:
continue
if kwargs.get('raw', False):
found.add(raw['id'])
found.add(raw['data']['id'])
yield raw
else:
found.add(raw['id'])
ret = {raw['id']: {'ret': raw['return']}}
found.add(raw['data']['id'])
ret = {raw['data']['id']: {'ret': raw['data']['return']}}
if 'out' in raw:
ret[raw['id']]['out'] = raw['out']
log.debug('jid {0} return from {1}'.format(jid, raw['id']))
ret[raw['data']['id']]['out'] = raw['data']['out']
log.debug('jid {0} return from {1}'.format(jid, raw['data']['id']))
yield ret
if len(found.intersection(minions)) >= len(minions):
# All minions have returned, break out of the loop
log.debug('jid {0} found all minions {1}'.format(jid, found))
if self.opts['order_masters']:
if syndic_wait < self.opts.get('syndic_wait', 1):
syndic_wait += 1
timeout_at = int(time.time()) + 1
log.debug('jid {0} syndic_wait {1} will now timeout at {2}'.format(
jid, syndic_wait, datetime.fromtimestamp(timeout_at).time()))
continue
break
continue
# Then event system timeout was reached and nothing was returned
if len(found.intersection(minions)) >= len(minions):
# All minions have returned, break out of the loop
log.debug('jid {0} found all minions {1}'.format(jid, found))
@ -867,44 +878,75 @@ class LocalClient(object):
if syndic_wait < self.opts.get('syndic_wait', 1):
syndic_wait += 1
timeout_at = int(time.time()) + 1
log.debug(
'jid {0} syndic_wait {1} will now timeout at {2}'.format(
jid, syndic_wait, datetime.fromtimestamp(timeout_at).time()
)
)
log.debug('jid {0} syndic_wait {1} will now timeout at {2}'.format(
jid, syndic_wait, datetime.fromtimestamp(timeout_at).time()))
continue
break
if last_time:
if len(found) < len(minions):
log.info(
'jid {0} minions {1} did not return in time'.format(
jid, (minions - found)
)
)
if expect_minions:
for minion in list((minions - found)):
yield {minion: {'failed': True}}
break
if int(time.time()) > timeout_at:
# The timeout has been reached, check the jid to see if the
# timeout needs to be increased
jinfo = self.gather_job_info(jid, tgt, tgt_type, minions - found, **kwargs)
still_running = [id_ for id_, jdat in jinfo.iteritems()
if jdat
]
if still_running:
timeout_at = int(time.time()) + timeout
log.debug(
'jid {0} still running on {1} will now timeout at {2}'.format(
jid, still_running, datetime.fromtimestamp(timeout_at).time()
)
)
continue
# let start the timeouts for all remaining minions
for id_ in minions - found:
# if we have a new minion in the list, make sure it has a timeout
if id_ not in minion_timeouts:
minion_timeouts[id_] = time.time() + timeout
# if we don't have the job info iterator (or its timed out), lets make it
if jinfo_iter is None or time.time() > jinfo_timeout:
# need our own event listener, so we don't clobber the class one
event = salt.utils.event.get_event(
'master',
self.opts['sock_dir'],
self.opts['transport'],
listen=not self.opts.get('__worker', False))
jinfo = self.gather_job_info(jid, tgt, tgt_type)
# if we weren't assigned any jid that means the master thinks
# we have nothing to send
if 'jid' not in jinfo:
jinfo_iter = []
else:
last_time = True
log.debug('jid {0} not running on any minions last time'.format(jid))
jinfo_iter = self.get_returns_no_block(jinfo['jid'], event=event)
jinfo_timeout = time.time() + self.opts['gather_job_timeout']
# check for minions that are running the job still
for raw in jinfo_iter:
# if there are no more events, lets stop waiting for the jinfo
if raw is None:
break
# TODO: move to a library??
if 'minions' in raw.get('data', {}):
minions.update(raw['data']['minions'])
continue
if 'syndic' in raw.get('data', {}):
minions.update(raw['syndic'])
continue
if 'return' not in raw.get('data', {}):
continue
# if the job isn't running there anymore... don't count
if raw['data']['return'] == {}:
continue
# if we didn't originally target the minion, lets add it to the list
if raw['data']['id'] not in minions:
minions.add(raw['data']['id'])
# update this minion's timeout, as long as the job is still running
minion_timeouts[raw['data']['id']] = time.time() + timeout
# if we have hit all minion timeouts, lets call it
done = True
for id_ in minions - found:
if time.time() < minion_timeouts[id_]:
done = False
break
if done:
break
# don't spin
time.sleep(0.01)
if expect_minions:
for minion in list((minions - found)):
yield {minion: {'failed': True}}
def get_returns(
self,
@ -1119,11 +1161,6 @@ class LocalClient(object):
Get the returns for the command line interface via the event system
'''
log.trace('func get_cli_event_returns()')
if not isinstance(minions, set):
if isinstance(minions, string_types):
minions = set([minions])
elif isinstance(minions, (list, tuple)):
minions = set(list(minions))
if verbose:
msg = 'Executing job with jid {0}'.format(jid)
@ -1132,110 +1169,33 @@ class LocalClient(object):
elif show_jid:
print('jid: {0}'.format(jid))
if timeout is None:
timeout = self.opts['timeout']
# lazy load the connected minions
connected_minions = None
start = time.time()
timeout_at = start + timeout
found = set()
# Check to see if the jid is real, if not return the empty dict
if not self.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}:
log.warning('jid does not exist')
yield {}
# stop the iteration, since the jid is invalid
raise StopIteration()
# Wait for the hosts to check in
syndic_wait = 0
last_time = False
while True:
# Process events until timeout is reached or all minions have returned
time_left = timeout_at - time.time()
# Wait 0 == forever, use a minimum of 1s
wait = max(1, time_left)
raw = self.event.get_event(wait, jid)
log.trace('get_cli_event_returns() called self.event.get_event() and received: raw={0}'.format(raw))
if raw is not None:
if 'minions' in raw.get('data', {}):
minions.update(raw['data']['minions'])
continue
if 'syndic' in raw:
minions.update(raw['syndic'])
continue
if 'return' not in raw:
continue
found.add(raw.get('id'))
ret = {raw['id']: {'ret': raw['return']}}
if 'out' in raw:
ret[raw['id']]['out'] = raw['out']
if 'retcode' in raw:
ret[raw['id']]['retcode'] = raw['retcode']
log.trace('raw = {0}'.format(raw))
log.trace('ret = {0}'.format(ret))
log.trace('yeilding \'ret\'')
yield ret
if len(found.intersection(minions)) >= len(minions):
# All minions have returned, break out of the loop
if self.opts['order_masters']:
if syndic_wait < self.opts.get('syndic_wait', 1):
syndic_wait += 1
timeout_at = time.time() + 1
continue
break
continue
# Then event system timeout was reached and nothing was returned
if len(found.intersection(minions)) >= len(minions):
# All minions have returned, break out of the loop
if self.opts['order_masters']:
if syndic_wait < self.opts.get('syndic_wait', 1):
syndic_wait += 1
timeout_at = time.time() + 1
continue
break
if last_time:
if verbose or show_timeout:
if self.opts.get('minion_data_cache', False) \
or tgt_type in ('glob', 'pcre', 'list'):
if len(found) < len(minions):
fail = sorted(list(minions.difference(found)))
for minion in fail:
yield({
minion: {
'out': 'no_return',
'ret': 'Minion did not return'
}
})
break
if time.time() > timeout_at:
# The timeout has been reached, check the jid to see if the
# timeout needs to be increased
jinfo = self.gather_job_info(jid, tgt, tgt_type, minions - found, **kwargs)
more_time = False
for id_ in jinfo:
if jinfo[id_]:
if verbose:
print(
'Execution is still running on {0}'.format(id_)
)
more_time = True
if not more_time:
cache_jinfo = self.get_cache_returns(jid)
for id_ in cache_jinfo:
if id_ == tgt:
found.add(cache_jinfo.get('id'))
ret = {id_: {'ret': cache_jinfo[id_]['ret']}}
if 'out' in cache_jinfo[id_]:
ret[id_]['out'] = cache_jinfo[id_]['out']
if 'retcode' in cache_jinfo[id_]:
ret[id_]['retcode'] = cache_jinfo[id_]['retcode']
yield ret
if more_time:
timeout_at = time.time() + timeout
continue
for ret in self.get_iter_returns(jid,
minions,
timeout=timeout,
tgt=tgt,
tgt_type='glob',
expect_minions=(verbose or show_timeout)
):
# replace the return structure for missing minions
for id_, min_ret in ret.iteritems():
if min_ret.get('failed') is True:
if connected_minions is None:
connected_minions = salt.utils.minions.CkMinions(self.opts).connected_ids()
if connected_minions and id_ not in connected_minions:
yield {id_: {'out': 'no_return',
'ret': 'Minion did not return. [Not connected]'}}
else:
yield({
id_: {
'out': 'no_return',
'ret': 'Minion did not return. [No response]'
}
})
else:
last_time = True
time.sleep(0.01)
yield {id_: min_ret}
def get_event_iter_returns(self, jid, minions, timeout=None):
'''