Merge pull request #14690 from jacksontj/multi_syndic

Multi syndic
This commit is contained in:
Thomas S Hatch 2014-08-06 20:53:39 -06:00
commit de3a1ce283
4 changed files with 333 additions and 26 deletions

View File

@ -61,6 +61,20 @@ The network port to set up the publication interface
publish_port: 4505
.. conf_master:: master_id
``master_id``
----------------
Default: ``None``
The id to be passed in the publish job to minions. This is used for MultiSyndics
to return the job to the requesting master. Note, this must be the same string
as the syndic is configured with.
.. code-block:: yaml
master_id: MasterOfMaster
.. conf_master:: user

View File

@ -420,7 +420,11 @@ class Syndic(parsers.SyndicOptionParser):
# Late import so logging works correctly
import salt.minion
self.daemonize_if_required()
self.syndic = salt.minion.Syndic(self.config)
# if its a multisyndic, do so
if isinstance(self.config.get('master'), list):
self.syndic = salt.minion.MultiSyndic(self.config)
else:
self.syndic = salt.minion.Syndic(self.config)
self.set_pidfile()
def start(self):

View File

@ -2202,6 +2202,11 @@ class ClearFuncs(object):
'jid': clear_load['jid'],
'ret': clear_load['ret'],
}
# if you specified a master id, lets put that in the load
if 'master_id' in self.opts:
load['master_id'] = self.opts['master_id']
elif 'master_id' in extra:
load['master_id'] = extra['master_id']
if 'id' in extra:
load['id'] = extra['id']

View File

