Merge pull request #44277 from rallytime/merge-develop

[develop] Merge forward from 2017.7 to develop
This commit is contained in:
Erik Johnson 2017-10-27 10:16:00 -05:00 committed by GitHub
commit 17d8e46abf
14 changed files with 777 additions and 159 deletions

View File

@ -465,18 +465,54 @@ def create(vm_):
return ret
except Exception as e: # pylint: disable=broad-except
# Try to clean up in as much cases as possible
log.info('Cleaning up after exception clean up items: {0}'.format(cleanup))
for leftover in cleanup:
what = leftover['what']
item = leftover['item']
if what == 'domain':
destroy_domain(conn, item)
if what == 'volume':
item.delete()
do_cleanup(cleanup)
# throw the root cause after cleanup
raise e
def do_cleanup(cleanup):
'''
Clean up clone domain leftovers as much as possible.
Extra robust clean up in order to deal with some small changes in libvirt
behavior over time. Passed in volumes and domains are deleted, any errors
are ignored. Used when cloning/provisioning a domain fails.
:param cleanup: list containing dictonaries with two keys: 'what' and 'item'.
If 'what' is domain the 'item' is a libvirt domain object.
If 'what' is volume then the item is a libvirt volume object.
Returns:
none
.. versionadded: 2017.7.3
'''
log.info('Cleaning up after exception')
for leftover in cleanup:
what = leftover['what']
item = leftover['item']
if what == 'domain':
log.info('Cleaning up {0} {1}'.format(what, item.name()))
try:
item.destroy()
log.debug('{0} {1} forced off'.format(what, item.name()))
except libvirtError:
pass
try:
item.undefineFlags(libvirt.VIR_DOMAIN_UNDEFINE_MANAGED_SAVE+
libvirt.VIR_DOMAIN_UNDEFINE_SNAPSHOTS_METADATA+
libvirt.VIR_DOMAIN_UNDEFINE_NVRAM)
log.debug('{0} {1} undefined'.format(what, item.name()))
except libvirtError:
pass
if what == 'volume':
try:
item.delete()
log.debug('{0} {1} cleaned up'.format(what, item.name()))
except libvirtError:
pass
def destroy(name, call=None):
"""
This function irreversibly destroys a virtual machine on the cloud provider.

View File

@ -1389,7 +1389,7 @@ def _parse_settings_eth(opts, iface_type, enabled, iface):
for opt in ['up_cmds', 'pre_up_cmds', 'post_up_cmds',
'down_cmds', 'pre_down_cmds', 'post_down_cmds']:
if opt in opts:
iface_data['inet'][opt] = opts[opt]
iface_data[def_addrfam][opt] = opts[opt]
for addrfam in ['inet', 'inet6']:
if 'addrfam' in iface_data[addrfam] and iface_data[addrfam]['addrfam'] == addrfam:

View File

@ -155,6 +155,7 @@ def _config_logic(napalm_device,
loaded_result['diff'] = None
loaded_result['result'] = False
loaded_result['comment'] = _compare.get('comment')
__context__['retcode'] = 1
return loaded_result
_loaded_res = loaded_result.get('result', False)
@ -174,12 +175,15 @@ def _config_logic(napalm_device,
# make sure it notifies
# that something went wrong
_explicit_close(napalm_device)
__context__['retcode'] = 1
return loaded_result
loaded_result['comment'] += 'Configuration discarded.'
# loaded_result['result'] = False not necessary
# as the result can be true when test=True
_explicit_close(napalm_device)
if not loaded_result['result']:
__context__['retcode'] = 1
return loaded_result
if not test and commit_config:
@ -210,10 +214,13 @@ def _config_logic(napalm_device,
loaded_result['result'] = False
# notify if anything goes wrong
_explicit_close(napalm_device)
__context__['retcode'] = 1
return loaded_result
loaded_result['already_configured'] = True
loaded_result['comment'] = 'Already configured.'
_explicit_close(napalm_device)
if not loaded_result['result']:
__context__['retcode'] = 1
return loaded_result

View File

@ -36,6 +36,60 @@ def __virtual__():
return (False, "Module win_groupadd: module only works on Windows systems")
def _get_computer_object():
'''
A helper function to get the object for the local machine
Returns:
object: Returns the computer object for the local machine
'''
pythoncom.CoInitialize()
nt = win32com.client.Dispatch('AdsNameSpaces')
return nt.GetObject('', 'WinNT://.,computer')
def _get_group_object(name):
'''
A helper function to get a specified group object
Args:
name (str): The name of the object
Returns:
object: The specified group object
'''
pythoncom.CoInitialize()
nt = win32com.client.Dispatch('AdsNameSpaces')
return nt.GetObject('', 'WinNT://./' + name + ',group')
def _get_all_groups():
'''
A helper function that gets a list of group objects for all groups on the
machine
Returns:
iter: A list of objects for all groups on the machine
'''
pythoncom.CoInitialize()
nt = win32com.client.Dispatch('AdsNameSpaces')
results = nt.GetObject('', 'WinNT://.')
results.Filter = ['group']
return results
def _get_username(member):
'''
Resolve the username from the member object returned from a group query
Returns:
str: The username converted to domain\\username format
'''
return member.ADSPath.replace('WinNT://', '').replace(
'/', '\\').encode('ascii', 'backslashreplace')
def add(name, **kwargs):
'''
Add the specified group
@ -60,10 +114,8 @@ def add(name, **kwargs):
'comment': ''}
if not info(name):
pythoncom.CoInitialize()
nt = win32com.client.Dispatch('AdsNameSpaces')
compObj = _get_computer_object()
try:
compObj = nt.GetObject('', 'WinNT://.,computer')
newGroup = compObj.Create('group', name)
newGroup.SetInfo()
ret['changes'].append('Successfully created group {0}'.format(name))
@ -104,10 +156,8 @@ def delete(name, **kwargs):
'comment': ''}
if info(name):
pythoncom.CoInitialize()
nt = win32com.client.Dispatch('AdsNameSpaces')
compObj = _get_computer_object()
try:
compObj = nt.GetObject('', 'WinNT://.,computer')
compObj.Delete('group', name)
ret['changes'].append(('Successfully removed group {0}').format(name))
except pywintypes.com_error as com_err:
@ -144,17 +194,10 @@ def info(name):
salt '*' group.info foo
'''
pythoncom.CoInitialize()
nt = win32com.client.Dispatch('AdsNameSpaces')
try:
groupObj = nt.GetObject('', 'WinNT://./' + name + ',group')
groupObj = _get_group_object(name)
gr_name = groupObj.Name
gr_mem = []
for member in groupObj.members():
gr_mem.append(
member.ADSPath.replace('WinNT://', '').replace(
'/', '\\').encode('ascii', 'backslashreplace'))
gr_mem = [_get_username(x) for x in groupObj.members()]
except pywintypes.com_error:
return False
@ -193,20 +236,12 @@ def getent(refresh=False):
ret = []
pythoncom.CoInitialize()
nt = win32com.client.Dispatch('AdsNameSpaces')
results = _get_all_groups()
results = nt.GetObject('', 'WinNT://.')
results.Filter = ['group']
for result in results:
member_list = []
for member in result.members():
member_list.append(
member.AdsPath.replace('WinNT://', '').replace(
'/', '\\').encode('ascii', 'backslashreplace'))
group = {'gid': __salt__['file.group_to_gid'](result.name),
'members': member_list,
'name': result.name,
group = {'gid': __salt__['file.group_to_gid'](result.Name),
'members': [_get_username(x) for x in result.members()],
'name': result.Name,
'passwd': 'x'}
ret.append(group)
__context__['group.getent'] = ret
@ -240,17 +275,21 @@ def adduser(name, username, **kwargs):
'changes': {'Users Added': []},
'comment': ''}
pythoncom.CoInitialize()
nt = win32com.client.Dispatch('AdsNameSpaces')
groupObj = nt.GetObject('', 'WinNT://./' + name + ',group')
existingMembers = []
for member in groupObj.members():
existingMembers.append(
member.ADSPath.replace('WinNT://', '').replace(
'/', '\\').encode('ascii', 'backslashreplace').lower())
try:
groupObj = _get_group_object(name)
except pywintypes.com_error as com_err:
if len(com_err.excepinfo) >= 2:
friendly_error = com_err.excepinfo[2].rstrip('\r\n')
ret['result'] = False
ret['comment'] = 'Failure accessing group {0}. {1}' \
''.format(name, friendly_error)
return ret
existingMembers = [_get_username(x) for x in groupObj.members()]
username = salt.utils.win_functions.get_sam_name(username)
try:
if salt.utils.win_functions.get_sam_name(username).lower() not in existingMembers:
if username not in existingMembers:
if not __opts__['test']:
groupObj.Add('WinNT://' + username.replace('\\', '/'))
@ -299,17 +338,20 @@ def deluser(name, username, **kwargs):
'changes': {'Users Removed': []},
'comment': ''}
pythoncom.CoInitialize()
nt = win32com.client.Dispatch('AdsNameSpaces')
groupObj = nt.GetObject('', 'WinNT://./' + name + ',group')
existingMembers = []
for member in groupObj.members():
existingMembers.append(
member.ADSPath.replace('WinNT://', '').replace(
'/', '\\').encode('ascii', 'backslashreplace').lower())
try:
groupObj = _get_group_object(name)
except pywintypes.com_error as com_err:
if len(com_err.excepinfo) >= 2:
friendly_error = com_err.excepinfo[2].rstrip('\r\n')
ret['result'] = False
ret['comment'] = 'Failure accessing group {0}. {1}' \
''.format(name, friendly_error)
return ret
existingMembers = [_get_username(x) for x in groupObj.members()]
try:
if salt.utils.win_functions.get_sam_name(username).lower() in existingMembers:
if salt.utils.win_functions.get_sam_name(username) in existingMembers:
if not __opts__['test']:
groupObj.Remove('WinNT://' + username.replace('\\', '/'))
@ -365,10 +407,8 @@ def members(name, members_list, **kwargs):
ret['comment'].append('Members is not a list object')
return ret
pythoncom.CoInitialize()
nt = win32com.client.Dispatch('AdsNameSpaces')
try:
groupObj = nt.GetObject('', 'WinNT://./' + name + ',group')
groupObj = _get_group_object(name)
except pywintypes.com_error as com_err:
if len(com_err.excepinfo) >= 2:
friendly_error = com_err.excepinfo[2].rstrip('\r\n')
@ -377,12 +417,7 @@ def members(name, members_list, **kwargs):
'Failure accessing group {0}. {1}'
).format(name, friendly_error))
return ret
existingMembers = []
for member in groupObj.members():
existingMembers.append(
member.ADSPath.replace('WinNT://', '').replace(
'/', '\\').encode('ascii', 'backslashreplace').lower())
existingMembers = [_get_username(x) for x in groupObj.members()]
existingMembers.sort()
members_list.sort()
@ -448,18 +483,14 @@ def list_groups(refresh=False):
salt '*' group.list_groups
'''
if 'group.list_groups' in __context__ and not refresh:
return __context__['group.getent']
return __context__['group.list_groups']
results = _get_all_groups()
ret = []
pythoncom.CoInitialize()
nt = win32com.client.Dispatch('AdsNameSpaces')
results = nt.GetObject('', 'WinNT://.')
results.Filter = ['group']
for result in results:
ret.append(result.name)
ret.append(result.Name)
__context__['group.list_groups'] = ret

View File

@ -1531,24 +1531,26 @@ def install(name=None,
to_install.append((pkgname, pkgstr))
break
else:
if re.match('kernel(-.+)?', name):
# kernel and its subpackages support multiple
# installs as their paths do not conflict.
# Performing a yum/dnf downgrade will be a no-op
# so just do an install instead. It will fail if
# there are other interdependencies that have
# conflicts, and that's OK. We don't want to force
# anything, we just want to properly handle it if
# someone tries to install a kernel/kernel-devel of
# a lower version than the currently-installed one.
# TODO: find a better way to determine if a package
# supports multiple installs.
to_install.append((pkgname, pkgstr))
else:
# None of the currently-installed versions are
# greater than the specified version, so this is a
# downgrade.
to_downgrade.append((pkgname, pkgstr))
if pkgname is not None:
if re.match('kernel(-.+)?', pkgname):
# kernel and its subpackages support multiple
# installs as their paths do not conflict.
# Performing a yum/dnf downgrade will be a
# no-op so just do an install instead. It will
# fail if there are other interdependencies
# that have conflicts, and that's OK. We don't
# want to force anything, we just want to
# properly handle it if someone tries to
# install a kernel/kernel-devel of a lower
# version than the currently-installed one.
# TODO: find a better way to determine if a
# package supports multiple installs.
to_install.append((pkgname, pkgstr))
else:
# None of the currently-installed versions are
# greater than the specified version, so this
# is a downgrade.
to_downgrade.append((pkgname, pkgstr))
def _add_common_args(cmd):
'''

View File

@ -599,10 +599,9 @@ class CkMinions(object):
if search is None:
return minions
addrs = salt.utils.network.local_port_tcp(int(self.opts['publish_port']))
if '127.0.0.1' in addrs or '0.0.0.0' in addrs:
# Add in possible ip addresses of a locally connected minion
if '127.0.0.1' in addrs:
# Add in the address of a possible locally-connected minion.
addrs.discard('127.0.0.1')
addrs.discard('0.0.0.0')
addrs.update(set(salt.utils.network.ip_addrs(include_loopback=include_localhost)))
if subset:
search = subset

View File

@ -67,3 +67,23 @@ class ProxyMinionSimpleTestCase(ModuleCase):
ret = self.run_function('service.start', ['samba'], minion_tgt='proxytest')
ret = self.run_function('service.status', ['samba'], minion_tgt='proxytest')
self.assertTrue(ret)
def test_service_get_all(self):
ret = self.run_function('service.get_all', minion_tgt='proxytest')
self.assertTrue(ret)
self.assertIn('samba', ' '.join(ret))
def test_grains_items(self):
ret = self.run_function('grains.items', minion_tgt='proxytest')
self.assertEqual(ret['kernel'], 'proxy')
self.assertEqual(ret['kernelrelease'], 'proxy')
def test_state_apply(self):
ret = self.run_function('state.apply', ['core'], minion_tgt='proxytest')
for key, value in ret.items():
self.assertTrue(value['result'])
def test_state_highstate(self):
ret = self.run_function('state.highstate', minion_tgt='proxytest')
for key, value in ret.items():
self.assertTrue(value['result'])

View File

@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
'''
Tests for the spm files utility
'''
# Import python libs
from __future__ import absolute_import
import os
import shutil
# Import Salt Testing libs
from tests.support.case import SPMCase
from tests.support.helpers import destructiveTest
@destructiveTest
class SPMFilesTest(SPMCase):
'''
Validate the spm files command
'''
def setUp(self):
self.config = self._spm_config()
self._spm_build_files(self.config)
def test_spm_files(self):
'''
test spm files
'''
self._spm_create_update_repo(self.config)
install = self.run_spm('install', self.config, 'apache')
get_files = self.run_spm('files', self.config, 'apache')
os.path.exists(os.path.join(self.config['formula_path'], 'apache',
'apache.sls'))
def tearDown(self):
shutil.rmtree(self._tmp_spm)

View File

@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
'''
Tests for the spm info utility
'''
# Import python libs
from __future__ import absolute_import
import shutil
# Import Salt Testing libs
from tests.support.case import SPMCase
from tests.support.helpers import destructiveTest
@destructiveTest
class SPMInfoTest(SPMCase):
'''
Validate the spm info command
'''
def setUp(self):
self.config = self._spm_config()
self._spm_build_files(self.config)
def test_spm_info(self):
'''
test spm build
'''
self._spm_create_update_repo(self.config)
install = self.run_spm('install', self.config, 'apache')
get_info = self.run_spm('info', self.config, 'apache')
check_info = ['Supported OSes', 'Supported OS', 'installing Apache']
for info in check_info:
self.assertIn(info, ''.join(get_info))
def tearDown(self):
shutil.rmtree(self._tmp_spm)

View File

@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
'''
Tests for the spm install utility
'''
# Import python libs
from __future__ import absolute_import
import os
import shutil
# Import Salt Testing libs
from tests.support.case import SPMCase
from tests.support.helpers import destructiveTest
@destructiveTest
class SPMInstallTest(SPMCase):
'''
Validate the spm install command
'''
def setUp(self):
self.config = self._spm_config()
self._spm_build_files(self.config)
def test_spm_install_local_dir(self):
'''
test spm install from local directory
'''
build_spm = self.run_spm('build', self.config, self.formula_dir)
spm_file = os.path.join(self.config['spm_build_dir'],
'apache-201506-2.spm')
install = self.run_spm('install', self.config, spm_file)
sls = os.path.join(self.config['formula_path'], 'apache', 'apache.sls')
self.assertTrue(os.path.exists(sls))
def test_spm_install_from_repo(self):
'''
test spm install from repo
'''
self._spm_create_update_repo(self.config)
install = self.run_spm('install', self.config, 'apache')
sls = os.path.join(self.config['formula_path'], 'apache', 'apache.sls')
self.assertTrue(os.path.exists(sls))
def tearDown(self):
shutil.rmtree(self._tmp_spm)

View File

@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
'''
Tests for the spm repo
'''
# Import python libs
from __future__ import absolute_import
import os
import shutil
# Import Salt Testing libs
from tests.support.case import SPMCase
from tests.support.helpers import destructiveTest
@destructiveTest
class SPMRepoTest(SPMCase):
'''
Validate commands related to spm repo
'''
def setUp(self):
self.config = self._spm_config()
self._spm_build_files(self.config)
def test_spm_create_update_repo(self):
'''
test spm create_repo
'''
self._spm_create_update_repo(self.config)
self.assertTrue(os.path.exists(self.config['spm_db']))
l_repo_file = os.path.join(self.config['spm_cache_dir'], 'local_repo.p')
self.assertTrue(os.path.exists(l_repo_file))
def tearDown(self):
shutil.rmtree(self._tmp_spm)

View File

@ -130,6 +130,9 @@ TEST_SUITES = {
'returners':
{'display_name': 'Returners',
'path': 'integration/returners'},
'spm':
{'display_name': 'SPM',
'path': 'integration/spm'},
'loader':
{'display_name': 'Loader',
'path': 'integration/loader'},
@ -338,6 +341,13 @@ class SaltTestsuiteParser(SaltCoverageTestingParser):
action='store_true',
help='Run salt/returners/*.py tests'
)
self.test_selection_group.add_option(
'--spm',
dest='spm',
default=False,
action='store_true',
help='Run spm integration tests'
)
self.test_selection_group.add_option(
'-l',
'--loader',

View File

@ -22,6 +22,7 @@ import time
import stat
import errno
import signal
import textwrap
import logging
import tempfile
import subprocess
@ -564,11 +565,61 @@ class ShellCase(ShellTestCase, AdaptedConfigurationTestCaseMixin, ScriptPathMixi
timeout=timeout)
class SPMTestUserInterface(object):
'''
Test user interface to SPMClient
'''
def __init__(self):
self._status = []
self._confirm = []
self._error = []
def status(self, msg):
self._status.append(msg)
def confirm(self, action):
self._confirm.append(action)
def error(self, msg):
self._error.append(msg)
class SPMCase(TestCase, AdaptedConfigurationTestCaseMixin):
'''
Class for handling spm commands
'''
def _spm_build_files(self, config):
self.formula_dir = os.path.join(' '.join(config['file_roots']['base']), 'formulas')
self.formula_sls_dir = os.path.join(self.formula_dir, 'apache')
self.formula_sls = os.path.join(self.formula_sls_dir, 'apache.sls')
self.formula_file = os.path.join(self.formula_dir, 'FORMULA')
dirs = [self.formula_dir, self.formula_sls_dir]
for f_dir in dirs:
os.makedirs(f_dir)
# Late import
import salt.utils.files
with salt.utils.files.fopen(self.formula_sls, 'w') as fp:
fp.write(textwrap.dedent('''\
install-apache:
pkg.installed:
- name: apache2
'''))
with salt.utils.files.fopen(self.formula_file, 'w') as fp:
fp.write(textwrap.dedent('''\
name: apache
os: RedHat, Debian, Ubuntu, Suse, FreeBSD
os_family: RedHat, Debian, Suse, FreeBSD
version: 201506
release: 2
summary: Formula for installing Apache
description: Formula for installing Apache
'''))
def _spm_config(self):
self._tmp_spm = tempfile.mkdtemp()
config = self.get_temp_config('minion', **{
@ -595,16 +646,37 @@ class SPMCase(TestCase, AdaptedConfigurationTestCaseMixin):
})
return config
def _spm_create_update_repo(self, config):
build_spm = self.run_spm('build', self.config, self.formula_dir)
c_repo = self.run_spm('create_repo', self.config,
self.config['spm_build_dir'])
repo_conf_dir = self.config['spm_repos_config'] + '.d'
os.makedirs(repo_conf_dir)
# Late import
import salt.utils.files
with salt.utils.files.fopen(os.path.join(repo_conf_dir, 'spm.repo'), 'w') as fp:
fp.write(textwrap.dedent('''\
local_repo:
url: file://{0}
'''.format(self.config['spm_build_dir'])))
u_repo = self.run_spm('update_repo', self.config)
def _spm_client(self, config):
import salt.spm
ui = salt.spm.SPMCmdlineInterface()
client = salt.spm.SPMClient(ui, config)
self.ui = SPMTestUserInterface()
client = salt.spm.SPMClient(self.ui, config)
return client
def run_spm(self, cmd, config, arg=()):
def run_spm(self, cmd, config, arg=None):
client = self._spm_client(config)
spm_cmd = client.run([cmd, arg])
return spm_cmd
return self.ui._status
class ModuleCase(TestCase, SaltClientTestCaseMixin):

View File

@ -10,6 +10,8 @@ from __future__ import absolute_import
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.unit import TestCase, skipIf
from tests.support.mock import (
MagicMock,
Mock,
patch,
NO_MOCK,
NO_MOCK_REASON
@ -17,6 +19,7 @@ from tests.support.mock import (
# Import Salt Libs
import salt.modules.win_groupadd as win_groupadd
import salt.utils.win_functions
# Import Other Libs
# pylint: disable=unused-import
@ -24,12 +27,46 @@ try:
import win32com
import pythoncom
import pywintypes
PYWINTYPES_ERROR = pywintypes.com_error(
-1234, 'Exception occurred.', (0, None, 'C', None, 0, -4321), None)
HAS_WIN_LIBS = True
except ImportError:
HAS_WIN_LIBS = False
# pylint: enable=unused-import
class MockMember(object):
def __init__(self, name):
self.ADSPath = name
class MockGroupObj(object):
def __init__(self, ads_name, ads_users):
self._members = [MockMember(x) for x in ads_users]
self.Name = ads_name
def members(self):
return self._members
def Add(self, name):
'''
This should be a no-op unless we want to test raising an error, in
which case this should be overridden in a subclass.
'''
pass
def Remove(self, name):
'''
This should be a no-op unless we want to test raising an error, in
which case this should be overridden in a subclass.
'''
pass
if not NO_MOCK:
sam_mock = MagicMock(side_effect=lambda x: 'HOST\\' + x)
@skipIf(not HAS_WIN_LIBS, 'win_groupadd unit tests can only be run if win32com, pythoncom, and pywintypes are installed')
@skipIf(NO_MOCK, NO_MOCK_REASON)
class WinGroupTestCase(TestCase, LoaderModuleMockMixin):
@ -41,106 +78,352 @@ class WinGroupTestCase(TestCase, LoaderModuleMockMixin):
win_groupadd: {'__opts__': {'test': False}}
}
# 'add' function tests: 1
def test_add(self):
'''
Test if it add the specified group
Test adding a new group
'''
self.assertDictEqual(win_groupadd.add('foo'),
{'changes': [], 'name': 'foo', 'result': None,
'comment': 'The group foo already exists.'})
info = MagicMock(return_value=False)
with patch.object(win_groupadd, 'info', info),\
patch.object(win_groupadd, '_get_computer_object', Mock()):
self.assertDictEqual(win_groupadd.add('foo'),
{'changes': ['Successfully created group foo'],
'name': 'foo',
'result': True,
'comment': ''})
# 'delete' function tests: 1
def test_add_group_exists(self):
'''
Test adding a new group if the group already exists
'''
info = MagicMock(return_value={'name': 'foo',
'passwd': None,
'gid': None,
'members': ['HOST\\spongebob']})
with patch.object(win_groupadd, 'info', info),\
patch.object(win_groupadd, '_get_computer_object', Mock()):
self.assertDictEqual(win_groupadd.add('foo'),
{'changes': [], 'name': 'foo', 'result': None,
'comment': 'The group foo already exists.'})
def test_add_error(self):
'''
Test adding a group and encountering an error
'''
class CompObj(object):
def Create(self, type, name):
raise PYWINTYPES_ERROR
compobj_mock = MagicMock(return_value=CompObj())
info = MagicMock(return_value=False)
with patch.object(win_groupadd, 'info', info),\
patch.object(win_groupadd, '_get_computer_object', compobj_mock):
self.assertDictEqual(win_groupadd.add('foo'),
{'changes': [],
'name': 'foo',
'result': False,
'comment': 'Failed to create group foo. C'})
def test_delete(self):
'''
Test if it remove the specified group
Test removing a group
'''
self.assertDictEqual(win_groupadd.delete('foo'),
{'changes': [], 'name': 'foo', 'result': None,
'comment': 'The group foo does not exists.'})
info = MagicMock(return_value={'name': 'foo',
'passwd': None,
'gid': None,
'members': ['HOST\\spongebob']})
with patch.object(win_groupadd, 'info', info), \
patch.object(win_groupadd, '_get_computer_object', Mock()):
self.assertDictEqual(
win_groupadd.delete('foo'),
{'changes': ['Successfully removed group foo'],
'name': 'foo',
'result': True,
'comment': ''})
# 'info' function tests: 1
def test_delete_no_group(self):
'''
Test removing a group that doesn't exists
'''
info = MagicMock(return_value=False)
with patch.object(win_groupadd, 'info', info), \
patch.object(win_groupadd, '_get_computer_object', Mock()):
self.assertDictEqual(win_groupadd.delete('foo'),
{'changes': [], 'name': 'foo', 'result': None,
'comment': 'The group foo does not exists.'})
def test_delete_error(self):
'''
Test removing a group and encountering an error
'''
class CompObj(object):
def Delete(self, type, name):
raise PYWINTYPES_ERROR
compobj_mock = MagicMock(return_value=CompObj())
info = MagicMock(return_value={'name': 'foo',
'passwd': None,
'gid': None,
'members': ['HOST\\spongebob']})
with patch.object(win_groupadd, 'info', info),\
patch.object(win_groupadd, '_get_computer_object', compobj_mock):
self.assertDictEqual(
win_groupadd.delete('foo'),
{'changes': [],
'name': 'foo',
'result': False,
'comment': 'Failed to remove group foo. C'})
def test_info(self):
'''
Test if it return information about a group.
'''
with patch(win_groupadd.win32.client, 'flag', None):
self.assertDictEqual(win_groupadd.info('dc=salt'),
groupobj_mock = MagicMock(return_value=MockGroupObj('salt', ['WinNT://HOST/steve']))
with patch.object(win_groupadd, '_get_group_object', groupobj_mock):
self.assertDictEqual(win_groupadd.info('salt'),
{'gid': None,
'members': ['dc=\\user1'],
'members': ['HOST\\steve'],
'passwd': None,
'name': 'WinNT://./dc=salt,group'})
with patch(win_groupadd.win32.client, 'flag', 1):
self.assertFalse(win_groupadd.info('dc=salt'))
with patch(win_groupadd.win32.client, 'flag', 2):
self.assertFalse(win_groupadd.info('dc=salt'))
# 'getent' function tests: 1
'name': 'salt'})
def test_getent(self):
groupobj_mock = MagicMock(
return_value=[
MockGroupObj('salt', ['WinNT://HOST/steve']),
MockGroupObj('salty', ['WinNT://HOST/spongebob'])])
mock_g_to_g = MagicMock(side_effect=[1, 2])
with patch.object(win_groupadd, '_get_all_groups', groupobj_mock),\
patch.dict(win_groupadd.__salt__, {'file.group_to_gid': mock_g_to_g}):
self.assertListEqual(
win_groupadd.getent(),
[
{'gid': 1, 'members': ['HOST\\steve'], 'name': 'salt', 'passwd': 'x'},
{'gid': 2, 'members': ['HOST\\spongebob'], 'name': 'salty', 'passwd': 'x'}
])
def test_getent_context(self):
'''
Test if it return info on all groups
Test group.getent is using the values in __context__
'''
with patch.dict(win_groupadd.__context__, {'group.getent': True}):
self.assertTrue(win_groupadd.getent())
# 'adduser' function tests: 1
def test_adduser(self):
'''
Test if it add a user to a group
Test adding a user to a group
'''
with patch(win_groupadd.win32.client, 'flag', None):
self.assertDictEqual(win_groupadd.adduser('dc=foo', 'dc=\\username'),
{'changes': {'Users Added': ['dc=\\username']},
'comment': '', 'name': 'dc=foo', 'result': True})
groupobj_mock = MagicMock(return_value=MockGroupObj('foo', ['WinNT://HOST/steve']))
with patch.object(win_groupadd, '_get_group_object', groupobj_mock), \
patch.object(salt.utils.win_functions, 'get_sam_name', sam_mock):
self.assertDictEqual(
win_groupadd.adduser('foo', 'spongebob'),
{'changes': {'Users Added': ['HOST\\spongebob']},
'comment': '',
'name': 'foo',
'result': True})
with patch(win_groupadd.win32.client, 'flag', 1):
comt = ('Failed to add dc=\\username to group dc=foo. C')
self.assertDictEqual(win_groupadd.adduser('dc=foo', 'dc=\\username'),
{'changes': {'Users Added': []}, 'name': 'dc=foo',
'comment': comt, 'result': False})
def test_adduser_already_exists(self):
'''
Test adding a user that already exists
'''
groupobj_mock = MagicMock(return_value=MockGroupObj('foo', ['WinNT://HOST/steve']))
with patch.object(win_groupadd, '_get_group_object', groupobj_mock), \
patch.object(salt.utils.win_functions, 'get_sam_name', sam_mock):
self.assertDictEqual(
win_groupadd.adduser('foo', 'steve'),
{'changes': {'Users Added': []},
'comment': 'User HOST\\steve is already a member of foo',
'name': 'foo',
'result': None})
# 'deluser' function tests: 1
def test_adduser_error(self):
'''
Test adding a user and encountering an error
'''
# Create mock group object with mocked Add function which raises the
# exception we need in order to test the error case.
class GroupObj(MockGroupObj):
def Add(self, name):
raise PYWINTYPES_ERROR
groupobj_mock = MagicMock(return_value=GroupObj('foo', ['WinNT://HOST/steve']))
with patch.object(win_groupadd, '_get_group_object', groupobj_mock), \
patch.object(salt.utils.win_functions, 'get_sam_name', sam_mock):
self.assertDictEqual(
win_groupadd.adduser('foo', 'username'),
{'changes': {'Users Added': []},
'name': 'foo',
'comment': 'Failed to add HOST\\username to group foo. C',
'result': False})
def test_adduser_group_does_not_exist(self):
groupobj_mock = MagicMock(side_effect=PYWINTYPES_ERROR)
with patch.object(win_groupadd, '_get_group_object', groupobj_mock), \
patch.object(salt.utils.win_functions, 'get_sam_name', sam_mock):
self.assertDictEqual(
win_groupadd.adduser('foo', 'spongebob'),
{'changes': {'Users Added': []},
'name': 'foo',
'comment': 'Failure accessing group foo. C',
'result': False})
def test_deluser(self):
'''
Test if it remove a user to a group
Test removing a user from a group
'''
ret = {'changes': {'Users Removed': []},
'comment': 'User dc=\\username is not a member of dc=foo',
'name': 'dc=foo', 'result': None}
# Test removing a user
groupobj_mock = MagicMock(return_value=MockGroupObj('foo', ['WinNT://HOST/spongebob']))
with patch.object(win_groupadd, '_get_group_object', groupobj_mock), \
patch.object(salt.utils.win_functions, 'get_sam_name', sam_mock):
ret = {'changes': {'Users Removed': ['spongebob']},
'comment': '',
'name': 'foo',
'result': True}
self.assertDictEqual(win_groupadd.deluser('foo', 'spongebob'), ret)
self.assertDictEqual(win_groupadd.deluser('dc=foo', 'dc=\\username'),
ret)
def test_deluser_no_user(self):
'''
Test removing a user from a group and that user is not a member of the
group
'''
groupobj_mock = MagicMock(return_value=MockGroupObj('foo', ['WinNT://HOST/steve']))
with patch.object(win_groupadd, '_get_group_object', groupobj_mock), \
patch.object(salt.utils.win_functions, 'get_sam_name', sam_mock):
ret = {'changes': {'Users Removed': []},
'comment': 'User spongebob is not a member of foo',
'name': 'foo',
'result': None}
self.assertDictEqual(win_groupadd.deluser('foo', 'spongebob'), ret)
# 'members' function tests: 1
def test_deluser_error(self):
'''
Test removing a user and encountering an error
'''
class GroupObj(MockGroupObj):
def Remove(self, name):
raise PYWINTYPES_ERROR
groupobj_mock = MagicMock(return_value=GroupObj('foo', ['WinNT://HOST/spongebob']))
with patch.object(win_groupadd, '_get_group_object', groupobj_mock), \
patch.object(salt.utils.win_functions, 'get_sam_name', sam_mock):
self.assertDictEqual(
win_groupadd.deluser('foo', 'spongebob'),
{'changes': {'Users Removed': []},
'name': 'foo',
'comment': 'Failed to remove spongebob from group foo. C',
'result': False})
def test_deluser_group_does_not_exist(self):
groupobj_mock = MagicMock(side_effect=PYWINTYPES_ERROR)
with patch.object(win_groupadd, '_get_group_object', groupobj_mock), \
patch.object(salt.utils.win_functions, 'get_sam_name', sam_mock):
self.assertDictEqual(
win_groupadd.deluser('foo', 'spongebob'),
{'changes': {'Users Removed': []},
'name': 'foo',
'comment': 'Failure accessing group foo. C',
'result': False})
def test_members(self):
'''
Test if it remove a user to a group
Test adding a list of members to a group, all existing users removed
'''
comment = ['Failure accessing group dc=foo. C']
ret = {'name': 'dc=foo', 'result': False, 'comment': comment,
'changes': {'Users Added': [], 'Users Removed': []}}
groupobj_mock = MagicMock(return_value=MockGroupObj('foo', ['WinNT://HOST/steve']))
with patch.object(win_groupadd, '_get_group_object', groupobj_mock), \
patch.object(salt.utils.win_functions, 'get_sam_name', sam_mock):
self.assertDictEqual(
win_groupadd.members('foo', 'spongebob,patrick,squidward'),
{'changes': {
'Users Added': ['HOST\\patrick', 'HOST\\spongebob', 'HOST\\squidward'],
'Users Removed': ['HOST\\steve']
},
'comment': [],
'name': 'foo',
'result': True})
with patch(win_groupadd.win32.client, 'flag', 2):
self.assertDictEqual(win_groupadd.members
('dc=foo', 'dc=\\user1,dc=\\user2,dc=\\user3'),
ret)
def test_members_correct_membership(self):
'''
Test adding a list of users where the list of users already exists
'''
members_list = ['WinNT://HOST/spongebob',
'WinNT://HOST/squidward',
'WinNT://HOST/patrick']
groupobj_mock = MagicMock(return_value=MockGroupObj('foo', members_list))
with patch.object(win_groupadd, '_get_group_object', groupobj_mock), \
patch.object(salt.utils.win_functions, 'get_sam_name', sam_mock):
self.assertDictEqual(
win_groupadd.members('foo', 'spongebob,patrick,squidward'),
{'changes': {'Users Added': [], 'Users Removed': []},
'comment': ['foo membership is correct'],
'name': 'foo',
'result': None})
with patch(win_groupadd.win32.client, 'flag', 1):
comment = ['Failed to add dc=\\user2 to dc=foo. C',
'Failed to remove dc=\\user1 from dc=foo. C']
ret.update({'comment': comment, 'result': False})
self.assertDictEqual(win_groupadd.members('dc=foo', 'dc=\\user2'), ret)
def test_members_group_does_not_exist(self):
'''
Test adding a list of users where the group does not exist
'''
groupobj_mock = MagicMock(side_effect=PYWINTYPES_ERROR)
with patch.object(win_groupadd, '_get_group_object', groupobj_mock), \
patch.object(salt.utils.win_functions, 'get_sam_name', sam_mock):
self.assertDictEqual(
win_groupadd.members('foo', 'spongebob'),
{'changes': {'Users Added': [], 'Users Removed': []},
'comment': ['Failure accessing group foo. C'],
'name': 'foo',
'result': False})
with patch(win_groupadd.win32.client, 'flag', None):
comment = ['dc=foo membership is correct']
ret.update({'comment': comment, 'result': None})
self.assertDictEqual(win_groupadd.members('dc=foo', 'dc=\\user1'), ret)
def test_members_fail_to_remove(self):
'''
Test adding a list of members and fail to remove members not in the list
'''
class GroupObj(MockGroupObj):
def Remove(self, name):
raise PYWINTYPES_ERROR
groupobj_mock = MagicMock(return_value=GroupObj('foo', ['WinNT://HOST/spongebob']))
with patch.object(win_groupadd, '_get_group_object', groupobj_mock), \
patch.object(salt.utils.win_functions, 'get_sam_name', sam_mock):
self.assertDictEqual(
win_groupadd.members('foo', 'patrick'),
{'changes': {'Users Added': ['HOST\\patrick'], 'Users Removed': []},
'comment': ['Failed to remove HOST\\spongebob from foo. C'],
'name': 'foo',
'result': False})
def test_members_fail_to_add(self):
'''
Test adding a list of members and failing to add
'''
class GroupObj(MockGroupObj):
def Add(self, name):
raise PYWINTYPES_ERROR
groupobj_mock = MagicMock(return_value=GroupObj('foo', ['WinNT://HOST/spongebob']))
with patch.object(win_groupadd, '_get_group_object', groupobj_mock), \
patch.object(salt.utils.win_functions, 'get_sam_name', sam_mock):
self.assertDictEqual(
win_groupadd.members('foo', 'patrick'),
{'changes': {'Users Added': [], 'Users Removed': ['HOST\\spongebob']},
'comment': ['Failed to add HOST\\patrick to foo. C'],
'name': 'foo',
'result': False})
def test_list_groups(self):
'''
Test that list groups returns a list of groups by name
'''
groupobj_mock = MagicMock(
return_value=[
MockGroupObj('salt', ['WinNT://HOST/steve']),
MockGroupObj('salty', ['WinNT://HOST/Administrator'])])
with patch.object(win_groupadd, '_get_all_groups', groupobj_mock):
self.assertListEqual(win_groupadd.list_groups(),
['salt', 'salty'])
def test_list_groups_context(self):
'''
Test group.list_groups is using the values in __context__
'''
with patch.dict(win_groupadd.__context__, {'group.list_groups': True}):
self.assertTrue(win_groupadd.list_groups())