Use decorators for temp files/dirs in test suite

This adds a new decorator which creates a temporary directory and cleans
it after the test completes. It also modifies an existing decorator for
creating temporary files so that it accepts arguments, which will be
passed through to salt.utils.files.mkstemp().
This commit is contained in:
Erik Johnson 2018-04-13 21:41:02 -05:00
parent b46365614b
commit 879c557264
No known key found for this signature in database
GPG Key ID: 5E5583C437808F3F
14 changed files with 1777 additions and 2202 deletions

View File

@ -1,3 +0,0 @@
{{ salt['runtests_helpers.get_salt_temp_dir_for_path']('test.append') }}:
file:
- touch

View File

@ -1,4 +0,0 @@
issue-2227:
file.append:
- name: {{ salt['runtests_helpers.get_salt_temp_dir_for_path']('test.append') }}
- text: HISTTIMEFORMAT='%F %T '

View File

@ -1,4 +0,0 @@
{{ salt['runtests_helpers.get_salt_temp_dir_for_path']('test.append') }}:
file.append:
- source: salt://testappend/firstif

View File

@ -1,3 +0,0 @@
{{ salt['runtests_helpers.get_salt_temp_dir_for_path']('test.append') }}:
file.append:
- source: salt://testappend/secondif

View File

@ -44,7 +44,7 @@ class CPModuleTest(ModuleCase):
super(CPModuleTest, self).run_function(*args, **kwargs)
)
@with_tempfile
@with_tempfile()
def test_get_file(self, tgt):
'''
cp.get_file
@ -76,7 +76,7 @@ class CPModuleTest(ModuleCase):
self.assertIn('KNIGHT: They\'re nervous, sire.', data)
self.assertNotIn('bacon', data)
@with_tempfile
@with_tempfile()
def test_get_file_templated_paths(self, tgt):
'''
cp.get_file
@ -94,7 +94,7 @@ class CPModuleTest(ModuleCase):
self.assertIn('Gromit', data)
self.assertNotIn('bacon', data)
@with_tempfile
@with_tempfile()
def test_get_file_gzipped(self, tgt):
'''
cp.get_file
@ -137,7 +137,7 @@ class CPModuleTest(ModuleCase):
self.assertIn('KNIGHT: They\'re nervous, sire.', data)
self.assertNotIn('bacon', data)
@with_tempfile
@with_tempfile()
def test_get_template(self, tgt):
'''
cp.get_template
@ -186,7 +186,7 @@ class CPModuleTest(ModuleCase):
# cp.get_url tests
@with_tempfile
@with_tempfile()
def test_get_url(self, tgt):
'''
cp.get_url with salt:// source given
@ -277,7 +277,7 @@ class CPModuleTest(ModuleCase):
self.assertIn('KNIGHT: They\'re nervous, sire.', data)
self.assertNotIn('bacon', data)
@with_tempfile
@with_tempfile()
def test_get_url_https(self, tgt):
'''
cp.get_url with https:// source given
@ -619,7 +619,7 @@ class CPModuleTest(ModuleCase):
self.assertEqual(
sha256_hash['hsum'], hashlib.sha256(data).hexdigest())
@with_tempfile
@with_tempfile()
def test_get_file_from_env_predefined(self, tgt):
'''
cp.get_file
@ -634,7 +634,7 @@ class CPModuleTest(ModuleCase):
finally:
os.unlink(tgt)
@with_tempfile
@with_tempfile()
def test_get_file_from_env_in_url(self, tgt):
tgt = os.path.join(paths.TMP, 'cheese')
try:

View File

@ -11,6 +11,7 @@ import time
# Import Salt Testing libs
from tests.support.case import ModuleCase
from tests.support.helpers import with_tempdir
from tests.support.unit import skipIf
from tests.support.paths import TMP, TMP_PILLAR_TREE
from tests.support.mixins import SaltReturnAssertsMixin
@ -187,21 +188,24 @@ class StateModuleTest(ModuleCase, SaltReturnAssertsMixin):
ret = self.run_function('state.run_request')
self.assertEqual(ret, {})
def test_issue_1896_file_append_source(self):
@with_tempdir()
def test_issue_1896_file_append_source(self, base_dir):
'''
Verify that we can append a file's contents
'''
testfile = os.path.join(TMP, 'test.append')
if os.path.isfile(testfile):
os.unlink(testfile)
testfile = os.path.join(base_dir, 'test.append')
ret = self.run_function('state.sls', mods='testappend')
ret = self.run_state('file.touch', name=testfile)
self.assertSaltTrueReturn(ret)
ret = self.run_function('state.sls', mods='testappend.step-1')
ret = self.run_state(
'file.append',
name=testfile,
source='salt://testappend/firstif')
self.assertSaltTrueReturn(ret)
ret = self.run_function('state.sls', mods='testappend.step-2')
ret = self.run_state(
'file.append',
name=testfile,
source='salt://testappend/secondif')
self.assertSaltTrueReturn(ret)
with salt.utils.files.fopen(testfile, 'r') as fp_:
@ -224,14 +228,17 @@ class StateModuleTest(ModuleCase, SaltReturnAssertsMixin):
contents = os.linesep.join(new_contents)
contents += os.linesep
self.assertMultiLineEqual(
contents, testfile_contents)
self.assertMultiLineEqual(contents, testfile_contents)
# Re-append switching order
ret = self.run_function('state.sls', mods='testappend.step-2')
ret = self.run_state(
'file.append',
name=testfile,
source='salt://testappend/secondif')
self.assertSaltTrueReturn(ret)
ret = self.run_function('state.sls', mods='testappend.step-1')
ret = self.run_state(
'file.append',
name=testfile,
source='salt://testappend/firstif')
self.assertSaltTrueReturn(ret)
with salt.utils.files.fopen(testfile, 'r') as fp_:

File diff suppressed because it is too large Load Diff

View File

@ -5,17 +5,15 @@ Tests for the Git state
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import errno
import functools
import inspect
import os
import shutil
import socket
import string
import tempfile
# Import Salt Testing libs
from tests.support.case import ModuleCase
from tests.support.helpers import with_tempdir
from tests.support.paths import TMP
from tests.support.mixins import SaltReturnAssertsMixin
@ -98,173 +96,150 @@ class GitTest(ModuleCase, SaltReturnAssertsMixin):
# Reset the dns timeout after the test is over
socket.setdefaulttimeout(None)
def test_latest(self):
@with_tempdir(create=False)
def test_latest(self, target):
'''
git.latest
'''
name = os.path.join(TMP, 'salt_repo')
try:
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
target=name
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(name, '.git')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
target=target
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(target, '.git')))
def test_latest_with_rev_and_submodules(self):
@with_tempdir(create=False)
def test_latest_with_rev_and_submodules(self, target):
'''
git.latest
'''
name = os.path.join(TMP, 'salt_repo')
try:
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='develop',
target=name,
submodules=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(name, '.git')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='develop',
target=target,
submodules=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(target, '.git')))
def test_latest_failure(self):
@with_tempdir(create=False)
def test_latest_failure(self, target):
'''
git.latest
'''
name = os.path.join(TMP, 'salt_repo')
try:
ret = self.run_state(
'git.latest',
name='https://youSpelledGitHubWrong.com/saltstack/salt-test-repo.git',
rev='develop',
target=name,
submodules=True
)
self.assertSaltFalseReturn(ret)
self.assertFalse(os.path.isdir(os.path.join(name, '.git')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.latest',
name='https://youSpelledGitHubWrong.com/saltstack/salt-test-repo.git',
rev='develop',
target=target,
submodules=True
)
self.assertSaltFalseReturn(ret)
self.assertFalse(os.path.isdir(os.path.join(target, '.git')))
def test_latest_empty_dir(self):
@with_tempdir()
def test_latest_empty_dir(self, target):
'''
git.latest
'''
name = os.path.join(TMP, 'salt_repo')
if not os.path.isdir(name):
os.mkdir(name)
try:
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='develop',
target=name,
submodules=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(name, '.git')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='develop',
target=target,
submodules=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(target, '.git')))
def test_latest_unless_no_cwd_issue_6800(self):
@with_tempdir(create=False)
def test_latest_unless_no_cwd_issue_6800(self, target):
'''
cwd=target was being passed to _run_check which blew up if
target dir did not already exist.
'''
name = os.path.join(TMP, 'salt_repo')
if os.path.isdir(name):
shutil.rmtree(name)
try:
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='develop',
target=name,
unless='test -e {0}'.format(name),
submodules=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(name, '.git')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='develop',
target=target,
unless='test -e {0}'.format(target),
submodules=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(target, '.git')))
def test_numeric_rev(self):
@with_tempdir(create=False)
def test_numeric_rev(self, target):
'''
git.latest with numeric revision
'''
name = os.path.join(TMP, 'salt_repo')
try:
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev=0.11,
target=name,
submodules=True,
timeout=120
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(name, '.git')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev=0.11,
target=target,
submodules=True,
timeout=120
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(target, '.git')))
def test_latest_with_local_changes(self):
@with_tempdir(create=False)
def test_latest_with_local_changes(self, target):
'''
Ensure that we fail the state when there are local changes and succeed
when force_reset is True.
'''
name = os.path.join(TMP, 'salt_repo')
try:
# Clone repo
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
target=name
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(name, '.git')))
# Clone repo
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
target=target
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(target, '.git')))
# Make change to LICENSE file.
with salt.utils.files.fopen(os.path.join(name, 'LICENSE'), 'a') as fp_:
fp_.write('Lorem ipsum dolor blah blah blah....\n')
# Make change to LICENSE file.
with salt.utils.files.fopen(os.path.join(target, 'LICENSE'), 'a') as fp_:
fp_.write('Lorem ipsum dolor blah blah blah....\n')
# Make sure that we now have uncommitted changes
self.assertTrue(self.run_function('git.diff', [name, 'HEAD']))
# Make sure that we now have uncommitted changes
self.assertTrue(self.run_function('git.diff', [target, 'HEAD']))
# Re-run state with force_reset=False
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
target=name,
force_reset=False
)
self.assertSaltTrueReturn(ret)
self.assertEqual(
ret[next(iter(ret))]['comment'],
('Repository {0} is up-to-date, but with local changes. Set '
'\'force_reset\' to True to purge local changes.'.format(name))
)
# Re-run state with force_reset=False
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
target=target,
force_reset=False
)
self.assertSaltTrueReturn(ret)
self.assertEqual(
ret[next(iter(ret))]['comment'],
('Repository {0} is up-to-date, but with local changes. Set '
'\'force_reset\' to True to purge local changes.'.format(target))
)
# Now run the state with force_reset=True
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
target=name,
force_reset=True
)
self.assertSaltTrueReturn(ret)
# Now run the state with force_reset=True
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
target=target,
force_reset=True
)
self.assertSaltTrueReturn(ret)
# Make sure that we no longer have uncommitted changes
self.assertFalse(self.run_function('git.diff', [name, 'HEAD']))
finally:
shutil.rmtree(name, ignore_errors=True)
# Make sure that we no longer have uncommitted changes
self.assertFalse(self.run_function('git.diff', [target, 'HEAD']))
@uses_git_opts
def test_latest_fast_forward(self):
@with_tempdir(create=False)
@with_tempdir(create=False)
@with_tempdir(create=False)
def test_latest_fast_forward(self, mirror_dir, admin_dir, clone_dir):
'''
Test running git.latest state a second time after changes have been
made to the remote repo.
@ -273,98 +248,87 @@ class GitTest(ModuleCase, SaltReturnAssertsMixin):
return self.run_function('git.rev_parse', [cwd, 'HEAD'])
repo_url = 'https://{0}/saltstack/salt-test-repo.git'.format(self.__domain)
mirror_dir = os.path.join(TMP, 'salt_repo_mirror')
mirror_url = 'file://' + mirror_dir
admin_dir = os.path.join(TMP, 'salt_repo_admin')
clone_dir = os.path.join(TMP, 'salt_repo')
try:
# Mirror the repo
self.run_function(
'git.clone', [mirror_dir], url=repo_url, opts='--mirror')
# Mirror the repo
self.run_function(
'git.clone', [mirror_dir], url=repo_url, opts='--mirror')
# Make sure the directory for the mirror now exists
self.assertTrue(os.path.exists(mirror_dir))
# Make sure the directory for the mirror now exists
self.assertTrue(os.path.exists(mirror_dir))
# Clone the mirror twice, once to the admin location and once to
# the clone_dir
ret = self.run_state('git.latest', name=mirror_url, target=admin_dir)
self.assertSaltTrueReturn(ret)
ret = self.run_state('git.latest', name=mirror_url, target=clone_dir)
self.assertSaltTrueReturn(ret)
# Clone the mirror twice, once to the admin location and once to
# the clone_dir
ret = self.run_state('git.latest', name=mirror_url, target=admin_dir)
self.assertSaltTrueReturn(ret)
ret = self.run_state('git.latest', name=mirror_url, target=clone_dir)
self.assertSaltTrueReturn(ret)
# Make a change to the repo by editing the file in the admin copy
# of the repo and committing.
head_pre = _head(admin_dir)
with salt.utils.files.fopen(os.path.join(admin_dir, 'LICENSE'), 'a') as fp_:
fp_.write('Hello world!')
self.run_function(
'git.commit', [admin_dir, 'added a line'],
git_opts='-c user.name="Foo Bar" -c user.email=foo@bar.com',
opts='-a',
)
# Make sure HEAD is pointing to a new SHA so we know we properly
# committed our change.
head_post = _head(admin_dir)
self.assertNotEqual(head_pre, head_post)
# Make a change to the repo by editing the file in the admin copy
# of the repo and committing.
head_pre = _head(admin_dir)
with salt.utils.files.fopen(os.path.join(admin_dir, 'LICENSE'), 'a') as fp_:
fp_.write('Hello world!')
self.run_function(
'git.commit', [admin_dir, 'added a line'],
git_opts='-c user.name="Foo Bar" -c user.email=foo@bar.com',
opts='-a',
)
# Make sure HEAD is pointing to a new SHA so we know we properly
# committed our change.
head_post = _head(admin_dir)
self.assertNotEqual(head_pre, head_post)
# Push the change to the mirror
# NOTE: the test will fail if the salt-test-repo's default branch
# is changed.
self.run_function('git.push', [admin_dir, 'origin', 'develop'])
# Push the change to the mirror
# NOTE: the test will fail if the salt-test-repo's default branch
# is changed.
self.run_function('git.push', [admin_dir, 'origin', 'develop'])
# Re-run the git.latest state on the clone_dir
ret = self.run_state('git.latest', name=mirror_url, target=clone_dir)
self.assertSaltTrueReturn(ret)
# Re-run the git.latest state on the clone_dir
ret = self.run_state('git.latest', name=mirror_url, target=clone_dir)
self.assertSaltTrueReturn(ret)
# Make sure that the clone_dir now has the correct SHA
self.assertEqual(head_post, _head(clone_dir))
# Make sure that the clone_dir now has the correct SHA
self.assertEqual(head_post, _head(clone_dir))
finally:
for path in (mirror_dir, admin_dir, clone_dir):
shutil.rmtree(path, ignore_errors=True)
def _changed_local_branch_helper(self, rev, hint):
@with_tempdir(create=False)
def _changed_local_branch_helper(self, target, rev, hint):
'''
We're testing two almost identical cases, the only thing that differs
is the rev used for the git.latest state.
'''
name = os.path.join(TMP, 'salt_repo')
try:
# Clone repo
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev=rev,
target=name
)
self.assertSaltTrueReturn(ret)
# Clone repo
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev=rev,
target=target
)
self.assertSaltTrueReturn(ret)
# Check out a new branch in the clone and make a commit, to ensure
# that when we re-run the state, it is not a fast-forward change
self.run_function('git.checkout', [name, 'new_branch'], opts='-b')
with salt.utils.files.fopen(os.path.join(name, 'foo'), 'w'):
pass
self.run_function('git.add', [name, '.'])
self.run_function(
'git.commit', [name, 'add file'],
git_opts='-c user.name="Foo Bar" -c user.email=foo@bar.com',
)
# Check out a new branch in the clone and make a commit, to ensure
# that when we re-run the state, it is not a fast-forward change
self.run_function('git.checkout', [target, 'new_branch'], opts='-b')
with salt.utils.files.fopen(os.path.join(target, 'foo'), 'w'):
pass
self.run_function('git.add', [target, '.'])
self.run_function(
'git.commit', [target, 'add file'],
git_opts='-c user.name="Foo Bar" -c user.email=foo@bar.com',
)
# Re-run the state, this should fail with a specific hint in the
# comment field.
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev=rev,
target=name
)
self.assertSaltFalseReturn(ret)
# Re-run the state, this should fail with a specific hint in the
# comment field.
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev=rev,
target=target
)
self.assertSaltFalseReturn(ret)
comment = ret[next(iter(ret))]['comment']
self.assertTrue(hint in comment)
finally:
shutil.rmtree(name, ignore_errors=True)
comment = ret[next(iter(ret))]['comment']
self.assertTrue(hint in comment)
@uses_git_opts
def test_latest_changed_local_branch_rev_head(self):
@ -375,7 +339,7 @@ class GitTest(ModuleCase, SaltReturnAssertsMixin):
This test will fail if the default branch for the salt-test-repo is
ever changed.
'''
self._changed_local_branch_helper(
self._changed_local_branch_helper( # pylint: disable=no-value-for-parameter
'HEAD',
'The default remote branch (develop) differs from the local '
'branch (new_branch)'
@ -387,162 +351,136 @@ class GitTest(ModuleCase, SaltReturnAssertsMixin):
Test for presence of hint in failure message when the local branch has
been changed and a non-HEAD rev is specified
'''
self._changed_local_branch_helper(
self._changed_local_branch_helper( # pylint: disable=no-value-for-parameter
'develop',
'The desired rev (develop) differs from the name of the local '
'branch (new_branch)'
)
@uses_git_opts
def test_latest_updated_remote_rev(self):
@with_tempdir(create=False)
@with_tempdir()
def test_latest_updated_remote_rev(self, name, target):
'''
Ensure that we don't exit early when checking for a fast-forward
'''
name = tempfile.mkdtemp(dir=TMP)
target = os.path.join(TMP, 'test_latest_updated_remote_rev')
# Initialize a new git repository
self.run_function('git.init', [name])
try:
# Add and commit a file
with salt.utils.files.fopen(os.path.join(name, 'foo.txt'), 'w') as fp_:
fp_.write('Hello world\n')
self.run_function('git.add', [name, '.'])
self.run_function(
'git.commit', [name, 'initial commit'],
git_opts='-c user.name="Foo Bar" -c user.email=foo@bar.com',
)
# Add and commit a file
with salt.utils.files.fopen(os.path.join(name, 'foo.txt'), 'w') as fp_:
fp_.write('Hello world\n')
self.run_function('git.add', [name, '.'])
self.run_function(
'git.commit', [name, 'initial commit'],
git_opts='-c user.name="Foo Bar" -c user.email=foo@bar.com',
)
# Run the state to clone the repo we just created
ret = self.run_state(
'git.latest',
name=name,
target=target,
)
self.assertSaltTrueReturn(ret)
# Run the state to clone the repo we just created
ret = self.run_state(
'git.latest',
name=name,
target=target,
)
self.assertSaltTrueReturn(ret)
# Add another commit
with salt.utils.files.fopen(os.path.join(name, 'foo.txt'), 'w') as fp_:
fp_.write('Added a line\n')
self.run_function(
'git.commit', [name, 'added a line'],
git_opts='-c user.name="Foo Bar" -c user.email=foo@bar.com',
opts='-a',
)
# Add another commit
with salt.utils.files.fopen(os.path.join(name, 'foo.txt'), 'w') as fp_:
fp_.write('Added a line\n')
self.run_function(
'git.commit', [name, 'added a line'],
git_opts='-c user.name="Foo Bar" -c user.email=foo@bar.com',
opts='-a',
)
# Run the state again. It should pass, if it doesn't then there was
# a problem checking whether or not the change is a fast-forward.
ret = self.run_state(
'git.latest',
name=name,
target=target,
)
self.assertSaltTrueReturn(ret)
finally:
for path in (name, target):
try:
shutil.rmtree(path)
except OSError as exc:
if exc.errno != errno.ENOENT:
raise exc
# Run the state again. It should pass, if it doesn't then there was
# a problem checking whether or not the change is a fast-forward.
ret = self.run_state(
'git.latest',
name=name,
target=target,
)
self.assertSaltTrueReturn(ret)
def test_latest_depth(self):
@with_tempdir(create=False)
def test_latest_depth(self, target):
'''
Test running git.latest state using the "depth" argument to limit the
history. See #45394.
'''
name = os.path.join(TMP, 'salt_repo')
try:
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='HEAD',
target=name,
depth=1
)
# HEAD is not a branch, this should fail
self.assertSaltFalseReturn(ret)
self.assertIn(
'must be set to the name of a branch',
ret[next(iter(ret))]['comment']
)
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='HEAD',
target=target,
depth=1
)
# HEAD is not a branch, this should fail
self.assertSaltFalseReturn(ret)
self.assertIn(
'must be set to the name of a branch',
ret[next(iter(ret))]['comment']
)
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='non-default-branch',
target=name,
depth=1
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(name, '.git')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='non-default-branch',
target=target,
depth=1
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(target, '.git')))
def test_present(self):
@with_tempdir(create=False)
def test_present(self, name):
'''
git.present
'''
name = os.path.join(TMP, 'salt_repo')
try:
ret = self.run_state(
'git.present',
name=name,
bare=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isfile(os.path.join(name, 'HEAD')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.present',
name=name,
bare=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isfile(os.path.join(name, 'HEAD')))
def test_present_failure(self):
@with_tempdir()
def test_present_failure(self, name):
'''
git.present
'''
name = os.path.join(TMP, 'salt_repo')
if not os.path.isdir(name):
os.mkdir(name)
try:
fname = os.path.join(name, 'stoptheprocess')
fname = os.path.join(name, 'stoptheprocess')
with salt.utils.files.fopen(fname, 'a'):
pass
with salt.utils.files.fopen(fname, 'a'):
pass
ret = self.run_state(
'git.present',
name=name,
bare=True
)
self.assertSaltFalseReturn(ret)
self.assertFalse(os.path.isfile(os.path.join(name, 'HEAD')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.present',
name=name,
bare=True
)
self.assertSaltFalseReturn(ret)
self.assertFalse(os.path.isfile(os.path.join(name, 'HEAD')))
def test_present_empty_dir(self):
@with_tempdir()
def test_present_empty_dir(self, name):
'''
git.present
'''
name = os.path.join(TMP, 'salt_repo')
if not os.path.isdir(name):
os.mkdir(name)
try:
ret = self.run_state(
'git.present',
name=name,
bare=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isfile(os.path.join(name, 'HEAD')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.present',
name=name,
bare=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isfile(os.path.join(name, 'HEAD')))
def test_config_set_value_with_space_character(self):
@with_tempdir()
def test_config_set_value_with_space_character(self, name):
'''
git.config
'''
name = tempfile.mkdtemp(dir=TMP)
self.addCleanup(shutil.rmtree, name, ignore_errors=True)
self.run_function('git.init', [name])
ret = self.run_state(
@ -560,17 +498,14 @@ class LocalRepoGitTest(ModuleCase, SaltReturnAssertsMixin):
'''
Tests which do no require connectivity to github.com
'''
def test_renamed_default_branch(self):
@with_tempdir()
@with_tempdir()
@with_tempdir()
def test_renamed_default_branch(self, repo, admin, target):
'''
Test the case where the remote branch has been removed
https://github.com/saltstack/salt/issues/36242
'''
repo = tempfile.mkdtemp(dir=TMP)
admin = tempfile.mkdtemp(dir=TMP)
name = tempfile.mkdtemp(dir=TMP)
for dirname in (repo, admin, name):
self.addCleanup(shutil.rmtree, dirname, ignore_errors=True)
# Create bare repo
self.run_function('git.init', [repo], bare=True)
# Clone bare repo
@ -596,7 +531,7 @@ class LocalRepoGitTest(ModuleCase, SaltReturnAssertsMixin):
ret = self.run_state(
'git.latest',
name=repo,
target=name,
target=target,
rev='develop',
)
self.assertSaltFalseReturn(ret)
@ -610,11 +545,11 @@ class LocalRepoGitTest(ModuleCase, SaltReturnAssertsMixin):
'(which will ensure that the named branch is created '
'if it does not already exist).\n\n'
'Changes already made: {0} cloned to {1}'
.format(repo, name)
.format(repo, target)
)
self.assertEqual(
ret[next(iter(ret))]['changes'],
{'new': '{0} => {1}'.format(repo, name)}
{'new': '{0} => {1}'.format(repo, target)}
)
# Run git.latest state again. This should fail again, with a different
@ -622,7 +557,7 @@ class LocalRepoGitTest(ModuleCase, SaltReturnAssertsMixin):
ret = self.run_state(
'git.latest',
name=repo,
target=name,
target=target,
rev='develop',
)
self.assertSaltFalseReturn(ret)
@ -644,7 +579,7 @@ class LocalRepoGitTest(ModuleCase, SaltReturnAssertsMixin):
ret = self.run_state(
'git.latest',
name=repo,
target=name,
target=target,
rev='develop',
branch='develop',
)

View File

@ -12,7 +12,7 @@
# pylint: disable=repr-flag-used-in-string,wrong-import-order
# Import Python libs
from __future__ import absolute_import
from __future__ import absolute_import, print_function, unicode_literals
import base64
import errno
import functools
@ -20,6 +20,7 @@ import inspect
import logging
import os
import random
import shutil
import signal
import socket
import string
@ -53,6 +54,9 @@ from tests.support.unit import skip, _id
from tests.support.mock import patch
from tests.support.paths import FILES, TMP
# Import Salt libs
import salt.utils.files
log = logging.getLogger(__name__)
@ -955,22 +959,61 @@ def with_system_user_and_group(username, group,
return decorator
def with_tempfile(func):
'''
Generates a tempfile and cleans it up when test completes.
'''
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
fd_, name = tempfile.mkstemp(prefix='__salt.test.', dir=TMP)
os.close(fd_)
del fd_
ret = func(self, name, *args, **kwargs)
try:
class WithTempfile(object):
def __init__(self, **kwargs):
self.create = kwargs.pop('create', True)
if 'dir' not in kwargs:
kwargs['dir'] = TMP
if 'prefix' not in kwargs:
kwargs['prefix'] = '__salt.test.'
self.kwargs = kwargs
def __call__(self, func):
self.func = func
return functools.wraps(func)(
lambda testcase, *args, **kwargs: self.wrap(testcase, *args, **kwargs) # pylint: disable=W0108
)
def wrap(self, testcase, *args, **kwargs):
name = salt.utils.files.mkstemp(**self.kwargs)
if not self.create:
os.remove(name)
except Exception:
pass
return ret
return wrapper
try:
return self.func(testcase, name, *args, **kwargs)
finally:
try:
os.remove(name)
except OSError:
pass
with_tempfile = WithTempfile
class WithTempdir(object):
def __init__(self, **kwargs):
self.create = kwargs.pop('create', True)
if 'dir' not in kwargs:
kwargs['dir'] = TMP
self.kwargs = kwargs
def __call__(self, func):
self.func = func
return functools.wraps(func)(
lambda testcase, *args, **kwargs: self.wrap(testcase, *args, **kwargs) # pylint: disable=W0108
)
def wrap(self, testcase, *args, **kwargs):
tempdir = tempfile.mkdtemp(**self.kwargs)
if not self.create:
os.rmdir(tempdir)
try:
return self.func(testcase, tempdir, *args, **kwargs)
finally:
shutil.rmtree(tempdir, ignore_errors=True)
with_tempdir = WithTempdir
def requires_system_grains(func):

File diff suppressed because it is too large Load Diff

View File

@ -7,9 +7,6 @@ from __future__ import absolute_import, unicode_literals, print_function
# Import Python libs
import shutil
import tempfile
import os
import logging
try:
# We're not going to actually use OpenSSL, we just want to check that
@ -20,6 +17,7 @@ except Exception:
NO_PYOPENSSL = True
# Import Salt Testing Libs
from tests.support.helpers import with_tempdir
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.paths import TMP
from tests.support.unit import TestCase, skipIf
@ -309,367 +307,327 @@ class TLSAddTestCase(TestCase, LoaderModuleMockMixin):
remove_not_in_result(ret, result)
self.assertEqual(result, ret)
def test_create_ca(self):
@with_tempdir()
def test_create_ca(self, ca_path):
'''
Test creating CA cert
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
ca_name = 'test_ca'
certp = '{0}/{1}/{2}_ca_cert.crt'.format(
ca_path,
ca_name,
ca_name)
certk = '{0}/{1}/{2}_ca_cert.key'.format(
ca_path,
ca_name,
ca_name)
ret = 'Created Private Key: "{0}." Created CA "{1}": "{2}."'.format(
certk, ca_name, certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
with patch.dict(tls.__salt__, {'config.option': mock_opt, 'cmd.retcode': mock_ret}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256', 'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
ca_name = 'test_ca'
certp = '{0}/{1}/{2}_ca_cert.crt'.format(
ca_path,
ca_name,
ca_name)
certk = '{0}/{1}/{2}_ca_cert.key'.format(
ca_path,
ca_name,
ca_name)
ret = 'Created Private Key: "{0}." Created CA "{1}": "{2}."'.format(
certk, ca_name, certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
with patch.dict(tls.__salt__, {'config.option': mock_opt, 'cmd.retcode': mock_ret}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256', 'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
def test_recreate_ca(self):
@with_tempdir()
def test_recreate_ca(self, ca_path):
'''
Test creating CA cert when one already exists
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
ca_name = 'test_ca'
certp = '{0}/{1}/{2}_ca_cert.crt'.format(
ca_path,
ca_name,
ca_name)
certk = '{0}/{1}/{2}_ca_cert.key'.format(
ca_path,
ca_name,
ca_name)
ret = 'Created Private Key: "{0}." Created CA "{1}": "{2}."'.format(
certk, ca_name, certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
with patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)), \
patch.dict(tls.__salt__, {'config.option': mock_opt, 'cmd.retcode': mock_ret}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256', 'cachedir': ca_path}), \
patch.dict(_TLS_TEST_DATA['create_ca'], {'replace': True}):
tls.create_ca(ca_name)
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
ca_name = 'test_ca'
certp = '{0}/{1}/{2}_ca_cert.crt'.format(
ca_path,
ca_name,
ca_name)
certk = '{0}/{1}/{2}_ca_cert.key'.format(
ca_path,
ca_name,
ca_name)
ret = 'Created Private Key: "{0}." Created CA "{1}": "{2}."'.format(
certk, ca_name, certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
with patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)), \
patch.dict(tls.__salt__, {'config.option': mock_opt, 'cmd.retcode': mock_ret}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256', 'cachedir': ca_path}), \
patch.dict(_TLS_TEST_DATA['create_ca'], {'replace': True}):
tls.create_ca(ca_name)
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
def test_create_csr(self):
@with_tempdir()
def test_create_csr(self, ca_path):
'''
Test creating certificate signing request
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.csr'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created CSR for "{1}": "{2}."').format(
certk, _TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt, 'cmd.retcode': mock_ret, 'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256', 'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
self.assertEqual(
tls.create_csr(
ca_name,
**_TLS_TEST_DATA['create_ca']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.csr'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created CSR for "{1}": "{2}."').format(
certk, _TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt, 'cmd.retcode': mock_ret, 'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256', 'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
self.assertEqual(
tls.create_csr(
ca_name,
**_TLS_TEST_DATA['create_ca']),
ret)
def test_recreate_csr(self):
@with_tempdir()
def test_recreate_csr(self, ca_path):
'''
Test creating certificate signing request when one already exists
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.csr'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created CSR for "{1}": "{2}."').format(
certk, _TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch.dict(_TLS_TEST_DATA['create_ca'], {'replace': True}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertEqual(
tls.create_csr(
ca_name,
**_TLS_TEST_DATA['create_ca']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.csr'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created CSR for "{1}": "{2}."').format(
certk, _TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch.dict(_TLS_TEST_DATA['create_ca'], {'replace': True}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertEqual(
tls.create_csr(
ca_name,
**_TLS_TEST_DATA['create_ca']),
ret)
def test_create_self_signed_cert(self):
@with_tempdir()
def test_create_self_signed_cert(self, ca_path):
'''
Test creating self signed certificate
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
tls_dir = 'test_tls'
certp = '{0}/{1}/certs/{2}.crt'.format(
ca_path,
tls_dir,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
tls_dir,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created Certificate: "{1}."').format(
certk, certp)
mock_opt = MagicMock(return_value=ca_path)
with patch.dict(tls.__salt__, {'config.option': mock_opt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
self.assertEqual(
tls.create_self_signed_cert(
tls_dir=tls_dir,
days=365,
**_TLS_TEST_DATA['create_ca']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
tls_dir = 'test_tls'
certp = '{0}/{1}/certs/{2}.crt'.format(
ca_path,
tls_dir,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
tls_dir,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created Certificate: "{1}."').format(
certk, certp)
mock_opt = MagicMock(return_value=ca_path)
with patch.dict(tls.__salt__, {'config.option': mock_opt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
self.assertEqual(
tls.create_self_signed_cert(
tls_dir=tls_dir,
days=365,
**_TLS_TEST_DATA['create_ca']),
ret)
def test_recreate_self_signed_cert(self):
@with_tempdir()
def test_recreate_self_signed_cert(self, ca_path):
'''
Test creating self signed certificate when one already exists
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
tls_dir = 'test_tls'
certp = '{0}/{1}/certs/{2}.crt'.format(
ca_path,
tls_dir,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
tls_dir,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created Certificate: "{1}."').format(
certk, certp)
mock_opt = MagicMock(return_value=ca_path)
with patch.dict(tls.__salt__, {'config.option': mock_opt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
self.assertEqual(
tls.create_self_signed_cert(
tls_dir=tls_dir,
days=365,
**_TLS_TEST_DATA['create_ca']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
tls_dir = 'test_tls'
certp = '{0}/{1}/certs/{2}.crt'.format(
ca_path,
tls_dir,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
tls_dir,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created Certificate: "{1}."').format(
certk, certp)
mock_opt = MagicMock(return_value=ca_path)
with patch.dict(tls.__salt__, {'config.option': mock_opt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
self.assertEqual(
tls.create_self_signed_cert(
tls_dir=tls_dir,
days=365,
**_TLS_TEST_DATA['create_ca']),
ret)
def test_create_ca_signed_cert(self):
@with_tempdir()
def test_create_ca_signed_cert(self, ca_path):
'''
Test signing certificate from request
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.crt'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = 'Created Certificate for "{0}": "{1}"'.format(
_TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name, **_TLS_TEST_DATA['create_ca'])
self.assertEqual(
tls.create_ca_signed_cert(
ca_name,
_TLS_TEST_DATA['create_ca']['CN']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.crt'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = 'Created Certificate for "{0}": "{1}"'.format(
_TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name, **_TLS_TEST_DATA['create_ca'])
self.assertEqual(
tls.create_ca_signed_cert(
ca_name,
_TLS_TEST_DATA['create_ca']['CN']),
ret)
def test_recreate_ca_signed_cert(self):
@with_tempdir()
def test_recreate_ca_signed_cert(self, ca_path):
'''
Test signing certificate from request when certificate exists
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.crt'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = 'Created Certificate for "{0}": "{1}"'.format(
_TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
tls.create_ca_signed_cert(ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
self.assertEqual(
tls.create_ca_signed_cert(
ca_name,
_TLS_TEST_DATA['create_ca']['CN'],
replace=True),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.crt'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = 'Created Certificate for "{0}": "{1}"'.format(
_TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
tls.create_ca_signed_cert(ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
self.assertEqual(
tls.create_ca_signed_cert(
ca_name,
_TLS_TEST_DATA['create_ca']['CN'],
replace=True),
ret)
def test_create_pkcs12(self):
@with_tempdir()
def test_create_pkcs12(self, ca_path):
'''
Test creating pkcs12
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.p12'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = 'Created PKCS#12 Certificate for "{0}": "{1}"'.format(
_TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name, **_TLS_TEST_DATA['create_ca'])
tls.create_ca_signed_cert(ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
self.assertEqual(
tls.create_pkcs12(ca_name,
_TLS_TEST_DATA['create_ca']['CN'],
'password'),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.p12'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = 'Created PKCS#12 Certificate for "{0}": "{1}"'.format(
_TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name, **_TLS_TEST_DATA['create_ca'])
tls.create_ca_signed_cert(ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
self.assertEqual(
tls.create_pkcs12(ca_name,
_TLS_TEST_DATA['create_ca']['CN'],
'password'),
ret)
def test_recreate_pkcs12(self):
@with_tempdir()
def test_recreate_pkcs12(self, ca_path):
'''
Test creating pkcs12 when it already exists
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.p12'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = 'Created PKCS#12 Certificate for "{0}": "{1}"'.format(
_TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch.dict(_TLS_TEST_DATA['create_ca'], {'replace': True}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
tls.create_ca_signed_cert(ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.p12'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = 'Created PKCS#12 Certificate for "{0}": "{1}"'.format(
_TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch.dict(_TLS_TEST_DATA['create_ca'], {'replace': True}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
tls.create_ca_signed_cert(ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
tls.create_pkcs12(ca_name,
_TLS_TEST_DATA['create_ca']['CN'],
'password')
self.assertEqual(
tls.create_pkcs12(ca_name,
_TLS_TEST_DATA['create_ca']['CN'],
'password')
self.assertEqual(
tls.create_pkcs12(ca_name,
_TLS_TEST_DATA[
'create_ca']['CN'],
'password',
replace=True),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
_TLS_TEST_DATA[
'create_ca']['CN'],
'password',
replace=True),
ret)
def test_pyOpenSSL_version(self):
'''
@ -701,13 +659,13 @@ class TLSAddTestCase(TestCase, LoaderModuleMockMixin):
self.assertEqual(tls.get_extensions('server'), pillarval)
self.assertEqual(tls.get_extensions('client'), pillarval)
def test_pyOpenSSL_version_destructive(self):
@with_tempdir()
def test_pyOpenSSL_version_destructive(self, ca_path):
'''
Test extension logic with different pyOpenSSL versions
'''
pillarval = {'csr': {'extendedKeyUsage': 'serverAuth'}}
mock_pgt = MagicMock(return_value=pillarval)
ca_path = tempfile.mkdtemp(dir=TMP)
ca_name = 'test_ca'
certp = '{0}/{1}/{2}_ca_cert.crt'.format(
ca_path,
@ -721,105 +679,97 @@ class TLSAddTestCase(TestCase, LoaderModuleMockMixin):
certk, ca_name, certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
try:
with patch.dict(tls.__salt__, {
'config.option': mock_opt,
'cmd.retcode': mock_ret}):
with patch.dict(tls.__opts__, {
'hash_type': 'sha256',
'cachedir': ca_path}):
with patch.dict(_TLS_TEST_DATA['create_ca'],
{'replace': True}):
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.1.1'),
'X509_EXT_ENABLED': False}):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.14.1'),
'X509_EXT_ENABLED': True}):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.15.1'),
'X509_EXT_ENABLED': True}):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
with patch.dict(tls.__salt__, {
'config.option': mock_opt,
'cmd.retcode': mock_ret}):
with patch.dict(tls.__opts__, {
'hash_type': 'sha256',
'cachedir': ca_path}):
with patch.dict(_TLS_TEST_DATA['create_ca'],
{'replace': True}):
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.1.1'),
'X509_EXT_ENABLED': False}):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.14.1'),
'X509_EXT_ENABLED': True}):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.15.1'),
'X509_EXT_ENABLED': True}):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
try:
certp = '{0}/{1}/certs/{2}.csr'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created CSR for "{1}": "{2}."').format(
certk, _TLS_TEST_DATA['create_ca']['CN'], certp)
with patch.dict(tls.__salt__, {
'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}):
with patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}):
with patch.dict(_TLS_TEST_DATA['create_ca'], {
'subjectAltName': 'DNS:foo.bar',
'replace': True}):
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.1.1'),
'X509_EXT_ENABLED': False}):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertRaises(ValueError,
tls.create_csr,
ca_name,
**_TLS_TEST_DATA['create_ca'])
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.14.1'),
'X509_EXT_ENABLED': True}):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertEqual(
tls.create_csr(
ca_name,
**_TLS_TEST_DATA['create_ca']),
ret)
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.15.1'),
'X509_EXT_ENABLED': True}):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertEqual(
tls.create_csr(
ca_name,
**_TLS_TEST_DATA['create_ca']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
certp = '{0}/{1}/certs/{2}.csr'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created CSR for "{1}": "{2}."').format(
certk, _TLS_TEST_DATA['create_ca']['CN'], certp)
with patch.dict(tls.__salt__, {
'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}):
with patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}):
with patch.dict(_TLS_TEST_DATA['create_ca'], {
'subjectAltName': 'DNS:foo.bar',
'replace': True}):
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.1.1'),
'X509_EXT_ENABLED': False}):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertRaises(ValueError,
tls.create_csr,
ca_name,
**_TLS_TEST_DATA['create_ca'])
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.14.1'),
'X509_EXT_ENABLED': True}):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertEqual(
tls.create_csr(
ca_name,
**_TLS_TEST_DATA['create_ca']),
ret)
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.15.1'),
'X509_EXT_ENABLED': True}):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertEqual(
tls.create_csr(
ca_name,
**_TLS_TEST_DATA['create_ca']),
ret)

View File

@ -10,6 +10,7 @@ import textwrap
import copy
# Import Salt Testing libs
from tests.support.helpers import with_tempdir
from tests.support.unit import TestCase
from tests.support.paths import TMP
@ -34,6 +35,7 @@ class CommonTestCaseBoilerplate(TestCase):
def setUp(self):
self.root_dir = tempfile.mkdtemp(dir=TMP)
self.addCleanup(shutil.rmtree, self.root_dir, ignore_errors=True)
self.state_tree_dir = os.path.join(self.root_dir, 'state_tree')
self.cache_dir = os.path.join(self.root_dir, 'cachedir')
if not os.path.isdir(self.root_dir):
@ -291,167 +293,130 @@ class PyDSLRendererTestCase(CommonTestCaseBoilerplate):
self.assertEqual(result['C']['cmd'][1]['require'][0]['cmd'], 'A')
self.assertEqual(result['B']['file'][1]['require'][0]['cmd'], 'C')
def test_pipe_through_stateconf(self):
dirpath = tempfile.mkdtemp(dir=TMP)
if not os.path.isdir(dirpath):
self.skipTest(
'The temporary directory \'{0}\' was not created'.format(
dirpath
)
)
@with_tempdir()
def test_pipe_through_stateconf(self, dirpath):
output = os.path.join(dirpath, 'output')
try:
write_to(os.path.join(dirpath, 'xxx.sls'), textwrap.dedent(
'''#!stateconf -os yaml . jinja
.X:
cmd.run:
- name: echo X >> {0}
- cwd: /
.Y:
cmd.run:
- name: echo Y >> {0}
- cwd: /
.Z:
cmd.run:
- name: echo Z >> {0}
- cwd: /
'''.format(output.replace('\\', '/'))))
write_to(os.path.join(dirpath, 'yyy.sls'), textwrap.dedent('''\
#!pydsl|stateconf -ps
write_to(os.path.join(dirpath, 'xxx.sls'), textwrap.dedent(
'''#!stateconf -os yaml . jinja
.X:
cmd.run:
- name: echo X >> {0}
- cwd: /
.Y:
cmd.run:
- name: echo Y >> {0}
- cwd: /
.Z:
cmd.run:
- name: echo Z >> {0}
- cwd: /
'''.format(output.replace('\\', '/'))))
write_to(os.path.join(dirpath, 'yyy.sls'), textwrap.dedent('''\
#!pydsl|stateconf -ps
__pydsl__.set(ordered=True)
state('.D').cmd.run('echo D >> {0}', cwd='/')
state('.E').cmd.run('echo E >> {0}', cwd='/')
state('.F').cmd.run('echo F >> {0}', cwd='/')
'''.format(output.replace('\\', '/'))))
__pydsl__.set(ordered=True)
state('.D').cmd.run('echo D >> {0}', cwd='/')
state('.E').cmd.run('echo E >> {0}', cwd='/')
state('.F').cmd.run('echo F >> {0}', cwd='/')
'''.format(output.replace('\\', '/'))))
write_to(os.path.join(dirpath, 'aaa.sls'), textwrap.dedent('''\
#!pydsl|stateconf -ps
write_to(os.path.join(dirpath, 'aaa.sls'), textwrap.dedent('''\
#!pydsl|stateconf -ps
include('xxx', 'yyy')
include('xxx', 'yyy')
# make all states in xxx run BEFORE states in this sls.
extend(state('.start').stateconf.require(stateconf='xxx::goal'))
# make all states in xxx run BEFORE states in this sls.
extend(state('.start').stateconf.require(stateconf='xxx::goal'))
# make all states in yyy run AFTER this sls.
extend(state('.goal').stateconf.require_in(stateconf='yyy::start'))
# make all states in yyy run AFTER this sls.
extend(state('.goal').stateconf.require_in(stateconf='yyy::start'))
__pydsl__.set(ordered=True)
__pydsl__.set(ordered=True)
state('.A').cmd.run('echo A >> {0}', cwd='/')
state('.B').cmd.run('echo B >> {0}', cwd='/')
state('.C').cmd.run('echo C >> {0}', cwd='/')
'''.format(output.replace('\\', '/'))))
state('.A').cmd.run('echo A >> {0}', cwd='/')
state('.B').cmd.run('echo B >> {0}', cwd='/')
state('.C').cmd.run('echo C >> {0}', cwd='/')
'''.format(output.replace('\\', '/'))))
self.state_highstate({'base': ['aaa']}, dirpath)
with salt.utils.files.fopen(output, 'r') as f:
self.assertEqual(''.join(f.read().split()), "XYZABCDEF")
self.state_highstate({'base': ['aaa']}, dirpath)
with salt.utils.files.fopen(output, 'r') as f:
self.assertEqual(''.join(f.read().split()), "XYZABCDEF")
finally:
shutil.rmtree(dirpath, ignore_errors=True)
def test_compile_time_state_execution(self):
@with_tempdir()
def test_compile_time_state_execution(self, dirpath):
if not sys.stdin.isatty():
self.skipTest('Not attached to a TTY')
dirpath = tempfile.mkdtemp(dir=TMP)
if not os.path.isdir(dirpath):
self.skipTest(
'The temporary directory \'{0}\' was not created'.format(
dirpath
)
)
try:
# The Windows shell will include any spaces before the redirect
# in the text that is redirected.
# For example: echo hello > test.txt will contain "hello "
write_to(os.path.join(dirpath, 'aaa.sls'), textwrap.dedent('''\
#!pydsl
# The Windows shell will include any spaces before the redirect
# in the text that is redirected.
# For example: echo hello > test.txt will contain "hello "
write_to(os.path.join(dirpath, 'aaa.sls'), textwrap.dedent('''\
#!pydsl
__pydsl__.set(ordered=True)
A = state('A')
A.cmd.run('echo hehe>{0}/zzz.txt', cwd='/')
A.file.managed('{0}/yyy.txt', source='salt://zzz.txt')
A()
A()
__pydsl__.set(ordered=True)
A = state('A')
A.cmd.run('echo hehe>{0}/zzz.txt', cwd='/')
A.file.managed('{0}/yyy.txt', source='salt://zzz.txt')
A()
A()
state().cmd.run('echo hoho>>{0}/yyy.txt', cwd='/')
state().cmd.run('echo hoho>>{0}/yyy.txt', cwd='/')
A.file.managed('{0}/xxx.txt', source='salt://zzz.txt')
A()
'''.format(dirpath.replace('\\', '/'))))
self.state_highstate({'base': ['aaa']}, dirpath)
with salt.utils.files.fopen(os.path.join(dirpath, 'yyy.txt'), 'rt') as f:
self.assertEqual(f.read(), 'hehe' + os.linesep + 'hoho' + os.linesep)
with salt.utils.files.fopen(os.path.join(dirpath, 'xxx.txt'), 'rt') as f:
self.assertEqual(f.read(), 'hehe' + os.linesep)
finally:
shutil.rmtree(dirpath, ignore_errors=True)
A.file.managed('{0}/xxx.txt', source='salt://zzz.txt')
A()
'''.format(dirpath.replace('\\', '/'))))
self.state_highstate({'base': ['aaa']}, dirpath)
with salt.utils.files.fopen(os.path.join(dirpath, 'yyy.txt'), 'rt') as f:
self.assertEqual(f.read(), 'hehe' + os.linesep + 'hoho' + os.linesep)
with salt.utils.files.fopen(os.path.join(dirpath, 'xxx.txt'), 'rt') as f:
self.assertEqual(f.read(), 'hehe' + os.linesep)
def test_nested_high_state_execution(self):
dirpath = tempfile.mkdtemp(dir=TMP)
if not os.path.isdir(dirpath):
self.skipTest(
'The temporary directory \'{0}\' was not created'.format(
dirpath
)
)
@with_tempdir()
def test_nested_high_state_execution(self, dirpath):
output = os.path.join(dirpath, 'output')
try:
write_to(os.path.join(dirpath, 'aaa.sls'), textwrap.dedent('''\
#!pydsl
__salt__['state.sls']('bbb')
state().cmd.run('echo bbbbbb', cwd='/')
'''))
write_to(os.path.join(dirpath, 'bbb.sls'), textwrap.dedent(
'''
# {{ salt['state.sls']('ccc') }}
test:
cmd.run:
- name: echo bbbbbbb
- cwd: /
'''))
write_to(os.path.join(dirpath, 'ccc.sls'), textwrap.dedent(
'''
#!pydsl
state().cmd.run('echo ccccc', cwd='/')
'''))
self.state_highstate({'base': ['aaa']}, dirpath)
finally:
shutil.rmtree(dirpath, ignore_errors=True)
write_to(os.path.join(dirpath, 'aaa.sls'), textwrap.dedent('''\
#!pydsl
__salt__['state.sls']('bbb')
state().cmd.run('echo bbbbbb', cwd='/')
'''))
write_to(os.path.join(dirpath, 'bbb.sls'), textwrap.dedent(
'''
# {{ salt['state.sls']('ccc') }}
test:
cmd.run:
- name: echo bbbbbbb
- cwd: /
'''))
write_to(os.path.join(dirpath, 'ccc.sls'), textwrap.dedent(
'''
#!pydsl
state().cmd.run('echo ccccc', cwd='/')
'''))
self.state_highstate({'base': ['aaa']}, dirpath)
def test_repeat_includes(self):
dirpath = tempfile.mkdtemp(dir=TMP)
if not os.path.isdir(dirpath):
self.skipTest(
'The temporary directory \'{0}\' was not created'.format(
dirpath
)
)
@with_tempdir()
def test_repeat_includes(self, dirpath):
output = os.path.join(dirpath, 'output')
try:
write_to(os.path.join(dirpath, 'b.sls'), textwrap.dedent('''\
#!pydsl
include('c')
include('d')
'''))
write_to(os.path.join(dirpath, 'c.sls'), textwrap.dedent('''\
#!pydsl
modtest = include('e')
modtest.success
'''))
write_to(os.path.join(dirpath, 'd.sls'), textwrap.dedent('''\
#!pydsl
modtest = include('e')
modtest.success
'''))
write_to(os.path.join(dirpath, 'e.sls'), textwrap.dedent('''\
#!pydsl
success = True
'''))
self.state_highstate({'base': ['b']}, dirpath)
self.state_highstate({'base': ['c', 'd']}, dirpath)
finally:
shutil.rmtree(dirpath, ignore_errors=True)
write_to(os.path.join(dirpath, 'b.sls'), textwrap.dedent('''\
#!pydsl
include('c')
include('d')
'''))
write_to(os.path.join(dirpath, 'c.sls'), textwrap.dedent('''\
#!pydsl
modtest = include('e')
modtest.success
'''))
write_to(os.path.join(dirpath, 'd.sls'), textwrap.dedent('''\
#!pydsl
modtest = include('e')
modtest.success
'''))
write_to(os.path.join(dirpath, 'e.sls'), textwrap.dedent('''\
#!pydsl
success = True
'''))
self.state_highstate({'base': ['b']}, dirpath)
self.state_highstate({'base': ['c', 'd']}, dirpath)
def write_to(fpath, content):

View File

@ -6,14 +6,13 @@ Unit Tests for functions located in salt.utils.files.py.
# Import python libs
from __future__ import absolute_import, unicode_literals, print_function
import os
import shutil
import tempfile
# Import Salt libs
import salt.utils.files
from salt.ext import six
# Import Salt Testing libs
from tests.support.helpers import with_tempdir
from tests.support.paths import TMP
from tests.support.unit import TestCase, skipIf
from tests.support.mock import (
@ -43,33 +42,30 @@ class FilesUtilTestCase(TestCase):
error = True
self.assertFalse(error, 'salt.utils.files.safe_rm raised exception when it should not have')
def test_safe_walk_symlink_recursion(self):
tmp = tempfile.mkdtemp(dir=TMP)
try:
if os.stat(tmp).st_ino == 0:
self.skipTest('inodes not supported in {0}'.format(tmp))
os.mkdir(os.path.join(tmp, 'fax'))
os.makedirs(os.path.join(tmp, 'foo/bar'))
os.symlink('../..', os.path.join(tmp, 'foo/bar/baz'))
os.symlink('foo', os.path.join(tmp, 'root'))
expected = [
(os.path.join(tmp, 'root'), ['bar'], []),
(os.path.join(tmp, 'root/bar'), ['baz'], []),
(os.path.join(tmp, 'root/bar/baz'), ['fax', 'foo', 'root'], []),
(os.path.join(tmp, 'root/bar/baz/fax'), [], []),
]
paths = []
for root, dirs, names in salt.utils.files.safe_walk(os.path.join(tmp, 'root')):
paths.append((root, sorted(dirs), names))
if paths != expected:
raise AssertionError(
'\n'.join(
['got:'] + [repr(p) for p in paths] +
['', 'expected:'] + [repr(p) for p in expected]
)
@with_tempdir()
def test_safe_walk_symlink_recursion(self, tmp):
if os.stat(tmp).st_ino == 0:
self.skipTest('inodes not supported in {0}'.format(tmp))
os.mkdir(os.path.join(tmp, 'fax'))
os.makedirs(os.path.join(tmp, 'foo/bar'))
os.symlink('../..', os.path.join(tmp, 'foo/bar/baz'))
os.symlink('foo', os.path.join(tmp, 'root'))
expected = [
(os.path.join(tmp, 'root'), ['bar'], []),
(os.path.join(tmp, 'root/bar'), ['baz'], []),
(os.path.join(tmp, 'root/bar/baz'), ['fax', 'foo', 'root'], []),
(os.path.join(tmp, 'root/bar/baz/fax'), [], []),
]
paths = []
for root, dirs, names in salt.utils.files.safe_walk(os.path.join(tmp, 'root')):
paths.append((root, sorted(dirs), names))
if paths != expected:
raise AssertionError(
'\n'.join(
['got:'] + [repr(p) for p in paths] +
['', 'expected:'] + [repr(p) for p in expected]
)
finally:
shutil.rmtree(tmp)
)
@skipIf(not six.PY3, 'This test only applies to Python 3')
def test_fopen_with_disallowed_fds(self):

View File

@ -132,7 +132,7 @@ class JSONTestCase(TestCase):
# Loading it should be equal to the original data
self.assertEqual(salt.utils.json.loads(ret), self.data)
@with_tempfile
@with_tempfile()
def test_dump_load(self, json_out):
'''
Test dumping to and loading from a file handle