Merge branch 'develop' into add-choco-no-progress-option

garethgreenaway 2017-06-06 10:30:05 -07:00 committed by GitHub
commit b260b5f380
25 changed files with 186 additions and 91 deletions

View File

@@ -655,7 +655,7 @@ def create(vm_):
'event',
'requesting instance',
'salt/cloud/{0}/requesting'.format(vm_['name']),
- args=__utils__['cloud.filter_event']('requesting', kwargs, kwargs.keys()),
+ args=__utils__['cloud.filter_event']('requesting', kwargs, list(kwargs)),
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport']
)
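The kwargs.keys()-to-list(kwargs) substitution above repeats through most of the cloud drivers in this merge. A minimal standalone sketch (hypothetical values, not taken from the diff) of the Python 3 behaviour it works around:

# Illustration only: on Python 3, dict.keys() returns a view object, not a list.
kwargs = {'image': 'ubuntu', 'size': 'small'}   # hypothetical event kwargs

view = kwargs.keys()     # dict_keys(['image', 'size']) on PY3, a plain list on PY2
keys = list(kwargs)      # ['image', 'size'] on both PY2 and PY3

keys[0]                  # indexing the list works everywhere
# view[0]                # TypeError on PY3: dict views are not indexable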

View File

@@ -1201,8 +1201,8 @@ def create(vm_):
def _query_ip_address():
data = request_instance(kwargs=vm_)
ifaces = data['network_profile']['network_interfaces']
- iface = ifaces.keys()[0]
- ip_name = ifaces[iface]['ip_configurations'].keys()[0]
+ iface = list(ifaces)[0]
+ ip_name = list(ifaces[iface]['ip_configurations'])[0]
if vm_.get('public_ip') is True:
hostname = ifaces[iface]['ip_configurations'][ip_name]['public_ip_address']

View File

@@ -432,7 +432,7 @@ def create(vm_):
'event',
'requesting instance',
'salt/cloud/{0}/requesting'.format(vm_['name']),
- args=__utils__['cloud.filter_event']('requesting', kwargs, kwargs.keys()),
+ args=__utils__['cloud.filter_event']('requesting', kwargs, list(kwargs)),
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport']
)

View File

@@ -257,7 +257,7 @@ def create(vm_):
'event',
'requesting instance',
'salt/cloud/{0}/requesting'.format(vm_['name']),
- args=__utils__['cloud.filter_event']('requesting', event_data, event_data.keys()),
+ args=__utils__['cloud.filter_event']('requesting', event_data, list(event_data)),
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport']
)

View File

@@ -43,11 +43,11 @@ from __future__ import absolute_import
import pprint
import logging
import time
- import hashlib
# Import salt cloud libs
import salt.config as config
import salt.utils.cloud
+ import salt.utils.hashutils
from salt.exceptions import SaltCloudSystemExit, SaltCloudException
# Get logging started
@@ -112,7 +112,7 @@ def create(vm_):
public_ips = list_public_ips()
if len(public_ips.keys()) < 1:
raise SaltCloudException('No more IPs available')
- host_ip = public_ips.keys()[0]
+ host_ip = list(public_ips)[0]
create_kwargs = {
'name': vm_['name'],
@@ -126,7 +126,7 @@ def create(vm_):
'requesting instance',
'salt/cloud/{0}/requesting'.format(vm_['name']),
args={
- 'kwargs': __utils__['cloud.filter_event']('requesting', create_kwargs, create_kwargs.keys()),
+ 'kwargs': __utils__['cloud.filter_event']('requesting', create_kwargs, list(create_kwargs)),
},
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport']
@@ -534,7 +534,7 @@ def _query(action=None,
epoch = str(int(time.time()))
hashtext = ''.join((apikey, sharedsecret, epoch))
- args['sig'] = hashlib.md5(hashtext).hexdigest()
+ args['sig'] = salt.utils.hashutils.md5_digest(hashtext)
args['format'] = 'json'
args['v'] = '1.0'
args['api_key'] = apikey
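The hunk above swaps a direct hashlib call for salt.utils.hashutils.md5_digest. A rough standalone equivalent of that helper (an assumption for illustration, not the actual salt.utils.hashutils source) shows the point of the change: hashlib.md5() only accepts bytes on Python 3, so a str signature text has to be encoded first.

import hashlib

def md5_digest(instr):
    # Hypothetical simplification: encode str to bytes before hashing so the
    # same call works on both PY2 and PY3.
    if isinstance(instr, str):
        instr = instr.encode('utf-8')
    return hashlib.md5(instr).hexdigest()

print(md5_digest('apikey' + 'sharedsecret' + '1496770205'))  # made-up inputs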

View File

@@ -290,7 +290,7 @@ def create(vm_):
'requesting instance',
'salt/cloud/{0}/requesting'.format(vm_['name']),
args={
- 'kwargs': __utils__['cloud.filter_event']('requesting', kwargs, kwargs.keys()),
+ 'kwargs': __utils__['cloud.filter_event']('requesting', kwargs, list(kwargs)),
},
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport']

View File

@@ -341,7 +341,7 @@ def create(vm_):
'requesting instance',
'salt/cloud/{0}/requesting'.format(name),
args={
- 'kwargs': __utils__['cloud.filter_event']('requesting', kwargs, kwargs.keys()),
+ 'kwargs': __utils__['cloud.filter_event']('requesting', kwargs, list(kwargs)),
},
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport']

View File

@@ -538,7 +538,7 @@ def create(vm_):
'event',
'requesting instance',
'salt/cloud/{0}/requesting'.format(vm_['name']),
- args=__utils__['cloud.filter_event']('requesting', event_kwargs, event_kwargs.keys()),
+ args=__utils__['cloud.filter_event']('requesting', event_kwargs, list(event_kwargs)),
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport']
)

View File

@@ -686,7 +686,7 @@ def request_instance(vm_=None, call=None):
'requesting instance',
'salt/cloud/{0}/requesting'.format(vm_['name']),
args={
- 'kwargs': __utils__['cloud.filter_event']('requesting', event_kwargs, event_kwargs.keys()),
+ 'kwargs': __utils__['cloud.filter_event']('requesting', event_kwargs, list(event_kwargs)),
},
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport']
@@ -1054,7 +1054,7 @@ def create(vm_):
'event',
'created instance',
'salt/cloud/{0}/created'.format(vm_['name']),
- args=__utils__['cloud.filter_event']('created', event_data, event_data.keys()),
+ args=__utils__['cloud.filter_event']('created', event_data, list(event_data)),
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport']
)

View File

@@ -1029,7 +1029,7 @@ def create(vm_):
'requesting instance',
'salt/cloud/{0}/requesting'.format(vm_['name']),
args={
- 'kwargs': __utils__['cloud.filter_event']('requesting', kwargs, kwargs.keys()),
+ 'kwargs': __utils__['cloud.filter_event']('requesting', kwargs, list(kwargs)),
},
sock_dir=__opts__['sock_dir'],
)

View File

@@ -556,7 +556,7 @@ def request_instance(vm_=None, call=None):
'requesting instance',
'salt/cloud/{0}/requesting'.format(vm_['name']),
args={
- 'kwargs': __utils__['cloud.filter_event']('requesting', event_kwargs, event_kwargs.keys()),
+ 'kwargs': __utils__['cloud.filter_event']('requesting', event_kwargs, list(event_kwargs)),
},
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport']

View File

@@ -258,7 +258,7 @@ def create_node(vm_):
'requesting instance',
'salt/cloud/{0}/requesting'.format(vm_['name']),
args={
- 'kwargs': __utils__['cloud.filter_event']('requesting', data, data.keys()),
+ 'kwargs': __utils__['cloud.filter_event']('requesting', data, list(data)),
},
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport']

View File

@@ -762,7 +762,7 @@ def create_node(vm_, newid):
'requesting instance',
'salt/cloud/{0}/requesting'.format(vm_['name']),
args={
- 'kwargs': __utils__['cloud.filter_event']('requesting', newnode, newnode.keys()),
+ 'kwargs': __utils__['cloud.filter_event']('requesting', newnode, list(newnode)),
},
sock_dir=__opts__['sock_dir'],
)

View File

@@ -694,7 +694,7 @@ def create(vm_):
'requesting instance',
'salt/cloud/{0}/requesting'.format(vm_['name']),
args={
- 'kwargs': __utils__['cloud.filter_event']('requesting', params, params.keys()),
+ 'kwargs': __utils__['cloud.filter_event']('requesting', params, list(params)),
},
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport']

View File

@@ -246,7 +246,7 @@ def create(server_):
'requesting instance',
'salt/cloud/{0}/requesting'.format(server_['name']),
args={
- 'kwargs': __utils__['cloud.filter_event']('requesting', kwargs, kwargs.keys()),
+ 'kwargs': __utils__['cloud.filter_event']('requesting', kwargs, list(kwargs)),
},
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport']

View File

@@ -373,7 +373,7 @@ def create(vm_):
'requesting instance',
'salt/cloud/{0}/requesting'.format(name),
args={
- 'kwargs': __utils__['cloud.filter_event']('requesting', kwargs, kwargs.keys()),
+ 'kwargs': __utils__['cloud.filter_event']('requesting', kwargs, list(kwargs)),
},
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport']

View File

@@ -326,7 +326,7 @@ def create(vm_):
'requesting instance',
'salt/cloud/{0}/requesting'.format(name),
args={
- 'kwargs': __utils__['cloud.filter_event']('requesting', kwargs, kwargs.keys()),
+ 'kwargs': __utils__['cloud.filter_event']('requesting', kwargs, list(kwargs)),
},
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport']

View File

@@ -155,7 +155,7 @@ def create(vm_info):
'event',
'requesting instance',
'salt/cloud/{0}/requesting'.format(vm_info['name']),
- args=__utils__['cloud.filter_event']('requesting', request_kwargs, request_kwargs.keys()),
+ args=__utils__['cloud.filter_event']('requesting', request_kwargs, list(request_kwargs)),
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport']
)
@@ -181,7 +181,7 @@ def create(vm_info):
'event',
'created machine',
'salt/cloud/{0}/created'.format(vm_info['name']),
- args=__utils__['cloud.filter_event']('created', vm_result, vm_result.keys()),
+ args=__utils__['cloud.filter_event']('created', vm_result, list(vm_result)),
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport']
)

View File

@@ -2648,7 +2648,7 @@ def create(vm_):
'event',
'requesting instance',
'salt/cloud/{0}/requesting'.format(vm_['name']),
- args=__utils__['cloud.filter_event']('requesting', event_kwargs, event_kwargs.keys()),
+ args=__utils__['cloud.filter_event']('requesting', event_kwargs, list(event_kwargs)),
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport']
)

View File

@@ -284,7 +284,7 @@ def create(vm_):
'requesting instance',
'salt/cloud/{0}/requesting'.format(vm_['name']),
args={
- 'kwargs': __utils__['cloud.filter_event']('requesting', kwargs, kwargs.keys()),
+ 'kwargs': __utils__['cloud.filter_event']('requesting', kwargs, list(kwargs)),
},
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport'],

View File

@@ -16,6 +16,7 @@ Dependencies
- :mod:`napalm proxy minion <salt.proxy.napalm>`
.. versionadded:: 2016.11.0
+ .. versionchanged:: Nitrogen
'''
from __future__ import absolute_import
@@ -1037,7 +1038,7 @@ def load_template(template_name,
template_mode='755',
saltenv=None,
template_engine='jinja',
- skip_verify=True,
+ skip_verify=False,
defaults=None,
test=False,
commit=True,

View File

@@ -41,6 +41,11 @@ def _ping(tgt, tgt_type, timeout, gather_job_timeout):
if not pub_data:
return pub_data
+ log.debug(
+ 'manage runner will ping the following minion(s): %s',
+ ', '.join(sorted(pub_data['minions']))
+ )
returned = set()
for fn_ret in client.get_cli_event_returns(
pub_data['jid'],
@@ -50,13 +55,16 @@ def _ping(tgt, tgt_type, timeout, gather_job_timeout):
tgt_type,
gather_job_timeout=gather_job_timeout):
+ log.debug('fn_ret: %s', fn_ret)
if fn_ret:
for mid, _ in six.iteritems(fn_ret):
+ log.debug('minion \'%s\' returned from ping', mid)
returned.add(mid)
- not_returned = set(pub_data['minions']) - returned
+ not_returned = sorted(set(pub_data['minions']) - returned)
+ returned = sorted(returned)
- return list(returned), list(not_returned)
+ return returned, not_returned
def status(output=True, tgt='*', tgt_type='glob', expr_form=None, timeout=None, gather_job_timeout=None):

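The net effect of the runner change above is deterministic output: both lists are sorted before being returned. A tiny illustration with made-up minion IDs:

# Illustration only, hypothetical data.
minions = {'minion01', 'minion02', 'minion03'}   # targeted minions
returned = {'minion03', 'minion01'}              # minions that answered the ping

not_returned = sorted(minions - returned)
returned = sorted(returned)
print(returned, not_returned)   # ['minion01', 'minion03'] ['minion02']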
View File

@@ -796,6 +796,7 @@ class SaltMessageClient(object):
self._on_recv = None
self._closing = False
self._connecting_future = self.connect()
+ self._stream_return_future = tornado.concurrent.Future()
self.io_loop.spawn_callback(self._stream_return)
# TODO: timeout inflight sessions
@@ -813,6 +814,24 @@
# the next message and the associated read future is marked
# 'StreamClosedError' when the stream is closed.
self._read_until_future.exc_info()
+ if (not self._stream_return_future.done() and
+ self.io_loop != tornado.ioloop.IOLoop.current(
+ instance=False)):
+ # If _stream_return() hasn't completed, it means the IO
+ # Loop is stopped (such as when using
+ # 'salt.utils.async.SyncWrapper'). Ensure that
+ # _stream_return() completes by restarting the IO Loop.
+ # This will prevent potential errors on shutdown.
+ orig_loop = tornado.ioloop.IOLoop.current()
+ self.io_loop.make_current()
+ try:
+ self.io_loop.add_future(
+ self._stream_return_future,
+ lambda future: self.io_loop.stop()
+ )
+ self.io_loop.start()
+ finally:
+ orig_loop.make_current()
self._tcp_client.close()
# Clear callback references to allow the object that they belong to
# to be deleted.
@@ -863,66 +882,69 @@
@tornado.gen.coroutine
def _stream_return(self):
while not self._closing and (
not self._connecting_future.done() or
self._connecting_future.result() is not True):
yield self._connecting_future
unpacker = msgpack.Unpacker()
while not self._closing:
try:
self._read_until_future = self._stream.read_bytes(4096, partial=True)
wire_bytes = yield self._read_until_future
unpacker.feed(wire_bytes)
for framed_msg in unpacker:
if six.PY3:
framed_msg = salt.transport.frame.decode_embedded_strs(
framed_msg
)
header = framed_msg['head']
body = framed_msg['body']
message_id = header.get('mid')
try:
while not self._closing and (
not self._connecting_future.done() or
self._connecting_future.result() is not True):
yield self._connecting_future
unpacker = msgpack.Unpacker()
while not self._closing:
try:
self._read_until_future = self._stream.read_bytes(4096, partial=True)
wire_bytes = yield self._read_until_future
unpacker.feed(wire_bytes)
for framed_msg in unpacker:
if six.PY3:
framed_msg = salt.transport.frame.decode_embedded_strs(
framed_msg
)
header = framed_msg['head']
body = framed_msg['body']
message_id = header.get('mid')
if message_id in self.send_future_map:
self.send_future_map.pop(message_id).set_result(body)
self.remove_message_timeout(message_id)
else:
if self._on_recv is not None:
self.io_loop.spawn_callback(self._on_recv, header, body)
if message_id in self.send_future_map:
self.send_future_map.pop(message_id).set_result(body)
self.remove_message_timeout(message_id)
else:
log.error('Got response for message_id {0} that we are not tracking'.format(message_id))
except tornado.iostream.StreamClosedError as e:
log.debug('tcp stream to {0}:{1} closed, unable to recv'.format(self.host, self.port))
for future in six.itervalues(self.send_future_map):
future.set_exception(e)
self.send_future_map = {}
if self._closing:
return
if self.disconnect_callback:
self.disconnect_callback()
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()
yield self._connecting_future
except TypeError:
# This is an invalid transport
if 'detect_mode' in self.opts:
log.info('There was an error trying to use TCP transport; '
'attempting to fallback to another transport')
else:
raise SaltClientError
except Exception as e:
log.error('Exception parsing response', exc_info=True)
for future in six.itervalues(self.send_future_map):
future.set_exception(e)
self.send_future_map = {}
if self._closing:
return
if self.disconnect_callback:
self.disconnect_callback()
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()
yield self._connecting_future
if self._on_recv is not None:
self.io_loop.spawn_callback(self._on_recv, header, body)
else:
log.error('Got response for message_id {0} that we are not tracking'.format(message_id))
except tornado.iostream.StreamClosedError as e:
log.debug('tcp stream to {0}:{1} closed, unable to recv'.format(self.host, self.port))
for future in six.itervalues(self.send_future_map):
future.set_exception(e)
self.send_future_map = {}
if self._closing:
return
if self.disconnect_callback:
self.disconnect_callback()
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()
yield self._connecting_future
except TypeError:
# This is an invalid transport
if 'detect_mode' in self.opts:
log.info('There was an error trying to use TCP transport; '
'attempting to fallback to another transport')
else:
raise SaltClientError
except Exception as e:
log.error('Exception parsing response', exc_info=True)
for future in six.itervalues(self.send_future_map):
future.set_exception(e)
self.send_future_map = {}
if self._closing:
return
if self.disconnect_callback:
self.disconnect_callback()
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()
yield self._connecting_future
finally:
self._stream_return_future.set_result(True)
@tornado.gen.coroutine
def _stream_send(self):

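The new _stream_return_future and the IO-loop handling added to the close path above boil down to one pattern: temporarily drive a stopped IOLoop until a pending future resolves, then hand control back. A minimal sketch of that pattern, assuming Tornado 4.x APIs (the helper name is illustrative, not Salt's):

import tornado.ioloop

def run_until_resolved(io_loop, future):
    # Make io_loop current, spin it until the future resolves, then restore
    # whichever loop the caller had current.
    orig_loop = tornado.ioloop.IOLoop.current()
    io_loop.make_current()
    try:
        io_loop.add_future(future, lambda f: io_loop.stop())
        io_loop.start()
    finally:
        orig_loop.make_current()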
View File

@@ -1756,6 +1756,12 @@ def filter_event(tag, data, defaults):
if use_defaults is False:
defaults = []
+ # For PY3, if something like ".keys()" or ".values()" is used on a dictionary,
+ # it returns a dict_view and not a list like in PY2. "defaults" should be passed
+ # in with the correct data type, but don't stack-trace in case it wasn't.
+ if not isinstance(defaults, list):
+ defaults = list(defaults)
defaults = list(set(defaults + keys))
for key in defaults:

View File

@@ -35,6 +35,15 @@ try:
except ImportError:
HAS_NAPALM = False
+ try:
+ # try importing ConnectionClosedException
+ # from napalm-base
+ # this exception has been introduced only in version 0.24.0
+ from napalm_base.exceptions import ConnectionClosedException
+ HAS_CONN_CLOSED_EXC_CLASS = True
+ except ImportError:
+ HAS_CONN_CLOSED_EXC_CLASS = False
from salt.ext import six as six
@@ -129,6 +138,7 @@ def call(napalm_device, method, *args, **kwargs):
result = False
out = None
opts = napalm_device.get('__opts__', {})
+ retry = kwargs.pop('__retry', True) # retry executing the task?
try:
if not napalm_device.get('UP', False):
raise Exception('not connected')
@@ -146,12 +156,52 @@
except Exception as error:
# either not connected
# either unable to execute the command
hostname = napalm_device.get('HOSTNAME', '[unspecified hostname]')
err_tb = traceback.format_exc() # let's get the full traceback and display for debugging reasons.
if isinstance(error, NotImplementedError):
comment = '{method} is not implemented for the NAPALM {driver} driver!'.format(
method=method,
driver=napalm_device.get('DRIVER_NAME')
)
elif retry and HAS_CONN_CLOSED_EXC_CLASS and isinstance(error, ConnectionClosedException):
# Received disconection whilst executing the operation.
# Instructed to retry (default behaviour)
# thus trying to re-establish the connection
# and re-execute the command
# if any of the operations (close, open, call) will rise again ConnectionClosedException
# it will fail loudly.
kwargs['__retry'] = False # do not attempt re-executing
comment = 'Disconnected from {device}. Trying to reconnect.'.format(device=hostname)
log.error(err_tb)
log.error(comment)
log.debug('Clearing the connection with {device}'.format(device=hostname))
call(napalm_device, 'close', __retry=False) # safely close the connection
# Make sure we don't leave any TCP connection open behind
# if we fail to close properly, we might not be able to access the
log.debug('Re-opening the connection with {device}'.format(device=hostname))
call(napalm_device, 'open', __retry=False)
log.debug('Connection re-opened with {device}'.format(device=hostname))
log.debug('Re-executing {method}'.format(method=method))
return call(napalm_device, method, *args, **kwargs)
# If still not able to reconnect and execute the task,
# the proxy keepalive feature (if enabled) will attempt
# to reconnect.
# If the device is using a SSH-based connection, the failure
# will also notify the paramiko transport and the `is_alive` flag
# is going to be set correctly.
# More background: the network device may decide to disconnect,
# although the SSH session itself is alive and usable, the reason
# being the lack of activity on the CLI.
# Paramiko's keepalive doesn't help in this case, as the ServerAliveInterval
# are targeting the transport layer, whilst the device takes the decision
# when there isn't any activity on the CLI, thus at the application layer.
# Moreover, the disconnect is silent and paramiko's is_alive flag will
# continue to return True, although the connection is already unusable.
# For more info, see https://github.com/paramiko/paramiko/issues/813.
# But after a command fails, the `is_alive` flag becomes aware of these
# changes and will return False from there on. And this is how the
# Salt proxy keepalive helps: immediately after the first failure, it
# will know the state of the connection and will try reconnecting.
else:
comment = 'Cannot execute "{method}" on {device}{port} as {user}. Reason: {error}!'.format(
device=napalm_device.get('HOSTNAME', '[unspecified hostname]'),
@@ -199,10 +249,18 @@ def get_device_opts(opts, salt_obj=None):
# still not able to setup
log.error('Incorrect minion config. Please specify at least the napalm driver name!')
# either under the proxy hier, either under the napalm in the config file
- network_device['HOSTNAME'] = device_dict.get('host') or device_dict.get('hostname')
- network_device['USERNAME'] = device_dict.get('username') or device_dict.get('user')
- network_device['DRIVER_NAME'] = device_dict.get('driver') or device_dict.get('os')
- network_device['PASSWORD'] = device_dict.get('passwd') or device_dict.get('password') or device_dict.get('pass')
+ network_device['HOSTNAME'] = device_dict.get('host') or \
+ device_dict.get('hostname') or \
+ device_dict.get('fqdn') or \
+ device_dict.get('ip')
+ network_device['USERNAME'] = device_dict.get('username') or \
+ device_dict.get('user')
+ network_device['DRIVER_NAME'] = device_dict.get('driver') or \
+ device_dict.get('os')
+ network_device['PASSWORD'] = device_dict.get('passwd') or \
+ device_dict.get('password') or \
+ device_dict.get('pass') or \
+ ''
network_device['TIMEOUT'] = device_dict.get('timeout', 60)
network_device['OPTIONAL_ARGS'] = device_dict.get('optional_args', {})
network_device['ALWAYS_ALIVE'] = device_dict.get('always_alive', True)
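The reconnect logic added to call() above amounts to a retry-once pattern: on a silently dropped session, close, reopen, and re-execute exactly one more time before failing loudly. A simplified standalone sketch (hypothetical device object and exception type, not the salt.utils.napalm API):

def call_with_retry(device, method, *args, **kwargs):
    retry = kwargs.pop('__retry', True)
    try:
        return getattr(device, method)(*args, **kwargs)
    except ConnectionError:        # stand-in for napalm's ConnectionClosedException
        if not retry:
            raise                  # second failure: fail loudly
        device.close()             # don't leave a half-dead TCP session behind
        device.open()
        kwargs['__retry'] = False  # do not attempt re-executing again
        return call_with_retry(device, method, *args, **kwargs)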