Merge pull request #36799 from cachedout/carbon_develop

Carbon -> develop merge fwd
This commit is contained in:
Mike Place 2016-10-06 20:18:52 +09:00 committed by GitHub
commit bd199b8feb
8 changed files with 254 additions and 148 deletions

View File

@ -13,6 +13,7 @@ ioloop==0.1a0
ipaddress==1.0.16
Jinja2==2.8
libnacl==1.4.5
lxml==3.6.0
Mako==1.0.4
MarkupSafe==0.23
msgpack-python==0.4.8

View File

@ -290,6 +290,7 @@ class LocalClient(object):
timeout=None,
jid='',
kwarg=None,
listen=False,
**kwargs):
'''
Asynchronously send a command to connected minions
@ -316,6 +317,7 @@ class LocalClient(object):
ret,
jid=jid,
timeout=self._get_timeout(timeout),
listen=listen,
**kwargs)
except SaltClientError:
# Re-raise error with specific message
@ -564,32 +566,38 @@ class LocalClient(object):
function name.
'''
arg = salt.utils.args.condition_input(arg, kwarg)
pub_data = self.run_job(tgt,
fun,
arg,
expr_form,
ret,
timeout,
jid,
**kwargs)
was_listening = self.event.cpub
if not pub_data:
return pub_data
try:
pub_data = self.run_job(tgt,
fun,
arg,
expr_form,
ret,
timeout,
jid,
listen=True,
**kwargs)
ret = {}
for fn_ret in self.get_cli_event_returns(
pub_data['jid'],
pub_data['minions'],
self._get_timeout(timeout),
tgt,
expr_form,
**kwargs):
if not pub_data:
return pub_data
if fn_ret:
for mid, data in six.iteritems(fn_ret):
ret[mid] = data.get('ret', {})
ret = {}
for fn_ret in self.get_cli_event_returns(
pub_data['jid'],
pub_data['minions'],
self._get_timeout(timeout),
tgt,
expr_form,
**kwargs):
return ret
if fn_ret:
for mid, data in six.iteritems(fn_ret):
ret[mid] = data.get('ret', {})
return ret
finally:
if not was_listening:
self.event.close_pub()
def cmd_cli(
self,
@ -614,40 +622,47 @@ class LocalClient(object):
:returns: A generator
'''
arg = salt.utils.args.condition_input(arg, kwarg)
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
**kwargs)
was_listening = self.event.cpub
if not pub_data:
yield pub_data
else:
try:
for fn_ret in self.get_cli_event_returns(
pub_data['jid'],
pub_data['minions'],
self._get_timeout(timeout),
tgt,
expr_form,
verbose,
progress,
**kwargs):
try:
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
listen=True,
**kwargs)
if not fn_ret:
continue
if not pub_data:
yield pub_data
else:
try:
for fn_ret in self.get_cli_event_returns(
pub_data['jid'],
pub_data['minions'],
self._get_timeout(timeout),
tgt,
expr_form,
verbose,
progress,
**kwargs):
yield fn_ret
except KeyboardInterrupt:
msg = ('Exiting on Ctrl-C\nThis job\'s jid is:\n{0}\n'
'The minions may not have all finished running and any '
'remaining minions will return upon completion. To '
'look up the return data for this job later run:\n'
'salt-run jobs.lookup_jid {0}').format(pub_data['jid'])
raise SystemExit(msg)
if not fn_ret:
continue
yield fn_ret
except KeyboardInterrupt:
msg = ('Exiting on Ctrl-C\nThis job\'s jid is:\n{0}\n'
'The minions may not have all finished running and any '
'remaining minions will return upon completion. To '
'look up the return data for this job later run:\n'
'salt-run jobs.lookup_jid {0}').format(pub_data['jid'])
raise SystemExit(msg)
finally:
if not was_listening:
self.event.close_pub()
def cmd_iter(
self,
@ -677,30 +692,37 @@ class LocalClient(object):
{'stewart': {'ret': True}}
'''
arg = salt.utils.args.condition_input(arg, kwarg)
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
**kwargs)
was_listening = self.event.cpub
if not pub_data:
yield pub_data
else:
if kwargs.get('yield_pub_data'):
try:
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
listen=True,
**kwargs)
if not pub_data:
yield pub_data
for fn_ret in self.get_iter_returns(pub_data['jid'],
pub_data['minions'],
timeout=self._get_timeout(timeout),
tgt=tgt,
tgt_type=expr_form,
**kwargs):
if not fn_ret:
continue
yield fn_ret
self._clean_up_subscriptions(pub_data['jid'])
else:
if kwargs.get('yield_pub_data'):
yield pub_data
for fn_ret in self.get_iter_returns(pub_data['jid'],
pub_data['minions'],
timeout=self._get_timeout(timeout),
tgt=tgt,
tgt_type=expr_form,
**kwargs):
if not fn_ret:
continue
yield fn_ret
self._clean_up_subscriptions(pub_data['jid'])
finally:
if not was_listening:
self.event.close_pub()
def cmd_iter_no_block(
self,
@ -737,31 +759,38 @@ class LocalClient(object):
{'stewart': {'ret': True}}
'''
arg = salt.utils.args.condition_input(arg, kwarg)
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
**kwargs)
was_listening = self.event.cpub
if not pub_data:
yield pub_data
else:
for fn_ret in self.get_iter_returns(pub_data['jid'],
pub_data['minions'],
timeout=timeout,
tgt=tgt,
tgt_type=expr_form,
block=False,
**kwargs):
if fn_ret and any([show_jid, verbose]):
for minion in fn_ret.keys():
fn_ret[minion]['jid'] = pub_data['jid']
yield fn_ret
try:
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
listen=True,
**kwargs)
self._clean_up_subscriptions(pub_data['jid'])
if not pub_data:
yield pub_data
else:
for fn_ret in self.get_iter_returns(pub_data['jid'],
pub_data['minions'],
timeout=timeout,
tgt=tgt,
tgt_type=expr_form,
block=False,
**kwargs):
if fn_ret and any([show_jid, verbose]):
for minion in fn_ret.keys():
fn_ret[minion]['jid'] = pub_data['jid']
yield fn_ret
self._clean_up_subscriptions(pub_data['jid'])
finally:
if not was_listening:
self.event.close_pub()
def cmd_full_return(
self,
@ -778,24 +807,31 @@ class LocalClient(object):
Execute a salt command and return
'''
arg = salt.utils.args.condition_input(arg, kwarg)
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
**kwargs)
was_listening = self.event.cpub
if not pub_data:
return pub_data
try:
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
listen=True,
**kwargs)
return (self.get_cli_static_event_returns(pub_data['jid'],
pub_data['minions'],
timeout,
tgt,
expr_form,
verbose))
if not pub_data:
return pub_data
return (self.get_cli_static_event_returns(pub_data['jid'],
pub_data['minions'],
timeout,
tgt,
expr_form,
verbose))
finally:
if not was_listening:
self.event.close_pub()
def get_cli_returns(
self,
@ -1460,6 +1496,7 @@ class LocalClient(object):
ret='',
jid='',
timeout=5,
listen=False,
**kwargs):
'''
Take the required arguments and publish the given command.
@ -1509,6 +1546,10 @@ class LocalClient(object):
master_uri=master_uri)
try:
# Ensure that the event subscriber is connected.
# If not, we won't get a response, so error out
if listen and not self.event.connect_pub(timeout=timeout):
raise SaltReqTimeoutError()
payload = channel.send(payload_kwargs, timeout=timeout)
except SaltReqTimeoutError:
raise SaltReqTimeoutError(

View File

@ -1511,7 +1511,8 @@ class Minion(MinionBase):
salt.utils.appendproctitle('{0}._thread_multi_return {1}'.format(cls.__name__, data['jid']))
ret = {
'return': {},
'success': {},
'retcode': {},
'success': {}
}
for ind in range(0, len(data['fun'])):
ret['success'][data['fun'][ind]] = False
@ -1528,7 +1529,12 @@ class Minion(MinionBase):
func,
data['arg'][ind],
data)
minion_instance.functions.pack['__context__']['retcode'] = 0
ret['return'][data['fun'][ind]] = func(*args, **kwargs)
ret['retcode'][data['fun'][ind]] = minion_instance.functions.pack['__context__'].get(
'retcode',
0
)
ret['success'][data['fun'][ind]] = True
except Exception as exc:
trb = traceback.format_exc()

View File

@ -353,6 +353,16 @@ class IPCClient(object):
if self.stream is not None and not self.stream.closed():
self.stream.close()
# Remove the entry from the instance map so
# that a closed entry may not be reused.
# This forces this operation even if the reference
# count of the entry has not yet gone to zero.
if self.io_loop in IPCClient.instance_map:
loop_instance_map = IPCClient.instance_map[self.io_loop]
key = str(self.socket_path)
if key in loop_instance_map:
del loop_instance_map[key]
class IPCMessageClient(IPCClient):
'''

View File

@ -365,6 +365,18 @@ class SaltEvent(object):
self.cpub = True
return self.cpub
def close_pub(self):
'''
Close the publish connection (if established)
'''
if not self.cpub:
return
self.subscriber.close()
self.subscriber = None
self.pending_events = []
self.cpub = False
def connect_pull(self, timeout=1):
'''
Establish a connection with the event pull socket
@ -735,39 +747,65 @@ class SaltEvent(object):
if self._run_io_loop_sync and not self.keep_loop:
self.io_loop.close()
def _fire_ret_load_specific_fun(self, load, fun):
'''
Helper function for fire_ret_load
'''
if isinstance(load['fun'], list):
# Multi-function job
ret = load.get('return', {})
ret = ret.get(fun, {})
# This was already validated to exist and be non-zero in the
# caller.
retcode = load['retcode'][fun]
else:
# Single-function job
ret = load.get('return', {})
retcode = load['retcode']
try:
for tag, data in six.iteritems(ret):
data['retcode'] = retcode
tags = tag.split('_|-')
if data.get('result') is False:
self.fire_event(
data,
'{0}.{1}'.format(tags[0], tags[-1])
) # old dup event
data['jid'] = load['jid']
data['id'] = load['id']
data['success'] = False
data['return'] = 'Error: {0}.{1}'.format(
tags[0], tags[-1])
data['fun'] = fun
data['user'] = load['user']
self.fire_event(
data,
tagify([load['jid'],
'sub',
load['id'],
'error',
fun],
'job'))
except Exception:
pass
def fire_ret_load(self, load):
'''
Fire events based on information in the return load
'''
if load.get('retcode') and load.get('fun'):
# Minion fired a bad retcode, fire an event
if load['fun'] in SUB_EVENT:
try:
for tag, data in six.iteritems(load.get('return', {})):
data['retcode'] = load['retcode']
tags = tag.split('_|-')
if data.get('result') is False:
self.fire_event(
data,
'{0}.{1}'.format(tags[0], tags[-1])
) # old dup event
data['jid'] = load['jid']
data['id'] = load['id']
data['success'] = False
data['return'] = 'Error: {0}.{1}'.format(
tags[0], tags[-1])
data['fun'] = load['fun']
data['user'] = load['user']
self.fire_event(
data,
tagify([load['jid'],
'sub',
load['id'],
'error',
load['fun']],
'job'))
except Exception:
pass
if isinstance(load['fun'], list):
# Multi-function job
for fun in load['fun']:
if load['retcode'].get(fun, 0) and fun in SUB_EVENT:
# Minion fired a bad retcode, fire an event
self._fire_ret_load_specific_fun(load, fun)
else:
# Single-function job
if load['fun'] in SUB_EVENT:
# Minion fired a bad retcode, fire an event
self._fire_ret_load_specific_fun(load, load['fun'])
def set_event_handler(self, event_handler):
'''

