From c59a5adb0d0480bbe2ed42c361d95fac2af2718b Mon Sep 17 00:00:00 2001 From: Sergey Kizunov Date: Fri, 30 Sep 2016 10:49:24 -0500 Subject: [PATCH] Fix race condition when returning events from commands It has been observed that when running this command: ``` salt "*" test.ping ``` sometimes the command would return `Minion did not return. [No response]` for some of the minions even though the minions did indeed respond (reproduced running Windows salt-master on Python 3 using the TCP transport). After investigating this further, it seems that there is a race condition where if the response via event happens before events are being listened for, the response is lost. For instance, in `salt.client.LocalClient.cmd_cli` which is what is invoked in the command above, it won't start listening for events until `get_cli_event_returns` which invokes `get_iter_returns` which invokes `get_returns_no_block` which invokes `self.event.get_event` which will connect to the event bus if it hasn't connected yet (which is the case the first time it hits this code). But events may be fired anytime after `self.pub()` is executed which occurs before this code. We need to ensure that events are being listened for before it is possible they return. We also want to avoid issue #31454 which is what PR #36024 fixed but in turn caused this issue. This is the approach I have taken to try to tackle this issue: It doesn't seem possible to generically discern if events can be returned by a given function that invokes `run_job` and contains an event searching function such as `get_cli_event_returns`. So for all such functions that could possibly need to search the event bus, we do the following: - Record if the event bus is currently being listened to. - When invoking `run_job`, ensure that `listen=True` so that `self.pub()` will ensure that the event bus is listened to before sending the payload. 
- When all possible event bus activities are concluded, if the event bus was not originally being listened to, stop listening to it. This is designed so that issue #31454 does not reappear. We do this via a try/finally block in all instances of such code. Signed-off-by: Sergey Kizunov --- salt/client/__init__.py | 269 +++++++++++++++++++++++----------------- salt/utils/event.py | 12 ++ 2 files changed, 167 insertions(+), 114 deletions(-) diff --git a/salt/client/__init__.py b/salt/client/__init__.py index eb93591d5a..7b18eac554 100644 --- a/salt/client/__init__.py +++ b/salt/client/__init__.py @@ -290,6 +290,7 @@ class LocalClient(object): timeout=None, jid='', kwarg=None, + listen=False, **kwargs): ''' Asynchronously send a command to connected minions @@ -316,6 +317,7 @@ class LocalClient(object): ret, jid=jid, timeout=self._get_timeout(timeout), + listen=listen, **kwargs) except SaltClientError: # Re-raise error with specific message @@ -564,32 +566,38 @@ class LocalClient(object): function name. 
''' arg = salt.utils.args.condition_input(arg, kwarg) - pub_data = self.run_job(tgt, - fun, - arg, - expr_form, - ret, - timeout, - jid, - **kwargs) + was_listening = self.event.cpub - if not pub_data: - return pub_data + try: + pub_data = self.run_job(tgt, + fun, + arg, + expr_form, + ret, + timeout, + jid, + listen=True, + **kwargs) - ret = {} - for fn_ret in self.get_cli_event_returns( - pub_data['jid'], - pub_data['minions'], - self._get_timeout(timeout), - tgt, - expr_form, - **kwargs): + if not pub_data: + return pub_data - if fn_ret: - for mid, data in six.iteritems(fn_ret): - ret[mid] = data.get('ret', {}) + ret = {} + for fn_ret in self.get_cli_event_returns( + pub_data['jid'], + pub_data['minions'], + self._get_timeout(timeout), + tgt, + expr_form, + **kwargs): - return ret + if fn_ret: + for mid, data in six.iteritems(fn_ret): + ret[mid] = data.get('ret', {}) + return ret + finally: + if not was_listening: + self.event.close_pub() def cmd_cli( self, @@ -614,40 +622,47 @@ class LocalClient(object): :returns: A generator ''' arg = salt.utils.args.condition_input(arg, kwarg) - pub_data = self.run_job( - tgt, - fun, - arg, - expr_form, - ret, - timeout, - **kwargs) + was_listening = self.event.cpub - if not pub_data: - yield pub_data - else: - try: - for fn_ret in self.get_cli_event_returns( - pub_data['jid'], - pub_data['minions'], - self._get_timeout(timeout), - tgt, - expr_form, - verbose, - progress, - **kwargs): + try: + pub_data = self.run_job( + tgt, + fun, + arg, + expr_form, + ret, + timeout, + listen=True, + **kwargs) - if not fn_ret: - continue + if not pub_data: + yield pub_data + else: + try: + for fn_ret in self.get_cli_event_returns( + pub_data['jid'], + pub_data['minions'], + self._get_timeout(timeout), + tgt, + expr_form, + verbose, + progress, + **kwargs): - yield fn_ret - except KeyboardInterrupt: - msg = ('Exiting on Ctrl-C\nThis job\'s jid is:\n{0}\n' - 'The minions may not have all finished running and any ' - 'remaining minions will 
return upon completion. To ' - 'look up the return data for this job later run:\n' - 'salt-run jobs.lookup_jid {0}').format(pub_data['jid']) - raise SystemExit(msg) + if not fn_ret: + continue + + yield fn_ret + except KeyboardInterrupt: + msg = ('Exiting on Ctrl-C\nThis job\'s jid is:\n{0}\n' + 'The minions may not have all finished running and any ' + 'remaining minions will return upon completion. To ' + 'look up the return data for this job later run:\n' + 'salt-run jobs.lookup_jid {0}').format(pub_data['jid']) + raise SystemExit(msg) + finally: + if not was_listening: + self.event.close_pub() def cmd_iter( self, @@ -677,30 +692,37 @@ class LocalClient(object): {'stewart': {'ret': True}} ''' arg = salt.utils.args.condition_input(arg, kwarg) - pub_data = self.run_job( - tgt, - fun, - arg, - expr_form, - ret, - timeout, - **kwargs) + was_listening = self.event.cpub - if not pub_data: - yield pub_data - else: - if kwargs.get('yield_pub_data'): + try: + pub_data = self.run_job( + tgt, + fun, + arg, + expr_form, + ret, + timeout, + listen=True, + **kwargs) + + if not pub_data: yield pub_data - for fn_ret in self.get_iter_returns(pub_data['jid'], - pub_data['minions'], - timeout=self._get_timeout(timeout), - tgt=tgt, - tgt_type=expr_form, - **kwargs): - if not fn_ret: - continue - yield fn_ret - self._clean_up_subscriptions(pub_data['jid']) + else: + if kwargs.get('yield_pub_data'): + yield pub_data + for fn_ret in self.get_iter_returns(pub_data['jid'], + pub_data['minions'], + timeout=self._get_timeout(timeout), + tgt=tgt, + tgt_type=expr_form, + **kwargs): + if not fn_ret: + continue + yield fn_ret + self._clean_up_subscriptions(pub_data['jid']) + finally: + if not was_listening: + self.event.close_pub() def cmd_iter_no_block( self, @@ -737,31 +759,38 @@ class LocalClient(object): {'stewart': {'ret': True}} ''' arg = salt.utils.args.condition_input(arg, kwarg) - pub_data = self.run_job( - tgt, - fun, - arg, - expr_form, - ret, - timeout, - **kwargs) + was_listening 
= self.event.cpub - if not pub_data: - yield pub_data - else: - for fn_ret in self.get_iter_returns(pub_data['jid'], - pub_data['minions'], - timeout=timeout, - tgt=tgt, - tgt_type=expr_form, - block=False, - **kwargs): - if fn_ret and any([show_jid, verbose]): - for minion in fn_ret.keys(): - fn_ret[minion]['jid'] = pub_data['jid'] - yield fn_ret + try: + pub_data = self.run_job( + tgt, + fun, + arg, + expr_form, + ret, + timeout, + listen=True, + **kwargs) - self._clean_up_subscriptions(pub_data['jid']) + if not pub_data: + yield pub_data + else: + for fn_ret in self.get_iter_returns(pub_data['jid'], + pub_data['minions'], + timeout=timeout, + tgt=tgt, + tgt_type=expr_form, + block=False, + **kwargs): + if fn_ret and any([show_jid, verbose]): + for minion in fn_ret.keys(): + fn_ret[minion]['jid'] = pub_data['jid'] + yield fn_ret + + self._clean_up_subscriptions(pub_data['jid']) + finally: + if not was_listening: + self.event.close_pub() def cmd_full_return( self, @@ -778,24 +807,31 @@ class LocalClient(object): Execute a salt command and return ''' arg = salt.utils.args.condition_input(arg, kwarg) - pub_data = self.run_job( - tgt, - fun, - arg, - expr_form, - ret, - timeout, - **kwargs) + was_listening = self.event.cpub - if not pub_data: - return pub_data + try: + pub_data = self.run_job( + tgt, + fun, + arg, + expr_form, + ret, + timeout, + listen=True, + **kwargs) - return (self.get_cli_static_event_returns(pub_data['jid'], - pub_data['minions'], - timeout, - tgt, - expr_form, - verbose)) + if not pub_data: + return pub_data + + return (self.get_cli_static_event_returns(pub_data['jid'], + pub_data['minions'], + timeout, + tgt, + expr_form, + verbose)) + finally: + if not was_listening: + self.event.close_pub() def get_cli_returns( self, @@ -1460,6 +1496,7 @@ class LocalClient(object): ret='', jid='', timeout=5, + listen=False, **kwargs): ''' Take the required arguments and publish the given command. 
@@ -1509,6 +1546,10 @@ class LocalClient(object): master_uri=master_uri) try: + # Ensure that the event subscriber is connected. + # If not, we won't get a response, so error out + if listen and not self.event.connect_pub(timeout=timeout): + raise SaltReqTimeoutError() payload = channel.send(payload_kwargs, timeout=timeout) except SaltReqTimeoutError: raise SaltReqTimeoutError( diff --git a/salt/utils/event.py b/salt/utils/event.py index 70b3c423f1..30cc92fad2 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -365,6 +365,18 @@ class SaltEvent(object): self.cpub = True return self.cpub + def close_pub(self): + ''' + Close the publish connection (if established) + ''' + if not self.cpub: + return + + self.subscriber.close() + self.subscriber = None + self.pending_events = [] + self.cpub = False + def connect_pull(self, timeout=1): ''' Establish a connection with the event pull socket