Merge branch '2018.3' into '2019.2'

Conflicts:
  - salt/transport/zeromq.py
  - tests/unit/modules/test_kubernetesmod.py
  - tests/unit/modules/test_win_file.py
This commit is contained in:
Ch3LL 2019-03-14 16:13:54 -04:00
commit 9ada8d9b7f
No known key found for this signature in database
GPG Key ID: 132B55A7C13EFA73
27 changed files with 634 additions and 64 deletions

View File

@ -211,7 +211,7 @@ execution modules
keystone
keystoneng
kmod
kubernetes
kubernetesmod
launchctl_service
layman
ldap3

View File

@ -1,6 +0,0 @@
=======================
salt.modules.kubernetes
=======================
.. automodule:: salt.modules.kubernetes
:members:

View File

@ -0,0 +1,6 @@
==========================
salt.modules.kubernetesmod
==========================
.. automodule:: salt.modules.kubernetesmod
:members:

View File

@ -340,8 +340,17 @@ Building the documentation
4. A useful method of viewing the HTML documentation locally is to start
Python's built-in HTTP server:
Python 3:
.. code-block:: bash
cd /path/to/salt/doc/_build/html
python3 -m http.server
Python 2:
.. code-block:: bash
cd /path/to/salt/doc/_build/html
python -m SimpleHTTPServer

View File

@ -86,7 +86,7 @@ Dependencies
Salt should run on any Unix-like platform so long as the dependencies are met.
* `Python 2.7`_ >= 2.7 <3.0
* `Python`_ - Python2 >= 2.7, Python3 >= 3.4
* `msgpack-python`_ - High-performance message interchange format
* `YAML`_ - Python YAML bindings
* `Jinja2`_ - parsing Salt States (configurable in the master settings)
@ -95,7 +95,7 @@ Salt should run on any Unix-like platform so long as the dependencies are met.
cloud service providers using a unified API
* `Requests`_ - HTTP library
* `Tornado`_ - Web framework and asynchronous networking library
* `futures`_ - Backport of the concurrent.futures package from Python 3.2
* `futures`_ - Python2 only dependency. Backport of the concurrent.futures package from Python 3.2
Depending on the chosen Salt transport, `ZeroMQ`_ or `RAET`_, dependencies
vary:
@ -142,7 +142,7 @@ Optional Dependencies
settings)
* gcc - dynamic `Cython`_ module compiling
.. _`Python 2.7`: http://python.org/download/
.. _`Python`: http://python.org/download/
.. _`ZeroMQ`: http://zeromq.org/
.. _`pyzmq`: https://github.com/zeromq/pyzmq
.. _`msgpack-python`: https://pypi.python.org/pypi/msgpack-python/

View File

@ -710,7 +710,7 @@ Execution modules
- :mod:`salt.modules.grafana4 <salt.modules.grafana4>`
- :mod:`salt.modules.heat <salt.modules.heat>`
- :mod:`salt.modules.icinga2 <salt.modules.icinga2>`
- :mod:`salt.modules.kubernetes <salt.modules.kubernetes>`
- :mod:`salt.modules.kubernetesmod <salt.modules.kubernetesmod>`
- :mod:`salt.modules.logmod <salt.modules.logmod>`
- :mod:`salt.modules.mattermost <salt.modules.mattermost>`
- :mod:`salt.modules.namecheap_dns <salt.modules.namecheap_dns>`

View File

@ -7,8 +7,8 @@ Salt compatibility code
# Import python libs
from __future__ import absolute_import, unicode_literals, print_function
import sys
import types
import logging
import binascii
# Import 3rd-party libs
from salt.exceptions import SaltException
@ -173,7 +173,7 @@ class IPv6AddressScoped(ipaddress.IPv6Address):
self._ip = address
elif self._is_packed_binary(address):
self._check_packed_address(address, 16)
self._ip = ipaddress._int_from_bytes(address, 'big')
self._ip = int(binascii.hexlify(address), 16)
else:
address = str(address)
if '/' in address:
@ -190,7 +190,7 @@ class IPv6AddressScoped(ipaddress.IPv6Address):
packed = False
if isinstance(data, bytes) and len(data) == 16 and b':' not in data:
try:
packed = bool(int(str(bytearray(data)).encode('hex'), 16))
packed = bool(int(binascii.hexlify(data), 16))
except ValueError:
pass

View File

@ -238,21 +238,21 @@ def beacon(config):
break
path = os.path.dirname(path)
for path in _config.get('files', {}):
excludes = _config['files'][path].get('exclude', '')
excludes = _config['files'][path].get('exclude', '')
if excludes and isinstance(excludes, list):
for exclude in excludes:
if isinstance(exclude, dict):
if exclude.values()[0].get('regex', False):
_exclude = next(iter(exclude))
if exclude[_exclude].get('regex', False):
try:
if re.search(list(exclude)[0], event.pathname):
if re.search(_exclude, event.pathname):
_append = False
except Exception:
log.warning('Failed to compile regex: %s',
list(exclude)[0])
_exclude)
else:
exclude = list(exclude)[0]
exclude = _exclude
elif '*' in exclude:
if fnmatch.fnmatch(event.pathname, exclude):
_append = False
@ -312,7 +312,7 @@ def beacon(config):
excl = []
for exclude in excludes:
if isinstance(exclude, dict):
excl.append(exclude.keys()[0])
excl.append(list(exclude)[0])
else:
excl.append(exclude)
excl = pyinotify.ExcludeFilter(excl)

View File

@ -157,14 +157,13 @@ def _windows_disks():
namespace = r'\\root\microsoft\windows\storage'
path = 'MSFT_PhysicalDisk'
where = '(MediaType=3 or MediaType=4)'
get = 'DeviceID,MediaType'
ret = {'disks': [], 'SSDs': []}
cmdret = __salt__['cmd.run_all'](
'{0} /namespace:{1} path {2} where {3} get {4} /format:table'.format(
wmic, namespace, path, where, get))
'{0} /namespace:{1} path {2} get {3} /format:table'.format(
wmic, namespace, path, get))
if cmdret['retcode'] != 0:
log.trace('Disk grain does not support this version of Windows')
@ -181,10 +180,12 @@ def _windows_disks():
elif mediatype == '4':
log.trace('Device %s reports itself as an SSD', device)
ret['SSDs'].append(device)
ret['disks'].append(device)
elif mediatype == '5':
log.trace('Device %s reports itself as an SCM', device)
ret['disks'].append(device)
else:
log.trace(
'Unable to identify device %s as an SSD or HDD. It does '
'not report 3 or 4', device
)
log.trace('Device %s reports itself as Unspecified', device)
ret['disks'].append(device)
return ret

View File

@ -1212,13 +1212,15 @@ def mount(name, device, mkmnt=False, fstype='', opts='defaults', user=None, util
lopts = ','.join(opts)
args = '-o {0}'.format(lopts)
# use of fstype on AIX differs from typical Linux use of -t functionality
# AIX uses -v vfsname, -t fstype mounts all with fstype in /etc/filesystems
if 'AIX' in __grains__['os']:
if fstype:
if fstype:
# use of fstype on AIX differs from typical Linux use of -t
# functionality AIX uses -v vfsname, -t fstype mounts all with
# fstype in /etc/filesystems
if 'AIX' in __grains__['os']:
args += ' -v {0}'.format(fstype)
else:
args += ' -t {0}'.format(fstype)
else:
args += ' -t {0}'.format(fstype)
cmd = 'mount {0} {1} {2} '.format(args, device, name)
out = __salt__['cmd.run_all'](cmd, runas=user, python_shell=False)
if out['retcode']:

View File

@ -1263,7 +1263,12 @@ def user_exists(user,
salt '*' mysql.user_exists 'username' password_column='authentication_string'
'''
run_verify = False
server_version = version(**connection_args)
server_version = salt.utils.data.decode(version(**connection_args))
if not server_version:
last_err = __context__['mysql.error']
err = 'MySQL Error: Unable to fetch current server version. Last error was: "{}"'.format(last_err)
log.error(err)
return False
compare_version = '10.2.0' if 'MariaDB' in server_version else '8.0.11'
dbc = _connect(**connection_args)
# Did we fail to connect with the user we are checking
@ -1402,7 +1407,12 @@ def user_create(user,
salt '*' mysql.user_create 'username' 'hostname' password_hash='hash'
salt '*' mysql.user_create 'username' 'hostname' allow_passwordless=True
'''
server_version = version(**connection_args)
server_version = salt.utils.data.decode(version(**connection_args))
if not server_version:
last_err = __context__['mysql.error']
err = 'MySQL Error: Unable to fetch current server version. Last error was: "{}"'.format(last_err)
log.error(err)
return False
compare_version = '10.2.0' if 'MariaDB' in server_version else '8.0.11'
if user_exists(user, host, **connection_args):
log.info('User \'%s\'@\'%s\' already exists', user, host)
@ -1507,7 +1517,12 @@ def user_chpass(user,
salt '*' mysql.user_chpass frank localhost password_hash='hash'
salt '*' mysql.user_chpass frank localhost allow_passwordless=True
'''
server_version = version(**connection_args)
server_version = salt.utils.data.decode(version(**connection_args))
if not server_version:
last_err = __context__['mysql.error']
err = 'MySQL Error: Unable to fetch current server version. Last error was: "{}"'.format(last_err)
log.error(err)
return False
compare_version = '10.2.0' if 'MariaDB' in server_version else '8.0.11'
args = {}
if password is not None:
@ -1863,7 +1878,12 @@ def grant_exists(grant,
'SELECT,INSERT,UPDATE,...' 'database.*' 'frank' 'localhost'
'''
server_version = version(**connection_args)
server_version = salt.utils.data.decode(version(**connection_args))
if not server_version:
last_err = __context__['mysql.error']
err = 'MySQL Error: Unable to fetch current server version. Last error was: "{}"'.format(last_err)
log.error(err)
return False
if 'ALL' in grant:
if salt.utils.versions.version_cmp(server_version, '8.0') >= 0 and \
'MariaDB' not in server_version:

View File

@ -1071,7 +1071,7 @@ def remove(path, force=False):
raise SaltInvocationError('File path must be absolute: {0}'.format(path))
# Does the file/folder exists
if not os.path.exists(path):
if not os.path.exists(path) and not is_link(path):
raise CommandExecutionError('Path not found: {0}'.format(path))
# Remove ReadOnly Attribute

View File

@ -1722,16 +1722,21 @@ class Keys(LowDataAdapter):
priv_key_file = tarfile.TarInfo('minion.pem')
priv_key_file.size = len(priv_key)
fileobj = six.StringIO()
fileobj = BytesIO()
tarball = tarfile.open(fileobj=fileobj, mode='w')
tarball.addfile(pub_key_file, six.StringIO(pub_key))
tarball.addfile(priv_key_file, six.StringIO(priv_key))
if six.PY3:
pub_key = pub_key.encode(__salt_system_encoding__)
priv_key = priv_key.encode(__salt_system_encoding__)
tarball.addfile(pub_key_file, BytesIO(pub_key))
tarball.addfile(priv_key_file, BytesIO(priv_key))
tarball.close()
headers = cherrypy.response.headers
headers['Content-Disposition'] = 'attachment; filename="saltkeys-{0}.tar"'.format(lowstate[0]['id_'])
headers['Content-Type'] = 'application/x-tar'
headers['Content-Length'] = fileobj.len
headers['Content-Length'] = len(fileobj.getvalue())
headers['Cache-Control'] = 'no-cache'
fileobj.seek(0)

View File

@ -4362,7 +4362,7 @@ def replace(name,
pattern
A regular expression, to be matched using Python's
:py:func:`~re.search`.
:py:func:`re.search`.
.. note::

View File

@ -4,7 +4,7 @@ Manage kubernetes resources as salt states
==========================================
NOTE: This module requires the proper pillar values set. See
salt.modules.kubernetes for more information.
salt.modules.kubernetesmod for more information.
.. warning::

View File

@ -234,8 +234,6 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
self._closing = True
if hasattr(self, 'message_client'):
self.message_client.close()
else:
log.debug('No message_client attr for AsyncZeroMQReqChannel found. Not destroying sockets.')
# Remove the entry from the instance map so that a closed entry may not
# be reused.

View File

@ -14,12 +14,17 @@ import re
# Import Salt libs
import salt.utils.path
import salt.modules.cmdmod
__salt__ = {
'cmd.run_all': salt.modules.cmdmod.run_all
}
log = logging.getLogger(__name__)
def get_certs_path():
icinga2_output = __salt__['cmd.run_all']([salt.utils.path.which('icinga2'), "--version"], python_shell=False)
icinga2_output = __salt__['cmd.run_all']([salt.utils.path.which('icinga2'),
"--version"], python_shell=False)
version = re.search(r'r\d+\.\d+', icinga2_output['stdout']).group(0)
# Return new certs path for icinga2 >= 2.8
if int(version.split('.')[1]) >= 8:

View File

@ -56,6 +56,13 @@ def _init_libcrypto():
'''
libcrypto = _load_libcrypto()
try:
libcrypto.OPENSSL_init_crypto()
except AttributeError:
# Support for OpenSSL < 1.1 (OPENSSL_API_COMPAT < 0x10100000L)
libcrypto.OPENSSL_no_config()
libcrypto.OPENSSL_add_all_algorithms_noconf()
libcrypto.RSA_new.argtypes = ()
libcrypto.RSA_new.restype = c_void_p
libcrypto.RSA_free.argtypes = (c_void_p, )
@ -70,13 +77,6 @@ def _init_libcrypto():
libcrypto.RSA_private_encrypt.argtypes = (c_int, c_char_p, c_char_p, c_void_p, c_int)
libcrypto.RSA_public_decrypt.argtypes = (c_int, c_char_p, c_char_p, c_void_p, c_int)
try:
libcrypto.OPENSSL_init_crypto()
except AttributeError:
# Support for OpenSSL < 1.1 (OPENSSL_API_COMPAT < 0x10100000L)
libcrypto.OPENSSL_no_config()
libcrypto.OPENSSL_add_all_algorithms_noconf()
return libcrypto

View File

@ -25,6 +25,16 @@ except ImportError:
log = logging.getLogger(__name__)
__virtualname__ = 'win_update'
def __virtual__():
if not salt.utils.platform.is_windows():
return False, 'win_update: Not available on Windows'
if not HAS_PYWIN32:
return False, 'win_update: Missing pywin32'
return __virtualname__
class Updates(object):
'''

View File

@ -655,14 +655,18 @@ def _fetch_events(q):
atexit.register(_clean_queue)
a_config = AdaptedConfigurationTestCaseMixin()
event = salt.utils.event.get_event('minion', sock_dir=a_config.get_config('minion')['sock_dir'], opts=a_config.get_config('minion'))
event = salt.utils.event.get_event(
'minion',
sock_dir=a_config.get_config('minion')['sock_dir'],
opts=a_config.get_config('minion'),
)
while True:
try:
events = event.get_event(full=False)
except Exception:
except Exception as exc:
# This is broad but we'll see all kinds of issues right now
# if we drop the proc out from under the socket while we're reading
pass
log.exception("Exception caught while getting events %r", exc)
q.put(events)

View File

@ -156,3 +156,43 @@ class INotifyBeaconTestCase(TestCase, LoaderModuleMockMixin):
self.assertEqual(len(ret), 1)
self.assertEqual(ret[0]['path'], fp)
self.assertEqual(ret[0]['change'], 'IN_DELETE')
def test_multi_files_exclude(self):
dp1 = os.path.join(self.tmpdir, 'subdir1')
dp2 = os.path.join(self.tmpdir, 'subdir2')
os.mkdir(dp1)
os.mkdir(dp2)
_exclude1 = '{0}/subdir1/*tmpfile*$'.format(self.tmpdir)
_exclude2 = '{0}/subdir2/*filetmp*$'.format(self.tmpdir)
config = [{'files': {dp1: {'mask': ['create', 'delete'],
'recurse': True,
'exclude': [{_exclude1: {'regex': True}}],
'auto_add': True}}},
{'files': {dp2: {'mask': ['create', 'delete'],
'recurse': True,
'exclude': [{_exclude2: {'regex': True}}],
'auto_add': True}}}]
ret = inotify.validate(config)
self.assertEqual(ret, (True, 'Valid beacon configuration'))
fp = os.path.join(dp1, 'tmpfile')
with salt.utils.files.fopen(fp, 'w') as f:
pass
ret = inotify.beacon(config)
self.assertEqual(len(ret), 0)
os.remove(fp)
ret = inotify.beacon(config)
self.assertEqual(len(ret), 0)
fp = os.path.join(dp2, 'tmpfile')
with salt.utils.files.fopen(fp, 'w') as f:
pass
ret = inotify.beacon(config)
self.assertEqual(len(ret), 1)
self.assertEqual(ret[0]['path'], fp)
self.assertEqual(ret[0]['change'], 'IN_CREATE')
os.remove(fp)
ret = inotify.beacon(config)
self.assertEqual(len(ret), 1)
self.assertEqual(ret[0]['path'], fp)
self.assertEqual(ret[0]['change'], 'IN_DELETE')

View File

@ -0,0 +1,87 @@
# -*- coding: utf-8 -*-
'''
:codeauthor: :email:`Shane Lee <slee@saltstack.com>`
'''
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import textwrap
# Import Salt Testing Libs
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.unit import TestCase, skipIf
from tests.support.mock import (
patch,
MagicMock,
NO_MOCK,
NO_MOCK_REASON
)
# Import Salt Libs
import salt.grains.disks as disks
@skipIf(NO_MOCK, NO_MOCK_REASON)
class IscsiGrainsTestCase(TestCase, LoaderModuleMockMixin):
'''
Test cases for _windows_disks grains
'''
def setup_loader_modules(self):
return {
disks: {
'__salt__': {},
},
}
def test__windows_disks(self):
'''
Test grains._windows_disks, normal return
Should return a populated dictionary
'''
mock_which = MagicMock(return_value='C:\\Windows\\System32\\wbem\\WMIC.exe')
wmic_result = textwrap.dedent('''
DeviceId MediaType
0 4
1 0
2 3
3 5
''')
mock_run_all = MagicMock(return_value={'stdout': wmic_result,
'retcode': 0})
with patch('salt.utils.path.which', mock_which), \
patch.dict(disks.__salt__, {'cmd.run_all': mock_run_all}):
result = disks._windows_disks()
expected = {
'SSDs': ['\\\\.\\PhysicalDrive0'],
'disks': [
'\\\\.\\PhysicalDrive0',
'\\\\.\\PhysicalDrive1',
'\\\\.\\PhysicalDrive2',
'\\\\.\\PhysicalDrive3']}
self.assertDictEqual(result, expected)
cmd = ' '.join([
'C:\\Windows\\System32\\wbem\\WMIC.exe',
'/namespace:\\\\root\\microsoft\\windows\\storage',
'path',
'MSFT_PhysicalDisk',
'get',
'DeviceID,MediaType',
'/format:table'
])
mock_run_all.assert_called_once_with(cmd)
def test__windows_disks_retcode(self):
'''
Test grains._windows_disks, retcode 1
Should return empty lists
'''
mock_which = MagicMock(return_value='C:\\Windows\\System32\\wbem\\WMIC.exe')
mock_run_all = MagicMock(return_value={'stdout': '',
'retcode': 1})
with patch('salt.utils.path.which', mock_which), \
patch.dict(disks.__salt__, {'cmd.run_all': mock_run_all}):
result = disks._windows_disks()
expected = {
'SSDs': [],
'disks': []}
self.assertDictEqual(result, expected)

View File

@ -21,7 +21,7 @@ from tests.support.mock import (
import salt.utils.files
import salt.utils.platform
from salt.modules import kubernetes
from salt.modules import kubernetesmod as kubernetes
@contextmanager
@ -31,7 +31,7 @@ def mock_kubernetes_library():
it caused kubernetes._cleanup() to get called for virtually every
test, which blows up. This prevents that specific blow-up once
"""
with patch('salt.modules.kubernetes.kubernetes') as mock_kubernetes_lib:
with patch('salt.modules.kubernetesmod.kubernetes') as mock_kubernetes_lib:
yield mock_kubernetes_lib
@ -40,7 +40,7 @@ def mock_kubernetes_library():
"Skipping test_kubernetes.py")
class KubernetesTestCase(TestCase, LoaderModuleMockMixin):
'''
Test cases for salt.modules.kubernetes
Test cases for salt.modules.kubernetesmod
'''
def setup_loader_modules(self):
@ -114,7 +114,7 @@ class KubernetesTestCase(TestCase, LoaderModuleMockMixin):
:return:
'''
with mock_kubernetes_library() as mock_kubernetes_lib:
with patch('salt.modules.kubernetes.show_deployment', Mock(return_value=None)):
with patch('salt.modules.kubernetesmod.show_deployment', Mock(return_value=None)):
with patch.dict(kubernetes.__salt__, {'config.option': Mock(side_effect=self.settings)}):
mock_kubernetes_lib.client.V1DeleteOptions = Mock(return_value="")
mock_kubernetes_lib.client.ExtensionsV1beta1Api.return_value = Mock(
@ -193,7 +193,7 @@ class KubernetesTestCase(TestCase, LoaderModuleMockMixin):
Test kubernetes.node_labels
:return:
'''
with patch('salt.modules.kubernetes.node') as mock_node:
with patch('salt.modules.kubernetesmod.node') as mock_node:
mock_node.return_value = {
'metadata': {
'labels': {
@ -213,7 +213,7 @@ class KubernetesTestCase(TestCase, LoaderModuleMockMixin):
kubectl [apply|create|replace] --record does
:return:
'''
with patch('salt.modules.kubernetes.sys.argv', ['/usr/bin/salt-call', 'state.apply']) as mock_sys:
with patch('salt.modules.kubernetesmod.sys.argv', ['/usr/bin/salt-call', 'state.apply']) as mock_sys:
func = getattr(kubernetes, '__dict_to_object_meta')
data = func(name='test-pod', namespace='test', metadata={})

View File

@ -305,6 +305,13 @@ class MountTestCase(TestCase, LoaderModuleMockMixin):
'stderr': True})
with patch.dict(mount.__salt__, {'cmd.run_all': mock}):
self.assertTrue(mount.mount('name', 'device'))
mock.assert_called_with('mount device name ',
python_shell=False, runas=None)
with patch.dict(mount.__salt__, {'cmd.run_all': mock}):
self.assertTrue(mount.mount('name', 'device', fstype='fstype'))
mock.assert_called_with('mount -t fstype device name ',
python_shell=False, runas=None)
mock = MagicMock(return_value={'retcode': False,
'stderr': False})
@ -320,6 +327,35 @@ class MountTestCase(TestCase, LoaderModuleMockMixin):
'stderr': True})
with patch.dict(mount.__salt__, {'cmd.run_all': mock}):
self.assertTrue(mount.mount('name', 'device'))
mock.assert_called_with('mount device name ',
python_shell=False, runas=None)
with patch.dict(mount.__salt__, {'cmd.run_all': mock}):
self.assertTrue(mount.mount('name', 'device', fstype='fstype'))
mock.assert_called_with('mount -v fstype device name ',
python_shell=False, runas=None)
mock = MagicMock(return_value={'retcode': False,
'stderr': False})
with patch.dict(mount.__salt__, {'cmd.run_all': mock}):
self.assertTrue(mount.mount('name', 'device'))
with patch.dict(mount.__grains__, {'os': 'Linux'}):
mock = MagicMock(return_value=True)
with patch.object(os.path, 'exists', mock):
mock = MagicMock(return_value=None)
with patch.dict(mount.__salt__, {'file.mkdir': None}):
mock = MagicMock(return_value={'retcode': True,
'stderr': True})
with patch.dict(mount.__salt__, {'cmd.run_all': mock}):
self.assertTrue(mount.mount('name', 'device'))
mock.assert_called_with('mount -o defaults device name ',
python_shell=False, runas=None)
with patch.dict(mount.__salt__, {'cmd.run_all': mock}):
self.assertTrue(mount.mount('name', 'device', fstype='fstype'))
mock.assert_called_with('mount -o defaults -t fstype device name ',
python_shell=False, runas=None)
mock = MagicMock(return_value={'retcode': False,
'stderr': False})

View File

@ -12,6 +12,7 @@ from tests.support.mock import patch, NO_MOCK, NO_MOCK_REASON
# Import Salt Libs
import salt.modules.win_file as win_file
import salt.modules.temp as temp
from salt.exceptions import CommandExecutionError
import salt.utils.platform
import salt.utils.win_dacl
@ -46,3 +47,272 @@ class WinFileTestCase(TestCase):
with patch('os.path.exists', return_value=False):
self.assertRaises(
CommandExecutionError, win_file.check_perms, self.FAKE_PATH)
@destructiveTest
@skipIf(NO_MOCK, NO_MOCK_REASON)
@skipIf(not salt.utils.platform.is_windows(), 'Requires Pywin32 libraries')
class WinFileCheckPermsTestCase(TestCase, LoaderModuleMockMixin):
'''
Test cases for the check_perms function in salt.modules.win_file
'''
temp_file = ''
current_user = ''
def setup_loader_modules(self):
self.current_user = salt.utils.win_functions.get_current_user(False)
return {
win_file: {
'__opts__': {
'test': False}}}
def setUp(self):
self.temp_file = tempfile.NamedTemporaryFile(delete=False)
self.temp_file.close()
salt.utils.win_dacl.set_owner(obj_name=self.temp_file.name,
principal=self.current_user)
salt.utils.win_dacl.set_inheritance(obj_name=self.temp_file.name,
enabled=True)
self.assertEqual(
salt.utils.win_dacl.get_owner(obj_name=self.temp_file.name),
self.current_user)
def tearDown(self):
os.remove(self.temp_file.name)
def test_check_perms_set_owner_test_true(self):
'''
Test setting the owner of a file with test=True
'''
with patch.dict(win_file.__opts__, {'test': True}):
expected = {'comment': '',
'changes': {},
'pchanges': {'owner': 'Administrators'},
'name': self.temp_file.name,
'result': None}
ret = win_file.check_perms(path=self.temp_file.name,
owner='Administrators',
inheritance=None)
self.assertDictEqual(expected, ret)
def test_check_perms_set_owner(self):
'''
Test setting the owner of a file
'''
expected = {'comment': '',
'pchanges': {},
'changes': {'owner': 'Administrators'},
'name': self.temp_file.name,
'result': True}
ret = win_file.check_perms(path=self.temp_file.name,
owner='Administrators',
inheritance=None)
self.assertDictEqual(expected, ret)
def test_check_perms_deny_test_true(self):
'''
Test setting deny perms on a file with test=True
'''
with patch.dict(win_file.__opts__, {'test': True}):
expected = {'comment': '',
'pchanges': {
'deny_perms': {
'Users': {'perms': 'read_execute'}}},
'changes': {'deny_perms': {}},
'name': self.temp_file.name,
'result': None}
ret = win_file.check_perms(
path=self.temp_file.name,
deny_perms={
'Users': {
'perms': 'read_execute'}},
inheritance=None)
self.assertDictEqual(expected, ret)
def test_check_perms_deny(self):
'''
Test setting deny perms on a file
'''
expected = {'comment': '',
'pchanges': {'deny_perms': {}},
'changes': {
'deny_perms': {
'Users': {'perms': 'read_execute'}}},
'name': self.temp_file.name,
'result': True}
ret = win_file.check_perms(path=self.temp_file.name,
deny_perms={
'Users': {
'perms': 'read_execute'}},
inheritance=None)
self.assertDictEqual(expected, ret)
def test_check_perms_grant_test_true(self):
'''
Test setting grant perms on a file with test=True
'''
with patch.dict(win_file.__opts__, {'test': True}):
expected = {'comment': '',
'pchanges': {
'grant_perms': {
'Users': {'perms': 'read_execute'}}},
'changes': {'grant_perms': {}},
'name': self.temp_file.name,
'result': None}
ret = win_file.check_perms(
path=self.temp_file.name,
grant_perms={
'Users': {
'perms': 'read_execute'}},
inheritance=None)
self.assertDictEqual(expected, ret)
def test_check_perms_grant(self):
'''
Test setting grant perms on a file
'''
expected = {'comment': '',
'pchanges': {'grant_perms': {}},
'changes': {
'grant_perms': {
'Users': {'perms': 'read_execute'}}},
'name': self.temp_file.name,
'result': True}
ret = win_file.check_perms(path=self.temp_file.name,
grant_perms={
'Users': {
'perms': 'read_execute'}},
inheritance=None)
self.assertDictEqual(expected, ret)
def test_check_perms_inheritance_false_test_true(self):
'''
Test setting inheritance to False with test=True
'''
with patch.dict(win_file.__opts__, {'test': True}):
expected = {'comment': '',
'pchanges': {'inheritance': False},
'changes': {},
'name': self.temp_file.name,
'result': None}
ret = win_file.check_perms(path=self.temp_file.name,
inheritance=False)
self.assertDictEqual(expected, ret)
def test_check_perms_inheritance_false(self):
'''
Test setting inheritance to False
'''
expected = {'comment': '',
'pchanges': {},
'changes': {'inheritance': False},
'name': self.temp_file.name,
'result': True}
ret = win_file.check_perms(path=self.temp_file.name,
inheritance=False)
self.assertDictEqual(expected, ret)
def test_check_perms_inheritance_true(self):
'''
Test setting inheritance to true when it's already true (default)
'''
expected = {'comment': '',
'pchanges': {},
'changes': {},
'name': self.temp_file.name,
'result': True}
ret = win_file.check_perms(path=self.temp_file.name,
inheritance=True)
self.assertDictEqual(expected, ret)
def test_check_perms_reset_test_true(self):
'''
Test resetting perms with test=True. This shows minimal changes
'''
# Turn off inheritance
salt.utils.win_dacl.set_inheritance(obj_name=self.temp_file.name,
enabled=False,
clear=True)
# Set some permissions
salt.utils.win_dacl.set_permissions(obj_name=self.temp_file.name,
principal='Administrator',
permissions='full_control')
with patch.dict(win_file.__opts__, {'test': True}):
expected = {
'comment': '',
'pchanges': {
'remove_perms': {
'Administrator': {
'grant': {
'applies to': 'Not Inherited (file)',
'permissions': ['Full control'],
'inherited': False}}},
'grant_perms': {
'Administrators': {'perms': 'full_control'},
'Users': {'perms': 'read_execute'}}},
'changes': {'grant_perms': {}},
'name': self.temp_file.name,
'result': None}
ret = win_file.check_perms(path=self.temp_file.name,
grant_perms={
'Users': {
'perms': 'read_execute'},
'Administrators': {
'perms': 'full_control'}},
inheritance=False,
reset=True)
self.assertDictEqual(expected, ret)
def test_check_perms_reset(self):
'''
Test resetting perms on a File
'''
# Turn off inheritance
salt.utils.win_dacl.set_inheritance(obj_name=self.temp_file.name,
enabled=False,
clear=True)
# Set some permissions
salt.utils.win_dacl.set_permissions(obj_name=self.temp_file.name,
principal='Administrator',
permissions='full_control')
expected = {
'comment': '',
'pchanges': {'grant_perms': {}},
'changes': {
'remove_perms': {
'Administrator': {
'grant': {
'applies to': 'Not Inherited (file)',
'permissions': ['Full control'],
'inherited': False}}},
'grant_perms': {
'Administrators': {'perms': 'full_control'},
'Users': {'perms': 'read_execute'}}},
'name': self.temp_file.name,
'result': True}
ret = win_file.check_perms(path=self.temp_file.name,
grant_perms={
'Users': {
'perms': 'read_execute'},
'Administrators': {
'perms': 'full_control'}},
inheritance=False,
reset=True)
self.assertDictEqual(expected, ret)
def test_issue_52002_check_file_remove_symlink(self):
'''
Make sure that directories including symlinks or symlinks can be removed
'''
base = temp.dir(prefix='base')
target = os.path.join(base, 'child 1', 'target/')
symlink = os.path.join(base, 'child 2', 'link')
self.assertFalse(win_file.directory_exists(target))
self.assertFalse(win_file.directory_exists(symlink))
self.assertTrue(win_file.makedirs_(target))
self.assertTrue(win_file.directory_exists(symlink))
self.assertTrue(win_file.symlink(target, symlink))
self.assertTrue(win_file.is_link(symlink))
self.assertTrue(win_file.remove(base))
self.assertFalse(win_file.directory_exists(base))

View File

@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
'''
Unit tests for salt._compat
'''
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import logging
import sys
# Import Salt Testing libs
from tests.support.unit import TestCase
# Import Salt libs
import salt._compat as compat
# Import 3rd Party libs
from salt.ext.six import binary_type, text_type
log = logging.getLogger(__name__)
PY3 = sys.version_info.major == 3
class CompatTestCase(TestCase):
def test_text(self):
ret = compat.text_('test string')
self.assertTrue(isinstance(ret, text_type))
def test_text_binary(self):
ret = compat.text_(b'test string')
self.assertTrue(isinstance(ret, text_type))
def test_bytes(self):
ret = compat.bytes_('test string')
self.assertTrue(isinstance(ret, binary_type))
def test_bytes_binary(self):
ret = compat.bytes_(b'test string')
self.assertTrue(isinstance(ret, binary_type))
def test_ascii_native(self):
ret = compat.ascii_native_('test string')
self.assertTrue(isinstance(ret, str))
def test_ascii_native_binary(self):
ret = compat.ascii_native_(b'test string')
self.assertTrue(isinstance(ret, str))
def test_native(self):
ret = compat.native_('test string')
self.assertTrue(isinstance(ret, str))
def test_native_binary(self):
ret = compat.native_(b'test string')
self.assertTrue(isinstance(ret, str))
def test_string_io(self):
ret = compat.string_io('test string')
if PY3:
expected = 'io.StringIO object'
else:
expected = 'cStringIO.StringI object'
self.assertTrue(expected in repr(ret))
def test_string_io_unicode(self):
ret = compat.string_io(u'test string \xf8')
if PY3:
expected = 'io.StringIO object'
else:
expected = 'StringIO.StringIO instance'
self.assertTrue(expected in repr(ret))
def test_ipv6_class__is_packed_binary(self):
ipv6 = compat.IPv6AddressScoped('2001:db8::')
self.assertEqual(str(ipv6), '2001:db8::')
def test_ipv6_class__is_packed_binary_integer(self):
ipv6 = compat.IPv6AddressScoped(42540766411282592856903984951653826560)
self.assertEqual(str(ipv6), '2001:db8::')
def test_ipv6_class__is_packed_binary__issue_51831(self):
ipv6 = compat.IPv6AddressScoped(b'sixteen.digit.bn')
self.assertEqual(str(ipv6), '7369:7874:6565:6e2e:6469:6769:742e:626e')