Merge branch 'oxygen.rc1' into multiprocessing_queue_full

This commit is contained in:
Gareth J. Greenaway 2018-02-01 17:08:20 -08:00 committed by GitHub
commit df13f0b3e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1700 additions and 506 deletions

View File

@ -1273,8 +1273,8 @@ class RemoteClient(Client):
load = {'saltenv': saltenv,
'prefix': prefix,
'cmd': '_file_list'}
return [sdecode(fn_) for fn_ in self.channel.send(load)]
return salt.utils.data.decode(self.channel.send(load)) if six.PY2 \
else self.channel.send(load)
def file_list_emptydirs(self, saltenv='base', prefix=''):
'''
@ -1283,7 +1283,8 @@ class RemoteClient(Client):
load = {'saltenv': saltenv,
'prefix': prefix,
'cmd': '_file_list_emptydirs'}
self.channel.send(load)
return salt.utils.data.decode(self.channel.send(load)) if six.PY2 \
else self.channel.send(load)
def dir_list(self, saltenv='base', prefix=''):
'''
@ -1292,7 +1293,8 @@ class RemoteClient(Client):
load = {'saltenv': saltenv,
'prefix': prefix,
'cmd': '_dir_list'}
return self.channel.send(load)
return salt.utils.data.decode(self.channel.send(load)) if six.PY2 \
else self.channel.send(load)
def symlink_list(self, saltenv='base', prefix=''):
'''
@ -1301,7 +1303,8 @@ class RemoteClient(Client):
load = {'saltenv': saltenv,
'prefix': prefix,
'cmd': '_symlink_list'}
return self.channel.send(load)
return salt.utils.data.decode(self.channel.send(load)) if six.PY2 \
else self.channel.send(load)
def __hash_and_stat_file(self, path, saltenv='base'):
'''
@ -1367,21 +1370,24 @@ class RemoteClient(Client):
'''
load = {'saltenv': saltenv,
'cmd': '_file_list'}
return self.channel.send(load)
return salt.utils.data.decode(self.channel.send(load)) if six.PY2 \
else self.channel.send(load)
def envs(self):
'''
Return a list of available environments
'''
load = {'cmd': '_file_envs'}
return self.channel.send(load)
return salt.utils.data.decode(self.channel.send(load)) if six.PY2 \
else self.channel.send(load)
def master_opts(self):
'''
Return the master opts data
'''
load = {'cmd': '_master_opts'}
return self.channel.send(load)
return salt.utils.data.decode(self.channel.send(load)) if six.PY2 \
else self.channel.send(load)
def master_tops(self):
'''
@ -1392,7 +1398,8 @@ class RemoteClient(Client):
'opts': self.opts}
if self.auth:
load['tok'] = self.auth.gen_token(b'salt')
return self.channel.send(load)
return salt.utils.data.decode(self.channel.send(load)) if six.PY2 \
else self.channel.send(load)
class FSClient(RemoteClient):

View File

