salt/tests/integration/cloud/providers/test_virtualbox.py
Erik Johnson 3184168365 Use explicit unicode strings + break up salt.utils
This PR is part of what will be an ongoing effort to use explicit
unicode strings in Salt. Because Python 3 does not suport Python 2's raw
unicode string syntax (i.e. `ur'\d+'`), we must use
`salt.utils.locales.sdecode()` to ensure that the raw string is unicode.
However, because of how `salt/utils/__init__.py` has evolved into the
hulking monstrosity it is today, this means importing a large module in
places where it is not needed, which could negatively impact
performance. For this reason, this PR also breaks out some of the
functions from `salt/utils/__init__.py` into new/existing modules under
`salt/utils/`. The long term goal will be that the modules within this
directory do not depend on importing `salt.utils`.

A summary of the changes in this PR is as follows:

* Moves the following functions from `salt.utils` to new locations
  (including a deprecation warning if invoked from `salt.utils`):
  `to_bytes`, `to_str`, `to_unicode`, `str_to_num`, `is_quoted`,
  `dequote`, `is_hex`, `is_bin_str`, `rand_string`,
  `contains_whitespace`, `clean_kwargs`, `invalid_kwargs`, `which`,
  `which_bin`, `path_join`, `shlex_split`, `rand_str`, `is_windows`,
  `is_proxy`, `is_linux`, `is_darwin`, `is_sunos`, `is_smartos`,
  `is_smartos_globalzone`, `is_smartos_zone`, `is_freebsd`, `is_netbsd`,
  `is_openbsd`, `is_aix`
* Moves the functions already deprecated by @rallytime to the bottom of
  `salt/utils/__init__.py` for better organization, so we can keep the
  deprecated ones separate from the ones yet to be deprecated as we
  continue to break up `salt.utils`
* Updates `salt/*.py` and all files under `salt/client/` to use explicit
  unicode string literals.
* Gets rid of implicit imports of `salt.utils` (e.g. `from salt.utils
  import foo` becomes `import salt.utils.foo as foo`).
* Renames the `test.rand_str` function to `test.random_hash` to more
  accurately reflect what it does
* Modifies `salt.utils.stringutils.random()` (née `salt.utils.rand_string()`)
  such that it returns a string matching the passed size. Previously
  this function would get `size` bytes from `os.urandom()`,
  base64-encode it, and return the result, which would in most cases not
  be equal to the passed size.
2017-08-08 13:33:43 -05:00

489 lines
17 KiB
Python

