diff --git a/salt/client/__init__.py b/salt/client/__init__.py index 0dc87648f4..7dfb092922 100644 --- a/salt/client/__init__.py +++ b/salt/client/__init__.py @@ -37,6 +37,7 @@ import time import copy import getpass import logging +from datetime import datetime # Import salt libs import salt.config @@ -182,10 +183,11 @@ class LocalClient(object): ''' Return the information about a given job ''' + log.debug('Checking whether jid %s is still running', jid) jinfo = self.cmd(tgt, 'saltutil.find_job', [jid], - 2, + self.opts['gather_job_timeout'], tgt_type, **kwargs) return jinfo @@ -838,6 +840,8 @@ class LocalClient(object): # Wait for the hosts to check in syndic_wait = 0 last_time = False + log.debug("get_iter_returns for jid %s sent to %s will timeout at %s", + jid, minions, datetime.fromtimestamp(timeout_at).time()) while True: # Process events until timeout is reached or all minions have returned time_left = timeout_at - int(time.time()) @@ -866,7 +870,10 @@ class LocalClient(object): if syndic_wait < self.opts.get('syndic_wait', 1): syndic_wait += 1 timeout_at = int(time.time()) + 1 + log.debug('jid %s syndic_wait %s will now timeout at %s', + jid, syndic_wait, datetime.fromtimestamp(timeout_at).time()) continue + log.debug('jid %s found all minions', jid) break continue # Then event system timeout was reached and nothing was returned @@ -876,27 +883,35 @@ class LocalClient(object): if syndic_wait < self.opts.get('syndic_wait', 1): syndic_wait += 1 timeout_at = int(time.time()) + 1 + log.debug('jid %s syndic_wait %s will now timeout at %s', + jid, syndic_wait, datetime.fromtimestamp(timeout_at).time()) continue + log.debug('jid %s found all minions', jid) break if glob.glob(wtag) and int(time.time()) <= timeout_at + 1: # The timeout +1 has not been reached and there is still a # write tag for the syndic continue if last_time: + if len(found) < len(minions): + log.info('jid %s minions %s did not return in time', + jid, minions) break if int(time.time()) > 
timeout_at: # The timeout has been reached, check the jid to see if the # timeout needs to be increased jinfo = self.gather_job_info(jid, tgt, tgt_type, **kwargs) - more_time = False - for id_ in jinfo: - if jinfo[id_]: - more_time = True - if more_time: + still_running = [id_ for id_, jdat in jinfo.iteritems() + if jdat + ] + if still_running: timeout_at = int(time.time()) + timeout + log.debug('jid %s still running on %s will now timeout at %s', + jid, still_running, datetime.fromtimestamp(timeout_at).time()) continue else: last_time = True + log.debug('jid %s not running on any minions last time', jid) continue time.sleep(0.01) @@ -916,11 +931,15 @@ class LocalClient(object): self.opts['hash_type']) start = int(time.time()) timeout_at = start + timeout + log.debug("get_returns for jid %s sent to %s will timeout at %s", + jid, minions, datetime.fromtimestamp(timeout_at).time()) + found = set() ret = {} wtag = os.path.join(jid_dir, 'wtag*') # Check to see if the jid is real, if not return the empty dict if not os.path.isdir(jid_dir): + log.warning("jid_dir (%s) does not exist", jid_dir) return ret # Wait for the hosts to check in while True: @@ -932,17 +951,21 @@ class LocalClient(object): ret[raw['id']] = raw['return'] if len(found.intersection(minions)) >= len(minions): # All minions have returned, break out of the loop + log.debug("jid %s found all minions", jid) break continue # Then event system timeout was reached and nothing was returned if len(found.intersection(minions)) >= len(minions): # All minions have returned, break out of the loop + log.debug("jid %s found all minions", jid) break if glob.glob(wtag) and int(time.time()) <= timeout_at + 1: # The timeout +1 has not been reached and there is still a # write tag for the syndic continue if int(time.time()) > timeout_at: + log.info('jid %s minions %s did not return in time', + jid, minions) break time.sleep(0.01) return ret diff --git a/salt/config.py b/salt/config.py index 83f3972ee2..f866cd541e 100644 
--- a/salt/config.py +++ b/salt/config.py @@ -183,6 +183,7 @@ VALID_OPTS = { 'keysize': int, 'salt_transport': str, 'enumerate_proxy_minions': bool, + 'gather_job_timeout': int, } # default configurations @@ -388,7 +389,8 @@ DEFAULT_MASTER_OPTS = { 'sign_pub_messages': False, 'keysize': 4096, 'salt_transport': 'zeromq', - 'enumerate_proxy_minions': False + 'enumerate_proxy_minions': False, + 'gather_job_timeout': 2, } # ----- Salt Cloud Configuration Defaults -----------------------------------> diff --git a/salt/minion.py b/salt/minion.py index d47da8b252..82adb15e35 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -19,6 +19,7 @@ import time import traceback import sys import signal +import errno from random import randint import salt @@ -1176,7 +1177,6 @@ class Minion(object): ) self.poller = zmq.Poller() - self.epoller = zmq.Poller() self.socket = self.context.socket(zmq.SUB) self.socket.setsockopt(zmq.SUBSCRIBE, '') self.socket.setsockopt(zmq.IDENTITY, self.opts['id']) @@ -1225,7 +1225,7 @@ class Minion(object): ) self.socket.connect(self.master_pub) self.poller.register(self.socket, zmq.POLLIN) - self.epoller.register(self.epull_sock, zmq.POLLIN) + self.poller.register(self.epull_sock, zmq.POLLIN) # Send an event to the master that the minion is live self._fire_master( 'Minion {0} started at {1}'.format( @@ -1291,38 +1291,46 @@ class Minion(object): 'Exception {0} occurred in scheduled job'.format(exc) ) try: + log.trace("Check main poller timeout %s" % loop_interval) socks = dict(self.poller.poll( loop_interval * 1000) ) - if self.socket in socks and socks[self.socket] == zmq.POLLIN: - payload = self.serial.loads(self.socket.recv()) + if socks.get(self.socket) == zmq.POLLIN: + payload = self.serial.loads(self.socket.recv(zmq.NOBLOCK)) + log.trace("Handling payload") self._handle_payload(payload) + # Check the event system - if self.epoller.poll(1): + if socks.get(self.epull_sock) == zmq.POLLIN: + package = self.epull_sock.recv(zmq.NOBLOCK) + 
log.debug("Handling event %r", package) try: - while True: - package = self.epull_sock.recv(zmq.NOBLOCK) - if package.startswith('module_refresh'): - self.module_refresh() - elif package.startswith('pillar_refresh'): + if package.startswith('module_refresh'): + self.module_refresh() + elif package.startswith('pillar_refresh'): + self.pillar_refresh() + elif package.startswith('grains_refresh'): + if self.grains_cache != self.opts['grains']: self.pillar_refresh() - elif package.startswith('grains_refresh'): - if self.grains_cache != self.opts['grains']: - self.pillar_refresh() - self.grains_cache = self.opts['grains'] - elif package.startswith('fire_master'): - tag, data = salt.utils.event.MinionEvent.unpack(package) - log.debug("Forwarding master event tag={tag}".format(tag=data['tag'])) - self._fire_master(data['data'], data['tag'], data['events'], data['pretag']) - - self.epub_sock.send(package) + self.grains_cache = self.opts['grains'] + elif package.startswith('fire_master'): + tag, data = salt.utils.event.MinionEvent.unpack(package) + log.debug("Forwarding master event tag={tag}".format(tag=data['tag'])) + self._fire_master(data['data'], data['tag'], data['events'], data['pretag']) + self.epub_sock.send(package) except Exception: - pass - except zmq.ZMQError: - # This is thrown by the interrupt caused by python handling the - # SIGCHLD. This is a safe error and we just start the poll - # again + log.debug("Exception while handling events", exc_info=True) + + except zmq.ZMQError as e: + # The interrupt caused by python handling the + # SIGCHLD. Throws this error with errno == EINTR. + # Nothing to receive on the zmq socket throws this error + # with EAGAIN. 
+ # Both are safe to ignore + if e.errno != errno.EAGAIN and e.errno != errno.EINTR: + log.critical('Unexpected ZMQError while polling minion', + exc_info=True) continue except Exception: log.critical( @@ -1415,17 +1423,6 @@ class Minion(object): socket[0].close() self.poller.unregister(socket[0]) - if hasattr(self, 'epoller'): - if isinstance(self.epoller.sockets, dict): - for socket in self.epoller.sockets.keys(): - if socket.closed is False: - socket.close() - self.epoller.unregister(socket) - else: - for socket in self.epoller.sockets: - if socket[0].closed is False: - socket[0].close() - self.epoller.unregister(socket[0]) if hasattr(self, 'epub_sock') and self.epub_sock.closed is False: self.epub_sock.close() if hasattr(self, 'epull_sock') and self.epull_sock.closed is False: