salt/tests/integration/__init__.py
Pedro Algarvio fc3b272809 Migrated test.integration.states.pip to use SaltReturnAssertsMixIn.
* Added `assertSaltCommentRegexpMatches()` to `SaltReturnAssertsMixIn` to add the ability to match salt comments using regular expressions.
2012-12-07 16:59:24 +00:00

983 lines
33 KiB
Python

'''
Set up the Salt integration test suite
'''
# Import Python libs
import optparse
import multiprocessing
import os
import sys
import shutil
import tempfile
import time
import signal
from hashlib import md5
from subprocess import PIPE, Popen
from datetime import datetime, timedelta
try:
import pwd
except ImportError:
pass
# Import Salt libs
import salt
import salt.config
import salt.master
import salt.minion
import salt.runner
from salt.utils import fopen, get_colors
from salt.utils.verify import verify_env
from saltunittest import TestCase, RedirectStdStreams
# Width used for console rulers/headers: the terminal width when the optional
# ``console`` helper can report it, otherwise a fallback of 70 columns.
try:
    import console
    width, height = console.getTerminalSize()
    PNUM = width
except Exception:
    # ``console`` is unavailable or failed to query the terminal. Catching
    # ``Exception`` instead of using a bare ``except`` lets KeyboardInterrupt
    # and SystemExit propagate.
    PNUM = 70

# Directory containing this file
INTEGRATION_TEST_DIR = os.path.dirname(
    os.path.normpath(os.path.abspath(__file__))
)
# Two directory levels above the integration test directory
CODE_DIR = os.path.dirname(os.path.dirname(INTEGRATION_TEST_DIR))
SCRIPT_DIR = os.path.join(CODE_DIR, 'scripts')
# Name of the interpreter executable matching the running major.minor version
PYEXEC = 'python{0}.{1}'.format(sys.version_info[0], sys.version_info[1])
SYS_TMP_DIR = tempfile.gettempdir()
# Scratch directory used by the test suite, below the system temp directory
TMP = os.path.join(SYS_TMP_DIR, 'salt-tests-tmpdir')
FILES = os.path.join(INTEGRATION_TEST_DIR, 'files')
MOCKBIN = os.path.join(INTEGRATION_TEST_DIR, 'mockbin')
def print_header(header, sep='~', top=True, bottom=True, inline=False,
                 centered=False):
    '''
    Print ``header`` on the console, optionally decorated.

    Unless ``inline`` is set, a ruler made of ``sep`` repeated ``PNUM`` times
    is printed above (``top``) and/or below (``bottom``) the header, and the
    header may be centered with spaces. When ``inline`` is set no rulers are
    printed; instead the header itself is padded with ``sep`` up to ``PNUM``
    columns, either left aligned or centered.
    '''
    with_rulers = not inline

    # Choose the format template from the inline/centered combination
    if inline:
        if centered:
            template = u'{0:{sep}^{width}}'
        else:
            template = u'{0:{sep}<{width}}'
    elif centered:
        template = u'{0:^{width}}'
    else:
        template = u'{0}'

    if with_rulers and top:
        print(sep * PNUM)
    print(template.format(header, sep=sep, width=PNUM))
    if with_rulers and bottom:
        print(sep * PNUM)
def run_tests(TestCase):
    '''
    Run integration tests for a chosen test case.

    The command line is parsed with ``parse_opts()``, the tests of
    ``TestCase`` are loaded and executed while a ``TestDaemon`` is active, and
    the process then exits with status ``0`` when every test passed or ``1``
    otherwise.
    '''
    from saltunittest import TestLoader, TextTestRunner
    opts = parse_opts()
    loader = TestLoader()
    tests = loader.loadTestsFromTestCase(TestCase)
    print('Setting up Salt daemons to execute tests')
    # TestDaemon takes the whole options object (it reads ``clean``,
    # ``no_colors`` and ``sysinfo`` from it); it has no ``clean`` keyword.
    with TestDaemon(opts=opts):
        result = TextTestRunner(verbosity=opts.verbosity).run(tests)
    # Passing the boolean straight to sys.exit() would report a successful
    # run as exit status 1, so translate it explicitly.
    sys.exit(0 if result.wasSuccessful() else 1)


