Merge pull request #49169 from terminalmage/merge-fluorine

[fluorine] Merge 2018.3 branch into fluorine
This commit is contained in:
Daniel Wallace 2018-08-17 13:59:03 -05:00 committed by GitHub
commit 949f877b60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
153 changed files with 9537 additions and 9696 deletions

View File

@ -298,9 +298,15 @@ def auth(username, password):
log.error('LDAP authentication requires python-ldap module')
return False
# If bind credentials are configured, use them instead of user's
# If bind credentials are configured, verify that we receive a valid bind
if _config('binddn', mandatory=False) and _config('bindpw', mandatory=False):
bind = _bind_for_search(anonymous=_config('anonymous', mandatory=False))
# If username & password are not None, attempt to verify they are valid
if bind and username and password:
bind = _bind(username, password,
anonymous=_config('auth_by_group_membership_only', mandatory=False)
and _config('anonymous', mandatory=False))
else:
bind = _bind(username, password,
anonymous=_config('auth_by_group_membership_only', mandatory=False)

View File

@ -2290,7 +2290,7 @@ def replace(path,
if prepend_if_not_found or append_if_not_found:
# Search for content, to avoid pre/appending the
# content if it was pre/appended in a previous run.
if re.search(salt.utils.stringutils.to_bytes('^{0}$'.format(re.escape(content))),
if re.search(salt.utils.stringutils.to_bytes('^{0}($|(?=\r\n))'.format(re.escape(content))),
r_data,
flags=flags_num):
# Content was found, so set found.
@ -4072,7 +4072,10 @@ def get_managed(
# If we have a source defined, let's figure out what the hash is
if source:
urlparsed_source = _urlparse(source)
parsed_scheme = urlparsed_source.scheme
if urlparsed_source.scheme in salt.utils.files.VALID_PROTOS:
parsed_scheme = urlparsed_source.scheme
else:
parsed_scheme = ''
parsed_path = os.path.join(
urlparsed_source.netloc, urlparsed_source.path).rstrip(os.sep)
unix_local_source = parsed_scheme in ('file', '')

View File

@ -70,19 +70,19 @@ def __virtual__():
return __virtualname__
def _get_service(name):
def _name_in_services(name, services):
'''
Get information about a service. If the service is not found, raise an
error
Checks to see if the given service is in the given services.
:param str name: Service label, file name, or full path
:return: The service information for the service, otherwise an Error
:param dict services: The currently available services.
:return: The service information for the service, otherwise
an empty dictionary
:rtype: dict
'''
services = __utils__['mac_utils.available_services']()
name = name.lower()
if name in services:
# Match on label
return services[name]
@ -96,8 +96,50 @@ def _get_service(name):
# Match on basename
return service
# Could not find service
raise CommandExecutionError('Service not found: {0}'.format(name))
return dict()
def _get_service(name):
'''
Get information about a service. If the service is not found, raise an
error
:param str name: Service label, file name, or full path
:return: The service information for the service, otherwise an Error
:rtype: dict
'''
services = __utils__['mac_utils.available_services']()
name = name.lower()
service = _name_in_services(name, services)
# if we would the service we can return it
if service:
return service
# if we got here our service is not available, now we can check to see if
# we received a cached batch of services, if not we did a fresh check
# so we need to raise that the service could not be found.
try:
if not __context__['using_cached_services']:
raise CommandExecutionError('Service not found: {0}'.format(name))
except KeyError:
pass
# we used a cached version to check, a service could have been made
# between now and then, we should refresh our available services.
services = __utils__['mac_utils.available_services'](refresh=True)
# check to see if we found the service we are looking for.
service = _name_in_services(name, services)
if not service:
# Could not find the service after refresh raise.
raise CommandExecutionError('Service not found: {0}'.format(name))
# found it :)
return service
def _always_running_service(name):
@ -562,7 +604,7 @@ def disabled(name, runas=None, domain='system'):
salt '*' service.disabled org.cups.cupsd
'''
ret = False
disabled = launchctl('print-disabled',
domain,
return_stdout=True,

View File

@ -16,6 +16,18 @@ Keys
Keys are the folders in the registry. Keys can have many nested subkeys. Keys
can have a value assigned to them under the (Default)
When passing a key on the CLI it must be quoted correctly depending on the
backslashes being used (``\`` vs ``\\``). The following are valid methods of
passing the the key on the CLI:
Using single backslashes:
``"SOFTWARE\Python"``
``'SOFTWARE\Python'`` (will not work on a Windows Master)
Using double backslashes:
``SOFTWARE\\Python``
-----------------
Values or Entries
-----------------
@ -169,7 +181,7 @@ def list_keys(hive, key=None, use_32bit_registry=False):
def list_values(hive, key=None, use_32bit_registry=False, include_default=True):
'''
r'''
Enumerates the values in a registry key or hive.
Args:
@ -397,7 +409,7 @@ def set_value(hive,
def delete_key_recursive(hive, key, use_32bit_registry=False):
'''
r'''
.. versionadded:: 2015.5.4
Delete a registry key to include all subkeys and value/data pairs.
@ -439,7 +451,7 @@ def delete_key_recursive(hive, key, use_32bit_registry=False):
def delete_value(hive, key, vname=None, use_32bit_registry=False):
'''
r'''
Delete a registry value entry or the default value for a key.
Args:
@ -464,7 +476,7 @@ def delete_value(hive, key, vname=None, use_32bit_registry=False):
Deletes the 32bit portion of the registry on 64bit installations. On
32bit machines this is ignored.
Return:
Returns:
bool: True if successful, otherwise False
CLI Example:

View File

@ -419,17 +419,29 @@ def uptime(human_readable=False):
'''
.. versionadded:: 2015.8.0
Return the system uptime for this machine in seconds
Return the system uptime for the machine
human_readable : False
If ``True``, then return uptime in years, days, and seconds.
Args:
human_readable (bool):
Return uptime in human readable format if ``True``, otherwise
return seconds. Default is ``False``
.. note::
Human readable format is ``days, hours:min:sec``. Days will only
be displayed if more than 0
Returns:
str:
The uptime in seconds or human readable format depending on the
value of ``human_readable``
CLI Example:
.. code-block:: bash
salt '*' status.uptime
salt '*' status.uptime human_readable=True
salt '*' status.uptime
salt '*' status.uptime human_readable=True
'''
# Get startup time
startup_time = datetime.datetime.fromtimestamp(psutil.boot_time())

View File

@ -24,6 +24,18 @@ Keys
Hives contain keys. These are basically the folders beneath the hives. They can
contain any number of subkeys.
When passing the hive\key values they must be quoted correctly depending on the
backslashes being used (``\`` vs ``\\``). The way backslashes are handled in
the state file is different from the way they are handled when working on the
CLI. The following are valid methods of passing the hive\key:
Using single backslashes:
HKLM\SOFTWARE\Python
'HKLM\SOFTWARE\Python'
Using double backslashes:
"HKLM\\SOFTWARE\\Python"
Values or Entries
-----------------
@ -446,7 +458,7 @@ def present(name,
def absent(name, vname=None, use_32bit_registry=False):
'''
r'''
Ensure a registry value is removed. To remove a key use key_absent.
Args:

View File

@ -847,14 +847,21 @@ class SerializerExtension(Extension, object):
return explore(data)
def format_json(self, value, sort_keys=True, indent=None):
return Markup(salt.utils.json.dumps(value, sort_keys=sort_keys, indent=indent).strip())
json_txt = salt.utils.json.dumps(value, sort_keys=sort_keys, indent=indent).strip()
try:
return Markup(json_txt)
except UnicodeDecodeError:
return Markup(salt.utils.stringutils.to_unicode(json_txt))
def format_yaml(self, value, flow_style=True):
yaml_txt = salt.utils.yaml.safe_dump(
value, default_flow_style=flow_style).strip()
if yaml_txt.endswith('\n...'):
if yaml_txt.endswith(str('\n...')): # future lint: disable=blacklisted-function
yaml_txt = yaml_txt[:len(yaml_txt)-4]
return Markup(yaml_txt)
try:
return Markup(yaml_txt)
except UnicodeDecodeError:
return Markup(salt.utils.stringutils.to_unicode(yaml_txt))
def format_xml(self, value):
"""Render a formatted multi-line XML string from a complex Python

View File

@ -20,7 +20,6 @@ except ImportError:
# Import Salt Libs
import salt.modules.cmdmod
import salt.utils.args
import salt.utils.decorators as decorators
import salt.utils.files
import salt.utils.path
import salt.utils.platform
@ -304,14 +303,18 @@ def launchctl(sub_cmd, *args, **kwargs):
return ret['stdout'] if return_stdout else True
def _available_services():
def _available_services(refresh=False):
'''
This is a helper function needed for testing. We are using the memoziation
decorator on the `available_services` function, which causes the function
to run once and then return the results of the first run on subsequent
calls. This causes problems when trying to test the functionality of the
`available_services` function.
This is a helper function for getting the available macOS services.
'''
try:
if __context__['available_services'] and not refresh:
log.debug('Found context for available services.')
__context__['using_cached_services'] = True
return __context__['available_services']
except KeyError:
pass
launchd_paths = [
'/Library/LaunchAgents',
'/Library/LaunchDaemons',
@ -373,14 +376,22 @@ def _available_services():
'file_path': true_path,
'plist': plist}
return _available_services
# put this in __context__ as this is a time consuming function.
# a fix for this issue. https://github.com/saltstack/salt/issues/48414
__context__['available_services'] = _available_services
# this is a fresh gathering of services, set cached to false
__context__['using_cached_services'] = False
return __context__['available_services']
@decorators.memoize
def available_services():
def available_services(refresh=False):
'''
Return a dictionary of all available services on the system
:param bool refresh: If you wish to refresh the available services
as this data is cached on the first run.
Returns:
dict: All available services
@ -391,7 +402,8 @@ def available_services():
import salt.utils.mac_service
salt.utils.mac_service.available_services()
'''
return _available_services()
log.debug('Loading available services')
return _available_services(refresh)
def console_user(username=False):

View File

@ -6,26 +6,43 @@ Tests for the Openstack Cloud Provider
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import logging
import os
# Import Salt Testing libs
from tests.support.case import ModuleCase
from tests.support.case import ModuleCase, ShellCase
from tests.support.paths import FILES
from tests.support.unit import skipIf
from tests.support.helpers import destructiveTest
from tests.support.helpers import destructiveTest, expensiveTest, generate_random_name
from tests.support.mixins import SaltReturnAssertsMixin
# Import Salt Libs
from salt.config import cloud_providers_config
log = logging.getLogger(__name__)
NO_KEYSTONE = False
try:
import keystoneclient # pylint: disable=import-error,unused-import
from libcloud.common.openstack_identity import OpenStackIdentity_3_0_Connection
from libcloud.common.openstack_identity import OpenStackIdentityTokenScope
HAS_KEYSTONE = True
except ImportError:
NO_KEYSTONE = True
HAS_KEYSTONE = True
# Import Third-Party Libs
try:
import shade # pylint: disable=unused-import
HAS_SHADE = True
except ImportError:
HAS_SHADE = False
# Create the cloud instance name to be used throughout the tests
INSTANCE_NAME = generate_random_name('CLOUD-TEST-')
PROVIDER_NAME = 'openstack'
DRIVER_NAME = 'openstack'
@skipIf(
NO_KEYSTONE,
not HAS_KEYSTONE,
'Please install keystoneclient and a keystone server before running'
'openstack integration tests.'
)
@ -156,3 +173,81 @@ class OpenstackTest(ModuleCase, SaltReturnAssertsMixin):
tenant_name='admin')
driver.authenticate()
self.assertTrue(driver.auth_token)
@skipIf(not HAS_SHADE, 'openstack driver requires `shade`')
class RackspaceTest(ShellCase):
'''
Integration tests for the Rackspace cloud provider using the Openstack driver
'''
@expensiveTest
def setUp(self):
'''
Sets up the test requirements
'''
super(RackspaceTest, self).setUp()
# check if appropriate cloud provider and profile files are present
profile_str = 'openstack-config'
providers = self.run_cloud('--list-providers')
if profile_str + ':' not in providers:
self.skipTest(
'Configuration file for {0} was not found. Check {0}.conf files '
'in tests/integration/files/conf/cloud.*.d/ to run these tests.'
.format(PROVIDER_NAME)
)
# check if personal access token, ssh_key_file, and ssh_key_names are present
config = cloud_providers_config(
os.path.join(
FILES,
'conf',
'cloud.providers.d',
PROVIDER_NAME + '.conf'
)
)
region_name = config[profile_str][DRIVER_NAME].get('region_name')
auth = config[profile_str][DRIVER_NAME].get('auth')
cloud = config[profile_str][DRIVER_NAME].get('cloud')
if not region_name or not (auth or cloud):
self.skipTest(
'A region_name and (auth or cloud) must be provided to run these '
'tests. Check tests/integration/files/conf/cloud.providers.d/{0}.conf'
.format(PROVIDER_NAME)
)
def test_instance(self):
'''
Test creating an instance on rackspace with the openstack driver
'''
# check if instance with salt installed returned
try:
self.assertIn(
INSTANCE_NAME,
[i.strip() for i in self.run_cloud('-p rackspace-test {0}'.format(INSTANCE_NAME), timeout=500)]
)
except AssertionError:
self.run_cloud('-d {0} --assume-yes'.format(INSTANCE_NAME), timeout=500)
raise
# delete the instance
try:
self.assertIn(
INSTANCE_NAME + ':',
[i.strip() for i in self.run_cloud('-d {0} --assume-yes'.format(INSTANCE_NAME), timeout=500)]
)
except AssertionError:
raise
def tearDown(self):
'''
Clean up after tests
'''
query = self.run_cloud('--query')
ret = ' {0}:'.format(INSTANCE_NAME)
# if test instance is still present, delete it
if ret in query:
self.run_cloud('-d {0} --assume-yes'.format(INSTANCE_NAME), timeout=500)

View File

@ -31,7 +31,7 @@ peer:
- 'test.*'
ext_pillar:
- test_ext_pillar_opts:
- ext_pillar_opts:
- test_issue_5951_actual_file_roots_in_opts
config_opt:

View File

@ -15,11 +15,11 @@ import logging
log = logging.getLogger(__name__)
# DRY up the name we use
MY_NAME = 'test_ext_pillar_opts'
MY_NAME = 'ext_pillar_opts'
def __virtual__():
log.debug('Loaded external pillar {0} as {1}'.format(__name__, MY_NAME))
log.debug('Loaded external pillar %s as %s', __name__, MY_NAME)
return True

View File

@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
def myfunction():
grains = {}
grains['a_custom'] = {'k1': 'v1'}
return grains

View File

@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
def myfunction():
grains = {}
grains['a_custom'] = {'k2': 'v2'}
return grains

View File

@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
def myfunction():
grains = {}
grains['a_custom'] = {'k1': 'v1'}
return grains

View File

@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
def myfunction():
grains = {}
grains['a_custom'] = {'k2': 'v2'}
return grains

View File

@ -0,0 +1,6 @@
{% set result = {"Question": "Quieres Café?"} %}
test:
module.run:
- name: test.echo
- text: '{{ result | json }}'

View File

@ -0,0 +1,6 @@
{% set result = {"Question": "Quieres Café?"} %}
test:
module.run:
- name: test.echo
- text: "{{ result | yaml }}"

View File

@ -48,7 +48,7 @@ class PillarModuleTest(ModuleCase):
def test_issue_5951_actual_file_roots_in_opts(self):
self.assertIn(
TMP_STATE_TREE,
self.run_function('pillar.data')['test_ext_pillar_opts']['file_roots']['base']
self.run_function('pillar.data')['ext_pillar_opts']['file_roots']['base']
)
def test_pillar_items(self):

View File

@ -12,7 +12,7 @@ from salt.netapi.rest_tornado import saltnado
from salt.utils.versions import StrictVersion
# Import Salt Testing Libs
from tests.unit.netapi.rest_tornado.test_handlers import SaltnadoTestCase
from tests.unit.netapi.test_rest_tornado import SaltnadoTestCase
from tests.support.helpers import flaky
from tests.support.unit import skipIf

View File

@ -363,7 +363,11 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
with salt.utils.files.fopen(grain_path, 'r') as fp_:
file_contents = fp_.readlines()
self.assertTrue(re.match('^minion$', file_contents[0]))
if salt.utils.platform.is_windows():
match = '^minion\r\n'
else:
match = '^minion\n'
self.assertTrue(re.match(match, file_contents[0]))
def test_managed_file_with_pillar_sls(self):
'''
@ -588,6 +592,9 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
name = os.path.join(TMP, 'local_source_with_source_hash')
local_path = os.path.join(BASE_FILES, 'grail', 'scene33')
actual_hash = '567fd840bf1548edc35c48eb66cdd78bfdfcccff'
if salt.utils.platform.is_windows():
# CRLF vs LF causes a differnt hash on windows
actual_hash = 'f658a0ec121d9c17088795afcc6ff3c43cb9842a'
# Reverse the actual hash
bad_hash = actual_hash[::-1]
@ -712,6 +719,9 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
'-{0}_|-managed'.format(name)
local_path = os.path.join(BASE_FILES, 'hello_world.txt')
actual_hash = 'c98c24b677eff44860afea6f493bbaec5bb1c4cbb209c6fc2bbb47f66ff2ad31'
if salt.utils.platform.is_windows():
# CRLF vs LF causes a differnt hash on windows
actual_hash = '92b772380a3f8e27a93e57e6deeca6c01da07f5aadce78bb2fbb20de10a66925'
uppercase_hash = actual_hash.upper()
try:
@ -893,10 +903,14 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
self.assertFalse(os.path.exists(straydir))
self.assertTrue(os.path.isdir(name))
@skipIf(salt.utils.platform.is_windows(), 'Skip on windows')
@with_tempdir()
def test_directory_clean_exclude(self, base_dir):
'''
file.directory with clean=True and exclude_pat set
Skipped on windows because clean and exclude_pat not supported by
salt.sates.file._check_directory_win
'''
name = os.path.join(base_dir, 'directory_clean_dir')
if not os.path.isdir(name):
@ -1258,6 +1272,7 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
self.assertTrue(os.path.isfile(os.path.join(name, '32', 'scene')))
self.assertTrue(os.path.isfile(os.path.join(name, 'scene34')))
@skipIf(salt.utils.platform.is_windows(), 'Skip on windows')
@with_tempdir()
def test_recurse_issue_34945(self, base_dir):
'''
@ -1273,6 +1288,8 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
repaired.
This was fixed in https://github.com/saltstack/salt/pull/35309
Skipped on windows because dir_mode is not supported.
'''
dir_mode = '2775'
issue_dir = 'issue-34945'
@ -1514,7 +1531,7 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
ret = []
for x in range(0, 3):
ret.append(self.run_state('file.replace',
name=path_test, pattern='^#foo=bar$', repl='foo=salt', append_if_not_found=True))
name=path_test, pattern='^#foo=bar($|(?=\r\n))', repl='foo=salt', append_if_not_found=True))
# ensure, the resulting file contains the expected lines
self.assertTrue(filecmp.cmp(path_test, path_out))
@ -1601,16 +1618,18 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
with salt.utils.files.fopen(path_test, 'r') as fp_:
serialized_file = fp_.read()
expected_file = '''{
"a_list": [
"first_element",
"second_element"
],
"description": "A basic test",
"finally": "the last item",
"name": "naive"
}
'''
expected_file = os.linesep.join([
'{',
' "a_list": [',
' "first_element",',
' "second_element"',
' ],',
' "description": "A basic test",',
' "finally": "the last item",',
' "name": "naive"',
'}',
'',
])
self.assertEqual(serialized_file, expected_file)
@with_tempdir()
@ -2014,6 +2033,10 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
def test_issue_8343_accumulated_require_in(self, base_dir):
template_path = os.path.join(TMP_STATE_TREE, 'issue-8343.sls')
testcase_filedest = os.path.join(base_dir, 'issue-8343.txt')
if os.path.exists(template_path):
os.remove(template_path)
if os.path.exists(testcase_filedest):
os.remove(testcase_filedest)
sls_template = [
'{0}:',
' file.managed:',
@ -3731,7 +3754,11 @@ class RemoteFileTest(ModuleCase, SaltReturnAssertsMixin):
cls.webserver = Webserver()
cls.webserver.start()
cls.source = cls.webserver.url('grail/scene33')
cls.source_hash = 'd2feb3beb323c79fc7a0f44f1408b4a3'
if salt.utils.platform.is_windows():
# CRLF vs LF causes a differnt hash on windows
cls.source_hash = '21438b3d5fd2c0028bcab92f7824dc69'
else:
cls.source_hash = 'd2feb3beb323c79fc7a0f44f1408b4a3'
@classmethod
def tearDownClass(cls):

View File

@ -0,0 +1,231 @@
# -*- coding: utf-8 -*-
'''
Tests for the MySQL states
'''
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Testing libs
from tests.support.case import ModuleCase
from tests.support.unit import skipIf
from tests.support.helpers import destructiveTest
from tests.support.mixins import SaltReturnAssertsMixin
# Import salt libs
import salt.utils.path
from salt.ext import six
NO_MYSQL = False
try:
import MySQLdb # pylint: disable=import-error,unused-import
except ImportError:
NO_MYSQL = True
if not salt.utils.path.which('mysqladmin'):
NO_MYSQL = True
@skipIf(
NO_MYSQL,
'Please install MySQL bindings and a MySQL Server before running'
'MySQL integration tests.'
)
class MysqlDatabaseStateTest(ModuleCase, SaltReturnAssertsMixin):
'''
Validate the mysql_database state
'''
user = 'root'
password = 'poney'
@destructiveTest
def setUp(self):
'''
Test presence of MySQL server, enforce a root password
'''
super(MysqlDatabaseStateTest, self).setUp()
NO_MYSQL_SERVER = True
# now ensure we know the mysql root password
# one of theses two at least should work
ret1 = self.run_state(
'cmd.run',
name='mysqladmin --host="localhost" -u '
+ self.user
+ ' flush-privileges password "'
+ self.password
+ '"'
)
ret2 = self.run_state(
'cmd.run',
name='mysqladmin --host="localhost" -u '
+ self.user
+ ' --password="'
+ self.password
+ '" flush-privileges password "'
+ self.password
+ '"'
)
key, value = ret2.popitem()
if value['result']:
NO_MYSQL_SERVER = False
else:
self.skipTest('No MySQL Server running, or no root access on it.')
def _test_database(self, db_name, second_db_name, test_conn, **kwargs):
'''
Create db two times, test conn, remove it two times
'''
# In case of...
ret = self.run_state('mysql_database.absent',
name=db_name,
**kwargs
)
ret = self.run_state('mysql_database.present',
name=db_name,
**kwargs
)
self.assertSaltTrueReturn(ret)
self.assertInSaltComment(
'The database ' + db_name + ' has been created',
ret
)
#2nd run
ret = self.run_state('mysql_database.present',
name=second_db_name,
**kwargs
)
self.assertSaltTrueReturn(ret)
self.assertInSaltComment(
'Database ' + db_name + ' is already present',
ret
)
if test_conn:
# test root connection
ret = self.run_function(
'mysql.query',
database=db_name,
query='SELECT 1',
**kwargs
)
if not isinstance(ret, dict) or 'results' not in ret:
raise AssertionError(
('Unexpected result while testing connection'
' on db \'{0}\': {1}').format(
db_name,
repr(ret)
)
)
self.assertEqual([['1']], ret['results'])
# Now removing databases
kwargs.pop('character_set')
kwargs.pop('collate')
ret = self.run_state('mysql_database.absent',
name=db_name,
**kwargs
)
self.assertSaltTrueReturn(ret)
self.assertInSaltComment(
'Database ' + db_name + ' has been removed',
ret
)
#2nd run
ret = self.run_state('mysql_database.absent',
name=second_db_name,
** kwargs
)
self.assertSaltTrueReturn(ret)
self.assertInSaltComment(
'Database ' + db_name + ' is not present, so it cannot be removed',
ret
)
self.assertSaltStateChangesEqual(ret, {})
@destructiveTest
def test_present_absent(self):
'''
mysql_database.present
'''
self._test_database(
'testdb1',
'testdb1',
test_conn=True,
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8'
)
# TODO: test with variations on collate and charset, check for db alter
# once it will be done in mysql_database.present state
@destructiveTest
def test_present_absent_fuzzy(self):
'''
mysql_database.present with utf-8 andf fuzzy db name
'''
# this is : ":() ;,?@=`&'\
dbname_fuzzy = '":() ;,?@=`&/\'\\'
# \xe6\xa8\x99\ = \u6a19 = 標
# this is : "();,?:@=`&/標'\
dbname_utf8 = '"();,?@=`&//\xe6\xa8\x99\'\\'
dbname_unicode = u'"();,?@=`&//\u6a19\'\\'
self._test_database(
dbname_fuzzy,
dbname_fuzzy,
test_conn=True,
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8'
)
# FIXME: MySQLdb bugs on dbnames with utf-8?
self._test_database(
dbname_utf8,
dbname_unicode,
test_conn=False,
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
#saltenv={"LC_ALL": "en_US.utf8"}
)
@destructiveTest
@skipIf(True, 'This tests needs issue #8947 to be fixed first')
def test_utf8_from_sls_file(self):
'''
Try to create/destroy an utf-8 database name from an sls file #8947
'''
expected_result = {
'mysql_database_|-A_|-foo \xe6\xba\x96`bar_|-present': {
'__run_num__': 0,
'comment': 'The database foo \xe6\xba\x96`bar has been created',
'result': True},
'mysql_database_|-B_|-foo \xe6\xba\x96`bar_|-absent': {
'__run_num__': 1,
'comment': 'Database foo \xe6\xba\x96`bar has been removed',
'result': True},
}
result = {}
ret = self.run_function('state.sls', mods='mysql_utf8')
if not isinstance(ret, dict):
raise AssertionError(
('Unexpected result while testing external mysql utf8 sls'
': {0}').format(
repr(ret)
)
)
for item, descr in six.iteritems(ret):
result[item] = {
'__run_num__': descr['__run_num__'],
'comment': descr['comment'],
'result': descr['result']
}
self.assertEqual(expected_result, result)

View File

@ -30,211 +30,6 @@ if not salt.utils.path.which('mysqladmin'):
NO_MYSQL = True
@skipIf(
NO_MYSQL,
'Please install MySQL bindings and a MySQL Server before running'
'MySQL integration tests.'
)
class MysqlDatabaseStateTest(ModuleCase, SaltReturnAssertsMixin):
'''
Validate the mysql_database state
'''
user = 'root'
password = 'poney'
@destructiveTest
def setUp(self):
'''
Test presence of MySQL server, enforce a root password
'''
super(MysqlDatabaseStateTest, self).setUp()
NO_MYSQL_SERVER = True
# now ensure we know the mysql root password
# one of theses two at least should work
ret1 = self.run_state(
'cmd.run',
name='mysqladmin --host="localhost" -u '
+ self.user
+ ' flush-privileges password "'
+ self.password
+ '"'
)
ret2 = self.run_state(
'cmd.run',
name='mysqladmin --host="localhost" -u '
+ self.user
+ ' --password="'
+ self.password
+ '" flush-privileges password "'
+ self.password
+ '"'
)
key, value = ret2.popitem()
if value['result']:
NO_MYSQL_SERVER = False
else:
self.skipTest('No MySQL Server running, or no root access on it.')
def _test_database(self, db_name, second_db_name, test_conn, **kwargs):
'''
Create db two times, test conn, remove it two times
'''
# In case of...
ret = self.run_state('mysql_database.absent',
name=db_name,
**kwargs
)
ret = self.run_state('mysql_database.present',
name=db_name,
**kwargs
)
self.assertSaltTrueReturn(ret)
self.assertInSaltComment(
'The database ' + db_name + ' has been created',
ret
)
#2nd run
ret = self.run_state('mysql_database.present',
name=second_db_name,
**kwargs
)
self.assertSaltTrueReturn(ret)
self.assertInSaltComment(
'Database ' + db_name + ' is already present',
ret
)
if test_conn:
# test root connection
ret = self.run_function(
'mysql.query',
database=db_name,
query='SELECT 1',
**kwargs
)
if not isinstance(ret, dict) or 'results' not in ret:
raise AssertionError(
('Unexpected result while testing connection'
' on db \'{0}\': {1}').format(
db_name,
repr(ret)
)
)
self.assertEqual([['1']], ret['results'])
# Now removing databases
kwargs.pop('character_set')
kwargs.pop('collate')
ret = self.run_state('mysql_database.absent',
name=db_name,
**kwargs
)
self.assertSaltTrueReturn(ret)
self.assertInSaltComment(
'Database ' + db_name + ' has been removed',
ret
)
#2nd run
ret = self.run_state('mysql_database.absent',
name=second_db_name,
** kwargs
)
self.assertSaltTrueReturn(ret)
self.assertInSaltComment(
'Database ' + db_name + ' is not present, so it cannot be removed',
ret
)
self.assertSaltStateChangesEqual(ret, {})
@destructiveTest
def test_present_absent(self):
'''
mysql_database.present
'''
self._test_database(
'testdb1',
'testdb1',
test_conn=True,
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8'
)
# TODO: test with variations on collate and charset, check for db alter
# once it will be done in mysql_database.present state
@destructiveTest
def test_present_absent_fuzzy(self):
'''
mysql_database.present with utf-8 andf fuzzy db name
'''
# this is : ":() ;,?@=`&'\
dbname_fuzzy = '":() ;,?@=`&/\'\\'
# \xe6\xa8\x99\ = \u6a19 = 標
# this is : "();,?:@=`&/標'\
dbname_utf8 = '"();,?@=`&//\xe6\xa8\x99\'\\'
dbname_unicode = u'"();,?@=`&//\u6a19\'\\'
self._test_database(
dbname_fuzzy,
dbname_fuzzy,
test_conn=True,
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8'
)
# FIXME: MySQLdb bugs on dbnames with utf-8?
self._test_database(
dbname_utf8,
dbname_unicode,
test_conn=False,
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
#saltenv={"LC_ALL": "en_US.utf8"}
)
@destructiveTest
@skipIf(True, 'This tests needs issue #8947 to be fixed first')
def test_utf8_from_sls_file(self):
'''
Try to create/destroy an utf-8 database name from an sls file #8947
'''
expected_result = {
'mysql_database_|-A_|-foo \xe6\xba\x96`bar_|-present': {
'__run_num__': 0,
'comment': 'The database foo \xe6\xba\x96`bar has been created',
'result': True},
'mysql_database_|-B_|-foo \xe6\xba\x96`bar_|-absent': {
'__run_num__': 1,
'comment': 'Database foo \xe6\xba\x96`bar has been removed',
'result': True},
}
result = {}
ret = self.run_function('state.sls', mods='mysql_utf8')
if not isinstance(ret, dict):
raise AssertionError(
('Unexpected result while testing external mysql utf8 sls'
': {0}').format(
repr(ret)
)
)
for item, descr in six.iteritems(ret):
result[item] = {
'__run_num__': descr['__run_num__'],
'comment': descr['comment'],
'result': descr['result']
}
self.assertEqual(expected_result, result)
@skipIf(
NO_MYSQL,
'Please install MySQL bindings and a MySQL Server before running'

View File

@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
'''
Test the ssh_auth states
'''
# Import python libs
from __future__ import absolute_import, unicode_literals, print_function
import os
# Import Salt Testing libs
from tests.support.case import ModuleCase
from tests.support.mixins import SaltReturnAssertsMixin
from tests.support.runtests import RUNTIME_VARS
from tests.support.helpers import (
destructiveTest,
with_system_user,
skip_if_not_root
)
# Import salt libs
import salt.utils.files
class SSHAuthStateTests(ModuleCase, SaltReturnAssertsMixin):
@destructiveTest
@skip_if_not_root
@with_system_user('issue_7409', on_existing='delete', delete=True)
def test_issue_7409_no_linebreaks_between_keys(self, username):
userdetails = self.run_function('user.info', [username])
user_ssh_dir = os.path.join(userdetails['home'], '.ssh')
authorized_keys_file = os.path.join(user_ssh_dir, 'authorized_keys')
ret = self.run_state(
'file.managed',
name=authorized_keys_file,
user=username,
makedirs=True,
contents_newline=False,
# Explicit no ending line break
contents='ssh-rsa AAAAB3NzaC1kc3MAAACBAL0sQ9fJ5bYTEyY== root'
)
ret = self.run_state(
'ssh_auth.present',
name='AAAAB3NzaC1kcQ9J5bYTEyZ==',
enc='ssh-rsa',
user=username,
comment=username
)
self.assertSaltTrueReturn(ret)
self.assertSaltStateChangesEqual(
ret, {'AAAAB3NzaC1kcQ9J5bYTEyZ==': 'New'}
)
with salt.utils.files.fopen(authorized_keys_file, 'r') as fhr:
self.assertEqual(
fhr.read(),
'ssh-rsa AAAAB3NzaC1kc3MAAACBAL0sQ9fJ5bYTEyY== root\n'
'ssh-rsa AAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)
@destructiveTest
@skip_if_not_root
@with_system_user('issue_10198', on_existing='delete', delete=True)
def test_issue_10198_keyfile_from_another_env(self, username=None):
userdetails = self.run_function('user.info', [username])
user_ssh_dir = os.path.join(userdetails['home'], '.ssh')
authorized_keys_file = os.path.join(user_ssh_dir, 'authorized_keys')
key_fname = 'issue_10198.id_rsa.pub'
# Create the keyfile that we expect to get back on the state call
with salt.utils.files.fopen(os.path.join(RUNTIME_VARS.TMP_PRODENV_STATE_TREE, key_fname), 'w') as kfh:
kfh.write(
'ssh-rsa AAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)
# Create a bogus key file on base environment
with salt.utils.files.fopen(os.path.join(RUNTIME_VARS.TMP_STATE_TREE, key_fname), 'w') as kfh:
kfh.write(
'ssh-rsa BAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)
ret = self.run_state(
'ssh_auth.present',
name='Setup Keys',
source='salt://{0}?saltenv=prod'.format(key_fname),
enc='ssh-rsa',
user=username,
comment=username
)
self.assertSaltTrueReturn(ret)
with salt.utils.files.fopen(authorized_keys_file, 'r') as fhr:
self.assertEqual(
fhr.read(),
'ssh-rsa AAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)
os.unlink(authorized_keys_file)
ret = self.run_state(
'ssh_auth.present',
name='Setup Keys',
source='salt://{0}'.format(key_fname),
enc='ssh-rsa',
user=username,
comment=username,
saltenv='prod'
)
self.assertSaltTrueReturn(ret)
with salt.utils.files.fopen(authorized_keys_file, 'r') as fhr:
self.assertEqual(
fhr.read(),
'ssh-rsa AAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
'''
Test the ssh_known_hosts state
Test the ssh_known_hosts states
'''
# Import python libs
@ -12,15 +12,7 @@ import shutil
from tests.support.case import ModuleCase
from tests.support.mixins import SaltReturnAssertsMixin
from tests.support.runtests import RUNTIME_VARS
from tests.support.helpers import (
destructiveTest,
with_system_user,
skip_if_binaries_missing,
skip_if_not_root
)
# Import salt libs
import salt.utils.files
from tests.support.helpers import skip_if_binaries_missing
KNOWN_HOSTS = os.path.join(RUNTIME_VARS.TMP, 'known_hosts')
GITHUB_FINGERPRINT = '9d:38:5b:83:a9:17:52:92:56:1a:5e:c4:d4:81:8e:0a:ca:51:a2:64:f1:74:20:11:2e:f8:8a:c3:a1:39:49:8f'
@ -154,98 +146,3 @@ class SSHKnownHostsStateTest(ModuleCase, SaltReturnAssertsMixin):
# test again
ret = self.run_state('ssh_known_hosts.absent', test=True, **kwargs)
self.assertSaltTrueReturn(ret)
class SSHAuthStateTests(ModuleCase, SaltReturnAssertsMixin):
@destructiveTest
@skip_if_not_root
@with_system_user('issue_7409', on_existing='delete', delete=True)
def test_issue_7409_no_linebreaks_between_keys(self, username):
userdetails = self.run_function('user.info', [username])
user_ssh_dir = os.path.join(userdetails['home'], '.ssh')
authorized_keys_file = os.path.join(user_ssh_dir, 'authorized_keys')
ret = self.run_state(
'file.managed',
name=authorized_keys_file,
user=username,
makedirs=True,
contents_newline=False,
# Explicit no ending line break
contents='ssh-rsa AAAAB3NzaC1kc3MAAACBAL0sQ9fJ5bYTEyY== root'
)
ret = self.run_state(
'ssh_auth.present',
name='AAAAB3NzaC1kcQ9J5bYTEyZ==',
enc='ssh-rsa',
user=username,
comment=username
)
self.assertSaltTrueReturn(ret)
self.assertSaltStateChangesEqual(
ret, {'AAAAB3NzaC1kcQ9J5bYTEyZ==': 'New'}
)
with salt.utils.files.fopen(authorized_keys_file, 'r') as fhr:
self.assertEqual(
fhr.read(),
'ssh-rsa AAAAB3NzaC1kc3MAAACBAL0sQ9fJ5bYTEyY== root\n'
'ssh-rsa AAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)
@destructiveTest
@skip_if_not_root
@with_system_user('issue_10198', on_existing='delete', delete=True)
def test_issue_10198_keyfile_from_another_env(self, username=None):
userdetails = self.run_function('user.info', [username])
user_ssh_dir = os.path.join(userdetails['home'], '.ssh')
authorized_keys_file = os.path.join(user_ssh_dir, 'authorized_keys')
key_fname = 'issue_10198.id_rsa.pub'
# Create the keyfile that we expect to get back on the state call
with salt.utils.files.fopen(os.path.join(RUNTIME_VARS.TMP_PRODENV_STATE_TREE, key_fname), 'w') as kfh:
kfh.write(
'ssh-rsa AAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)
# Create a bogus key file on base environment
with salt.utils.files.fopen(os.path.join(RUNTIME_VARS.TMP_STATE_TREE, key_fname), 'w') as kfh:
kfh.write(
'ssh-rsa BAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)
ret = self.run_state(
'ssh_auth.present',
name='Setup Keys',
source='salt://{0}?saltenv=prod'.format(key_fname),
enc='ssh-rsa',
user=username,
comment=username
)
self.assertSaltTrueReturn(ret)
with salt.utils.files.fopen(authorized_keys_file, 'r') as fhr:
self.assertEqual(
fhr.read(),
'ssh-rsa AAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)
os.unlink(authorized_keys_file)
ret = self.run_state(
'ssh_auth.present',
name='Setup Keys',
source='salt://{0}'.format(key_fname),
enc='ssh-rsa',
user=username,
comment=username,
saltenv='prod'
)
self.assertSaltTrueReturn(ret)
with salt.utils.files.fopen(authorized_keys_file, 'r') as fhr:
self.assertEqual(
fhr.read(),
'ssh-rsa AAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)

View File

@ -160,7 +160,7 @@ TEST_SUITES = {
'path': 'integration/netapi'},
'cloud_provider':
{'display_name': 'Cloud Provider',
'path': 'integration/cloud/providers'},
'path': 'integration/cloud/clouds'},
'minion':
{'display_name': 'Minion',
'path': 'integration/minion'},

View File

@ -78,7 +78,10 @@ def no_symlinks():
return not HAS_SYMLINKS
output = ''
try:
output = subprocess.check_output('git config --get core.symlinks', shell=True)
output = subprocess.Popen(
['git', 'config', '--get', 'core.symlinks'],
cwd=TMP,
stdout=subprocess.PIPE).communicate()[0]
except OSError as exc:
if exc.errno != errno.ENOENT:
raise

View File

@ -713,3 +713,23 @@ class JinjaFiltersTest(object):
self.assertIn('module_|-test_|-test.echo_|-run', ret)
self.assertEqual(ret['module_|-test_|-test.echo_|-run']['changes'],
_expected)
def test_yaml(self):
'''
test yaml filter
'''
_expected = {'ret': "{Question: 'Quieres Café?'}"}
ret = self.run_function('state.sls', ['jinja_filters.yaml'])
self.assertIn('module_|-test_|-test.echo_|-run', ret)
self.assertEqual(ret['module_|-test_|-test.echo_|-run']['changes'],
_expected)
def test_json(self):
'''
test json filter
'''
_expected = {'ret': '{"Question": "Quieres Café?"}'}
ret = self.run_function('state.sls', ['jinja_filters.json'])
self.assertIn('module_|-test_|-test.echo_|-run', ret)
self.assertEqual(ret['module_|-test_|-test.echo_|-run']['changes'],
_expected)

View File

@ -3,7 +3,7 @@
:synopsis: Base class for kernelpkg modules
:platform: Linux
:maturity: develop
versionadded:: 2018.3.0
.. versionadded:: 2018.3.0
'''
# pylint: disable=invalid-name,no-member

View File

@ -7,6 +7,11 @@ from __future__ import absolute_import
from tests.support.unit import skipIf, TestCase
from tests.support.mock import NO_MOCK, NO_MOCK_REASON
from tests.support.mixins import LoaderModuleMockMixin
try:
from pyroute2 import IPDB
HAS_PYROUTE2 = True
except ImportError:
HAS_PYROUTE2 = False
# Salt libs
import salt.beacons.network_settings as network_settings
@ -43,3 +48,18 @@ class NetworkSettingsBeaconTestCase(TestCase, LoaderModuleMockMixin):
ret = network_settings.validate(config)
self.assertEqual(ret, (True, 'Valid beacon configuration'))
@skipIf(not HAS_PYROUTE2, 'no pyroute2 installed, skipping')
class Pyroute2TestCase(TestCase):
def test_interface_dict_fields(self):
with IPDB() as ipdb:
for attr in network_settings.ATTRS:
# ipdb.interfaces is a dict-like object, that
# contains interface definitions. Interfaces can
# be referenced both with indices and names.
#
# ipdb.interfaces[1] is an interface with index 1,
# that is the loopback interface.
self.assertIn(attr, ipdb.interfaces[1])

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
'''
unit tests for the localfs cache
unit tests for salt.cache
'''
# Import Python libs

View File

@ -0,0 +1,152 @@
# -*- coding: utf-8 -*-
'''
:codeauthor: :email:`Daniel Wallace <dwallace@saltstack.com`
'''
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import os
import shutil
import tempfile
# Import Salt Testing libs
from tests.support.unit import skipIf, TestCase
from tests.support.case import ShellCase
from tests.support.mock import NO_MOCK, NO_MOCK_REASON, patch, MagicMock
from tests.support.paths import TMP
# Import Salt libs
import salt.config
import salt.roster
import salt.utils.files
import salt.utils.path
import salt.utils.thin
import salt.utils.yaml
from salt.client import ssh
ROSTER = '''
localhost:
host: 127.0.0.1
port: 2827
self:
host: 0.0.0.0
port: 42
'''
@skipIf(NO_MOCK, NO_MOCK_REASON)
@skipIf(not salt.utils.path.which('ssh'), "No ssh binary found in path")
class SSHPasswordTests(ShellCase):
def test_password_failure(self):
'''
Check password failures when trying to deploy keys
'''
opts = salt.config.client_config(self.get_config_file_path('master'))
opts['list_hosts'] = False
opts['argv'] = ['test.ping']
opts['selected_target_option'] = 'glob'
opts['tgt'] = 'localhost'
opts['arg'] = []
roster = os.path.join(self.config_dir, 'roster')
handle_ssh_ret = [
{'localhost': {'retcode': 255, 'stderr': u'Permission denied (publickey).\r\n', 'stdout': ''}},
]
expected = {'localhost': 'Permission denied (publickey)'}
display_output = MagicMock()
with patch('salt.roster.get_roster_file', MagicMock(return_value=roster)), \
patch('salt.client.ssh.SSH.handle_ssh', MagicMock(return_value=handle_ssh_ret)), \
patch('salt.client.ssh.SSH.key_deploy', MagicMock(return_value=expected)), \
patch('salt.output.display_output', display_output):
client = ssh.SSH(opts)
ret = next(client.run_iter())
with self.assertRaises(SystemExit):
client.run()
display_output.assert_called_once_with(expected, 'nested', opts)
self.assertIs(ret, handle_ssh_ret[0])
class SSHRosterDefaults(TestCase):
def test_roster_defaults_flat(self):
'''
Test Roster Defaults on the flat roster
'''
tempdir = tempfile.mkdtemp(dir=TMP)
expected = {
'self': {
'host': '0.0.0.0',
'user': 'daniel',
'port': 42,
},
'localhost': {
'host': '127.0.0.1',
'user': 'daniel',
'port': 2827,
},
}
try:
root_dir = os.path.join(tempdir, 'foo', 'bar')
os.makedirs(root_dir)
fpath = os.path.join(root_dir, 'config')
with salt.utils.files.fopen(fpath, 'w') as fp_:
fp_.write(
'''
roster_defaults:
user: daniel
'''
)
opts = salt.config.master_config(fpath)
with patch('salt.roster.get_roster_file', MagicMock(return_value=ROSTER)):
with patch('salt.template.compile_template', MagicMock(return_value=salt.utils.yaml.safe_load(ROSTER))):
roster = salt.roster.Roster(opts=opts)
self.assertEqual(roster.targets('*', 'glob'), expected)
finally:
if os.path.isdir(tempdir):
shutil.rmtree(tempdir)
@skipIf(NO_MOCK, NO_MOCK_REASON)
class SSHSingleTests(TestCase):
def setUp(self):
self.tmp_cachedir = tempfile.mkdtemp(dir=TMP)
def test_single_opts(self):
''' Sanity check for ssh.Single options
'''
argv = ['ssh.set_auth_key', 'root', 'hobn+amNAXSBTiOXEqlBjGB...rsa root@master']
opts = {
'argv': argv,
'__role': 'master',
'cachedir': self.tmp_cachedir,
'extension_modules': os.path.join(self.tmp_cachedir, 'extmods'),
}
target = {
'passwd': 'abc123',
'ssh_options': None,
'sudo': False,
'identities_only': False,
'host': 'login1',
'user': 'root',
'timeout': 65,
'remote_port_forwards': None,
'sudo_user': '',
'port': '22',
'priv': '/etc/salt/pki/master/ssh/salt-ssh.rsa'
}
single = ssh.Single(
opts,
opts['argv'],
'localhost',
mods={},
fsclient=None,
thin=salt.utils.thin.thin_path(opts['cachedir']),
mine=False,
**target)
self.assertEqual(single.shell._ssh_opts(), '')
self.assertEqual(single.shell._cmd_str('date +%s'), 'ssh login1 '
'-o KbdInteractiveAuthentication=no -o '
'PasswordAuthentication=yes -o ConnectTimeout=65 -o Port=22 '
'-o IdentityFile=/etc/salt/pki/master/ssh/salt-ssh.rsa '
'-o User=root date +%s')

View File

@ -1,140 +0,0 @@
# -*- coding: utf-8 -*-
'''
tests.unit.api_config_test
'''
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Testing libs
from tests.support.unit import skipIf, TestCase
from tests.support.helpers import destructiveTest
from tests.support.mock import (
MagicMock,
NO_MOCK,
NO_MOCK_REASON,
patch
)
# Import Salt libs
import salt.config
import salt.utils.platform
import salt.syspaths
MOCK_MASTER_DEFAULT_OPTS = {
'log_file': '{0}/var/log/salt/master'.format(salt.syspaths.ROOT_DIR),
'pidfile': '{0}/var/run/salt-master.pid'.format(salt.syspaths.ROOT_DIR),
'root_dir': format(salt.syspaths.ROOT_DIR)
}
if salt.utils.platform.is_windows():
MOCK_MASTER_DEFAULT_OPTS = {
'log_file': '{0}\\var\\log\\salt\\master'.format(
salt.syspaths.ROOT_DIR),
'pidfile': '{0}\\var\\run\\salt-master.pid'.format(
salt.syspaths.ROOT_DIR),
'root_dir': format(salt.syspaths.ROOT_DIR)
}
@skipIf(NO_MOCK, NO_MOCK_REASON)
class APIConfigTestCase(TestCase):
'''
TestCase for the api_config function in salt.config.__init__.py
'''
def setUp(self):
# Copy DEFAULT_API_OPTS to restore after the test
self.default_api_opts = salt.config.DEFAULT_API_OPTS.copy()
def tearDown(self):
# Reset DEFAULT_API_OPTS settings as to not interfere with other unit tests
salt.config.DEFAULT_API_OPTS = self.default_api_opts
def test_api_config_log_file_values(self):
'''
Tests the opts value of the 'log_file' after running through the
various default dict updates. 'log_file' should be updated to match
the DEFAULT_API_OPTS 'api_logfile' value.
'''
with patch('salt.config.client_config', MagicMock(return_value=MOCK_MASTER_DEFAULT_OPTS)):
expected = '{0}/var/log/salt/api'.format(
salt.syspaths.ROOT_DIR if salt.syspaths.ROOT_DIR != '/' else '')
if salt.utils.platform.is_windows():
expected = '{0}\\var\\log\\salt\\api'.format(
salt.syspaths.ROOT_DIR)
ret = salt.config.api_config('/some/fake/path')
self.assertEqual(ret['log_file'], expected)
def test_api_config_pidfile_values(self):
'''
Tests the opts value of the 'pidfile' after running through the
various default dict updates. 'pidfile' should be updated to match
the DEFAULT_API_OPTS 'api_pidfile' value.
'''
with patch('salt.config.client_config', MagicMock(return_value=MOCK_MASTER_DEFAULT_OPTS)):
expected = '{0}/var/run/salt-api.pid'.format(
salt.syspaths.ROOT_DIR if salt.syspaths.ROOT_DIR != '/' else '')
if salt.utils.platform.is_windows():
expected = '{0}\\var\\run\\salt-api.pid'.format(
salt.syspaths.ROOT_DIR)
ret = salt.config.api_config('/some/fake/path')
self.assertEqual(ret['pidfile'], expected)
@destructiveTest
def test_master_config_file_overrides_defaults(self):
'''
Tests the opts value of the api config values after running through the
various default dict updates that should be overridden by settings in
the user's master config file.
'''
foo_dir = '/foo/bar/baz'
hello_dir = '/hello/world'
if salt.utils.platform.is_windows():
foo_dir = 'c:\\foo\\bar\\baz'
hello_dir = 'c:\\hello\\world'
mock_master_config = {
'api_pidfile': foo_dir,
'api_logfile': hello_dir,
'rest_timeout': 5
}
mock_master_config.update(MOCK_MASTER_DEFAULT_OPTS.copy())
with patch('salt.config.client_config',
MagicMock(return_value=mock_master_config)):
ret = salt.config.api_config('/some/fake/path')
self.assertEqual(ret['rest_timeout'], 5)
self.assertEqual(ret['api_pidfile'], foo_dir)
self.assertEqual(ret['pidfile'], foo_dir)
self.assertEqual(ret['api_logfile'], hello_dir)
self.assertEqual(ret['log_file'], hello_dir)
@destructiveTest
def test_api_config_prepend_root_dirs_return(self):
'''
Tests the opts value of the api_logfile, log_file, api_pidfile, and pidfile
when a custom root directory is used. This ensures that each of these
values is present in the list of opts keys that should have the root_dir
prepended when the api_config function returns the opts dictionary.
'''
mock_log = '/mock/root/var/log/salt/api'
mock_pid = '/mock/root/var/run/salt-api.pid'
mock_master_config = MOCK_MASTER_DEFAULT_OPTS.copy()
mock_master_config['root_dir'] = '/mock/root/'
if salt.utils.platform.is_windows():
mock_log = 'c:\\mock\\root\\var\\log\\salt\\api'
mock_pid = 'c:\\mock\\root\\var\\run\\salt-api.pid'
mock_master_config['root_dir'] = 'c:\\mock\\root'
with patch('salt.config.client_config',
MagicMock(return_value=mock_master_config)):
ret = salt.config.api_config('/some/fake/path')
self.assertEqual(ret['api_logfile'], mock_log)
self.assertEqual(ret['log_file'], mock_log)
self.assertEqual(ret['api_pidfile'], mock_pid)
self.assertEqual(ret['pidfile'], mock_pid)

File diff suppressed because it is too large Load Diff

View File

@ -1,361 +0,0 @@
# -*- coding: utf-8 -*-
'''
:codeauthor: Mike Place <mp@saltstack.com>
'''
# Import Python libs
from __future__ import absolute_import
import errno
import logging
import os
import shutil
# Import Salt Testing libs
from tests.integration import AdaptedConfigurationTestCaseMixin
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.paths import TMP
from tests.support.unit import TestCase, skipIf
from tests.support.mock import MagicMock, patch, NO_MOCK, NO_MOCK_REASON
# Import salt libs
import salt.utils.files
from salt import fileclient
from salt.ext import six
log = logging.getLogger(__name__)
SALTENVS = ('base', 'dev')
FS_ROOT = os.path.join(TMP, 'fileclient_fs_root')
CACHE_ROOT = os.path.join(TMP, 'fileclient_cache_root')
SUBDIR = 'subdir'
SUBDIR_FILES = ('foo.txt', 'bar.txt', 'baz.txt')
def _get_file_roots():
return dict(
[(x, [os.path.join(FS_ROOT, x)]) for x in SALTENVS]
)
MOCKED_OPTS = {
'file_roots': _get_file_roots(),
'fileserver_backend': ['roots'],
'cachedir': CACHE_ROOT,
'file_client': 'local',
}
@skipIf(NO_MOCK, NO_MOCK_REASON)
class FileClientTest(TestCase, AdaptedConfigurationTestCaseMixin, LoaderModuleMockMixin):
def setup_loader_modules(self):
return {fileclient: {'__opts__': MOCKED_OPTS}}
def setUp(self):
self.file_client = fileclient.Client(self.master_opts)
def tearDown(self):
del self.file_client
def test_file_list_emptydirs(self):
'''
Ensure that the fileclient class won't allow a direct call to file_list_emptydirs()
'''
with self.assertRaises(NotImplementedError):
self.file_client.file_list_emptydirs()
def test_get_file(self):
'''
Ensure that the fileclient class won't allow a direct call to get_file()
'''
with self.assertRaises(NotImplementedError):
self.file_client.get_file(None)
def test_get_file_client(self):
minion_opts = self.get_temp_config('minion')
minion_opts['file_client'] = 'remote'
with patch('salt.fileclient.RemoteClient', MagicMock(return_value='remote_client')):
ret = fileclient.get_file_client(minion_opts)
self.assertEqual('remote_client', ret)
@skipIf(NO_MOCK, NO_MOCK_REASON)
class FileclientCacheTest(TestCase, AdaptedConfigurationTestCaseMixin, LoaderModuleMockMixin):
'''
Tests for the fileclient caching. The LocalClient is the only thing we can
test as it is the only way we can mock the fileclient (the tests run from
the minion process, so the master cannot be mocked from test code).
'''
def setup_loader_modules(self):
return {fileclient: {'__opts__': MOCKED_OPTS}}
def setUp(self):
'''
No need to add a dummy foo.txt to muddy up the github repo, just make
our own fileserver root on-the-fly.
'''
def _new_dir(path):
'''
Add a new dir at ``path`` using os.makedirs. If the directory
already exists, remove it recursively and then try to create it
again.
'''
try:
os.makedirs(path)
except OSError as exc:
if exc.errno == errno.EEXIST:
# Just in case a previous test was interrupted, remove the
# directory and try adding it again.
shutil.rmtree(path)
os.makedirs(path)
else:
raise
# Crete the FS_ROOT
for saltenv in SALTENVS:
saltenv_root = os.path.join(FS_ROOT, saltenv)
# Make sure we have a fresh root dir for this saltenv
_new_dir(saltenv_root)
path = os.path.join(saltenv_root, 'foo.txt')
with salt.utils.files.fopen(path, 'w') as fp_:
fp_.write(
'This is a test file in the \'{0}\' saltenv.\n'
.format(saltenv)
)
subdir_abspath = os.path.join(saltenv_root, SUBDIR)
os.makedirs(subdir_abspath)
for subdir_file in SUBDIR_FILES:
path = os.path.join(subdir_abspath, subdir_file)
with salt.utils.files.fopen(path, 'w') as fp_:
fp_.write(
'This is file \'{0}\' in subdir \'{1} from saltenv '
'\'{2}\''.format(subdir_file, SUBDIR, saltenv)
)
# Create the CACHE_ROOT
_new_dir(CACHE_ROOT)
def tearDown(self):
'''
Remove the directories created for these tests
'''
shutil.rmtree(FS_ROOT)
shutil.rmtree(CACHE_ROOT)
def test_cache_dir(self):
'''
Ensure entire directory is cached to correct location
'''
patched_opts = dict((x, y) for x, y in six.iteritems(self.minion_opts))
patched_opts.update(MOCKED_OPTS)
with patch.dict(fileclient.__opts__, patched_opts):
client = fileclient.get_file_client(fileclient.__opts__, pillar=False)
for saltenv in SALTENVS:
self.assertTrue(
client.cache_dir(
'salt://{0}'.format(SUBDIR),
saltenv,
cachedir=None
)
)
for subdir_file in SUBDIR_FILES:
cache_loc = os.path.join(fileclient.__opts__['cachedir'],
'files',
saltenv,
SUBDIR,
subdir_file)
# Double check that the content of the cached file
# identifies it as being from the correct saltenv. The
# setUp function creates the file with the name of the
# saltenv mentioned in the file, so a simple 'in' check is
# sufficient here. If opening the file raises an exception,
# this is a problem, so we are not catching the exception
# and letting it be raised so that the test fails.
with salt.utils.files.fopen(cache_loc) as fp_:
content = fp_.read()
log.debug('cache_loc = %s', cache_loc)
log.debug('content = %s', content)
self.assertTrue(subdir_file in content)
self.assertTrue(SUBDIR in content)
self.assertTrue(saltenv in content)
def test_cache_dir_with_alternate_cachedir_and_absolute_path(self):
'''
Ensure entire directory is cached to correct location when an alternate
cachedir is specified and that cachedir is an absolute path
'''
patched_opts = dict((x, y) for x, y in six.iteritems(self.minion_opts))
patched_opts.update(MOCKED_OPTS)
alt_cachedir = os.path.join(TMP, 'abs_cachedir')
with patch.dict(fileclient.__opts__, patched_opts):
client = fileclient.get_file_client(fileclient.__opts__, pillar=False)
for saltenv in SALTENVS:
self.assertTrue(
client.cache_dir(
'salt://{0}'.format(SUBDIR),
saltenv,
cachedir=alt_cachedir
)
)
for subdir_file in SUBDIR_FILES:
cache_loc = os.path.join(alt_cachedir,
'files',
saltenv,
SUBDIR,
subdir_file)
# Double check that the content of the cached file
# identifies it as being from the correct saltenv. The
# setUp function creates the file with the name of the
# saltenv mentioned in the file, so a simple 'in' check is
# sufficient here. If opening the file raises an exception,
# this is a problem, so we are not catching the exception
# and letting it be raised so that the test fails.
with salt.utils.files.fopen(cache_loc) as fp_:
content = fp_.read()
log.debug('cache_loc = %s', cache_loc)
log.debug('content = %s', content)
self.assertTrue(subdir_file in content)
self.assertTrue(SUBDIR in content)
self.assertTrue(saltenv in content)
def test_cache_dir_with_alternate_cachedir_and_relative_path(self):
'''
Ensure entire directory is cached to correct location when an alternate
cachedir is specified and that cachedir is a relative path
'''
patched_opts = dict((x, y) for x, y in six.iteritems(self.minion_opts))
patched_opts.update(MOCKED_OPTS)
alt_cachedir = 'foo'
with patch.dict(fileclient.__opts__, patched_opts):
client = fileclient.get_file_client(fileclient.__opts__, pillar=False)
for saltenv in SALTENVS:
self.assertTrue(
client.cache_dir(
'salt://{0}'.format(SUBDIR),
saltenv,
cachedir=alt_cachedir
)
)
for subdir_file in SUBDIR_FILES:
cache_loc = os.path.join(fileclient.__opts__['cachedir'],
alt_cachedir,
'files',
saltenv,
SUBDIR,
subdir_file)
# Double check that the content of the cached file
# identifies it as being from the correct saltenv. The
# setUp function creates the file with the name of the
# saltenv mentioned in the file, so a simple 'in' check is
# sufficient here. If opening the file raises an exception,
# this is a problem, so we are not catching the exception
# and letting it be raised so that the test fails.
with salt.utils.files.fopen(cache_loc) as fp_:
content = fp_.read()
log.debug('cache_loc = %s', cache_loc)
log.debug('content = %s', content)
self.assertTrue(subdir_file in content)
self.assertTrue(SUBDIR in content)
self.assertTrue(saltenv in content)
def test_cache_file(self):
'''
Ensure file is cached to correct location
'''
patched_opts = dict((x, y) for x, y in six.iteritems(self.minion_opts))
patched_opts.update(MOCKED_OPTS)
with patch.dict(fileclient.__opts__, patched_opts):
client = fileclient.get_file_client(fileclient.__opts__, pillar=False)
for saltenv in SALTENVS:
self.assertTrue(
client.cache_file('salt://foo.txt', saltenv, cachedir=None)
)
cache_loc = os.path.join(
fileclient.__opts__['cachedir'], 'files', saltenv, 'foo.txt')
# Double check that the content of the cached file identifies
# it as being from the correct saltenv. The setUp function
# creates the file with the name of the saltenv mentioned in
# the file, so a simple 'in' check is sufficient here. If
# opening the file raises an exception, this is a problem, so
# we are not catching the exception and letting it be raised so
# that the test fails.
with salt.utils.files.fopen(cache_loc) as fp_:
content = fp_.read()
log.debug('cache_loc = %s', cache_loc)
log.debug('content = %s', content)
self.assertTrue(saltenv in content)
def test_cache_file_with_alternate_cachedir_and_absolute_path(self):
'''
Ensure file is cached to correct location when an alternate cachedir is
specified and that cachedir is an absolute path
'''
patched_opts = dict((x, y) for x, y in six.iteritems(self.minion_opts))
patched_opts.update(MOCKED_OPTS)
alt_cachedir = os.path.join(TMP, 'abs_cachedir')
with patch.dict(fileclient.__opts__, patched_opts):
client = fileclient.get_file_client(fileclient.__opts__, pillar=False)
for saltenv in SALTENVS:
self.assertTrue(
client.cache_file('salt://foo.txt',
saltenv,
cachedir=alt_cachedir)
)
cache_loc = os.path.join(alt_cachedir,
'files',
saltenv,
'foo.txt')
# Double check that the content of the cached file identifies
# it as being from the correct saltenv. The setUp function
# creates the file with the name of the saltenv mentioned in
# the file, so a simple 'in' check is sufficient here. If
# opening the file raises an exception, this is a problem, so
# we are not catching the exception and letting it be raised so
# that the test fails.
with salt.utils.files.fopen(cache_loc) as fp_:
content = fp_.read()
log.debug('cache_loc = %s', cache_loc)
log.debug('content = %s', content)
self.assertTrue(saltenv in content)
def test_cache_file_with_alternate_cachedir_and_relative_path(self):
'''
Ensure file is cached to correct location when an alternate cachedir is
specified and that cachedir is a relative path
'''
patched_opts = dict((x, y) for x, y in six.iteritems(self.minion_opts))
patched_opts.update(MOCKED_OPTS)
alt_cachedir = 'foo'
with patch.dict(fileclient.__opts__, patched_opts):
client = fileclient.get_file_client(fileclient.__opts__, pillar=False)
for saltenv in SALTENVS:
self.assertTrue(
client.cache_file('salt://foo.txt',
saltenv,
cachedir=alt_cachedir)
)
cache_loc = os.path.join(fileclient.__opts__['cachedir'],
alt_cachedir,
'files',
saltenv,
'foo.txt')
# Double check that the content of the cached file identifies
# it as being from the correct saltenv. The setUp function
# creates the file with the name of the saltenv mentioned in
# the file, so a simple 'in' check is sufficient here. If
# opening the file raises an exception, this is a problem, so
# we are not catching the exception and letting it be raised so
# that the test fails.
with salt.utils.files.fopen(cache_loc) as fp_:
content = fp_.read()
log.debug('cache_loc = %s', cache_loc)
log.debug('content = %s', content)
self.assertTrue(saltenv in content)

View File

@ -1,145 +0,0 @@
# -*- coding: utf-8 -*-
'''
integration.loader.globals
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Test Salt's loader regarding globals that it should pack in
'''
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import inspect
import sys
# Import Salt Testing libs
from tests.support.case import ModuleCase
# Import salt libs
import salt.loader
import salt.utils.yaml
# Import 3rd-party libs
from salt.ext import six
class LoaderGlobalsTest(ModuleCase):
'''
Test all of the globals that the loader is responsible for adding to modules
This shouldn't be done here, but should rather be done per module type (in the cases where they are used)
so they can check ALL globals that they have (or should have) access to.
This is intended as a shorter term way of testing these so we don't break the loader
'''
def _verify_globals(self, mod_dict):
'''
Verify that the globals listed in the doc string (from the test) are in these modules
'''
# find the globals
global_vars = []
for val in six.itervalues(mod_dict):
# only find salty globals
if val.__module__.startswith('salt.loaded'):
if hasattr(val, '__globals__'):
if '__wrapped__' in val.__globals__:
global_vars.append(sys.modules[val.__module__].__dict__)
else:
global_vars.append(val.__globals__)
# if we couldn't find any, then we have no modules -- so something is broken
self.assertNotEqual(global_vars, [], msg='No modules were loaded.')
# get the names of the globals you should have
func_name = inspect.stack()[1][3]
names = next(six.itervalues(salt.utils.yaml.safe_load(getattr(self, func_name).__doc__)))
# Now, test each module!
for item in global_vars:
for name in names:
self.assertIn(name, list(item.keys()))
def test_auth(self):
'''
Test that auth mods have:
- __pillar__
- __grains__
- __salt__
- __context__
'''
self._verify_globals(salt.loader.auth(self.master_opts))
def test_runners(self):
'''
Test that runners have:
- __pillar__
- __salt__
- __opts__
- __grains__
- __context__
'''
self._verify_globals(salt.loader.runner(self.master_opts))
def test_returners(self):
'''
Test that returners have:
- __salt__
- __opts__
- __pillar__
- __grains__
- __context__
'''
self._verify_globals(salt.loader.returners(self.master_opts, {}))
def test_pillars(self):
'''
Test that pillars have:
- __salt__
- __opts__
- __pillar__
- __grains__
- __context__
'''
self._verify_globals(salt.loader.pillars(self.master_opts, {}))
def test_tops(self):
'''
Test that tops have: []
'''
self._verify_globals(salt.loader.tops(self.master_opts))
def test_outputters(self):
'''
Test that outputters have:
- __opts__
- __pillar__
- __grains__
- __context__
'''
self._verify_globals(salt.loader.outputters(self.master_opts))
def test_serializers(self):
'''
Test that serializers have: []
'''
self._verify_globals(salt.loader.serializers(self.master_opts))
def test_states(self):
'''
Test that states:
- __pillar__
- __salt__
- __opts__
- __grains__
- __context__
'''
self._verify_globals(salt.loader.states(self.master_opts, {}, {}, {}))
def test_renderers(self):
'''
Test that renderers have:
- __salt__ # Execution functions (i.e. __salt__['test.echo']('foo'))
- __grains__ # Grains (i.e. __grains__['os'])
- __pillar__ # Pillar data (i.e. __pillar__['foo'])
- __opts__ # Minion configuration options
- __context__ # Context dict shared amongst all modules of the same type
'''
self._verify_globals(salt.loader.render(self.master_opts, {}))

View File

@ -1,40 +0,0 @@
# -*- coding: utf-8 -*-
'''
integration.loader.interfaces
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Test Salt's loader
'''
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Testing libs
from tests.support.unit import TestCase
# Import Salt libs
from salt.ext import six
from salt.config import minion_config
import salt.loader
# TODO: the rest of the public interfaces
class RawModTest(TestCase):
'''
Test the interface of raw_mod
'''
def setUp(self):
self.opts = minion_config(None)
def tearDown(self):
del self.opts
def test_basic(self):
testmod = salt.loader.raw_mod(self.opts, 'test', None)
for k, v in six.iteritems(testmod):
self.assertEqual(k.split('.')[0], 'test')
def test_bad_name(self):
testmod = salt.loader.raw_mod(self.opts, 'module_we_do_not_have', None)
self.assertEqual(testmod, {})

View File

@ -71,7 +71,7 @@ class InspectorCollectorTestCase(TestCase):
inspector = Inspector(cachedir=os.sep + 'test',
piddir=os.sep + 'test',
pidfilename='bar.pid')
tree_root = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'inspectlib', 'tree_test')
tree_root = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'tree_test')
expected_tree = ([os.sep + os.sep.join(['a', 'a', 'dummy.a']),
os.sep + os.sep.join(['a', 'b', 'dummy.b']),
os.sep + os.sep.join(['b', 'b.1']),

View File

@ -3,7 +3,7 @@
:synopsis: Unit Tests for 'module.aptkernelpkg'
:platform: Linux
:maturity: develop
versionadded:: 2018.3.0
.. versionadded:: 2018.3.0
'''
# pylint: disable=invalid-name,no-member
@ -18,7 +18,7 @@ try:
from tests.support.mock import MagicMock, patch, NO_MOCK, NO_MOCK_REASON
# Import Salt Libs
from tests.unit.modules.test_kernelpkg import KernelPkgTestCase
from tests.support.kernelpkg import KernelPkgTestCase
import salt.modules.kernelpkg_linux_apt as kernelpkg
from salt.exceptions import CommandExecutionError
HAS_MODULES = True

Some files were not shown because too many files have changed in this diff Show More