Make Syndic asynchronous.

Dmitry Kuzmenko 2016-03-18 23:35:19 +03:00
parent fe86a3d209
commit a6d93e4891
3 changed files with 230 additions and 46 deletions

View File

@@ -69,6 +69,9 @@ except ImportError:
pass
# pylint: enable=import-error
# Import tornado
import tornado.gen # pylint: disable=F0401
log = logging.getLogger(__name__)
@@ -1493,6 +1496,102 @@ class LocalClient(object):
return {'jid': payload['load']['jid'],
'minions': payload['load']['minions']}
@tornado.gen.coroutine
def pub_async(self,
tgt,
fun,
arg=(),
expr_form='glob',
ret='',
jid='',
timeout=5,
io_loop=None,
listen=True,
**kwargs):
'''
Take the required arguments and publish the given command.
Arguments:
tgt:
The tgt is a regex or a glob used to match up the ids on
the minions. Salt works by always publishing every command
to all of the minions and then the minions determine if
the command is for them based on the tgt value.
fun:
The function name to be called on the remote host(s); this
must be a string in the format "<modulename>.<function name>"
arg:
The arg option needs to be a tuple of arguments to pass
to the calling function; if left blank, no arguments are passed
Returns:
jid:
A string, as returned by the publisher, which is the job
id; this tells the client where to look up the job results
minions:
A set of the targets that the given tgt should match.
'''
# Make sure the publisher is running by checking the unix socket
if (self.opts.get('ipc_mode', '') != 'tcp' and
not os.path.exists(os.path.join(self.opts['sock_dir'],
'publish_pull.ipc'))):
log.error(
'Unable to connect to the salt master publisher at '
'{0}'.format(self.opts['sock_dir'])
)
raise SaltClientError
payload_kwargs = self._prep_pub(
tgt,
fun,
arg,
expr_form,
ret,
jid,
timeout,
**kwargs)
master_uri = 'tcp://' + salt.utils.ip_bracket(self.opts['interface']) + \
':' + str(self.opts['ret_port'])
channel = salt.transport.client.AsyncReqChannel.factory(self.opts,
io_loop=io_loop,
crypt='clear',
master_uri=master_uri)
try:
# Ensure that the event subscriber is connected.
# If not, we won't get a response, so error out
if listen and not self.event.connect_pub(timeout=timeout):
raise SaltReqTimeoutError()
payload = yield channel.send(payload_kwargs, timeout=timeout)
except SaltReqTimeoutError:
raise SaltReqTimeoutError(
'Salt request timed out. The master is not responding. '
'If this error persists after verifying the master is up, '
'worker_threads may need to be increased.'
)
if not payload:
# The master key could have changed out from under us! Regen
# and try again if the key has changed
key = self.__read_master_key()
if key == self.key:
raise tornado.gen.Return(payload)
self.key = key
payload_kwargs['key'] = self.key
payload = yield channel.send(payload_kwargs)
error = payload.pop('error', None)
if error is not None:
raise PublishError(error)
if not payload:
raise tornado.gen.Return(payload)
# We have the payload; drop the channel reference so it is GC'ed sooner
del channel
raise tornado.gen.Return({'jid': payload['load']['jid'],
'minions': payload['load']['minions']})
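For reference, a minimal sketch of how a caller could drive the pub_async coroutine above from its own coroutine. The target, function, and run_sync wiring are illustrative assumptions, not part of this change, and a running master with the default /etc/salt/master config is assumed:

import tornado.gen
import tornado.ioloop

import salt.client


@tornado.gen.coroutine
def publish_ping():
    # Assumes a running master and the default /etc/salt/master config.
    client = salt.client.LocalClient()
    # Resolves to {'jid': ..., 'minions': ...} once the master accepts the
    # publication, without blocking the calling IOLoop while waiting.
    pub_data = yield client.pub_async('*', 'test.ping', timeout=5)
    raise tornado.gen.Return(pub_data)


if __name__ == '__main__':
    print(tornado.ioloop.IOLoop.current().run_sync(publish_ping))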
def __del__(self):
# This IS really necessary!
# When running tests, if self.events is not destroyed, we leak 2

View File

@@ -2079,6 +2079,9 @@ class Syndic(Minion):
super(Syndic, self).__init__(opts, **kwargs)
self.mminion = salt.minion.MasterMinion(opts)
self.jid_forward_cache = set()
self.jids = {}
self.raw_events = []
self.pub_future = None
def _handle_decoded_payload(self, data):
'''
@@ -2107,37 +2110,43 @@ class Syndic(Minion):
if field in data:
kwargs[field] = data[field]
try:
# Send out the publication
self.local.pub(data['tgt'],
data['fun'],
data['arg'],
data['tgt_type'],
data['ret'],
data['jid'],
data['to'],
**kwargs)
except Exception as exc:
log.warning('Unable to forward pub data: {0}'.format(exc))
def timeout_handler(*args):
log.warning('Unable to forward pub data: {0}'.format(args[1]))
return True
with tornado.stack_context.ExceptionStackContext(timeout_handler):
self.local.pub_async(data['tgt'],
data['fun'],
data['arg'],
data['tgt_type'],
data['ret'],
data['jid'],
data['to'],
io_loop=self.io_loop,
callback=lambda _: None,
**kwargs)
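The ExceptionStackContext above is what lets the syndic fire pub_async without yielding it: the callback=lambda _: None argument forces the future's result to be consumed inside the stack context, so a failure that happens later is routed to timeout_handler rather than being lost or stopping the IOLoop. A self-contained sketch of the same Tornado 4.x pattern, with made-up names (flaky_publish stands in for pub_async):

import logging

import tornado.gen
import tornado.ioloop
import tornado.stack_context

logging.basicConfig(level=logging.WARNING)
log = logging.getLogger(__name__)


@tornado.gen.coroutine
def flaky_publish():
    # Stand-in for pub_async: fails some time after being scheduled.
    yield tornado.gen.sleep(0.1)
    raise RuntimeError('master did not respond')


def error_handler(exc_type, exc_value, exc_tb):
    # Returning True tells Tornado the exception has been handled.
    log.warning('Unable to forward pub data: %s', exc_value)
    return True


def fire_and_forget():
    # Exceptions escaping callbacks scheduled inside this context are
    # routed to error_handler instead of stopping the IOLoop.
    with tornado.stack_context.ExceptionStackContext(error_handler):
        # Not yielded, mirroring the syndic code; the callback makes the
        # future's result be consumed inside the stack context.
        flaky_publish(callback=lambda _: None)


if __name__ == '__main__':
    io_loop = tornado.ioloop.IOLoop.current()
    io_loop.add_callback(fire_and_forget)
    io_loop.call_later(0.5, io_loop.stop)
    io_loop.start()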
def _fire_master_syndic_start(self):
# Send an event to the master that the minion is live
self._fire_master(
'Syndic {0} started at {1}'.format(
self.opts['id'],
time.asctime()
self.opts['id'],
time.asctime()
),
'syndic_start'
'syndic_start',
sync=False,
)
self._fire_master(
'Syndic {0} started at {1}'.format(
self.opts['id'],
time.asctime()
self.opts['id'],
time.asctime()
),
tagify([self.opts['id'], 'start'], 'syndic'),
sync=False,
)
# Syndic Tune In
@tornado.gen.coroutine
def tune_in(self, start=True):
'''
Lock onto the publisher. This is the main event loop for the syndic
@@ -2240,17 +2249,45 @@ class Syndic(Minion):
if 'retcode' not in event['data']:
self.raw_events.append(event)
@tornado.gen.coroutine
def _return_pub_multi(self, values):
for value in values:
yield self._return_pub(value,
'_syndic_return',
timeout=self._return_retry_timer(),
sync=False)
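_return_pub_multi forwards the queued returns strictly one at a time. With Tornado's generator interface the same loop could also fan the requests out in parallel by yielding a list of futures; a hedged sketch of both variants, with do_return standing in for _return_pub:

import tornado.gen
import tornado.ioloop


@tornado.gen.coroutine
def do_return(value):
    # Stand-in for self._return_pub(value, '_syndic_return', sync=False)
    yield tornado.gen.moment
    raise tornado.gen.Return(value)


@tornado.gen.coroutine
def return_sequential(values):
    # One return in flight at a time, in order (what the syndic does).
    for value in values:
        yield do_return(value)


@tornado.gen.coroutine
def return_parallel(values):
    # All returns in flight at once; resumes when every future resolves.
    yield [do_return(value) for value in values]


if __name__ == '__main__':
    tornado.ioloop.IOLoop.current().run_sync(
        lambda: return_sequential(['ret-a', 'ret-b']))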
def _forward_events(self):
log.trace('Forwarding events') # pylint: disable=no-member
if self.raw_events:
self._fire_master(events=self.raw_events,
events = self.raw_events
self.raw_events = []
self._fire_master(events=events,
pretag=tagify(self.opts['id'], base='syndic'),
)
for jid in self.jids:
self._return_pub(self.jids[jid],
'_syndic_return',
timeout=self._return_retry_timer())
self._reset_event_aggregation()
sync=False)
if self.jids and (self.pub_future is None or self.pub_future.done()):
values = self.jids.values()
self.jids = {}
self.pub_future = self._return_pub_multi(values)
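The pub_future check means at most one batch of returns is in flight per syndic; new job returns keep accumulating in self.jids until the previous batch has finished. A standalone sketch of that gating pattern, with hypothetical class and method names:

import tornado.gen
import tornado.ioloop


class BatchForwarder(object):
    '''Flush queued items one batch at a time, never overlapping sends.'''

    def __init__(self):
        self.pending = {}        # jid -> return data, accumulates between flushes
        self.send_future = None  # future of the batch currently in flight

    @tornado.gen.coroutine
    def _send_batch(self, values):
        for value in values:
            # Stand-in for self._return_pub(value, '_syndic_return', sync=False)
            yield tornado.gen.moment
            print('sent', value)

    def flush(self):
        # Only start a new send once the previous batch has completed.
        if self.pending and (self.send_future is None or self.send_future.done()):
            values = list(self.pending.values())
            self.pending = {}
            self.send_future = self._send_batch(values)


if __name__ == '__main__':
    forwarder = BatchForwarder()
    forwarder.pending = {'20160318233519': {'minion1': True}}

    def wait_for_flush():
        forwarder.flush()
        return forwarder.send_future  # run_sync waits on the returned future

    tornado.ioloop.IOLoop.current().run_sync(wait_for_flush)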
@tornado.gen.coroutine
def reconnect(self):
if hasattr(self, 'pub_channel'):
self.pub_channel.on_recv(None)
if hasattr(self.pub_channel, 'close'):
self.pub_channel.close()
del self.pub_channel
# if eval_master finds a new master for us, self.connected
# will be True again on successful master authentication
master, self.pub_channel = yield self.eval_master(opts=self.opts)
if self.connected:
self.opts['master'] = master
self.pub_channel.on_recv(self._process_cmd_socket)
log.info('Minion is ready to receive requests!')
raise tornado.gen.Return(self)
def destroy(self):
'''
@@ -2306,11 +2343,22 @@ class MultiSyndic(MinionBase):
self.jid_forward_cache = set()
if io_loop is None:
zmq.eventloop.ioloop.install()
self.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
if HAS_ZMQ:
zmq.eventloop.ioloop.install()
self.io_loop = LOOP_CLASS.current()
else:
self.io_loop = io_loop
# List of events
self.raw_events = []
# Dict of rets: {master_id: {event_tag: job_ret, ...}, ...}
self.job_rets = {}
# List of delayed job_rets which could not be sent for some reason and will be resent to
# any available master
self.delayed = []
# Active pub futures: {master_id: (future, [job_ret, ...]), ...}
self.pub_futures = {}
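Spelled out with illustrative, made-up values, the bookkeeping described by the comments above looks roughly like this:

# Raw events captured from the local event bus, forwarded to a master verbatim.
raw_events = [
    {'tag': 'mytag/event', 'data': {'arbitrary': 'payload'}},
]

# Job returns grouped by the master that published the job, then by event tag.
job_rets = {
    'master01': {
        'salt/job/20160318233519/ret/minion1': {
            '__fun__': 'test.ping',
            '__jid__': '20160318233519',
            '__master_id__': 'master01',
            'minion1': True,
        },
    },
}

# Returns that could not be delivered yet; retried against any reachable master.
delayed = []

# One in-flight future per master plus the values it carries, so a failed
# send can push its data back onto the delayed list.
pub_futures = {}  # {'master01': (future, [job_ret, ...]), ...}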
def _spawn_syndics(self):
'''
Spawn all the coroutines which will sign in the syndics
@@ -2360,9 +2408,8 @@ class MultiSyndic(MinionBase):
'''
# if it's connected, mark it dead
if self._syndics[master].done():
syndic = self._syndics.result() # pylint: disable=no-member
syndic.destroy()
self._syndics[master] = self._connect_syndic(syndic.opts)
syndic = self._syndics[master].result() # pylint: disable=no-member
self._syndics[master] = syndic.reconnect()
else:
log.info('Attempting to mark {0} as dead, although it is already marked dead'.format(master)) # TODO: debug?
@@ -2374,18 +2421,51 @@ class MultiSyndic(MinionBase):
kwargs = {}
for master, syndic_future in self.iter_master_options(master_id):
if not syndic_future.done() or syndic_future.exception():
log.error('Unable to call {0} on {1}, that syndic is not connected'.format(func, master_id))
log.error('Unable to call {0} on {1}, that syndic is not connected'.format(func, master))
continue
try:
getattr(syndic_future.result(), func)(*args, **kwargs)
return
except SaltClientError:
log.error('Unable to call {0} on {1}, trying another...'.format(func, master_id))
log.error('Unable to call {0} on {1}, trying another...'.format(func, master))
self._mark_master_dead(master)
continue
log.critical('Unable to call {0} on any masters!'.format(func))
def _return_pub_syndic(self, values, master_id=None):
'''
Wrapper to call '_return_pub_multi' on a syndic; best effort to get the one you asked for
'''
func = '_return_pub_multi'
for master, syndic_future in self.iter_master_options(master_id):
if not syndic_future.done() or syndic_future.exception():
log.error('Unable to call {0} on {1}, that syndic is not connected'.format(func, master))
continue
future, data = self.pub_futures.get(master, (None, None))
if future is not None:
if not future.done():
if master == master_id:
# The targeted master's previous send is not done yet; call again later
return False
else:
# Fallback master is busy, try the next one
continue
elif future.exception():
# Previous execution on this master returned an error
log.error('Unable to call {0} on {1}, trying another...'.format(func, master))
self._mark_master_dead(master)
del self.pub_futures[master]
# Add the unsent data to the delayed list and try the next master
self.delayed.extend(data)
continue
future = getattr(syndic_future.result(), func)(values)
self.pub_futures[master] = (future, values)
return True
# Loop finished without sending; try again later
return False
def iter_master_options(self, master_id=None):
'''
Iterate (in order) over your options for master
@@ -2405,7 +2485,7 @@ class MultiSyndic(MinionBase):
master_id = masters.pop(0)
def _reset_event_aggregation(self):
self.jids = {}
self.job_rets = {}
self.raw_events = []
# Syndic Tune In
@@ -2453,7 +2533,8 @@ class MultiSyndic(MinionBase):
log.debug('Return received with matching master_id, not forwarding')
return
jdict = self.jids.setdefault(event['tag'], {})
master = event['data'].get('master_id')
jdict = self.job_rets.setdefault(master, {}).setdefault(event['tag'], {})
if not jdict:
jdict['__fun__'] = event['data'].get('fun')
jdict['__jid__'] = event['data']['jid']
@@ -2471,9 +2552,9 @@ class MultiSyndic(MinionBase):
tmp = sorted(list(self.jid_forward_cache))
tmp.pop(0)
self.jid_forward_cache = set(tmp)
if 'master_id' in event['data']:
if master is not None:
# __'s to make sure it doesn't print out on the master cli
jdict['__master_id__'] = event['data']['master_id']
jdict['__master_id__'] = master
jdict[event['data']['id']] = event['data']['return']
else:
# TODO: config to forward these? If so we'll have to keep track of who
@@ -2487,20 +2568,24 @@ class MultiSyndic(MinionBase):
def _forward_events(self):
log.trace('Forwarding events') # pylint: disable=no-member
if self.raw_events:
events = self.raw_events
self.raw_events = []
self._call_syndic('_fire_master',
kwargs={'events': self.raw_events,
kwargs={'events': events,
'pretag': tagify(self.opts['id'], base='syndic'),
'timeout': self.SYNDIC_EVENT_TIMEOUT,
'sync': False,
},
)
for jid, jid_ret in self.jids.items():
self._call_syndic('_return_pub',
args=(jid_ret, '_syndic_return'),
kwargs={'timeout': self.SYNDIC_EVENT_TIMEOUT},
master_id=jid_ret.get('__master_id__'),
)
self._reset_event_aggregation()
if self.delayed:
res = self._return_pub_syndic(self.delayed)
if res:
self.delayed = []
for master in list(six.iterkeys(self.job_rets)):
values = self.job_rets[master].values()
res = self._return_pub_syndic(values, master_id=master)
if res:
del self.job_rets[master]
class Matcher(object):

View File

@@ -376,7 +376,7 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran
@tornado.gen.coroutine
def connect(self):
try:
self.auth = salt.crypt.AsyncAuth(self.opts)
self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self.io_loop)
self.tok = self.auth.gen_token('salt')
if not self.auth.authenticated:
yield self.auth.authenticate()
@@ -551,7 +551,7 @@ class SaltMessageServer(tornado.tcpserver.TCPServer, object):
log.trace('req client disconnected {0}'.format(address))
self.clients.remove((stream, address))
except Exception as e:
log.trace('other master-side exception??', e, e.__module__, e.extra)
log.trace('other master-side exception: {0}'.format(e))
self.clients.remove((stream, address))
stream.close()
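The replaced call handed extra positional arguments to log.trace and referenced e.extra, which ordinary exceptions do not have, so it could raise instead of logging; the new call interpolates the exception into the message. A small standalone illustration of the difference, using the stdlib debug level in place of Salt's custom trace level:

import logging

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)

try:
    raise ValueError('stream closed unexpectedly')
except Exception as e:
    # Fragile: the extra positional args would be treated as %-format
    # arguments, and e.extra raises AttributeError before anything is logged.
    # log.debug('other master-side exception??', e, e.__module__, e.extra)

    # Safer: build the message explicitly from the exception.
    log.debug('other master-side exception: {0}'.format(e))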