def parse_opts():
    '''
    Parse command line options for running integration tests

    Returns the optparse values object with the attributes ``verbosity``,
    ``clean``, ``no_colors`` and ``sysinfo``.
    '''
    parser = optparse.OptionParser()
    parser.add_option('-v',
                      '--verbose',
                      dest='verbosity',
                      default=1,
                      action='count',
                      help='Verbose test runner output')
    parser.add_option('--clean',
                      dest='clean',
                      default=True,
                      action='store_true',
                      help=('Clean up test environment before and after '
                            'integration testing (default behaviour)'))
    parser.add_option('--no-clean',
                      dest='clean',
                      action='store_false',
                      help=('Don\'t clean up test environment before and after '
                            'integration testing (speed up test process)'))
    # The two options below are read by TestDaemon; without them it fails
    # with an AttributeError on the options object.
    parser.add_option('--no-colors',
                      dest='no_colors',
                      default=False,
                      action='store_true',
                      help='Disable colour printing')
    parser.add_option('--sysinfo',
                      dest='sysinfo',
                      default=False,
                      action='store_true',
                      help='Print versions and minion grains before testing')
    options, _ = parser.parse_args()
    return options
class TestDaemon(object):
    '''
    Set up the master and minion daemons, and run related cases

    Used as a context manager: entering starts a master, a minion, a
    sub-minion, a syndic master and a syndic in separate processes; leaving
    terminates them and cleans up.
    '''
    MINIONS_CONNECT_TIMEOUT = MINIONS_SYNC_TIMEOUT = 120

    def __init__(self, opts=None):
        '''
        Store the parsed options object. ``opts.no_colors`` is read here;
        ``opts.clean`` and ``opts.sysinfo`` are read by other methods.
        '''
        self.opts = opts
        self.colors = get_colors(opts.no_colors is False)

    def __enter__(self):
        '''
        Start a master and minion
        '''
        self.master_opts = salt.config.master_config(
            os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf', 'master')
        )
        self.minion_opts = salt.config.minion_config(
            os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf', 'minion')
        )
        #if sys.version_info < (2, 7):
        #    self.minion_opts['multiprocessing'] = False
        self.sub_minion_opts = salt.config.minion_config(
            os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf', 'sub_minion')
        )
        #if sys.version_info < (2, 7):
        #    self.sub_minion_opts['multiprocessing'] = False
        self.smaster_opts = salt.config.master_config(
            os.path.join(
                INTEGRATION_TEST_DIR, 'files', 'conf', 'syndic_master'
            )
        )
        self.syndic_opts = salt.config.minion_config(
            os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf', 'syndic'))
        self.syndic_opts['_master_conf_file'] = os.path.join(
            INTEGRATION_TEST_DIR,
            'files/conf/master'
        )
        # Set up config options that require internal data
        self.master_opts['pillar_roots'] = {
            'base': [os.path.join(FILES, 'pillar', 'base')]
        }
        self.master_opts['file_roots'] = {
            'base': [os.path.join(FILES, 'file', 'base')]
        }
        self.master_opts['ext_pillar'] = [
            {'cmd_yaml': 'cat {0}'.format(
                os.path.join(
                    FILES,
                    'ext.yaml'
                )
            )}
        ]
        # clean up the old files
        self._clean()

        # Point the config values to the correct temporary paths
        for name in ('hosts', 'aliases'):
            optname = '{0}.file'.format(name)
            optname_path = os.path.join(TMP, name)
            self.master_opts[optname] = optname_path
            self.minion_opts[optname] = optname_path
            self.sub_minion_opts[optname] = optname_path

        verify_env([os.path.join(self.master_opts['pki_dir'], 'minions'),
                    os.path.join(self.master_opts['pki_dir'], 'minions_pre'),
                    os.path.join(self.master_opts['pki_dir'],
                                 'minions_rejected'),
                    os.path.join(self.master_opts['cachedir'], 'jobs'),
                    os.path.join(self.smaster_opts['pki_dir'], 'minions'),
                    os.path.join(self.smaster_opts['pki_dir'], 'minions_pre'),
                    os.path.join(self.smaster_opts['pki_dir'],
                                 'minions_rejected'),
                    os.path.join(self.smaster_opts['cachedir'], 'jobs'),
                    os.path.dirname(self.master_opts['log_file']),
                    self.minion_opts['extension_modules'],
                    self.sub_minion_opts['extension_modules'],
                    self.sub_minion_opts['pki_dir'],
                    self.master_opts['sock_dir'],
                    self.smaster_opts['sock_dir'],
                    self.sub_minion_opts['sock_dir'],
                    self.minion_opts['sock_dir'],
                    TMP
                    ],
                   pwd.getpwuid(os.getuid()).pw_name)

        # Set up PATH to mockbin
        self._enter_mockbin()

        master = salt.master.Master(self.master_opts)
        self.master_process = multiprocessing.Process(target=master.start)
        self.master_process.start()

        minion = salt.minion.Minion(self.minion_opts)
        self.minion_process = multiprocessing.Process(target=minion.tune_in)
        self.minion_process.start()

        sub_minion = salt.minion.Minion(self.sub_minion_opts)
        self.sub_minion_process = multiprocessing.Process(
            target=sub_minion.tune_in
        )
        self.sub_minion_process.start()

        smaster = salt.master.Master(self.smaster_opts)
        self.smaster_process = multiprocessing.Process(target=smaster.start)
        self.smaster_process.start()

        syndic = salt.minion.Syndic(self.syndic_opts)
        self.syndic_process = multiprocessing.Process(target=syndic.tune_in)
        self.syndic_process.start()

        #if os.environ.get('DUMP_SALT_CONFIG', None) is not None:
        #    try:
        #        import yaml
        #        os.makedirs('/tmp/salttest/conf')
        #    except OSError:
        #        pass
        #    self.master_opts['user'] = pwd.getpwuid(os.getuid()).pw_name
        #    self.minion_opts['user'] = pwd.getpwuid(os.getuid()).pw_name
        #    open('/tmp/salttest/conf/master', 'w').write(
        #        yaml.dump(self.master_opts)
        #    )
        #    open('/tmp/salttest/conf/minion', 'w').write(
        #        yaml.dump(self.minion_opts)
        #    )

        # Let's create a local client to ping and sync minions
        self.client = salt.client.LocalClient(
            os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf', 'master')
        )

        self.minion_targets = set(['minion', 'sub_minion'])
        self.pre_setup_minions()
        self.setup_minions()

        if self.opts.sysinfo:
            from salt import version
            print_header('~~~~~~~ Versions Report ', inline=True)
            print('\n'.join(version.versions_report()))

            print_header(
                '~~~~~~~ Minion Grains Information ', inline=True,
            )
            grains = self.client.cmd('minion', 'grains.items')
            import pprint
            pprint.pprint(grains['minion'])

        print_header('', sep='=', inline=True)

        # The finally clause runs the post-setup hook right before the
        # context manager hands ``self`` back to the caller.
        try:
            return self
        finally:
            self.post_setup_minions()

    def __exit__(self, type, value, traceback):
        '''
        Kill the minion and master processes
        '''
        self.sub_minion_process.terminate()
        self.minion_process.terminate()
        self.master_process.terminate()
        self.syndic_process.terminate()
        self.smaster_process.terminate()
        self._exit_mockbin()
        self._clean()

    def pre_setup_minions(self):
        """
        Subclass this method for additional minion setups.
        """

    def setup_minions(self):
        '''
        Wait for the minions to connect back and, when needed, sync their
        modules. Both steps run in a helper process.

        Returns ``True`` on success and ``False`` when a helper process ends
        with a positive exit code.
        '''
        # Wait for minions to connect back
        wait_minion_connections = multiprocessing.Process(
            target=self.wait_for_minion_connections,
            args=(self.minion_targets, self.MINIONS_CONNECT_TIMEOUT)
        )
        wait_minion_connections.start()
        wait_minion_connections.join()
        wait_minion_connections.terminate()
        if wait_minion_connections.exitcode > 0:
            print(
                '\n {RED_BOLD}*{ENDC} ERROR: Minions failed to connect'.format(
                    **self.colors
                )
            )
            return False

        del(wait_minion_connections)

        sync_needed = self.opts.clean
        if self.opts.clean is False:
            def sumfile(fpath):
                # Since we will be do'in this for small files, it should be ok
                fobj = fopen(fpath)
                try:
                    m = md5()
                    while True:
                        d = fobj.read(8096)
                        if not d:
                            break
                        m.update(d)
                    return m.hexdigest()
                finally:
                    # Always release the handle, even if reading fails
                    fobj.close()

            # Since we're not cleaning up, let's see if modules are already up
            # to date so we don't need to re-sync them
            modules_dir = os.path.join(FILES, 'file', 'base', '_modules')
            for fname in os.listdir(modules_dir):
                if not fname.endswith('.py'):
                    continue
                dfile = os.path.join(
                    '/tmp/salttest/cachedir/extmods/modules/', fname
                )

                if not os.path.exists(dfile):
                    sync_needed = True
                    break

                sfile = os.path.join(modules_dir, fname)
                if sumfile(sfile) != sumfile(dfile):
                    sync_needed = True
                    break

        if sync_needed:
            # Wait for minions to "sync_all"
            sync_minions = multiprocessing.Process(
                target=self.sync_minion_modules,
                args=(self.minion_targets, self.MINIONS_SYNC_TIMEOUT)
            )
            sync_minions.start()
            sync_minions.join()
            # Terminate before inspecting the exit code so the failure path
            # is handled the same way as the connection wait above.
            sync_minions.terminate()
            if sync_minions.exitcode > 0:
                return False
            del(sync_minions)

        return True

    def post_setup_minions(self):
        """
        Subclass this method to execute code after the minions have been setup
        """

    def _enter_mockbin(self):
        '''
        Put the ``MOCKBIN`` directory first in ``PATH`` unless it is already
        listed.
        '''
        path = os.environ.get('PATH', '')
        path_items = path.split(os.pathsep)
        if MOCKBIN not in path_items:
            path_items.insert(0, MOCKBIN)
        os.environ['PATH'] = os.pathsep.join(path_items)

    def _exit_mockbin(self):
        '''
        Remove the ``MOCKBIN`` directory from ``PATH`` if it is present.
        '''
        path = os.environ.get('PATH', '')
        path_items = path.split(os.pathsep)
        try:
            path_items.remove(MOCKBIN)
        except ValueError:
            pass
        os.environ['PATH'] = os.pathsep.join(path_items)

    def _clean(self):
        '''
        Clean out the tmp files
        '''
        if not self.opts.clean:
            return
        if os.path.isdir(self.sub_minion_opts['root_dir']):
            shutil.rmtree(self.sub_minion_opts['root_dir'])
        if os.path.isdir(self.master_opts['root_dir']):
            shutil.rmtree(self.master_opts['root_dir'])
        if os.path.isdir(self.smaster_opts['root_dir']):
            shutil.rmtree(self.smaster_opts['root_dir'])
        if os.path.isdir(TMP):
            shutil.rmtree(TMP)

    def wait_for_jid(self, targets, jid, timeout=120):
        '''
        Poll once per second until no target reports ``jid`` as running on
        two consecutive checks. Returns ``True`` in that case and ``False``
        once ``timeout`` seconds have passed.
        '''
        time.sleep(1)  # Allow some time for minions to accept jobs
        now = datetime.now()
        expire = now + timedelta(seconds=timeout)
        job_finished = False
        while now <= expire:
            running = self.__client_job_running(targets, jid)
            sys.stdout.write('\r' + ' ' * PNUM + '\r')
            if not running and job_finished is False:
                # Let's not have false positives and wait one more seconds
                job_finished = True
            elif not running and job_finished is True:
                return True
            elif running and job_finished is True:
                job_finished = False

            if job_finished is False:
                sys.stdout.write(
                    ' * {YELLOW}[Quit in {0}]{ENDC} Waiting for {1}'.format(
                        '{0}'.format(expire - now).rsplit('.', 1)[0],
                        ', '.join(running),
                        **self.colors
                    )
                )
                sys.stdout.flush()
            time.sleep(1)
            now = datetime.now()
        else:
            sys.stdout.write(
                '\n {RED_BOLD}*{ENDC} ERROR: Failed to get information '
                'back\n'.format(**self.colors)
            )
            sys.stdout.flush()
        return False

    def __client_job_running(self, targets, jid):
        '''
        Return the names of the targets whose first ``saltutil.running``
        entry carries ``jid``.
        '''
        running = self.client.cmd(
            ','.join(targets), 'saltutil.running', expr_form='list'
        )
        return [
            k for (k, v) in running.items() if v and v[0]['jid'] == jid
        ]

    def wait_for_minion_connections(self, targets, timeout):
        '''
        Ping ``targets`` once per second until all of them answered. Returns
        ``None`` when every target connected; raises ``SystemExit`` when
        ``timeout`` seconds pass first.
        '''
        if timeout > 60:
            time_limit = timedelta(seconds=timeout)
        else:
            time_limit = '{0} secs'.format(timeout)
        sys.stdout.write(
            ' {LIGHT_BLUE}*{ENDC} Waiting at most {0} for minions({1}) to '
            'connect back\n'.format(
                time_limit,
                ', '.join(targets),
                **self.colors
            )
        )
        sys.stdout.flush()
        expected_connections = set(targets)
        now = datetime.now()
        expire = now + timedelta(seconds=timeout)
        while now <= expire:
            sys.stdout.write('\r' + ' ' * PNUM + '\r')
            sys.stdout.write(
                ' * {YELLOW}[Quit in {0}]{ENDC} Waiting for {1}'.format(
                    '{0}'.format(expire - now).rsplit('.', 1)[0],
                    ', '.join(expected_connections),
                    **self.colors
                )
            )
            sys.stdout.flush()

            responses = self.client.cmd(
                ','.join(expected_connections), 'test.ping', expr_form='list',
            )
            for target in responses:
                if target not in expected_connections:
                    # Someone(minion) else "listening"?
                    print(target)
                    continue
                expected_connections.remove(target)
                sys.stdout.write('\r' + ' ' * PNUM + '\r')
                sys.stdout.write(
                    ' {LIGHT_GREEN}*{ENDC} {0} connected.\n'.format(
                        target, **self.colors
                    )
                )
                sys.stdout.flush()

            if not expected_connections:
                return

            time.sleep(1)
            now = datetime.now()
        else:
            print(
                '\n {RED_BOLD}*{ENDC} WARNING: Minions failed to connect '
                'back. Tests requiring them WILL fail'.format(**self.colors)
            )
            print_header('=', sep='=', inline=True)
            raise SystemExit()

    def sync_minion_modules(self, targets, timeout=120):
        '''
        Run ``saltutil.sync_modules`` on ``targets`` and report what each one
        synced. Returns ``True`` once every target returned; raises
        ``SystemExit`` when the job does not finish within ``timeout``.
        '''
        # Let's sync all connected minions
        print(
            ' {LIGHT_BLUE}*{ENDC} Syncing minion\'s modules '
            '(saltutil.sync_modules)'.format(**self.colors)
        )
        syncing = set(targets)
        jid_info = self.client.run_job(
            ','.join(targets), 'saltutil.sync_modules',
            expr_form='list',
            timeout=9999999999999999,
        )

        if self.wait_for_jid(targets, jid_info['jid'], timeout) is False:
            print(
                ' {RED_BOLD}*{ENDC} WARNING: Minions failed to sync modules. '
                'Tests requiring these modules WILL fail'.format(**self.colors)
            )
            raise SystemExit()

        while syncing:
            rdata = self.client.get_returns(jid_info['jid'], syncing, 1)
            if rdata:
                for name, output in rdata.items():
                    print(
                        ' {LIGHT_GREEN}*{ENDC} Synced {0} modules: '
                        '{1}'.format(name, ', '.join(output), **self.colors)
                    )
                    # Synced!
                    try:
                        syncing.remove(name)
                    except KeyError:
                        print(
                            ' {RED_BOLD}*{ENDC} {0} already synced??? '
                            '{1}'.format(name, output, **self.colors)
                        )
        return True
class SaltClientTestCaseMixIn(object):
    '''
    Mix-in exposing a ``client`` property that builds a
    ``salt.client.LocalClient`` from one of the test configuration files.
    '''
    # Name of the file below files/conf that is handed to LocalClient;
    # subclasses may override it.
    _salt_client_config_file_name_ = 'master'
    # Keep the mix-in free of per-instance storage. Listing ``client`` or
    # ``_salt_client_config_file_name_`` here clashes with the class
    # attributes of the same names: Python 3 raises ValueError at class
    # creation, and on Python 2 the slots are shadowed and never usable.
    __slots__ = ()

    @property
    def client(self):
        '''
        Return a new LocalClient for the configured config file on every
        access.
        '''
        return salt.client.LocalClient(
            os.path.join(
                INTEGRATION_TEST_DIR, 'files', 'conf',
                self._salt_client_config_file_name_
            )
        )
class ModuleCase(TestCase, SaltClientTestCaseMixIn):
    '''
    Execute a module function
    '''

    def minion_run(self, _function, *args, **kw):
        '''
        Run a single salt function on the 'minion' target and condition
        the return down to match the behavior of the raw function call
        '''
        return self.run_function(_function, args, **kw)

    def run_function(self, function, arg=(), minion_tgt='minion', **kwargs):
        '''
        Run a single salt function and condition the return down to match the
        behavior of the raw function call

        The test is skipped when ``minion_tgt`` does not reply, or when it
        replies ``None`` for a function that is not known to return ``None``.
        '''
        known_to_return_none = ('file.chown', 'file.chgrp')
        orig = self.client.cmd(
            minion_tgt, function, arg, timeout=500, kwarg=kwargs
        )

        if minion_tgt not in orig:
            self.skipTest(
                'WARNING(SHOULD NOT HAPPEN #1935): Failed to get a reply '
                'from the minion \'{0}\'. Command output: {1}'.format(
                    minion_tgt, orig
                )
            )
        elif orig[minion_tgt] is None and function not in known_to_return_none:
            self.skipTest(
                'WARNING(SHOULD NOT HAPPEN #1935): Failed to get \'{0}\' from '
                'the minion \'{1}\'. Command output: {2}'.format(
                    function, minion_tgt, orig
                )
            )
        return orig[minion_tgt]

    def state_result(self, ret, raw=False):
        '''
        Return the result data from a single state return

        Only the first entry of ``ret`` is inspected. With ``raw`` the whole
        entry is returned, otherwise its ``'result'`` value.
        '''
        res = ret[next(iter(ret))]
        if raw:
            return res
        return res['result']

    def run_state(self, function, **kwargs):
        '''
        Run the state.single command and return the state return structure
        '''
        return self.run_function('state.single', [function], **kwargs)

    @property
    def minion_opts(self):
        '''
        Return the options used for the minion
        '''
        return salt.config.minion_config(
            os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf', 'minion')
        )

    @property
    def sub_minion_opts(self):
        '''
        Return the options used for the sub-minion
        '''
        return salt.config.minion_config(
            os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf', 'sub_minion')
        )

    @property
    def master_opts(self):
        '''
        Return the options used for the master
        '''
        return salt.config.master_config(
            os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf', 'master')
        )

    def assert_success(self, ret):
        '''
        Return silently when the first state in ``ret`` has a ``True``
        result; otherwise raise ``AssertionError`` carrying the state comment
        when there is one, or a representation of the bad result.
        '''
        try:
            res = self.state_result(ret, raw=True)
        except TypeError:
            # ``ret`` is not shaped like a state return; report it as is
            pass
        else:
            if isinstance(res, dict):
                if res['result'] is True:
                    return
                if 'comment' in res:
                    raise AssertionError(res['comment'])
                ret = res
        # Wrap in a one-element tuple so a tuple ``ret`` is formatted as a
        # single value instead of being unpacked as %-format arguments.
        raise AssertionError('bad result: %r' % (ret,))
class SyndicCase(TestCase, SaltClientTestCaseMixIn):
    '''
    Execute a syndic based execution test
    '''
    # Build the client from the syndic master configuration file
    _salt_client_config_file_name_ = 'syndic_master'

    def run_function(self, function, arg=()):
        '''
        Execute ``function`` with ``arg`` on the 'minion' target through the
        client and hand back that minion's reply. The test is skipped when
        the minion is missing from the reply.
        '''
        target = 'minion'
        replies = self.client.cmd(target, function, arg, timeout=500)
        if target in replies:
            return replies[target]

        self.skipTest(
            'WARNING(SHOULD NOT HAPPEN #1935): Failed to get a reply '
            'from the minion. Command output: {0}'.format(replies)
        )
        return replies[target]
class ShellCase(TestCase):
    '''
    Execute a test for a shell command
    '''

    def run_script(self, script, arg_str, catch_stderr=False, timeout=None):
        '''
        Execute a script with the given argument string

        Returns ``False`` when ``script`` does not exist in ``SCRIPT_DIR``.
        Otherwise returns the stdout lines, or a ``(stdout lines, stderr
        lines)`` tuple when ``catch_stderr`` is set. When ``timeout``
        (seconds) expires the process group is interrupted, then killed, and
        a placeholder message is returned instead of the real output.

        Raises ``RuntimeError`` when a timeout is requested on Windows.
        '''
        path = os.path.join(SCRIPT_DIR, script)
        if not os.path.isfile(path):
            return False
        ppath = 'PYTHONPATH={0}:{1}'.format(CODE_DIR, ':'.join(sys.path[1:]))
        cmd = '{0} {1} {2} {3}'.format(ppath, PYEXEC, path, arg_str)

        popen_kwargs = {
            'shell': True,
            'stdout': PIPE
        }

        if catch_stderr is True:
            popen_kwargs['stderr'] = PIPE

        if not sys.platform.lower().startswith('win'):
            popen_kwargs['close_fds'] = True

            def detach_from_parent_group():
                # detach from parent group (no more inherited signals!)
                os.setpgrp()

            popen_kwargs['preexec_fn'] = detach_from_parent_group

        elif sys.platform.lower().startswith('win') and timeout is not None:
            raise RuntimeError('Timeout is not supported under windows')

        process = Popen(cmd, **popen_kwargs)

        if timeout is not None:
            stop_at = datetime.now() + timedelta(seconds=timeout)
            term_sent = False
            while True:
                process.poll()
                if process.returncode is not None:
                    break

                if datetime.now() > stop_at:
                    if term_sent is False:
                        # Kill the process group since sending the term signal
                        # would only terminate the shell, not the command
                        # executed in the shell
                        os.killpg(os.getpgid(process.pid), signal.SIGINT)
                        term_sent = True
                        continue

                    # As a last resort, kill the process group
                    os.killpg(os.getpgid(process.pid), signal.SIGKILL)

                    out = [
                        'Process took more than {0} seconds to complete. '
                        'Process Killed!'.format(timeout)
                    ]
                    if catch_stderr:
                        return out, [
                            'Process killed, unable to catch stderr output'
                        ]
                    return out

                # Yield the CPU between polls instead of spinning at 100%
                time.sleep(0.05)

        if catch_stderr:
            if sys.version_info < (2, 7):
                # On python 2.6, the subprocess'es communicate() method uses
                # select which, is limited by the OS to 1024 file descriptors
                # We need more available descriptors to run the tests which
                # need the stderr output.
                # So instead of .communicate() we wait for the process to
                # finish, but, as the python docs state "This will deadlock
                # when using stdout=PIPE and/or stderr=PIPE and the child
                # process generates enough output to a pipe such that it
                # blocks waiting for the OS pipe buffer to accept more data.
                # Use communicate() to avoid that." <- a catch, catch situation
                #
                # Use this work around were it's needed only, python 2.6
                process.wait()
                out = process.stdout.read()
                err = process.stderr.read()
            else:
                out, err = process.communicate()
            # Force closing stderr/stdout to release file descriptors
            process.stdout.close()
            process.stderr.close()
            try:
                return out.splitlines(), err.splitlines()
            finally:
                try:
                    process.terminate()
                except OSError:
                    # process already terminated
                    pass

        data = process.communicate()
        process.stdout.close()

        try:
            return data[0].splitlines()
        finally:
            try:
                process.terminate()
            except OSError:
                # process already terminated
                pass

    def run_salt(self, arg_str):
        '''
        Execute salt
        '''
        mconf = os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf')
        arg_str = '-c {0} {1}'.format(mconf, arg_str)
        return self.run_script('salt', arg_str)

    def run_run(self, arg_str):
        '''
        Execute salt-run
        '''
        mconf = os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf')
        arg_str = '-c {0} {1}'.format(mconf, arg_str)
        return self.run_script('salt-run', arg_str)

    def run_run_plus(self, fun, options='', *arg):
        '''
        Execute Salt run and the salt run function and return the data from
        each in a dict

        The ``'out'`` key holds the output of the salt-run script and the
        ``'fun'`` key the return of running the same function through
        ``salt.runner.Runner``.
        '''
        ret = {}
        ret['out'] = self.run_run(
            '{0} {1} {2}'.format(options, fun, ' '.join(arg))
        )
        opts = salt.config.master_config(
            os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf', 'master')
        )
        opts.update({'doc': False, 'fun': fun, 'arg': arg})
        with RedirectStdStreams():
            runner = salt.runner.Runner(opts)
            ret['fun'] = runner.run()
        return ret

    def run_key(self, arg_str, catch_stderr=False):
        '''
        Execute salt-key
        '''
        mconf = os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf')
        arg_str = '-c {0} {1}'.format(mconf, arg_str)
        return self.run_script('salt-key', arg_str, catch_stderr=catch_stderr)

    def run_cp(self, arg_str):
        '''
        Execute salt-cp
        '''
        mconf = os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf')
        arg_str = '--config-dir {0} {1}'.format(mconf, arg_str)
        return self.run_script('salt-cp', arg_str)

    def run_call(self, arg_str):
        '''
        Execute salt-call
        '''
        mconf = os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf')
        arg_str = '--config-dir {0} {1}'.format(mconf, arg_str)
        return self.run_script('salt-call', arg_str)
class ShellCaseCommonTestsMixIn(object):
    '''
    Tests shared by the shell test cases. The host class names the script
    under test through its ``_call_binary_`` attribute; both tests are
    skipped when that attribute is missing or ``None``.
    '''

    def test_deprecated_config(self):
        '''
        test for the --config deprecation warning

        Once --config is fully deprecated, this test can be removed
        '''
        if getattr(self, '_call_binary_', None) is None:
            self.skipTest("'_call_binary_' not defined.")

        binary = self._call_binary_
        master_conf = os.path.join(
            INTEGRATION_TEST_DIR, 'files', 'conf', 'master'
        )
        out, err = self.run_script(
            binary, '--config {0}'.format(master_conf), catch_stderr=True
        )
        # Both the usage banner and the deprecation notice go to stderr
        stderr_text = '\n'.join(err)
        self.assertIn('Usage: {0}'.format(self._call_binary_), stderr_text)
        self.assertIn('deprecated', stderr_text)

    def test_version_includes_binary_name(self):
        '''
        The ``--version`` output must mention the script name and the salt
        version.
        '''
        if getattr(self, '_call_binary_', None) is None:
            self.skipTest("'_call_binary_' not defined.")

        version_lines = self.run_script(self._call_binary_, "--version")
        out = '\n'.join(version_lines)
        self.assertIn(self._call_binary_, out)
        self.assertIn(salt.__version__, out)
class SaltReturnAssertsMixIn(object):
    '''
    Assertion helpers for the dictionaries returned by salt state runs.

    The methods rely on ``assertTrue``, ``assertIn`` and similar assertion
    methods being provided by the class this is mixed into.
    '''

    def assertReturnSaltType(self, ret):
        '''
        Fail unless ``ret`` is a dictionary.
        '''
        try:
            self.assertTrue(isinstance(ret, dict))
        except AssertionError:
            raise AssertionError(
                '{0} is not dict. Salt returned: {1}'.format(
                    type(ret).__name__, ret
                )
            )

    def assertReturnNonEmptySaltType(self, ret):
        '''
        Fail unless ``ret`` is a non-empty dictionary.
        '''
        self.assertReturnSaltType(ret)
        try:
            self.assertNotEqual(ret, {})
        except AssertionError:
            raise AssertionError(
                '{} is equal to {}. Salt returned an empty dictionary.'
            )

    def __assertReturn(self, ret, which_case):
        '''
        Fail unless the ``'result'`` of every part of ``ret`` matches
        ``which_case`` (``True``, ``False`` or ``None``).
        '''
        self.assertReturnNonEmptySaltType(ret)
        for part in ret.values():
            # Also guarantees that ``part`` is a dictionary
            self.assertReturnNonEmptySaltType(part)
            try:
                if which_case is True:
                    self.assertTrue(part['result'])
                elif which_case is False:
                    self.assertFalse(part['result'])
                elif which_case is None:
                    self.assertIsNone(part['result'])
            except AssertionError:
                # ``comment`` is looked up with get() so a part without a
                # comment still produces the assertion error.
                raise AssertionError(
                    '{result} is not {0}. Salt Comment:\n{comment}'.format(
                        which_case,
                        result=part['result'],
                        comment=part.get('comment')
                    )
                )

    def assertSaltTrueReturn(self, ret):
        '''
        Fail unless every part of ``ret`` has a true result.
        '''
        self.__assertReturn(ret, True)

    def assertSaltFalseReturn(self, ret):
        '''
        Fail unless every part of ``ret`` has a false result.
        '''
        self.__assertReturn(ret, False)

    def assertSaltNoneReturn(self, ret):
        '''
        Fail unless every part of ``ret`` has a ``None`` result.
        '''
        self.__assertReturn(ret, None)

    def assertInSaltComment(self, ret, in_comment):
        '''
        Fail unless ``in_comment`` is in the comment of the first part of
        ``ret`` that has one, or when no part has a comment.
        '''
        self.assertReturnSaltType(ret)
        for part in ret.values():
            if 'comment' in part:
                return self.assertIn(in_comment, part['comment'])
        else:
            raise AssertionError(
                'There\'s no comment key in any of salt\'s return parts'
            )

    def assertNotInSaltComment(self, ret, not_in_comment):
        '''
        Fail when ``not_in_comment`` is in the comment of any part of
        ``ret``, or when no part has a comment. Returns ``True`` otherwise.
        '''
        self.assertReturnSaltType(ret)
        no_comment = True
        for part in ret.values():
            if 'comment' in part:
                no_comment = False
                if not_in_comment in part['comment']:
                    # Report the offending comment; the parts carry no
                    # ``part`` key to format with.
                    raise AssertionError(
                        '{0} is in {1}'.format(
                            not_in_comment, part['comment']
                        )
                    )
        if no_comment is True:
            raise AssertionError(
                'There\'s no comment key in any of salt\'s return parts'
            )
        return True

    def assertSaltCommentRegexpMatches(self, ret, pattern):
        '''
        Fail unless ``pattern`` matches the comment of the first part of
        ``ret`` that has one, or when no part has a comment.
        '''
        self.assertReturnSaltType(ret)
        for part in ret.values():
            if 'comment' in part:
                return self.assertRegexpMatches(part['comment'], pattern)
        else:
            raise AssertionError(
                'There\'s no comment key in any of salt\'s return parts'
            )

    def __assertSaltStateChanges(self, ret, keys=()):
        '''
        Generator yielding, for every part of ``ret``, the value found by
        walking ``part['changes']`` along ``keys``.

        ``keys`` may be a single string, a tuple or a list. ``ret`` must be a
        successful state return. A missing key raises ``AssertionError``.
        '''
        self.assertSaltTrueReturn(ret)
        # ``basestring`` only exists on Python 2
        try:
            string_types = basestring
        except NameError:
            string_types = str

        if keys and isinstance(keys, tuple):
            keys = list(keys)
        elif keys and isinstance(keys, string_types):
            keys = [keys]
        elif keys and not isinstance(keys, list):
            raise RuntimeError('The passed keys need to be a list')
        for part in ret.values():
            changes = part['changes']
            okeys = keys[:]
            while okeys:
                try:
                    changes = changes[okeys.pop(0)]
                except (KeyError, TypeError):
                    raise AssertionError(
                        'Could not find state{0} in the state state '
                        'return: {1}'.format(
                            ''.join(['[{0!r}]'.format(k) for k in keys]), ret
                        )
                    )
            yield changes

    def assertSaltStateChangesEqual(self, ret, comparison, keys=()):
        '''
        Fail unless the changes found under ``keys`` equal ``comparison`` in
        every part of ``ret``.
        '''
        for change in self.__assertSaltStateChanges(ret, keys):
            self.assertEqual(change, comparison)

    def assertSaltStateChangesNotEqual(self, ret, comparison, keys=()):
        '''
        Fail unless the changes found under ``keys`` differ from
        ``comparison`` in every part of ``ret``.
        '''
        for change in self.__assertSaltStateChanges(ret, keys):
            self.assertNotEqual(change, comparison)