Fix race condition when returning events from commands

It has been observed that when running this command:

```
salt "*" test.ping
```

sometimes the command would return `Minion did not return. [No response]`
for some of the minions even though the minions did indeed respond
(reproduced running Windows salt-master on Python 3 using the TCP
transport).

After investigating this further, it appears that there is a race
condition: if a minion's response event arrives before the client has
started listening for events, the response is lost. For instance,
`salt.client.LocalClient.cmd_cli` (which is what the command above
invokes) does not start listening for events until it calls
`get_cli_event_returns`, which invokes `get_iter_returns`, which invokes
`get_returns_no_block`, which invokes `self.event.get_event`. That last
call connects to the event bus if it is not connected yet (which is the
case the first time this code is reached). But events may be fired any
time after `self.pub()` is executed, which occurs before this code.

We need to ensure that events are being listened for before it is
possible for them to be returned. We also want to avoid reintroducing
issue #31454, which PR #36024 fixed at the cost of causing this issue.
This is the approach I have taken to tackle the problem:

It doesn't seem possible to generically discern if events can be
returned by a given function that invokes `run_job` and contains an
event searching function such as `get_cli_event_returns`. So for all
such functions that could possibly need to search the event bus, we
do the following:
- Record if the event bus is currently being listened to.
- When invoking `run_job`, pass `listen=True` so that `self.pub()`
will ensure that the event bus is being listened to before sending the
payload.
- When all possible event bus activities are concluded, if the event
bus was not originally being listened to, stop listening to it. This is
designed so that issue #31454 does not reappear. We do this via a
try/finally block in all instances of such code.

Signed-off-by: Sergey Kizunov <sergey.kizunov@ni.com>
This commit is contained in:
Sergey Kizunov 2016-09-30 10:49:24 -05:00
parent fb2add012f
commit c59a5adb0d
2 changed files with 167 additions and 114 deletions

View File

