From 2b8c106108c95e494cba0d78dd18b017c0e27fd6 Mon Sep 17 00:00:00 2001
From: Thomas Jackson
Date: Thu, 31 Jul 2014 16:33:53 -0700
Subject: [PATCH 01/11] Add master_id to pub/ret data as ground work for multi_syndic

---
 salt/master.py |  5 +++++
 salt/minion.py | 16 ++++++++++++++--
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/salt/master.py b/salt/master.py
index 055b21c646..7bd3ef59ad 100644
--- a/salt/master.py
+++ b/salt/master.py
@@ -2195,6 +2195,11 @@ class ClearFuncs(object):
             'jid': clear_load['jid'],
             'ret': clear_load['ret'],
         }
+        # if you specified a master id, lets put that in the load
+        if 'master_id' in self.opts:
+            load['master_id'] = self.opts['master_id']
+        elif 'master_id' in extra:
+            load['master_id'] = extra['master_id']
         if 'id' in extra:
             load['id'] = extra['id']

diff --git a/salt/minion.py b/salt/minion.py
index b22b7dfcfc..4167f9ea35 100644
--- a/salt/minion.py
+++ b/salt/minion.py
@@ -1067,6 +1067,8 @@ class Minion(MinionBase):
             ret['jid'] = data['jid']
             ret['fun'] = data['fun']
             ret['fun_args'] = data['arg']
+            if 'master_id' in data:
+                ret['master_id'] = data['master_id']
             minion_instance._return_pub(ret)
             if data['ret']:
                 ret['id'] = opts['id']
@@ -1802,7 +1804,7 @@ class Syndic(Minion):
         data = self.crypticle.loads(load)
         # Verify that the publication is valid
         if 'tgt' not in data or 'jid' not in data or 'fun' not in data \
-                or 'to' not in data or 'arg' not in data:
+                or 'arg' not in data:
             return
         data['to'] = int(data['to']) - 1
         if 'user' in data:
@@ -1835,6 +1837,12 @@ class Syndic(Minion):
         # Set up default tgt_type
         if 'tgt_type' not in data:
             data['tgt_type'] = 'glob'
+        kwargs = {}
+
+        # if a master_id is in the data, add it to publish job
+        if 'master_id' in data:
+            kwargs['master_id'] = data['master_id']
+
         # Send out the publication
         self.local.pub(data['tgt'],
                        data['fun'],
@@ -1842,7 +1850,8 @@ class Syndic(Minion):
                        data['tgt_type'],
                        data['ret'],
                        data['jid'],
-                       data['to'])
+                       data['to'],
+                       **kwargs)

     # Syndic Tune In
     def tune_in(self):
@@ -1982,6 +1991,8 @@ class Syndic(Minion):
                     jdict['__load__'].update(
                         self.mminion.returners[fstr](event['data']['jid'])
                         )
+                if 'master_id' in event['data']:
+                    jdict['master_id'] = event['data']['master_id']
                 jdict[event['data']['id']] = event['data']['return']
             else:
                 # Add generic event aggregation here
@@ -1994,6 +2005,7 @@ class Syndic(Minion):
             self._fire_master(events=self.raw_events,
                               pretag=tagify(self.opts['id'], base='syndic'),
                               )
+        print (self.jids)
         for jid in self.jids:
             self._return_pub(self.jids[jid], '_syndic_return')
         self._reset_event_aggregation()

From f226809cd592464fd45f10d464e310f4cd49f364 Mon Sep 17 00:00:00 2001
From: Thomas Jackson
Date: Thu, 31 Jul 2014 20:50:46 -0700
Subject: [PATCH 02/11] Basics working, can't handle restart of a master yet...

---
 salt/__init__.py |   6 +-
 salt/minion.py   | 247 ++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 251 insertions(+), 2 deletions(-)

diff --git a/salt/__init__.py b/salt/__init__.py
index b7b7287e65..4da5735a9b 100644
--- a/salt/__init__.py
+++ b/salt/__init__.py
@@ -420,7 +420,11 @@ class Syndic(parsers.SyndicOptionParser):
         # Late import so logging works correctly
         import salt.minion
         self.daemonize_if_required()
-        self.syndic = salt.minion.Syndic(self.config)
+        # if its a multisyndic, do so
+        if isinstance(self.config.get('master'), list):
+            self.syndic = salt.minion.MultiSyndic(self.config)
+        else:
+            self.syndic = salt.minion.Syndic(self.config)
         self.set_pidfile()

     def start(self):
diff --git a/salt/minion.py b/salt/minion.py
index 4167f9ea35..1223eef0cf 100644
--- a/salt/minion.py
+++ b/salt/minion.py
@@ -2005,7 +2005,6 @@ class Syndic(Minion):
             self._fire_master(events=self.raw_events,
                               pretag=tagify(self.opts['id'], base='syndic'),
                               )
-        print (self.jids)
         for jid in self.jids:
             self._return_pub(self.jids[jid], '_syndic_return')
         self._reset_event_aggregation()
@@ -2021,6 +2020,252 @@ class Syndic(Minion):
         if hasattr(self, 'local'):
             del self.local

+class MultiSyndic(Syndic):
+    '''
+    Make a Syndic minion, this minion will handle relaying jobs and returns from
+    all minions connected to it to the list of masters it is connected to.
+
+    Note: jobs will be returned best-effort to the requesting master
+    '''
+    # timeout for one of the connections to an upstream master
+    MASTER_MINION_CONNECT_TIMEOUT = 5
+    def __init__(self, opts):
+        opts['loop_interval'] = 1
+        # keep a copy, since we'll be messing with it a bit
+        self.raw_opts = copy.copy(opts)
+        super(MultiSyndic, self).__init__(opts)
+
+        self.context = zmq.Context()
+
+        self.master_minions = {}
+        for master in set(self.raw_opts['master']):
+            s_opts = copy.copy(self.raw_opts)
+            s_opts['master'] = master
+            self.master_minions[master] = {'opts': s_opts,
+                                           'auth_wait': s_opts['acceptance_wait_time']}
+            self._connect_to_master(master)
+
+    def _connect_to_master(self, master):
+        '''
+        Attempt to connect to master, including back-off for each one
+
+        return boolean of wether you connected or not
+        '''
+        if master not in self.master_minions:
+            log.error('Unable to connect to {0}, not in the list of masters'.format(master))
+            return False
+
+        minion = self.master_minions[master]
+        if time.time() - minion['auth_wait'] > minion.get('last', 0):
+            minion['last'] = time.time()
+            if minion['auth_wait'] < self.opts['acceptance_wait_time_max']:
+                minion['auth_wait'] += auth_wait
+            try:
+                t_minion = Minion(minion['opts'], self.MASTER_MINION_CONNECT_TIMEOUT, False)
+
+                # TODO: make less terrible...
+                # connect the minion to the master
+                t_minion.socket = self.context.socket(zmq.SUB)
+                t_minion._set_reconnect_ivl()
+                t_minion._setsockopts()
+                t_minion.socket.connect(t_minion.master_pub)
+
+                self.master_minions[master]['minion'] = t_minion
+                self.master_minions[master]['auth_wait'] = self.opts['acceptance_wait_time']
+
+                return True
+            except SaltClientError:
+                log.error('Error while bring up minion for multi-master. Is master {0} responding?'.format(master))
+                return False
+
+    def _pick_master(self, master_id=None):
+        '''
+        Pick a master (based on the master_id if you have it) and change the opts
+        to match that master (MAJOR HACK)
+        '''
+        if master_id not in self.master_minions:
+            # TODO: make random
+            master_id = self.master_minions.keys()[0]
+
+        self.opts = self.master_minions[master_id]['minion'].opts
+        return master_id
+
+    def _fire_master(self, data=None, tag=None, events=None, pretag=None):
+        # if you don't care, or the master you want we dont have, pick one at random
+        master = self._pick_master()
+
+        self.master_minions[master]['minion']._fire_master(data=data,
+                                                           tag=tag,
+                                                           events=events,
+                                                           pretag=pretag,
+                                                           )
+
+    # Syndic Tune In
+    def tune_in(self):
+        '''
+        Lock onto the publisher. This is the main event loop for the syndic
+        '''
+        # Instantiate the local client
+        self.local = salt.client.get_local_client(self.opts['_minion_conf_file'])
+        self.local.event.subscribe('')
+        self.local.opts['interface'] = self._syndic_interface
+
+        signal.signal(signal.SIGTERM, self.clean_die)
+        log.debug('Syndic {0!r} trying to tune in'.format(self.opts['id']))
+
+        # Share the poller with the event object
+        self.poller = self.local.event.poller
+
+        # register the master_minions on this poller
+        for master_id, minion_map in self.master_minions.iteritems():
+            if 'minion' not in minion_map:
+                continue
+            self.poller.register(minion_map['minion'].socket, zmq.POLLIN)
+
+
+        # Send an event to the master that the minion is live
+        self._fire_master(
+            'Syndic {0} started at {1}'.format(
+            self.opts['id'],
+            time.asctime()
+            ),
+            'syndic_start'
+        )
+        self._fire_master(
+            'Syndic {0} started at {1}'.format(
+            self.opts['id'],
+            time.asctime()
+            ),
+            tagify([self.opts['id'], 'start'], 'syndic'),
+        )
+
+        # Make sure to gracefully handle SIGUSR1
+        enable_sigusr1_handler()
+
+        loop_interval = int(self.opts['loop_interval'])
+        self._reset_event_aggregation()
+        while True:
+            try:
+                # Do all the maths in seconds
+                timeout = loop_interval
+                if self.event_forward_timeout is not None:
+                    timeout = min(timeout,
+                                  self.event_forward_timeout - time.time())
+                if timeout >= 0:
+                    log.trace('Polling timeout: %f', timeout)
+                    socks = dict(self.poller.poll(timeout * 1000))
+                else:
+                    # This shouldn't really happen.
+                    # But there's no harm being defensive
+                    log.warning('Negative timeout in syndic main loop')
+                    socks = {}
+                # check all of your master_minions
+                for master_id, minion_map in self.master_minions.iteritems():
+                    if 'minion' not in minion_map:
+                        if not self._connect_to_master(master_id):
+                            continue
+                        self.poller.register(minion_map['minion'].socket, zmq.POLLIN)
+                    # check if you had events
+                    if socks.get(minion_map['minion'].socket) == zmq.POLLIN:
+                        self._process_cmd_socket(minion_map['minion'])
+
+                if socks.get(self.local.event.sub) == zmq.POLLIN:
+                    self._process_event_socket()
+
+                if (self.event_forward_timeout is not None and
+                        self.event_forward_timeout < time.time()):
+                    self._forward_events()
+            # We don't handle ZMQErrors like the other minions
+            # I've put explicit handling around the recieve calls
+            # in the process_*_socket methods. If we see any other
+            # errors they may need some kind of handling so log them
+            # for now.
+            except Exception:
+                log.critical(
+                    'An exception occurred while polling the syndic',
+                    exc_info=True
+                )
+
+    def _process_cmd_socket(self, minion):
+        '''
+        process a pub job that a minion recieved
+        '''
+        try:
+            messages = minion.socket.recv_multipart(zmq.NOBLOCK)
+            messages_len = len(messages)
+            idx = None
+            if messages_len == 1:
+                idx = 0
+            elif messages_len == 2:
+                idx = 1
+            else:
+                raise SaltSyndicMasterError('Syndication master recieved message of invalid len ({0}/2)'.format(messages_len))
+
+            payload = minion.serial.loads(messages[idx])
+        except zmq.ZMQError as e:
+            # Swallow errors for bad wakeups or signals needing processing
+            if e.errno != errno.EAGAIN and e.errno != errno.EINTR:
+                raise
+        log.trace('Handling payload')
+        # TODO: remove this Terrible hack...
+        self.crypticle = minion.crypticle
+        self._handle_payload(payload)
+
+    def _reset_event_aggregation(self):
+        self.jids = {}
+        self.raw_events = []
+        self.event_forward_timeout = None
+
+    def _process_event_socket(self):
+        tout = time.time() + self.opts['syndic_max_event_process_time']
+        while tout > time.time():
+            try:
+                event = self.local.event.get_event_noblock()
+            except zmq.ZMQError as e:
+                # EAGAIN indicates no more events at the moment
+                # EINTR some kind of signal maybe someone trying
+                # to get us to quit so escape our timeout
+                if e.errno == errno.EAGAIN or e.errno == errno.EINTR:
+                    break
+                raise
+            log.trace('Got event {0}'.format(event['tag']))
+            if self.event_forward_timeout is None:
+                self.event_forward_timeout = (
+                    time.time() + self.opts['syndic_event_forward_timeout']
+                )
+            if salt.utils.is_jid(event['tag']) and 'return' in event['data']:
+                if 'jid' not in event['data']:
+                    # Not a job return
+                    continue
+                jdict = self.jids.setdefault(event['tag'], {})
+                if not jdict:
+                    jdict['__fun__'] = event['data'].get('fun')
+                    jdict['__jid__'] = event['data']['jid']
+                    jdict['__load__'] = {}
+                    fstr = '{0}.get_jid'.format(self.opts['master_job_cache'])
+                    jdict['__load__'].update(
+                        self.mminion.returners[fstr](event['data']['jid'])
+                    )
+                if 'master_id' in event['data']:
+                    # __'s to make sure it doesn't print out on the master cli
+                    jdict['__master_id__'] = event['data']['master_id']
+                jdict[event['data']['id']] = event['data']['return']
+            else:
+                # Add generic event aggregation here
+                if 'retcode' not in event['data']:
+                    self.raw_events.append(event)
+
+    def _forward_events(self):
+        log.trace('Forwarding events')
+        if self.raw_events:
+            self._fire_master(events=self.raw_events,
+                              pretag=tagify(self.opts['id'], base='syndic'),
+                              )
+        for jid, jid_ret in self.jids.iteritems():
+            self._pick_master(jid_ret.get('__master_id__'))
+            self._return_pub(jid_ret, '_syndic_return')
+        self._reset_event_aggregation()
+

 class Matcher(object):
     '''

From 6805d2b2cf85be73505dc6882d2196d3a71614d9 Mon Sep 17 00:00:00 2001
From: Thomas Jackson
Date: Thu, 31 Jul 2014 21:29:47 -0700
Subject: [PATCH 03/11] Handle failures of upstream masters

---
 salt/minion.py | 72 ++++++++++++++++++++++++++++++++++----------------------
 1 file changed, 49 insertions(+), 23 deletions(-)

diff --git a/salt/minion.py b/salt/minion.py
index 1223eef0cf..9fb137e370 100644
--- a/salt/minion.py
+++ b/salt/minion.py
@@ -2025,7 +2025,10 @@ class MultiSyndic(Syndic):
     Make a Syndic minion, this minion will handle relaying jobs and returns from
     all minions connected to it to the list of masters it is connected to.

-    Note: jobs will be returned best-effort to the requesting master
+    Note: jobs will be returned best-effort to the requesting master. This also means
+    (since we are using zmq) that if a job was fired and the master disconnects
+    between the publish and return, that the return will end up in a zmq buffer
+    in this Syndic headed to that original master
     '''
     # timeout for one of the connections to an upstream master
     MASTER_MINION_CONNECT_TIMEOUT = 5
@@ -2064,7 +2067,7 @@ class MultiSyndic(Syndic):
                 t_minion = Minion(minion['opts'], self.MASTER_MINION_CONNECT_TIMEOUT, False)

                 # TODO: make less terrible...
-                # connect the minion to the master
+                # connect the minion to the master, since the minion class doesn't do it already
                 t_minion.socket = self.context.socket(zmq.SUB)
                 t_minion._set_reconnect_ivl()
                 t_minion._setsockopts()
                 t_minion.socket.connect(t_minion.master_pub)
@@ -2083,22 +2086,43 @@ class MultiSyndic(Syndic):
         Pick a master (based on the master_id if you have it) and change the opts
         to match that master (MAJOR HACK)
         '''
-        if master_id not in self.master_minions:
-            # TODO: make random
-            master_id = self.master_minions.keys()[0]
-
-        self.opts = self.master_minions[master_id]['minion'].opts
-        return master_id
+        self.opts = self.master_minions[master_id]['minion'].opts
+
+    def iter_master_options(self, master_id=None):
+        '''
+        Iterate (in order) over your options for master
+        '''
+        masters = set(self.master_minions.keys())
+        if master_id not in self.master_minions:
+            master_id = masters.pop()
+        else:
+            masters.remove(master_id)
+
+        while True:
+            self._pick_master(master_id)
+            yield master_id
+            if len(masters) == 0:
+                break
+            master_id = masters.pop()

     def _fire_master(self, data=None, tag=None, events=None, pretag=None):
-        # if you don't care, or the master you want we dont have, pick one at random
-        master = self._pick_master()

-        self.master_minions[master]['minion']._fire_master(data=data,
-                                                           tag=tag,
-                                                           events=events,
-                                                           pretag=pretag,
-                                                           )
+        # if you don't care, or the master you want we dont have, pick one at random
+        for master in self.iter_master_options():
+            try:
+                self.master_minions[master]['minion']._fire_master(data=data,
+                                                                   tag=tag,
+                                                                   events=events,
+                                                                   pretag=pretag,
+                                                                   )
+                return
+            # unable to return, ususally because the master is down
+            except SaltClientError:
+                log.error('Unable to fire event to {0}, trying another...'.format(master))
+
+        log.critical('Unable to fire event on ANY master')

     # Syndic Tune In
     def tune_in(self):
@@ -2167,7 +2191,7 @@ class MultiSyndic(Syndic):
                         self.poller.register(minion_map['minion'].socket, zmq.POLLIN)
                     # check if you had events
                     if socks.get(minion_map['minion'].socket) == zmq.POLLIN:
-                        self._process_cmd_socket(minion_map['minion'])
+                        self._process_cmd_socket(master_id, minion_map['minion'])

                 if socks.get(self.local.event.sub) == zmq.POLLIN:
                     self._process_event_socket()
@@ -2186,7 +2210,7 @@ class MultiSyndic(Syndic):
                     exc_info=True
                 )

-    def _process_cmd_socket(self, minion):
+    def _process_cmd_socket(self, master_id, minion):
         '''
         process a pub job that a minion recieved
         '''
@@ -2209,13 +2233,10 @@ class MultiSyndic(Syndic):
         log.trace('Handling payload')
         # TODO: remove this Terrible hack...
         self.crypticle = minion.crypticle
+        # Terrible hack: We have to set the opts so all the class funcs work
+        self._pick_master(master_id)
         self._handle_payload(payload)

-    def _reset_event_aggregation(self):
-        self.jids = {}
-        self.raw_events = []
-        self.event_forward_timeout = None
-
     def _process_event_socket(self):
         tout = time.time() + self.opts['syndic_max_event_process_time']
         while tout > time.time():
@@ -2262,8 +2283,13 @@ class MultiSyndic(Syndic):
                               pretag=tagify(self.opts['id'], base='syndic'),
                               )
         for jid, jid_ret in self.jids.iteritems():
-            self._pick_master(jid_ret.get('__master_id__'))
-            self._return_pub(jid_ret, '_syndic_return')
+            for master in self.iter_master_options(jid_ret.get('__master_id__')):
+                try:
+                    self._return_pub(jid_ret, '_syndic_return')
+                    break
+                except SaltClientError:
+                    log.error('Unable to return to {0}, trying another...'.format(master))
+                    continue
         self._reset_event_aggregation()

From d9d45dd0b8945b8a5db7b452d836ee5ac57813da Mon Sep 17 00:00:00 2001
From: Thomas Jackson
Date: Thu, 31 Jul 2014 21:31:08 -0700
Subject: [PATCH 04/11] Update notes

---
 salt/minion.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/salt/minion.py b/salt/minion.py
index 9fb137e370..db1a4342b7 100644
--- a/salt/minion.py
+++ b/salt/minion.py
@@ -2028,7 +2028,12 @@ class MultiSyndic(Syndic):
     Note: jobs will be returned best-effort to the requesting master. This also means
     (since we are using zmq) that if a job was fired and the master disconnects
     between the publish and return, that the return will end up in a zmq buffer
-    in this Syndic headed to that original master
+    in this Syndic headed to that original master.
+
+    In addition, since these classes all seem to use a mix of blocking and non-blocking
+    calls (with varying timeouts along the way) this daemon does not handle failure well,
+    it will (under most circumstances) stall the daemon for ~60s attempting to re-auth
+    with the down master
     '''
     # timeout for one of the connections to an upstream master
     MASTER_MINION_CONNECT_TIMEOUT = 5

From 72a12874939c17efc8368ce8c6b8ecfc42d96ea4 Mon Sep 17 00:00:00 2001
From: Thomas Jackson
Date: Thu, 31 Jul 2014 21:31:35 -0700
Subject: [PATCH 05/11] pep8

---
 salt/minion.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/salt/minion.py b/salt/minion.py
index db1a4342b7..51ccba223f 100644
--- a/salt/minion.py
+++ b/salt/minion.py
@@ -2020,6 +2020,7 @@ class Syndic(Minion):
         if hasattr(self, 'local'):
             del self.local

+
 class MultiSyndic(Syndic):
     '''
     Make a Syndic minion, this minion will handle relaying jobs and returns from

From c8961fe7921b1cc9d9fcb990540277cf3b67703d Mon Sep 17 00:00:00 2001
From: Thomas Jackson
Date: Fri, 1 Aug 2014 12:41:31 -0700
Subject: [PATCH 06/11] Major re-work to look more like MultiMinion

---
 salt/minion.py | 215 ++++++++++++++++++++++++-------------------------
 1 file changed, 100 insertions(+), 115 deletions(-)

diff --git a/salt/minion.py b/salt/minion.py
index 51ccba223f..a8b2406bc7 100644
--- a/salt/minion.py
+++ b/salt/minion.py
@@ -1853,25 +1853,7 @@ class Syndic(Minion):
                        data['to'],
                        **kwargs)

-    # Syndic Tune In
-    def tune_in(self):
-        '''
-        Lock onto the publisher. This is the main event loop for the syndic
-        '''
-        # Instantiate the local client
-        self.local = salt.client.get_local_client(self.opts['_minion_conf_file'])
-        self.local.event.subscribe('')
-        self.local.opts['interface'] = self._syndic_interface
-
-        signal.signal(signal.SIGTERM, self.clean_die)
-        log.debug('Syndic {0!r} trying to tune in'.format(self.opts['id']))
-
-        self.context = zmq.Context()
-
-        # Start with the publish socket
-        # Share the poller with the event object
-        self.poller = self.local.event.poller
-        self.socket = self.context.socket(zmq.SUB)
+    def _setsockopts(self):
         # no filters for syndication masters, unless we want to maintain a
         # list of all connected minions and update the filter
         self.socket.setsockopt(zmq.SUBSCRIBE, '')
@@ -1880,8 +1862,7 @@ class Syndic(Minion):
         self._set_reconnect_ivl_max()
         self._set_tcp_keepalive()

-        self.socket.connect(self.master_pub)
-        self.poller.register(self.socket, zmq.POLLIN)
+    def _fire_master_syndic_start(self):
         # Send an event to the master that the minion is live
         self._fire_master(
             'Syndic {0} started at {1}'.format(
@@ -1898,6 +1879,69 @@ class Syndic(Minion):
             tagify([self.opts['id'], 'start'], 'syndic'),
         )

+    def tune_in_no_block(self):
+        '''
+        Executes the tune_in sequence but omits extra logging and the
+        management of the event bus assuming that these are handled outside
+        the tune_in sequence
+        '''
+        # Instantiate the local client
+        self.local = salt.client.get_local_client(self.opts['_minion_conf_file'])
+        self.local.event.subscribe('')
+
+        self._init_context_and_poller()
+
+        self.socket = self.context.socket(zmq.SUB)
+
+        self._setsockopts()
+
+        self.socket.connect(self.master_pub)
+        self.poller.register(self.socket, zmq.POLLIN)
+
+        loop_interval = int(self.opts['loop_interval'])
+
+        self._fire_master_syndic_start()
+
+        while True:
+            try:
+                socks = dict(self.poller.poll(loop_interval * 1000))
+                if socks.get(self.socket) == zmq.POLLIN:
+                    self._process_cmd_socket()
+            except zmq.ZMQError:
+                yield True
+            except Exception:
+                log.critical(
+                    'An exception occurred while polling the minion',
+                    exc_info=True
+                )
+                yield True
+
+    # Syndic Tune In
+    def tune_in(self):
+        '''
+        Lock onto the publisher. This is the main event loop for the syndic
+        '''
+        # Instantiate the local client
+        self.local = salt.client.get_local_client(self.opts['_minion_conf_file'])
+        self.local.event.subscribe('')
+        self.local.opts['interface'] = self._syndic_interface
+
+        signal.signal(signal.SIGTERM, self.clean_die)
+        log.debug('Syndic {0!r} trying to tune in'.format(self.opts['id']))
+
+        self._init_context_and_poller()
+
+        # Start with the publish socket
+        # Share the poller with the event object
+        self.socket = self.context.socket(zmq.SUB)
+
+        self._setsockopts()
+
+        self.socket.connect(self.master_pub)
+        self.poller.register(self.socket, zmq.POLLIN)
+        # Send an event to the master that the minion is live
+        self._fire_master_syndic_start()
+
         # Make sure to gracefully handle SIGUSR1
         enable_sigusr1_handler()
@@ -2021,9 +2065,9 @@ class Syndic(Minion):
         if hasattr(self, 'local'):
             del self.local

-class MultiSyndic(Syndic):
+class MultiSyndic(MinionBase):
     '''
-    Make a Syndic minion, this minion will handle relaying jobs and returns from
+    Make a MultiSyndic minion, this minion will handle relaying jobs and returns from
     all minions connected to it to the list of masters it is connected to.

     Note: jobs will be returned best-effort to the requesting master. This also means
@@ -2036,76 +2080,61 @@ class MultiSyndic(MinionBase):
     it will (under most circumstances) stall the daemon for ~60s attempting to re-auth
     with the down master
     '''
-    # timeout for one of the connections to an upstream master
-    MASTER_MINION_CONNECT_TIMEOUT = 5
     def __init__(self, opts):
         opts['loop_interval'] = 1
-        # keep a copy, since we'll be messing with it a bit
-        self.raw_opts = copy.copy(opts)
         super(MultiSyndic, self).__init__(opts)
+        self.mminion = salt.minion.MasterMinion(opts)

-        self.context = zmq.Context()
-
-        self.master_minions = {}
-        for master in set(self.raw_opts['master']):
-            s_opts = copy.copy(self.raw_opts)
+        # create all of the syndics you need
+        self.master_syndics = {}
+        for master in set(self.opts['master']):
+            s_opts = copy.copy(self.opts)
             s_opts['master'] = master
-            self.master_minions[master] = {'opts': s_opts,
+            self.master_syndics[master] = {'opts': s_opts,
                                            'auth_wait': s_opts['acceptance_wait_time']}
             self._connect_to_master(master)

+    # TODO: do we need all of this?
     def _connect_to_master(self, master):
         '''
         Attempt to connect to master, including back-off for each one

         return boolean of wether you connected or not
         '''
-        if master not in self.master_minions:
+        if master not in self.master_syndics:
             log.error('Unable to connect to {0}, not in the list of masters'.format(master))
             return False

-        minion = self.master_minions[master]
+        minion = self.master_syndics[master]
         if time.time() - minion['auth_wait'] > minion.get('last', 0):
             minion['last'] = time.time()
             if minion['auth_wait'] < self.opts['acceptance_wait_time_max']:
                 minion['auth_wait'] += auth_wait
             try:
-                t_minion = Minion(minion['opts'], self.MASTER_MINION_CONNECT_TIMEOUT, False)
+                t_minion = Syndic(minion['opts'])

-                # TODO: make less terrible...
-                # connect the minion to the master, since the minion class doesn't do it already
-                t_minion.socket = self.context.socket(zmq.SUB)
-                t_minion._set_reconnect_ivl()
-                t_minion._setsockopts()
-                t_minion.socket.connect(t_minion.master_pub)
-
-                self.master_minions[master]['minion'] = t_minion
-                self.master_minions[master]['auth_wait'] = self.opts['acceptance_wait_time']
+                self.master_syndics[master]['syndic'] = t_minion
+                self.master_syndics[master]['generator'] = t_minion.tune_in_no_block()
+                self.master_syndics[master]['auth_wait'] = self.opts['acceptance_wait_time']
+
                 return True
             except SaltClientError:
                 log.error('Error while bring up minion for multi-master. Is master {0} responding?'.format(master))
                 return False

-    def _pick_master(self, master_id=None):
-        '''
-        Pick a master (based on the master_id if you have it) and change the opts
-        to match that master (MAJOR HACK)
-        '''
-        self.opts = self.master_minions[master_id]['minion'].opts
-
     def iter_master_options(self, master_id=None):
         '''
         Iterate (in order) over your options for master
         '''
-        masters = set(self.master_minions.keys())
-        if master_id not in self.master_minions:
+        masters = set(self.master_syndics.keys())
+        if master_id not in self.master_syndics:
             master_id = masters.pop()
         else:
             masters.remove(master_id)

         while True:
-            self._pick_master(master_id)
             yield master_id
             if len(masters) == 0:
                 break
@@ -2117,7 +2146,7 @@ class MultiSyndic(MinionBase):
             master_id = masters.pop()

     def _fire_master(self, data=None, tag=None, events=None, pretag=None):
-
         # if you don't care, or the master you want we dont have, pick one at random
         for master in self.iter_master_options():
             try:
-                self.master_minions[master]['minion']._fire_master(data=data,
+                self.master_syndics[master]['syndic']._fire_master(data=data,
                                                                    tag=tag,
                                                                    events=events,
                                                                    pretag=pretag,
@@ -2129,6 +2158,10 @@ class MultiSyndic(MinionBase):

         log.critical('Unable to fire event on ANY master')

+    def _reset_event_aggregation(self):
+        self.jids = {}
+        self.raw_events = []
+        self.event_forward_timeout = None

     # Syndic Tune In
     def tune_in(self):
@@ -2138,37 +2171,12 @@ class MultiSyndic(MinionBase):
         # Instantiate the local client
         self.local = salt.client.get_local_client(self.opts['_minion_conf_file'])
         self.local.event.subscribe('')
-        self.local.opts['interface'] = self._syndic_interface

-        signal.signal(signal.SIGTERM, self.clean_die)
         log.debug('Syndic {0!r} trying to tune in'.format(self.opts['id']))

         # Share the poller with the event object
         self.poller = self.local.event.poller

-        # register the master_minions on this poller
-        for master_id, minion_map in self.master_minions.iteritems():
-            if 'minion' not in minion_map:
-                continue
-            self.poller.register(minion_map['minion'].socket, zmq.POLLIN)
-
-
-        # Send an event to the master that the minion is live
-        self._fire_master(
-            'Syndic {0} started at {1}'.format(
-            self.opts['id'],
-            time.asctime()
-            ),
-            'syndic_start'
-        )
-        self._fire_master(
-            'Syndic {0} started at {1}'.format(
-            self.opts['id'],
-            time.asctime()
-            ),
-            tagify([self.opts['id'], 'start'], 'syndic'),
-        )
-
         # Make sure to gracefully handle SIGUSR1
         enable_sigusr1_handler()
@@ -2189,16 +2197,19 @@ class MultiSyndic(MinionBase):
                     # But there's no harm being defensive
                     log.warning('Negative timeout in syndic main loop')
                     socks = {}
-                # check all of your master_minions
-                for master_id, minion_map in self.master_minions.iteritems():
-                    if 'minion' not in minion_map:
+                print ('foo')
+                # check all of your master_syndics, have them do their thing
+                for master_id, minion_map in self.master_syndics.iteritems():
+                    # if not connected, lets try
+                    if 'generator' not in minion_map:
+                        # if we couldn't connect, lets try later
                         if not self._connect_to_master(master_id):
+                            print ('not connected to', master_id)
                             continue
-                        self.poller.register(minion_map['minion'].socket, zmq.POLLIN)
-                    # check if you had events
-                    if socks.get(minion_map['minion'].socket) == zmq.POLLIN:
-                        self._process_cmd_socket(master_id, minion_map['minion'])
+                    print ('some stuff?')
+                    minion_map['generator'].next()

+                # events
                 if socks.get(self.local.event.sub) == zmq.POLLIN:
                     self._process_event_socket()
@@ -2216,33 +2227,6 @@ class MultiSyndic(MinionBase):
                     exc_info=True
                 )

-    def _process_cmd_socket(self, master_id, minion):
-        '''
-        process a pub job that a minion recieved
-        '''
-        try:
-            messages = minion.socket.recv_multipart(zmq.NOBLOCK)
-            messages_len = len(messages)
-            idx = None
-            if messages_len == 1:
-                idx = 0
-            elif messages_len == 2:
-                idx = 1
-            else:
-                raise SaltSyndicMasterError('Syndication master recieved message of invalid len ({0}/2)'.format(messages_len))
-
-            payload = minion.serial.loads(messages[idx])
-        except zmq.ZMQError as e:
-            # Swallow errors for bad wakeups or signals needing processing
-            if e.errno != errno.EAGAIN and e.errno != errno.EINTR:
-                raise
-        log.trace('Handling payload')
-        # TODO: remove this Terrible hack...
-        self.crypticle = minion.crypticle
-        # Terrible hack: We have to set the opts so all the class funcs work
-        self._pick_master(master_id)
-        self._handle_payload(payload)
-
     def _process_event_socket(self):
         tout = time.time() + self.opts['syndic_max_event_process_time']
         while tout > time.time():
@@ -2288,10 +2272,11 @@ class MultiSyndic(MinionBase):
             self._fire_master(events=self.raw_events,
                               pretag=tagify(self.opts['id'], base='syndic'),
                               )
+        print (self.jids)
         for jid, jid_ret in self.jids.iteritems():
             for master in self.iter_master_options(jid_ret.get('__master_id__')):
                 try:
-                    self._return_pub(jid_ret, '_syndic_return')
+                    self.master_syndics[master]['syndic']._return_pub(jid_ret, '_syndic_return')
                     break
                 except SaltClientError:
                     log.error('Unable to return to {0}, trying another...'.format(master))

From ccca1f1bb55246cf9f14b73a8303dffef58dcded Mon Sep 17 00:00:00 2001
From: Thomas Jackson
Date: Fri, 1 Aug 2014 13:04:20 -0700
Subject: [PATCH 07/11] Add hard coded timeout for syndic being dead

---
 salt/minion.py | 34 +++++++++++++++++++++-------------
 1 file changed, 21 insertions(+), 13 deletions(-)

diff --git a/salt/minion.py b/salt/minion.py
index a8b2406bc7..928c1031dc 100644
--- a/salt/minion.py
+++ b/salt/minion.py
@@ -2091,7 +2091,8 @@ class MultiSyndic(MinionBase):
             s_opts = copy.copy(self.opts)
             s_opts['master'] = master
             self.master_syndics[master] = {'opts': s_opts,
-                                           'auth_wait': s_opts['acceptance_wait_time']}
+                                           'auth_wait': s_opts['acceptance_wait_time'],
+                                           'dead_until': 0}
             self._connect_to_master(master)

     # TODO: do we need all of this?
@@ -2135,7 +2136,7 @@ class MultiSyndic(MinionBase):
             masters.remove(master_id)

         while True:
-            yield master_id
+            yield master_id, self.master_syndics[master_id]
             if len(masters) == 0:
                 break
             master_id = masters.pop()
@@ -2144,17 +2145,22 @@ class MultiSyndic(MinionBase):
     def _fire_master(self, data=None, tag=None, events=None, pretag=None):

         # if you don't care, or the master you want we dont have, pick one at random
-        for master in self.iter_master_options():
+        for master, syndic_dict in self.iter_master_options():
+            if syndic_dict['dead_until'] > time.time():
+                log.error('Unable to fire event to {0}, that syndic is dead for now'.format(master))
+                continue
             try:
-                self.master_syndics[master]['syndic']._fire_master(data=data,
-                                                                   tag=tag,
-                                                                   events=events,
-                                                                   pretag=pretag,
-                                                                   )
+                syndic_dict['syndic']._fire_master(data=data,
+                                                   tag=tag,
+                                                   events=events,
+                                                   pretag=pretag,
+                                                   )
                 return
             # unable to return, ususally because the master is down
             except SaltClientError:
                 log.error('Unable to fire event to {0}, trying another...'.format(master))
+                # TODO: configurable timeout???
+                syndic_dict['dead_until'] = time.time() + 60

         log.critical('Unable to fire event on ANY master')
@@ -2197,7 +2203,6 @@ class MultiSyndic(MinionBase):
                     # But there's no harm being defensive
                     log.warning('Negative timeout in syndic main loop')
                     socks = {}
-                print ('foo')
                 # check all of your master_syndics, have them do their thing
                 for master_id, minion_map in self.master_syndics.iteritems():
                     # if not connected, lets try
                     if 'generator' not in minion_map:
                         # if we couldn't connect, lets try later
                         if not self._connect_to_master(master_id):
                             print ('not connected to', master_id)
                             continue
-                    print ('some stuff?')
                     minion_map['generator'].next()

                 # events
                 if socks.get(self.local.event.sub) == zmq.POLLIN:
                     self._process_event_socket()
@@ -2272,14 +2276,18 @@ class MultiSyndic(MinionBase):
             self._fire_master(events=self.raw_events,
                               pretag=tagify(self.opts['id'], base='syndic'),
                               )
-        print (self.jids)
         for jid, jid_ret in self.jids.iteritems():
-            for master in self.iter_master_options(jid_ret.get('__master_id__')):
+            for master, syndic_dict in self.iter_master_options(jid_ret.get('__master_id__')):
+                if syndic_dict['dead_until'] > time.time():
+                    log.error('Unable to return to {0}, that syndic is dead for now'.format(master))
+                    continue
                 try:
-                    self.master_syndics[master]['syndic']._return_pub(jid_ret, '_syndic_return')
+                    syndic_dict['syndic']._return_pub(jid_ret, '_syndic_return')
                     break
                 except SaltClientError:
                     log.error('Unable to return to {0}, trying another...'.format(master))
+                    # TODO: configurable timeout???
+                    syndic_dict['dead_until'] = time.time() + 60
                     continue
         self._reset_event_aggregation()

From 081708e3d661f72d24c984102a201c1ab20191f9 Mon Sep 17 00:00:00 2001
From: Thomas Jackson
Date: Fri, 1 Aug 2014 13:27:30 -0700
Subject: [PATCH 08/11] More code cleanup, wrap all the calls to syndics in _call_syndic

---
 salt/minion.py | 86 +++++++++++++++++++++++---------------------------
 1 file changed, 40 insertions(+), 46 deletions(-)

diff --git a/salt/minion.py b/salt/minion.py
index 928c1031dc..885651df4d 100644
--- a/salt/minion.py
+++ b/salt/minion.py
@@ -1784,11 +1784,11 @@ class Syndic(Minion):
     Make a Syndic minion, this minion will use the minion keys on the
     master to authenticate with a higher level master.
     '''
-    def __init__(self, opts):
+    def __init__(self, opts, **kwargs):
         self._syndic_interface = opts.get('interface')
         self._syndic = True
         opts['loop_interval'] = 1
-        super(Syndic, self).__init__(opts)
+        super(Syndic, self).__init__(opts, **kwargs)
         self.mminion = salt.minion.MasterMinion(opts)

     def _handle_aes(self, load, sig=None):
@@ -2080,6 +2080,8 @@ class MultiSyndic(MinionBase):
     it will (under most circumstances) stall the daemon for ~60s attempting to re-auth
     with the down master
     '''
+    # time to connect to upstream master
+    SYNDIC_CONNECT_TIMEOUT = 5
     def __init__(self, opts):
         opts['loop_interval'] = 1
         super(MultiSyndic, self).__init__(opts)
@@ -2112,7 +2114,10 @@ class MultiSyndic(MinionBase):
             if minion['auth_wait'] < self.opts['acceptance_wait_time_max']:
                 minion['auth_wait'] += auth_wait
             try:
-                t_minion = Syndic(minion['opts'])
+                t_minion = Syndic(minion['opts'],
+                                  timeout=self.SYNDIC_CONNECT_TIMEOUT,
+                                  safe=False,
+                                  )

                 self.master_syndics[master]['syndic'] = t_minion
@@ -2122,16 +2127,38 @@ class MultiSyndic(MinionBase):
                 return True
             except SaltClientError:
-                log.error('Error while bring up minion for multi-master. Is master {0} responding?'.format(master))
+                log.error('Error while bring up minion for multi-syndic. Is master {0} responding?'.format(master))
                 return False

+    def _call_syndic(self, func, args=(), kwargs={}, master_id=None):
+        '''
+        Wrapper to call a given func on a syndic, best effort to get the one you asked for
+        '''
+        for master, syndic_dict in self.iter_master_options(master_id):
+            if 'syndic' not in syndic_dict:
+                continue
+            if syndic_dict['dead_until'] > time.time():
+                log.error('Unable to call {0} on {1}, that syndic is dead for now'.format(func, master_id))
+                continue
+            try:
+                getattr(syndic_dict['syndic'], func)(*args, **kwargs)
+                return
+            except SaltClientError:
+                    log.error('Unable to call {0} on {1}, trying another...'.format(func, master_id))
+                    # TODO: configurable timeout???
+                    syndic_dict['dead_until'] = time.time() + 60
+                    continue
+        log.critical('Unable to call {0} on any masters!'.format(func))
+
+
     def iter_master_options(self, master_id=None):
         '''
         Iterate (in order) over your options for master
         '''
-        masters = set(self.master_syndics.keys())
+        masters = self.master_syndics.keys()
+        shuffle(masters)
         if master_id not in self.master_syndics:
-            master_id = masters.pop()
+            master_id = masters.pop(0)
         else:
             masters.remove(master_id)

         while True:
             yield master_id, self.master_syndics[master_id]
             if len(masters) == 0:
                 break
-            master_id = masters.pop()
+            master_id = masters.pop(0)
@@ -2144,26 +2171,4 @@ class MultiSyndic(MinionBase):

-    def _fire_master(self, data=None, tag=None, events=None, pretag=None):
-
-        # if you don't care, or the master you want we dont have, pick one at random
-        for master, syndic_dict in self.iter_master_options():
-            if syndic_dict['dead_until'] > time.time():
-                log.error('Unable to fire event to {0}, that syndic is dead for now'.format(master))
-                continue
-            try:
-                syndic_dict['syndic']._fire_master(data=data,
-                                                   tag=tag,
-                                                   events=events,
-                                                   pretag=pretag,
-                                                   )
-                return
-            # unable to return, ususally because the master is down
-            except SaltClientError:
-                log.error('Unable to fire event to {0}, trying another...'.format(master))
-                # TODO: configurable timeout???
-                syndic_dict['dead_until'] = time.time() + 60
-
-        log.critical('Unable to fire event on ANY master')
-
     def _reset_event_aggregation(self):
         self.jids = {}
         self.raw_events = []
@@ -2178,7 +2182,7 @@ class MultiSyndic(MinionBase):
         self.local = salt.client.get_local_client(self.opts['_minion_conf_file'])
         self.local.event.subscribe('')

-        log.debug('Syndic {0!r} trying to tune in'.format(self.opts['id']))
+        log.debug('MultiSyndic {0!r} trying to tune in'.format(self.opts['id']))

         # Share the poller with the event object
         self.poller = self.local.event.poller
@@ -2209,13 +2213,11 @@ class MultiSyndic(MinionBase):
                 # check all of your master_syndics, have them do their thing
                 for master_id, minion_map in self.master_syndics.iteritems():
                     # if not connected, lets try
                     if 'generator' not in minion_map:
                         # if we couldn't connect, lets try later
                         if not self._connect_to_master(master_id):
-                            print ('not connected to', master_id)
                             continue
-                    print ('some stuff?')
                     minion_map['generator'].next()

                 # events
@@ -2274,20 +2276,15 @@ class MultiSyndic(MinionBase):
     def _forward_events(self):
         log.trace('Forwarding events')
         if self.raw_events:
-            self._fire_master(events=self.raw_events,
-                              pretag=tagify(self.opts['id'], base='syndic'),
-                              )
+            self._call_syndic('_fire_master',
+                              kwargs={'events': self.raw_events,
+                                      'pretag': tagify(self.opts['id'], base='syndic')},
+                              )
         for jid, jid_ret in self.jids.iteritems():
-            for master, syndic_dict in self.iter_master_options(jid_ret.get('__master_id__')):
-                if syndic_dict['dead_until'] > time.time():
-                    log.error('Unable to return to {0}, that syndic is dead for now'.format(master))
-                    continue
-                try:
-                    syndic_dict['syndic']._return_pub(jid_ret, '_syndic_return')
-                    break
-                except SaltClientError:
-                    log.error('Unable to return to {0}, trying another...'.format(master))
-                    # TODO: configurable timeout???
-                    syndic_dict['dead_until'] = time.time() + 60
-                    continue
+            self._call_syndic('_return_pub', args=(jid_ret, '_syndic_return'), master_id=jid_ret.get('__master_id__'))
+
         self._reset_event_aggregation()

From 5754314d674776b4dd2c69b1b9bf0f109896b176 Mon Sep 17 00:00:00 2001
From: Thomas Jackson
Date: Fri, 1 Aug 2014 13:38:05 -0700
Subject: [PATCH 09/11] Make backoff use acceptance_wait_time

---
 salt/minion.py | 25 ++++++++++++++++++-------
 1 file changed, 18 insertions(+), 7 deletions(-)

diff --git a/salt/minion.py b/salt/minion.py
index 885651df4d..72d848430e 100644
--- a/salt/minion.py
+++ b/salt/minion.py
@@ -2109,6 +2109,10 @@ class MultiSyndic(MinionBase):
             return False

         minion = self.master_syndics[master]
+        # if we need to be dead for a while, stay that way
+        if minion['dead_until'] > time.time():
+            return False
+
         if time.time() - minion['auth_wait'] > minion.get('last', 0):
             minion['last'] = time.time()
             if minion['auth_wait'] < self.opts['acceptance_wait_time_max']:
@@ -2123,11 +2127,16 @@ class MultiSyndic(MinionBase):
                 self.master_syndics[master]['syndic'] = t_minion
                 self.master_syndics[master]['generator'] = t_minion.tune_in_no_block()
                 self.master_syndics[master]['auth_wait'] = self.opts['acceptance_wait_time']
+                self.master_syndics[master]['dead_until'] = 0

                 return True
             except SaltClientError:
                 log.error('Error while bring up minion for multi-syndic. Is master {0} responding?'.format(master))
+                # re-use auth-wait as backoff for syndic
+                minion['dead_until'] = time.time() + minion['auth_wait']
+                if minion['auth_wait'] < self.opts['acceptance_wait_time_max']:
+                    minion['auth_wait'] += minion['auth_wait']
                 return False

     def _call_syndic(self, func, args=(), kwargs={}, master_id=None):
@@ -2144,10 +2153,12 @@ class MultiSyndic(MinionBase):
                 getattr(syndic_dict['syndic'], func)(*args, **kwargs)
                 return
             except SaltClientError:
-                    log.error('Unable to call {0} on {1}, trying another...'.format(func, master_id))
-                    # TODO: configurable timeout???
-                    syndic_dict['dead_until'] = time.time() + 60
-                    continue
+                log.error('Unable to call {0} on {1}, trying another...'.format(func, master_id))
+                # re-use auth-wait as backoff for syndic
+                syndic_dict['dead_until'] = time.time() + syndic_dict['auth_wait']
+                if minion['auth_wait'] < self.opts['acceptance_wait_time_max']:
+                    minion['auth_wait'] += minion['auth_wait']
+                continue
         log.critical('Unable to call {0} on any masters!'.format(func))
@@ -2219,13 +2230,13 @@ class MultiSyndic(MinionBase):
                     socks = {}
                 # check all of your master_syndics, have them do their thing
-                for master_id, minion_map in self.master_syndics.iteritems():
+                for master_id, syndic_dict in self.master_syndics.iteritems():
                     # if not connected, lets try
-                    if 'generator' not in minion_map:
+                    if 'generator' not in syndic_dict:
                         # if we couldn't connect, lets try later
                         if not self._connect_to_master(master_id):
                             continue
-                    minion_map['generator'].next()
+                    syndic_dict['generator'].next()

                 # events
                 if socks.get(self.local.event.sub) == zmq.POLLIN:
                     self._process_event_socket()

From abc9985e50f5011abc5e138b4230cb0bb69f5a01 Mon Sep 17 00:00:00 2001
From: Thomas Jackson
Date: Fri, 1 Aug 2014 16:15:36 -0700
Subject: [PATCH 10/11] Pep8 cleanup

---
 salt/minion.py | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/salt/minion.py b/salt/minion.py
index 72d848430e..5a26c76204 100644
--- a/salt/minion.py
+++ b/salt/minion.py
@@ -2082,6 +2082,7 @@ class MultiSyndic(MinionBase):
     '''
     # time to connect to upstream master
     SYNDIC_CONNECT_TIMEOUT = 5
+
     def __init__(self, opts):
         opts['loop_interval'] = 1
         super(MultiSyndic, self).__init__(opts)
@@ -2115,25 +2116,22 @@ class MultiSyndic(MinionBase):
             return False

         if time.time() - minion['auth_wait'] > minion.get('last', 0):
-            minion['last'] = time.time()
-            if minion['auth_wait'] < self.opts['acceptance_wait_time_max']:
-                minion['auth_wait'] += auth_wait
             try:
                 t_minion = Syndic(minion['opts'],
                                   timeout=self.SYNDIC_CONNECT_TIMEOUT,
                                   safe=False,
                                   )
-
                 self.master_syndics[master]['syndic'] = t_minion
                 self.master_syndics[master]['generator'] = t_minion.tune_in_no_block()
                 self.master_syndics[master]['auth_wait'] = self.opts['acceptance_wait_time']
                 self.master_syndics[master]['dead_until'] = 0
+
                 return True
             except SaltClientError:
                 log.error('Error while bring up minion for multi-syndic. Is master {0} responding?'.format(master))
                 # re-use auth-wait as backoff for syndic
                 minion['dead_until'] = time.time() + minion['auth_wait']
                 if minion['auth_wait'] < self.opts['acceptance_wait_time_max']:
-                    minion['auth_wait'] += minion['auth_wait']
+                    minion['auth_wait'] += self.opts['acceptance_wait_time']
                 return False

-    def _call_syndic(self, func, args=(), kwargs={}, master_id=None):
+    def _call_syndic(self, func, args=(), kwargs=None, master_id=None):
         '''
         Wrapper to call a given func on a syndic, best effort to get the one you asked for
         '''
+        if kwargs is None:
+            kwargs = {}
         for master, syndic_dict in self.iter_master_options(master_id):
             if 'syndic' not in syndic_dict:
                 continue
@@ -2151,12 +2149,11 @@ class MultiSyndic(MinionBase):
                 log.error('Unable to call {0} on {1}, trying another...'.format(func, master_id))
                 # re-use auth-wait as backoff for syndic
                 syndic_dict['dead_until'] = time.time() + syndic_dict['auth_wait']
-                if minion['auth_wait'] < self.opts['acceptance_wait_time_max']:
-                    minion['auth_wait'] += minion['auth_wait']
+                if syndic_dict['auth_wait'] < self.opts['acceptance_wait_time_max']:
+                    syndic_dict['auth_wait'] += self.opts['acceptance_wait_time']
                 continue
         log.critical('Unable to call {0} on any masters!'.format(func))

-
     def iter_master_options(self, master_id=None):
         '''
         Iterate (in order) over your options for master

From 0ce353ddfbdda028ffaab6af315ba8a0d9ca1872 Mon Sep 17 00:00:00 2001
From: Thomas Jackson
Date: Fri, 1 Aug 2014 16:38:22 -0700
Subject: [PATCH 11/11] Add master_id to master docs

---
 doc/ref/configuration/master.rst | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/doc/ref/configuration/master.rst b/doc/ref/configuration/master.rst
index 10dae3b51a..5e461e3530 100644
--- a/doc/ref/configuration/master.rst
+++ b/doc/ref/configuration/master.rst
@@ -61,6 +61,20 @@ The network port to set up the publication interface

     publish_port: 4505

+.. conf_master:: master_id
+
+``master_id``
+----------------
+
+Default: ``None``
+
+The id to be passed in the publish job to minions. This is used for MultiSyndics
+to return the job to the requesting master. Note, this must be the same string
+as the syndic is configured with.
+
+.. code-block:: yaml
+
+    master_id: MasterOfMaster

 .. conf_master:: user
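
Illustrative example (not part of the patch series above): pairing ``master_id`` with a
multi-master syndic. Each master of masters sets a unique ``master_id`` in its master
config, and the syndic node lists every master of masters it should relay for. The host
names below are hypothetical, and the assumption that the syndic-side list is carried by
``syndic_master`` in the syndic node's master config (which Salt maps to the syndic's
``master`` option checked by these patches) is this sketch's, not something the patches
spell out.

.. code-block:: yaml

    # master config on each master of masters (hypothetical hosts)
    master_id: mom1.example.com

.. code-block:: yaml

    # master config on the syndic node: relay to both masters of masters
    syndic_master:
      - mom1.example.com
      - mom2.example.com

With this in place, a job published from mom1 carries ``master_id: mom1.example.com`` in
its load, the minion echoes it back in its return, and the MultiSyndic uses the aggregated
``__master_id__`` to route ``_syndic_return`` to mom1 first, falling back to any reachable
master otherwise.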