From a6d93e48915b081b6e54b5eeef8275cc3ddbd614 Mon Sep 17 00:00:00 2001 From: Dmitry Kuzmenko Date: Fri, 18 Mar 2016 23:35:19 +0300 Subject: [PATCH] Make Syndic asynchronous. --- salt/client/__init__.py | 99 +++++++++++++++++++++++ salt/minion.py | 173 ++++++++++++++++++++++++++++++---------- salt/transport/tcp.py | 4 +- 3 files changed, 230 insertions(+), 46 deletions(-) diff --git a/salt/client/__init__.py b/salt/client/__init__.py index 9202f035e1..843ce4122c 100644 --- a/salt/client/__init__.py +++ b/salt/client/__init__.py @@ -69,6 +69,9 @@ except ImportError: pass # pylint: enable=import-error +# Import tornado +import tornado.gen # pylint: disable=F0401 + log = logging.getLogger(__name__) @@ -1493,6 +1496,102 @@ class LocalClient(object): return {'jid': payload['load']['jid'], 'minions': payload['load']['minions']} + @tornado.gen.coroutine + def pub_async(self, + tgt, + fun, + arg=(), + expr_form='glob', + ret='', + jid='', + timeout=5, + io_loop=None, + listen=True, + **kwargs): + ''' + Take the required arguments and publish the given command. + Arguments: + tgt: + The tgt is a regex or a glob used to match up the ids on + the minions. Salt works by always publishing every command + to all of the minions and then the minions determine if + the command is for them based on the tgt value. + fun: + The function name to be called on the remote host(s), this + must be a string in the format "." + arg: + The arg option needs to be a tuple of arguments to pass + to the calling function, if left blank + Returns: + jid: + A string, as returned by the publisher, which is the job + id, this will inform the client where to get the job results + minions: + A set, the targets that the tgt passed should match. + ''' + # Make sure the publisher is running by checking the unix socket + if (self.opts.get('ipc_mode', '') != 'tcp' and + not os.path.exists(os.path.join(self.opts['sock_dir'], + 'publish_pull.ipc'))): + log.error( + 'Unable to connect to the salt master publisher at ' + '{0}'.format(self.opts['sock_dir']) + ) + raise SaltClientError + + payload_kwargs = self._prep_pub( + tgt, + fun, + arg, + expr_form, + ret, + jid, + timeout, + **kwargs) + + master_uri = 'tcp://' + salt.utils.ip_bracket(self.opts['interface']) + \ + ':' + str(self.opts['ret_port']) + channel = salt.transport.client.AsyncReqChannel.factory(self.opts, + io_loop=io_loop, + crypt='clear', + master_uri=master_uri) + + try: + # Ensure that the event subscriber is connected. + # If not, we won't get a response, so error out + if listen and not self.event.connect_pub(timeout=timeout): + raise SaltReqTimeoutError() + payload = yield channel.send(payload_kwargs, timeout=timeout) + except SaltReqTimeoutError: + raise SaltReqTimeoutError( + 'Salt request timed out. The master is not responding. ' + 'If this error persists after verifying the master is up, ' + 'worker_threads may need to be increased.' + ) + + if not payload: + # The master key could have changed out from under us! Regen + # and try again if the key has changed + key = self.__read_master_key() + if key == self.key: + raise tornado.gen.Return(payload) + self.key = key + payload_kwargs['key'] = self.key + payload = yield channel.send(payload_kwargs) + + error = payload.pop('error', None) + if error is not None: + raise PublishError(error) + + if not payload: + raise tornado.gen.Return(payload) + + # We have the payload, let's get rid of the channel fast(GC'ed faster) + del channel + + raise tornado.gen.Return({'jid': payload['load']['jid'], + 'minions': payload['load']['minions']}) + def __del__(self): # This IS really necessary! # When running tests, if self.events is not destroyed, we leak 2 diff --git a/salt/minion.py b/salt/minion.py index 1a4d07c0bd..ab43fae948 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -2079,6 +2079,9 @@ class Syndic(Minion): super(Syndic, self).__init__(opts, **kwargs) self.mminion = salt.minion.MasterMinion(opts) self.jid_forward_cache = set() + self.jids = {} + self.raw_events = [] + self.pub_future = None def _handle_decoded_payload(self, data): ''' @@ -2107,37 +2110,43 @@ class Syndic(Minion): if field in data: kwargs[field] = data[field] - try: - # Send out the publication - self.local.pub(data['tgt'], - data['fun'], - data['arg'], - data['tgt_type'], - data['ret'], - data['jid'], - data['to'], - **kwargs) - except Exception as exc: - log.warning('Unable to forward pub data: {0}'.format(exc)) + def timeout_handler(*args): + log.warning('Unable to forward pub data: {0}'.format(args[1])) + return True + + with tornado.stack_context.ExceptionStackContext(timeout_handler): + self.local.pub_async(data['tgt'], + data['fun'], + data['arg'], + data['tgt_type'], + data['ret'], + data['jid'], + data['to'], + io_loop=self.io_loop, + callback=lambda _: None, + **kwargs) def _fire_master_syndic_start(self): # Send an event to the master that the minion is live self._fire_master( 'Syndic {0} started at {1}'.format( - self.opts['id'], - time.asctime() + self.opts['id'], + time.asctime() ), - 'syndic_start' + 'syndic_start', + sync=False, ) self._fire_master( 'Syndic {0} started at {1}'.format( - self.opts['id'], - time.asctime() + self.opts['id'], + time.asctime() ), tagify([self.opts['id'], 'start'], 'syndic'), + sync=False, ) # Syndic Tune In + @tornado.gen.coroutine def tune_in(self, start=True): ''' Lock onto the publisher. This is the main event loop for the syndic @@ -2240,17 +2249,45 @@ class Syndic(Minion): if 'retcode' not in event['data']: self.raw_events.append(event) + @tornado.gen.coroutine + def _return_pub_multi(self, values): + for value in values: + yield self._return_pub(value, + '_syndic_return', + timeout=self._return_retry_timer(), + sync=False) + def _forward_events(self): log.trace('Forwarding events') # pylint: disable=no-member if self.raw_events: - self._fire_master(events=self.raw_events, + events = self.raw_events + self.raw_events = [] + self._fire_master(events=events, pretag=tagify(self.opts['id'], base='syndic'), - ) - for jid in self.jids: - self._return_pub(self.jids[jid], - '_syndic_return', - timeout=self._return_retry_timer()) - self._reset_event_aggregation() + sync=False) + if self.jids and (self.pub_future is None or self.pub_future.done()): + values = self.jids.values() + self.jids = {} + self.pub_future = self._return_pub_multi(values) + + @tornado.gen.coroutine + def reconnect(self): + if hasattr(self, 'pub_channel'): + self.pub_channel.on_recv(None) + if hasattr(self.pub_channel, 'close'): + self.pub_channel.close() + del self.pub_channel + + # if eval_master finds a new master for us, self.connected + # will be True again on successful master authentication + master, self.pub_channel = yield self.eval_master(opts=self.opts) + + if self.connected: + self.opts['master'] = master + self.pub_channel.on_recv(self._process_cmd_socket) + log.info('Minion is ready to receive requests!') + + raise tornado.gen.Return(self) def destroy(self): ''' @@ -2306,11 +2343,22 @@ class MultiSyndic(MinionBase): self.jid_forward_cache = set() if io_loop is None: - zmq.eventloop.ioloop.install() - self.io_loop = zmq.eventloop.ioloop.ZMQIOLoop() + if HAS_ZMQ: + zmq.eventloop.ioloop.install() + self.io_loop = LOOP_CLASS.current() else: self.io_loop = io_loop + # List of events + self.raw_events = [] + # Dict of rets: {master_id: {event_tag: job_ret, ...}, ...} + self.job_rets = {} + # List of delayed job_rets which was unable to send for some reason and will be resend to + # any available master + self.delayed = [] + # Active pub futures: {master_id: (future, [job_ret, ...]), ...} + self.pub_futures = {} + def _spawn_syndics(self): ''' Spawn all the coroutines which will sign in the syndics @@ -2360,9 +2408,8 @@ class MultiSyndic(MinionBase): ''' # if its connected, mark it dead if self._syndics[master].done(): - syndic = self._syndics.result() # pylint: disable=no-member - syndic.destroy() - self._syndics[master] = self._connect_syndic(syndic.opts) + syndic = self._syndics[master].result() # pylint: disable=no-member + self._syndics[master] = syndic.reconnect() else: log.info('Attempting to mark {0} as dead, although it is already marked dead'.format(master)) # TODO: debug? @@ -2374,18 +2421,51 @@ class MultiSyndic(MinionBase): kwargs = {} for master, syndic_future in self.iter_master_options(master_id): if not syndic_future.done() or syndic_future.exception(): - log.error('Unable to call {0} on {1}, that syndic is not connected'.format(func, master_id)) + log.error('Unable to call {0} on {1}, that syndic is not connected'.format(func, master)) continue try: getattr(syndic_future.result(), func)(*args, **kwargs) return except SaltClientError: - log.error('Unable to call {0} on {1}, trying another...'.format(func, master_id)) + log.error('Unable to call {0} on {1}, trying another...'.format(func, master)) self._mark_master_dead(master) continue log.critical('Unable to call {0} on any masters!'.format(func)) + def _return_pub_syndic(self, values, master_id=None): + ''' + Wrapper to call the '_return_pub_multi' a syndic, best effort to get the one you asked for + ''' + func = '_return_pub_multi' + for master, syndic_future in self.iter_master_options(master_id): + if not syndic_future.done() or syndic_future.exception(): + log.error('Unable to call {0} on {1}, that syndic is not connected'.format(func, master)) + continue + + future, data = self.pub_futures.get(master, (None, None)) + if future is not None: + if not future.done(): + if master == master_id: + # Targeted master previous send not done yet, call again later + return False + else: + # Fallback master is busy, try the next one + continue + elif future.exception(): + # Previous execution on this master returned an error + log.error('Unable to call {0} on {1}, trying another...'.format(func, master)) + self._mark_master_dead(master) + del self.pub_futures[master] + # Add not sent data to the delayed list and try the next master + self.delayed.extend(data) + continue + future = getattr(syndic_future.result(), func)(values) + self.pub_futures[master] = (future, values) + return True + # Loop done and didn't exit: wasn't sent, try again later + return False + def iter_master_options(self, master_id=None): ''' Iterate (in order) over your options for master @@ -2405,7 +2485,7 @@ class MultiSyndic(MinionBase): master_id = masters.pop(0) def _reset_event_aggregation(self): - self.jids = {} + self.job_rets = {} self.raw_events = [] # Syndic Tune In @@ -2453,7 +2533,8 @@ class MultiSyndic(MinionBase): log.debug('Return received with matching master_id, not forwarding') return - jdict = self.jids.setdefault(event['tag'], {}) + master = event['data'].get('master_id') + jdict = self.job_rets.setdefault(master, {}).setdefault(event['tag'], {}) if not jdict: jdict['__fun__'] = event['data'].get('fun') jdict['__jid__'] = event['data']['jid'] @@ -2471,9 +2552,9 @@ class MultiSyndic(MinionBase): tmp = sorted(list(self.jid_forward_cache)) tmp.pop(0) self.jid_forward_cache = set(tmp) - if 'master_id' in event['data']: + if master is not None: # __'s to make sure it doesn't print out on the master cli - jdict['__master_id__'] = event['data']['master_id'] + jdict['__master_id__'] = master jdict[event['data']['id']] = event['data']['return'] else: # TODO: config to forward these? If so we'll have to keep track of who @@ -2487,20 +2568,24 @@ class MultiSyndic(MinionBase): def _forward_events(self): log.trace('Forwarding events') # pylint: disable=no-member if self.raw_events: + events = self.raw_events + self.raw_events = [] self._call_syndic('_fire_master', - kwargs={'events': self.raw_events, + kwargs={'events': events, 'pretag': tagify(self.opts['id'], base='syndic'), 'timeout': self.SYNDIC_EVENT_TIMEOUT, + 'sync': False, }, ) - for jid, jid_ret in self.jids.items(): - self._call_syndic('_return_pub', - args=(jid_ret, '_syndic_return'), - kwargs={'timeout': self.SYNDIC_EVENT_TIMEOUT}, - master_id=jid_ret.get('__master_id__'), - ) - - self._reset_event_aggregation() + if self.delayed: + res = self._return_pub_syndic(self.delayed) + if res: + self.delayed = [] + for master in list(six.iterkeys(self.job_rets)): + values = self.job_rets[master].values() + res = self._return_pub_syndic(values, master_id=master) + if res: + del self.job_rets[master] class Matcher(object): diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index b300d0b986..45aa7b39b9 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -376,7 +376,7 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran @tornado.gen.coroutine def connect(self): try: - self.auth = salt.crypt.AsyncAuth(self.opts) + self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self.io_loop) self.tok = self.auth.gen_token('salt') if not self.auth.authenticated: yield self.auth.authenticate() @@ -551,7 +551,7 @@ class SaltMessageServer(tornado.tcpserver.TCPServer, object): log.trace('req client disconnected {0}'.format(address)) self.clients.remove((stream, address)) except Exception as e: - log.trace('other master-side exception??', e, e.__module__, e.extra) + log.trace('other master-side exception: {0}'.format(e)) self.clients.remove((stream, address)) stream.close()