@ -290,6 +290,7 @@ class LocalClient(object):
timeout=None,
jid='',
kwarg=None,
listen=False,
**kwargs):
'''
Asynchronously send a command to connected minions
@ -316,6 +317,7 @@ class LocalClient(object):
ret,
jid=jid,
timeout=self._get_timeout(timeout),
listen=listen,
**kwargs)
except SaltClientError:
# Re-raise error with specific message
@ -564,32 +566,38 @@ class LocalClient(object):
function name.
'''
arg = salt.utils.args.condition_input(arg, kwarg)
pub_data = self.run_job(tgt,
fun,
arg,
expr_form,
ret,
timeout,
jid,
**kwargs)
was_listening = self.event.cpub
if not pub_data:
return pub_data
try:
pub_data = self.run_job(tgt,
fun,
arg,
expr_form,
ret,
timeout,
jid,
listen=True,
**kwargs)
ret = {}
for fn_ret in self.get_cli_event_returns(
pub_data['jid'],
pub_data['minions'],
self._get_timeout(timeout),
tgt,
expr_form,
**kwargs):
if not pub_data:
return pub_data
if fn_ret:
for mid, data in six.iteritems(fn_ret):
ret[mid] = data.get('ret', {})
ret = {}
for fn_ret in self.get_cli_event_returns(
pub_data['jid'],
pub_data['minions'],
self._get_timeout(timeout),
tgt,
expr_form,
**kwargs):
return ret
if fn_ret:
for mid, data in six.iteritems(fn_ret):
ret[mid] = data.get('ret', {})
return ret
finally:
if not was_listening:
self.event.close_pub()
def cmd_cli(
self,
@ -614,40 +622,47 @@ class LocalClient(object):
:returns: A generator
'''
arg = salt.utils.args.condition_input(arg, kwarg)
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
**kwargs)
was_listening = self.event.cpub
if not pub_data:
yield pub_data
else:
try:
for fn_ret in self.get_cli_event_returns(
pub_data['jid'],
pub_data['minions'],
self._get_timeout(timeout),
tgt,
expr_form,
verbose,
progress,
**kwargs):
try:
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
listen=True,
**kwargs)
if not fn_ret:
continue
if not pub_data:
yield pub_data
else:
try:
for fn_ret in self.get_cli_event_returns(
pub_data['jid'],
pub_data['minions'],
self._get_timeout(timeout),
tgt,
expr_form,
verbose,
progress,
**kwargs):
yield fn_ret
except KeyboardInterrupt:
msg = ('Exiting on Ctrl-C\nThis job\'s jid is:\n{0}\n'
'The minions may not have all finished running and any '
'remaining minions will return upon completion. To '
'look up the return data for this job later run:\n'
'salt-run jobs.lookup_jid {0}').format(pub_data['jid'])
raise SystemExit(msg)
if not fn_ret:
continue
yield fn_ret
except KeyboardInterrupt:
msg = ('Exiting on Ctrl-C\nThis job\'s jid is:\n{0}\n'
'The minions may not have all finished running and any '
'remaining minions will return upon completion. To '
'look up the return data for this job later run:\n'
'salt-run jobs.lookup_jid {0}').format(pub_data['jid'])
raise SystemExit(msg)
finally:
if not was_listening:
self.event.close_pub()
def cmd_iter(
self,
@ -677,30 +692,37 @@ class LocalClient(object):
{'stewart': {'ret': True}}
'''
arg = salt.utils.args.condition_input(arg, kwarg)
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
**kwargs)
was_listening = self.event.cpub
if not pub_data:
yield pub_data
else:
if kwargs.get('yield_pub_data'):
try:
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
listen=True,
**kwargs)
if not pub_data:
yield pub_data
for fn_ret in self.get_iter_returns(pub_data['jid'],
pub_data['minions'],
timeout=self._get_timeout(timeout),
tgt=tgt,
tgt_type=expr_form,
**kwargs):
if not fn_ret:
continue
yield fn_ret
self._clean_up_subscriptions(pub_data['jid'])
else:
if kwargs.get('yield_pub_data'):
yield pub_data
for fn_ret in self.get_iter_returns(pub_data['jid'],
pub_data['minions'],
timeout=self._get_timeout(timeout),
tgt=tgt,
tgt_type=expr_form,
**kwargs):
if not fn_ret:
continue
yield fn_ret
self._clean_up_subscriptions(pub_data['jid'])
finally:
if not was_listening:
self.event.close_pub()
def cmd_iter_no_block(
self,
@ -737,31 +759,38 @@ class LocalClient(object):
{'stewart': {'ret': True}}
'''
arg = salt.utils.args.condition_input(arg, kwarg)
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
**kwargs)
was_listening = self.event.cpub
if not pub_data:
yield pub_data
else:
for fn_ret in self.get_iter_returns(pub_data['jid'],
pub_data['minions'],
timeout=timeout,
tgt=tgt,
tgt_type=expr_form,
block=False,
**kwargs):
if fn_ret and any([show_jid, verbose]):
for minion in fn_ret.keys():
fn_ret[minion]['jid'] = pub_data['jid']
yield fn_ret
try:
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
listen=True,
**kwargs)
self._clean_up_subscriptions(pub_data['jid'])
if not pub_data:
yield pub_data
else:
for fn_ret in self.get_iter_returns(pub_data['jid'],
pub_data['minions'],
timeout=timeout,
tgt=tgt,
tgt_type=expr_form,
block=False,
**kwargs):
if fn_ret and any([show_jid, verbose]):
for minion in fn_ret.keys():
fn_ret[minion]['jid'] = pub_data['jid']
yield fn_ret
self._clean_up_subscriptions(pub_data['jid'])
finally:
if not was_listening:
self.event.close_pub()
def cmd_full_return(
self,
@ -778,24 +807,31 @@ class LocalClient(object):
Execute a salt command and return
'''
arg = salt.utils.args.condition_input(arg, kwarg)
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
**kwargs)
was_listening = self.event.cpub
if not pub_data:
return pub_data
try:
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
listen=True,
**kwargs)
return (self.get_cli_static_event_returns(pub_data['jid'],
pub_data['minions'],
timeout,
tgt,
expr_form,
verbose))
if not pub_data:
return pub_data
return (self.get_cli_static_event_returns(pub_data['jid'],
pub_data['minions'],
timeout,
tgt,
expr_form,
verbose))
finally:
if not was_listening:
self.event.close_pub()
def get_cli_returns(
self,
@ -1460,6 +1496,7 @@ class LocalClient(object):
ret='',
jid='',
timeout=5,
listen=False,
**kwargs):
'''
Take the required arguments and publish the given command.
@ -1509,6 +1546,10 @@ class LocalClient(object):
master_uri=master_uri)
try:
# Ensure that the event subscriber is connected.
# If not, we won't get a response, so error out
if listen and not self.event.connect_pub(timeout=timeout):
raise SaltReqTimeoutError()
payload = channel.send(payload_kwargs, timeout=timeout)
except SaltReqTimeoutError:
raise SaltReqTimeoutError(

View File

@ -365,6 +365,18 @@ class SaltEvent(object):
self.cpub = True
return self.cpub
def close_pub(self):
    '''
    Tear down the event publish connection when one is currently open.

    Does nothing if ``self.cpub`` is falsy. Otherwise the subscriber is
    closed and dropped, the list of pending events is reset to empty, and
    ``self.cpub`` is set back to ``False``.
    '''
    if self.cpub:
        # Close first so that a failure here leaves the state untouched.
        self.subscriber.close()
        self.subscriber = None
        # Replace the pending-events list with a fresh empty one.
        self.pending_events = []
        self.cpub = False
def connect_pull(self, timeout=1):
'''
Establish a connection with the event pull socket