DNS2: dig lookup testing

This commit is contained in:
Ronald van Zantvoort 2017-04-04 18:16:46 +02:00
parent a30dd2ed0c
commit e3239d90cf
2 changed files with 148 additions and 43 deletions

View File

@ -218,29 +218,6 @@ def _rec2data(*rdata):
return ' '.join(rdata)
def _lookup_gai(name, rdtype, timeout=None):
'''
Use Python's socket interface to lookup addresses
:param name: Name of record to search
:param rdtype: A or AAAA
:param timeout: ignored
:return: [] of addresses or False if error
'''
sock_t = {
'A': socket.AF_INET,
'AAAA': socket.AF_INET6
}[rdtype]
if timeout:
log.warn('Ignoring timeout on gai resolver; fix resolv.conf to do that')
try:
addresses = [sock[4][0] for sock in socket.getaddrinfo(name, None, sock_t, 0, socket.SOCK_RAW)]
return addresses
except socket.gaierror:
return False
def _lookup_dig(name, rdtype, timeout=None, servers=None, secure=None):
'''
Use dig to lookup addresses
@ -264,12 +241,13 @@ def _lookup_dig(name, rdtype, timeout=None, servers=None, secure=None):
cmd = __salt__['cmd.run_all'](cmd + str(name), python_shell=False, output_loglevel='quiet')
# In this case, 0 is not the same as False
if cmd['retcode'] != 0:
log.warning('dig returned ({0}): {1}'.format(
cmd['retcode'], cmd['stderr']
cmd['retcode'], cmd['stderr'].strip(string.whitespace + ';')
))
return False
elif not cmd['stdout']:
return []
validated = False
res = []
@ -344,6 +322,29 @@ def _lookup_drill(name, rdtype, timeout=None, servers=None, secure=None):
return res
def _lookup_gai(name, rdtype, timeout=None):
'''
Use Python's socket interface to lookup addresses
:param name: Name of record to search
:param rdtype: A or AAAA
:param timeout: ignored
:return: [] of addresses or False if error
'''
sock_t = {
'A': socket.AF_INET,
'AAAA': socket.AF_INET6
}[rdtype]
if timeout:
log.warn('Ignoring timeout on gai resolver; fix resolv.conf to do that')
try:
addresses = [sock[4][0] for sock in socket.getaddrinfo(name, None, sock_t, 0, socket.SOCK_RAW)]
return addresses
except socket.gaierror:
return False
def _lookup_host(name, rdtype, timeout=None, server=None):
'''
Use host to lookup addresses

View File