@ -17,11 +17,11 @@ import time
import salt.loader
import salt.utils.data
import salt.utils.files
import salt.utils.locales
import salt.utils.path
import salt.utils.url
import salt.utils.versions
from salt.utils.args import get_function_argspec as _argspec
from salt.utils.decorators import ensure_unicode_args
# Import 3rd-party libs
from salt.ext import six
@ -546,8 +546,8 @@ class Fileserver(object):
Find the path and return the fnd structure, this structure is passed
to other backend interfaces.
'''
path = salt.utils.locales.sdecode(path)
saltenv = salt.utils.locales.sdecode(saltenv)
path = salt.utils.stringutils.to_unicode(path)
saltenv = salt.utils.stringutils.to_unicode(saltenv)
back = self.backends(back)
kwargs = {}
fnd = {'path': '',
@ -626,7 +626,7 @@ class Fileserver(object):
if not isinstance(load['saltenv'], six.string_types):
load['saltenv'] = six.text_type(load['saltenv'])
fnd = self.find_file(salt.utils.locales.sdecode(load['path']),
fnd = self.find_file(salt.utils.stringutils.to_unicode(load['path']),
load['saltenv'])
if not fnd.get('back'):
return '', None
@ -731,6 +731,7 @@ class Fileserver(object):
)
return ret
@ensure_unicode_args
def file_list(self, load):
'''
Return a list of files from the dominant environment
@ -749,14 +750,13 @@ class Fileserver(object):
fstr = '{0}.file_list'.format(fsb)
if fstr in self.servers:
ret.update(self.servers[fstr](load))
# upgrade all set elements to a common encoding
ret = [salt.utils.locales.sdecode(f) for f in ret]
# some *fs do not handle prefix. Ensure it is filtered
prefix = load.get('prefix', '').strip('/')
if prefix != '':
ret = [f for f in ret if f.startswith(prefix)]
return sorted(ret)
@ensure_unicode_args
def file_list_emptydirs(self, load):
'''
List all emptydirs in the given environment
@ -775,14 +775,13 @@ class Fileserver(object):
fstr = '{0}.file_list_emptydirs'.format(fsb)
if fstr in self.servers:
ret.update(self.servers[fstr](load))
# upgrade all set elements to a common encoding
ret = [salt.utils.locales.sdecode(f) for f in ret]
# some *fs do not handle prefix. Ensure it is filtered
prefix = load.get('prefix', '').strip('/')
if prefix != '':
ret = [f for f in ret if f.startswith(prefix)]
return sorted(ret)
@ensure_unicode_args
def dir_list(self, load):
'''
List all directories in the given environment
@ -801,14 +800,13 @@ class Fileserver(object):
fstr = '{0}.dir_list'.format(fsb)
if fstr in self.servers:
ret.update(self.servers[fstr](load))
# upgrade all set elements to a common encoding
ret = [salt.utils.locales.sdecode(f) for f in ret]
# some *fs do not handle prefix. Ensure it is filtered
prefix = load.get('prefix', '').strip('/')
if prefix != '':
ret = [f for f in ret if f.startswith(prefix)]
return sorted(ret)
@ensure_unicode_args
def symlink_list(self, load):
'''
Return a list of symlinked files and dirs
@ -827,10 +825,6 @@ class Fileserver(object):
symlstr = '{0}.symlink_list'.format(fsb)
if symlstr in self.servers:
ret = self.servers[symlstr](load)
# upgrade all set elements to a common encoding
ret = dict([
(salt.utils.locales.sdecode(x), salt.utils.locales.sdecode(y)) for x, y in ret.items()
])
# some *fs do not handle prefix. Ensure it is filtered
prefix = load.get('prefix', '').strip('/')
if prefix != '':

View File

@ -132,20 +132,23 @@ def _linux_disks():
ret = {'disks': [], 'SSDs': []}
for entry in glob.glob('/sys/block/*/queue/rotational'):
with salt.utils.files.fopen(entry) as entry_fp:
device = entry.split('/')[3]
flag = entry_fp.read(1)
if flag == '0':
ret['SSDs'].append(device)
log.trace('Device %s reports itself as an SSD', device)
elif flag == '1':
ret['disks'].append(device)
log.trace('Device %s reports itself as an HDD', device)
else:
log.trace(
'Unable to identify device %s as an SSD or HDD. It does '
'not report 0 or 1', device
)
try:
with salt.utils.files.fopen(entry) as entry_fp:
device = entry.split('/')[3]
flag = entry_fp.read(1)
if flag == '0':
ret['SSDs'].append(device)
log.trace('Device %s reports itself as an SSD', device)
elif flag == '1':
ret['disks'].append(device)
log.trace('Device %s reports itself as an HDD', device)
else:
log.trace(
'Unable to identify device %s as an SSD or HDD. It does '
'not report 0 or 1', device
)
except IOError:
pass
return ret

View File

@ -1437,6 +1437,9 @@ class Minion(MinionBase):
Override this method if you wish to handle the decoded data
differently.
'''
# Ensure payload is unicode. Disregard failure to decode binary blobs.
if six.PY2:
data = salt.utils.data.decode(data, keep=True)
if 'user' in data:
log.info(
'User %s Executing command %s with jid %s',

View File

@ -19,6 +19,7 @@ import logging
# Import Salt libs
import salt.utils.args
import salt.utils.data
import salt.utils.path
import salt.utils.platform
from salt.exceptions import SaltInvocationError
@ -380,7 +381,15 @@ def do(cmdline, runas=None, env=None):
if not env:
env = {}
env['PATH'] = '{0}/shims:{1}'.format(path, os.environ['PATH'])
# NOTE: Env vars (and their values) need to be str type on both Python 2
# and 3. The code below first normalizes all path components to unicode to
# stitch them together, and then converts the result back to a str type.
env[str('PATH')] = salt.utils.stringutils.to_str( # future lint: disable=blacklisted-function
os.pathsep.join((
salt.utils.path.join(path, 'shims'),
salt.utils.stringutils.to_unicode(os.environ['PATH'])
))
)
try:
cmdline = salt.utils.args.shlex_split(cmdline)

View File

@ -735,14 +735,14 @@ def get_config_file():
return __SYSLOG_NG_CONFIG_FILE
def _run_command(cmd, options=()):
def _run_command(cmd, options=(), env=None):
'''
Runs the command cmd with options as its CLI parameters and returns the
result as a dictionary.
'''
params = [cmd]
params.extend(options)
return __salt__['cmd.run_all'](params, python_shell=False)
return __salt__['cmd.run_all'](params, env=env, python_shell=False)
def _determine_config_version(syslog_ng_sbin_dir):
@ -785,49 +785,26 @@ def set_parameters(version=None,
return _format_return_data(0)
def _add_to_path_envvar(directory):
'''
Adds directory to the PATH environment variable and returns the original
one.
'''
orig_path = os.environ.get('PATH', '')
if directory:
if not os.path.isdir(directory):
log.error('The given parameter is not a directory')
os.environ['PATH'] = '{0}{1}{2}'.format(orig_path,
os.pathsep,
directory)
return orig_path
def _restore_path_envvar(original):
'''
Sets the PATH environment variable to the parameter.
'''
if original:
os.environ['PATH'] = original
def _run_command_in_extended_path(syslog_ng_sbin_dir, command, params):
'''
Runs the given command in an environment, where the syslog_ng_sbin_dir is
added then removed from the PATH.
Runs the specified command with the syslog_ng_sbin_dir in the PATH
'''
orig_path = _add_to_path_envvar(syslog_ng_sbin_dir)
if not salt.utils.path.which(command):
error_message = (
'Unable to execute the command \'{0}\'. It is not in the PATH.'
.format(command)
)
log.error(error_message)
_restore_path_envvar(orig_path)
raise CommandExecutionError(error_message)
ret = _run_command(command, options=params)
_restore_path_envvar(orig_path)
return ret
orig_path = os.environ.get('PATH', '')
env = None
if syslog_ng_sbin_dir:
# Custom environment variables should be str types. This code
# normalizes the paths to unicode to join them together, and then
# converts back to a str type.
env = {
str('PATH'): salt.utils.stringutils.to_str( # future lint: disable=blacklisted-function
os.pathsep.join(
salt.utils.data.decode(
(orig_path, syslog_ng_sbin_dir)
)
)
)
}
return _run_command(command, options=params, env=env)
def _format_return_data(retcode, stdout=None, stderr=None):

View File

@ -25,9 +25,6 @@ import salt.utils.path
import salt.utils.platform
from salt.exceptions import CommandExecutionError, SaltInvocationError
# Import 3rd-party libs
from salt.ext import six
__virtualname__ = 'system'
@ -509,7 +506,6 @@ def get_computer_desc():
salt '*' system.get_computer_desc
'''
desc = None
hostname_cmd = salt.utils.path.which('hostnamectl')
if hostname_cmd:
desc = __salt__['cmd.run'](
@ -517,6 +513,7 @@ def get_computer_desc():
python_shell=False
)
else:
desc = None
pattern = re.compile(r'^\s*PRETTY_HOSTNAME=(.*)$')
try:
with salt.utils.files.fopen('/etc/machine-info', 'r') as mach_info:
@ -528,12 +525,12 @@ def get_computer_desc():
desc = _strip_quotes(match.group(1).strip())
# no break so we get the last occurance
except IOError:
pass
if desc is None:
return False
if six.PY3:
desc = desc.replace('\\"', '"')
else:
desc = desc.replace('\\"', '"').decode('string_escape')
return desc
return desc.replace(r'\"', r'"').replace(r'\n', '\n').replace(r'\t', '\t')
def set_computer_desc(desc):
@ -551,10 +548,9 @@ def set_computer_desc(desc):
salt '*' system.set_computer_desc "Michael's laptop"
'''
if six.PY3:
desc = desc.replace('"', '\\"')
else:
desc = desc.encode('string_escape').replace('"', '\\"')
desc = salt.utils.stringutils.to_unicode(
desc).replace('"', r'\"').replace('\n', r'\n').replace('\t', r'\t')
hostname_cmd = salt.utils.path.which('hostnamectl')
if hostname_cmd:
result = __salt__['cmd.retcode'](
@ -567,23 +563,22 @@ def set_computer_desc(desc):
with salt.utils.files.fopen('/etc/machine-info', 'w'):
pass
is_pretty_hostname_found = False
pattern = re.compile(r'^\s*PRETTY_HOSTNAME=(.*)$')
new_line = 'PRETTY_HOSTNAME="{0}"'.format(desc)
new_line = salt.utils.stringutils.to_str('PRETTY_HOSTNAME="{0}"'.format(desc))
try:
with salt.utils.files.fopen('/etc/machine-info', 'r+') as mach_info:
lines = mach_info.readlines()
for i, line in enumerate(lines):
if pattern.match(line):
is_pretty_hostname_found = True
if pattern.match(salt.utils.stringutils.to_unicode(line)):
lines[i] = new_line
if not is_pretty_hostname_found:
break
else:
# PRETTY_HOSTNAME line was not found, add it to the end
lines.append(new_line)
# time to write our changes to the file
mach_info.seek(0, 0)
mach_info.truncate()
mach_info.write(salt.utils.stringutils.to_str(''.join(lines)))
mach_info.write(salt.utils.stringutils.to_str('\n'))
mach_info.writelines(lines)
return True
except IOError:
return False

View File

@ -6,7 +6,7 @@ Note that not all Windows applications will rehash the PATH environment variable
Only the ones that listen to the WM_SETTINGCHANGE message
http://support.microsoft.com/kb/104011
'''
from __future__ import absolute_import, unicode_literals, print_function
from __future__ import absolute_import, print_function, unicode_literals
# Import Python libs
import logging
@ -14,7 +14,10 @@ import os
import re
# Import Salt libs
import salt.utils.args
import salt.utils.data
import salt.utils.platform
import salt.utils.stringutils
# Import 3rd-party libs
from salt.ext.six.moves import map
@ -28,6 +31,12 @@ except ImportError:
# Settings
log = logging.getLogger(__name__)
HIVE = 'HKEY_LOCAL_MACHINE'
KEY = 'SYSTEM\\CurrentControlSet\\Control\\Session Manager\\Environment'
VNAME = 'PATH'
VTYPE = 'REG_EXPAND_SZ'
PATHSEP = str(os.pathsep) # future lint: disable=blacklisted-function
def __virtual__():
'''
@ -38,16 +47,17 @@ def __virtual__():
return (False, "Module win_path: module only works on Windows systems")
def _normalize_dir(string):
def _normalize_dir(string_):
'''
Normalize the directory to make comparison possible
'''
return re.sub(r'\\$', '', string.lower())
return re.sub(r'\\$', '', salt.utils.stringutils.to_unicode(string_))
def rehash():
'''
Send a WM_SETTINGCHANGE Broadcast to Windows to refresh the Environment variables
Send a WM_SETTINGCHANGE Broadcast to Windows to refresh the Environment
variables
CLI Example:
@ -68,9 +78,12 @@ def get_path():
salt '*' win_path.get_path
'''
ret = __salt__['reg.read_value']('HKEY_LOCAL_MACHINE',
'SYSTEM\\CurrentControlSet\\Control\\Session Manager\\Environment',
'PATH')['vdata'].split(';')
ret = salt.utils.stringutils.to_unicode(
__salt__['reg.read_value'](
'HKEY_LOCAL_MACHINE',
'SYSTEM\\CurrentControlSet\\Control\\Session Manager\\Environment',
'PATH')['vdata']
).split(';')
# Trim ending backslash
return list(map(_normalize_dir, ret))
@ -95,17 +108,30 @@ def exists(path):
path = _normalize_dir(path)
sysPath = get_path()
return path in sysPath
return path.lower() in (x.lower() for x in sysPath)
def add(path, index=0):
def _update_local_path(local_path):
os.environ[str('PATH')] = PATHSEP.join(local_path) # future lint: disable=blacklisted-function
def add(path, index=None, **kwargs):
'''
Add the directory to the SYSTEM path in the index location
Add the directory to the SYSTEM path in the index location. Returns
``True`` if successful, otherwise ``False``.
Returns:
boolean True if successful, False if unsuccessful
path
Directory to add to path
CLI Example:
index
Optionally specify an index at which to insert the directory
rehash : True
If the registry was updated, and this value is set to ``True``, sends a
WM_SETTINGCHANGE broadcast to refresh the environment variables. Set
this to ``False`` to skip this broadcast.
CLI Examples:
.. code-block:: bash
@ -115,56 +141,157 @@ def add(path, index=0):
# Will add to the end of the path
salt '*' win_path.add 'c:\\python27' index='-1'
'''
currIndex = -1
sysPath = get_path()
kwargs = salt.utils.args.clean_kwargs(**kwargs)
rehash_ = kwargs.pop('rehash', True)
if kwargs:
salt.utils.args.invalid_kwargs(kwargs)
path = _normalize_dir(path)
index = int(index)
path_str = salt.utils.stringutils.to_str(path)
system_path = get_path()
# validate index boundaries
if index < 0:
index = len(sysPath) + index + 1
if index > len(sysPath):
index = len(sysPath)
# The current path should not have any unicode in it, but don't take any
# chances.
local_path = [
salt.utils.stringutils.to_str(x)
for x in os.environ['PATH'].split(PATHSEP)
]
localPath = os.environ["PATH"].split(os.pathsep)
if path not in localPath:
localPath.append(path)
os.environ["PATH"] = os.pathsep.join(localPath)
if index is not None:
try:
index = int(index)
except (TypeError, ValueError):
index = None
# Check if we are in the system path at the right location
try:
currIndex = sysPath.index(path)
if currIndex != index:
sysPath.pop(currIndex)
def _check_path(dirs, path, index):
'''
Check the dir list for the specified path, at the specified index, and
make changes to the list if needed. Return True if changes were made to
the list, otherwise return False.
'''
dirs_lc = [x.lower() for x in dirs]
try:
# Check index with case normalized
cur_index = dirs_lc.index(path.lower())
except ValueError:
cur_index = None
num_dirs = len(dirs)
# if pos is None, we don't care about where the directory is in the
# PATH. If it is a number, then that number is the index to be used for
# insertion (this number will be different from the index if the index
# is less than -1, for reasons explained in the comments below). If it
# is the string 'END', then the directory must be at the end of the
# PATH, so it should be removed before appending if it is anywhere but
# the end.
pos = index
if index is not None:
if index >= num_dirs or index == -1:
# Set pos to 'END' so we know that we're moving the directory
# if it exists and isn't already at the end.
pos = 'END'
elif index <= -num_dirs:
# Negative index is too large, shift index to beginning of list
index = pos = 0
elif index <= 0:
# Negative indexes (other than -1 which is handled above) must
# be inserted at index + 1 for the item to end up in the
# position you want, since list.insert() inserts before the
# index passed to it. For example:
#
# >>> x = ['one', 'two', 'four', 'five']
# >>> x.insert(-3, 'three')
# >>> x
# ['one', 'three', 'two', 'four', 'five']
# >>> x = ['one', 'two', 'four', 'five']
# >>> x.insert(-2, 'three')
# >>> x
# ['one', 'two', 'three', 'four', 'five']
pos += 1
if pos == 'END':
if cur_index is not None:
if cur_index == num_dirs - 1:
# Directory is already in the desired location, no changes
# need to be made.
return False
else:
# Remove from current location and add it to the end
dirs.pop(cur_index)
dirs.append(path)
return True
else:
# Doesn't exist in list, add it to the end
dirs.append(path)
return True
elif index is None:
# If index is None, that means that if the path is not already in
# list, we will be appending it to the end instead of inserting it
# somewhere in the middle.
if cur_index is not None:
# Directory is already in the PATH, no changes need to be made.
return False
else:
# Directory not in the PATH, and we're not enforcing the index.
# Append it to the list.
dirs.append(path)
return True
else:
return True
except ValueError:
pass
# Add it to the Path
sysPath.insert(index, path)
regedit = __salt__['reg.set_value'](
'HKEY_LOCAL_MACHINE',
'SYSTEM\\CurrentControlSet\\Control\\Session Manager\\Environment',
'PATH',
';'.join(sysPath),
'REG_EXPAND_SZ'
)
# Broadcast WM_SETTINGCHANGE to Windows
if regedit:
return rehash()
else:
if cur_index is not None:
if (index < 0 and cur_index != (num_dirs + index)) \
or (index >= 0 and cur_index != index):
# Directory is present, but not at the desired index.
# Remove it from the non-normalized path list and insert it
# at the correct postition.
dirs.pop(cur_index)
dirs.insert(pos, path)
return True
else:
# Directory is present and its position matches the desired
# index. No changes need to be made.
return False
else:
# Insert the path at the desired index.
dirs.insert(pos, path)
return True
return False
if _check_path(local_path, path_str, index):
_update_local_path(local_path)
def remove(path):
if not _check_path(system_path, path, index):
# No changes necessary
return True
# Move forward with registry update
result = __salt__['reg.set_value'](
HIVE,
KEY,
VNAME,
';'.join(salt.utils.data.decode(system_path)),
VTYPE
)
if result and rehash_:
# Broadcast WM_SETTINGCHANGE to Windows if registry was updated
return rehash()
else:
return result
def remove(path, **kwargs):
r'''
Remove the directory from the SYSTEM path
Returns:
boolean True if successful, False if unsuccessful
rehash : True
If the registry was updated, and this value is set to ``True``, sends a
WM_SETTINGCHANGE broadcast to refresh the environment variables. Set
this to ``False`` to skip this broadcast.
CLI Example:
.. code-block:: bash
@ -172,27 +299,58 @@ def remove(path):
# Will remove C:\Python27 from the path
salt '*' win_path.remove 'c:\\python27'
'''
kwargs = salt.utils.args.clean_kwargs(**kwargs)
rehash_ = kwargs.pop('rehash', True)
if kwargs:
salt.utils.args.invalid_kwargs(kwargs)
path = _normalize_dir(path)
sysPath = get_path()
path_str = salt.utils.stringutils.to_str(path)
system_path = get_path()
localPath = os.environ["PATH"].split(os.pathsep)
if path in localPath:
localPath.remove(path)
os.environ["PATH"] = os.pathsep.join(localPath)
# The current path should not have any unicode in it, but don't take any
# chances.
local_path = [
salt.utils.stringutils.to_str(x)
for x in os.environ['PATH'].split(PATHSEP)
]
try:
sysPath.remove(path)
except ValueError:
def _check_path(dirs, path):
'''
Check the dir list for the specified path, and make changes to the list
if needed. Return True if changes were made to the list, otherwise
return False.
'''
dirs_lc = [x.lower() for x in dirs]
path_lc = path.lower()
new_dirs = []
for index, dirname in enumerate(dirs_lc):
if path_lc != dirname:
new_dirs.append(dirs[index])
if len(new_dirs) != len(dirs):
dirs[:] = new_dirs[:]
return True
else:
return False
if _check_path(local_path, path_str):
_update_local_path(local_path)
if not _check_path(system_path, path):
# No changes necessary
return True
regedit = __salt__['reg.set_value'](
'HKEY_LOCAL_MACHINE',
'SYSTEM\\CurrentControlSet\\Control\\Session Manager\\Environment',
'PATH',
';'.join(sysPath),
'REG_EXPAND_SZ'
result = __salt__['reg.set_value'](
HIVE,
KEY,
VNAME,
';'.join(salt.utils.data.decode(system_path)),
VTYPE
)
if regedit:
if result and rehash_:
# Broadcast WM_SETTINGCHANGE to Windows if registry was updated
return rehash()
else:
return False
return result

View File

@ -109,9 +109,9 @@ import textwrap
# Import salt libs
import salt.utils.color
import salt.utils.data
import salt.utils.stringutils
import salt.output
from salt.utils.locales import sdecode
# Import 3rd-party libs
from salt.ext import six
@ -156,7 +156,7 @@ def output(data, **kwargs): # pylint: disable=unused-argument
def _format_host(host, data):
host = sdecode(host)
host = salt.utils.data.decode(host)
colors = salt.utils.color.get_colors(
__opts__.get('color'),
@ -183,7 +183,9 @@ def _format_host(host, data):
.format(hcolor, colors)))
for err in data:
if strip_colors:
err = salt.output.strip_esc_sequence(sdecode(err))
err = salt.output.strip_esc_sequence(
salt.utils.data.decode(err)
)
hstrs.append(('{0}----------\n {1}{2[ENDC]}'
.format(hcolor, err, colors)))
if isinstance(data, dict):
@ -246,7 +248,7 @@ def _format_host(host, data):
tcolor = colors['LIGHT_YELLOW']
state_output = __opts__.get('state_output', 'full').lower()
comps = [sdecode(comp) for comp in tname.split('_|-')]
comps = tname.split('_|-')
if state_output.endswith('_id'):
# Swap in the ID for the name. Refs #35137
@ -315,15 +317,13 @@ def _format_host(host, data):
# be sure that ret['comment'] is utf-8 friendly
try:
if not isinstance(ret['comment'], six.text_type):
if six.PY2:
ret['comment'] = six.text_type(ret['comment']).decode('utf-8')
else:
ret['comment'] = salt.utils.stringutils.to_str(ret['comment'])
ret['comment'] = six.text_type(ret['comment'])
except UnicodeDecodeError:
# but try to continue on errors
pass
# If we got here, we're on Python 2 and ret['comment'] somehow
# contained a str type with unicode content.
ret['comment'] = salt.utils.stringutils.to_unicode(ret['comment'])
try:
comment = sdecode(ret['comment'])
comment = salt.utils.data.decode(ret['comment'])
comment = comment.strip().replace(
'\n',
'\n' + ' ' * 14)
@ -356,7 +356,7 @@ def _format_host(host, data):
'tcolor': tcolor,
'comps': comps,
'ret': ret,
'comment': sdecode(comment),
'comment': salt.utils.data.decode(comment),
# This nukes any trailing \n and indents the others.
'colors': colors
}
@ -481,20 +481,12 @@ def _nested_changes(changes):
'''
Print the changes data using the nested outputter
'''
global __opts__ # pylint: disable=W0601
opts = __opts__.copy()
# Pass the __opts__ dict. The loader will splat this modules __opts__ dict
# anyway so have to restore it after the other outputter is done
if __opts__['color']:
__opts__['color'] = 'CYAN'
ret = '\n'
ret += salt.output.out_format(
changes,
'nested',
__opts__,
nested_indent=14)
__opts__ = opts
return ret

View File

@ -30,8 +30,8 @@ from numbers import Number
# Import salt libs
import salt.output
import salt.utils.color
import salt.utils.locales
import salt.utils.odict
import salt.utils.stringutils
from salt.ext import six
@ -63,9 +63,21 @@ class NestDisplay(object):
fmt = '{0}{1}{2}{3}{4}{5}'
try:
return fmt.format(indent, color, prefix, msg, endc, suffix)
return fmt.format(
indent,
color,
prefix,
msg,
endc,
suffix)
except UnicodeDecodeError:
return fmt.format(indent, color, prefix, salt.utils.locales.sdecode(msg), endc, suffix)
return fmt.format(
indent,
color,
prefix,
salt.utils.stringutils.to_unicode(msg),
endc,
suffix)
def display(self, ret, indent, prefix, out):
'''

View File

@ -2,11 +2,13 @@
'''
Manage the Windows System PATH
'''
from __future__ import absolute_import, unicode_literals, print_function
from __future__ import absolute_import, print_function, unicode_literals
# Python Libs
import re
import os
# Import Salt libs
import salt.utils.stringutils
# Import 3rd-party libs
from salt.ext import six
def __virtual__():
@ -16,11 +18,9 @@ def __virtual__():
return 'win_path' if 'win_path.rehash' in __salt__ else False
def _normalize_dir(string):
'''
Normalize the directory to make comparison possible
'''
return re.sub(r'\\$', '', string.lower())
def _format_comments(ret, comments):
ret['comment'] = ' '.join(comments)
return ret
def absent(name):
@ -41,23 +41,24 @@ def absent(name):
'changes': {},
'comment': ''}
localPath = os.environ["PATH"].split(os.pathsep)
if name in localPath:
localPath.remove(name)
os.environ["PATH"] = os.pathsep.join(localPath)
if __salt__['win_path.exists'](name):
ret['changes']['removed'] = name
else:
if not __salt__['win_path.exists'](name):
ret['comment'] = '{0} is not in the PATH'.format(name)
return ret
if __opts__['test']:
ret['comment'] = '{0} would be removed from the PATH'.format(name)
ret['result'] = None
return ret
ret['result'] = __salt__['win_path.remove'](name)
if not ret['result']:
ret['comment'] = 'could not remove {0} from the PATH'.format(name)
__salt__['win_path.remove'](name)
if __salt__['win_path.exists'](name):
ret['comment'] = 'Failed to remove {0} from the PATH'.format(name)
ret['result'] = False
else:
ret['comment'] = 'Removed {0} from the PATH'.format(name)
ret['changes']['removed'] = name
return ret
@ -65,12 +66,16 @@ def exists(name, index=None):
'''
Add the directory to the system PATH at index location
index: where the directory should be placed in the PATH (default: None).
This is 0-indexed, so 0 means to prepend at the very start of the PATH.
[Note: Providing no index will append directory to PATH and
will not enforce its location within the PATH.]
index
Position where the directory should be placed in the PATH. This is
0-indexed, so 0 means to prepend at the very start of the PATH.
Example:
.. note::
If the index is not specified, and the directory needs to be added
to the PATH, then the directory will be appended to the PATH, and
this state will not enforce its location within the PATH.
Examples:
.. code-block:: yaml
@ -80,53 +85,149 @@ def exists(name, index=None):
'C:\\sysinternals':
win_path.exists:
- index: 0
'C:\\mystuff':
win_path.exists:
- index: -1
'''
try:
name = salt.utils.stringutils.to_unicode(name)
except TypeError:
name = six.text_type(name)
ret = {'name': name,
'result': True,
'changes': {},
'comment': ''}
# determine what to do
sysPath = __salt__['win_path.get_path']()
path = _normalize_dir(name)
localPath = os.environ["PATH"].split(os.pathsep)
if path not in localPath:
localPath.append(path)
os.environ["PATH"] = os.pathsep.join(localPath)
try:
currIndex = sysPath.index(path)
if index is not None:
index = int(index)
if index < 0:
index = len(sysPath) + index + 1
if index > len(sysPath):
index = len(sysPath)
# check placement within PATH
if currIndex != index:
sysPath.pop(currIndex)
ret['changes']['removed'] = '{0} was removed from index {1}'.format(name, currIndex)
else:
ret['comment'] = '{0} is already present in the PATH at the right location'.format(name)
return ret
else: # path is in system PATH; don't care where
ret['comment'] = '{0} is already present in the PATH at the right location'.format(name)
return ret
except ValueError:
pass
if index is None:
index = len(sysPath) # put it at the end
ret['changes']['added'] = '{0} will be added at index {1}'.format(name, index)
if __opts__['test']:
ret['result'] = None
if index is not None and not isinstance(index, six.integer_types):
ret['comment'] = 'Index must be an integer'
ret['result'] = False
return ret
# Add it
ret['result'] = __salt__['win_path.add'](path, index)
if not ret['result']:
ret['comment'] = 'could not add {0} to the PATH'.format(name)
def _get_path_lowercase():
return [x.lower() for x in __salt__['win_path.get_path']()]
def _index(path=None):
if path is None:
path = _get_path_lowercase()
try:
pos = path.index(name.lower())
except ValueError:
return None
else:
if index is not None and index < 0:
# Since a negative index was used, convert the index to a
# negative index to make the changes dict easier to read, as
# well as making comparisons manageable.
return -(len(path) - pos)
else:
return pos
def _changes(old, new):
return {'index': {'old': old, 'new': new}}
pre_path = _get_path_lowercase()
num_dirs = len(pre_path)
if index is not None:
if index > num_dirs:
ret.setdefault('warnings', []).append(
'There are only {0} directories in the PATH, using an index '
'of {0} instead of {1}.'.format(num_dirs, index)
)
index = num_dirs
elif index <= -num_dirs:
ret.setdefault('warnings', []).append(
'There are only {0} directories in the PATH, using an index '
'of 0 instead of {1}.'.format(num_dirs, index)
)
index = 0
old_index = _index(pre_path)
comments = []
if old_index is not None:
# Directory exists in PATH
if index is None:
# We're not enforcing the index, and the directory is in the PATH.
# There's nothing to do here.
comments.append('{0} already exists in the PATH.'.format(name))
return _format_comments(ret, comments)
else:
if index == old_index:
comments.append(
'{0} already exists in the PATH at index {1}.'.format(
name, index
)
)
return _format_comments(ret, comments)
else:
if __opts__['test']:
ret['result'] = None
comments.append(
'{0} would be moved from index {1} to {2}.'.format(
name, old_index, index
)
)
ret['changes'] = _changes(old_index, index)
return _format_comments(ret, comments)
else:
ret['changes']['added'] = '{0} was added at index {1}'.format(name, index)
return ret
# Directory does not exist in PATH
if __opts__['test']:
ret['result'] = None
comments.append(
'{0} would be added to the PATH{1}.'.format(
name,
' at index {0}'.format(index) if index is not None else ''
)
)
ret['changes'] = _changes(old_index, index)
return _format_comments(ret, comments)
try:
ret['result'] = __salt__['win_path.add'](name, index=index, rehash=False)
except Exception as exc:
comments.append('Encountered error: {0}.'.format(exc))
ret['result'] = False
if ret['result']:
ret['result'] = __salt__['win_path.rehash']()
if not ret['result']:
comments.append(
'Updated registry with new PATH, but failed to rehash.'
)
new_index = _index()
if ret['result']:
# If we have not already determined a False result based on the return
# from either win_path.add or win_path.rehash, check the new_index.
ret['result'] = new_index is not None \
if index is None \
else index == new_index
if index is not None and old_index is not None:
comments.append(
'{0} {1} from index {2} to {3}.'.format(
'Moved' if ret['result'] else 'Failed to move',
name,
old_index,
index
)
)
else:
comments.append(
'{0} {1} to the PATH{2}.'.format(
'Added' if ret['result'] else 'Failed to add',
name,
' at index {0}'.format(index) if index else ''
)
)
if old_index != new_index:
ret['changes'] = _changes(old_index, new_index)
return _format_comments(ret, comments)

View File

@ -67,26 +67,41 @@ def compare_lists(old=None, new=None):
return ret
def decode(data, encoding=None, errors='strict', preserve_dict_class=False, preserve_tuples=False):
def decode(data, encoding=None, errors='strict', keep=False,
preserve_dict_class=False, preserve_tuples=False):
'''
Generic function which will decode whichever type is passed, if necessary
If `strict` is True, and `keep` is False, and we fail to decode, a
UnicodeDecodeError will be raised. Passing `keep` as True allows for the
original value to silently be returned in cases where decoding fails. This
can be useful for cases where the data passed to this function is likely to
contain binary blobs, such as in the case of cp.recv.
'''
if isinstance(data, collections.Mapping):
return decode_dict(data, encoding, errors, preserve_dict_class, preserve_tuples)
return decode_dict(data, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(data, list):
return decode_list(data, encoding, errors, preserve_dict_class, preserve_tuples)
return decode_list(data, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(data, tuple):
return decode_tuple(data, encoding, errors, preserve_dict_class) \
return decode_tuple(data, encoding, errors, keep, preserve_dict_class) \
if preserve_tuples \
else decode_list(data, encoding, errors, preserve_dict_class, preserve_tuples)
else decode_list(data, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
else:
try:
return salt.utils.stringutils.to_unicode(data, encoding, errors)
except TypeError:
return data
pass
except UnicodeDecodeError:
if not keep:
raise
return data
def decode_dict(data, encoding=None, errors='strict', preserve_dict_class=False, preserve_tuples=False):
def decode_dict(data, encoding=None, errors='strict', keep=False,
preserve_dict_class=False, preserve_tuples=False):
'''
Decode all string values to Unicode
'''
@ -94,114 +109,158 @@ def decode_dict(data, encoding=None, errors='strict', preserve_dict_class=False,
rv = data.__class__() if preserve_dict_class else {}
for key, value in six.iteritems(data):
if isinstance(key, tuple):
key = decode_tuple(key, encoding, errors, preserve_dict_class) \
key = decode_tuple(key, encoding, errors, keep, preserve_dict_class) \
if preserve_tuples \
else decode_list(key, encoding, errors, preserve_dict_class, preserve_tuples)
else decode_list(key, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
else:
try:
key = salt.utils.stringutils.to_unicode(key, encoding, errors)
except TypeError:
pass
except UnicodeDecodeError:
if not keep:
raise
if isinstance(value, list):
value = decode_list(value, encoding, errors, preserve_dict_class, preserve_tuples)
value = decode_list(value, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(value, tuple):
value = decode_tuple(value, encoding, errors, preserve_dict_class) \
value = decode_tuple(value, encoding, errors, keep, preserve_dict_class) \
if preserve_tuples \
else decode_list(value, encoding, errors, preserve_dict_class, preserve_tuples)
else decode_list(value, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(value, collections.Mapping):
value = decode_dict(value, encoding, errors, preserve_dict_class, preserve_tuples)
value = decode_dict(value, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
else:
try:
value = salt.utils.stringutils.to_unicode(value, encoding, errors)
except TypeError:
pass
except UnicodeDecodeError:
if not keep:
raise
rv[key] = value
return rv
def decode_list(data, encoding=None, errors='strict', preserve_dict_class=False, preserve_tuples=False):
def decode_list(data, encoding=None, errors='strict', keep=False,
preserve_dict_class=False, preserve_tuples=False):
'''
Decode all string values to Unicode
'''
rv = []
for item in data:
if isinstance(item, list):
item = decode_list(item, encoding, errors, preserve_dict_class, preserve_tuples)
item = decode_list(item, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(item, tuple):
item = decode_tuple(item, encoding, errors, preserve_dict_class) \
item = decode_tuple(item, encoding, errors, keep, preserve_dict_class) \
if preserve_tuples \
else decode_list(item, encoding, errors, preserve_dict_class, preserve_tuples)
else decode_list(item, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(item, collections.Mapping):
item = decode_dict(item, encoding, errors, preserve_dict_class, preserve_tuples)
item = decode_dict(item, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
else:
try:
item = salt.utils.stringutils.to_unicode(item, encoding, errors)
except TypeError:
pass
except UnicodeDecodeError:
if not keep:
raise
rv.append(item)
return rv
def decode_tuple(data, encoding=None, errors='strict', preserve_dict_class=False):
def decode_tuple(data, encoding=None, errors='strict', keep=False,
preserve_dict_class=False):
'''
Decode all string values to Unicode
'''
return tuple(decode_list(data, encoding, errors, preserve_dict_class, True))
return tuple(
decode_list(data, encoding, errors, keep, preserve_dict_class, True))
def encode(data, encoding=None, errors='strict', preserve_dict_class=False, preserve_tuples=False):
def encode(data, encoding=None, errors='strict', keep=False,
preserve_dict_class=False, preserve_tuples=False):
'''
Generic function which will encode whichever type is passed, if necessary
If `strict` is True, and `keep` is False, and we fail to encode, a
UnicodeEncodeError will be raised. Passing `keep` as True allows for the
original value to silently be returned in cases where encoding fails. This
can be useful for cases where the data passed to this function is likely to
contain binary blobs.
'''
if isinstance(data, collections.Mapping):
return encode_dict(data, encoding, errors, preserve_dict_class, preserve_tuples)
return encode_dict(data, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(data, list):
return encode_list(data, encoding, errors, preserve_dict_class, preserve_tuples)
return encode_list(data, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(data, tuple):
return encode_tuple(data, encoding, errors, preserve_dict_class) \
return encode_tuple(data, encoding, errors, keep, preserve_dict_class) \
if preserve_tuples \
else encode_list(data, encoding, errors, preserve_dict_class, preserve_tuples)
else encode_list(data, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
else:
try:
return salt.utils.stringutils.to_bytes(data, encoding, errors)
except TypeError:
return data
pass
except UnicodeEncodeError:
if not keep:
raise
return data
@jinja_filter('json_decode_dict') # Remove this for Neon
@jinja_filter('json_encode_dict')
def encode_dict(data, encoding=None, errors='strict', preserve_dict_class=False, preserve_tuples=False):
def encode_dict(data, encoding=None, errors='strict', keep=False,
preserve_dict_class=False, preserve_tuples=False):
'''
Encode all string values to bytes
'''
rv = data.__class__() if preserve_dict_class else {}
for key, value in six.iteritems(data):
if isinstance(key, tuple):
key = encode_tuple(key, encoding, errors, preserve_dict_class) \
key = encode_tuple(key, encoding, errors, keep, preserve_dict_class) \
if preserve_tuples \
else encode_list(key, encoding, errors, preserve_dict_class, preserve_tuples)
else encode_list(key, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
else:
try:
key = salt.utils.stringutils.to_bytes(key, encoding, errors)
except TypeError:
pass
except UnicodeEncodeError:
if not keep:
raise
if isinstance(value, list):
value = encode_list(value, encoding, errors, preserve_dict_class, preserve_tuples)
value = encode_list(value, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(value, tuple):
value = encode_tuple(value, encoding, errors, preserve_dict_class) \
value = encode_tuple(value, encoding, errors, keep, preserve_dict_class) \
if preserve_tuples \
else encode_list(value, encoding, errors, preserve_dict_class, preserve_tuples)
else encode_list(value, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(value, collections.Mapping):
value = encode_dict(value, encoding, errors, preserve_dict_class, preserve_tuples)
value = encode_dict(value, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
else:
try:
value = salt.utils.stringutils.to_bytes(value, encoding, errors)
except TypeError:
pass
except UnicodeEncodeError:
if not keep:
raise
rv[key] = value
return rv
@ -209,35 +268,44 @@ def encode_dict(data, encoding=None, errors='strict', preserve_dict_class=False,
@jinja_filter('json_decode_list') # Remove this for Neon
@jinja_filter('json_encode_list')
def encode_list(data, encoding=None, errors='strict', preserve_dict_class=False, preserve_tuples=False):
def encode_list(data, encoding=None, errors='strict', keep=False,
preserve_dict_class=False, preserve_tuples=False):
'''
Encode all string values to bytes
'''
rv = []
for item in data:
if isinstance(item, list):
item = encode_list(item, encoding, errors, preserve_dict_class, preserve_tuples)
item = encode_list(item, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(item, tuple):
item = encode_tuple(item, encoding, errors, preserve_dict_class) \
item = encode_tuple(item, encoding, errors, keep, preserve_dict_class) \
if preserve_tuples \
else encode_list(item, encoding, errors, preserve_dict_class, preserve_tuples)
else encode_list(item, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
elif isinstance(item, collections.Mapping):
item = encode_dict(item, encoding, errors, preserve_dict_class, preserve_tuples)
item = encode_dict(item, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
else:
try:
item = salt.utils.stringutils.to_bytes(item, encoding, errors)
except TypeError:
pass
except UnicodeEncodeError:
if not keep:
raise
rv.append(item)
return rv
def encode_tuple(data, encoding=None, errors='strict', preserve_dict_class=False):
def encode_tuple(data, encoding=None, errors='strict', keep=False,
preserve_dict_class=False):
'''
Encode all string values to Unicode
'''
return tuple(encode_list(data, encoding, errors, preserve_dict_class, True))
return tuple(
encode_list(data, encoding, errors, keep, preserve_dict_class, True))
@jinja_filter('exactly_n_true')

View File

@ -14,6 +14,7 @@ from collections import defaultdict
# Import salt libs
import salt.utils.args
import salt.utils.data
from salt.exceptions import CommandExecutionError, SaltConfigurationError
from salt.log import LOG_LEVELS
@ -579,3 +580,19 @@ def ignores_kwargs(*kwarg_names):
return fn(*args, **kwargs_filtered)
return __ignores_kwargs
return _ignores_kwargs
def ensure_unicode_args(function):
'''
Decodes all arguments passed to the wrapped function
'''
@wraps(function)
def wrapped(*args, **kwargs):
if six.PY2:
return function(
*salt.utils.data.decode_list(args),
**salt.utils.data.decode_dict(kwargs)
)
else:
return function(*args, **kwargs)
return wrapped

View File

@ -14,7 +14,6 @@ import posixpath
import re
import string
import struct
import sys
# Import Salt libs
import salt.utils.args
@ -202,7 +201,9 @@ def which(exe=None):
# executable in cwd or fullpath
return exe
ext_list = os.environ.get('PATHEXT', '.EXE').split(';')
ext_list = salt.utils.stringutils.to_str(
os.environ.get('PATHEXT', str('.EXE'))
).split(str(';'))
@real_memoize
def _exe_has_ext():
@ -212,8 +213,13 @@ def which(exe=None):
'''
for ext in ext_list:
try:
pattern = r'.*\.' + ext.lstrip('.') + r'$'
re.match(pattern, exe, re.I).groups()
pattern = r'.*\.{0}$'.format(
salt.utils.stringutils.to_unicode(ext).lstrip('.')
)
re.match(
pattern,
salt.utils.stringutils.to_unicode(exe),
re.I).groups()
return True
except AttributeError:
continue
@ -221,13 +227,17 @@ def which(exe=None):
# Enhance POSIX path for the reliability at some environments, when $PATH is changing
# This also keeps order, where 'first came, first win' for cases to find optional alternatives
search_path = os.environ.get('PATH') and os.environ['PATH'].split(os.pathsep) or list()
for default_path in ['/bin', '/sbin', '/usr/bin', '/usr/sbin', '/usr/local/bin']:
if default_path not in search_path:
search_path.append(default_path)
os.environ['PATH'] = os.pathsep.join(search_path)
system_path = salt.utils.stringutils.to_unicode(os.environ.get('PATH', ''))
search_path = system_path.split(os.pathsep)
if not salt.utils.platform.is_windows():
search_path.extend([
x for x in ('/bin', '/sbin', '/usr/bin',
'/usr/sbin', '/usr/local/bin')
if x not in search_path
])
for path in search_path:
full_path = os.path.join(path, exe)
full_path = join(path, exe)
if _is_executable_file_or_link(full_path):
return full_path
elif salt.utils.platform.is_windows() and not _exe_has_ext():
@ -296,27 +306,12 @@ def join(*parts, **kwargs):
# No args passed to func
return ''
root = salt.utils.stringutils.to_unicode(root)
if not parts:
ret = root
else:
stripped = [p.lstrip(os.sep) for p in parts]
try:
ret = pathlib.join(root, *stripped)
except UnicodeDecodeError:
# This is probably Python 2 and one of the parts contains unicode
# characters in a bytestring. First try to decode to the system
# encoding.
try:
enc = __salt_system_encoding__
except NameError:
enc = sys.stdin.encoding or sys.getdefaultencoding()
try:
ret = pathlib.join(root.decode(enc),
*[x.decode(enc) for x in stripped])
except UnicodeDecodeError:
# Last resort, try UTF-8
ret = pathlib.join(root.decode('UTF-8'),
*[x.decode('UTF-8') for x in stripped])
ret = pathlib.join(root, *salt.utils.data.decode(stripped))
return pathlib.normpath(ret)

View File

@ -6,6 +6,9 @@ from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Testing libs
from tests.support.case import ModuleCase
# Import 3rd-party libs
from salt.ext import six
class StdTest(ModuleCase):
'''
@ -85,10 +88,10 @@ class StdTest(ModuleCase):
'inner': 'value'}
)
data = ret['minion']['ret']
self.assertIn('str', data['args'][0])
self.assertIn(six.text_type.__name__, data['args'][0])
self.assertIn('int', data['args'][1])
self.assertIn('dict', data['kwargs']['outer'])
self.assertIn('str', data['kwargs']['inner'])
self.assertIn(six.text_type.__name__, data['kwargs']['inner'])
def test_full_return_kwarg(self):
ret = self.client.cmd('minion', 'test.ping', full_return=True)

View File

@ -19,6 +19,8 @@ from tests.support.helpers import expensiveTest, generate_random_name
INSTANCE_NAME = generate_random_name('CLOUD-TEST-')
PROVIDER_NAME = 'ec2'
EC2_TIMEOUT = 1000
class EC2Test(ShellCase):
'''
@ -81,18 +83,21 @@ class EC2Test(ShellCase):
Tests creating and deleting an instance on EC2 (classic)
'''
# create the instance
instance = self.run_cloud('-p ec2-test {0}'.format(INSTANCE_NAME), timeout=500)
instance = self.run_cloud('-p ec2-test {0}'.format(INSTANCE_NAME),
timeout=EC2_TIMEOUT)
ret_str = '{0}:'.format(INSTANCE_NAME)
# check if instance returned with salt installed
try:
self.assertIn(ret_str, instance)
except AssertionError:
self.run_cloud('-d {0} --assume-yes'.format(INSTANCE_NAME), timeout=500)
self.run_cloud('-d {0} --assume-yes'.format(INSTANCE_NAME),
timeout=EC2_TIMEOUT)
raise
# delete the instance
delete = self.run_cloud('-d {0} --assume-yes'.format(INSTANCE_NAME), timeout=500)
delete = self.run_cloud('-d {0} --assume-yes'.format(INSTANCE_NAME),
timeout=EC2_TIMEOUT)
ret_str = ' shutting-down'
# check if deletion was performed appropriately
@ -107,17 +112,19 @@ class EC2Test(ShellCase):
'''
# create the instance
rename = INSTANCE_NAME + '-rename'
instance = self.run_cloud('-p ec2-test {0} --no-deploy'.format(INSTANCE_NAME), timeout=500)
instance = self.run_cloud('-p ec2-test {0} --no-deploy'.format(INSTANCE_NAME),
timeout=EC2_TIMEOUT)
ret_str = '{0}:'.format(INSTANCE_NAME)
# check if instance returned
try:
self.assertIn(ret_str, instance)
except AssertionError:
self.run_cloud('-d {0} --assume-yes'.format(INSTANCE_NAME), timeout=500)
self.run_cloud('-d {0} --assume-yes'.format(INSTANCE_NAME),
timeout=EC2_TIMEOUT)
raise
change_name = self.run_cloud('-a rename {0} newname={1} --assume-yes'.format(INSTANCE_NAME, rename), timeout=500)
change_name = self.run_cloud('-a rename {0} newname={1} --assume-yes'.format(INSTANCE_NAME, rename), timeout=EC2_TIMEOUT)
check_rename = self.run_cloud('-a show_instance {0} --assume-yes'.format(rename), [rename])
exp_results = [' {0}:'.format(rename), ' size:',
@ -126,11 +133,13 @@ class EC2Test(ShellCase):
for result in exp_results:
self.assertIn(result, check_rename[0])
except AssertionError:
self.run_cloud('-d {0} --assume-yes'.format(INSTANCE_NAME), timeout=500)
self.run_cloud('-d {0} --assume-yes'.format(INSTANCE_NAME),
timeout=EC2_TIMEOUT)
raise
# delete the instance
delete = self.run_cloud('-d {0} --assume-yes'.format(rename), timeout=500)
delete = self.run_cloud('-d {0} --assume-yes'.format(rename),
timeout=EC2_TIMEOUT)
ret_str = ' shutting-down'
# check if deletion was performed appropriately
@ -145,4 +154,5 @@ class EC2Test(ShellCase):
# if test instance is still present, delete it
if ret_str in query:
self.run_cloud('-d {0} --assume-yes'.format(INSTANCE_NAME), timeout=500)
self.run_cloud('-d {0} --assume-yes'.format(INSTANCE_NAME),
timeout=EC2_TIMEOUT)

View File

@ -0,0 +1 @@

View File

@ -7,6 +7,7 @@ import logging
import os
import signal
import subprocess
import textwrap
# Import Salt Testing libs
from tests.support.case import ModuleCase
@ -318,7 +319,7 @@ class SystemModuleTest(ModuleCase):
@skip_if_not_root
def test_set_computer_desc(self):
'''
Test setting the system hostname
Test setting the computer description
'''
self._save_machine_info()
desc = "test"
@ -328,6 +329,28 @@ class SystemModuleTest(ModuleCase):
self.assertTrue(ret)
self.assertIn(desc, computer_desc)
@destructiveTest
@skip_if_not_root
def test_set_computer_desc_multiline(self):
'''
Test setting the computer description with a multiline string with tabs
and double-quotes.
'''
self._save_machine_info()
desc = textwrap.dedent('''\
'First Line
\tSecond Line: 'single-quoted string'
\t\tThird Line: "double-quoted string with unicode: питон"''')
ret = self.run_function('system.set_computer_desc', [desc])
# self.run_function returns the serialized return, we need to convert
# back to unicode to compare to desc. in the assertIn below.
computer_desc = salt.utils.stringutils.to_unicode(
self.run_function('system.get_computer_desc')
)
self.assertTrue(ret)
self.assertIn(desc, computer_desc)
@skip_if_not_root
def test_has_hwclock(self):
'''

View File

@ -1170,6 +1170,27 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
finally:
shutil.rmtree(name, ignore_errors=True)
def test_recurse_issue_40578(self):
'''
This ensures that the state doesn't raise an exception when it
encounters a file with a unicode filename in the process of invoking
file.source_list.
'''
issue_dir = 'issue-40578'
name = os.path.join(TMP, issue_dir)
try:
ret = self.run_state('file.recurse',
name=name,
source='salt://соль')
self.assertSaltTrueReturn(ret)
self.assertEqual(
sorted(salt.utils.data.decode(os.listdir(name))),
sorted(['foo.txt', 'спам.txt', 'яйца.txt'])
)
finally:
shutil.rmtree(name, ignore_errors=True)
def test_replace(self):
'''
file.replace
@ -2214,8 +2235,6 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
'+마지막 행\n'
)
diff = salt.utils.stringutils.to_str(diff)
# using unicode.encode('utf-8') we should get the same as
# an utf-8 string
# future_lint: disable=blacklisted-function
expected = {
str('file_|-some-utf8-file-create_|-{0}_|-managed').format(test_file_encoded): {

View File

@ -5,6 +5,7 @@ Test module for syslog_ng
# Import Python modules
from __future__ import absolute_import, unicode_literals, print_function
import os
from textwrap import dedent
# Import Salt Testing libs
@ -13,7 +14,6 @@ from tests.support.unit import skipIf, TestCase
from tests.support.mock import NO_MOCK, NO_MOCK_REASON, MagicMock, patch
# Import Salt libs
import salt.utils.path
import salt.modules.syslog_ng as syslog_ng
_VERSION = "3.6.0alpha0"
@ -58,6 +58,12 @@ _SYSLOG_NG_CTL_NOT_INSTALLED_RETURN_VALUE = {
@skipIf(NO_MOCK, NO_MOCK_REASON)
class SyslogNGTestCase(TestCase, LoaderModuleMockMixin):
# pylint: disable=blacklisted-function
orig_env = {str('PATH'): str('/foo:/bar')}
bin_dir = str('/baz')
mocked_env = {str('PATH'): str('/foo:/bar:/baz')}
# pylint: enable=blacklisted-function
def setup_loader_modules(self):
return {syslog_ng: {}}
@ -199,83 +205,139 @@ class SyslogNGTestCase(TestCase, LoaderModuleMockMixin):
'''), b)
def test_version(self):
mock_return_value = {"retcode": 0, 'stdout': VERSION_OUTPUT}
expected_output = {"retcode": 0, "stdout": "3.6.0alpha0"}
mock_args = "syslog-ng -V"
self._assert_template(mock_args,
mock_return_value,
function_to_call=syslog_ng.version,
expected_output=expected_output)
cmd_ret = {'retcode': 0, 'stdout': VERSION_OUTPUT}
expected_output = {'retcode': 0, 'stdout': _VERSION}
cmd_args = ['syslog-ng', '-V']
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
result = syslog_ng.version()
self.assertEqual(result, expected_output)
cmd_mock.assert_called_once_with(
cmd_args,
env=None,
python_shell=False
)
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
result = syslog_ng.version(syslog_ng_sbin_dir=self.bin_dir)
self.assertEqual(result, expected_output)
cmd_mock.assert_called_once_with(
cmd_args,
env=self.mocked_env,
python_shell=False
)
def test_stats(self):
mock_return_value = {"retcode": 0, 'stdout': STATS_OUTPUT}
expected_output = {"retcode": 0, "stdout": STATS_OUTPUT}
mock_args = "syslog-ng-ctl stats"
self._assert_template(mock_args,
mock_return_value,
function_to_call=syslog_ng.stats,
expected_output=expected_output)
cmd_ret = {'retcode': 0, 'stdout': STATS_OUTPUT}
cmd_args = ['syslog-ng-ctl', 'stats']
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
result = syslog_ng.stats()
self.assertEqual(result, cmd_ret)
cmd_mock.assert_called_once_with(
cmd_args,
env=None,
python_shell=False
)
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
result = syslog_ng.stats(syslog_ng_sbin_dir=self.bin_dir)
self.assertEqual(result, cmd_ret)
cmd_mock.assert_called_once_with(
cmd_args,
env=self.mocked_env,
python_shell=False
)
def test_modules(self):
mock_return_value = {"retcode": 0, 'stdout': VERSION_OUTPUT}
expected_output = {"retcode": 0, "stdout": _MODULES}
mock_args = "syslog-ng -V"
self._assert_template(mock_args,
mock_return_value,
function_to_call=syslog_ng.modules,
expected_output=expected_output)
cmd_ret = {'retcode': 0, 'stdout': VERSION_OUTPUT}
expected_output = {'retcode': 0, 'stdout': _MODULES}
cmd_args = ['syslog-ng', '-V']
def test_config_test_ok(self):
mock_return_value = {"retcode": 0, "stderr": "", "stdout": "Syslog-ng startup text..."}
mock_args = "syslog-ng --syntax-only"
self._assert_template(mock_args,
mock_return_value,
function_to_call=syslog_ng.config_test,
expected_output=mock_return_value)
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
result = syslog_ng.modules()
self.assertEqual(result, expected_output)
cmd_mock.assert_called_once_with(
cmd_args,
env=None,
python_shell=False
)
def test_config_test_fails(self):
mock_return_value = {"retcode": 1, 'stderr': "Syntax error...", "stdout": ""}
mock_args = "syslog-ng --syntax-only"
self._assert_template(mock_args,
mock_return_value,
function_to_call=syslog_ng.config_test,
expected_output=mock_return_value)
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
result = syslog_ng.modules(syslog_ng_sbin_dir=self.bin_dir)
self.assertEqual(result, expected_output)
cmd_mock.assert_called_once_with(
cmd_args,
env=self.mocked_env,
python_shell=False
)
def test_config_test(self):
cmd_ret = {'retcode': 0, 'stderr': '', 'stdout': 'Foo'}
cmd_args = ['syslog-ng', '--syntax-only']
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
result = syslog_ng.config_test()
self.assertEqual(result, cmd_ret)
cmd_mock.assert_called_once_with(
cmd_args,
env=None,
python_shell=False
)
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
result = syslog_ng.config_test(syslog_ng_sbin_dir=self.bin_dir)
self.assertEqual(result, cmd_ret)
cmd_mock.assert_called_once_with(
cmd_args,
env=self.mocked_env,
python_shell=False
)
def test_config_test_cfgfile(self):
cfgfile = "/path/to/syslog-ng.conf"
mock_return_value = {"retcode": 1, 'stderr': "Syntax error...", "stdout": ""}
mock_args = "syslog-ng --syntax-only --cfgfile={0}".format(cfgfile)
self._assert_template(mock_args,
mock_return_value,
function_to_call=syslog_ng.config_test,
function_args={"cfgfile": cfgfile},
expected_output=mock_return_value)
cfgfile = '/path/to/syslog-ng.conf'
cmd_ret = {'retcode': 1, 'stderr': 'Syntax error...', 'stdout': ''}
cmd_args = ['syslog-ng', '--syntax-only',
'--cfgfile={0}'.format(cfgfile)]
def _assert_template(self,
mock_function_args,
mock_return_value,
function_to_call,
expected_output,
function_args=None):
if function_args is None:
function_args = {}
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
self.assertEqual(syslog_ng.config_test(cfgfile=cfgfile), cmd_ret)
cmd_mock.assert_called_once_with(
cmd_args,
env=None,
python_shell=False
)
installed = True
if not salt.utils.path.which("syslog-ng"):
installed = False
if "syslog-ng-ctl" in mock_function_args:
expected_output = _SYSLOG_NG_CTL_NOT_INSTALLED_RETURN_VALUE
else:
expected_output = _SYSLOG_NG_NOT_INSTALLED_RETURN_VALUE
mock_function = MagicMock(return_value=mock_return_value)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': mock_function}):
got = function_to_call(**function_args)
self.assertEqual(expected_output, got)
if installed:
self.assertTrue(mock_function.called)
self.assertEqual(len(mock_function.call_args), 2)
mock_param = mock_function.call_args
self.assertEqual(mock_param[0][0], mock_function_args.split())
cmd_mock = MagicMock(return_value=cmd_ret)
with patch.dict(syslog_ng.__salt__, {'cmd.run_all': cmd_mock}), \
patch.dict(os.environ, self.orig_env):
self.assertEqual(
syslog_ng.config_test(
syslog_ng_sbin_dir=self.bin_dir,
cfgfile=cfgfile
),
cmd_ret
)
cmd_mock.assert_called_once_with(
cmd_args,
env=self.mocked_env,
python_shell=False
)

View File

@ -5,6 +5,7 @@
# Import Python Libs
from __future__ import absolute_import, unicode_literals, print_function
import os
# Import Salt Testing Libs
from tests.support.mixins import LoaderModuleMockMixin
@ -18,6 +19,7 @@ from tests.support.mock import (
# Import Salt Libs
import salt.modules.win_path as win_path
import salt.utils.stringutils
class MockWin32API(object):
@ -58,54 +60,223 @@ class WinPathTestCase(TestCase, LoaderModuleMockMixin):
'HWND_BROADCAST': MagicMock,
'WM_SETTINGCHANGE': MagicMock}}
def __init__(self, *args, **kwargs):
super(WinPathTestCase, self).__init__(*args, **kwargs)
self.pathsep = str(';') # future lint: disable=blacklisted-function
def assert_call_matches(self, mock_obj, new_path):
mock_obj.assert_called_once_with(
win_path.HIVE,
win_path.KEY,
win_path.VNAME,
self.pathsep.join(new_path),
win_path.VTYPE
)
def assert_path_matches(self, env, new_path):
self.assertEqual(
env['PATH'],
salt.utils.stringutils.to_str(self.pathsep.join(new_path))
)
def test_rehash(self):
'''
Test to rehash the Environment variables
Test to rehash the Environment variables
'''
self.assertTrue(win_path.rehash())
def test_get_path(self):
'''
Test to Returns the system path
Test to Returns the system path
'''
mock = MagicMock(return_value={'vdata': 'c:\\salt'})
mock = MagicMock(return_value={'vdata': 'C:\\Salt'})
with patch.dict(win_path.__salt__, {'reg.read_value': mock}):
self.assertListEqual(win_path.get_path(), ['c:\\salt'])
self.assertListEqual(win_path.get_path(), ['C:\\Salt'])
def test_exists(self):
'''
Test to check if the directory is configured
Test to check if the directory is configured
'''
mock = MagicMock(return_value='c:\\salt')
with patch.object(win_path, 'get_path', mock):
self.assertTrue(win_path.exists("c:\\salt"))
get_mock = MagicMock(return_value=['C:\\Foo', 'C:\\Bar'])
with patch.object(win_path, 'get_path', get_mock):
# Ensure case insensitivity respected
self.assertTrue(win_path.exists('C:\\FOO'))
self.assertTrue(win_path.exists('c:\\foo'))
self.assertFalse(win_path.exists('c:\\mystuff'))
def test_add(self):
'''
Test to add the directory to the SYSTEM path
Test to add the directory to the SYSTEM path
'''
mock_get = MagicMock(return_value=['c:\\salt'])
with patch.object(win_path, 'get_path', mock_get):
mock_set = MagicMock(return_value=True)
with patch.dict(win_path.__salt__, {'reg.set_value': mock_set}):
mock_rehash = MagicMock(side_effect=[True, False])
with patch.object(win_path, 'rehash', mock_rehash):
self.assertTrue(win_path.add("c:\\salt", 1))
orig_path = ('C:\\Foo', 'C:\\Bar')
self.assertFalse(win_path.add("c:\\salt", 1))
def _env(path):
return {
str('PATH'): salt.utils.stringutils.to_str( # future lint: disable=blacklisted-function
self.pathsep.join(path)
)
}
def _run(name, index=None, retval=True, path=None):
if path is None:
path = orig_path
env = _env(path)
mock_get = MagicMock(return_value=list(path))
mock_set = MagicMock(return_value=retval)
with patch.object(win_path, 'PATHSEP', self.pathsep), \
patch.object(win_path, 'get_path', mock_get), \
patch.object(os, 'environ', env), \
patch.dict(win_path.__salt__, {'reg.set_value': mock_set}), \
patch.object(win_path, 'rehash', MagicMock(return_value=True)):
return win_path.add(name, index), env, mock_set
# Test a successful reg update
ret, env, mock_set = _run('c:\\salt', retval=True)
new_path = ('C:\\Foo', 'C:\\Bar', 'c:\\salt')
self.assertTrue(ret)
self.assert_call_matches(mock_set, new_path)
self.assert_path_matches(env, new_path)
# Test an unsuccessful reg update
ret, env, mock_set = _run('c:\\salt', retval=False)
new_path = ('C:\\Foo', 'C:\\Bar', 'c:\\salt')
self.assertFalse(ret)
self.assert_call_matches(mock_set, new_path)
# The local path should still have been modified even
# though reg.set_value failed.
self.assert_path_matches(env, new_path)
# Test adding with a custom index
ret, env, mock_set = _run('c:\\salt', index=1, retval=True)
new_path = ('C:\\Foo', 'c:\\salt', 'C:\\Bar')
self.assertTrue(ret)
self.assert_call_matches(mock_set, new_path)
self.assert_path_matches(env, new_path)
# Test adding path with a case-insensitive match already present, and
# no index provided. The path should remain unchanged and we should not
# update the registry.
ret, env, mock_set = _run('c:\\foo', retval=True)
self.assertTrue(ret)
mock_set.assert_not_called()
self.assert_path_matches(env, orig_path)
# Test adding path with a case-insensitive match already present, and a
# negative index provided which does not match the current index. The
# match should be removed, and the path should be added to the end of
# the list.
ret, env, mock_set = _run('c:\\foo', index=-1, retval=True)
new_path = ('C:\\Bar', 'c:\\foo')
self.assertTrue(ret)
self.assert_call_matches(mock_set, new_path)
self.assert_path_matches(env, new_path)
# Test adding path with a case-insensitive match already present, and a
# negative index provided which matches the current index. No changes
# should be made.
ret, env, mock_set = _run('c:\\foo', index=-2, retval=True)
self.assertTrue(ret)
mock_set.assert_not_called()
self.assert_path_matches(env, orig_path)
# Test adding path with a case-insensitive match already present, and a
# negative index provided which is larger than the size of the list. No
# changes should be made, since in these cases we assume an index of 0,
# and the case-insensitive match is also at index 0.
ret, env, mock_set = _run('c:\\foo', index=-5, retval=True)
self.assertTrue(ret)
mock_set.assert_not_called()
self.assert_path_matches(env, orig_path)
# Test adding path with a case-insensitive match already present, and a
# negative index provided which is larger than the size of the list.
# The match should be removed from its current location and inserted at
# the beginning, since when a negative index is larger than the list,
# we put it at the beginning of the list.
ret, env, mock_set = _run('c:\\bar', index=-5, retval=True)
new_path = ('c:\\bar', 'C:\\Foo')
self.assertTrue(ret)
self.assert_call_matches(mock_set, new_path)
self.assert_path_matches(env, new_path)
# Test adding path with a case-insensitive match already present, and a
# negative index provided which matches the current index. The path
# should remain unchanged and we should not update the registry.
ret, env, mock_set = _run('c:\\bar', index=-1, retval=True)
self.assertTrue(ret)
mock_set.assert_not_called()
self.assert_path_matches(env, orig_path)
# Test adding path with a case-insensitive match already present, and
# an index provided which does not match the current index, and is also
# larger than the size of the PATH list. The match should be removed,
# and the path should be added to the end of the list.
ret, env, mock_set = _run('c:\\foo', index=5, retval=True)
new_path = ('C:\\Bar', 'c:\\foo')
self.assertTrue(ret)
self.assert_call_matches(mock_set, new_path)
self.assert_path_matches(env, new_path)
def test_remove(self):
'''
Test to remove the directory from the SYSTEM path
Test win_path.remove
'''
mock_get = MagicMock(side_effect=[[1], ['c:\\salt'], ['c:\\salt']])
with patch.object(win_path, 'get_path', mock_get):
self.assertTrue(win_path.remove("c:\\salt"))
orig_path = ('C:\\Foo', 'C:\\Bar', 'C:\\Baz')
mock_set = MagicMock(side_effect=[True, False])
with patch.dict(win_path.__salt__, {'reg.set_value': mock_set}):
mock_rehash = MagicMock(return_value="Salt")
with patch.object(win_path, 'rehash', mock_rehash):
self.assertEqual(win_path.remove("c:\\salt"), "Salt")
def _env(path):
return {
str('PATH'): salt.utils.stringutils.to_str( # future lint: disable=blacklisted-function
self.pathsep.join(path)
)
}
self.assertFalse(win_path.remove("c:\\salt"))
def _run(name='c:\\salt', index=None, retval=True, path=None):
if path is None:
path = orig_path
env = _env(path)
mock_get = MagicMock(return_value=list(path))
mock_set = MagicMock(return_value=retval)
with patch.object(win_path, 'PATHSEP', self.pathsep), \
patch.object(win_path, 'get_path', mock_get), \
patch.object(os, 'environ', env), \
patch.dict(win_path.__salt__, {'reg.set_value': mock_set}), \
patch.object(win_path, 'rehash', MagicMock(return_value=True)):
return win_path.remove(name), env, mock_set
# Test a successful reg update
ret, env, mock_set = _run('C:\\Bar', retval=True)
new_path = ('C:\\Foo', 'C:\\Baz')
self.assertTrue(ret)
self.assert_call_matches(mock_set, new_path)
self.assert_path_matches(env, new_path)
# Test a successful reg update with a case-insensitive match
ret, env, mock_set = _run('c:\\bar', retval=True)
new_path = ('C:\\Foo', 'C:\\Baz')
self.assertTrue(ret)
self.assert_call_matches(mock_set, new_path)
self.assert_path_matches(env, new_path)
# Test a successful reg update with multiple case-insensitive matches.
# All matches should be removed.
old_path = orig_path + ('C:\\BAR',)
ret, env, mock_set = _run('c:\\bar', retval=True)
new_path = ('C:\\Foo', 'C:\\Baz')
self.assertTrue(ret)
self.assert_call_matches(mock_set, new_path)
self.assert_path_matches(env, new_path)
# Test an unsuccessful reg update
ret, env, mock_set = _run('c:\\bar', retval=False)
new_path = ('C:\\Foo', 'C:\\Baz')
self.assertFalse(ret)
self.assert_call_matches(mock_set, new_path)
# The local path should still have been modified even
# though reg.set_value failed.
self.assert_path_matches(env, new_path)
# Test when no match found
ret, env, mock_set = _run('C:\\NotThere', retval=True)
self.assertTrue(ret)
mock_set.assert_not_called()
self.assert_path_matches(env, orig_path)

View File

@ -1,15 +1,17 @@
# -*- coding: utf-8 -*-
'''
:codeauthor: :email:`Rahul Handay <rahulha@saltstack.com>`
Tests for win_path states
'''
# Import Python Libs
from __future__ import absolute_import, unicode_literals, print_function
from __future__ import absolute_import, print_function, unicode_literals
import copy
# Import Salt Testing Libs
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.unit import TestCase, skipIf
from tests.support.mock import (
Mock,
MagicMock,
patch,
NO_MOCK,
@ -19,64 +21,512 @@ from tests.support.mock import (
# Import Salt Libs
import salt.states.win_path as win_path
NAME = 'salt'
@skipIf(NO_MOCK, NO_MOCK_REASON)
class WinPathTestCase(TestCase, LoaderModuleMockMixin):
'''
Validate the win_path state
Validate the win_path state
'''
def setup_loader_modules(self):
return {win_path: {}}
def test_absent(self):
'''
Test to remove the directory from the SYSTEM path
Test various cases for win_path.absent
'''
ret = {'name': 'salt',
'changes': {},
'result': None,
'comment': ''}
mock = MagicMock(return_value=False)
with patch.dict(win_path.__salt__, {"win_path.exists": mock}):
with patch.dict(win_path.__opts__, {"test": True}):
ret.update({'comment': 'salt is not in the PATH'})
self.assertDictEqual(win_path.absent('salt'), ret)
ret_base = {'name': NAME, 'result': True, 'changes': {}}
with patch.dict(win_path.__opts__, {"test": False}):
mock = MagicMock(return_value=True)
with patch.dict(win_path.__salt__, {"win_path.remove": mock}):
ret.update({'result': True})
self.assertDictEqual(win_path.absent('salt'), ret)
def _mock(retval):
# Return a new MagicMock for each test case
return MagicMock(side_effect=retval)
def test_exists(self):
# We don't really want to run the remove func
with patch.dict(win_path.__salt__, {'win_path.remove': Mock()}):
# Test mode OFF
with patch.dict(win_path.__opts__, {'test': False}):
# Test already absent
with patch.dict(win_path.__salt__, {'win_path.exists': _mock([False])}):
ret = copy.deepcopy(ret_base)
ret['comment'] = '{0} is not in the PATH'.format(NAME)
ret['result'] = True
self.assertDictEqual(win_path.absent(NAME), ret)
# Test successful removal
with patch.dict(win_path.__salt__, {'win_path.exists': _mock([True, False])}):
ret = copy.deepcopy(ret_base)
ret['comment'] = 'Removed {0} from the PATH'.format(NAME)
ret['changes']['removed'] = NAME
ret['result'] = True
self.assertDictEqual(win_path.absent(NAME), ret)
# Test unsucessful removal
with patch.dict(win_path.__salt__, {'win_path.exists': _mock([True, True])}):
ret = copy.deepcopy(ret_base)
ret['comment'] = 'Failed to remove {0} from the PATH'.format(NAME)
ret['result'] = False
self.assertDictEqual(win_path.absent(NAME), ret)
# Test mode ON
with patch.dict(win_path.__opts__, {'test': True}):
# Test already absent
with patch.dict(win_path.__salt__, {'win_path.exists': _mock([False])}):
ret = copy.deepcopy(ret_base)
ret['comment'] = '{0} is not in the PATH'.format(NAME)
ret['result'] = True
self.assertDictEqual(win_path.absent(NAME), ret)
# Test the test-mode return
with patch.dict(win_path.__salt__, {'win_path.exists': _mock([True])}):
ret = copy.deepcopy(ret_base)
ret['comment'] = '{0} would be removed from the PATH'.format(NAME)
ret['result'] = None
self.assertDictEqual(win_path.absent(NAME), ret)
def test_exists_invalid_index(self):
'''
Test to add the directory to the system PATH at index location
Tests win_path.exists when a non-integer index is specified.
'''
ret = {'name': 'salt',
'changes': {},
'result': True,
'comment': ''}
mock = MagicMock(return_value=['Salt', 'Saltdude'])
with patch.dict(win_path.__salt__, {"win_path.get_path": mock}):
mock = MagicMock(side_effect=['Saltdude', 'Saltdude', '/Saltdude',
'Saltdude'])
with patch.object(win_path, '_normalize_dir', mock):
ret.update({'comment': 'salt is already present in the'
' PATH at the right location'})
self.assertDictEqual(win_path.exists('salt', 1), ret)
ret = win_path.exists(NAME, index='foo')
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {},
'result': False,
'comment': 'Index must be an integer'
}
)
self.assertDictEqual(win_path.exists('salt'), ret)
def test_exists_add_no_index_success(self):
'''
Tests win_path.exists when the directory isn't already in the PATH and
no index is specified (successful run).
'''
add_mock = MagicMock(return_value=True)
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', 'baz'],
['foo', 'bar', 'baz', NAME]
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': False}
with patch.dict(win_path.__opts__, {"test": True}):
ret.update({'comment': '', 'result': None,
'changes': {'added': 'salt will be'
' added at index 2'}})
self.assertDictEqual(win_path.exists('salt'), ret)
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME)
with patch.dict(win_path.__opts__, {"test": False}):
mock = MagicMock(return_value=False)
with patch.dict(win_path.__salt__, {"win_path.add": mock}):
ret.update({'comment': 'salt is already present in the'
' PATH at the right location',
'result': True, 'changes': {}})
self.assertDictEqual(win_path.exists('salt'), ret)
add_mock.assert_called_once_with(NAME, index=None, rehash=False)
self.assert_called_once(rehash_mock)
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {'index': {'old': None, 'new': 3}},
'result': True,
'comment': 'Added {0} to the PATH.'.format(NAME)
}
)
def test_exists_add_no_index_failure(self):
'''
Tests win_path.exists when the directory isn't already in the PATH and
no index is specified (failed run).
'''
add_mock = MagicMock(return_value=False)
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', 'baz'],
['foo', 'bar', 'baz']
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': False}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME)
add_mock.assert_called_once_with(NAME, index=None, rehash=False)
rehash_mock.assert_not_called()
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {},
'result': False,
'comment': 'Failed to add {0} to the PATH.'.format(NAME)
}
)
def test_exists_add_no_index_failure_exception(self):
'''
Tests win_path.exists when the directory isn't already in the PATH and
no index is specified (failed run due to exception).
'''
add_mock = MagicMock(side_effect=Exception('Global Thermonuclear War'))
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', 'baz'],
['foo', 'bar', 'baz']
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': False}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME)
add_mock.assert_called_once_with(NAME, index=None, rehash=False)
rehash_mock.assert_not_called()
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {},
'result': False,
'comment': 'Encountered error: Global Thermonuclear War. '
'Failed to add {0} to the PATH.'.format(NAME)
}
)
def test_exists_change_index_success(self):
'''
Tests win_path.exists when the directory is already in the PATH and
needs to be moved to a different position (successful run).
'''
add_mock = MagicMock(return_value=True)
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', 'baz', NAME],
[NAME, 'foo', 'bar', 'baz']
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': False}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME, index=0)
add_mock.assert_called_once_with(NAME, index=0, rehash=False)
self.assert_called_once(rehash_mock)
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {'index': {'old': 3, 'new': 0}},
'result': True,
'comment': 'Moved {0} from index 3 to 0.'.format(NAME)
}
)
def test_exists_change_negative_index_success(self):
'''
Tests win_path.exists when the directory is already in the PATH and
needs to be moved to a different position (successful run).
This tests a negative index.
'''
add_mock = MagicMock(return_value=True)
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', NAME, 'baz'],
['foo', 'bar', 'baz', NAME]
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': False}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME, index=-1)
add_mock.assert_called_once_with(NAME, index=-1, rehash=False)
self.assert_called_once(rehash_mock)
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {'index': {'old': -2, 'new': -1}},
'result': True,
'comment': 'Moved {0} from index -2 to -1.'.format(NAME)
}
)
def test_exists_change_index_add_exception(self):
'''
Tests win_path.exists when the directory is already in the PATH but an
exception is raised when we attempt to add the key to its new location.
'''
add_mock = MagicMock(side_effect=Exception('Global Thermonuclear War'))
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', 'baz', NAME],
['foo', 'bar', 'baz', NAME],
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': False}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME, index=0)
add_mock.assert_called_once_with(NAME, index=0, rehash=False)
rehash_mock.assert_not_called()
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {},
'result': False,
'comment': 'Encountered error: Global Thermonuclear War. '
'Failed to move {0} from index 3 to 0.'.format(NAME)
}
)
def test_exists_change_negative_index_add_exception(self):
'''
Tests win_path.exists when the directory is already in the PATH but an
exception is raised when we attempt to add the key to its new location.
This tests a negative index.
'''
add_mock = MagicMock(side_effect=Exception('Global Thermonuclear War'))
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', NAME, 'baz'],
['foo', 'bar', NAME, 'baz'],
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': False}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME, index=-1)
add_mock.assert_called_once_with(NAME, index=-1, rehash=False)
rehash_mock.assert_not_called()
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {},
'result': False,
'comment': 'Encountered error: Global Thermonuclear War. '
'Failed to move {0} from index -2 to -1.'.format(NAME)
}
)
def test_exists_change_index_failure(self):
'''
Tests win_path.exists when the directory is already in the PATH and
needs to be moved to a different position (failed run).
'''
add_mock = MagicMock(return_value=False)
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', 'baz', NAME],
['foo', 'bar', 'baz', NAME]
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': False}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME, index=0)
add_mock.assert_called_once_with(NAME, index=0, rehash=False)
rehash_mock.assert_not_called()
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {},
'result': False,
'comment': 'Failed to move {0} from index 3 to 0.'.format(NAME)
}
)
def test_exists_change_negative_index_failure(self):
'''
Tests win_path.exists when the directory is already in the PATH and
needs to be moved to a different position (failed run).
This tests a negative index.
'''
add_mock = MagicMock(return_value=False)
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', NAME, 'baz'],
['foo', 'bar', NAME, 'baz']
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': False}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME, index=-1)
add_mock.assert_called_once_with(NAME, index=-1, rehash=False)
rehash_mock.assert_not_called()
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {},
'result': False,
'comment': 'Failed to move {0} from index -2 to -1.'.format(NAME)
}
)
def test_exists_change_index_test_mode(self):
'''
Tests win_path.exists when the directory is already in the PATH and
needs to be moved to a different position (test mode enabled).
'''
add_mock = Mock()
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', 'baz', NAME],
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': True}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME, index=0)
add_mock.assert_not_called()
rehash_mock.assert_not_called()
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {'index': {'old': 3, 'new': 0}},
'result': None,
'comment': '{0} would be moved from index 3 to 0.'.format(NAME)
}
)
def test_exists_change_negative_index_test_mode(self):
'''
Tests win_path.exists when the directory is already in the PATH and
needs to be moved to a different position (test mode enabled).
'''
add_mock = Mock()
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[
['foo', 'bar', NAME, 'baz'],
]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': True}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME, index=-1)
add_mock.assert_not_called()
rehash_mock.assert_not_called()
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {'index': {'old': -2, 'new': -1}},
'result': None,
'comment': '{0} would be moved from index -2 to -1.'.format(NAME)
}
)
def _test_exists_add_already_present(self, index, test_mode):
'''
Tests win_path.exists when the directory already exists in the PATH.
Helper function to test both with and without and index, and with test
mode both disabled and enabled.
'''
current_path = ['foo', 'bar', 'baz']
if index is None:
current_path.append(NAME)
else:
pos = index if index >= 0 else len(current_path) + index + 1
current_path.insert(pos, NAME)
add_mock = Mock()
rehash_mock = MagicMock(return_value=True)
dunder_salt = {
'win_path.get_path': MagicMock(side_effect=[current_path]),
'win_path.add': add_mock,
'win_path.rehash': rehash_mock,
}
dunder_opts = {'test': test_mode}
with patch.dict(win_path.__salt__, dunder_salt), \
patch.dict(win_path.__opts__, dunder_opts):
ret = win_path.exists(NAME, index=index)
add_mock.assert_not_called()
rehash_mock.assert_not_called()
self.assertDictEqual(
ret,
{
'name': NAME,
'changes': {},
'result': True,
'comment': '{0} already exists in the PATH{1}.'.format(
NAME,
' at index {0}'.format(index) if index is not None else ''
)
}
)
def test_exists_add_no_index_already_present(self):
self._test_exists_add_already_present(None, False)
def test_exists_add_no_index_already_present_test_mode(self):
self._test_exists_add_already_present(None, True)
def test_exists_add_index_already_present(self):
self._test_exists_add_already_present(1, False)
self._test_exists_add_already_present(2, False)
self._test_exists_add_already_present(-1, False)
self._test_exists_add_already_present(-2, False)
def test_exists_add_index_already_present_test_mode(self):
self._test_exists_add_already_present(1, True)
self._test_exists_add_already_present(2, True)
self._test_exists_add_already_present(-1, True)
self._test_exists_add_already_present(-2, True)

View File

@ -9,6 +9,7 @@ import logging
# Import Salt libs
import salt.utils.data
import salt.utils.data
from salt.utils.odict import OrderedDict
from tests.support.unit import TestCase, skipIf, LOREM_IPSUM
from tests.support.mock import patch, NO_MOCK, NO_MOCK_REASON
@ -16,6 +17,8 @@ from salt.ext.six.moves import builtins # pylint: disable=import-error,redefine
log = logging.getLogger(__name__)
_b = lambda x: x.encode('utf-8')
# Some randomized data that will not decode
BYTES = b'\x9c\xb1\xf7\xa3'
class DataTestCase(TestCase):
@ -27,14 +30,17 @@ class DataTestCase(TestCase):
True,
False,
None,
[123, 456.789, _b('спам'), True, False, None],
(987, 654.321, _b('яйца'), None, (True, False)),
BYTES,
[123, 456.789, _b('спам'), True, False, None, BYTES],
(987, 654.321, _b('яйца'), None, (True, False, BYTES)),
{_b('str_key'): _b('str_val'),
None: True, 123: 456.789,
None: True,
123: 456.789,
'blob': BYTES,
_b('subdict'): {'unicode_key': 'unicode_val',
_b('tuple'): (123, 'hello', _b('world'), True),
_b('list'): [456, _b('спам'), False]}},
OrderedDict([(_b('foo'), 'bar'), (123, 456)])
_b('tuple'): (123, 'hello', _b('world'), True, BYTES),
_b('list'): [456, _b('спам'), False, BYTES]}},
OrderedDict([(_b('foo'), 'bar'), (123, 456), ('blob', BYTES)])
]
def test_sorted_ignorecase(self):
@ -214,28 +220,49 @@ class DataTestCase(TestCase):
True,
False,
None,
[123, 456.789, 'спам', True, False, None],
(987, 654.321, 'яйца', None, (True, False)),
BYTES,
[123, 456.789, 'спам', True, False, None, BYTES],
(987, 654.321, 'яйца', None, (True, False, BYTES)),
{'str_key': 'str_val',
None: True, 123: 456.789,
None: True,
123: 456.789,
'blob': BYTES,
'subdict': {'unicode_key': 'unicode_val',
'tuple': (123, 'hello', 'world', True),
'list': [456, 'спам', False]}},
OrderedDict([('foo', 'bar'), (123, 456)])
'tuple': (123, 'hello', 'world', True, BYTES),
'list': [456, 'спам', False, BYTES]}},
OrderedDict([('foo', 'bar'), (123, 456), ('blob', BYTES)])
]
ret = salt.utils.data.decode(
self.test_data, encoding='utf-8', preserve_dict_class=True,
self.test_data,
encoding='utf-8',
keep=True,
preserve_dict_class=True,
preserve_tuples=True)
self.assertEqual(ret, expected)
# The binary data in the data structure should fail to decode, even
# using the fallback, and raise an exception.
self.assertRaises(
UnicodeDecodeError,
salt.utils.data.decode,
self.test_data,
encoding='utf-8',
keep=False,
preserve_dict_class=True,
preserve_tuples=True)
# Now munge the expected data so that we get what we would expect if we
# disable preservation of dict class and tuples
expected[8] = [987, 654.321, 'яйца', None, [True, False]]
expected[9]['subdict']['tuple'] = [123, 'hello', 'world', True]
expected[10] = {'foo': 'bar', 123: 456}
expected[9] = [987, 654.321, 'яйца', None, [True, False, BYTES]]
expected[10]['subdict']['tuple'] = [123, 'hello', 'world', True, BYTES]
expected[11] = {'foo': 'bar', 123: 456, 'blob': BYTES}
ret = salt.utils.data.decode(
self.test_data, encoding='utf-8', preserve_dict_class=False,
self.test_data,
encoding='utf-8',
keep=True,
preserve_dict_class=False,
preserve_tuples=False)
self.assertEqual(ret, expected)
@ -249,6 +276,14 @@ class DataTestCase(TestCase):
self.assertEqual(salt.utils.data.decode('foo'), 'foo')
self.assertEqual(salt.utils.data.decode(_b('bar')), 'bar')
# Test binary blob
self.assertEqual(salt.utils.data.decode(BYTES, keep=True), BYTES)
self.assertRaises(
UnicodeDecodeError,
salt.utils.data.decode,
BYTES,
keep=False)
@skipIf(NO_MOCK, NO_MOCK_REASON)
def test_decode_fallback(self):
'''
@ -270,27 +305,52 @@ class DataTestCase(TestCase):
True,
False,
None,
[123, 456.789, _b('спам'), True, False, None],
(987, 654.321, _b('яйца'), None, (True, False)),
BYTES,
[123, 456.789, _b('спам'), True, False, None, BYTES],
(987, 654.321, _b('яйца'), None, (True, False, BYTES)),
{_b('str_key'): _b('str_val'),
None: True, 123: 456.789,
None: True,
123: 456.789,
_b('blob'): BYTES,
_b('subdict'): {_b('unicode_key'): _b('unicode_val'),
_b('tuple'): (123, _b('hello'), _b('world'), True),
_b('list'): [456, _b('спам'), False]}},
OrderedDict([(_b('foo'), _b('bar')), (123, 456)])
_b('tuple'): (123, _b('hello'), _b('world'), True, BYTES),
_b('list'): [456, _b('спам'), False, BYTES]}},
OrderedDict([(_b('foo'), _b('bar')), (123, 456), (_b('blob'), BYTES)])
]
# Both keep=True and keep=False should work because the BYTES data is
# already bytes.
ret = salt.utils.data.encode(
self.test_data, preserve_dict_class=True, preserve_tuples=True)
self.test_data,
keep=True,
preserve_dict_class=True,
preserve_tuples=True)
self.assertEqual(ret, expected)
ret = salt.utils.data.encode(
self.test_data,
keep=False,
preserve_dict_class=True,
preserve_tuples=True)
self.assertEqual(ret, expected)
# Now munge the expected data so that we get what we would expect if we
# disable preservation of dict class and tuples
expected[8] = [987, 654.321, _b('яйца'), None, [True, False]]
expected[9][_b('subdict')][_b('tuple')] = [123, _b('hello'), _b('world'), True]
expected[10] = {_b('foo'): _b('bar'), 123: 456}
expected[9] = [987, 654.321, _b('яйца'), None, [True, False, BYTES]]
expected[10][_b('subdict')][_b('tuple')] = [
123, _b('hello'), _b('world'), True, BYTES
]
expected[11] = {_b('foo'): _b('bar'), 123: 456, _b('blob'): BYTES}
ret = salt.utils.data.encode(
self.test_data, preserve_dict_class=False, preserve_tuples=False)
self.test_data,
keep=True,
preserve_dict_class=False,
preserve_tuples=False)
self.assertEqual(ret, expected)
ret = salt.utils.data.encode(
self.test_data,
keep=False,
preserve_dict_class=False,
preserve_tuples=False)
self.assertEqual(ret, expected)
# Now test single non-string, non-data-structure items, these should
@ -303,6 +363,70 @@ class DataTestCase(TestCase):
self.assertEqual(salt.utils.data.encode('foo'), _b('foo'))
self.assertEqual(salt.utils.data.encode(_b('bar')), _b('bar'))
# Test binary blob, nothing should happen even when keep=False since
# the data is already bytes
self.assertEqual(salt.utils.data.encode(BYTES, keep=True), BYTES)
self.assertEqual(salt.utils.data.encode(BYTES, keep=False), BYTES)
def test_encode_keep(self):
'''
Whereas we tested the keep argument in test_decode, it is much easier
to do a more comprehensive test of keep in its own function where we
can force the encoding.
'''
unicode_str = 'питон'
encoding = 'ascii'
# Test single string
self.assertEqual(
salt.utils.data.encode(unicode_str, encoding, keep=True),
unicode_str)
self.assertRaises(
UnicodeEncodeError,
salt.utils.data.encode,
unicode_str,
encoding,
keep=False)
data = [
unicode_str,
[b'foo', [unicode_str], {b'key': unicode_str}, (unicode_str,)],
{b'list': [b'foo', unicode_str],
b'dict': {b'key': unicode_str},
b'tuple': (b'foo', unicode_str)},
([b'foo', unicode_str], {b'key': unicode_str}, (unicode_str,))
]
# Since everything was a bytestring aside from the bogus data, the
# return data should be identical. We don't need to test recursive
# decoding, that has already been tested in test_encode.
self.assertEqual(
salt.utils.data.encode(data, encoding,
keep=True, preserve_tuples=True),
data
)
self.assertRaises(
UnicodeEncodeError,
salt.utils.data.encode,
data,
encoding,
keep=False,
preserve_tuples=True)
for index, item in enumerate(data):
self.assertEqual(
salt.utils.data.encode(data[index], encoding,
keep=True, preserve_tuples=True),
data[index]
)
self.assertRaises(
UnicodeEncodeError,
salt.utils.data.encode,
data[index],
encoding,
keep=False,
preserve_tuples=True)
@skipIf(NO_MOCK, NO_MOCK_REASON)
def test_encode_fallback(self):
'''