Handle failures of upstream masters

This commit is contained in:
Thomas Jackson 2014-07-31 21:29:47 -07:00
parent f226809cd5
commit 6805d2b2cf

View File

@ -2025,7 +2025,10 @@ class MultiSyndic(Syndic):
Make a Syndic minion, this minion will handle relaying jobs and returns from
all minions connected to it to the list of masters it is connected to.
Note: jobs will be returned best-effort to the requesting master
Note: jobs will be returned best-effort to the requesting master. This also means
(since we are using zmq) that if a job was fired and the master disconnects
between the publish and return, that the return will end up in a zmq buffer
in this Syndic headed to that original master
'''
# timeout for one of the connections to an upstream master
MASTER_MINION_CONNECT_TIMEOUT = 5
@ -2064,7 +2067,7 @@ class MultiSyndic(Syndic):
t_minion = Minion(minion['opts'], self.MASTER_MINION_CONNECT_TIMEOUT, False)
# TODO: make less terrible...
# connect the minion to the master
# connect the minion to the master, since the minion class doesn't do it already
t_minion.socket = self.context.socket(zmq.SUB)
t_minion._set_reconnect_ivl()
t_minion._setsockopts()
@ -2083,22 +2086,43 @@ class MultiSyndic(Syndic):
Pick a master (based on the master_id if you have it) and change the opts
to match that master (MAJOR HACK)
'''
if master_id not in self.master_minions:
# TODO: make random
master_id = self.master_minions.keys()[0]
self.opts = self.master_minions[master_id]['minion'].opts
return master_id
def iter_master_options(self, master_id=None):
'''
Iterate (in order) over your options for master
'''
masters = set(self.master_minions.keys())
if master_id not in self.master_minions:
master_id = masters.pop()
else:
masters.remove(master_id)
while True:
self._pick_master(master_id)
yield master_id
if len(masters) == 0:
break
master_id = masters.pop()
def _fire_master(self, data=None, tag=None, events=None, pretag=None):
# if you don't care, or the master you want we dont have, pick one at random
master = self._pick_master()
self.master_minions[master]['minion']._fire_master(data=data,
tag=tag,
events=events,
pretag=pretag,
)
# if you don't care, or the master you want we dont have, pick one at random
for master in self.iter_master_options():
try:
self.master_minions[master]['minion']._fire_master(data=data,
tag=tag,
events=events,
pretag=pretag,
)
return
# unable to return, ususally because the master is down
except SaltClientError:
log.error('Unable to fire event to {0}, trying another...'.format(master))
log.critical('Unable to fire event on ANY master')
# Syndic Tune In
def tune_in(self):
@ -2167,7 +2191,7 @@ class MultiSyndic(Syndic):
self.poller.register(minion_map['minion'].socket, zmq.POLLIN)
# check if you had events
if socks.get(minion_map['minion'].socket) == zmq.POLLIN:
self._process_cmd_socket(minion_map['minion'])
self._process_cmd_socket(master_id, minion_map['minion'])
if socks.get(self.local.event.sub) == zmq.POLLIN:
self._process_event_socket()
@ -2186,7 +2210,7 @@ class MultiSyndic(Syndic):
exc_info=True
)
def _process_cmd_socket(self, minion):
def _process_cmd_socket(self, master_id, minion):
'''
process a pub job that a minion recieved
'''
@ -2209,13 +2233,10 @@ class MultiSyndic(Syndic):
log.trace('Handling payload')
# TODO: remove this Terrible hack...
self.crypticle = minion.crypticle
# Terrible hack: We have to set the opts so all the class funcs work
self._pick_master(master_id)
self._handle_payload(payload)
def _reset_event_aggregation(self):
self.jids = {}
self.raw_events = []
self.event_forward_timeout = None
def _process_event_socket(self):
tout = time.time() + self.opts['syndic_max_event_process_time']
while tout > time.time():
@ -2262,8 +2283,13 @@ class MultiSyndic(Syndic):
pretag=tagify(self.opts['id'], base='syndic'),
)
for jid, jid_ret in self.jids.iteritems():
self._pick_master(jid_ret.get('__master_id__'))
self._return_pub(jid_ret, '_syndic_return')
for master in self.iter_master_options(jid_ret.get('__master_id__')):
try:
self._return_pub(jid_ret, '_syndic_return')
break
except SaltClientError:
log.error('Unable to return to {0}, trying another...'.format(master))
continue
self._reset_event_aggregation()