mirror of
https://github.com/valitydev/salt.git
synced 2024-11-08 01:18:58 +00:00
3184168365
This PR is part of what will be an ongoing effort to use explicit unicode strings in Salt. Because Python 3 does not suport Python 2's raw unicode string syntax (i.e. `ur'\d+'`), we must use `salt.utils.locales.sdecode()` to ensure that the raw string is unicode. However, because of how `salt/utils/__init__.py` has evolved into the hulking monstrosity it is today, this means importing a large module in places where it is not needed, which could negatively impact performance. For this reason, this PR also breaks out some of the functions from `salt/utils/__init__.py` into new/existing modules under `salt/utils/`. The long term goal will be that the modules within this directory do not depend on importing `salt.utils`. A summary of the changes in this PR is as follows: * Moves the following functions from `salt.utils` to new locations (including a deprecation warning if invoked from `salt.utils`): `to_bytes`, `to_str`, `to_unicode`, `str_to_num`, `is_quoted`, `dequote`, `is_hex`, `is_bin_str`, `rand_string`, `contains_whitespace`, `clean_kwargs`, `invalid_kwargs`, `which`, `which_bin`, `path_join`, `shlex_split`, `rand_str`, `is_windows`, `is_proxy`, `is_linux`, `is_darwin`, `is_sunos`, `is_smartos`, `is_smartos_globalzone`, `is_smartos_zone`, `is_freebsd`, `is_netbsd`, `is_openbsd`, `is_aix` * Moves the functions already deprecated by @rallytime to the bottom of `salt/utils/__init__.py` for better organization, so we can keep the deprecated ones separate from the ones yet to be deprecated as we continue to break up `salt.utils` * Updates `salt/*.py` and all files under `salt/client/` to use explicit unicode string literals. * Gets rid of implicit imports of `salt.utils` (e.g. `from salt.utils import foo` becomes `import salt.utils.foo as foo`). * Renames the `test.rand_str` function to `test.random_hash` to more accurately reflect what it does * Modifies `salt.utils.stringutils.random()` (née `salt.utils.rand_string()`) such that it returns a string matching the passed size. Previously this function would get `size` bytes from `os.urandom()`, base64-encode it, and return the result, which would in most cases not be equal to the passed size.
139 lines
4.4 KiB
Python
139 lines
4.4 KiB
Python
# -*- coding: utf-8 -*-
|
||
'''
|
||
:codeauthor: :email:`Pedro Algarvio (pedro@algarvio.me)`
|
||
|
||
|
||
tests.unit.utils.cloud_test
|
||
~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||
|
||
Test the salt-cloud utilities module
|
||
'''
|
||
|
||
# Import Python libs
|
||
from __future__ import absolute_import
|
||
import os
|
||
import tempfile
|
||
|
||
# Import Salt Testing libs
|
||
from tests.support.unit import TestCase, skipIf
|
||
from tests.support.paths import TMP, CODE_DIR
|
||
|
||
# Import salt libs
|
||
import salt.utils.cloud as cloud
|
||
|
||
GPG_KEYDIR = os.path.join(TMP, 'gpg-keydir')
|
||
|
||
# The keyring library uses `getcwd()`, let's make sure we in a good directory
|
||
# before importing keyring
|
||
if not os.path.isdir(GPG_KEYDIR):
|
||
os.makedirs(GPG_KEYDIR)
|
||
|
||
os.chdir(GPG_KEYDIR)
|
||
|
||
# Import external deps
|
||
try:
|
||
import keyring
|
||
import keyring.backend
|
||
|
||
class TestKeyring(keyring.backend.KeyringBackend):
|
||
'''
|
||
A test keyring which always outputs same password
|
||
'''
|
||
def __init__(self):
|
||
self.__storage = {}
|
||
|
||
def supported(self):
|
||
return 0
|
||
|
||
def set_password(self, servicename, username, password):
|
||
self.__storage.setdefault(servicename, {}).update({username: password})
|
||
return 0
|
||
|
||
def get_password(self, servicename, username):
|
||
return self.__storage.setdefault(servicename, {}).get(username, None)
|
||
|
||
def delete_password(self, servicename, username):
|
||
self.__storage.setdefault(servicename, {}).pop(username, None)
|
||
return 0
|
||
|
||
# set the keyring for keyring lib
|
||
keyring.set_keyring(TestKeyring())
|
||
HAS_KEYRING = True
|
||
except ImportError:
|
||
HAS_KEYRING = False
|
||
|
||
os.chdir(CODE_DIR)
|
||
|
||
|
||
class CloudUtilsTestCase(TestCase):
|
||
|
||
def test_ssh_password_regex(self):
|
||
'''Test matching ssh password patterns'''
|
||
for pattern in ('Password for root@127.0.0.1:',
|
||
'root@127.0.0.1 Password:',
|
||
' Password:'):
|
||
self.assertNotEqual(
|
||
cloud.SSH_PASSWORD_PROMP_RE.match(pattern), None
|
||
)
|
||
self.assertNotEqual(
|
||
cloud.SSH_PASSWORD_PROMP_RE.match(pattern.lower()), None
|
||
)
|
||
self.assertNotEqual(
|
||
cloud.SSH_PASSWORD_PROMP_RE.match(pattern.strip()), None
|
||
)
|
||
self.assertNotEqual(
|
||
cloud.SSH_PASSWORD_PROMP_RE.match(pattern.lower().strip()), None
|
||
)
|
||
|
||
@skipIf(HAS_KEYRING is False, 'The python keyring library is not installed')
|
||
def test__save_password_in_keyring(self):
|
||
'''
|
||
Test storing password in the keyring
|
||
'''
|
||
cloud._save_password_in_keyring(
|
||
'salt.cloud.provider.test_case_provider',
|
||
'fake_username',
|
||
'fake_password_c8231'
|
||
)
|
||
stored_pw = keyring.get_password(
|
||
'salt.cloud.provider.test_case_provider',
|
||
'fake_username',
|
||
)
|
||
keyring.delete_password(
|
||
'salt.cloud.provider.test_case_provider',
|
||
'fake_username',
|
||
)
|
||
self.assertEqual(stored_pw, 'fake_password_c8231')
|
||
|
||
@skipIf(HAS_KEYRING is False, 'The python keyring library is not installed')
|
||
def test_retrieve_password_from_keyring(self):
|
||
keyring.set_password(
|
||
'salt.cloud.provider.test_case_provider',
|
||
'fake_username',
|
||
'fake_password_c8231'
|
||
)
|
||
pw_in_keyring = cloud.retrieve_password_from_keyring(
|
||
'salt.cloud.provider.test_case_provider',
|
||
'fake_username')
|
||
self.assertEqual(pw_in_keyring, 'fake_password_c8231')
|
||
|
||
def test_sftp_file_with_content_under_python3(self):
|
||
with self.assertRaises(Exception) as context:
|
||
cloud.sftp_file("/tmp/test", "ТЕСТ test content")
|
||
# we successful pass the place with os.write(tmpfd, ...
|
||
self.assertNotEqual("a bytes-like object is required, not 'str'", str(context.exception))
|
||
|
||
def test_check_key_path_and_mode(self):
|
||
with tempfile.NamedTemporaryFile() as f:
|
||
key_file = f.name
|
||
|
||
os.chmod(key_file, 0o644)
|
||
self.assertFalse(cloud.check_key_path_and_mode('foo', key_file))
|
||
os.chmod(key_file, 0o600)
|
||
self.assertTrue(cloud.check_key_path_and_mode('foo', key_file))
|
||
os.chmod(key_file, 0o400)
|
||
self.assertTrue(cloud.check_key_path_and_mode('foo', key_file))
|
||
|
||
# tmp file removed
|
||
self.assertFalse(cloud.check_key_path_and_mode('foo', key_file))
|