Merge branch '2018.3' into merge-fluorine

Conflicts:
   - salt/states/win_lgpo.py
   - salt/utils/minions.py
This commit is contained in:
Gareth J. Greenaway 2019-01-01 13:50:01 -08:00
commit 09815e7f77
No known key found for this signature in database
GPG Key ID: 10B62F8A7CAD7A41
88 changed files with 735 additions and 206 deletions

View File

@ -98,7 +98,7 @@ pipeline {
stage('linting all') {
// perform a full linit if this is a merge forward and the change only lint passed.
when {
expression { return params.CHANGE_BRANCH =~ /(?i)^merge[._-]/ }
expression { return env.CHANGE_BRANCH =~ /(?i)^merge[._-]/ }
}
parallel {
stage('setup full') {

View File

@ -23,7 +23,7 @@ python-dateutil==2.6.1
python-gnupg==0.4.1
PyYAML==3.12
pyzmq==17.0.0
requests==2.18.4
requests==2.21.0
singledispatch==3.4.0.3
six==1.11.0
smmap==0.9.0

View File

@ -31,7 +31,7 @@ pythonnet==2.3.0
pywin32==223
PyYAML==3.12
pyzmq==16.0.3
requests==2.18.4
requests==2.21.0
singledispatch==3.4.0.3
smmap==0.9.0
timelib==0.2.4

View File

@ -230,81 +230,7 @@ class IPv6InterfaceScoped(ipaddress.IPv6Interface, IPv6AddressScoped):
self.hostmask = self.network.hostmask
def ip_address(address):
"""Take an IP string/int and return an object of the correct type.
Args:
address: A string or integer, the IP address. Either IPv4 or
IPv6 addresses may be supplied; integers less than 2**32 will
be considered to be IPv4 by default.
Returns:
An IPv4Address or IPv6Address object.
Raises:
ValueError: if the *address* passed isn't either a v4 or a v6
address
"""
try:
return ipaddress.IPv4Address(address)
except (ipaddress.AddressValueError, ipaddress.NetmaskValueError) as err:
log.debug('Error while parsing IPv4 address: %s', address)
log.debug(err)
try:
return IPv6AddressScoped(address)
except (ipaddress.AddressValueError, ipaddress.NetmaskValueError) as err:
log.debug('Error while parsing IPv6 address: %s', address)
log.debug(err)
if isinstance(address, bytes):
raise ipaddress.AddressValueError('{} does not appear to be an IPv4 or IPv6 address. '
'Did you pass in a bytes (str in Python 2) instead '
'of a unicode object?'.format(repr(address)))
raise ValueError('{} does not appear to be an IPv4 or IPv6 address'.format(repr(address)))
def ip_interface(address):
"""Take an IP string/int and return an object of the correct type.
Args:
address: A string or integer, the IP address. Either IPv4 or
IPv6 addresses may be supplied; integers less than 2**32 will
be considered to be IPv4 by default.
Returns:
An IPv4Interface or IPv6Interface object.
Raises:
ValueError: if the string passed isn't either a v4 or a v6
address.
Notes:
The IPv?Interface classes describe an Address on a particular
Network, so they're basically a combination of both the Address
and Network classes.
"""
try:
return ipaddress.IPv4Interface(address)
except (ipaddress.AddressValueError, ipaddress.NetmaskValueError) as err:
log.debug('Error while getting IPv4 interface for address %s', address)
log.debug(err)
try:
return ipaddress.IPv6Interface(address)
except (ipaddress.AddressValueError, ipaddress.NetmaskValueError) as err:
log.debug('Error while getting IPv6 interface for address %s', address)
log.debug(err)
raise ValueError('{} does not appear to be an IPv4 or IPv6 interface'.format(address))
if ipaddress:
ipaddress.IPv6Address = IPv6AddressScoped
if sys.version_info.major == 2:
ipaddress.IPv6Interface = IPv6InterfaceScoped
ipaddress.ip_address = ip_address
ipaddress.ip_interface = ip_interface

View File

@ -41,5 +41,6 @@ def run(start=None):
suite = loader.discover(start, 'test_*.py', top)
unittest.TextTestRunner(verbosity=2).run(suite)
if __name__ == "__main__":
run()

View File

@ -78,6 +78,7 @@ def test():
minion = salt.daemons.flo.IofloMinion(opts=opts)
minion.start(behaviors=['raet.flo.behaving'])
if __name__ == '__main__':
console.reinit(verbosity=console.Wordage.concise)
test()

View File

@ -2224,6 +2224,7 @@ def runAll():
unittest.TextTestRunner(verbosity=2).run(suite)
if __name__ == '__main__' and __package__ is None:
#console.reinit(verbosity=console.Wordage.concise)

View File

@ -63,6 +63,7 @@ from salt.ext import six
def __virtual__():
return HAS_HYPCHAT
log = logging.getLogger(__name__)
_DEFAULT_API_URL = 'https://api.hipchat.com'

View File

@ -41,6 +41,7 @@ def __virtual__():
if logstash is not None \
else (False, 'python-logstash not installed')
log = logging.getLogger(__name__)

View File

@ -48,6 +48,7 @@ def __virtual__():
else:
return True
log = logging.getLogger(__name__)

View File

@ -56,7 +56,7 @@ def execute(opts, data, func, args, kwargs):
'salt-call',
'--out', 'json',
'--metadata',
'-c', salt.syspaths.CONFIG_DIR,
'-c', opts.get('config_dir'),
'--',
data.get('fun')]
if data['fun'] in ('state.sls', 'state.highstate', 'state.apply'):

View File

@ -166,6 +166,7 @@ def is_mp_logging_configured():
def is_extended_logging_configured():
return __EXTERNAL_LOGGERS_CONFIGURED
# Store a reference to the temporary queue logging handler
LOGGING_NULL_HANDLER = __NullLoggingHandler(logging.WARNING)

View File

@ -936,7 +936,7 @@ def cmd_unzip(zip_file,
if password:
cmd.extend(['-P', password])
if options:
cmd.append('{0}'.format(options))
cmd.extend(shlex.split(options))
cmd.extend(['{0}'.format(zip_file), '-d', '{0}'.format(dest)])
if excludes is not None:

View File

@ -64,6 +64,7 @@ def __virtual__():
else:
return (False, 'The capirca module (capirca_acl) cannot be loaded.')
# ------------------------------------------------------------------------------
# module globals
# ------------------------------------------------------------------------------

View File

@ -708,6 +708,7 @@ def rm_job(user,
return comdat['stderr']
return ret
rm = salt.utils.functools.alias_function(rm_job, 'rm')

View File

@ -302,6 +302,7 @@ def TXT(host, nameserver=None):
return [i for i in cmd['stdout'].split('\n')]
# Let lowercase work, since that is the convention for Salt functions
a = A
aaaa = AAAA

View File

@ -199,6 +199,7 @@ def latest_version(*names, **kwargs):
'''
return '' if len(names) == 1 else dict((x, '') for x in names)
# available_version is being deprecated
available_version = salt.utils.functools.alias_function(latest_version, 'available_version')
@ -485,6 +486,7 @@ def remove(name=None, pkgs=None, **kwargs):
return ret
# Support pkg.delete to remove packages to more closely match pkg_delete
delete = salt.utils.functools.alias_function(remove, 'delete')
# No equivalent to purge packages, use remove instead

View File

@ -1372,6 +1372,7 @@ def config_get_regexp(key,
ret.setdefault(param, []).append(value)
return ret
config_get_regex = salt.utils.functools.alias_function(config_get_regexp, 'config_get_regex')

View File

@ -212,6 +212,7 @@ def list_tab(user):
ret['pre'].append(line)
return ret
# For consistency's sake
ls = salt.utils.functools.alias_function(list_tab, 'ls')
@ -317,4 +318,5 @@ def rm_job(user,
return ret
rm = salt.utils.functools.alias_function(rm_job, 'rm')

View File

@ -26,6 +26,7 @@ def long_range(start, end):
yield start
start += 1
_IPSET_FAMILIES = {
'ipv4': 'inet',
'ip4': 'inet',

View File

@ -87,6 +87,7 @@ def __virtual__():
return 'keystone'
return (False, 'keystone execution module cannot be loaded: keystoneclient python library not available.')
__opts__ = {}

View File

@ -2599,6 +2599,7 @@ def destroy(name, stop=False, path=None):
)
return _change_state('lxc-destroy', name, None, path=path)
# Compatibility between LXC and nspawn
remove = salt.utils.functools.alias_function(destroy, 'remove')
@ -2943,6 +2944,7 @@ def set_password(name, users, password, encrypted=True, path=None):
)
return True
set_pass = salt.utils.functools.alias_function(set_password, 'set_pass')
@ -4209,6 +4211,7 @@ def copy_to(name, source, dest, overwrite=False, makedirs=False, path=None):
overwrite=overwrite,
makedirs=makedirs)
cp = salt.utils.functools.alias_function(copy_to, 'cp')

View File

@ -212,6 +212,7 @@ def latest_version(*names, **kwargs):
else:
return versions_dict
# available_version is being deprecated
available_version = salt.utils.functools.alias_function(latest_version, 'available_version')

View File

@ -179,6 +179,7 @@ def latest_version(*names, **kwargs):
return ret
# available_version is being deprecated
available_version = salt.utils.functools.alias_function(latest_version, 'available_version')

View File

@ -238,6 +238,7 @@ def increment(key, delta=1, host=DEFAULT_HOST, port=DEFAULT_PORT):
except ValueError:
raise SaltInvocationError('Delta value must be an integer')
incr = salt.utils.functools.alias_function(increment, 'incr')
@ -269,4 +270,5 @@ def decrement(key, delta=1, host=DEFAULT_HOST, port=DEFAULT_PORT):
except ValueError:
raise SaltInvocationError('Delta value must be an integer')
decr = salt.utils.functools.alias_function(decrement, 'decr')

View File

@ -1035,6 +1035,7 @@ def hw_addr(iface):
'''
return salt.utils.network.hw_addr(iface)
# Alias hwaddr to preserve backward compat
hwaddr = salt.utils.functools.alias_function(hw_addr, 'hwaddr')
@ -1213,6 +1214,7 @@ def ip_addrs6(interface=None, include_loopback=False, cidr=None):
else:
return addrs
ipaddrs6 = salt.utils.functools.alias_function(ip_addrs6, 'ipaddrs6')

View File

@ -882,6 +882,7 @@ def list_running():
pass
return sorted(ret)
# 'machinectl list' shows only running containers, so allow this to work as an
# alias to nspawn.list_running
list_ = salt.utils.functools.alias_function(list_running, 'list_')
@ -1313,6 +1314,7 @@ def copy_to(name, source, dest, overwrite=False, makedirs=False):
overwrite=overwrite,
makedirs=makedirs)
cp = salt.utils.functools.alias_function(copy_to, 'cp')
@ -1478,4 +1480,5 @@ def pull_dkr(url, name, index):
'''
return _pull_image('dkr', url, name, index=index)
pull_docker = salt.utils.functools.alias_function(pull_dkr, 'pull_docker')

View File

@ -106,6 +106,7 @@ def latest_version(*names, **kwargs):
return ret[names[0]]
return ret
# available_version is being deprecated
available_version = salt.utils.functools.alias_function(latest_version, 'available_version')

View File

@ -277,6 +277,7 @@ def items(*args, **kwargs):
return pillar.compile_pillar()
# Allow pillar.data to also be used to return pillar data
data = salt.utils.functools.alias_function(items, 'data')

View File

@ -240,6 +240,7 @@ def latest_version(*names, **kwargs):
return ret[names[0]]
return ret
# available_version is being deprecated
available_version = salt.utils.functools.alias_function(latest_version, 'available_version')

View File

@ -385,6 +385,7 @@ def latest_version(*names, **kwargs):
return ret
# available_version is being deprecated
available_version = salt.utils.functools.alias_function(latest_version, 'available_version')

View File

@ -164,6 +164,7 @@ def latest_version(*names, **kwargs):
return ret[names[0]]
return ret
# available_version is being deprecated
available_version = salt.utils.functools.alias_function(latest_version, 'available_version')

View File

@ -1227,7 +1227,7 @@ def set_known_host(user=None,
if fingerprint and fingerprint not in known_fingerprints:
return {'status': 'error',
'error': ('Remote host public keys found but none of their'
'error': ('Remote host public keys found but none of their '
'fingerprints match the one you have provided')}
if check_required:

View File

@ -307,5 +307,6 @@ def _register_functions():
__all__.append(mod_name)
globals()[mod_name] = mod_func
if TESTINFRA_PRESENT:
_register_functions()

View File

@ -25,7 +25,9 @@ from salt.ext import six
HAS_LIBS = False
try:
import twilio
if twilio.__version__ > 5:
# Grab version, ensure elements are ints
twilio_version = tuple([int(x) for x in twilio.__version_info__])
if twilio_version > (5, ):
TWILIO_5 = False
from twilio.rest import Client as TwilioRestClient
from twilio.rest import TwilioException as TwilioRestException

View File

@ -198,6 +198,7 @@ def __virtual__():
return __virtualname__
__outputter__ = {
'touch': 'txt',
'append': 'txt',

View File

@ -318,6 +318,7 @@ def hw_addr(iface):
'''
return salt.utils.network.hw_addr(iface)
# Alias hwaddr to preserve backward compat
hwaddr = salt.utils.functools.alias_function(hw_addr, 'hwaddr')
@ -390,6 +391,7 @@ def ip_addrs(interface=None, include_loopback=False, cidr=None, type=None):
else:
return addrs
ipaddrs = salt.utils.functools.alias_function(ip_addrs, 'ipaddrs')
@ -423,6 +425,7 @@ def ip_addrs6(interface=None, include_loopback=False, cidr=None):
else:
return addrs
ipaddrs6 = salt.utils.functools.alias_function(ip_addrs6, 'ipaddrs6')

View File

@ -158,6 +158,7 @@ def __virtual__():
return __virtualname__
__func_alias__ = {
'time_': 'time'
}

View File

@ -6,20 +6,22 @@ Microsoft Update files management via wusa.exe
:platform: Windows
:depends: PowerShell
.. versionadded:: Neon
.. versionadded:: 2018.3.4
'''
# Import python libs
from __future__ import absolute_import, unicode_literals
import logging
import os
# Import salt libs
import salt.utils.platform
from salt.exceptions import CommandExecutionError
log = logging.getLogger(__name__)
# Define the module's virtual name
__virtualname__ = 'win_wusa'
__virtualname__ = 'wusa'
def __virtual__():
@ -36,58 +38,189 @@ def __virtual__():
return __virtualname__
def is_installed(kb):
def _pshell_json(cmd, cwd=None):
'''
Execute the desired powershell command and ensure that it returns data
in JSON format and load that into python
'''
if 'convertto-json' not in cmd.lower():
cmd = '{0} | ConvertTo-Json'.format(cmd)
log.debug('PowerShell: %s', cmd)
ret = __salt__['cmd.run_all'](cmd, shell='powershell', cwd=cwd)
if 'pid' in ret:
del ret['pid']
if ret.get('stderr', ''):
error = ret['stderr'].splitlines()[0]
raise CommandExecutionError(error, info=ret)
if 'retcode' not in ret or ret['retcode'] != 0:
# run_all logs an error to log.error, fail hard back to the user
raise CommandExecutionError(
'Issue executing PowerShell {0}'.format(cmd), info=ret)
# Sometimes Powershell returns an empty string, which isn't valid JSON
if ret['stdout'] == '':
ret['stdout'] = '{}'
try:
ret = salt.utils.json.loads(ret['stdout'], strict=False)
except ValueError:
raise CommandExecutionError(
'No JSON results from PowerShell', info=ret)
return ret
def is_installed(name):
'''
Check if a specific KB is installed.
Args:
name (str):
The name of the KB to check
Returns:
bool: ``True`` if installed, otherwise ``False``
CLI Example:
.. code-block:: bash
salt '*' win_wusa.is_installed KB123456
salt '*' wusa.is_installed KB123456
'''
get_hotfix_result = __salt__['cmd.powershell_all']('Get-HotFix -Id {0}'.format(kb), ignore_retcode=True)
return get_hotfix_result['retcode'] == 0
return __salt__['cmd.retcode'](cmd='Get-HotFix -Id {0}'.format(name),
shell='powershell',
ignore_retcode=True) == 0
def install(path):
def install(path, restart=False):
'''
Install a KB from a .msu file.
Some KBs will need a reboot, but this function does not manage it.
You may have to manage reboot yourself after installation.
Args:
path (str):
The full path to the msu file to install
restart (bool):
``True`` to force a restart if required by the installation. Adds
the ``/forcerestart`` switch to the ``wusa.exe`` command. ``False``
will add the ``/norestart`` switch instead. Default is ``False``
Returns:
bool: ``True`` if successful, otherwise ``False``
Raise:
CommandExecutionError: If the package is already installed or an error
is encountered
CLI Example:
.. code-block:: bash
salt '*' win_wusa.install C:/temp/KB123456.msu
salt '*' wusa.install C:/temp/KB123456.msu
'''
return __salt__['cmd.run_all']('wusa.exe {0} /quiet /norestart'.format(path), ignore_retcode=True)
# Build the command
cmd = ['wusa.exe', path, '/quiet']
if restart:
cmd.append('/forcerestart')
else:
cmd.append('/norestart')
# Run the command
ret_code = __salt__['cmd.retcode'](cmd, ignore_retcode=True)
# Check the ret_code
file_name = os.path.basename(path)
errors = {2359302: '{0} is already installed'.format(file_name),
87: 'Unknown error'}
if ret_code in errors:
raise CommandExecutionError(errors[ret_code])
elif ret_code:
raise CommandExecutionError('Unknown error: {0}'.format(ret_code))
return True
def uninstall(kb):
def uninstall(path, restart=False):
'''
Uninstall a specific KB.
CLI Example:
Args:
.. code-block:: bash
path (str):
The full path to the msu file to uninstall. This can also be just
the name of the KB to uninstall
salt '*' win_wusa.uninstall KB123456
'''
return __salt__['cmd.run_all']('wusa.exe /uninstall /kb:{0} /quiet /norestart'.format(kb[2:]), ignore_retcode=True)
restart (bool):
``True`` to force a restart if required by the installation. Adds
the ``/forcerestart`` switch to the ``wusa.exe`` command. ``False``
will add the ``/norestart`` switch instead. Default is ``False``
Returns:
bool: ``True`` if successful, otherwise ``False``
def list_kbs():
'''
Return a list of dictionaries, one dictionary for each installed KB.
The HotFixID key contains the ID of the KB.
Raises:
CommandExecutionError: If an error is encountered
CLI Example:
.. code-block:: bash
salt '*' win_wusa.list_kbs
salt '*' wusa.uninstall KB123456
# or
salt '*' wusa.uninstall C:/temp/KB123456.msu
'''
return __salt__['cmd.powershell']('Get-HotFix')
# Build the command
cmd = ['wusa.exe', '/uninstall', '/quiet']
kb = os.path.splitext(os.path.basename(path))[0]
if os.path.exists(path):
cmd.append(path)
else:
cmd.append(
'/kb:{0}'.format(kb[2:] if kb.lower().startswith('kb') else kb))
if restart:
cmd.append('/forcerestart')
else:
cmd.append('/norestart')
# Run the command
ret_code = __salt__['cmd.retcode'](cmd, ignore_retcode=True)
# Check the ret_code
# If you pass /quiet and specify /kb, you'll always get retcode 87 if there
# is an error. Use the actual file to get a more descriptive error
errors = {-2145116156: '{0} does not support uninstall'.format(kb),
2359303: '{0} not installed'.format(kb),
87: 'Unknown error. Try specifying an .msu file'}
if ret_code in errors:
raise CommandExecutionError(errors[ret_code])
elif ret_code:
raise CommandExecutionError('Unknown error: {0}'.format(ret_code))
return True
def list():
'''
Get a list of updates installed on the machine
Returns:
list: A list of installed updates
CLI Example:
.. code-block:: bash
salt '*' wusa.list
'''
kbs = []
ret = _pshell_json('Get-HotFix | Select HotFixID')
for item in ret:
kbs.append(item['HotFixID'])
return kbs

View File

@ -559,6 +559,7 @@ def latest_version(*names, **kwargs):
return ret[names[0]]
return ret
# available_version is being deprecated
available_version = salt.utils.functools.alias_function(latest_version, 'available_version')
@ -979,6 +980,7 @@ def list_upgrades(refresh=True, **kwargs):
return dict([(x.name, x.version) for x in _yum_pkginfo(out['stdout'])])
# Preserve expected CLI usage (yum list updates)
list_updates = salt.utils.functools.alias_function(list_upgrades, 'list_updates')
@ -2304,6 +2306,7 @@ def list_holds(pattern=__HOLD_PATTERN, full=True):
ret.append(match)
return ret
get_locked_packages = salt.utils.functools.alias_function(list_holds, 'get_locked_packages')
@ -2610,6 +2613,7 @@ def group_install(name,
return install(pkgs=pkgs, **kwargs)
groupinstall = salt.utils.functools.alias_function(group_install, 'groupinstall')

View File

@ -51,6 +51,7 @@ def __virtual__():
return False, 'The \'{0}\' module could not be loaded: ' \
'\'requests\' is not installed.'.format(__virtualname__)
ROUTERS = {'MessagingRouter': 'messaging',
'EventsRouter': 'evconsole',
'ProcessRouter': 'process',

View File

@ -471,6 +471,7 @@ def list_upgrades(refresh=True, **kwargs):
return ret
# Provide a list_updates function for those used to using zypper list-updates
list_updates = salt.utils.functools.alias_function(list_upgrades, 'list_updates')

View File

@ -204,6 +204,7 @@ class NetapiClient(object):
wheel = salt.wheel.WheelClient(self.opts)
return wheel.cmd_async(kwargs)
CLIENTS = [
name for name, _
in inspect.getmembers(NetapiClient, predicate=inspect.ismethod if six.PY2 else None)

View File

@ -81,4 +81,5 @@ def get_application(*args):
return wsgi_app
application = get_application()

View File

@ -81,6 +81,7 @@ def __virtual__():
return False
return 'mongo'
# Set up logging
log = logging.getLogger(__name__)

View File

@ -135,6 +135,7 @@ def _get_options(ret):
return _options
#
# Most email readers to not support <style> tag.
# The following dict and a function provide a primitive styler

View File

@ -25,6 +25,7 @@ def pause(jid, state_id=None, duration=None):
minion = salt.minion.MasterMinion(__opts__)
minion.functions['state.pause'](jid, state_id, duration)
set_pause = salt.utils.functools.alias_function(pause, 'set_pause')
@ -35,6 +36,7 @@ def resume(jid, state_id=None):
minion = salt.minion.MasterMinion(__opts__)
minion.functions['state.resume'](jid, state_id)
rm_pause = salt.utils.functools.alias_function(resume, 'rm_pause')
@ -132,6 +134,7 @@ def orchestrate(mods,
ret['retcode'] = 1
return ret
# Aliases for orchestrate runner
orch = salt.utils.functools.alias_function(orchestrate, 'orch')
sls = salt.utils.functools.alias_function(orchestrate, 'sls')
@ -241,6 +244,7 @@ def orchestrate_show_sls(mods,
ret = {minion.opts['id']: running}
return ret
orch_show_sls = salt.utils.functools.alias_function(orchestrate_show_sls, 'orch_show_sls')

View File

@ -121,6 +121,7 @@ class Dumper(BaseDumper): # pylint: disable=W0232
'''Overwrites Dumper as not for pollute legacy Dumper'''
pass
Dumper.add_multi_representer(EncryptedString, EncryptedString.yaml_dumper)
Dumper.add_multi_representer(type(None), Dumper.represent_none)
Dumper.add_multi_representer(str, Dumper.represent_str)

View File

@ -2197,7 +2197,7 @@ def managed(name,
contents_pillar
.. versionadded:: 0.17.0
.. versionchanged: 2016.11.0
.. versionchanged:: 2016.11.0
contents_pillar can also be a list, and the pillars will be
concatinated together to form one file.

View File

@ -202,7 +202,7 @@ def diff(name, d_id):
return ret
def cli(name, format='text', **kwargs):
def cli(name, **kwargs):
'''
Executes the CLI commands and reuturns the text output.
@ -218,10 +218,10 @@ def cli(name, format='text', **kwargs):
* command:
The command that need to be executed on Junos CLI. (default = None)
Optional
* format:
Format in which to get the CLI output. (text or xml, \
default = 'text')
* kwargs: Keyworded arguments which can be provided like-
* format:
Format in which to get the CLI output. (text or xml, \
default = 'text')
* timeout:
Set NETCONF RPC timeout. Can be used for commands which
take a while to execute. (default = 30 seconds)
@ -230,7 +230,7 @@ def cli(name, format='text', **kwargs):
(default = None)
'''
ret = {'name': name, 'changes': {}, 'result': True, 'comment': ''}
ret['changes'] = __salt__['junos.cli'](name, format, **kwargs)
ret['changes'] = __salt__['junos.cli'](name, **kwargs)
return ret

View File

@ -213,6 +213,7 @@ def wait(name, **kwargs):
'result': True,
'comment': ''}
# Alias module.watch to module.wait
watch = salt.utils.functools.alias_function(wait, 'watch')
@ -552,4 +553,5 @@ def _get_dict_result(node):
break
return ret
mod_watch = salt.utils.functools.alias_function(run, 'mod_watch')

View File

@ -16,4 +16,5 @@ def _no_op(name, **kwargs):
'''
return dict(name=name, result=True, changes={}, comment='')
set = context = _no_op # pylint: disable=C0103

View File

@ -336,4 +336,5 @@ def managed(name,
'old': old if old else ''}
return ret
manage = salt.utils.functools.alias_function(managed, 'manage')

View File

@ -261,7 +261,7 @@ def set_(name,
for p_name in current_policy[policy_data['output_section']]:
if policy_name.lower() == p_name.lower():
currently_set = True
pol_id = p_name
pol_id = policy_name
break
# Check aliases
else:

View File

@ -81,6 +81,7 @@ def computer_desc(name):
'\'{0}\''.format(name))
return ret
computer_description = salt.utils.functools.alias_function(computer_desc, 'computer_description')

View File

@ -2,10 +2,10 @@
'''
Microsoft Updates (KB) Management
This module provides the ability to enforce KB installations
from files (.msu), without WSUS.
This module provides the ability to enforce KB installations from files (.msu),
without WSUS or Windows Update
.. versionadded:: Neon
.. versionadded:: 2018.3.4
'''
# Import python libs
@ -14,13 +14,14 @@ import logging
# Import salt libs
import salt.utils.platform
import salt.exceptions
import salt.utils.url
from salt.exceptions import SaltInvocationError
log = logging.getLogger(__name__)
# Define the module's virtual name
__virtualname__ = 'win_wusa'
__virtualname__ = 'wusa'
def __virtual__():
@ -35,81 +36,113 @@ def __virtual__():
def installed(name, source):
'''
Enforce the installed state of a KB
Ensure an update is installed on the minion
name
Name of the Windows KB ("KB123456")
source
Source of .msu file corresponding to the KB
Args:
name(str):
Name of the Windows KB ("KB123456")
source (str):
Source of .msu file corresponding to the KB
Example:
.. code-block:: yaml
KB123456:
wusa.installed:
- source: salt://kb123456.msu
'''
ret = {
'name': name,
'changes': {},
'result': False,
'comment': '',
}
ret = {'name': name,
'changes': {},
'result': False,
'comment': ''}
# Start with basic error-checking. Do all the passed parameters make sense
# and agree with each-other?
if not name or not source:
raise salt.exceptions.SaltInvocationError(
'Arguments "name" and "source" are mandatory.')
# Input validation
if not name:
raise SaltInvocationError('Must specify a KB "name"')
if not source:
raise SaltInvocationError('Must specify a "source" file to install')
# Check the current state of the system. Does anything need to change?
current_state = __salt__['win_wusa.is_installed'](name)
if current_state:
# Is the KB already installed
if __salt__['wusa.is_installed'](name):
ret['result'] = True
ret['comment'] = 'KB already installed'
ret['comment'] = '{0} already installed'.format(name)
return ret
# The state of the system does need to be changed. Check if we're running
# in ``test=true`` mode.
# Check for test=True
if __opts__['test'] is True:
ret['comment'] = 'The KB "{0}" will be installed.'.format(name)
ret['changes'] = {
'old': current_state,
'new': True,
}
# Return ``None`` when running with ``test=true``.
ret['result'] = None
ret['comment'] = '{0} would be installed'.format(name)
ret['result'] = None
return ret
try:
result = __states__['file.cached'](source,
skip_verify=True,
saltenv=__env__)
except Exception as exc:
msg = 'Failed to cache {0}: {1}'.format(
salt.utils.url.redact_http_basic_auth(source),
exc.__str__())
log.exception(msg)
# Cache the file
cached_source_path = __salt__['cp.cache_file'](path=source, saltenv=__env__)
if not cached_source_path:
msg = 'Unable to cache {0} from saltenv "{1}"'.format(
salt.utils.url.redact_http_basic_auth(source), __env__)
ret['comment'] = msg
return ret
if result['result']:
# Get the path of the file in the minion cache
cached = __salt__['cp.is_cached'](source, saltenv=__env__)
# Install the KB
__salt__['wusa.install'](cached_source_path)
# Verify successful install
if __salt__['wusa.is_installed'](name):
ret['comment'] = '{0} was installed'.format(name)
ret['changes'] = {'old': False, 'new': True}
ret['result'] = True
else:
log.debug(
'failed to download %s',
salt.utils.url.redact_http_basic_auth(source)
)
return result
# Finally, make the actual change and return the result.
new_state = __salt__['win_wusa.install'](cached)
ret['comment'] = 'The KB "{0}" was installed!'.format(name)
ret['changes'] = {
'old': current_state,
'new': new_state,
}
ret['result'] = True
ret['comment'] = '{0} failed to install'.format(name)
return ret
def uninstalled(name):
'''
Ensure an update is uninstalled from the minion
Args:
name(str):
Name of the Windows KB ("KB123456")
Example:
.. code-block:: yaml
KB123456:
wusa.uninstalled
'''
ret = {'name': name,
'changes': {},
'result': False,
'comment': ''}
# Is the KB already uninstalled
if not __salt__['wusa.is_installed'](name):
ret['result'] = True
ret['comment'] = '{0} already uninstalled'.format(name)
return ret
# Check for test=True
if __opts__['test'] is True:
ret['result'] = None
ret['comment'] = '{0} would be uninstalled'.format(name)
ret['result'] = None
return ret
# Uninstall the KB
__salt__['wusa.uninstall'](name)
# Verify successful uninstall
if not __salt__['wusa.is_installed'](name):
ret['comment'] = '{0} was uninstalled'.format(name)
ret['changes'] = {'old': True, 'new': False}
ret['result'] = True
else:
ret['comment'] = '{0} failed to uninstall'.format(name)
return ret

View File

@ -67,6 +67,7 @@ def __virtual__():
return False
return 'mongo'
# Set up logging
log = logging.getLogger(__name__)

View File

@ -303,5 +303,6 @@ def run(extension=None, name=None, description=None, salt_dir=None, merge=False,
log.info('New module stored in %s', path)
return path
if __name__ == '__main__':
run()

View File

@ -723,5 +723,6 @@ def _main():
for result in finder.find(path):
print(result)
if __name__ == '__main__':
_main()

View File

@ -729,34 +729,13 @@ class CkMinions(object):
_res = {'minions': [], 'missing': []}
return _res
def _expand_matching(self, auth_entry):
ref = {'G': 'grain',
'P': 'grain_pcre',
'I': 'pillar',
'J': 'pillar_pcre',
'L': 'list',
'S': 'ipcidr',
'E': 'pcre',
'N': 'node',
None: 'compound'}
target_info = parse_target(auth_entry)
if not target_info:
log.error('Failed to parse valid target "%s"', auth_entry)
v_matcher = ref.get(target_info['engine'])
v_expr = target_info['pattern']
_res = self.check_minions(v_expr, v_matcher)
return set(_res['minions'])
def validate_tgt(self, valid, expr, tgt_type, minions=None):
def validate_tgt(self, valid, expr, tgt_type, minions=None, expr_form=None):
'''
Return a Bool. This function returns if the expression sent in is
within the scope of the valid expression
'''
v_minions = self._expand_matching(valid)
v_minions = set(self.check_minions(valid, 'compound').get('minions', []))
if minions is None:
_res = self.check_minions(expr, tgt_type)
minions = set(_res['minions'])
@ -896,7 +875,7 @@ class CkMinions(object):
continue
allowed_minions.update(set(auth_list_entry.keys()))
for key in auth_list_entry:
for match in self._expand_matching(key):
for match in set(self.check_minions(key, 'compound')):
if match in auth_dictionary:
auth_dictionary[match].extend(auth_list_entry[key])
else:
@ -904,7 +883,7 @@ class CkMinions(object):
allowed_minions_from_auth_list = set()
for next_entry in allowed_minions:
allowed_minions_from_auth_list.update(self._expand_matching(next_entry))
allowed_minions_from_auth_list.update(set(self.check_minions(next_entry, 'compound')))
# 'minions' here are all the names of minions matched by the target
# if we take out all the allowed minions, and there are any left, then
# the target includes minions that are not allowed by eauth

View File

@ -403,6 +403,7 @@ of a field to null.
def _failing_new(*args, **kwargs):
raise TypeError('Can\'t create another NullSentinel instance')
NullSentinel.__new__ = staticmethod(_failing_new)
del _failing_new

View File

@ -500,7 +500,7 @@ def valid_id(opts, id_):
if any(x in id_ for x in ('/', '\\', str('\0'))):
return False
return bool(clean_path(opts['pki_dir'], id_))
except (AttributeError, KeyError, TypeError):
except (AttributeError, KeyError, TypeError, UnicodeDecodeError):
return False

View File

@ -28,6 +28,7 @@ class DuplicateKeyWarning(RuntimeWarning):
Warned when duplicate keys exist
'''
warnings.simplefilter('always', category=DuplicateKeyWarning)

View File

@ -158,5 +158,6 @@ def main(argv=None):
else:
print(count_results(data, counts))
if __name__ == "__main__":
sys.exit(main())

View File

@ -28,6 +28,7 @@ def _random_name(size=6):
for x in range(size)
)
# Create the cloud instance name to be used throughout the tests
INSTANCE_NAME = _random_name()
PROVIDER_NAME = 'dimensiondata'

View File

@ -33,8 +33,6 @@ class EC2Test(ShellCase):
'''
Integration tests for the EC2 cloud provider in Salt-Cloud
'''
def _installer_name(self):
'''
Determine the downloaded installer name by searching the files

View File

@ -28,6 +28,7 @@ def __random_name(size=6):
for x in range(size)
)
# Create the cloud instance name to be used throughout the tests
INSTANCE_NAME = __random_name()

View File

@ -32,6 +32,7 @@ def __random_string(size=6):
for x in range(size)
)
# Create user strings for tests
ADD_USER = __random_string()
DEL_USER = __random_string()

View File

@ -1001,6 +1001,7 @@ def parse():
return options
if __name__ == '__main__':
exit_code = run(parse())
print('Exit Code: {0}'.format(exit_code))

View File

@ -79,5 +79,6 @@ def func_builder(testdir):
)
return func
for testdir in os.listdir(os.path.join(CURRENT_DIR, 'tests')):
setattr(KitchenTestCase, 'test_kitchen_{0}'.format(testdir), func_builder(testdir))

View File

@ -434,6 +434,7 @@ class MasterSwarm(Swarm):
)
print('Master killed')
# pylint: disable=C0103
if __name__ == '__main__':
swarm = Swarm(parse())

View File

@ -442,5 +442,6 @@ def main():
except KeyboardInterrupt:
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -60,6 +60,7 @@ class DownloadArtifacts(object):
except IOError:
print('Failed to copy: {0}'.format(remote))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Jenkins Artifact Download Helper')
parser.add_argument(

View File

@ -184,6 +184,8 @@ class RuntimeVars(object):
object.__setattr__(self, name, value)
return
self._vars[name] = value
# <---- Helper Methods -----------------------------------------------------------------------------------------------
# ----- Global Variables -------------------------------------------------------------------------------------------->

View File

@ -55,6 +55,7 @@ def _has_required_boto():
else:
return True
if _has_required_boto():
region = 'us-east-1'
access_key = 'GKTADJGHEIQSXMKKRBJ08H'

View File

@ -57,6 +57,7 @@ def _has_required_boto():
else:
return True
if _has_required_boto():
region = 'us-east-1'
access_key = 'GKTADJGHEIQSXMKKRBJ08H'

View File

@ -181,6 +181,7 @@ class GlusterResults(object):
success_first_ip_from_second_second_time = success_reverse_already_peer[
'ip']
xml_peer_present = """
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cliOutput>

View File

@ -40,6 +40,7 @@ def _test_hashlib():
else:
return True
SUPPORTED_HASHLIB = _test_hashlib()

View File

@ -335,6 +335,17 @@ class MySQLTestCase(TestCase, LoaderModuleMockMixin):
def test_query(self):
self._test_call(mysql.query, 'SELECT * FROM testdb', 'testdb', 'SELECT * FROM testdb')
def test_query_error(self):
connect_mock = MagicMock()
with patch.object(mysql, '_connect', connect_mock):
with patch.dict(mysql.__salt__, {'config.option': MagicMock()}):
side_effect = MySQLdb.OperationalError(9999, 'Something Went Wrong')
with patch.object(mysql, '_execute', MagicMock(side_effect=side_effect)):
mysql.query('testdb', 'SELECT * FROM testdb')
self.assertIn('mysql.error', mysql.__context__)
expected = 'MySQL Error 9999: Something Went Wrong'
self.assertEqual(mysql.__context__['mysql.error'], expected)
def _test_call(self, function, expected_sql, *args, **kwargs):
connect_mock = MagicMock()
with patch.object(mysql, '_connect', connect_mock):

View File

@ -44,6 +44,7 @@ class MockNapalmYangModule(object):
models = MockNapalmYangModels()
utils = MockUtils()
TEST_CONFIG = {
'comment': 'Configuration discarded.',
'already_configured': False,

View File

@ -22,7 +22,9 @@ import salt.modules.twilio_notify as twilio_notify
HAS_LIBS = False
try:
import twilio
if twilio.__version__ > 5:
# Grab version, ensure elements are ints
twilio_version = tuple([int(x) for x in twilio.__version_info__])
if twilio_version > (5, ):
TWILIO_5 = False
else:
TWILIO_5 = True

View File

@ -46,6 +46,7 @@ class RequestPutResponseMock(Mock):
def json(self):
return {'_id': 4321}
REQUEST_MOCK = RequestMock()

View File

@ -0,0 +1,190 @@
# -*- coding: utf-8 -*-
'''
Test the win_wusa execution module
'''
# Import Python Libs
from __future__ import absolute_import, unicode_literals, print_function
# Import Salt Testing Libs
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.mock import NO_MOCK, NO_MOCK_REASON, patch, MagicMock
from tests.support.unit import TestCase, skipIf
# Import Salt Libs
import salt.utils.platform
import salt.modules.win_wusa as win_wusa
from salt.exceptions import CommandExecutionError
@skipIf(NO_MOCK, NO_MOCK_REASON)
@skipIf(not salt.utils.platform.is_windows(), 'System is not Windows')
class WinWusaTestCase(TestCase, LoaderModuleMockMixin):
'''
test the functions in the win_wusa execution module
'''
def setup_loader_modules(self):
return {win_wusa: {}}
def test_is_installed_false(self):
'''
test is_installed function when the KB is not installed
'''
mock_retcode = MagicMock(return_value=1)
with patch.dict(win_wusa.__salt__, {'cmd.retcode': mock_retcode}):
self.assertFalse(win_wusa.is_installed('KB123456'))
def test_is_installed_true(self):
'''
test is_installed function when the KB is installed
'''
mock_retcode = MagicMock(return_value=0)
with patch.dict(win_wusa.__salt__, {'cmd.retcode': mock_retcode}):
self.assertTrue(win_wusa.is_installed('KB123456'))
def test_list(self):
'''
test list function
'''
ret = {'pid': 1,
'retcode': 0,
'stderr': '',
'stdout': '[{"HotFixID": "KB123456"}, '
'{"HotFixID": "KB123457"}]'}
mock_all = MagicMock(return_value=ret)
with patch.dict(win_wusa.__salt__, {'cmd.run_all': mock_all}):
expected = ['KB123456', 'KB123457']
returned = win_wusa.list()
self.assertListEqual(expected, returned)
def test_install(self):
'''
test install function
'''
mock_retcode = MagicMock(return_value=0)
path = 'C:\\KB123456.msu'
with patch.dict(win_wusa.__salt__, {'cmd.retcode': mock_retcode}):
self.assertTrue(win_wusa.install(path))
mock_retcode.assert_called_once_with(
['wusa.exe', path, '/quiet', '/norestart'], ignore_retcode=True)
def test_install_restart(self):
'''
test install function with restart=True
'''
mock_retcode = MagicMock(return_value=0)
path = 'C:\\KB123456.msu'
with patch.dict(win_wusa.__salt__, {'cmd.retcode': mock_retcode}):
self.assertTrue(win_wusa.install(path, restart=True))
mock_retcode.assert_called_once_with(
['wusa.exe', path, '/quiet', '/forcerestart'], ignore_retcode=True)
def test_install_already_installed(self):
'''
test install function when KB already installed
'''
mock_retcode = MagicMock(return_value=2359302)
path = 'C:\\KB123456.msu'
name = 'KB123456.msu'
with patch.dict(win_wusa.__salt__, {'cmd.retcode': mock_retcode}):
with self.assertRaises(CommandExecutionError) as excinfo:
win_wusa.install(path)
mock_retcode.assert_called_once_with(
['wusa.exe', path, '/quiet', '/norestart'], ignore_retcode=True)
self.assertEqual('{0} is already installed'.format(name),
excinfo.exception.strerror)
def test_install_error_87(self):
'''
test install function when error 87 returned
'''
mock_retcode = MagicMock(return_value=87)
path = 'C:\\KB123456.msu'
with patch.dict(win_wusa.__salt__, {'cmd.retcode': mock_retcode}):
with self.assertRaises(CommandExecutionError) as excinfo:
win_wusa.install(path)
mock_retcode.assert_called_once_with(
['wusa.exe', path, '/quiet', '/norestart'], ignore_retcode=True)
self.assertEqual('Unknown error', excinfo.exception.strerror)
def test_install_error_other(self):
'''
test install function on other unknown error
'''
mock_retcode = MagicMock(return_value=1234)
path = 'C:\\KB123456.msu'
with patch.dict(win_wusa.__salt__, {'cmd.retcode': mock_retcode}):
with self.assertRaises(CommandExecutionError) as excinfo:
win_wusa.install(path)
mock_retcode.assert_called_once_with(
['wusa.exe', path, '/quiet', '/norestart'], ignore_retcode=True)
self.assertEqual('Unknown error: 1234', excinfo.exception.strerror)
def test_uninstall_kb(self):
'''
test uninstall function passing kb name
'''
mock_retcode = MagicMock(return_value=0)
kb = 'KB123456'
with patch.dict(win_wusa.__salt__, {'cmd.retcode': mock_retcode}), \
patch("os.path.exists", MagicMock(return_value=False)):
self.assertTrue(win_wusa.uninstall(kb))
mock_retcode.assert_called_once_with(
['wusa.exe', '/uninstall', '/quiet', '/kb:{0}'.format(kb[2:]), '/norestart'],
ignore_retcode=True)
def test_uninstall_path(self):
'''
test uninstall function passing full path to .msu file
'''
mock_retcode = MagicMock(return_value=0)
path = 'C:\\KB123456.msu'
with patch.dict(win_wusa.__salt__, {'cmd.retcode': mock_retcode}), \
patch("os.path.exists", MagicMock(return_value=True)):
self.assertTrue(win_wusa.uninstall(path))
mock_retcode.assert_called_once_with(
['wusa.exe', '/uninstall', '/quiet', path, '/norestart'],
ignore_retcode=True)
def test_uninstall_path_restart(self):
'''
test uninstall function with full path and restart=True
'''
mock_retcode = MagicMock(return_value=0)
path = 'C:\\KB123456.msu'
with patch.dict(win_wusa.__salt__, {'cmd.retcode': mock_retcode}), \
patch("os.path.exists", MagicMock(return_value=True)):
self.assertTrue(win_wusa.uninstall(path, restart=True))
mock_retcode.assert_called_once_with(
['wusa.exe', '/uninstall', '/quiet', path, '/forcerestart'],
ignore_retcode=True)
def test_uninstall_already_uninstalled(self):
'''
test uninstall function when KB already uninstalled
'''
mock_retcode = MagicMock(return_value=2359303)
kb = 'KB123456'
with patch.dict(win_wusa.__salt__, {'cmd.retcode': mock_retcode}):
with self.assertRaises(CommandExecutionError) as excinfo:
win_wusa.uninstall(kb)
mock_retcode.assert_called_once_with(
['wusa.exe', '/uninstall', '/quiet', '/kb:{0}'.format(kb[2:]), '/norestart'],
ignore_retcode=True)
self.assertEqual('{0} not installed'.format(kb),
excinfo.exception.strerror)
def test_uninstall_path_error_other(self):
'''
test uninstall function with unknown error
'''
mock_retcode = MagicMock(return_value=1234)
path = 'C:\\KB123456.msu'
with patch.dict(win_wusa.__salt__, {'cmd.retcode': mock_retcode}), \
patch("os.path.exists", MagicMock(return_value=True)), \
self.assertRaises(CommandExecutionError) as excinfo:
win_wusa.uninstall(path)
mock_retcode.assert_called_once_with(
['wusa.exe', '/uninstall', '/quiet', path, '/norestart'],
ignore_retcode=True)
self.assertEqual('Unknown error: 1234', excinfo.exception.strerror)

View File

@ -60,6 +60,7 @@ def _has_required_boto():
else:
return True
if _has_required_boto():
region = 'us-east-1'
access_key = 'GKTADJGHEIQSXMKKRBJ08H'

View File

@ -54,6 +54,7 @@ def _has_required_boto():
else:
return True
if _has_required_boto():
region = 'us-east-1'
access_key = 'GKTADJGHEIQSXMKKRBJ08H'

View File

@ -58,6 +58,7 @@ def _has_required_boto():
else:
return True
if _has_required_boto():
region = 'us-east-1'
access_key = 'GKTADJGHEIQSXMKKRBJ08H'

View File

@ -53,6 +53,7 @@ def _has_required_boto():
else:
return True
if _has_required_boto():
region = 'us-east-1'
access_key = 'GKTADJGHEIQSXMKKRBJ08H'

View File

@ -0,0 +1,169 @@
# -*- coding: utf-8 -*-
# Import Python libs
from __future__ import absolute_import, unicode_literals, print_function
# Import Salt Libs
import salt.states.win_wusa as wusa
from salt.exceptions import SaltInvocationError
# Import Salt Testing Libs
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.unit import TestCase
from tests.support.mock import MagicMock, patch
class WinWusaTestCase(TestCase, LoaderModuleMockMixin):
'''
test the function in the win_wusa state module
'''
kb = 'KB123456'
def setup_loader_modules(self):
return {wusa: {'__opts__': {'test': False},
'__env__': 'base'}}
def test_installed_no_source(self):
'''
test wusa.installed without passing source
'''
with self.assertRaises(SaltInvocationError) as excinfo:
wusa.installed(name='KB123456', source=None)
self.assertEqual(excinfo.exception.strerror,
'Must specify a "source" file to install')
def test_installed_existing(self):
'''
test wusa.installed when the kb is already installed
'''
mock_installed = MagicMock(return_value=True)
with patch.dict(wusa.__salt__, {'wusa.is_installed': mock_installed}):
returned = wusa.installed(name=self.kb,
source='salt://{0}.msu'.format(self.kb))
expected = {'changes': {},
'comment': '{0} already installed'.format(self.kb),
'name': self.kb,
'result': True}
self.assertDictEqual(expected, returned)
def test_installed_test_true(self):
'''
test wusa.installed with test=True
'''
mock_installed = MagicMock(return_value=False)
with patch.dict(wusa.__salt__, {'wusa.is_installed': mock_installed}), \
patch.dict(wusa.__opts__, {'test': True}):
returned = wusa.installed(name=self.kb,
source='salt://{0}.msu'.format(self.kb))
expected = {'changes': {},
'comment': '{0} would be installed'.format(self.kb),
'name': self.kb,
'result': None}
self.assertDictEqual(expected, returned)
def test_installed_cache_fail(self):
'''
test wusa.install when it fails to cache the file
'''
mock_installed = MagicMock(return_value=False)
mock_cache = MagicMock(return_value='')
with patch.dict(wusa.__salt__, {'wusa.is_installed': mock_installed,
'cp.cache_file': mock_cache}):
returned = wusa.installed(name=self.kb,
source='salt://{0}.msu'.format(self.kb))
expected = {'changes': {},
'comment': 'Unable to cache salt://{0}.msu from '
'saltenv "base"'.format(self.kb),
'name': self.kb,
'result': False}
self.assertDictEqual(expected, returned)
def test_installed(self):
'''
test wusa.installed assuming success
'''
mock_installed = MagicMock(side_effect=[False, True])
mock_cache = MagicMock(return_value='C:\\{0}.msu'.format(self.kb))
with patch.dict(wusa.__salt__, {'wusa.is_installed': mock_installed,
'cp.cache_file': mock_cache,
'wusa.install': MagicMock()}):
returned = wusa.installed(name=self.kb,
source='salt://{0}.msu'.format(self.kb))
expected = {'changes': {'new': True, 'old': False},
'comment': '{0} was installed'.format(self.kb),
'name': self.kb,
'result': True}
self.assertDictEqual(expected, returned)
def test_installed_failed(self):
'''
test wusa.installed with a failure
'''
mock_installed = MagicMock(side_effect=[False, False])
mock_cache = MagicMock(return_value='C:\\{0}.msu'.format(self.kb))
with patch.dict(wusa.__salt__, {'wusa.is_installed': mock_installed,
'cp.cache_file': mock_cache,
'wusa.install': MagicMock()}):
returned = wusa.installed(name=self.kb,
source='salt://{0}.msu'.format(self.kb))
expected = {'changes': {},
'comment': '{0} failed to install'.format(self.kb),
'name': self.kb,
'result': False}
self.assertDictEqual(expected, returned)
def test_uninstalled_non_existing(self):
'''
test wusa.uninstalled when the kb is not installed
'''
mock_installed = MagicMock(return_value=False)
with patch.dict(wusa.__salt__, {'wusa.is_installed': mock_installed}):
returned = wusa.uninstalled(name=self.kb)
expected = {'changes': {},
'comment': '{0} already uninstalled'.format(self.kb),
'name': self.kb,
'result': True}
self.assertDictEqual(expected, returned)
def test_uninstalled_test_true(self):
'''
test wusa.uninstalled with test=True
'''
mock_installed = MagicMock(return_value=True)
with patch.dict(wusa.__salt__, {'wusa.is_installed': mock_installed}), \
patch.dict(wusa.__opts__, {'test': True}):
returned = wusa.uninstalled(name=self.kb)
expected = {'changes': {},
'comment': '{0} would be uninstalled'.format(self.kb),
'name': self.kb,
'result': None}
self.assertDictEqual(expected, returned)
def test_uninstalled(self):
'''
test wusa.uninstalled assuming success
'''
mock_installed = MagicMock(side_effect=[True, False])
with patch.dict(wusa.__salt__, {'wusa.is_installed': mock_installed,
'wusa.uninstall': MagicMock()}):
returned = wusa.uninstalled(name=self.kb)
expected = {'changes': {'new': False, 'old': True},
'comment': '{0} was uninstalled'.format(self.kb),
'name': self.kb,
'result': True}
self.assertDictEqual(expected, returned)
def test_uninstalled_failed(self):
'''
test wusa.uninstalled with a failure
'''
mock_installed = MagicMock(side_effect=[True, True])
with patch.dict(wusa.__salt__, {'wusa.is_installed': mock_installed,
'wusa.uninstall': MagicMock()}):
returned = wusa.uninstalled(name=self.kb)
expected = {'changes': {},
'comment': '{0} failed to uninstall'.format(self.kb),
'name': self.kb,
'result': False}
self.assertDictEqual(expected, returned)