DNS2: bugfixes, cleanup, drill tests

This commit is contained in:
Ronald van Zantvoort 2017-04-06 10:09:14 +02:00
parent e3239d90cf
commit 9dff676f8f
2 changed files with 112 additions and 45 deletions

View File

@ -285,7 +285,6 @@ def _lookup_drill(name, rdtype, timeout=None, servers=None, secure=None):
cmd, timeout=timeout,
python_shell=False, output_loglevel='quiet')
# In this case, 0 is not the same as False
if cmd['retcode'] != 0:
log.warning('drill returned ({0}): {1}'.format(
cmd['retcode'], cmd['stderr']
@ -293,6 +292,7 @@ def _lookup_drill(name, rdtype, timeout=None, servers=None, secure=None):
return False
lookup_res = iter(cmd['stdout'].splitlines())
validated = False
res = []
try:
line = ''
@ -363,7 +363,6 @@ def _lookup_host(name, rdtype, timeout=None, server=None):
cmd = __salt__['cmd.run_all'](cmd + name, python_shell=False, output_loglevel='quiet')
# In this case, 0 is not the same as False
if cmd['retcode'] != 0:
log.warning('host returned ({0}): {1}'.format(
cmd['retcode'], cmd['stdout']
@ -432,7 +431,6 @@ def _lookup_nslookup(name, rdtype, timeout=None, server=None):
cmd += ' {0}'.format(server)
cmd = __salt__['cmd.run_all'](cmd, python_shell=False, output_loglevel='quiet')
# In this case, 0 is not the same as False
if cmd['retcode'] != 0:
log.warning('nslookup returned ({0}): {1}'.format(
cmd['retcode'], cmd['stdout'].splitlines()[-1]
@ -440,7 +438,6 @@ def _lookup_nslookup(name, rdtype, timeout=None, server=None):
return False
lookup_res = iter(cmd['stdout'].splitlines())
res = []
try:
line = ''
@ -494,7 +491,7 @@ def lookup(
Lookup DNS record data
:param name: name to lookup
:param rdtype: DNS record type
:param method: gai (getaddrinfo()), pydns, dig, drill, host, nslookup or auto (default)
:param method: gai (getaddrinfo()), dnspython, dig, drill, host, nslookup or auto (default)
:param servers: (list of) server(s) to try in-order
:param timeout: query timeout or a valiant approximation of that
:param walk: Find records in parents if they don't exist
@ -543,7 +540,7 @@ def lookup(
if servers:
if not isinstance(servers, (list, tuple)):
servers = [servers]
if method in ('pydns', 'dig', 'drill'):
if method in ('dnspython', 'dig', 'drill'):
res_kwargs['servers'] = servers
else:
if timeout:
@ -601,7 +598,7 @@ def query(
:param timeout: query timeout or a valiant approximation of that
:param secure: return only DNSSEC secured response
:param walk: Find records in parents if they don't exist
:param walk_tld: Include the final domain in the walk
:param walk_tld: Include the top-level domain in the walk
:return: [] of records
'''
rdtype = rdtype.upper()
@ -777,10 +774,10 @@ def spf_rec(rdata):
# TODO: Should be in something intelligent like an SPF_get
# if mod == 'exp':
# res[mod] = query(val, 'TXT', **qargs)
# res[mod] = lookup(val, 'TXT', **qargs)
# continue
# elif mod == 'redirect':
# return records(val, 'SPF', **qargs)
# return query(val, 'SPF', **qargs)
mech = {}
if mech_spec[0] in ('+', '-', '~', '?'):

View File

@ -9,7 +9,8 @@ import socket
from salt._compat import ipaddress
from salt.utils.odict import OrderedDict
import salt.utils.dns
from salt.utils.dns import _to_port, _tree, _weighted_order, _data2rec, _data2rec_group, _lookup_gai, _lookup_dig
from salt.utils.dns import _to_port, _tree, _weighted_order, _data2rec, _data2rec_group
from salt.utils.dns import _lookup_gai, _lookup_dig, _lookup_drill
# Testing
from tests.support.unit import skipIf, TestCase
@ -148,10 +149,10 @@ class DNShelpersCase(TestCase):
@skipIf(NO_MOCK, NO_MOCK_REASON)
class DNSlookupsCase(TestCase):
'''
Test the lookup functions
Test the lookup result parsers
Note that by far and large the lookup functions actually
completely ignore the input name
Note that by far and large the parsers actually
completely ignore the input name or output content
a lookup function
- returns False upon error
@ -208,20 +209,54 @@ class DNSlookupsCase(TestCase):
return patch.dict(salt.utils.dns.__salt__, {'cmd.run_all': cmd_mock}, clear=True)
def test_dig(self):
def _test_cmd_lookup(self, lookup_cb, wrongs, rights, secure=False):
# wrong
with self._mock_cmd_ret({
'retcode': 9,
'stderr': ';; connection timed out; no servers could be reached',
}):
self.assertEqual(_lookup_dig('mockq', 'A'), False)
for wrong in wrongs:
with self._mock_cmd_ret(wrong):
self.assertEqual(lookup_cb('mockq', 'A'), False)
# empty response
with self._mock_cmd_ret({}):
self.assertEqual(_lookup_dig('mockq', 'AAAA'), [])
self.assertEqual(lookup_cb('mockq', 'AAAA'), [])
# Regular outputs
for rec_t, tests in rights.items():
with self._mock_cmd_ret([dict([('stdout', dres)]) for dres in tests]):
for test_res in self.RESULTS[rec_t]:
self.assertEqual(
lookup_cb('mocksrvr.example.com', rec_t), test_res,
msg='Error parsing {0} returns'.format(rec_t)
)
# TODO
if secure or not secure:
return
# # Regular outputs are insecure outputs (e.g. False)
# for rec_t, tests in rights.items():
# with self._mock_cmd_ret([dict([('stdout', dres)]) for dres in tests]):
# for _ in self.RESULTS[rec_t]:
# self.assertEqual(
# lookup_cb('mocksrvr.example.com', rec_t, secure=True), False,
# msg='Insecure {0} returns should not be returned'.format(rec_t)
# )
#
# # dig won't include RRSIG's if they're not validated, which makes for easy mocking
# for rec_t, tests in rights.items():
# with self._mock_cmd_ret([dict([('stdout', dres + '\nIGNORED\tRRSIG\tIGNORED\n')]) for dres in tests]):
# for test_res in self.RESULTS[rec_t]:
# self.assertEqual(
# lookup_cb('mocksrvr.example.com', rec_t, secure=True), test_res,
# msg='Error parsing DNSSEC\'d {0} returns'.format(rec_t)
# )
def test_dig(self):
wrongs = [
{'retcode': 9, 'stderr': ';; connection timed out; no servers could be reached'}
]
# example returns for dig
right = {
rights = {
'A': [
'mocksrvr.example.com.\tA\t10.1.1.1',
'web.example.com.\t\tA\t10.1.1.1\n'
@ -248,33 +283,68 @@ class DNSlookupsCase(TestCase):
]
}
# Regular outputs
for rec_t, tests in right.items():
with self._mock_cmd_ret([dict([('stdout', dres)]) for dres in tests]):
for test_res in self.RESULTS[rec_t]:
self.assertEqual(
_lookup_dig('mocksrvr.example.com', rec_t), test_res,
msg='Error parsing {0} returns'.format(rec_t)
)
self._test_cmd_lookup(_lookup_dig, wrongs, rights, secure=True)
# Regular outputs are insecure outputs (e.g. False)
for rec_t, tests in right.items():
with self._mock_cmd_ret([dict([('stdout', dres)]) for dres in tests]):
for _ in self.RESULTS[rec_t]:
self.assertEqual(
_lookup_dig('mocksrvr.example.com', rec_t, secure=True), False,
msg='Insecure {0} returns should not be returned'.format(rec_t)
)
def test_drill(self):
wrongs = [
{'retcode': 1, 'stderr': 'Error: error sending query: No (valid) nameservers defined in the resolver'}
]
# dig won't include RRSIG's if they're not validated, which makes for easy mocking
for rec_t, tests in right.items():
with self._mock_cmd_ret([dict([('stdout', dres + '\nIGNORED\tRRSIG\tIGNORED\n')]) for dres in tests]):
for test_res in self.RESULTS[rec_t]:
self.assertEqual(
_lookup_dig('mocksrvr.example.com', rec_t, secure=True), test_res,
msg='Error parsing DNSSEC\'d {0} returns'.format(rec_t)
)
# # example returns for dig
RES_TMPL = ''';; ->>HEADER<<- opcode: QUERY, rcode: NOERROR, id: 58233
;; flags: qr rd ra ; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 0
;; QUESTION SECTION:
;; mocksrvr.example.com. IN A
;; ANSWER SECTION:
{}
;; AUTHORITY SECTION:
;; ADDITIONAL SECTION:
;; Query time: 37 msec
;; SERVER: 10.100.150.129
;; WHEN: Tue Apr 4 19:03:51 2017
;; MSG SIZE rcvd: 50
'''
rights = {
'A': [
'mocksrvr.example.com.\t4404\tIN\tA\t10.1.1.1\n',
'web.example.com.\t4404\tIN\tA\t10.1.1.1\n'
'web.example.com.\t4404\tIN\tA\t10.2.2.2\n'
'web.example.com.\t4404\tIN\tA\t10.3.3.3\n',
],
'AAAA': [
'mocksrvr.example.com.\t4404\tIN\tA\t2a00:a00:b01:c02:d03:e04:f05:111',
'mocksrvr.example.com.\t4404\tIN\tCNAME\tweb.example.com.\n'
'web.example.com.\t4404\tIN\tAAAA\t2a00:a00:b01:c02:d03:e04:f05:111\n'
'web.example.com.\t4404\tIN\tAAAA\t2a00:a00:b01:c02:d03:e04:f05:222\n'
'web.example.com.\t4404\tIN\tAAAA\t2a00:a00:b01:c02:d03:e04:f05:333'
],
'CNAME': [
'mocksrvr.example.com.\t4404\tIN\tCNAME\tweb.example.com.'
],
'MX': [
'example.com.\t4404\tIN\tMX\t10 mx1.example.com.',
'example.com.\t4404\tIN\tMX\t10 mx1.example.com.\n'
'example.com.\t4404\tIN\tMX\t20 mx2.example.eu.\n'
'example.com.\t4404\tIN\tMX\t30 mx3.example.nl.'
],
'SPF': [
'example.com.\t4404\tIN\tTXT\t"v=spf1 a include:_spf4.example.com include:mail.example.eu ip4:10.0.0.0/8 ip6:2a00:a00:b01::/48 ~all"'
]
}
for rec_t, tests in rights.items():
for idx, test in enumerate(tests):
rights[rec_t][idx] = RES_TMPL.format(test)
ppr(rights)
self._test_cmd_lookup(_lookup_drill, wrongs, rights, secure=True)
def test_gai(self):
# wrong