diff --git a/salt/minion.py b/salt/minion.py index 1223eef0cf..9fb137e370 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -2025,7 +2025,10 @@ class MultiSyndic(Syndic): Make a Syndic minion, this minion will handle relaying jobs and returns from all minions connected to it to the list of masters it is connected to. - Note: jobs will be returned best-effort to the requesting master + Note: jobs will be returned best-effort to the requesting master. This also means + (since we are using zmq) that if a job was fired and the master disconnects + between the publish and return, that the return will end up in a zmq buffer + in this Syndic headed to that original master ''' # timeout for one of the connections to an upstream master MASTER_MINION_CONNECT_TIMEOUT = 5 @@ -2064,7 +2067,7 @@ class MultiSyndic(Syndic): t_minion = Minion(minion['opts'], self.MASTER_MINION_CONNECT_TIMEOUT, False) # TODO: make less terrible... - # connect the minion to the master + # connect the minion to the master, since the minion class doesn't do it already t_minion.socket = self.context.socket(zmq.SUB) t_minion._set_reconnect_ivl() t_minion._setsockopts() @@ -2083,22 +2086,43 @@ class MultiSyndic(Syndic): Pick a master (based on the master_id if you have it) and change the opts to match that master (MAJOR HACK) ''' - if master_id not in self.master_minions: - # TODO: make random - master_id = self.master_minions.keys()[0] - self.opts = self.master_minions[master_id]['minion'].opts - return master_id + + def iter_master_options(self, master_id=None): + ''' + Iterate (in order) over your options for master + ''' + masters = set(self.master_minions.keys()) + if master_id not in self.master_minions: + master_id = masters.pop() + else: + masters.remove(master_id) + + while True: + self._pick_master(master_id) + yield master_id + if len(masters) == 0: + break + master_id = masters.pop() + def _fire_master(self, data=None, tag=None, events=None, pretag=None): - # if you don't care, or the master 
you want we dont have, pick one at random - master = self._pick_master() - self.master_minions[master]['minion']._fire_master(data=data, - tag=tag, - events=events, - pretag=pretag, - ) + # if you don't care, or the master you want we don't have, pick one at random + for master in self.iter_master_options(): + try: + self.master_minions[master]['minion']._fire_master(data=data, + tag=tag, + events=events, + pretag=pretag, + ) + return + # unable to return, usually because the master is down + except SaltClientError: + log.error('Unable to fire event to {0}, trying another...'.format(master)) + + log.critical('Unable to fire event on ANY master') + # Syndic Tune In def tune_in(self): @@ -2167,7 +2191,7 @@ class MultiSyndic(Syndic): self.poller.register(minion_map['minion'].socket, zmq.POLLIN) # check if you had events if socks.get(minion_map['minion'].socket) == zmq.POLLIN: - self._process_cmd_socket(minion_map['minion']) + self._process_cmd_socket(master_id, minion_map['minion']) if socks.get(self.local.event.sub) == zmq.POLLIN: self._process_event_socket() @@ -2186,7 +2210,7 @@ class MultiSyndic(Syndic): exc_info=True ) - def _process_cmd_socket(self, minion): + def _process_cmd_socket(self, master_id, minion): ''' process a pub job that a minion recieved ''' @@ -2209,13 +2233,10 @@ class MultiSyndic(Syndic): log.trace('Handling payload') # TODO: remove this Terrible hack... 
self.crypticle = minion.crypticle + # Terrible hack: We have to set the opts so all the class funcs work + self._pick_master(master_id) self._handle_payload(payload) - def _reset_event_aggregation(self): - self.jids = {} - self.raw_events = [] - self.event_forward_timeout = None - def _process_event_socket(self): tout = time.time() + self.opts['syndic_max_event_process_time'] while tout > time.time(): @@ -2262,8 +2283,13 @@ class MultiSyndic(Syndic): pretag=tagify(self.opts['id'], base='syndic'), ) for jid, jid_ret in self.jids.iteritems(): - self._pick_master(jid_ret.get('__master_id__')) - self._return_pub(jid_ret, '_syndic_return') + for master in self.iter_master_options(jid_ret.get('__master_id__')): + try: + self._return_pub(jid_ret, '_syndic_return') + break + except SaltClientError: + log.error('Unable to return to {0}, trying another...'.format(master)) + continue self._reset_event_aggregation()