Merge pull request #16078 from jacksontj/develop

Rework of more_time feature
Thomas S Hatch 2014-09-26 14:27:12 -06:00
commit fd90d8a066
2 changed files with 151 additions and 195 deletions
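
The gist of the rework: instead of one job-wide timeout that gets pushed back whenever any minion reports the job still running (the old 'more_time' behaviour), get_iter_returns now keeps a separate deadline per minion and only gives up once every minion that has not returned is past its own deadline. A minimal sketch of that bookkeeping, assuming a hypothetical non-blocking poll() and a still_running() probe (neither is the real Salt API):

    import time

    def wait_for_returns(minions, timeout, poll, still_running):
        '''Simplified sketch of per-minion deadlines: each minion gets its own
        deadline, pushed back while the job is still reported running there.'''
        deadlines = {m: time.time() + timeout for m in minions}
        found = set()
        while minions - found:
            for minion, ret in poll():              # hypothetical: yields (id, return) pairs
                found.add(minion)
                yield {minion: ret}
            for minion in minions - found:
                if still_running(minion):           # hypothetical find_job-style probe
                    deadlines[minion] = time.time() + timeout
            if all(time.time() > deadlines[m] for m in minions - found):
                break                               # every laggard passed its own deadline
            time.sleep(0.01)                        # don't spin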

@ -24,6 +24,8 @@ import os
import time
import copy
import logging
import zmq
import errno
from datetime import datetime
from salt._compat import string_types
@ -186,7 +188,7 @@ class LocalClient(object):
# Looks like the timeout is invalid, use config
return self.opts['timeout']
def gather_job_info(self, jid, tgt, tgt_type, minions, **kwargs):
def gather_job_info(self, jid, tgt, tgt_type):
'''
Return the information about a given job
'''
@ -200,15 +202,7 @@ class LocalClient(object):
timeout=timeout,
)
if not pub_data:
return pub_data
minions.update(pub_data['minions'])
return self.get_returns(pub_data['jid'],
minions,
self._get_timeout(timeout),
pending_tags=[jid])
return pub_data
def _check_pub_data(self, pub_data):
'''
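
As the two hunks above show, gather_job_info() no longer takes the outstanding minion set or collects the probe's returns itself; it just publishes the job-info query and hands the raw pub_data back, leaving the caller to watch the event bus for the responses. A hedged sketch of the new calling convention (start_probe is a made-up helper; client is an existing LocalClient, and get_returns_no_block is added in the next hunk):

    def start_probe(client, jid, tgt, tgt_type):
        '''Sketch: publish the job-info probe for an existing jid and return an
        iterator over its responses, or None if nothing was published.'''
        jinfo = client.gather_job_info(jid, tgt, tgt_type)
        if 'jid' not in jinfo:
            return None                 # the master had no minions to ask
        return client.get_returns_no_block(jinfo['jid'])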
@ -778,6 +772,29 @@ class LocalClient(object):
if len(found.intersection(minions)) >= len(minions):
raise StopIteration()
# TODO: tests!!
def get_returns_no_block(
self,
jid,
event=None):
'''
Raw function to just return events for a jid, with no timeout logic
Yields either the raw event data or None
'''
if event is None:
event = self.event
while True:
try:
raw = event.get_event_noblock()
if raw['tag'].startswith(jid):
yield raw
except zmq.ZMQError as ex:
if ex.errno == errno.EAGAIN or ex.errno == errno.EINTR:
yield None
else:
raise
def get_iter_returns(
self,
jid,
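
get_returns_no_block() is the new building block: it wraps event.get_event_noblock() so callers can poll the event bus without blocking, turning zmq's EAGAIN/EINTR into a yielded None instead of an exception. A usage sketch under the same assumptions (client is a LocalClient, jid an already published job; drain_returns is hypothetical):

    def drain_returns(client, jid):
        '''Sketch: collect whatever return events are already queued for a jid
        without blocking; a yielded None means the queue is empty right now.'''
        returns = {}
        for raw in client.get_returns_no_block(jid):
            if raw is None:
                break                              # nothing waiting at the moment
            data = raw.get('data', {})
            if 'return' in data:
                returns[data['id']] = data['return']
        return returns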
@ -802,6 +819,10 @@ class LocalClient(object):
timeout = self.opts['timeout']
start = int(time.time())
timeout_at = start + timeout
# timeouts per minion, id_ -> timeout time
minion_timeouts = {}
found = set()
# Check to see if the jid is real, if not return the empty dict
if not self.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}:
@ -817,49 +838,37 @@ class LocalClient(object):
jid, minions, datetime.fromtimestamp(timeout_at).time()
)
)
# iterator for this job's return
ret_iter = self.get_returns_no_block(jid)
# iterator for the info of this job
jinfo_iter = []
jinfo_timeout = time.time() + timeout
while True:
# Process events until timeout is reached or all minions have returned
time_left = timeout_at - int(time.time())
# Wait 0 == forever, use a minimum of 1s
wait = max(1, time_left)
raw = None
# Look for events if we haven't yet found all the minions or if we are still waiting for
# the syndics to report on how many minions they have forwarded the command to
if (len(found.intersection(minions)) < len(minions) or
(self.opts['order_masters'] and syndic_wait < self.opts.get('syndic_wait', 1))):
raw = self.event.get_event(wait, jid)
if raw is not None:
for raw in ret_iter:
# if we got None, then there were no events
if raw is None:
break
if 'minions' in raw.get('data', {}):
minions.update(raw['data']['minions'])
continue
if 'syndic' in raw:
minions.update(raw['syndic'])
continue
if 'return' not in raw:
if 'return' not in raw['data']:
continue
if kwargs.get('raw', False):
found.add(raw['id'])
found.add(raw['data']['id'])
yield raw
else:
found.add(raw['id'])
ret = {raw['id']: {'ret': raw['return']}}
found.add(raw['data']['id'])
ret = {raw['data']['id']: {'ret': raw['data']['return']}}
if 'out' in raw:
ret[raw['id']]['out'] = raw['out']
log.debug('jid {0} return from {1}'.format(jid, raw['id']))
ret[raw['data']['id']]['out'] = raw['data']['out']
log.debug('jid {0} return from {1}'.format(jid, raw['data']['id']))
yield ret
if len(found.intersection(minions)) >= len(minions):
# All minions have returned, break out of the loop
log.debug('jid {0} found all minions {1}'.format(jid, found))
if self.opts['order_masters']:
if syndic_wait < self.opts.get('syndic_wait', 1):
syndic_wait += 1
timeout_at = int(time.time()) + 1
log.debug('jid {0} syndic_wait {1} will now timeout at {2}'.format(
jid, syndic_wait, datetime.fromtimestamp(timeout_at).time()))
continue
break
continue
# The event system timeout was reached and nothing was returned
if len(found.intersection(minions)) >= len(minions):
# All minions have returned, break out of the loop
log.debug('jid {0} found all minions {1}'.format(jid, found))
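
The order_masters branch exists because a master of masters only learns the full minion list as each syndic reports back; even when every currently known minion has answered, the loop grants one extra second per configured syndic_wait cycle before declaring the job done. A simplified sketch of that decision (opts and syndic_state stand in for the real attributes):

    import time

    def should_keep_waiting(found, minions, opts, syndic_state):
        '''Sketch: keep looping while returns are missing, or while a
        master-of-masters still owes the syndics an extra wait cycle.'''
        if len(found & minions) < len(minions):
            return True                                    # still missing returns
        if opts.get('order_masters') and syndic_state['wait'] < opts.get('syndic_wait', 1):
            syndic_state['wait'] += 1
            syndic_state['timeout_at'] = int(time.time()) + 1
            return True                                    # wait one more second for syndics
        return False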
@ -867,44 +876,74 @@ class LocalClient(object):
if syndic_wait < self.opts.get('syndic_wait', 1):
syndic_wait += 1
timeout_at = int(time.time()) + 1
log.debug(
'jid {0} syndic_wait {1} will now timeout at {2}'.format(
jid, syndic_wait, datetime.fromtimestamp(timeout_at).time()
)
)
log.debug('jid {0} syndic_wait {1} will now timeout at {2}'.format(
jid, syndic_wait, datetime.fromtimestamp(timeout_at).time()))
continue
break
if last_time:
if len(found) < len(minions):
log.info(
'jid {0} minions {1} did not return in time'.format(
jid, (minions - found)
)
)
if expect_minions:
for minion in list((minions - found)):
yield {minion: {'failed': True}}
break
if int(time.time()) > timeout_at:
# The timeout has been reached, check the jid to see if the
# timeout needs to be increased
jinfo = self.gather_job_info(jid, tgt, tgt_type, minions - found, **kwargs)
still_running = [id_ for id_, jdat in jinfo.iteritems()
if jdat
]
if still_running:
timeout_at = int(time.time()) + timeout
log.debug(
'jid {0} still running on {1} will now timeout at {2}'.format(
jid, still_running, datetime.fromtimestamp(timeout_at).time()
)
)
continue
# let's start the timeouts for all remaining minions
for id_ in minions - found:
# if we have a new minion in the list, make sure it has a timeout
if id_ not in minion_timeouts:
minion_timeouts[id_] = time.time() + timeout
# if we don't have the job info iterator (or it's timed out), let's make it
if time.time() > jinfo_timeout:
# need our own event listener, so we don't clobber the class one
event = salt.utils.event.get_event(
'master',
self.opts['sock_dir'],
self.opts['transport'],
listen=not self.opts.get('__worker', False))
jinfo = self.gather_job_info(jid, tgt, tgt_type)
# if we weren't assigned any jid, that means the master thinks
# we have nothing to send
if 'jid' not in jinfo:
jinfo_iter = []
else:
last_time = True
log.debug('jid {0} not running on any minions last time'.format(jid))
jinfo_iter = self.get_returns_no_block(jinfo['jid'], event=event)
jinfo_timeout = time.time() + self.opts['gather_job_timeout']
# check for minions that are running the job still
for raw in jinfo_iter:
# if there are no more events, let's stop waiting for the jinfo
if raw is None:
break
# TODO: move to a library??
if 'minions' in raw.get('data', {}):
minions.update(raw['data']['minions'])
continue
if 'syndic' in raw.get('data', {}):
minions.update(raw['syndic'])
continue
if 'return' not in raw.get('data', {}):
continue
# if the job isn't running there anymore... don't count
if raw['data']['return'] == {}:
continue
# if we didn't originally target the minion, let's add it to the list
if raw['data']['id'] not in minions:
minions.add(raw['data']['id'])
# update this minion's timeout, as long as the job is still running
minion_timeouts[raw['data']['id']] = time.time() + timeout
# if we have hit all minion timeouts, let's call it
done = True
for id_ in minions - found:
if time.time() < minion_timeouts[id_]:
done = False
break
if done:
break
# don't spin
time.sleep(0.01)
if expect_minions:
for minion in list((minions - found)):
yield {minion: {'failed': True}}
def get_returns(
self,
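
Two details in the hunk above are easy to miss: the job-info probe gets its own event listener (so it does not steal events from self.event), and the loop only exits once every minion that has not returned is past its personal deadline, at which point expect_minions turns each straggler into an explicit failure entry. A small sketch of that exit condition (the helper name is made up):

    import time

    def finished_stragglers(minions, found, minion_timeouts):
        '''Sketch: return None while any unreturned minion still has time left;
        otherwise return the synthetic failure entries yielded for stragglers.'''
        pending = minions - found
        if any(time.time() < minion_timeouts.get(id_, 0) for id_ in pending):
            return None                       # someone still has time; keep looping
        return [{id_: {'failed': True}} for id_ in pending]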
@ -1119,11 +1158,6 @@ class LocalClient(object):
Get the returns for the command line interface via the event system
'''
log.trace('func get_cli_event_returns()')
if not isinstance(minions, set):
if isinstance(minions, string_types):
minions = set([minions])
elif isinstance(minions, (list, tuple)):
minions = set(list(minions))
if verbose:
msg = 'Executing job with jid {0}'.format(jid)
@ -1132,116 +1166,33 @@ class LocalClient(object):
elif show_jid:
print('jid: {0}'.format(jid))
if timeout is None:
timeout = self.opts['timeout']
# lazy load the connected minions
connected_minions = None
start = time.time()
timeout_at = start + timeout
found = set()
# Check to see if the jid is real, if not return the empty dict
if not self.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}:
log.warning('jid does not exist')
yield {}
# stop the iteration, since the jid is invalid
raise StopIteration()
# Wait for the hosts to check in
syndic_wait = 0
last_time = False
while True:
# Process events until timeout is reached or all minions have returned
time_left = timeout_at - time.time()
# Wait 0 == forever, use a minimum of 1s
wait = max(1, time_left)
raw = self.event.get_event(wait, jid)
log.trace('get_cli_event_returns() called self.event.get_event() and received: raw={0}'.format(raw))
if raw is not None:
if 'minions' in raw.get('data', {}):
minions.update(raw['data']['minions'])
continue
if 'syndic' in raw:
minions.update(raw['syndic'])
continue
if 'return' not in raw:
continue
found.add(raw.get('id'))
ret = {raw['id']: {'ret': raw['return']}}
if 'out' in raw:
ret[raw['id']]['out'] = raw['out']
if 'retcode' in raw:
ret[raw['id']]['retcode'] = raw['retcode']
log.trace('raw = {0}'.format(raw))
log.trace('ret = {0}'.format(ret))
log.trace('yielding \'ret\'')
yield ret
if len(found.intersection(minions)) >= len(minions):
# All minions have returned, break out of the loop
if self.opts['order_masters']:
if syndic_wait < self.opts.get('syndic_wait', 1):
syndic_wait += 1
timeout_at = time.time() + 1
continue
break
continue
# The event system timeout was reached and nothing was returned
if len(found.intersection(minions)) >= len(minions):
# All minions have returned, break out of the loop
if self.opts['order_masters']:
if syndic_wait < self.opts.get('syndic_wait', 1):
syndic_wait += 1
timeout_at = time.time() + 1
continue
break
if last_time:
if verbose or show_timeout:
if self.opts.get('minion_data_cache', False) \
or tgt_type in ('glob', 'pcre', 'list'):
if len(found) < len(minions):
fail = sorted(list(minions.difference(found)))
# May incur heavy disk access
connected_minions = salt.utils.minions.CkMinions(self.opts).connected_ids()
for failed in fail:
if connected_minions and failed not in connected_minions:
yield {failed: {'out': 'no_return',
'ret': 'Minion did not return. [Not connected]'}}
else:
yield({
failed: {
'out': 'no_return',
'ret': 'Minion did not return. [No response]'
}
})
break
if time.time() > timeout_at:
# The timeout has been reached, check the jid to see if the
# timeout needs to be increased
jinfo = self.gather_job_info(jid, tgt, tgt_type, minions - found, **kwargs)
more_time = False
for id_ in jinfo:
if jinfo[id_]:
if verbose:
print(
'Execution is still running on {0}'.format(id_)
)
more_time = True
if not more_time:
cache_jinfo = self.get_cache_returns(jid)
for id_ in cache_jinfo:
if id_ == tgt:
found.add(cache_jinfo.get('id'))
ret = {id_: {'ret': cache_jinfo[id_]['ret']}}
if 'out' in cache_jinfo[id_]:
ret[id_]['out'] = cache_jinfo[id_]['out']
if 'retcode' in cache_jinfo[id_]:
ret[id_]['retcode'] = cache_jinfo[id_]['retcode']
yield ret
if more_time:
timeout_at = time.time() + timeout
continue
for ret in self.get_iter_returns(jid,
minions,
timeout=timeout,
tgt=tgt,
tgt_type='glob',
expect_minions=(verbose or show_timeout)
):
# replace the return structure for missing minions
for id_, min_ret in ret.iteritems():
if min_ret.get('failed') is True:
if connected_minions is None:
connected_minions = salt.utils.minions.CkMinions(self.opts).connected_ids()
if connected_minions and id_ not in connected_minions:
yield {id_: {'out': 'no_return',
'ret': 'Minion did not return. [Not connected]'}}
else:
yield({
id_: {
'out': 'no_return',
'ret': 'Minion did not return. [No response]'
}
})
else:
last_time = True
time.sleep(0.01)
yield {id_: min_ret}
def get_event_iter_returns(self, jid, minions, timeout=None):
'''
@ -1252,6 +1203,8 @@ class LocalClient(object):
if timeout is None:
timeout = self.opts['timeout']
timeout_at = time.time() + timeout
found = set()
# Check to see if the jid is real, if not return the empty dict
if not self.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}:
@ -1262,7 +1215,7 @@ class LocalClient(object):
# Wait for the hosts to check in
while True:
raw = self.event.get_event(timeout)
if raw is None:
if raw is None or time.time() > timeout_at:
# Timeout reached
break
if 'minions' in raw.get('data', {}):
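
With the old polling loop gone, get_cli_event_returns() now just drives get_iter_returns() (passing expect_minions when verbose or show_timeout is set) and post-processes the synthetic {'failed': True} entries, asking CkMinions.connected_ids() whether the silent minion was even connected. A hedged sketch of that post-processing, simplified from the diff (annotate_missing is a made-up helper):

    import salt.utils.minions

    def annotate_missing(opts, ret, connected_minions=None):
        '''Sketch: translate a {'failed': True} placeholder into the CLI-facing
        no_return structure; connected_ids() can hit the disk, so it is looked
        up lazily and the result is handed back for reuse.'''
        out = {}
        for id_, min_ret in ret.items():
            if min_ret.get('failed') is True:
                if connected_minions is None:
                    connected_minions = salt.utils.minions.CkMinions(opts).connected_ids()
                if connected_minions and id_ not in connected_minions:
                    reason = 'Minion did not return. [Not connected]'
                else:
                    reason = 'Minion did not return. [No response]'
                out[id_] = {'out': 'no_return', 'ret': reason}
            else:
                out[id_] = min_ret
        return out, connected_minions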


@ -73,15 +73,18 @@ try:
# if it's not ephemeral, make sure we didn't already grab it
if not self.ephemeral_lease:
for child in self.client.get_children(self.path):
try:
data, stat = self.client.get(self.path + "/" + child)
if identifier == data.decode('utf-8'):
self.create_path = self.path + "/" + child
self.is_acquired = True
break
except NoNodeError: # pragma: nocover
pass
try:
for child in self.client.get_children(self.path):
try:
data, stat = self.client.get(self.path + "/" + child)
if identifier == data.decode('utf-8'):
self.create_path = self.path + "/" + child
self.is_acquired = True
break
except NoNodeError: # pragma: nocover
pass
except NoNodeError: # pragma: nocover
pass
def _get_lease(self, data=None):
# Make sure the session is still valid
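
The second file's change is narrower: the scan for an existing lock node is wrapped in an outer try/except so that a vanished parent path (NoNodeError raised by get_children() as well as by get()) is treated as "lease not acquired yet" rather than an error. The same guard in isolation, sketched against the kazoo API (client is an already connected kazoo client; path and identifier are placeholders):

    from kazoo.exceptions import NoNodeError

    def find_existing_lease(client, path, identifier):
        '''Sketch: look for a lock child we already created, tolerating the
        parent path or any child disappearing mid-scan.'''
        try:
            for child in client.get_children(path):
                try:
                    data, _stat = client.get(path + "/" + child)
                    if data.decode('utf-8') == identifier:
                        return path + "/" + child       # lease already held
                except NoNodeError:
                    pass                                # child vanished; keep scanning
        except NoNodeError:
            pass                                        # parent path itself is gone
        return None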