@ -8,7 +8,8 @@ import socket
# Salt
from salt._compat import ipaddress
from salt.utils.odict import OrderedDict
from salt.utils.dns import _to_port, _tree, _weighted_order, _data2rec, _data2rec_group, _lookup_gai
import salt.utils.dns
from salt.utils.dns import _to_port, _tree, _weighted_order, _data2rec, _data2rec_group, _lookup_gai, _lookup_dig
# Testing
from tests.support.unit import skipIf, TestCase
@ -148,9 +149,16 @@ class DNShelpersCase(TestCase):
class DNSlookupsCase(TestCase):
'''
Test the lookup functions
'''
CMD_RETURN = {
Note that by far and large the lookup functions actually
completely ignore the input name
a lookup function
- returns False upon error
- returns [*record-data] upon succes/no records
'''
CMD_RET = {
'pid': 12345,
'retcode': 0,
'stderr': '',
@ -158,18 +166,116 @@ class DNSlookupsCase(TestCase):
}
RESULTS = {
'A': [
['10.1.1.1'], # one-match
['10.1.1.1', '10.2.2.2', '10.3.3.3'], # multi-match
],
'AAAA': [
['2a00:a00:b01:c02:d03:e04:f05:111'], # one-match
['2a00:a00:b01:c02:d03:e04:f05:111',
'2a00:a00:b01:c02:d03:e04:f05:222',
'2a00:a00:b01:c02:d03:e04:f05:333'] # multi-match
],
'CNAME': [
['web.example.com.']
],
'MX': [
['10 mx1.example.com.'],
['10 mx1.example.com.', '20 mx2.example.eu.', '30 mx3.example.nl.']
],
'SPF': [
['v=spf1 a include:_spf4.example.com include:mail.example.eu ip4:10.0.0.0/8 ip6:2a00:a00:b01::/48 ~all']
]
}
def _mock_cmd_ret(self, delta_res):
if isinstance(delta_res, (list, tuple)):
test_res = []
for dres in delta_res:
tres = self.CMD_RET.copy()
tres.update(dres)
test_res.append(tres)
cmd_mock = MagicMock(
side_effect=test_res
)
else:
test_res = self.CMD_RET.copy()
test_res.update(delta_res)
cmd_mock = MagicMock(
return_value=test_res
)
return patch.dict(salt.utils.dns.__salt__, {'cmd.run_all': cmd_mock}, clear=True)
def test_dig(self):
# wrong
with self._mock_cmd_ret({
'retcode': 9,
'stderr': ';; connection timed out; no servers could be reached',
}):
self.assertEqual(_lookup_dig('mockq', 'A'), False)
# empty response
with self._mock_cmd_ret({}):
self.assertEqual(_lookup_dig('mockq', 'AAAA'), [])
# example returns for dig
right = {
'A': [
['10.1.1.1'],
['10.1.1.1', '10.2.2.2', '10.3.3.3'],
'mocksrvr.example.com.\tA\t10.1.1.1',
'web.example.com.\t\tA\t10.1.1.1\n'
'web.example.com.\t\tA\t10.2.2.2\n'
'web.example.com.\t\tA\t10.3.3.3'
],
'AAAA': [
['2a00:a00:b01:c02:d03:e04:f05:111'],
['2a00:a00:b01:c02:d03:e04:f05:111',
'2a00:a00:b01:c02:d03:e04:f05:222',
'2a00:a00:b01:c02:d03:e04:f05:333']
'mocksrvr.example.com.\tA\t2a00:a00:b01:c02:d03:e04:f05:111',
'mocksrvr.example.com.\tCNAME\tweb.example.com.\n'
'web.example.com.\t\tAAAA\t2a00:a00:b01:c02:d03:e04:f05:111\n'
'web.example.com.\t\tAAAA\t2a00:a00:b01:c02:d03:e04:f05:222\n'
'web.example.com.\t\tAAAA\t2a00:a00:b01:c02:d03:e04:f05:333'
],
'CNAME': [
'mocksrvr.example.com.\tCNAME\tweb.example.com.'
],
'MX': [
'example.com.\t\tMX\t10 mx1.example.com.',
'example.com.\t\tMX\t10 mx1.example.com.\nexample.com.\t\tMX\t20 mx2.example.eu.\nexample.com.\t\tMX\t30 mx3.example.nl.'
],
'SPF': [
'example.com.\tTXT\t"v=spf1 a include:_spf4.example.com include:mail.example.eu ip4:10.0.0.0/8 ip6:2a00:a00:b01::/48 ~all"'
]
}
# Regular outputs
for rec_t, tests in right.items():
with self._mock_cmd_ret([dict([('stdout', dres)]) for dres in tests]):
for test_res in self.RESULTS[rec_t]:
self.assertEqual(
_lookup_dig('mocksrvr.example.com', rec_t), test_res,
msg='Error parsing {0} returns'.format(rec_t)
)
# Regular outputs are insecure outputs (e.g. False)
for rec_t, tests in right.items():
with self._mock_cmd_ret([dict([('stdout', dres)]) for dres in tests]):
for _ in self.RESULTS[rec_t]:
self.assertEqual(
_lookup_dig('mocksrvr.example.com', rec_t, secure=True), False,
msg='Insecure {0} returns should not be returned'.format(rec_t)
)
# dig won't include RRSIG's if they're not validated, which makes for easy mocking
for rec_t, tests in right.items():
with self._mock_cmd_ret([dict([('stdout', dres + '\nIGNORED\tRRSIG\tIGNORED\n')]) for dres in tests]):
for test_res in self.RESULTS[rec_t]:
self.assertEqual(
_lookup_dig('mocksrvr.example.com', rec_t, secure=True), test_res,
msg='Error parsing DNSSEC\'d {0} returns'.format(rec_t)
)
def test_gai(self):
# wrong
with patch.object(socket, 'getaddrinfo', MagicMock(side_effect=socket.gaierror)):
@ -191,13 +297,11 @@ class DNSlookupsCase(TestCase):
(10, 3, 3, '', ('2a00:a00:b01:c02:d03:e04:f05:333', 0, 0, 0))]
]
}
for rec_t, tests in right.items():
for mock_return, test_res in zip(tests, self.RESULTS[rec_t]):
with patch.object(socket, 'getaddrinfo', MagicMock(return_value=mock_return)):
self.assertEqual(_lookup_gai('mockq', rec_t), test_res)
def test_dig(self):
pass
with patch.object(socket, 'getaddrinfo', MagicMock(side_effect=tests)):
for test_res in self.RESULTS[rec_t]:
self.assertEqual(
_lookup_gai('mockq', rec_t), test_res,
msg='Error parsing {0} returns'.format(rec_t)
)