mirror of
https://github.com/valitydev/salt.git
synced 2024-11-07 08:58:59 +00:00
2 MockFH enhancements:
1. Raise appropriate TypeErrors when wrong type is written 2. Don't raise exception about read_data unless we actually try to read from the filehandle. This makes it a bit more permissive when testing code where the file is only being written to.
This commit is contained in:
parent
ab2ba942ad
commit
0e8c83bac6
@ -99,17 +99,25 @@ if NO_MOCK is False:
|
||||
class MockFH(object):
|
||||
def __init__(self, filename, read_data, *args, **kwargs):
|
||||
self.filename = filename
|
||||
self.read_data = read_data
|
||||
try:
|
||||
self.mode = args[0]
|
||||
except IndexError:
|
||||
self.mode = kwargs.get('mode', 'r')
|
||||
self.binary_mode = 'b' in self.mode
|
||||
self.write_mode = any(x in self.mode for x in ('w', 'a', '+'))
|
||||
self.empty_string = b'' if self.binary_mode else ''
|
||||
self.call = MockCall(filename, *args, **kwargs)
|
||||
self.empty_string = b'' if isinstance(read_data, six.binary_type) else ''
|
||||
self.read_data = self._iterate_read_data(read_data)
|
||||
self.read_data_iter = self._iterate_read_data(read_data)
|
||||
self.read = Mock(side_effect=self._read)
|
||||
self.readlines = Mock(side_effect=self._readlines)
|
||||
self.readline = Mock(side_effect=self._readline)
|
||||
self.write = Mock(side_effect=self._write)
|
||||
self.writelines = Mock(side_effect=self._writelines)
|
||||
self.close = Mock()
|
||||
self.write = Mock()
|
||||
self.writelines = Mock()
|
||||
self.seek = Mock()
|
||||
self._loc = 0
|
||||
self.__loc = 0
|
||||
self.__read_data_ok = False
|
||||
|
||||
def _iterate_read_data(self, read_data):
|
||||
'''
|
||||
@ -151,50 +159,106 @@ class MockFH(object):
|
||||
return [x[1][0] for x in self.writelines.mock_calls]
|
||||
|
||||
def tell(self):
|
||||
return self._loc
|
||||
return self.__loc
|
||||
|
||||
def __check_read_data(self):
|
||||
if not self.__read_data_ok:
|
||||
if self.binary_mode:
|
||||
if not isinstance(self.read_data, six.binary_type):
|
||||
raise TypeError(
|
||||
'{0} opened in binary mode, expected read_data to be '
|
||||
'bytes, not {1}'.format(
|
||||
self.filename,
|
||||
type(self.read_data).__name__
|
||||
)
|
||||
)
|
||||
else:
|
||||
if not isinstance(self.read_data, str):
|
||||
raise TypeError(
|
||||
'{0} opened in non-binary mode, expected read_data to '
|
||||
'be str, not {1}'.format(
|
||||
self.filename,
|
||||
type(self.read_data).__name__
|
||||
)
|
||||
)
|
||||
# No need to repeat this the next time we check
|
||||
self.__read_data_ok = True
|
||||
|
||||
def _read(self, size=0):
|
||||
self.__check_read_data()
|
||||
if not isinstance(size, six.integer_types) or size < 0:
|
||||
raise TypeError('a positive integer is required')
|
||||
|
||||
joined = self.empty_string.join(self.read_data)
|
||||
joined = self.empty_string.join(self.read_data_iter)
|
||||
if not size:
|
||||
# read() called with no args, return everything
|
||||
self._loc += len(joined)
|
||||
self.__loc += len(joined)
|
||||
return joined
|
||||
else:
|
||||
# read() called with an explicit size. Return a slice matching the
|
||||
# requested size, but before doing so, reset read_data to reflect
|
||||
# what we read.
|
||||
self.read_data = self._iterate_read_data(joined[size:])
|
||||
self.read_data_iter = self._iterate_read_data(joined[size:])
|
||||
ret = joined[:size]
|
||||
self._loc += len(ret)
|
||||
self.__loc += len(ret)
|
||||
return ret
|
||||
|
||||
def _readlines(self, size=None): # pylint: disable=unused-argument
|
||||
# TODO: Implement "size" argument
|
||||
ret = list(self.read_data)
|
||||
self._loc += sum(len(x) for x in ret)
|
||||
self.__check_read_data()
|
||||
ret = list(self.read_data_iter)
|
||||
self.__loc += sum(len(x) for x in ret)
|
||||
return ret
|
||||
|
||||
def _readline(self, size=None): # pylint: disable=unused-argument
|
||||
# TODO: Implement "size" argument
|
||||
self.__check_read_data()
|
||||
try:
|
||||
ret = next(self.read_data)
|
||||
self._loc += len(ret)
|
||||
ret = next(self.read_data_iter)
|
||||
self.__loc += len(ret)
|
||||
return ret
|
||||
except StopIteration:
|
||||
return self.empty_string
|
||||
|
||||
def __iter__(self):
|
||||
self.__check_read_data()
|
||||
while True:
|
||||
try:
|
||||
ret = next(self.read_data)
|
||||
self._loc += len(ret)
|
||||
ret = next(self.read_data_iter)
|
||||
self.__loc += len(ret)
|
||||
yield ret
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
def _write(self, content):
|
||||
if not self.write_mode:
|
||||
raise IOError('File not open for writing')
|
||||
if six.PY2:
|
||||
if isinstance(content, six.text_type):
|
||||
# encoding intentionally not specified to force a
|
||||
# UnicodeEncodeError when non-ascii unicode type is passed
|
||||
content.encode()
|
||||
else:
|
||||
content_type = type(content)
|
||||
if self.binary_mode and content_type is not bytes:
|
||||
raise TypeError(
|
||||
'a bytes-like object is required, not \'{0}\''.format(
|
||||
content_type.__name__
|
||||
)
|
||||
)
|
||||
elif not self.binary_mode and content_type is not str:
|
||||
raise TypeError(
|
||||
'write() argument must be str, not {0}'.format(
|
||||
content_type.__name__
|
||||
)
|
||||
)
|
||||
|
||||
def _writelines(self, lines):
|
||||
if not self.write_mode:
|
||||
raise IOError('File not open for writing')
|
||||
for line in lines:
|
||||
self._write(line)
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user