View File

@ -261,7 +261,7 @@ class OptionParser(optparse.OptionParser, object):
)
if self._setup_mp_logging_listener_ is True:
# Stop the logging queue listener process
log.shutdown_multiprocessing_logging_listener()
log.shutdown_multiprocessing_logging_listener(daemonizing=True)
if isinstance(msg, six.string_types) and msg and msg[-1] != '\n':
msg = '{0}\n'.format(msg)
optparse.OptionParser.exit(self, status, msg)

View File

@ -1320,7 +1320,7 @@ class TestDaemon(object):
self.log_server_process.join()
# Shutdown the multiprocessing logging queue listener
salt_log_setup.shutdown_multiprocessing_logging()
salt_log_setup.shutdown_multiprocessing_logging_listener()
salt_log_setup.shutdown_multiprocessing_logging_listener(daemonizing=True)
def pre_setup_minions(self):
'''

View File

@ -148,6 +148,7 @@ class PkgModuleTest(integration.ModuleCase,
test_install()
test_remove()
@requires_salt_modules('pkg.hold')
@requires_network()
@destructiveTest
def test_hold_unhold(self):
@ -258,7 +259,7 @@ class PkgModuleTest(integration.ModuleCase,
if vim_version_dict == {}:
# Latest version is installed, get its version and construct
# a version selector so the immediately previous version is selected
vim_version_dict = self.run_function('pkg.info_available', ['vim'])
vim_version_dict = self.run_function('pkg.info', ['vim'])
vim_version = 'version=<'+vim_version_dict['vim']['version']
else:
# Vim was not installed, so pkg.latest_version returns the latest one.
@ -267,13 +268,20 @@ class PkgModuleTest(integration.ModuleCase,
# Install a version of vim that should need upgrading
ret = self.run_function('pkg.install', ['vim', vim_version])
if not isinstance(ret, dict):
if ret.startswith('ERROR'):
self.skipTest('Could not install earlier vim to complete test.')
else:
self.assertNotEqual(ret, {})
# Run a system upgrade, which should catch the fact that Vim needs upgrading, and upgrade it.
ret = self.run_function(func)
# The changes dictionary should not be empty.
self.assertIn('changes', ret)
self.assertIn('vim', ret['changes'])
if 'changes' in ret:
self.assertIn('vim', ret['changes'])
else:
self.assertIn('vim', ret)
else:
ret = self.run_function('pkg.list_updates')
if ret == '':
@ -283,6 +291,8 @@ class PkgModuleTest(integration.ModuleCase,
# The changes dictionary should not be empty.
self.assertNotEqual(ret, {})
if 'changes' in ret:
self.assertNotEqual(ret['changes'], {})
if __name__ == '__main__':