@ -1066,6 +1066,8 @@ class Minion(MinionBase):
ret['jid'] = data['jid']
ret['fun'] = data['fun']
ret['fun_args'] = data['arg']
if 'master_id' in data:
ret['master_id'] = data['master_id']
minion_instance._return_pub(ret)
if data['ret']:
ret['id'] = opts['id']
@ -1779,13 +1781,13 @@ class Syndic(Minion):
Make a Syndic minion, this minion will use the minion keys on the
master to authenticate with a higher level master.
'''
def __init__(self, opts):
def __init__(self, opts, **kwargs):
self._syndic_interface = opts.get('interface')
self._syndic = True
# force auth_safemode True because Syndic dont support autorestart
opts['auth_safemode'] = True
opts['loop_interval'] = 1
super(Syndic, self).__init__(opts)
super(Syndic, self).__init__(opts, **kwargs)
self.mminion = salt.minion.MasterMinion(opts)
def _handle_aes(self, load, sig=None):
@ -1801,7 +1803,7 @@ class Syndic(Minion):
data = self.crypticle.loads(load)
# Verify that the publication is valid
if 'tgt' not in data or 'jid' not in data or 'fun' not in data \
or 'to' not in data or 'arg' not in data:
or 'arg' not in data:
return
data['to'] = int(data['to']) - 1
if 'user' in data:
@ -1834,6 +1836,12 @@ class Syndic(Minion):
# Set up default tgt_type
if 'tgt_type' not in data:
data['tgt_type'] = 'glob'
kwargs = {}
# if a master_id is in the data, add it to publish job
if 'master_id' in data:
kwargs['master_id'] = data['master_id']
# Send out the publication
self.local.pub(data['tgt'],
data['fun'],
@ -1841,27 +1849,10 @@ class Syndic(Minion):
data['tgt_type'],
data['ret'],
data['jid'],
data['to'])
data['to'],
**kwargs)
# Syndic Tune In
def tune_in(self):
'''
Lock onto the publisher. This is the main event loop for the syndic
'''
# Instantiate the local client
self.local = salt.client.get_local_client(self.opts['_minion_conf_file'])
self.local.event.subscribe('')
self.local.opts['interface'] = self._syndic_interface
signal.signal(signal.SIGTERM, self.clean_die)
log.debug('Syndic {0!r} trying to tune in'.format(self.opts['id']))
self.context = zmq.Context()
# Start with the publish socket
# Share the poller with the event object
self.poller = self.local.event.poller
self.socket = self.context.socket(zmq.SUB)
def _setsockopts(self):
# no filters for syndication masters, unless we want to maintain a
# list of all connected minions and update the filter
self.socket.setsockopt(zmq.SUBSCRIBE, '')
@ -1870,8 +1861,7 @@ class Syndic(Minion):
self._set_reconnect_ivl_max()
self._set_tcp_keepalive()
self.socket.connect(self.master_pub)
self.poller.register(self.socket, zmq.POLLIN)
def _fire_master_syndic_start(self):
# Send an event to the master that the minion is live
self._fire_master(
'Syndic {0} started at {1}'.format(
@ -1888,6 +1878,69 @@ class Syndic(Minion):
tagify([self.opts['id'], 'start'], 'syndic'),
)
def tune_in_no_block(self):
'''
Executes the tune_in sequence but omits extra logging and the
management of the event bus assuming that these are handled outside
the tune_in sequence
'''
# Instantiate the local client
self.local = salt.client.get_local_client(self.opts['_minion_conf_file'])
self.local.event.subscribe('')
self._init_context_and_poller()
self.socket = self.context.socket(zmq.SUB)
self._setsockopts()
self.socket.connect(self.master_pub)
self.poller.register(self.socket, zmq.POLLIN)
loop_interval = int(self.opts['loop_interval'])
self._fire_master_syndic_start()
while True:
try:
socks = dict(self.poller.poll(loop_interval * 1000))
if socks.get(self.socket) == zmq.POLLIN:
self._process_cmd_socket()
except zmq.ZMQError:
yield True
except Exception:
log.critical(
'An exception occurred while polling the minion',
exc_info=True
)
yield True
# Syndic Tune In
def tune_in(self):
'''
Lock onto the publisher. This is the main event loop for the syndic
'''
# Instantiate the local client
self.local = salt.client.get_local_client(self.opts['_minion_conf_file'])
self.local.event.subscribe('')
self.local.opts['interface'] = self._syndic_interface
signal.signal(signal.SIGTERM, self.clean_die)
log.debug('Syndic {0!r} trying to tune in'.format(self.opts['id']))
self._init_context_and_poller()
# Start with the publish socket
# Share the poller with the event object
self.socket = self.context.socket(zmq.SUB)
self._setsockopts()
self.socket.connect(self.master_pub)
self.poller.register(self.socket, zmq.POLLIN)
# Send an event to the master that the minion is live
self._fire_master_syndic_start()
# Make sure to gracefully handle SIGUSR1
enable_sigusr1_handler()
@ -1981,6 +2034,8 @@ class Syndic(Minion):
jdict['__load__'].update(
self.mminion.returners[fstr](event['data']['jid'])
)
if 'master_id' in event['data']:
jdict['master_id'] = event['data']['master_id']
jdict[event['data']['id']] = event['data']['return']
else:
# Add generic event aggregation here
@ -2009,6 +2064,235 @@ class Syndic(Minion):
del self.local
class MultiSyndic(MinionBase):
'''
Make a MultiSyndic minion, this minion will handle relaying jobs and returns from
all minions connected to it to the list of masters it is connected to.
Note: jobs will be returned best-effort to the requesting master. This also means
(since we are using zmq) that if a job was fired and the master disconnects
between the publish and return, that the return will end up in a zmq buffer
in this Syndic headed to that original master.
In addition, since these classes all seem to use a mix of blocking and non-blocking
calls (with varying timeouts along the way) this daemon does not handle failure well,
it will (under most circumstances) stall the daemon for ~60s attempting to re-auth
with the down master
'''
# time to connect to upstream master
SYNDIC_CONNECT_TIMEOUT = 5
def __init__(self, opts):
opts['loop_interval'] = 1
super(MultiSyndic, self).__init__(opts)
self.mminion = salt.minion.MasterMinion(opts)
# create all of the syndics you need
self.master_syndics = {}
for master in set(self.opts['master']):
s_opts = copy.copy(self.opts)
s_opts['master'] = master
self.master_syndics[master] = {'opts': s_opts,
'auth_wait': s_opts['acceptance_wait_time'],
'dead_until': 0}
self._connect_to_master(master)
# TODO: do we need all of this?
def _connect_to_master(self, master):
'''
Attempt to connect to master, including back-off for each one
return boolean of wether you connected or not
'''
if master not in self.master_syndics:
log.error('Unable to connect to {0}, not in the list of masters'.format(master))
return False
minion = self.master_syndics[master]
# if we need to be dead for a while, stay that way
if minion['dead_until'] > time.time():
return False
if time.time() - minion['auth_wait'] > minion.get('last', 0):
try:
t_minion = Syndic(minion['opts'],
timeout=self.SYNDIC_CONNECT_TIMEOUT,
safe=False,
)
self.master_syndics[master]['syndic'] = t_minion
self.master_syndics[master]['generator'] = t_minion.tune_in_no_block()
self.master_syndics[master]['auth_wait'] = self.opts['acceptance_wait_time']
self.master_syndics[master]['dead_until'] = 0
return True
except SaltClientError:
log.error('Error while bring up minion for multi-syndic. Is master {0} responding?'.format(master))
# re-use auth-wait as backoff for syndic
minion['dead_until'] = time.time() + minion['auth_wait']
if minion['auth_wait'] < self.opts['acceptance_wait_time_max']:
minion['auth_wait'] += self.opts['acceptance_wait_time']
return False
def _call_syndic(self, func, args=(), kwargs=None, master_id=None):
'''
Wrapper to call a given func on a syndic, best effort to get the one you asked for
'''
if kwargs is None:
kwargs = {}
for master, syndic_dict in self.iter_master_options(master_id):
if 'syndic' not in syndic_dict:
continue
if syndic_dict['dead_until'] > time.time():
log.error('Unable to call {0} on {1}, that syndic is dead for now'.format(func, master_id))
continue
try:
getattr(syndic_dict['syndic'], func)(*args, **kwargs)
return
except SaltClientError:
log.error('Unable to call {0} on {1}, trying another...'.format(func, master_id))
# re-use auth-wait as backoff for syndic
syndic_dict['dead_until'] = time.time() + syndic_dict['auth_wait']
if syndic_dict['auth_wait'] < self.opts['acceptance_wait_time_max']:
syndic_dict['auth_wait'] += self.opts['acceptance_wait_time']
continue
log.critical('Unable to call {0} on any masters!'.format(func))
def iter_master_options(self, master_id=None):
'''
Iterate (in order) over your options for master
'''
masters = self.master_syndics.keys()
shuffle(masters)
if master_id not in self.master_syndics:
master_id = masters.pop(0)
else:
masters.remove(master_id)
while True:
yield master_id, self.master_syndics[master_id]
if len(masters) == 0:
break
master_id = masters.pop(0)
def _reset_event_aggregation(self):
self.jids = {}
self.raw_events = []
self.event_forward_timeout = None
# Syndic Tune In
def tune_in(self):
'''
Lock onto the publisher. This is the main event loop for the syndic
'''
# Instantiate the local client
self.local = salt.client.get_local_client(self.opts['_minion_conf_file'])
self.local.event.subscribe('')
log.debug('MultiSyndic {0!r} trying to tune in'.format(self.opts['id']))
# Share the poller with the event object
self.poller = self.local.event.poller
# Make sure to gracefully handle SIGUSR1
enable_sigusr1_handler()
loop_interval = int(self.opts['loop_interval'])
self._reset_event_aggregation()
while True:
try:
# Do all the maths in seconds
timeout = loop_interval
if self.event_forward_timeout is not None:
timeout = min(timeout,
self.event_forward_timeout - time.time())
if timeout >= 0:
log.trace('Polling timeout: %f', timeout)
socks = dict(self.poller.poll(timeout * 1000))
else:
# This shouldn't really happen.
# But there's no harm being defensive
log.warning('Negative timeout in syndic main loop')
socks = {}
# check all of your master_syndics, have them do their thing
for master_id, syndic_dict in self.master_syndics.iteritems():
# if not connected, lets try
if 'generator' not in syndic_dict:
# if we couldn't connect, lets try later
if not self._connect_to_master(master_id):
continue
syndic_dict['generator'].next()
# events
if socks.get(self.local.event.sub) == zmq.POLLIN:
self._process_event_socket()
if (self.event_forward_timeout is not None and
self.event_forward_timeout < time.time()):
self._forward_events()
# We don't handle ZMQErrors like the other minions
# I've put explicit handling around the recieve calls
# in the process_*_socket methods. If we see any other
# errors they may need some kind of handling so log them
# for now.
except Exception:
log.critical(
'An exception occurred while polling the syndic',
exc_info=True
)
def _process_event_socket(self):
tout = time.time() + self.opts['syndic_max_event_process_time']
while tout > time.time():
try:
event = self.local.event.get_event_noblock()
except zmq.ZMQError as e:
# EAGAIN indicates no more events at the moment
# EINTR some kind of signal maybe someone trying
# to get us to quit so escape our timeout
if e.errno == errno.EAGAIN or e.errno == errno.EINTR:
break
raise
log.trace('Got event {0}'.format(event['tag']))
if self.event_forward_timeout is None:
self.event_forward_timeout = (
time.time() + self.opts['syndic_event_forward_timeout']
)
if salt.utils.is_jid(event['tag']) and 'return' in event['data']:
if 'jid' not in event['data']:
# Not a job return
continue
jdict = self.jids.setdefault(event['tag'], {})
if not jdict:
jdict['__fun__'] = event['data'].get('fun')
jdict['__jid__'] = event['data']['jid']
jdict['__load__'] = {}
fstr = '{0}.get_jid'.format(self.opts['master_job_cache'])
jdict['__load__'].update(
self.mminion.returners[fstr](event['data']['jid'])
)
if 'master_id' in event['data']:
# __'s to make sure it doesn't print out on the master cli
jdict['__master_id__'] = event['data']['master_id']
jdict[event['data']['id']] = event['data']['return']
else:
# Add generic event aggregation here
if 'retcode' not in event['data']:
self.raw_events.append(event)
def _forward_events(self):
log.trace('Forwarding events')
if self.raw_events:
self._call_syndic('_fire_master',
kwargs={'events': self.raw_events,
'pretag': tagify(self.opts['id'], base='syndic')},
)
for jid, jid_ret in self.jids.iteritems():
self._call_syndic('_return_pub', args=(jid_ret, '_syndic_return'), master_id=jid_ret.get('__master_id__'))
self._reset_event_aggregation()
class Matcher(object):
'''
Use to return the value for matching calls from the master