mirror of
https://github.com/valitydev/salt.git
synced 2024-11-06 16:45:27 +00:00
Merge branch '2018.3' into ping_reaction_test
This commit is contained in:
commit
7c963c16b3
@ -539,7 +539,11 @@ def _run(cmd,
|
||||
run_env = env
|
||||
|
||||
else:
|
||||
run_env = os.environ.copy()
|
||||
if salt.utils.platform.is_windows():
|
||||
import nt
|
||||
run_env = nt.environ.copy()
|
||||
else:
|
||||
run_env = os.environ.copy()
|
||||
run_env.update(env)
|
||||
|
||||
if prepend_path:
|
||||
@ -3033,7 +3037,12 @@ def shell_info(shell, list_modules=False):
|
||||
# salt-call will general have home set, the salt-minion service may not
|
||||
# We need to assume ports of unix shells to windows will look after
|
||||
# themselves in setting HOME as they do it in many different ways
|
||||
newenv = os.environ
|
||||
if salt.utils.platform.is_windows():
|
||||
import nt
|
||||
newenv = nt.environ
|
||||
else:
|
||||
newenv = os.environ
|
||||
|
||||
if ('HOME' not in newenv) and (not salt.utils.platform.is_windows()):
|
||||
newenv['HOME'] = os.path.expanduser('~')
|
||||
log.debug('HOME environment set to %s', newenv['HOME'])
|
||||
|
@ -5,10 +5,12 @@ IPC transport classes
|
||||
|
||||
# Import Python libs
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
import errno
|
||||
import logging
|
||||
import socket
|
||||
import weakref
|
||||
import time
|
||||
import sys
|
||||
|
||||
# Import 3rd-party libs
|
||||
import msgpack
|
||||
@ -83,6 +85,11 @@ class FutureWithTimeout(tornado.concurrent.Future):
|
||||
self.set_exception(exc)
|
||||
|
||||
|
||||
class IPCExceptionProxy(object):
|
||||
def __init__(self, orig_info):
|
||||
self.orig_info = orig_info
|
||||
|
||||
|
||||
class IPCServer(object):
|
||||
'''
|
||||
A Tornado IPC server very similar to Tornado's TCPServer class
|
||||
@ -237,31 +244,7 @@ class IPCClient(object):
|
||||
case it is used as the port for a tcp
|
||||
localhost connection.
|
||||
'''
|
||||
|
||||
# Create singleton map between two sockets
|
||||
instance_map = weakref.WeakKeyDictionary()
|
||||
|
||||
def __new__(cls, socket_path, io_loop=None):
|
||||
io_loop = io_loop or tornado.ioloop.IOLoop.current()
|
||||
if io_loop not in IPCClient.instance_map:
|
||||
IPCClient.instance_map[io_loop] = weakref.WeakValueDictionary()
|
||||
loop_instance_map = IPCClient.instance_map[io_loop]
|
||||
|
||||
# FIXME
|
||||
key = six.text_type(socket_path)
|
||||
|
||||
client = loop_instance_map.get(key)
|
||||
if client is None:
|
||||
log.debug('Initializing new IPCClient for path: %s', key)
|
||||
client = object.__new__(cls)
|
||||
# FIXME
|
||||
client.__singleton_init__(io_loop=io_loop, socket_path=socket_path)
|
||||
loop_instance_map[key] = client
|
||||
else:
|
||||
log.debug('Re-using IPCClient for %s', key)
|
||||
return client
|
||||
|
||||
def __singleton_init__(self, socket_path, io_loop=None):
|
||||
def __init__(self, socket_path, io_loop=None):
|
||||
'''
|
||||
Create a new IPC client
|
||||
|
||||
@ -280,10 +263,6 @@ class IPCClient(object):
|
||||
encoding = 'utf-8'
|
||||
self.unpacker = msgpack.Unpacker(encoding=encoding)
|
||||
|
||||
def __init__(self, socket_path, io_loop=None):
|
||||
# Handled by singleton __new__
|
||||
pass
|
||||
|
||||
def connected(self):
|
||||
return self.stream is not None and not self.stream.closed()
|
||||
|
||||
@ -332,9 +311,8 @@ class IPCClient(object):
|
||||
if self.stream is None:
|
||||
with salt.utils.asynchronous.current_ioloop(self.io_loop):
|
||||
self.stream = IOStream(
|
||||
socket.socket(sock_type, socket.SOCK_STREAM),
|
||||
socket.socket(sock_type, socket.SOCK_STREAM)
|
||||
)
|
||||
|
||||
try:
|
||||
log.trace('IPCClient: Connecting to socket: %s', self.socket_path)
|
||||
yield self.stream.connect(sock_addr)
|
||||
@ -354,7 +332,16 @@ class IPCClient(object):
|
||||
yield tornado.gen.sleep(1)
|
||||
|
||||
def __del__(self):
|
||||
self.close()
|
||||
try:
|
||||
self.close()
|
||||
except socket.error as exc:
|
||||
if exc.errno != errno.EBADF:
|
||||
# If its not a bad file descriptor error, raise
|
||||
raise
|
||||
except TypeError:
|
||||
# This is raised when Python's GC has collected objects which
|
||||
# would be needed when calling self.close()
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
'''
|
||||
@ -368,16 +355,6 @@ class IPCClient(object):
|
||||
if self.stream is not None and not self.stream.closed():
|
||||
self.stream.close()
|
||||
|
||||
# Remove the entry from the instance map so
|
||||
# that a closed entry may not be reused.
|
||||
# This forces this operation even if the reference
|
||||
# count of the entry has not yet gone to zero.
|
||||
if self.io_loop in IPCClient.instance_map:
|
||||
loop_instance_map = IPCClient.instance_map[self.io_loop]
|
||||
key = six.text_type(self.socket_path)
|
||||
if key in loop_instance_map:
|
||||
del loop_instance_map[key]
|
||||
|
||||
|
||||
class IPCMessageClient(IPCClient):
|
||||
'''
|
||||
@ -591,12 +568,13 @@ class IPCMessageSubscriberService(IPCClient):
|
||||
|
||||
To use this refer to IPCMessageSubscriber documentation.
|
||||
'''
|
||||
def __singleton_init__(self, socket_path, io_loop=None):
|
||||
super(IPCMessageSubscriberService, self).__singleton_init__(
|
||||
def __init__(self, socket_path, io_loop=None):
|
||||
super(IPCMessageSubscriberService, self).__init__(
|
||||
socket_path, io_loop=io_loop)
|
||||
self.saved_data = []
|
||||
self._read_in_progress = Lock()
|
||||
self.handlers = weakref.WeakSet()
|
||||
self.read_stream_future = None
|
||||
|
||||
def _subscribe(self, handler):
|
||||
self.handlers.add(handler)
|
||||
@ -624,16 +602,16 @@ class IPCMessageSubscriberService(IPCClient):
|
||||
if timeout is None:
|
||||
timeout = 5
|
||||
|
||||
read_stream_future = None
|
||||
self.read_stream_future = None
|
||||
while self._has_subscribers():
|
||||
if read_stream_future is None:
|
||||
read_stream_future = self.stream.read_bytes(4096, partial=True)
|
||||
if self.read_stream_future is None:
|
||||
self.read_stream_future = self.stream.read_bytes(4096, partial=True)
|
||||
|
||||
try:
|
||||
wire_bytes = yield FutureWithTimeout(self.io_loop,
|
||||
read_stream_future,
|
||||
self.read_stream_future,
|
||||
timeout)
|
||||
read_stream_future = None
|
||||
self.read_stream_future = None
|
||||
|
||||
self.unpacker.feed(wire_bytes)
|
||||
msgs = [msg['body'] for msg in self.unpacker]
|
||||
@ -648,6 +626,7 @@ class IPCMessageSubscriberService(IPCClient):
|
||||
break
|
||||
except Exception as exc:
|
||||
log.error('Exception occurred in Subscriber while handling stream: %s', exc)
|
||||
exc = IPCExceptionProxy(sys.exc_info())
|
||||
self._feed_subscribers([exc])
|
||||
break
|
||||
|
||||
@ -672,7 +651,7 @@ class IPCMessageSubscriberService(IPCClient):
|
||||
except Exception as exc:
|
||||
log.error('Exception occurred while Subscriber connecting: %s', exc)
|
||||
yield tornado.gen.sleep(1)
|
||||
self._read(timeout)
|
||||
yield self._read(timeout)
|
||||
|
||||
def close(self):
|
||||
'''
|
||||
@ -680,8 +659,11 @@ class IPCMessageSubscriberService(IPCClient):
|
||||
Sockets and filehandles should be closed explicitly, to prevent
|
||||
leaks.
|
||||
'''
|
||||
if not self._closing:
|
||||
super(IPCMessageSubscriberService, self).close()
|
||||
super(IPCMessageSubscriberService, self).close()
|
||||
if self.read_stream_future is not None and self.read_stream_future.done():
|
||||
exc = self.read_stream_future.exception()
|
||||
if exc and not isinstance(exc, tornado.iostream.StreamClosedError):
|
||||
log.error("Read future returned exception %r", exc)
|
||||
|
||||
def __del__(self):
|
||||
if IPCMessageSubscriberService in globals():
|
||||
@ -755,8 +737,8 @@ class IPCMessageSubscriber(object):
|
||||
raise tornado.gen.Return(None)
|
||||
if data is None:
|
||||
break
|
||||
elif isinstance(data, Exception):
|
||||
raise data
|
||||
elif isinstance(data, IPCExceptionProxy):
|
||||
six.reraise(*data.orig_info)
|
||||
elif callback:
|
||||
self.service.io_loop.spawn_callback(callback, data)
|
||||
else:
|
||||
@ -775,6 +757,7 @@ class IPCMessageSubscriber(object):
|
||||
|
||||
def close(self):
|
||||
self.service.unsubscribe(self)
|
||||
self.service.close()
|
||||
|
||||
def __del__(self):
|
||||
self.close()
|
||||
|
@ -1611,11 +1611,19 @@ class Pygit2(GitProvider):
|
||||
'''
|
||||
Clean stale local refs so they don't appear as fileserver environments
|
||||
'''
|
||||
try:
|
||||
if pygit2.GIT_FETCH_PRUNE:
|
||||
# Don't need to clean anything, pygit2 can do it by itself
|
||||
return []
|
||||
except AttributeError:
|
||||
# However, only in 0.26.2 and newer
|
||||
pass
|
||||
if self.credentials is not None:
|
||||
log.debug(
|
||||
'pygit2 does not support detecting stale refs for '
|
||||
'authenticated remotes, saltenvs will not reflect '
|
||||
'branches/tags removed from remote \'%s\'', self.id
|
||||
'The installed version of pygit2 (%s) does not support '
|
||||
'detecting stale refs for authenticated remotes, saltenvs '
|
||||
'will not reflect branches/tags removed from remote \'%s\'',
|
||||
PYGIT2_VERSION, self.id
|
||||
)
|
||||
return []
|
||||
return super(Pygit2, self).clean_stale_refs()
|
||||
@ -1721,6 +1729,11 @@ class Pygit2(GitProvider):
|
||||
else:
|
||||
if self.credentials is not None:
|
||||
origin.credentials = self.credentials
|
||||
try:
|
||||
fetch_kwargs['prune'] = pygit2.GIT_FETCH_PRUNE
|
||||
except AttributeError:
|
||||
# pruning only available in pygit2 >= 0.26.2
|
||||
pass
|
||||
try:
|
||||
fetch_results = origin.fetch(**fetch_kwargs)
|
||||
except GitError as exc:
|
||||
@ -2573,7 +2586,8 @@ class GitBase(object):
|
||||
LIBGIT2_VERSION
|
||||
)
|
||||
)
|
||||
if not salt.utils.path.which('git'):
|
||||
if not getattr(pygit2, 'GIT_FETCH_PRUNE', False) \
|
||||
and not salt.utils.path.which('git'):
|
||||
errors.append(
|
||||
'The git command line utility is required when using the '
|
||||
'\'pygit2\' {0}_provider.'.format(self.role)
|
||||
|
@ -380,3 +380,12 @@ class CMDModuleTest(ModuleCase):
|
||||
self.assertIn('administrator', cmd)
|
||||
else:
|
||||
self.assertEqual('root', cmd)
|
||||
|
||||
@skipIf(not salt.utils.platform.is_windows(), 'minion is not windows')
|
||||
def test_windows_env_handling(self):
|
||||
'''
|
||||
Ensure that nt.environ is used properly with cmd.run*
|
||||
'''
|
||||
out = self.run_function('cmd.run', ['set'], env={"abc": "123", "ABC": "456"}).splitlines()
|
||||
self.assertIn('abc=123', out)
|
||||
self.assertIn('ABC=456', out)
|
||||
|
@ -111,35 +111,72 @@ class MineTest(ModuleCase):
|
||||
['grains.items']
|
||||
)
|
||||
)
|
||||
# Smoke testing that grains should now exist in the mine
|
||||
ret_grains = self.run_function(
|
||||
'mine.get',
|
||||
['minion', 'grains.items']
|
||||
)
|
||||
self.assertEqual(ret_grains['minion']['id'], 'minion')
|
||||
self.assertTrue(
|
||||
self.run_function(
|
||||
'mine.send',
|
||||
['test.arg', 'foo=bar', 'fnord=roscivs'],
|
||||
)
|
||||
)
|
||||
ret_args = self.run_function(
|
||||
'mine.get',
|
||||
['minion', 'test.arg']
|
||||
)
|
||||
expected = {
|
||||
'minion': {
|
||||
'args': [],
|
||||
'kwargs': {
|
||||
'fnord': 'roscivs',
|
||||
'foo': 'bar',
|
||||
},
|
||||
},
|
||||
}
|
||||
# Smoke testing that test.arg exists in the mine
|
||||
self.assertDictEqual(ret_args, expected)
|
||||
self.assertTrue(
|
||||
self.run_function(
|
||||
'mine.send',
|
||||
['test.echo', 'foo']
|
||||
)
|
||||
)
|
||||
ret_grains = self.run_function(
|
||||
'mine.get',
|
||||
['minion', 'grains.items']
|
||||
)
|
||||
self.assertEqual(ret_grains['minion']['id'], 'minion')
|
||||
ret_echo = self.run_function(
|
||||
'mine.get',
|
||||
['minion', 'test.echo']
|
||||
)
|
||||
# Smoke testing that we were also able to set test.echo in the mine
|
||||
self.assertEqual(ret_echo['minion'], 'foo')
|
||||
self.assertTrue(
|
||||
self.run_function(
|
||||
'mine.delete',
|
||||
['grains.items']
|
||||
['test.arg']
|
||||
)
|
||||
)
|
||||
ret_grains_deleted = self.run_function(
|
||||
ret_arg_deleted = self.run_function(
|
||||
'mine.get',
|
||||
['minion', 'grains.items']
|
||||
['minion', 'test.arg']
|
||||
)
|
||||
# Now comes the real test - did we obliterate test.arg from the mine?
|
||||
# We could assert this a different way, but there shouldn't be any
|
||||
# other tests that are setting this mine value, so this should
|
||||
# definitely avoid any race conditions.
|
||||
self.assertFalse(
|
||||
ret_arg_deleted.get('minion', {})
|
||||
.get('kwargs', {})
|
||||
.get('fnord', None) == 'roscivs',
|
||||
'{} contained "fnord":"roscivs", which should be gone'.format(
|
||||
ret_arg_deleted,
|
||||
)
|
||||
)
|
||||
self.assertEqual(ret_grains_deleted.get('minion', None), None)
|
||||
ret_echo_stays = self.run_function(
|
||||
'mine.get',
|
||||
['minion', 'test.echo']
|
||||
)
|
||||
# Of course, one more health check - we want targeted removal.
|
||||
# This isn't horseshoes or hand grenades - test.arg should go away
|
||||
# but test.echo should still be available.
|
||||
self.assertEqual(ret_echo_stays['minion'], 'foo')
|
||||
|
@ -86,13 +86,14 @@ class IPCMessageClient(BaseIPCReqCase):
|
||||
'''
|
||||
|
||||
def _get_channel(self):
|
||||
channel = salt.transport.ipc.IPCMessageClient(
|
||||
socket_path=self.socket_path,
|
||||
io_loop=self.io_loop,
|
||||
)
|
||||
channel.connect(callback=self.stop)
|
||||
self.wait()
|
||||
return channel
|
||||
if not hasattr(self, 'channel') or self.channel is None:
|
||||
self.channel = salt.transport.ipc.IPCMessageClient(
|
||||
socket_path=self.socket_path,
|
||||
io_loop=self.io_loop,
|
||||
)
|
||||
self.channel.connect(callback=self.stop)
|
||||
self.wait()
|
||||
return self.channel
|
||||
|
||||
def setUp(self):
|
||||
super(IPCMessageClient, self).setUp()
|
||||
@ -106,6 +107,8 @@ class IPCMessageClient(BaseIPCReqCase):
|
||||
if exc.errno != errno.EBADF:
|
||||
# If its not a bad file descriptor error, raise
|
||||
raise
|
||||
finally:
|
||||
self.channel = None
|
||||
|
||||
def test_basic_send(self):
|
||||
msg = {'foo': 'bar', 'stop': True}
|
||||
|
Loading…
Reference in New Issue
Block a user