# -*- coding: utf-8 -*-
# This code assumes vboxapi.py from VirtualBox distribution
# being in PYTHONPATH, or installed system-wide
# Import Python Libs
from __future__ import absolute_import
import os
import logging
import socket
# Import Salt Testing Libs
import tests.integration as integration
from tests.support.unit import TestCase, skipIf
from tests.integration.cloud.helpers.virtualbox import (VirtualboxTestCase,
VirtualboxCloudTestCase,
CONFIG_NAME,
PROVIDER_NAME,
PROFILE_NAME,
BASE_BOX_NAME,
INSTANCE_NAME,
BOOTABLE_BASE_BOX_NAME,
DEPLOY_PROFILE_NAME)
# Import Salt Libs
from salt.ext import six
from salt.ext.six.moves import range
from salt.config import cloud_providers_config, vm_profiles_config
from salt.utils.virtualbox import (vb_xpcom_to_attribute_dict,
vb_clone_vm,
vb_destroy_machine,
vb_create_machine,
vb_get_box,
vb_machine_exists,
XPCOM_ATTRIBUTES,
vb_start_vm,
vb_stop_vm,
vb_get_network_addresses,
vb_wait_for_network_address,
machine_get_machinestate_str,
HAS_LIBS)
log = logging.getLogger(__name__)
# As described in the documentation of list_nodes (this may change with time)
MINIMAL_MACHINE_ATTRIBUTES = [
"id",
"image",
"size",
"state",
"private_ips",
"public_ips",
]
class VirtualboxProviderTest(VirtualboxCloudTestCase):
"""
Integration tests for the Virtualbox cloud provider using the Virtualbox driver
"""
def run_cloud_destroy(self, machine_name):
"""
Calls salt-cloud to destroy a machine and returns the destroyed machine object (should be None)
@param machine_name:
@type str:
@return:
@rtype: dict
"""
output = self.run_cloud('-d {0} --assume-yes --log-level=debug'.format(machine_name))
return output.get(CONFIG_NAME, {}).get(PROVIDER_NAME, {})
def setUp(self):
"""
Sets up the test requirements
"""
super(VirtualboxProviderTest, self).setUp()
# check if appropriate cloud provider and profile files are present
profile_str = 'virtualbox-config'
providers = self.run_cloud('--list-providers')
log.debug("providers: %s", providers)
if profile_str not in providers:
self.skipTest(
'Configuration file for {0} was not found. Check {0}.conf files '
'in tests/integration/files/conf/cloud.*.d/ to run these tests.'.format(PROVIDER_NAME)
)
# check if personal access token, ssh_key_file, and ssh_key_names are present
config_path = os.path.join(
integration.FILES,
'conf',
'cloud.providers.d',
PROVIDER_NAME + '.conf'
)
log.debug("config_path: %s", config_path)
providers = cloud_providers_config(config_path)
log.debug("config: %s", providers)
config_path = os.path.join(
integration.FILES,
'conf',
'cloud.profiles.d',
PROVIDER_NAME + '.conf'
)
profiles = vm_profiles_config(config_path, providers)
profile = profiles.get(PROFILE_NAME)
if not profile:
self.skipTest(
'Profile {0} was not found. Check {1}.conf files '
'in tests/integration/files/conf/cloud.profiles.d/ to run these tests.'.format(PROFILE_NAME,
PROVIDER_NAME)
)
base_box_name = profile.get("clonefrom")
if base_box_name != BASE_BOX_NAME:
self.skipTest(
'Profile {0} does not have a base box to clone from. Check {1}.conf files '
'in tests/integration/files/conf/cloud.profiles.d/ to run these tests.'
'And add a "clone_from: {2}" to the profile'.format(PROFILE_NAME, PROVIDER_NAME, BASE_BOX_NAME)
)
@classmethod
def setUpClass(cls):
vb_create_machine(BASE_BOX_NAME)
@classmethod
def tearDownClass(cls):
vb_destroy_machine(BASE_BOX_NAME)
def test_cloud_create(self):
"""
Simply create a machine and make sure it was created
"""
machines = self.run_cloud('-p {0} {1} --log-level=debug'.format(PROFILE_NAME, INSTANCE_NAME))
self.assertIn(INSTANCE_NAME, machines.keys())
def test_cloud_list(self):
"""
List all machines in virtualbox and make sure the requested attributes are included
"""
machines = self.run_cloud_function('list_nodes')
expected_attributes = MINIMAL_MACHINE_ATTRIBUTES
names = machines.keys()
self.assertGreaterEqual(len(names), 1, "No machines found")
for name, machine in six.iteritems(machines):
if six.PY3:
self.assertCountEqual(expected_attributes, machine.keys())
else:
self.assertItemsEqual(expected_attributes, machine.keys())
self.assertIn(BASE_BOX_NAME, names)
def test_cloud_list_full(self):
"""
List all machines and make sure full information in included
"""
machines = self.run_cloud_function('list_nodes_full')
expected_minimal_attribute_count = len(MINIMAL_MACHINE_ATTRIBUTES)
names = machines.keys()
self.assertGreaterEqual(len(names), 1, "No machines found")
for name, machine in six.iteritems(machines):
self.assertGreaterEqual(len(machine.keys()), expected_minimal_attribute_count)
self.assertIn(BASE_BOX_NAME, names)
def test_cloud_list_select(self):
"""
List selected attributes of all machines
"""
machines = self.run_cloud_function('list_nodes_select')
# TODO find out how to get query.selection from the "cloud" config
expected_attributes = ["id"]
names = machines.keys()
self.assertGreaterEqual(len(names), 1, "No machines found")
for name, machine in six.iteritems(machines):
if six.PY3:
self.assertCountEqual(expected_attributes, machine.keys())
else:
self.assertItemsEqual(expected_attributes, machine.keys())
self.assertIn(BASE_BOX_NAME, names)
def test_cloud_destroy(self):
"""
Test creating an instance on virtualbox with the virtualbox driver
"""
# check if instance with salt installed returned
self.test_cloud_create()
ret = self.run_cloud_destroy(INSTANCE_NAME)
# destroy the instance
self.assertIn(INSTANCE_NAME, ret.keys())
def test_function_show_instance(self):
kw_function_args = {
"image": BASE_BOX_NAME
}
machines = self.run_cloud_function('show_image', kw_function_args, timeout=30)
expected_minimal_attribute_count = len(MINIMAL_MACHINE_ATTRIBUTES)
self.assertIn(BASE_BOX_NAME, machines)
machine = machines[BASE_BOX_NAME]
self.assertGreaterEqual(len(machine.keys()), expected_minimal_attribute_count)
def tearDown(self):
"""
Clean up after tests
"""
if vb_machine_exists(INSTANCE_NAME):
vb_destroy_machine(INSTANCE_NAME)
@skipIf(HAS_LIBS and vb_machine_exists(BOOTABLE_BASE_BOX_NAME) is False,
"Bootable VM '{0}' not found. Cannot run tests.".format(BOOTABLE_BASE_BOX_NAME)
)
class VirtualboxProviderHeavyTests(VirtualboxCloudTestCase):
"""
Tests that include actually booting a machine and doing operations on it that might be lengthy.
"""
def assertIsIpAddress(self, ip_str):
"""
Is it either a IPv4 or IPv6 address
@param ip_str:
@type ip_str: str
@raise AssertionError
"""
try:
socket.inet_aton(ip_str)
except Exception:
try:
socket.inet_pton(socket.AF_INET6, ip_str)
except Exception:
self.fail("{0} is not a valid IP address".format(ip_str))
def setUp(self):
"""
Sets up the test requirements
"""
# check if appropriate cloud provider and profile files are present
provider_str = CONFIG_NAME
providers = self.run_cloud('--list-providers')
log.debug("providers: %s", providers)
if provider_str not in providers:
self.skipTest(
'Configuration file for {0} was not found. Check {0}.conf files '
'in tests/integration/files/conf/cloud.*.d/ to run these tests.'.format(PROVIDER_NAME)
)
# check if personal access token, ssh_key_file, and ssh_key_names are present
config_path = os.path.join(
integration.FILES,
'conf',
'cloud.providers.d',
PROVIDER_NAME + '.conf'
)
log.debug("config_path: %s", config_path)
providers = cloud_providers_config(config_path)
log.debug("config: %s", providers)
config_path = os.path.join(
integration.FILES,
'conf',
'cloud.profiles.d',
PROVIDER_NAME + '.conf'
)
profiles = vm_profiles_config(config_path, providers)
profile = profiles.get(DEPLOY_PROFILE_NAME)
if not profile:
self.skipTest(
'Profile {0} was not found. Check {1}.conf files '
'in tests/integration/files/conf/cloud.profiles.d/ to run these tests.'.format(DEPLOY_PROFILE_NAME,
PROVIDER_NAME)
)
base_box_name = profile.get("clonefrom")
if base_box_name != BOOTABLE_BASE_BOX_NAME:
self.skipTest(
'Profile {0} does not have a base box to clone from. Check {1}.conf files '
'in tests/integration/files/conf/cloud.profiles.d/ to run these tests.'
'And add a "clone_from: {2}" to the profile'.format(PROFILE_NAME, PROVIDER_NAME, BOOTABLE_BASE_BOX_NAME)
)
def tearDown(self):
try:
vb_stop_vm(BOOTABLE_BASE_BOX_NAME)
except Exception:
pass
if vb_machine_exists(INSTANCE_NAME):
try:
vb_stop_vm(INSTANCE_NAME)
vb_destroy_machine(INSTANCE_NAME)
except Exception as e:
log.warning("Possibly dirty state after exception", exc_info=True)
def test_deploy(self):
machines = self.run_cloud('-p {0} {1} --log-level=debug'.format(DEPLOY_PROFILE_NAME, INSTANCE_NAME))
self.assertIn(INSTANCE_NAME, machines.keys())
machine = machines[INSTANCE_NAME]
self.assertIn("deployed", machine)
self.assertTrue(machine["deployed"], "Machine wasn't deployed :(")
def test_start_stop_action(self):
res = self.run_cloud_action("start", BOOTABLE_BASE_BOX_NAME, timeout=10)
log.info(res)
machine = res.get(BOOTABLE_BASE_BOX_NAME)
self.assertIsNotNone(machine)
expected_state = "Running"
state = machine.get("state")
self.assertEqual(state, expected_state)
res = self.run_cloud_action("stop", BOOTABLE_BASE_BOX_NAME, timeout=10)
log.info(res)
machine = res.get(BOOTABLE_BASE_BOX_NAME)
self.assertIsNotNone(machine)
expected_state = "PoweredOff"
state = machine.get("state")
self.assertEqual(state, expected_state)
def test_restart_action(self):
pass
def test_network_addresses(self):
# Machine is off
ip_addresses = vb_get_network_addresses(machine_name=BOOTABLE_BASE_BOX_NAME)
network_count = len(ip_addresses)
self.assertEqual(network_count, 0)
# Machine is up again
vb_start_vm(BOOTABLE_BASE_BOX_NAME)
ip_addresses = vb_wait_for_network_address(20, machine_name=BOOTABLE_BASE_BOX_NAME)
network_count = len(ip_addresses)
self.assertGreater(network_count, 0)
for ip_address in ip_addresses:
self.assertIsIpAddress(ip_address)
@skipIf(HAS_LIBS is False, 'The \'vboxapi\' library is not available')
class BaseVirtualboxTests(TestCase):
def test_get_manager(self):
self.assertIsNotNone(vb_get_box())
class CreationDestructionVirtualboxTests(VirtualboxTestCase):
def setUp(self):
super(CreationDestructionVirtualboxTests, self).setUp()
def test_vm_creation_and_destruction(self):
vm_name = BASE_BOX_NAME
vb_create_machine(vm_name)
self.assertMachineExists(vm_name)
vb_destroy_machine(vm_name)
self.assertMachineDoesNotExist(vm_name)
class CloneVirtualboxTests(VirtualboxTestCase):
def setUp(self):
self.vbox = vb_get_box()
self.name = "SaltCloudVirtualboxTestVM"
vb_create_machine(self.name)
self.assertMachineExists(self.name)
def tearDown(self):
vb_destroy_machine(self.name)
self.assertMachineDoesNotExist(self.name)
def test_create_machine(self):
vb_name = "NewTestMachine"
machine = vb_clone_vm(
name=vb_name,
clone_from=self.name
)
self.assertEqual(machine.get("name"), vb_name)
self.assertMachineExists(vb_name)
vb_destroy_machine(vb_name)
self.assertMachineDoesNotExist(vb_name)
@skipIf(HAS_LIBS and vb_machine_exists(BOOTABLE_BASE_BOX_NAME) is False,
"Bootable VM '{0}' not found. Cannot run tests.".format(BOOTABLE_BASE_BOX_NAME)
)
class BootVirtualboxTests(VirtualboxTestCase):
def test_start_stop(self):
for i in range(2):
machine = vb_start_vm(BOOTABLE_BASE_BOX_NAME, 20000)
self.assertEqual(machine_get_machinestate_str(machine), "Running")
machine = vb_stop_vm(BOOTABLE_BASE_BOX_NAME)
self.assertEqual(machine_get_machinestate_str(machine), "PoweredOff")
class XpcomConversionTests(TestCase):
@classmethod
def _mock_xpcom_object(cls, interface_name=None, attributes=None):
class XPCOM(object):
def __str__(self):
return "<XPCOM component '<unknown>' (implementing {0})>".format(interface_name)
o = XPCOM()
if attributes and isinstance(attributes, dict):
for key, value in six.iteritems(attributes):
setattr(o, key, value)
return o
def test_unknown_object(self):
xpcom = XpcomConversionTests._mock_xpcom_object()
ret = vb_xpcom_to_attribute_dict(xpcom)
self.assertDictEqual(ret, dict())
def test_imachine_object_default(self):
interface = "IMachine"
imachine = XpcomConversionTests._mock_xpcom_object(interface)
ret = vb_xpcom_to_attribute_dict(imachine, interface_name=interface)
expected_attributes = XPCOM_ATTRIBUTES[interface]
self.assertIsNotNone(expected_attributes, "%s is unknown")
for key in ret:
self.assertIn(key, expected_attributes)
def test_override_attributes(self):
expected_dict = {
"herp": "derp",
"lol": "rofl",
"something": 12345
}
xpc = XpcomConversionTests._mock_xpcom_object(attributes=expected_dict)
ret = vb_xpcom_to_attribute_dict(xpc, attributes=expected_dict.keys())
self.assertDictEqual(ret, expected_dict)
def test_extra_attributes(self):
interface = "IMachine"
expected_extras = {
"extra": "extra",
}
expected_machine = dict([(attribute, attribute) for attribute in XPCOM_ATTRIBUTES[interface]])
expected_machine.update(expected_extras)
imachine = XpcomConversionTests._mock_xpcom_object(interface, attributes=expected_machine)
ret = vb_xpcom_to_attribute_dict(
imachine,
interface_name=interface,
extra_attributes=expected_extras.keys()
)
self.assertDictEqual(ret, expected_machine)
ret_keys = ret.keys()
for key in expected_extras:
self.assertIn(key, ret_keys)
def test_extra_nonexistent_attributes(self):
expected_extra_dict = {
"nonexistent": ""
}
xpcom = XpcomConversionTests._mock_xpcom_object()
ret = vb_xpcom_to_attribute_dict(xpcom, extra_attributes=expected_extra_dict.keys())
self.assertDictEqual(ret, expected_extra_dict)
def test_extra_nonexistent_attribute_with_default(self):
expected_extras = [("nonexistent", list)]
expected_extra_dict = {
"nonexistent": []
}
xpcom = XpcomConversionTests._mock_xpcom_object()
ret = vb_xpcom_to_attribute_dict(xpcom, extra_attributes=expected_extras)
self.assertDictEqual(ret, expected_extra_dict)