mirror of
https://github.com/valitydev/salt.git
synced 2024-11-08 01:18:58 +00:00
Basics working, can't handle restart of a master yet...
This commit is contained in:
parent
2b8c106108
commit
f226809cd5
@ -420,7 +420,11 @@ class Syndic(parsers.SyndicOptionParser):
|
||||
# Late import so logging works correctly
|
||||
import salt.minion
|
||||
self.daemonize_if_required()
|
||||
self.syndic = salt.minion.Syndic(self.config)
|
||||
# if its a multisyndic, do so
|
||||
if isinstance(self.config.get('master'), list):
|
||||
self.syndic = salt.minion.MultiSyndic(self.config)
|
||||
else:
|
||||
self.syndic = salt.minion.Syndic(self.config)
|
||||
self.set_pidfile()
|
||||
|
||||
def start(self):
|
||||
|
247
salt/minion.py
247
salt/minion.py
@ -2005,7 +2005,6 @@ class Syndic(Minion):
|
||||
self._fire_master(events=self.raw_events,
|
||||
pretag=tagify(self.opts['id'], base='syndic'),
|
||||
)
|
||||
print (self.jids)
|
||||
for jid in self.jids:
|
||||
self._return_pub(self.jids[jid], '_syndic_return')
|
||||
self._reset_event_aggregation()
|
||||
@ -2021,6 +2020,252 @@ class Syndic(Minion):
|
||||
if hasattr(self, 'local'):
|
||||
del self.local
|
||||
|
||||
class MultiSyndic(Syndic):
|
||||
'''
|
||||
Make a Syndic minion, this minion will handle relaying jobs and returns from
|
||||
all minions connected to it to the list of masters it is connected to.
|
||||
|
||||
Note: jobs will be returned best-effort to the requesting master
|
||||
'''
|
||||
# timeout for one of the connections to an upstream master
|
||||
MASTER_MINION_CONNECT_TIMEOUT = 5
|
||||
def __init__(self, opts):
|
||||
opts['loop_interval'] = 1
|
||||
# keep a copy, since we'll be messing with it a bit
|
||||
self.raw_opts = copy.copy(opts)
|
||||
super(MultiSyndic, self).__init__(opts)
|
||||
|
||||
self.context = zmq.Context()
|
||||
|
||||
self.master_minions = {}
|
||||
for master in set(self.raw_opts['master']):
|
||||
s_opts = copy.copy(self.raw_opts)
|
||||
s_opts['master'] = master
|
||||
self.master_minions[master] = {'opts': s_opts,
|
||||
'auth_wait': s_opts['acceptance_wait_time']}
|
||||
self._connect_to_master(master)
|
||||
|
||||
def _connect_to_master(self, master):
|
||||
'''
|
||||
Attempt to connect to master, including back-off for each one
|
||||
|
||||
return boolean of wether you connected or not
|
||||
'''
|
||||
if master not in self.master_minions:
|
||||
log.error('Unable to connect to {0}, not in the list of masters'.format(master))
|
||||
return False
|
||||
|
||||
minion = self.master_minions[master]
|
||||
if time.time() - minion['auth_wait'] > minion.get('last', 0):
|
||||
minion['last'] = time.time()
|
||||
if minion['auth_wait'] < self.opts['acceptance_wait_time_max']:
|
||||
minion['auth_wait'] += auth_wait
|
||||
try:
|
||||
t_minion = Minion(minion['opts'], self.MASTER_MINION_CONNECT_TIMEOUT, False)
|
||||
|
||||
# TODO: make less terrible...
|
||||
# connect the minion to the master
|
||||
t_minion.socket = self.context.socket(zmq.SUB)
|
||||
t_minion._set_reconnect_ivl()
|
||||
t_minion._setsockopts()
|
||||
t_minion.socket.connect(t_minion.master_pub)
|
||||
|
||||
self.master_minions[master]['minion'] = t_minion
|
||||
self.master_minions[master]['auth_wait'] = self.opts['acceptance_wait_time']
|
||||
|
||||
return True
|
||||
except SaltClientError:
|
||||
log.error('Error while bring up minion for multi-master. Is master {0} responding?'.format(master))
|
||||
return False
|
||||
|
||||
def _pick_master(self, master_id=None):
|
||||
'''
|
||||
Pick a master (based on the master_id if you have it) and change the opts
|
||||
to match that master (MAJOR HACK)
|
||||
'''
|
||||
if master_id not in self.master_minions:
|
||||
# TODO: make random
|
||||
master_id = self.master_minions.keys()[0]
|
||||
|
||||
self.opts = self.master_minions[master_id]['minion'].opts
|
||||
return master_id
|
||||
|
||||
def _fire_master(self, data=None, tag=None, events=None, pretag=None):
|
||||
# if you don't care, or the master you want we dont have, pick one at random
|
||||
master = self._pick_master()
|
||||
|
||||
self.master_minions[master]['minion']._fire_master(data=data,
|
||||
tag=tag,
|
||||
events=events,
|
||||
pretag=pretag,
|
||||
)
|
||||
|
||||
# Syndic Tune In
|
||||
def tune_in(self):
|
||||
'''
|
||||
Lock onto the publisher. This is the main event loop for the syndic
|
||||
'''
|
||||
# Instantiate the local client
|
||||
self.local = salt.client.get_local_client(self.opts['_minion_conf_file'])
|
||||
self.local.event.subscribe('')
|
||||
self.local.opts['interface'] = self._syndic_interface
|
||||
|
||||
signal.signal(signal.SIGTERM, self.clean_die)
|
||||
log.debug('Syndic {0!r} trying to tune in'.format(self.opts['id']))
|
||||
|
||||
# Share the poller with the event object
|
||||
self.poller = self.local.event.poller
|
||||
|
||||
# register the master_minions on this poller
|
||||
for master_id, minion_map in self.master_minions.iteritems():
|
||||
if 'minion' not in minion_map:
|
||||
continue
|
||||
self.poller.register(minion_map['minion'].socket, zmq.POLLIN)
|
||||
|
||||
|
||||
# Send an event to the master that the minion is live
|
||||
self._fire_master(
|
||||
'Syndic {0} started at {1}'.format(
|
||||
self.opts['id'],
|
||||
time.asctime()
|
||||
),
|
||||
'syndic_start'
|
||||
)
|
||||
self._fire_master(
|
||||
'Syndic {0} started at {1}'.format(
|
||||
self.opts['id'],
|
||||
time.asctime()
|
||||
),
|
||||
tagify([self.opts['id'], 'start'], 'syndic'),
|
||||
)
|
||||
|
||||
# Make sure to gracefully handle SIGUSR1
|
||||
enable_sigusr1_handler()
|
||||
|
||||
loop_interval = int(self.opts['loop_interval'])
|
||||
self._reset_event_aggregation()
|
||||
while True:
|
||||
try:
|
||||
# Do all the maths in seconds
|
||||
timeout = loop_interval
|
||||
if self.event_forward_timeout is not None:
|
||||
timeout = min(timeout,
|
||||
self.event_forward_timeout - time.time())
|
||||
if timeout >= 0:
|
||||
log.trace('Polling timeout: %f', timeout)
|
||||
socks = dict(self.poller.poll(timeout * 1000))
|
||||
else:
|
||||
# This shouldn't really happen.
|
||||
# But there's no harm being defensive
|
||||
log.warning('Negative timeout in syndic main loop')
|
||||
socks = {}
|
||||
# check all of your master_minions
|
||||
for master_id, minion_map in self.master_minions.iteritems():
|
||||
if 'minion' not in minion_map:
|
||||
if not self._connect_to_master(master_id):
|
||||
continue
|
||||
self.poller.register(minion_map['minion'].socket, zmq.POLLIN)
|
||||
# check if you had events
|
||||
if socks.get(minion_map['minion'].socket) == zmq.POLLIN:
|
||||
self._process_cmd_socket(minion_map['minion'])
|
||||
|
||||
if socks.get(self.local.event.sub) == zmq.POLLIN:
|
||||
self._process_event_socket()
|
||||
|
||||
if (self.event_forward_timeout is not None and
|
||||
self.event_forward_timeout < time.time()):
|
||||
self._forward_events()
|
||||
# We don't handle ZMQErrors like the other minions
|
||||
# I've put explicit handling around the recieve calls
|
||||
# in the process_*_socket methods. If we see any other
|
||||
# errors they may need some kind of handling so log them
|
||||
# for now.
|
||||
except Exception:
|
||||
log.critical(
|
||||
'An exception occurred while polling the syndic',
|
||||
exc_info=True
|
||||
)
|
||||
|
||||
def _process_cmd_socket(self, minion):
|
||||
'''
|
||||
process a pub job that a minion recieved
|
||||
'''
|
||||
try:
|
||||
messages = minion.socket.recv_multipart(zmq.NOBLOCK)
|
||||
messages_len = len(messages)
|
||||
idx = None
|
||||
if messages_len == 1:
|
||||
idx = 0
|
||||
elif messages_len == 2:
|
||||
idx = 1
|
||||
else:
|
||||
raise SaltSyndicMasterError('Syndication master recieved message of invalid len ({0}/2)'.format(messages_len))
|
||||
|
||||
payload = minion.serial.loads(messages[idx])
|
||||
except zmq.ZMQError as e:
|
||||
# Swallow errors for bad wakeups or signals needing processing
|
||||
if e.errno != errno.EAGAIN and e.errno != errno.EINTR:
|
||||
raise
|
||||
log.trace('Handling payload')
|
||||
# TODO: remove this Terrible hack...
|
||||
self.crypticle = minion.crypticle
|
||||
self._handle_payload(payload)
|
||||
|
||||
def _reset_event_aggregation(self):
|
||||
self.jids = {}
|
||||
self.raw_events = []
|
||||
self.event_forward_timeout = None
|
||||
|
||||
def _process_event_socket(self):
|
||||
tout = time.time() + self.opts['syndic_max_event_process_time']
|
||||
while tout > time.time():
|
||||
try:
|
||||
event = self.local.event.get_event_noblock()
|
||||
except zmq.ZMQError as e:
|
||||
# EAGAIN indicates no more events at the moment
|
||||
# EINTR some kind of signal maybe someone trying
|
||||
# to get us to quit so escape our timeout
|
||||
if e.errno == errno.EAGAIN or e.errno == errno.EINTR:
|
||||
break
|
||||
raise
|
||||
log.trace('Got event {0}'.format(event['tag']))
|
||||
if self.event_forward_timeout is None:
|
||||
self.event_forward_timeout = (
|
||||
time.time() + self.opts['syndic_event_forward_timeout']
|
||||
)
|
||||
if salt.utils.is_jid(event['tag']) and 'return' in event['data']:
|
||||
if 'jid' not in event['data']:
|
||||
# Not a job return
|
||||
continue
|
||||
jdict = self.jids.setdefault(event['tag'], {})
|
||||
if not jdict:
|
||||
jdict['__fun__'] = event['data'].get('fun')
|
||||
jdict['__jid__'] = event['data']['jid']
|
||||
jdict['__load__'] = {}
|
||||
fstr = '{0}.get_jid'.format(self.opts['master_job_cache'])
|
||||
jdict['__load__'].update(
|
||||
self.mminion.returners[fstr](event['data']['jid'])
|
||||
)
|
||||
if 'master_id' in event['data']:
|
||||
# __'s to make sure it doesn't print out on the master cli
|
||||
jdict['__master_id__'] = event['data']['master_id']
|
||||
jdict[event['data']['id']] = event['data']['return']
|
||||
else:
|
||||
# Add generic event aggregation here
|
||||
if 'retcode' not in event['data']:
|
||||
self.raw_events.append(event)
|
||||
|
||||
def _forward_events(self):
|
||||
log.trace('Forwarding events')
|
||||
if self.raw_events:
|
||||
self._fire_master(events=self.raw_events,
|
||||
pretag=tagify(self.opts['id'], base='syndic'),
|
||||
)
|
||||
for jid, jid_ret in self.jids.iteritems():
|
||||
self._pick_master(jid_ret.get('__master_id__'))
|
||||
self._return_pub(jid_ret, '_syndic_return')
|
||||
self._reset_event_aggregation()
|
||||
|
||||
|
||||
class Matcher(object):
|
||||
'''
|
||||
|
Loading…
Reference in New Issue
Block a user