salt/tests/support/helpers.py

1278 lines
45 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
'''
:copyright: © 2013-2017 by the SaltStack Team, see AUTHORS for more details.
:license: Apache 2.0, see LICENSE for more details.
tests.support.helpers
~~~~~~~~~~~~~~~~~~~~~
Test support helpers
'''
# pylint: disable=repr-flag-used-in-string
# Import Python libs
from __future__ import absolute_import
import os
import sys
import time
import errno
import types
import signal
import socket
import inspect
import logging
import functools
# Import 3rd-party libs
2017-03-06 13:53:19 +00:00
import psutil # pylint: disable=3rd-party-module-not-gated
import salt.ext.six as six
from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
if six.PY2:
import __builtin__ as pybuiltins # pylint: disable=incompatible-py3-code,import-error
else:
import builtins as pybuiltins # pylint: disable=import-error
# Import Salt Tests Support libs
from tests.support.unit import skip, _id
from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
log = logging.getLogger(__name__)
def destructiveTest(caller):
'''
Mark a test case as a destructive test for example adding or removing users
from your system.
.. code-block:: python
class MyTestCase(TestCase):
@destructiveTest
def test_create_user(self):
pass
'''
if inspect.isclass(caller):
# We're decorating a class
old_setup = getattr(caller, 'setUp', None)
def setUp(self, *args, **kwargs):
if os.environ.get('DESTRUCTIVE_TESTS', 'False').lower() == 'false':
self.skipTest('Destructive tests are disabled')
if old_setup is not None:
old_setup(self, *args, **kwargs)
caller.setUp = setUp
return caller
# We're simply decorating functions
@functools.wraps(caller)
def wrap(cls):
if os.environ.get('DESTRUCTIVE_TESTS', 'False').lower() == 'false':
cls.skipTest('Destructive tests are disabled')
return caller(cls)
return wrap
def expensiveTest(caller):
'''
Mark a test case as an expensive test, for example, a test which can cost
money(Salt's cloud provider tests).
.. code-block:: python
class MyTestCase(TestCase):
@expensiveTest
def test_create_user(self):
pass
'''
if inspect.isclass(caller):
# We're decorating a class
old_setup = getattr(caller, 'setUp', None)
def setUp(self, *args, **kwargs):
if os.environ.get('EXPENSIVE_TESTS', 'False').lower() == 'false':
self.skipTest('Expensive tests are disabled')
if old_setup is not None:
old_setup(self, *args, **kwargs)
caller.setUp = setUp
return caller
# We're simply decorating functions
@functools.wraps(caller)
def wrap(cls):
if os.environ.get('EXPENSIVE_TESTS', 'False').lower() == 'false':
cls.skipTest('Expensive tests are disabled')
return caller(cls)
return wrap
def flaky(caller=None, condition=True):
'''
Mark a test as flaky. The test will attempt to run five times,
looking for a successful run. After an immediate second try,
it will use an exponential backoff starting with one second.
.. code-block:: python
class MyTestCase(TestCase):
@flaky
def test_sometimes_works(self):
pass
'''
if caller is None:
return functools.partial(flaky, condition=condition)
if isinstance(condition, bool) and condition is False:
# Don't even decorate
return caller
elif callable(condition):
if condition() is False:
# Don't even decorate
return caller
if inspect.isclass(caller):
attrs = [n for n in dir(caller) if n.startswith('test_')]
for attrname in attrs:
try:
function = getattr(caller, attrname)
if not inspect.isfunction(function) and not inspect.ismethod(function):
continue
setattr(caller, attrname, flaky(caller=function, condition=condition))
except Exception as exc:
log.exception(exc)
continue
return caller
@functools.wraps(caller)
def wrap(cls):
for attempt in range(0, 4):
try:
return caller(cls)
except Exception as exc:
if attempt == 4:
raise exc
backoff_time = attempt ** 2
log.info('Found Exception. Waiting %s seconds to retry.', backoff_time)
time.sleep(backoff_time)
return cls
return wrap
def requires_sshd_server(caller):
'''
Mark a test as requiring the tests SSH daemon running.
.. code-block:: python
class MyTestCase(TestCase):
@requiresSshdServer
def test_create_user(self):
pass
'''
if inspect.isclass(caller):
# We're decorating a class
old_setup = getattr(caller, 'setUp', None)
def setUp(self, *args, **kwargs):
if os.environ.get('SSH_DAEMON_RUNNING', 'False').lower() == 'false':
self.skipTest('SSH tests are disabled')
if old_setup is not None:
old_setup(self, *args, **kwargs)
caller.setUp = setUp
return caller
# We're simply decorating functions
@functools.wraps(caller)
def wrap(cls):
if os.environ.get('SSH_DAEMON_RUNNING', 'False').lower() == 'false':
cls.skipTest('SSH tests are disabled')
return caller(cls)
return wrap
class RedirectStdStreams(object):
'''
Temporarily redirect system output to file like objects.
Default is to redirect to `os.devnull`, which just mutes output, `stdout`
and `stderr`.
'''
def __init__(self, stdout=None, stderr=None):
if stdout is None:
stdout = open(os.devnull, 'w')
if stderr is None:
stderr = open(os.devnull, 'w')
self.__stdout = stdout
self.__stderr = stderr
self.__redirected = False
def __enter__(self):
self.redirect()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.unredirect()
def redirect(self):
self.old_stdout = sys.stdout
self.old_stdout.flush()
self.old_stderr = sys.stderr
self.old_stderr.flush()
sys.stdout = self.__stdout
sys.stderr = self.__stderr
self.__redirected = True
def unredirect(self):
if not self.__redirected:
return
try:
self.__stdout.flush()
self.__stdout.close()
except ValueError:
# already closed?
pass
try:
self.__stderr.flush()
self.__stderr.close()
except ValueError:
# already closed?
pass
sys.stdout = self.old_stdout
sys.stderr = self.old_stderr
def flush(self):
if self.__redirected:
try:
self.__stdout.flush()
except Exception:
pass
try:
self.__stderr.flush()
except Exception:
pass
class TestsLoggingHandler(object):
'''
Simple logging handler which can be used to test if certain logging
messages get emitted or not:
.. code-block:: python
with TestsLoggingHandler() as handler:
# (...) Do what ever you wish here
handler.messages # here are the emitted log messages
'''
def __init__(self, level=0, format='%(levelname)s:%(message)s'):
self.level = level
self.format = format
self.activated = False
self.prev_logging_level = None
def activate(self):
class Handler(logging.Handler):
def __init__(self, level):
logging.Handler.__init__(self, level)
self.messages = []
def emit(self, record):
self.messages.append(self.format(record))
self.handler = Handler(self.level)
formatter = logging.Formatter(self.format)
self.handler.setFormatter(formatter)
logging.root.addHandler(self.handler)
self.activated = True
# Make sure we're running with the lowest logging level with our
# tests logging handler
current_logging_level = logging.root.getEffectiveLevel()
if current_logging_level > logging.DEBUG:
self.prev_logging_level = current_logging_level
logging.root.setLevel(0)
def deactivate(self):
if not self.activated:
return
logging.root.removeHandler(self.handler)
# Restore previous logging level if changed
if self.prev_logging_level is not None:
logging.root.setLevel(self.prev_logging_level)
@property
def messages(self):
if not self.activated:
return []
return self.handler.messages
def clear(self):
self.handler.messages = []
def __enter__(self):
self.activate()
return self
def __exit__(self, type, value, traceback):
self.deactivate()
self.activated = False
# Mimic some handler attributes and methods
@property
def lock(self):
if self.activated:
return self.handler.lock
def createLock(self):
if self.activated:
return self.handler.createLock()
def acquire(self):
if self.activated:
return self.handler.acquire()
def release(self):
if self.activated:
return self.handler.release()
def relative_import(import_name, relative_from='../'):
'''
Update sys.path to include `relative_from` before importing `import_name`
'''
try:
return __import__(import_name)
except ImportError:
previous_frame = inspect.getframeinfo(inspect.currentframe().f_back)
sys.path.insert(
0, os.path.realpath(
os.path.join(
os.path.abspath(
os.path.dirname(previous_frame.filename)
),
relative_from
)
)
)
return __import__(import_name)
class ForceImportErrorOn(object):
'''
This class is meant to be used in mock'ed test cases which require an
``ImportError`` to be raised.
>>> import os.path
>>> with ForceImportErrorOn('os.path'):
... import os.path
...
Traceback (most recent call last):
File "<stdin>", line 2, in <module>
File "salttesting/helpers.py", line 263, in __import__
'Forced ImportError raised for {0!r}'.format(name)
ImportError: Forced ImportError raised for 'os.path'
>>>
>>> with ForceImportErrorOn(('os', 'path')):
... import os.path
... sys.modules.pop('os', None)
... from os import path
...
<module 'os' from '/usr/lib/python2.7/os.pyc'>
Traceback (most recent call last):
File "<stdin>", line 4, in <module>
File "salttesting/helpers.py", line 288, in __fake_import__
name, ', '.join(fromlist)
ImportError: Forced ImportError raised for 'from os import path'
>>>
>>> with ForceImportErrorOn(('os', 'path'), 'os.path'):
... import os.path
... sys.modules.pop('os', None)
... from os import path
...
Traceback (most recent call last):
File "<stdin>", line 2, in <module>
File "salttesting/helpers.py", line 281, in __fake_import__
'Forced ImportError raised for {0!r}'.format(name)
ImportError: Forced ImportError raised for 'os.path'
>>>
'''
def __init__(self, *module_names):
self.__module_names = {}
for entry in module_names:
if isinstance(entry, (list, tuple)):
modname = entry[0]
self.__module_names[modname] = set(entry[1:])
else:
self.__module_names[entry] = None
def patch_import_function(self):
self.__original_import = pybuiltins.__import__
pybuiltins.__import__ = self.__fake_import__
def restore_import_funtion(self):
pybuiltins.__import__ = self.__original_import
def __fake_import__(self, name, globals_, locals_, fromlist, level=-1):
if name in self.__module_names:
importerror_fromlist = self.__module_names.get(name)
if importerror_fromlist is None:
raise ImportError(
'Forced ImportError raised for {0!r}'.format(name)
)
if importerror_fromlist.intersection(set(fromlist)):
raise ImportError(
'Forced ImportError raised for {0!r}'.format(
'from {0} import {1}'.format(
name, ', '.join(fromlist)
)
)
)
return self.__original_import(name, globals_, locals_, fromlist, level)
def __enter__(self):
self.patch_import_function()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.restore_import_funtion()
class MockWraps(object):
'''
Helper class to be used with the mock library.
To be used in the ``wraps`` keyword of ``Mock`` or ``MagicMock`` where you
want to trigger a side effect for X times, and afterwards, call the
original and un-mocked method.
As an example:
>>> def original():
... print 'original'
...
>>> def side_effect():
... print 'side effect'
...
>>> mw = MockWraps(original, 2, side_effect)
>>> mw()
side effect
>>> mw()
side effect
>>> mw()
original
>>>
'''
def __init__(self, original, expected_failures, side_effect):
self.__original = original
self.__expected_failures = expected_failures
self.__side_effect = side_effect
self.__call_counter = 0
def __call__(self, *args, **kwargs):
try:
if self.__call_counter < self.__expected_failures:
if isinstance(self.__side_effect, types.FunctionType):
return self.__side_effect()
raise self.__side_effect
return self.__original(*args, **kwargs)
finally:
self.__call_counter += 1
def requires_network(only_local_network=False):
'''
Simple decorator which is supposed to skip a test case in case there's no
network connection to the internet.
'''
def decorator(func):
@functools.wraps(func)
def wrapper(cls):
has_local_network = False
# First lets try if we have a local network. Inspired in
# verify_socket
try:
pubsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
retsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
pubsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
pubsock.bind(('', 18000))
pubsock.close()
retsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
retsock.bind(('', 18001))
retsock.close()
has_local_network = True
except socket.error:
# I wonder if we just have IPV6 support?
try:
pubsock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
retsock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
pubsock.setsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR, 1
)
pubsock.bind(('', 18000))
pubsock.close()
retsock.setsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR, 1
)
retsock.bind(('', 18001))
retsock.close()
has_local_network = True
except socket.error:
# Let's continue
pass
if only_local_network is True:
if has_local_network is False:
# Since we're only supposed to check local network, and no
# local network was detected, skip the test
cls.skipTest('No local network was detected')
return func(cls)
# We are using the google.com DNS records as numerical IPs to avoid
# DNS lookups which could greatly slow down this check
for addr in ('173.194.41.198', '173.194.41.199', '173.194.41.200',
'173.194.41.201', '173.194.41.206', '173.194.41.192',
'173.194.41.193', '173.194.41.194', '173.194.41.195',
'173.194.41.196', '173.194.41.197'):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(0.25)
sock.connect((addr, 80))
sock.close()
# We connected? Stop the loop
break
except socket.error:
# Let's check the next IP
continue
else:
cls.skipTest('No internet network connection was detected')
return func(cls)
return wrapper
return decorator
def with_system_user(username, on_existing='delete', delete=True):
'''
Create and optionally destroy a system user to be used within a test
case. The system user is crated using the ``user`` salt module.
The decorated testcase function must accept 'username' as an argument.
:param username: The desired username for the system user.
:param on_existing: What to do when the desired username is taken. The
available options are:
* nothing: Do nothing, act as if the user was created.
* delete: delete and re-create the existing user
* skip: skip the test case
'''
if on_existing not in ('nothing', 'delete', 'skip'):
raise RuntimeError(
'The value of \'on_existing\' can only be one of, '
'\'nothing\', \'delete\' and \'skip\''
)
if not isinstance(delete, bool):
raise RuntimeError(
'The value of \'delete\' can only be \'True\' or \'False\''
)
def decorator(func):
@functools.wraps(func)
def wrap(cls):
# Let's add the user to the system.
log.debug('Creating system user {0!r}'.format(username))
create_user = cls.run_function('user.add', [username])
if not create_user:
log.debug('Failed to create system user')
# The user was not created
if on_existing == 'skip':
cls.skipTest(
'Failed to create system user {0!r}'.format(
username
)
)
if on_existing == 'delete':
log.debug(
'Deleting the system user {0!r}'.format(
username
)
)
delete_user = cls.run_function(
'user.delete', [username, True, True]
)
if not delete_user:
cls.skipTest(
'A user named {0!r} already existed on the '
'system and re-creating it was not possible'
.format(username)
)
log.debug(
'Second time creating system user {0!r}'.format(
username
)
)
create_user = cls.run_function('user.add', [username])
if not create_user:
cls.skipTest(
'A user named {0!r} already existed, was deleted '
'as requested, but re-creating it was not possible'
.format(username)
)
failure = None
try:
try:
return func(cls, username)
except Exception as exc: # pylint: disable=W0703
log.error(
'Running {0!r} raised an exception: {1}'.format(
func, exc
),
exc_info=True
)
# Store the original exception details which will be raised
# a little further down the code
failure = sys.exc_info()
finally:
if delete:
delete_user = cls.run_function(
'user.delete', [username, True, True]
)
if not delete_user:
if failure is None:
log.warning(
'Although the actual test-case did not fail, '
'deleting the created system user {0!r} '
'afterwards did.'.format(username)
)
else:
log.warning(
'The test-case failed and also did the removal'
' of the system user {0!r}'.format(username)
)
if failure is not None:
# If an exception was thrown, raise it
six.reraise(failure[0], failure[1], failure[2])
return wrap
return decorator
def with_system_group(group, on_existing='delete', delete=True):
'''
Create and optionally destroy a system group to be used within a test
case. The system user is crated using the ``group`` salt module.
The decorated testcase function must accept 'group' as an argument.
:param group: The desired group name for the system user.
:param on_existing: What to do when the desired username is taken. The
available options are:
* nothing: Do nothing, act as if the group was created
* delete: delete and re-create the existing user
* skip: skip the test case
'''
if on_existing not in ('nothing', 'delete', 'skip'):
raise RuntimeError(
'The value of \'on_existing\' can only be one of, '
'\'nothing\', \'delete\' and \'skip\''
)
if not isinstance(delete, bool):
raise RuntimeError(
'The value of \'delete\' can only be \'True\' or \'False\''
)
def decorator(func):
@functools.wraps(func)
def wrap(cls):
# Let's add the user to the system.
log.debug('Creating system group {0!r}'.format(group))
create_group = cls.run_function('group.add', [group])
if not create_group:
log.debug('Failed to create system group')
# The group was not created
if on_existing == 'skip':
cls.skipTest(
'Failed to create system group {0!r}'.format(group)
)
if on_existing == 'delete':
log.debug(
'Deleting the system group {0!r}'.format(group)
)
delete_group = cls.run_function('group.delete', [group])
if not delete_group:
cls.skipTest(
'A group named {0!r} already existed on the '
'system and re-creating it was not possible'
.format(group)
)
log.debug(
'Second time creating system group {0!r}'.format(
group
)
)
create_group = cls.run_function('group.add', [group])
if not create_group:
cls.skipTest(
'A group named {0!r} already existed, was deleted '
'as requested, but re-creating it was not possible'
.format(group)
)
failure = None
try:
try:
return func(cls, group)
except Exception as exc: # pylint: disable=W0703
log.error(
'Running {0!r} raised an exception: {1}'.format(
func, exc
),
exc_info=True
)
# Store the original exception details which will be raised
# a little further down the code
failure = sys.exc_info()
finally:
if delete:
delete_group = cls.run_function('group.delete', [group])
if not delete_group:
if failure is None:
log.warning(
'Although the actual test-case did not fail, '
'deleting the created system group {0!r} '
'afterwards did.'.format(group)
)
else:
log.warning(
'The test-case failed and also did the removal'
' of the system group {0!r}'.format(group)
)
if failure is not None:
# If an exception was thrown, raise it
six.reraise(failure[0], failure[1], failure[2])
return wrap
return decorator
def with_system_user_and_group(username, group,
on_existing='delete', delete=True):
'''
Create and optionally destroy a system user and group to be used within a
test case. The system user is crated using the ``user`` salt module, and
the system group is created with the ``group`` salt module.
The decorated testcase function must accept both the 'username' and 'group'
arguments.
:param username: The desired username for the system user.
:param group: The desired name for the system group.
:param on_existing: What to do when the desired username is taken. The
available options are:
* nothing: Do nothing, act as if the user was created.
* delete: delete and re-create the existing user
* skip: skip the test case
'''
if on_existing not in ('nothing', 'delete', 'skip'):
raise RuntimeError(
'The value of \'on_existing\' can only be one of, '
'\'nothing\', \'delete\' and \'skip\''
)
if not isinstance(delete, bool):
raise RuntimeError(
'The value of \'delete\' can only be \'True\' or \'False\''
)
def decorator(func):
@functools.wraps(func)
def wrap(cls):
# Let's add the user to the system.
log.debug('Creating system user {0!r}'.format(username))
create_user = cls.run_function('user.add', [username])
log.debug('Creating system group {0!r}'.format(group))
create_group = cls.run_function('group.add', [group])
if not create_user:
log.debug('Failed to create system user')
# The user was not created
if on_existing == 'skip':
cls.skipTest(
'Failed to create system user {0!r}'.format(
username
)
)
if on_existing == 'delete':
log.debug(
'Deleting the system user {0!r}'.format(
username
)
)
delete_user = cls.run_function(
'user.delete', [username, True, True]
)
if not delete_user:
cls.skipTest(
'A user named {0!r} already existed on the '
'system and re-creating it was not possible'
.format(username)
)
log.debug(
'Second time creating system user {0!r}'.format(
username
)
)
create_user = cls.run_function('user.add', [username])
if not create_user:
cls.skipTest(
'A user named {0!r} already existed, was deleted '
'as requested, but re-creating it was not possible'
.format(username)
)
if not create_group:
log.debug('Failed to create system group')
# The group was not created
if on_existing == 'skip':
cls.skipTest(
'Failed to create system group {0!r}'.format(group)
)
if on_existing == 'delete':
log.debug(
'Deleting the system group {0!r}'.format(group)
)
delete_group = cls.run_function('group.delete', [group])
if not delete_group:
cls.skipTest(
'A group named {0!r} already existed on the '
'system and re-creating it was not possible'
.format(group)
)
log.debug(
'Second time creating system group {0!r}'.format(
group
)
)
create_group = cls.run_function('group.add', [group])
if not create_group:
cls.skipTest(
'A group named {0!r} already existed, was deleted '
'as requested, but re-creating it was not possible'
.format(group)
)
failure = None
try:
try:
return func(cls, username, group)
except Exception as exc: # pylint: disable=W0703
log.error(
'Running {0!r} raised an exception: {1}'.format(
func, exc
),
exc_info=True
)
# Store the original exception details which will be raised
# a little further down the code
failure = sys.exc_info()
finally:
if delete:
delete_user = cls.run_function(
'user.delete', [username, True, True]
)
delete_group = cls.run_function('group.delete', [group])
if not delete_user:
if failure is None:
log.warning(
'Although the actual test-case did not fail, '
'deleting the created system user {0!r} '
'afterwards did.'.format(username)
)
else:
log.warning(
'The test-case failed and also did the removal'
' of the system user {0!r}'.format(username)
)
if not delete_group:
if failure is None:
log.warning(
'Although the actual test-case did not fail, '
'deleting the created system group {0!r} '
'afterwards did.'.format(group)
)
else:
log.warning(
'The test-case failed and also did the removal'
' of the system group {0!r}'.format(group)
)
if failure is not None:
# If an exception was thrown, raise it
six.reraise(failure[0], failure[1], failure[2])
return wrap
return decorator
def requires_system_grains(func):
'''
Function decorator which loads and passes the system's grains to the test
case.
'''
@functools.wraps(func)
def decorator(cls):
if not hasattr(cls, 'run_function'):
raise RuntimeError(
'{0} does not have the \'run_function\' method which is '
'necessary to collect the system grains'.format(
cls.__class__.__name__
)
)
return func(cls, grains=cls.run_function('grains.items'))
return decorator
def requires_salt_modules(*names):
'''
Makes sure the passed salt module is available. Skips the test if not
.. versionadded:: 0.5.2
'''
def decorator(caller):
if inspect.isclass(caller):
# We're decorating a class
old_setup = getattr(caller, 'setUp', None)
def setUp(self, *args, **kwargs):
if old_setup is not None:
old_setup(self, *args, **kwargs)
if not hasattr(self, 'run_function'):
raise RuntimeError(
'{0} does not have the \'run_function\' method which '
'is necessary to collect the loaded modules'.format(
self.__class__.__name__
)
)
for name in names:
if not hasattr(self, '__salt_sys_docs__'):
# cache salts documentation
self.__salt_sys_docs__ = self.run_function('sys.doc')
if name not in self.__salt_sys_docs__:
self.skipTest('Salt module {0!r} is not available'.format(name))
caller.setUp = setUp
2017-03-06 16:45:59 +00:00
old_teardown = getattr(caller, 'tearDown', None)
def teardown(self, *args, **kwargs):
if hasattr(self, '__salt_sys_docs__'):
del self.__salt_sys_docs__
if old_teardown is not None:
old_teardown(self, *args, **kwargs)
caller.tearDown = teardown
return caller
# We're simply decorating functions
@functools.wraps(caller)
def wrapper(cls):
if not hasattr(cls, 'run_function'):
raise RuntimeError(
'{0} does not have the \'run_function\' method which is '
'necessary to collect the loaded modules'.format(
cls.__class__.__name__
)
)
for name in names:
if name not in cls.run_function('sys.doc'):
cls.skipTest(
'Salt module {0!r} is not available'.format(name)
)
break
return caller(cls)
return wrapper
return decorator
def skip_if_binaries_missing(*binaries, **kwargs):
import salt.utils
if len(binaries) == 1:
if isinstance(binaries[0], (list, tuple, set, frozenset)):
binaries = binaries[0]
check_all = kwargs.pop('check_all', False)
message = kwargs.pop('message', None)
if kwargs:
raise RuntimeError(
'The only supported keyword argument is \'check_all\' and '
'\'message\'. Invalid keyword arguments: {0}'.format(
', '.join(kwargs.keys())
)
)
if check_all:
for binary in binaries:
if salt.utils.which(binary) is None:
return skip(
'{0}The {1!r} binary was not found'.format(
message and '{0}. '.format(message) or '',
binary
)
)
elif salt.utils.which_bin(binaries) is None:
return skip(
'{0}None of the following binaries was found: {1}'.format(
message and '{0}. '.format(message) or '',
', '.join(binaries)
)
)
return _id
def skip_if_not_root(func):
if not sys.platform.startswith('win'):
if os.getuid() != 0:
func.__unittest_skip__ = True
func.__unittest_skip_why__ = 'You must be logged in as root to run this test'
else:
import salt.utils.win_functions
current_user = salt.utils.win_functions.get_current_user()
if current_user != 'SYSTEM':
if not salt.utils.win_functions.is_admin(current_user):
func.__unittest_skip__ = True
func.__unittest_skip_why__ = 'You must be logged in as an Administrator to run this test'
return func
if sys.platform.startswith('win'):
SIGTERM = signal.CTRL_BREAK_EVENT # pylint: disable=no-member
else:
SIGTERM = signal.SIGTERM
def collect_child_processes(pid):
'''
Try to collect any started child processes of the provided pid
'''
# Let's get the child processes of the started subprocess
try:
parent = psutil.Process(pid)
if hasattr(parent, 'children'):
children = parent.children(recursive=True)
else:
children = []
except psutil.NoSuchProcess:
children = []
return children[::-1] # return a reversed list of the children
def _terminate_process_list(process_list, kill=False, slow_stop=False):
for process in process_list[:][::-1]: # Iterate over a reversed copy of the list
if not psutil.pid_exists(process.pid):
process_list.remove(process)
continue
try:
if not kill and process.status() == psutil.STATUS_ZOMBIE:
# Zombie processes will exit once child processes also exit
continue
try:
cmdline = process.cmdline()
except psutil.AccessDenied:
# OSX is more restrictive about the above information
cmdline = None
if not cmdline:
try:
cmdline = process.as_dict()
except Exception:
cmdline = 'UNKNOWN PROCESS'
if kill:
log.info('Killing process(%s): %s', process.pid, cmdline)
process.kill()
else:
log.info('Terminating process(%s): %s', process.pid, cmdline)
try:
if slow_stop:
# Allow coverage data to be written down to disk
process.send_signal(SIGTERM)
try:
process.wait(2)
except psutil.TimeoutExpired:
if psutil.pid_exists(process.pid):
continue
else:
process.terminate()
except OSError as exc:
if exc.errno not in (errno.ESRCH, errno.EACCES):
raise
if not psutil.pid_exists(process.pid):
process_list.remove(process)
except psutil.NoSuchProcess:
process_list.remove(process)
def terminate_process_list(process_list, kill=False, slow_stop=False):
def on_process_terminated(proc):
log.info('Process %s terminated with exit code: %s', getattr(proc, '_cmdline', proc), proc.returncode)
# Try to terminate processes with the provided kill and slow_stop parameters
log.info('Terminating process list. 1st step. kill: %s, slow stop: %s', kill, slow_stop)
# Cache the cmdline since that will be inaccessible once the process is terminated
for proc in process_list:
try:
cmdline = proc.cmdline()
except (psutil.NoSuchProcess, psutil.AccessDenied):
# OSX is more restrictive about the above information
cmdline = None
if not cmdline:
try:
cmdline = proc
except (psutil.NoSuchProcess, psutil.AccessDenied):
cmdline = '<could not be retrived; dead process: {0}>'.format(proc)
proc._cmdline = cmdline
_terminate_process_list(process_list, kill=kill, slow_stop=slow_stop)
psutil.wait_procs(process_list, timeout=15, callback=on_process_terminated)
if process_list:
# If there's still processes to be terminated, retry and kill them if slow_stop is False
log.info('Terminating process list. 2nd step. kill: %s, slow stop: %s', slow_stop is False, slow_stop)
_terminate_process_list(process_list, kill=slow_stop is False, slow_stop=slow_stop)
psutil.wait_procs(process_list, timeout=10, callback=on_process_terminated)
if process_list:
# If there's still processes to be terminated, just kill them, no slow stopping now
log.info('Terminating process list. 3rd step. kill: True, slow stop: False')
_terminate_process_list(process_list, kill=True, slow_stop=False)
psutil.wait_procs(process_list, timeout=5, callback=on_process_terminated)
if process_list:
# In there's still processes to be terminated, log a warning about it
log.warning('Some processes failed to properly terminate: %s', process_list)
def terminate_process(pid=None, process=None, children=None, kill_children=False, slow_stop=False):
'''
Try to terminate/kill the started processe
'''
children = children or []
process_list = []
def on_process_terminated(proc):
2017-02-27 19:42:43 +00:00
if proc.returncode:
log.info('Process %s terminated with exit code: %s', getattr(proc, '_cmdline', proc), proc.returncode)
else:
log.info('Process %s terminated', getattr(proc, '_cmdline', proc))
if pid and not process:
try:
process = psutil.Process(pid)
process_list.append(process)
except psutil.NoSuchProcess:
# Process is already gone
process = None
if kill_children:
if process:
if not children:
children = collect_child_processes(process.pid)
else:
# Let's collect children again since there might be new ones
children.extend(collect_child_processes(pid))
if children:
process_list.extend(children)
if process_list:
if process:
log.info('Stopping process %s and respective children: %s', process, children)
else:
log.info('Terminating process list: %s', process_list)
terminate_process_list(process_list, kill=slow_stop is False, slow_stop=slow_stop)
if process and psutil.pid_exists(process.pid):
log.warning('Process left behind which we were unable to kill: %s', process)
def terminate_process_pid(pid, only_children=False):
children = []
process = None
# Let's begin the shutdown routines
try:
process = psutil.Process(pid)
children = collect_child_processes(pid)
except psutil.NoSuchProcess:
log.info('No process with the PID %s was found running', pid)
if only_children:
return terminate_process(children=children, kill_children=True, slow_stop=True)
return terminate_process(pid=pid, process=process, children=children, kill_children=True, slow_stop=True)
def repeat(caller=None, condition=True, times=5):
'''
Repeat a test X amount of times until the first failure.
.. code-block:: python
class MyTestCase(TestCase):
@repeat
def test_sometimes_works(self):
pass
'''
if caller is None:
return functools.partial(repeat, condition=condition, times=times)
if isinstance(condition, bool) and condition is False:
# Don't even decorate
return caller
elif callable(condition):
if condition() is False:
# Don't even decorate
return caller
if inspect.isclass(caller):
attrs = [n for n in dir(caller) if n.startswith('test_')]
for attrname in attrs:
try:
function = getattr(caller, attrname)
if not inspect.isfunction(function) and not inspect.ismethod(function):
continue
setattr(caller, attrname, repeat(caller=function, condition=condition, times=times))
except Exception as exc:
log.exception(exc)
continue
return caller
@functools.wraps(caller)
def wrap(cls):
result = None
for attempt in range(1, times+1):
log.info('%s test run %d of %s times', cls, attempt, times)
caller(cls)
return cls
return wrap