Merge pull request #10063 from mbirtwell/fix_a_missing_returns_case_develop

Fix a case when job returns are missed by salt because saltutil.find_job times out
Thomas S Hatch 2014-01-31 09:52:22 -08:00
commit bbd0e5855b
3 changed files with 65 additions and 43 deletions
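
The problem: when a job ran longer than the client timeout, LocalClient probed the minions with saltutil.find_job, but that probe used a hard-coded 2 second timeout of its own. On a slow or busy minion the probe came back empty, the client gave up, and the job's eventual return was dropped. The fix makes the probe timeout configurable (a new gather_job_timeout master option), keeps extending the wait as long as any minion reports the job still running, logs each decision, and reworks the minion-side event polling. A minimal sketch of the waiting pattern only, not Salt's actual code; wait_for_returns and probe are made-up names standing in for get_iter_returns and gather_job_info, and the final last_time pass is omitted:

    import time

    def wait_for_returns(probe, timeout):
        # Wait for job returns, sliding the deadline while probe()
        # reports that at least one minion is still running the job.
        timeout_at = time.time() + timeout
        while True:
            if time.time() > timeout_at:
                still_running = probe()  # cf. saltutil.find_job below
                if still_running:
                    # The job is alive somewhere: push the deadline out
                    timeout_at = time.time() + timeout
                    continue
                break  # not running anywhere: stop waiting
            time.sleep(0.01)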

salt/client/__init__.py

@@ -37,6 +37,7 @@ import time
 import copy
 import getpass
 import logging
+from datetime import datetime

 # Import salt libs
 import salt.config
@@ -182,10 +183,11 @@ class LocalClient(object):
         '''
         Return the information about a given job
         '''
+        log.debug('Checking whether jid %s is still running', jid)
         jinfo = self.cmd(tgt,
                          'saltutil.find_job',
                          [jid],
-                         2,
+                         self.opts['gather_job_timeout'],
                          tgt_type,
                          **kwargs)
         return jinfo
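
The probe above previously waited a hard-coded 2 seconds for find_job answers; that window is now the gather_job_timeout option. For reference, this is roughly the same probe issued by hand; the jid is made up:

    import salt.client

    client = salt.client.LocalClient()
    jinfo = client.cmd('*', 'saltutil.find_job', ['20140131123456789012'],
                       timeout=client.opts['gather_job_timeout'])
    # Minions still running the job answer with its data; finished ones
    # answer with an empty dict, so an all-empty jinfo means nothing is
    # still running.
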
@@ -838,6 +840,8 @@ class LocalClient(object):
         # Wait for the hosts to check in
         syndic_wait = 0
         last_time = False
+        log.debug("get_iter_returns for jid %s sent to %s will timeout at %s",
+                  jid, minions, datetime.fromtimestamp(timeout_at).time())
         while True:
             # Process events until timeout is reached or all minions have returned
             time_left = timeout_at - int(time.time())
@@ -866,7 +870,10 @@
                     if syndic_wait < self.opts.get('syndic_wait', 1):
                         syndic_wait += 1
                         timeout_at = int(time.time()) + 1
+                        log.debug('jid %s syndic_wait %s will now timeout at %s',
+                                  jid, syndic_wait, datetime.fromtimestamp(timeout_at).time())
                         continue
+                log.debug('jid %s found all minions', jid)
                 break
             continue
         # Then event system timeout was reached and nothing was returned
@@ -876,27 +883,35 @@
                     if syndic_wait < self.opts.get('syndic_wait', 1):
                         syndic_wait += 1
                         timeout_at = int(time.time()) + 1
+                        log.debug('jid %s syndic_wait %s will now timeout at %s',
+                                  jid, syndic_wait, datetime.fromtimestamp(timeout_at).time())
                         continue
+                log.debug('jid %s found all minions', jid)
                 break
             if glob.glob(wtag) and int(time.time()) <= timeout_at + 1:
                 # The timeout +1 has not been reached and there is still a
                 # write tag for the syndic
                 continue
             if last_time:
+                if len(found) < len(minions):
+                    log.info('jid %s minions %s did not return in time',
+                             jid, minions)
                 break
             if int(time.time()) > timeout_at:
                 # The timeout has been reached, check the jid to see if the
                 # timeout needs to be increased
                 jinfo = self.gather_job_info(jid, tgt, tgt_type, **kwargs)
-                more_time = False
-                for id_ in jinfo:
-                    if jinfo[id_]:
-                        more_time = True
-                if more_time:
+                still_running = [id_ for id_, jdat in jinfo.iteritems()
+                                 if jdat
+                                 ]
+                if still_running:
                     timeout_at = int(time.time()) + timeout
+                    log.debug('jid %s still running on %s will now timeout at %s',
+                              jid, still_running, datetime.fromtimestamp(timeout_at).time())
                     continue
                 else:
                     last_time = True
+                    log.debug('jid %s not running on any minions last time', jid)
                     continue
             time.sleep(0.01)
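
The iteritems() comprehension replaces the old more_time flag loop with an explicit list of the minions still busy, which also gives the new debug line something useful to print. Illustrative data shapes only, with made-up minion ids:

    jinfo = {
        'web1': {'jid': '20140131123456789012', 'fun': 'test.sleep'},  # still running
        'web2': {},                                                    # already finished
    }
    still_running = [id_ for id_, jdat in jinfo.iteritems() if jdat]
    assert still_running == ['web1']  # non-empty: extend timeout_at and keep waiting
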
@@ -916,11 +931,15 @@
                                   self.opts['hash_type'])
         start = int(time.time())
         timeout_at = start + timeout
+        log.debug("get_returns for jid %s sent to %s will timeout at %s",
+                  jid, minions, datetime.fromtimestamp(timeout_at).time())
         found = set()
         ret = {}
         wtag = os.path.join(jid_dir, 'wtag*')
         # Check to see if the jid is real, if not return the empty dict
         if not os.path.isdir(jid_dir):
+            log.warning("jid_dir (%s) does not exist", jid_dir)
             return ret
         # Wait for the hosts to check in
         while True:
@@ -932,17 +951,21 @@
                 ret[raw['id']] = raw['return']
                 if len(found.intersection(minions)) >= len(minions):
                     # All minions have returned, break out of the loop
+                    log.debug("jid %s found all minions", jid)
                     break
                 continue
             # Then event system timeout was reached and nothing was returned
             if len(found.intersection(minions)) >= len(minions):
                 # All minions have returned, break out of the loop
+                log.debug("jid %s found all minions", jid)
                 break
             if glob.glob(wtag) and int(time.time()) <= timeout_at + 1:
                 # The timeout +1 has not been reached and there is still a
                 # write tag for the syndic
                 continue
             if int(time.time()) > timeout_at:
+                log.info('jid %s minions %s did not return in time',
+                         jid, minions)
                 break
             time.sleep(0.01)
         return ret
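
Taken together, the client-side behaviour is: a long job no longer has to beat the caller's timeout, only the repeated job checks. A sketch of exercising that, assuming a reachable master with connected minions:

    import salt.client

    client = salt.client.LocalClient()
    # test.sleep outlives the 5 second timeout, but as long as
    # saltutil.find_job keeps finding the job, timeout_at keeps sliding
    # and the return is still collected.
    ret = client.cmd('*', 'test.sleep', [30], timeout=5)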

salt/config.py

@@ -183,6 +183,7 @@ VALID_OPTS = {
     'keysize': int,
     'salt_transport': str,
     'enumerate_proxy_minions': bool,
+    'gather_job_timeout': int,
 }

 # default configurations
@@ -388,7 +389,8 @@ DEFAULT_MASTER_OPTS = {
     'sign_pub_messages': False,
     'keysize': 4096,
     'salt_transport': 'zeromq',
-    'enumerate_proxy_minions': False
+    'enumerate_proxy_minions': False,
+    'gather_job_timeout': 2,
 }

 # ----- Salt Cloud Configuration Defaults ----------------------------------->
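
The default of 2 preserves the old hard-coded behaviour; deployments where minions are slow to answer the job check can raise it, e.g. gather_job_timeout: 10 in the master config file. A quick way to confirm the option is picked up, assuming the stock config path:

    import salt.config

    opts = salt.config.master_config('/etc/salt/master')
    print opts['gather_job_timeout']  # 2 unless overridden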

salt/minion.py

@@ -19,6 +19,7 @@ import time
 import traceback
 import sys
 import signal
+import errno
 from random import randint

 import salt
@@ -1176,7 +1177,6 @@ class Minion(object):
         )
         self.poller = zmq.Poller()
-        self.epoller = zmq.Poller()
         self.socket = self.context.socket(zmq.SUB)
         self.socket.setsockopt(zmq.SUBSCRIBE, '')
         self.socket.setsockopt(zmq.IDENTITY, self.opts['id'])
@@ -1225,7 +1225,7 @@
         )
         self.socket.connect(self.master_pub)
         self.poller.register(self.socket, zmq.POLLIN)
-        self.epoller.register(self.epull_sock, zmq.POLLIN)
+        self.poller.register(self.epull_sock, zmq.POLLIN)
         # Send an event to the master that the minion is live
         self._fire_master(
             'Minion {0} started at {1}'.format(
@@ -1291,38 +1291,46 @@
                         'Exception {0} occurred in scheduled job'.format(exc)
                     )
             try:
+                log.trace("Check main poller timeout %s" % loop_interval)
                 socks = dict(self.poller.poll(
                     loop_interval * 1000)
                 )
-                if self.socket in socks and socks[self.socket] == zmq.POLLIN:
-                    payload = self.serial.loads(self.socket.recv())
+                if socks.get(self.socket) == zmq.POLLIN:
+                    payload = self.serial.loads(self.socket.recv(zmq.NOBLOCK))
+                    log.trace("Handling payload")
                     self._handle_payload(payload)
                 # Check the event system
-                if self.epoller.poll(1):
+                if socks.get(self.epull_sock) == zmq.POLLIN:
+                    package = self.epull_sock.recv(zmq.NOBLOCK)
+                    log.debug("Handling event %r", package)
                     try:
-                        while True:
-                            package = self.epull_sock.recv(zmq.NOBLOCK)
-                            if package.startswith('module_refresh'):
-                                self.module_refresh()
-                            elif package.startswith('pillar_refresh'):
-                                self.pillar_refresh()
-                            elif package.startswith('grains_refresh'):
-                                if self.grains_cache != self.opts['grains']:
-                                    self.pillar_refresh()
-                                    self.grains_cache = self.opts['grains']
-                            elif package.startswith('fire_master'):
-                                tag, data = salt.utils.event.MinionEvent.unpack(package)
-                                log.debug("Forwarding master event tag={tag}".format(tag=data['tag']))
-                                self._fire_master(data['data'], data['tag'], data['events'], data['pretag'])
-                            self.epub_sock.send(package)
+                        if package.startswith('module_refresh'):
+                            self.module_refresh()
+                        elif package.startswith('pillar_refresh'):
+                            self.pillar_refresh()
+                        elif package.startswith('grains_refresh'):
+                            if self.grains_cache != self.opts['grains']:
+                                self.pillar_refresh()
+                                self.grains_cache = self.opts['grains']
+                        elif package.startswith('fire_master'):
+                            tag, data = salt.utils.event.MinionEvent.unpack(package)
+                            log.debug("Forwarding master event tag={tag}".format(tag=data['tag']))
+                            self._fire_master(data['data'], data['tag'], data['events'], data['pretag'])
+                        self.epub_sock.send(package)
                     except Exception:
-                        pass
-            except zmq.ZMQError:
-                # This is thrown by the interrupt caused by python handling the
-                # SIGCHLD. This is a safe error and we just start the poll
-                # again
+                        log.debug("Exception while handling events", exc_info=True)
+            except zmq.ZMQError as e:
+                # Python's SIGCHLD handling interrupts the poll and raises
+                # this error with errno == EINTR; having nothing to receive
+                # on the zmq socket raises it with errno == EAGAIN.
+                # Both are safe to ignore.
+                if e.errno != errno.EAGAIN and e.errno != errno.EINTR:
+                    log.critical('Unexpected ZMQError while polling minion',
+                                 exc_info=True)
                 continue
             except Exception:
                 log.critical(
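
The rewritten except clause distinguishes the two benign ZMQError cases from real failures. The same pattern in isolation, as a sketch with a made-up helper name:

    import errno
    import zmq

    def recv_no_wait(sock):
        # Non-blocking receive: EAGAIN just means nothing is queued, and
        # EINTR means the call was interrupted by a signal such as the
        # SIGCHLD from a finishing job process. Both are safe to ignore.
        try:
            return sock.recv(zmq.NOBLOCK)
        except zmq.ZMQError as e:
            if e.errno in (errno.EAGAIN, errno.EINTR):
                return None
            raise  # anything else is a genuine error
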
@@ -1415,17 +1423,6 @@
                         socket[0].close()
                     self.poller.unregister(socket[0])
-        if hasattr(self, 'epoller'):
-            if isinstance(self.epoller.sockets, dict):
-                for socket in self.epoller.sockets.keys():
-                    if socket.closed is False:
-                        socket.close()
-                    self.epoller.unregister(socket)
-            else:
-                for socket in self.epoller.sockets:
-                    if socket[0].closed is False:
-                        socket[0].close()
-                    self.epoller.unregister(socket[0])
         if hasattr(self, 'epub_sock') and self.epub_sock.closed is False:
             self.epub_sock.close()
         if hasattr(self, 'epull_sock') and self.epull_sock.closed is False:
             self.epull_sock.close()
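
With the event socket registered on the main poller (see the tune_in hunk above), the separate epoller never exists any more, so its teardown in destroy() goes away too. The consolidated shape, as a standalone sketch mirroring the minion's attribute names:

    import zmq

    context = zmq.Context()
    socket = context.socket(zmq.SUB)       # master pub/sub traffic
    epull_sock = context.socket(zmq.PULL)  # local event traffic
    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)
    poller.register(epull_sock, zmq.POLLIN)
    # One poll call now wakes up for either socket, so command handling
    # and event handling share the same loop_interval timeout.
    socks = dict(poller.poll(1000))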