mirror of
https://github.com/valitydev/salt.git
synced 2024-11-08 17:33:54 +00:00
More code cleanup, wrap all the calls to syndics in _call_syndic
This commit is contained in:
parent
ccca1f1bb5
commit
081708e3d6
@ -1784,11 +1784,11 @@ class Syndic(Minion):
|
||||
Make a Syndic minion, this minion will use the minion keys on the
|
||||
master to authenticate with a higher level master.
|
||||
'''
|
||||
def __init__(self, opts):
|
||||
def __init__(self, opts, **kwargs):
|
||||
self._syndic_interface = opts.get('interface')
|
||||
self._syndic = True
|
||||
opts['loop_interval'] = 1
|
||||
super(Syndic, self).__init__(opts)
|
||||
super(Syndic, self).__init__(opts, **kwargs)
|
||||
self.mminion = salt.minion.MasterMinion(opts)
|
||||
|
||||
def _handle_aes(self, load, sig=None):
|
||||
@ -2080,6 +2080,8 @@ class MultiSyndic(MinionBase):
|
||||
it will (under most circumstances) stall the daemon for ~60s attempting to re-auth
|
||||
with the down master
|
||||
'''
|
||||
# time to connect to upstream master
|
||||
SYNDIC_CONNECT_TIMEOUT = 5
|
||||
def __init__(self, opts):
|
||||
opts['loop_interval'] = 1
|
||||
super(MultiSyndic, self).__init__(opts)
|
||||
@ -2112,7 +2114,10 @@ class MultiSyndic(MinionBase):
|
||||
if minion['auth_wait'] < self.opts['acceptance_wait_time_max']:
|
||||
minion['auth_wait'] += auth_wait
|
||||
try:
|
||||
t_minion = Syndic(minion['opts'])
|
||||
t_minion = Syndic(minion['opts'],
|
||||
timeout=self.SYNDIC_CONNECT_TIMEOUT,
|
||||
safe=False,
|
||||
)
|
||||
|
||||
|
||||
self.master_syndics[master]['syndic'] = t_minion
|
||||
@ -2122,16 +2127,38 @@ class MultiSyndic(MinionBase):
|
||||
|
||||
return True
|
||||
except SaltClientError:
|
||||
log.error('Error while bring up minion for multi-master. Is master {0} responding?'.format(master))
|
||||
log.error('Error while bring up minion for multi-syndic. Is master {0} responding?'.format(master))
|
||||
return False
|
||||
|
||||
def _call_syndic(self, func, args=(), kwargs={}, master_id=None):
|
||||
'''
|
||||
Wrapper to call a given func on a syndic, best effort to get the one you asked for
|
||||
'''
|
||||
for master, syndic_dict in self.iter_master_options(master_id):
|
||||
if 'syndic' not in syndic_dict:
|
||||
continue
|
||||
if syndic_dict['dead_until'] > time.time():
|
||||
log.error('Unable to call {0} on {1}, that syndic is dead for now'.format(func, master_id))
|
||||
continue
|
||||
try:
|
||||
getattr(syndic_dict['syndic'], func)(*args, **kwargs)
|
||||
return
|
||||
except SaltClientError:
|
||||
log.error('Unable to call {0} on {1}, trying another...'.format(func, master_id))
|
||||
# TODO: configurable timeout???
|
||||
syndic_dict['dead_until'] = time.time() + 60
|
||||
continue
|
||||
log.critical('Unable to call {0} on any masters!'.format(func))
|
||||
|
||||
|
||||
def iter_master_options(self, master_id=None):
|
||||
'''
|
||||
Iterate (in order) over your options for master
|
||||
'''
|
||||
masters = set(self.master_syndics.keys())
|
||||
masters = self.master_syndics.keys()
|
||||
shuffle(masters)
|
||||
if master_id not in self.master_syndics:
|
||||
master_id = masters.pop()
|
||||
master_id = masters.pop(0)
|
||||
else:
|
||||
masters.remove(master_id)
|
||||
|
||||
@ -2139,30 +2166,7 @@ class MultiSyndic(MinionBase):
|
||||
yield master_id, self.master_syndics[master_id]
|
||||
if len(masters) == 0:
|
||||
break
|
||||
master_id = masters.pop()
|
||||
|
||||
|
||||
def _fire_master(self, data=None, tag=None, events=None, pretag=None):
|
||||
|
||||
# if you don't care, or the master you want we dont have, pick one at random
|
||||
for master, syndic_dict in self.iter_master_options():
|
||||
if syndic_dict['dead_until'] > time.time():
|
||||
log.error('Unable to fire event to {0}, that syndic is dead for now'.format(master))
|
||||
continue
|
||||
try:
|
||||
syndic_dict['syndic']._fire_master(data=data,
|
||||
tag=tag,
|
||||
events=events,
|
||||
pretag=pretag,
|
||||
)
|
||||
return
|
||||
# unable to return, ususally because the master is down
|
||||
except SaltClientError:
|
||||
log.error('Unable to fire event to {0}, trying another...'.format(master))
|
||||
# TODO: configurable timeout???
|
||||
syndic_dict['dead_until'] = time.time() + 60
|
||||
|
||||
log.critical('Unable to fire event on ANY master')
|
||||
master_id = masters.pop(0)
|
||||
|
||||
def _reset_event_aggregation(self):
|
||||
self.jids = {}
|
||||
@ -2178,7 +2182,7 @@ class MultiSyndic(MinionBase):
|
||||
self.local = salt.client.get_local_client(self.opts['_minion_conf_file'])
|
||||
self.local.event.subscribe('')
|
||||
|
||||
log.debug('Syndic {0!r} trying to tune in'.format(self.opts['id']))
|
||||
log.debug('MultiSyndic {0!r} trying to tune in'.format(self.opts['id']))
|
||||
|
||||
# Share the poller with the event object
|
||||
self.poller = self.local.event.poller
|
||||
@ -2209,7 +2213,6 @@ class MultiSyndic(MinionBase):
|
||||
if 'generator' not in minion_map:
|
||||
# if we couldn't connect, lets try later
|
||||
if not self._connect_to_master(master_id):
|
||||
print ('not connected to', master_id)
|
||||
continue
|
||||
minion_map['generator'].next()
|
||||
|
||||
@ -2273,22 +2276,13 @@ class MultiSyndic(MinionBase):
|
||||
def _forward_events(self):
|
||||
log.trace('Forwarding events')
|
||||
if self.raw_events:
|
||||
self._fire_master(events=self.raw_events,
|
||||
pretag=tagify(self.opts['id'], base='syndic'),
|
||||
self._call_syndic('_fire_master',
|
||||
kwargs={'events': self.raw_events,
|
||||
'pretag': tagify(self.opts['id'], base='syndic')},
|
||||
)
|
||||
for jid, jid_ret in self.jids.iteritems():
|
||||
for master, syndic_dict in self.iter_master_options(jid_ret.get('__master_id__')):
|
||||
if syndic_dict['dead_until'] > time.time():
|
||||
log.error('Unable to return to {0}, that syndic is dead for now'.format(master))
|
||||
continue
|
||||
try:
|
||||
syndic_dict['syndic']._return_pub(jid_ret, '_syndic_return')
|
||||
break
|
||||
except SaltClientError:
|
||||
log.error('Unable to return to {0}, trying another...'.format(master))
|
||||
# TODO: configurable timeout???
|
||||
syndic_dict['dead_until'] = time.time() + 60
|
||||
continue
|
||||
self._call_syndic('_return_pub', args=(jid_ret, '_syndic_return'), master_id=jid_ret.get('__master_id__'))
|
||||
|
||||
self._reset_event_aggregation()
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user