mirror of
https://github.com/valitydev/salt.git
synced 2024-11-07 08:58:59 +00:00
Fix race condition when returning events from commands
It has been observed that when running this command:

```
salt "*" test.ping
```

sometimes the command would return `Minion did not return. [No response]` for some of the minions even though the minions did indeed respond (reproduced running Windows salt-master on Python 3 using the TCP transport).

After investigating this further, it seems that there is a race condition: if a minion's response event arrives before the client has started listening for events, the response is lost. For instance, in `salt.client.LocalClient.cmd_cli`, which is what is invoked by the command above, the client won't start listening for events until `get_cli_event_returns`, which invokes `get_iter_returns`, which invokes `get_returns_no_block`, which invokes `self.event.get_event`, which will connect to the event bus if it hasn't connected yet (which is the case the first time it hits this code). But events may be fired any time after `self.pub()` is executed, which occurs before this code.

We need to ensure that events are being listened for before it is possible for them to be returned. We also want to avoid issue #31454, which is what PR #36024 fixed but which in turn caused this issue.

This is the approach I have taken to try to tackle this issue:

It doesn't seem possible to generically discern whether events can be returned by a given function that invokes `run_job` and contains an event-searching function such as `get_cli_event_returns`. So for all such functions that could possibly need to search the event bus, we do the following:

