salt/tests/support/mock.py

516 lines
17 KiB
Python

# -*- coding: utf-8 -*-
'''
:codeauthor: Pedro Algarvio (pedro@algarvio.me)
tests.support.mock
~~~~~~~~~~~~~~~~~~
Helper module that wraps `mock` and provides some fake objects in order to
properly set the function/class decorators and yet skip the test case's
execution.
Note: mock >= 2.0.0 required since unittest.mock does not have
MagicMock.assert_called in Python < 3.6.
'''
# pylint: disable=unused-import,function-redefined,blacklisted-module,blacklisted-external-module
from __future__ import absolute_import
import collections
import copy
import errno
import fnmatch
import sys
# Import salt libs
from salt.ext import six
import salt.utils.stringutils
try:
from mock import (
Mock,
MagicMock,
patch,
sentinel,
DEFAULT,
# ANY and call will be imported further down
create_autospec,
FILTER_DIR,
NonCallableMock,
NonCallableMagicMock,
PropertyMock,
__version__
)
NO_MOCK = False
NO_MOCK_REASON = ''
mock_version = []
for __part in __version__.split('.'):
try:
mock_version.append(int(__part))
except ValueError:
# Non-integer value (ex. '1a')
mock_version.append(__part)
mock_version = tuple(mock_version)
except ImportError as exc:
NO_MOCK = True
NO_MOCK_REASON = 'mock python module is unavailable'
mock_version = (0, 0, 0)
# Let's not fail on imports by providing fake objects and classes
class MagicMock(object):
# __name__ can't be assigned a unicode
__name__ = str('{0}.fakemock').format(__name__) # future lint: disable=blacklisted-function
def __init__(self, *args, **kwargs):
pass
def dict(self, *args, **kwargs):
return self
def multiple(self, *args, **kwargs):
return self
def __call__(self, *args, **kwargs):
return self
Mock = MagicMock
patch = MagicMock()
sentinel = object()
DEFAULT = object()
create_autospec = MagicMock()
FILTER_DIR = True
NonCallableMock = MagicMock()
NonCallableMagicMock = MagicMock()
mock_open = object()
PropertyMock = object()
call = tuple
ANY = object()
if NO_MOCK is False:
try:
from mock import call, ANY
except ImportError:
NO_MOCK = True
NO_MOCK_REASON = 'you need to upgrade your mock version to >= 0.8.0'
class MockFH(object):
def __init__(self, filename, read_data, *args, **kwargs):
self.filename = filename
self.read_data = read_data
try:
self.mode = args[0]
except IndexError:
self.mode = kwargs.get('mode', 'r')
self.binary_mode = 'b' in self.mode
self.read_mode = any(x in self.mode for x in ('r', '+'))
self.write_mode = any(x in self.mode for x in ('w', 'a', '+'))
self.empty_string = b'' if self.binary_mode else ''
self.call = MockCall(filename, *args, **kwargs)
self.read_data_iter = self._iterate_read_data(read_data)
self.read = Mock(side_effect=self._read)
self.readlines = Mock(side_effect=self._readlines)
self.readline = Mock(side_effect=self._readline)
self.write = Mock(side_effect=self._write)
self.writelines = Mock(side_effect=self._writelines)
self.close = Mock()
self.seek = Mock()
self.__loc = 0
self.__read_data_ok = False
def _iterate_read_data(self, read_data):
'''
Helper for mock_open:
Retrieve lines from read_data via a generator so that separate calls to
readline, read, and readlines are properly interleaved
'''
# Newline will always be a bytestring on PY2 because mock_open will have
# normalized it to one.
newline = b'\n' if isinstance(read_data, six.binary_type) else '\n'
read_data = [line + newline for line in read_data.split(newline)]
if read_data[-1] == newline:
# If the last line ended in a newline, the list comprehension will have an
# extra entry that's just a newline. Remove this.
read_data = read_data[:-1]
else:
# If there wasn't an extra newline by itself, then the file being
# emulated doesn't have a newline to end the last line, so remove the
# newline that we added in the list comprehension.
read_data[-1] = read_data[-1][:-1]
for line in read_data:
yield line
@property
def write_calls(self):
'''
Return a list of all calls to the .write() mock
'''
return [x[1][0] for x in self.write.mock_calls]
@property
def writelines_calls(self):
'''
Return a list of all calls to the .writelines() mock
'''
return [x[1][0] for x in self.writelines.mock_calls]
def tell(self):
return self.__loc
def __check_read_data(self):
if not self.__read_data_ok:
if self.binary_mode:
if not isinstance(self.read_data, six.binary_type):
raise TypeError(
'{0} opened in binary mode, expected read_data to be '
'bytes, not {1}'.format(
self.filename,
type(self.read_data).__name__
)
)
else:
if not isinstance(self.read_data, str):
raise TypeError(
'{0} opened in non-binary mode, expected read_data to '
'be str, not {1}'.format(
self.filename,
type(self.read_data).__name__
)
)
# No need to repeat this the next time we check
self.__read_data_ok = True
def _read(self, size=0):
self.__check_read_data()
if not self.read_mode:
raise IOError('File not open for reading')
if not isinstance(size, six.integer_types) or size < 0:
raise TypeError('a positive integer is required')
joined = self.empty_string.join(self.read_data_iter)
if not size:
# read() called with no args, return everything
self.__loc += len(joined)
return joined
else:
# read() called with an explicit size. Return a slice matching the
# requested size, but before doing so, reset read_data to reflect
# what we read.
self.read_data_iter = self._iterate_read_data(joined[size:])
ret = joined[:size]
self.__loc += len(ret)
return ret
def _readlines(self, size=None): # pylint: disable=unused-argument
# TODO: Implement "size" argument
self.__check_read_data()
if not self.read_mode:
raise IOError('File not open for reading')
ret = list(self.read_data_iter)
self.__loc += sum(len(x) for x in ret)
return ret
def _readline(self, size=None): # pylint: disable=unused-argument
# TODO: Implement "size" argument
self.__check_read_data()
if not self.read_mode:
raise IOError('File not open for reading')
try:
ret = next(self.read_data_iter)
self.__loc += len(ret)
return ret
except StopIteration:
return self.empty_string
def __iter__(self):
self.__check_read_data()
if not self.read_mode:
raise IOError('File not open for reading')
while True:
try:
ret = next(self.read_data_iter)
self.__loc += len(ret)
yield ret
except StopIteration:
break
def _write(self, content):
if not self.write_mode:
raise IOError('File not open for writing')
if six.PY2:
if isinstance(content, six.text_type):
# encoding intentionally not specified to force a
# UnicodeEncodeError when non-ascii unicode type is passed
content.encode()
else:
content_type = type(content)
if self.binary_mode and content_type is not bytes:
raise TypeError(
'a bytes-like object is required, not \'{0}\''.format(
content_type.__name__
)
)
elif not self.binary_mode and content_type is not str:
raise TypeError(
'write() argument must be str, not {0}'.format(
content_type.__name__
)
)
def _writelines(self, lines):
if not self.write_mode:
raise IOError('File not open for writing')
for line in lines:
self._write(line)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb): # pylint: disable=unused-argument
pass
class MockCall(object):
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def __repr__(self):
# future lint: disable=blacklisted-function
ret = str('MockCall(')
for arg in self.args:
ret += repr(arg) + str(', ')
if not self.kwargs:
if self.args:
# Remove trailing ', '
ret = ret[:-2]
else:
for key, val in six.iteritems(self.kwargs):
ret += str('{0}={1}').format(
salt.utils.stringutils.to_str(key),
repr(val)
)
ret += str(')')
return ret
# future lint: enable=blacklisted-function
def __str__(self):
return self.__repr__()
def __eq__(self, other):
return self.args == other.args and self.kwargs == other.kwargs
class MockOpen(object):
r'''
This class can be used to mock the use of ``open()``.
``read_data`` is a string representing the contents of the file to be read.
By default, this is an empty string.
Optionally, ``read_data`` can be a dictionary mapping ``fnmatch.fnmatch()``
patterns to strings (or optionally, exceptions). This allows the mocked
filehandle to serve content for more than one file path.
.. code-block:: python
data = {
'/etc/foo.conf': textwrap.dedent("""\
Foo
Bar
Baz
"""),
'/etc/bar.conf': textwrap.dedent("""\
A
B
C
"""),
}
with patch('salt.utils.files.fopen', mock_open(read_data=data):
do stuff
If the file path being opened does not match any of the glob expressions,
an IOError will be raised to simulate the file not existing.
Passing ``read_data`` as a string is equivalent to passing it with a glob
expression of "*". That is to say, the below two invocations are
equivalent:
.. code-block:: python
mock_open(read_data='foo\n')
mock_open(read_data={'*': 'foo\n'})
Instead of a string representing file contents, ``read_data`` can map to an
exception, and that exception will be raised if a file matching that
pattern is opened:
.. code-block:: python
data = {
'/etc/*': IOError(errno.EACCES, 'Permission denied'),
'*': 'Hello world!\n',
}
with patch('salt.utils.files.fopen', mock_open(read_data=data)):
do stuff
The above would raise an exception if any files within /etc are opened, but
would produce a mocked filehandle if any other file is opened.
To simulate file contents changing upon subsequent opens, the file contents
can be a list of strings/exceptions. For example:
.. code-block:: python
data = {
'/etc/foo.conf': [
'before\n',
'after\n',
],
'/etc/bar.conf': [
IOError(errno.ENOENT, 'No such file or directory', '/etc/bar.conf'),
'Hey, the file exists now!',
],
}
with patch('salt.utils.files.fopen', mock_open(read_data=data):
do stuff
The first open of ``/etc/foo.conf`` would return "before\n" when read,
while the second would return "after\n" when read. For ``/etc/bar.conf``,
the first read would raise an exception, while the second would open
successfully and read the specified string.
Expressions will be attempted in dictionary iteration order (the exception
being ``*`` which is tried last), so if a file path matches more than one
fnmatch expression then the first match "wins". If your use case calls for
overlapping expressions, then an OrderedDict can be used to ensure that the
desired matching behavior occurs:
.. code-block:: python
data = OrderedDict()
data['/etc/foo.conf'] = 'Permission granted!'
data['/etc/*'] = IOError(errno.EACCES, 'Permission denied')
data['*'] = '*': 'Hello world!\n'
with patch('salt.utils.files.fopen', mock_open(read_data=data):
do stuff
The following attributes are tracked for the life of a mock object:
* call_count - Tracks how many fopen calls were attempted
* filehandles - This is a dictionary mapping filenames to lists of MockFH
objects, representing the individual times that a given file was opened.
'''
def __init__(self, read_data=''):
# If the read_data contains lists, we will be popping it. So, don't
# modify the original value passed.
read_data = copy.copy(read_data)
# Normalize read_data, Python 2 filehandles should never produce unicode
# types on read.
if not isinstance(read_data, dict):
read_data = {'*': read_data}
if six.PY2:
# .__class__() used here to preserve the dict class in the event that
# an OrderedDict was used.
new_read_data = read_data.__class__()
for key, val in six.iteritems(read_data):
try:
val = salt.utils.data.decode(val, to_str=True)
except TypeError:
if not isinstance(val, BaseException):
raise
new_read_data[key] = val
read_data = new_read_data
del new_read_data
self.read_data = read_data
self.filehandles = {}
self.calls = []
self.call_count = 0
def __call__(self, name, *args, **kwargs):
'''
Match the file being opened to the patterns in the read_data and spawn
a mocked filehandle with the corresponding file contents.
'''
call = MockCall(name, *args, **kwargs)
self.calls.append(call)
self.call_count += 1
for pat in self.read_data:
if pat == '*':
continue
if fnmatch.fnmatch(name, pat):
matched_pattern = pat
break
else:
# No non-glob match in read_data, fall back to '*'
matched_pattern = '*'
try:
matched_contents = self.read_data[matched_pattern]
try:
# Assuming that the value for the matching expression is a
# list, pop the first element off of it.
file_contents = matched_contents.pop(0)
except AttributeError:
# The value for the matching expression is a string (or exception)
file_contents = matched_contents
except IndexError:
# We've run out of file contents, abort!
raise RuntimeError(
'File matching expression \'{0}\' opened more times than '
'expected'.format(matched_pattern)
)
try:
# Raise the exception if the matched file contents are an
# instance of an exception class.
raise file_contents
except TypeError:
# Contents were not an exception, so proceed with creating the
# mocked filehandle.
pass
ret = MockFH(name, file_contents, *args, **kwargs)
self.filehandles.setdefault(name, []).append(ret)
return ret
except KeyError:
# No matching glob in read_data, treat this as a file that does
# not exist and raise the appropriate exception.
raise IOError(errno.ENOENT, 'No such file or directory', name)
def write_calls(self, path=None):
'''
Returns the contents passed to all .write() calls. Use `path` to narrow
the results to files matching a given pattern.
'''
ret = []
for filename, handles in six.iteritems(self.filehandles):
if path is None or fnmatch.fnmatch(filename, path):
for fh_ in handles:
ret.extend(fh_.write_calls)
return ret
def writelines_calls(self, path=None):
'''
Returns the contents passed to all .writelines() calls. Use `path` to
narrow the results to files matching a given pattern.
'''
ret = []
for filename, handles in six.iteritems(self.filehandles):
if path is None or fnmatch.fnmatch(filename, path):
for fh_ in handles:
ret.extend(fh_.writelines_calls)
return ret
# reimplement mock_open to support multiple filehandles
mock_open = MockOpen