- Record whether the event bus is currently being listened to.
- When invoking `run_job`, ensure that `listen=True` so that `self.pub()` will ensure that the event bus is listened to before sending the payload.
- When all possible event bus activities are concluded, if the event bus was not originally being listened to, stop listening to it. This is designed so that issue #31454 does not reappear. We do this via a try/finally block in all instances of such code.
Signed-off-by: Sergey Kizunov <sergey.kizunov@ni.com>
This commit is contained in:
parent
fb2add012f
commit
c59a5adb0d
@ -290,6 +290,7 @@ class LocalClient(object):
|
||||
timeout=None,
|
||||
jid='',
|
||||
kwarg=None,
|
||||
listen=False,
|
||||
**kwargs):
|
||||
'''
|
||||
Asynchronously send a command to connected minions
|
||||
@ -316,6 +317,7 @@ class LocalClient(object):
|
||||
ret,
|
||||
jid=jid,
|
||||
timeout=self._get_timeout(timeout),
|
||||
listen=listen,
|
||||
**kwargs)
|
||||
except SaltClientError:
|
||||
# Re-raise error with specific message
|
||||
@ -564,32 +566,38 @@ class LocalClient(object):
|
||||
function name.
|
||||
'''
|
||||
arg = salt.utils.args.condition_input(arg, kwarg)
|
||||
pub_data = self.run_job(tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
jid,
|
||||
**kwargs)
|
||||
was_listening = self.event.cpub
|
||||
|
||||
if not pub_data:
|
||||
return pub_data
|
||||
try:
|
||||
pub_data = self.run_job(tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
jid,
|
||||
listen=True,
|
||||
**kwargs)
|
||||
|
||||
ret = {}
|
||||
for fn_ret in self.get_cli_event_returns(
|
||||
pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
self._get_timeout(timeout),
|
||||
tgt,
|
||||
expr_form,
|
||||
**kwargs):
|
||||
if not pub_data:
|
||||
return pub_data
|
||||
|
||||
if fn_ret:
|
||||
for mid, data in six.iteritems(fn_ret):
|
||||
ret[mid] = data.get('ret', {})
|
||||
ret = {}
|
||||
for fn_ret in self.get_cli_event_returns(
|
||||
pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
self._get_timeout(timeout),
|
||||
tgt,
|
||||
expr_form,
|
||||
**kwargs):
|
||||
|
||||
return ret
|
||||
if fn_ret:
|
||||
for mid, data in six.iteritems(fn_ret):
|
||||
ret[mid] = data.get('ret', {})
|
||||
return ret
|
||||
finally:
|
||||
if not was_listening:
|
||||
self.event.close_pub()
|
||||
|
||||
def cmd_cli(
|
||||
self,
|
||||
@ -614,40 +622,47 @@ class LocalClient(object):
|
||||
:returns: A generator
|
||||
'''
|
||||
arg = salt.utils.args.condition_input(arg, kwarg)
|
||||
pub_data = self.run_job(
|
||||
tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
**kwargs)
|
||||
was_listening = self.event.cpub
|
||||
|
||||
if not pub_data:
|
||||
yield pub_data
|
||||
else:
|
||||
try:
|
||||
for fn_ret in self.get_cli_event_returns(
|
||||
pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
self._get_timeout(timeout),
|
||||
tgt,
|
||||
expr_form,
|
||||
verbose,
|
||||
progress,
|
||||
**kwargs):
|
||||
try:
|
||||
pub_data = self.run_job(
|
||||
tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
listen=True,
|
||||
**kwargs)
|
||||
|
||||
if not fn_ret:
|
||||
continue
|
||||
if not pub_data:
|
||||
yield pub_data
|
||||
else:
|
||||
try:
|
||||
for fn_ret in self.get_cli_event_returns(
|
||||
pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
self._get_timeout(timeout),
|
||||
tgt,
|
||||
expr_form,
|
||||
verbose,
|
||||
progress,
|
||||
**kwargs):
|
||||
|
||||
yield fn_ret
|
||||
except KeyboardInterrupt:
|
||||
msg = ('Exiting on Ctrl-C\nThis job\'s jid is:\n{0}\n'
|
||||
'The minions may not have all finished running and any '
|
||||
'remaining minions will return upon completion. To '
|
||||
'look up the return data for this job later run:\n'
|
||||
'salt-run jobs.lookup_jid {0}').format(pub_data['jid'])
|
||||
raise SystemExit(msg)
|
||||
if not fn_ret:
|
||||
continue
|
||||
|
||||
yield fn_ret
|
||||
except KeyboardInterrupt:
|
||||
msg = ('Exiting on Ctrl-C\nThis job\'s jid is:\n{0}\n'
|
||||
'The minions may not have all finished running and any '
|
||||
'remaining minions will return upon completion. To '
|
||||
'look up the return data for this job later run:\n'
|
||||
'salt-run jobs.lookup_jid {0}').format(pub_data['jid'])
|
||||
raise SystemExit(msg)
|
||||
finally:
|
||||
if not was_listening:
|
||||
self.event.close_pub()
|
||||
|
||||
def cmd_iter(
|
||||
self,
|
||||
@ -677,30 +692,37 @@ class LocalClient(object):
|
||||
{'stewart': {'ret': True}}
|
||||
'''
|
||||
arg = salt.utils.args.condition_input(arg, kwarg)
|
||||
pub_data = self.run_job(
|
||||
tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
**kwargs)
|
||||
was_listening = self.event.cpub
|
||||
|
||||
if not pub_data:
|
||||
yield pub_data
|
||||
else:
|
||||
if kwargs.get('yield_pub_data'):
|
||||
try:
|
||||
pub_data = self.run_job(
|
||||
tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
listen=True,
|
||||
**kwargs)
|
||||
|
||||
if not pub_data:
|
||||
yield pub_data
|
||||
for fn_ret in self.get_iter_returns(pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
timeout=self._get_timeout(timeout),
|
||||
tgt=tgt,
|
||||
tgt_type=expr_form,
|
||||
**kwargs):
|
||||
if not fn_ret:
|
||||
continue
|
||||
yield fn_ret
|
||||
self._clean_up_subscriptions(pub_data['jid'])
|
||||
else:
|
||||
if kwargs.get('yield_pub_data'):
|
||||
yield pub_data
|
||||
for fn_ret in self.get_iter_returns(pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
timeout=self._get_timeout(timeout),
|
||||
tgt=tgt,
|
||||
tgt_type=expr_form,
|
||||
**kwargs):
|
||||
if not fn_ret:
|
||||
continue
|
||||
yield fn_ret
|
||||
self._clean_up_subscriptions(pub_data['jid'])
|
||||
finally:
|
||||
if not was_listening:
|
||||
self.event.close_pub()
|
||||
|
||||
def cmd_iter_no_block(
|
||||
self,
|
||||
@ -737,31 +759,38 @@ class LocalClient(object):
|
||||
{'stewart': {'ret': True}}
|
||||
'''
|
||||
arg = salt.utils.args.condition_input(arg, kwarg)
|
||||
pub_data = self.run_job(
|
||||
tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
**kwargs)
|
||||
was_listening = self.event.cpub
|
||||
|
||||
if not pub_data:
|
||||
yield pub_data
|
||||
else:
|
||||
for fn_ret in self.get_iter_returns(pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
timeout=timeout,
|
||||
tgt=tgt,
|
||||
tgt_type=expr_form,
|
||||
block=False,
|
||||
**kwargs):
|
||||
if fn_ret and any([show_jid, verbose]):
|
||||
for minion in fn_ret.keys():
|
||||
fn_ret[minion]['jid'] = pub_data['jid']
|
||||
yield fn_ret
|
||||
try:
|
||||
pub_data = self.run_job(
|
||||
tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
listen=True,
|
||||
**kwargs)
|
||||
|
||||
self._clean_up_subscriptions(pub_data['jid'])
|
||||
if not pub_data:
|
||||
yield pub_data
|
||||
else:
|
||||
for fn_ret in self.get_iter_returns(pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
timeout=timeout,
|
||||
tgt=tgt,
|
||||
tgt_type=expr_form,
|
||||
block=False,
|
||||
**kwargs):
|
||||
if fn_ret and any([show_jid, verbose]):
|
||||
for minion in fn_ret.keys():
|
||||
fn_ret[minion]['jid'] = pub_data['jid']
|
||||
yield fn_ret
|
||||
|
||||
self._clean_up_subscriptions(pub_data['jid'])
|
||||
finally:
|
||||
if not was_listening:
|
||||
self.event.close_pub()
|
||||
|
||||
def cmd_full_return(
|
||||
self,
|
||||
@ -778,24 +807,31 @@ class LocalClient(object):
|
||||
Execute a salt command and return
|
||||
'''
|
||||
arg = salt.utils.args.condition_input(arg, kwarg)
|
||||
pub_data = self.run_job(
|
||||
tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
**kwargs)
|
||||
was_listening = self.event.cpub
|
||||
|
||||
if not pub_data:
|
||||
return pub_data
|
||||
try:
|
||||
pub_data = self.run_job(
|
||||
tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
listen=True,
|
||||
**kwargs)
|
||||
|
||||
return (self.get_cli_static_event_returns(pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
timeout,
|
||||
tgt,
|
||||
expr_form,
|
||||
verbose))
|
||||
if not pub_data:
|
||||
return pub_data
|
||||
|
||||
return (self.get_cli_static_event_returns(pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
timeout,
|
||||
tgt,
|
||||
expr_form,
|
||||
verbose))
|
||||
finally:
|
||||
if not was_listening:
|
||||
self.event.close_pub()
|
||||
|
||||
def get_cli_returns(
|
||||
self,
|
||||
@ -1460,6 +1496,7 @@ class LocalClient(object):
|
||||
ret='',
|
||||
jid='',
|
||||
timeout=5,
|
||||
listen=False,
|
||||
**kwargs):
|
||||
'''
|
||||
Take the required arguments and publish the given command.
|
||||
@ -1509,6 +1546,10 @@ class LocalClient(object):
|
||||
master_uri=master_uri)
|
||||
|
||||
try:
|
||||
# Ensure that the event subscriber is connected.
|
||||
# If not, we won't get a response, so error out
|
||||
if listen and not self.event.connect_pub(timeout=timeout):
|
||||
raise SaltReqTimeoutError()
|
||||
payload = channel.send(payload_kwargs, timeout=timeout)
|
||||
except SaltReqTimeoutError:
|
||||
raise SaltReqTimeoutError(
|
||||
|
@ -365,6 +365,18 @@ class SaltEvent(object):
|
||||
self.cpub = True
|
||||
return self.cpub
|
||||
|
||||
def close_pub(self):
|
||||
'''
|
||||
Close the publish connection (if established)
|
||||
'''
|
||||
if not self.cpub:
|
||||
return
|
||||
|
||||
self.subscriber.close()
|
||||
self.subscriber = None
|
||||
self.pending_events = []
|
||||
self.cpub = False
|
||||
|
||||
def connect_pull(self, timeout=1):
|
||||
'''
|
||||
Establish a connection with the event pull socket
|
||||
|
Loading…
Reference in New Issue
Block a user