initial work to cleanup the nftables module, state module, and the unit tests.

This commit is contained in:
Gareth J. Greenaway 2019-02-19 10:44:01 -08:00
parent a8898f6c9d
commit 3ec96e36e2
No known key found for this signature in database
GPG Key ID: 10B62F8A7CAD7A41
3 changed files with 487 additions and 213 deletions

View File

@ -117,6 +117,10 @@ def build_rule(table=None, chain=None, command=None, position='', full=None, fam
family=ipv6
'''
ret = {'comment': '',
'rule': '',
'result': False}
if 'target' in kwargs:
kwargs['jump'] = kwargs['target']
del kwargs['target']
@ -207,8 +211,7 @@ def build_rule(table=None, chain=None, command=None, position='', full=None, fam
del kwargs['to-ports']
if 'to-destination' in kwargs:
after_jump.append('--to-destination {0} '.
format(kwargs['to-destination']))
after_jump.append('--to-destination {0} '.format(kwargs['to-destination']))
del kwargs['to-destination']
if 'reject-with' in kwargs:
@ -226,30 +229,50 @@ def build_rule(table=None, chain=None, command=None, position='', full=None, fam
rule = rule.replace('sport', '{0} sport'.format(proto))
if full in ['True', 'true']:
if not table:
return 'Error: Table needs to be specified'
ret['comment'] = 'Error: Table needs to be specified'
return ret
if not chain:
return 'Error: Chain needs to be specified'
ret['comment'] = 'Error: Chain needs to be specified'
return ret
if not command:
return 'Error: Command needs to be specified'
ret['comment'] = 'Error: Command needs to be specified'
return ret
if command in ['Insert', 'insert', 'INSERT']:
if position:
return '{0} insert rule {1} {2} {3} position {4} {5}'.\
format(_nftables_cmd(), nft_family, table,
chain, position, rule)
ret['rule'] = '{0} insert rule {1} {2} {3} ' \
'position {4} {5}'.format(_nftables_cmd(),
nft_family,
table,
chain,
position,
rule)
else:
return '{0} insert rule {1} {2} {3} {4}'.\
format(_nftables_cmd(), nft_family, table,
chain, rule)
ret['rule'] = '{0} insert rule ' \
'{1} {2} {3} {4}'.format(_nftables_cmd(),
nft_family,
table,
chain,
rule)
else:
ret['rule'] = '{0} {1} rule {2} {3} {4} {5}'.format(_nftables_cmd(),
command,
nft_family,
table,
chain,
rule)
return '{0} {1} rule {2} {3} {4} {5}'.format(_nftables_cmd(),
command, nft_family, table, chain, rule)
return rule
if ret['rule']:
ret['comment'] = 'Successfully built rule'
ret['result'] = True
return ret
def get_saved_rules(conf_file=None, family='ipv4'):
def get_saved_rules(conf_file=None):
'''
Return a data structure of the rules in the conf file
@ -291,8 +314,9 @@ def get_rules(family='ipv4'):
'''
nft_family = _NFTABLES_FAMILIES[family]
rules = []
cmd = '{0} --numeric --numeric --numeric list tables {1}'.\
format(_nftables_cmd(), nft_family)
cmd = '{0} --numeric --numeric --numeric ' \
'list tables {1}'. format(_nftables_cmd(),
nft_family)
out = __salt__['cmd.run'](cmd, python_shell=False)
if not out:
return rules
@ -300,8 +324,9 @@ def get_rules(family='ipv4'):
tables = re.split('\n+', out)
for table in tables:
table_name = table.split(' ')[1]
cmd = '{0} --numeric --numeric --numeric list table {1} {2}'.format(_nftables_cmd(),
nft_family, table_name)
cmd = '{0} --numeric --numeric --numeric ' \
'list table {1} {2}'.format(_nftables_cmd(),
nft_family, table_name)
out = __salt__['cmd.run'](cmd, python_shell=False)
rules.append(out)
return rules
@ -361,10 +386,16 @@ def get_rule_handle(table='filter', chain=None, rule=None, family='ipv4'):
rule='input tcp dport 22 log accept' \\
family=ipv6
'''
ret = {'comment': '',
'result': False}
if not chain:
return 'Error: Chain needs to be specified'
ret['comment'] = 'Error: Chain needs to be specified'
return ret
if not rule:
return 'Error: Rule needs to be specified'
ret['comment'] = 'Error: Rule needs to be specified'
return ret
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist'.\
@ -413,33 +444,41 @@ def check(table='filter', chain=None, rule=None, family='ipv4'):
rule='input tcp dport 22 log accept' \\
family=ipv6
'''
ret = {'comment': '',
'result': False}
if not chain:
return 'Error: Chain needs to be specified'
ret['comment'] = 'Error: Chain needs to be specified'
return ret
if not rule:
return 'Error: Rule needs to be specified'
ret['comment'] = 'Error: Rule needs to be specified'
return ret
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist'.\
format(table, family)
ret['comment'] = 'Table {0} in family {1} does not exist'.\
format(table, family)
return ret
if not check_chain(table, chain, family=family):
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
format(chain, table, family)
ret['comment'] = 'Chain {0} in table {1} in family {2} does not exist'.\
format(chain, table, family)
return ret
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} --handle --numeric --numeric --numeric list chain {1} {2} {3}'.\
format(_nftables_cmd(), nft_family, table, chain)
format(_nftables_cmd(), nft_family, table, chain)
search_rule = '{0} #'.format(rule)
out = __salt__['cmd.run'](cmd, python_shell=False).find(search_rule)
if out != -1:
out = ''
if out == -1:
ret['comment'] = 'Rule {0} chain {1} in table {2} in family {3} does not exist'.\
format(rule, chain, table, family)
else:
return False
if not out:
return True
return out
ret['comment'] = 'Rule {0} chain {1} in table {2} in family {3} exists'.\
format(rule, chain, table, family)
ret['result'] = True
return ret
def check_chain(table='filter', chain=None, family='ipv4'):
@ -458,21 +497,25 @@ def check_chain(table='filter', chain=None, family='ipv4'):
salt '*' nftables.check_chain filter input family=ipv6
'''
ret = {'comment': '',
'result': False}
if not chain:
return 'Error: Chain needs to be specified'
ret['comment'] = 'Chain needs to be specified'
return ret
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} list table {1} {2}' . format(_nftables_cmd(), nft_family, table)
out = __salt__['cmd.run'](cmd, python_shell=False).find('chain {0} {{'.format(chain))
if out != -1:
out = ''
if out == -1:
ret['comment'] = 'Chain {0} in table {1} in family {2} does not exist'.\
format(chain, table, family)
else:
return False
if not out:
return True
return out
ret['comment'] = 'Chain {0} in table {1} in family {2} exists'.\
format(chain, table, family)
ret['result'] = True
return ret
def check_table(table=None, family='ipv4'):
@ -483,21 +526,25 @@ def check_table(table=None, family='ipv4'):
salt '*' nftables.check_table nat
'''
ret = {'comment': '',
'result': False}
if not table:
return 'Error: table needs to be specified'
ret['comment'] = 'Table needs to be specified'
return ret
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} list tables {1}' . format(_nftables_cmd(), nft_family)
out = __salt__['cmd.run'](cmd, python_shell=False).find('table {0} {1}'.format(nft_family, table))
if out != -1:
out = ''
if out == -1:
ret['comment'] = 'Table {0} in family {1} does not exist'.\
format(table, family)
else:
return False
if not out:
return True
return out
ret['comment'] = 'Table {0} in family {1} exists'.\
format(table, family)
ret['result'] = True
return ret
def new_table(table, family='ipv4'):
@ -515,21 +562,29 @@ def new_table(table, family='ipv4'):
IPv6:
salt '*' nftables.new_table filter family=ipv6
'''
ret = {'comment': '',
'result': False}
if not table:
return 'Error: table needs to be specified'
ret['comment'] = 'Error: Table needs to be specified'
return ret
if check_table(table, family=family):
return 'Error: table {0} in family {1} already exists'.\
format(table, family)
res = check_table(table, family=family)
if res['result']:
return res
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} add table {1} {2}'.format(_nftables_cmd(), nft_family, table)
out = __salt__['cmd.run'](cmd, python_shell=False)
if not out:
out = True
return out
ret['comment'] = 'Table {0} in family {1} created'.\
format(table, family)
ret['result'] = True
else:
ret['comment'] = 'Table {0} in family {1} could not be created'.\
format(table, family)
return ret
def delete_table(table, family='ipv4'):
@ -547,20 +602,29 @@ def delete_table(table, family='ipv4'):
IPv6:
salt '*' nftables.delete_table filter family=ipv6
'''
ret = {'comment': '',
'result': False}
if not table:
return 'Error: table needs to be specified'
ret['comment'] = 'Table needs to be specified'
return ret
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist' . format(table, family)
res = check_table(table, family=family)
if not res['result']:
return res
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} delete table {1} {2}'.format(_nftables_cmd(), nft_family, table)
out = __salt__['cmd.run'](cmd, python_shell=False)
if not out:
out = True
return out
ret['comment'] = 'Table {0} in family {1} deleted'.\
format(table, family)
ret['result'] = True
else:
ret['comment'] = 'Table {0} in family {1} could not be deleted'.\
format(table, family)
return ret
def new_chain(table='filter', chain=None, table_type=None, hook=None, priority=None, family='ipv4'):
@ -588,34 +652,45 @@ def new_chain(table='filter', chain=None, table_type=None, hook=None, priority=N
salt '*' nftables.new_chain filter foo family=ipv6
'''
ret = {'comment': '',
'result': False}
if not chain:
return 'Error: Chain needs to be specified'
ret['comment'] = 'Chain needs to be specified'
return ret
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist'.\
format(table, family)
res = check_table(table, family=family)
if not res['result']:
return res
if check_chain(table, chain, family=family):
return 'Error: chain {0} in table {1} in family {2} already exists'.\
res = check_chain(table, chain, family=family)
if res['result']:
ret['comment'] = 'Error: chain {0} in table {1} in family {2} already exists'.\
format(chain, table, family)
return ret
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} add chain {1} {2} {3}'.\
format(_nftables_cmd(), nft_family, table, chain)
format(_nftables_cmd(), nft_family, table, chain)
if table_type or hook or priority:
if table_type and hook and six.text_type(priority):
cmd = r'{0} \{{ type {1} hook {2} priority {3}\; \}}'.\
format(cmd, table_type, hook, priority)
else:
# Specify one, rqeuire all
return 'Error: table_type hook and priority required'
# Specify one, require all
ret['comment'] = 'Error: table_type, hook, and priority required.'
return ret
out = __salt__['cmd.run'](cmd, python_shell=False)
if not out:
out = True
return out
ret['comment'] = 'Chain {0} in table {1} in family {2} created'.\
format(chain, table, family)
ret['result'] = True
else:
ret['comment'] = 'Chain {0} in table {1} in family {2} could not be created'.\
format(chain, table, family)
return ret
def delete_chain(table='filter', chain=None, family='ipv4'):
@ -637,26 +712,38 @@ def delete_chain(table='filter', chain=None, family='ipv4'):
salt '*' nftables.delete_chain filter foo family=ipv6
'''
ret = {'comment': '',
'result': False}
if not chain:
return 'Error: Chain needs to be specified'
ret['comment'] = 'Chain needs to be specified'
return ret
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist'.\
res = check_table(table, family=family)
if not res['result']:
ret['comment'] = 'Error: table {0} in family {1} does not exist'.\
format(table, family)
return ret
if not check_chain(table, chain, family=family):
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
res = check_chain(table, chain, family=family)
if not res['result']:
ret['comment'] = 'Error: chain {0} in table {1} in family {2} already exists'.\
format(chain, table, family)
return ret
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} delete chain {1} {2} {3}'.\
format(_nftables_cmd(), nft_family, table, chain)
format(_nftables_cmd(), nft_family, table, chain)
out = __salt__['cmd.run'](cmd, python_shell=False)
if not out:
out = True
return out
ret['comment'] = 'Chain {0} in table {1} in family {2} deleted'.\
format(chain, table, family)
ret['result'] = True
else:
ret['comment'] = 'Chain {0} in table {1} in family {2} could not be deleted'.\
format(chain, table, family)
return ret
def append(table='filter', chain=None, rule=None, family='ipv4'):
@ -673,38 +760,51 @@ def append(table='filter', chain=None, rule=None, family='ipv4'):
.. code-block:: bash
salt '*' nftables.append filter input \\
rule='input tcp dport 22 log accept'
rule='tcp dport 22 log accept'
IPv6:
salt '*' nftables.append filter input \\
rule='input tcp dport 22 log accept' \\
rule='tcp dport 22 log accept' \\
family=ipv6
'''
ret = {'comment': 'Failed to append rule {0} to chain {0} in table {1}.'.format(rule, chain, table),
'result': False}
if not chain:
return 'Error: Chain needs to be specified'
ret['comment'] = 'Error: Chain needs to be specified'
return ret
if not rule:
return 'Error: Rule needs to be specified'
ret['comment'] = 'Error: Rule needs to be specified'
return ret
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist'.\
format(table, family)
res = check_table(table, family=family)
if not res['result']:
return res
if not check_chain(table, chain, family=family):
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
format(chain, table, family)
res = check_chain(table, chain, family=family)
if not res['result']:
return res
if check(table, chain, rule, family=family):
return 'Error: rule {0} chain {1} in table {2} in family {3} already exists'.\
format(rule, chain, table, family)
res = check(table, chain, rule, family=family)
if res['result']:
ret['comment'] = 'Error: rule {0} chain {1} in table {2} in family {3} already exists'.\
format(rule, chain, table, family)
return ret
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} add rule {1} {2} {3} {4}'.\
format(_nftables_cmd(), nft_family, table, chain, rule)
format(_nftables_cmd(), nft_family, table, chain, rule)
out = __salt__['cmd.run'](cmd, python_shell=False)
if len(out) == 0:
return True
ret['result'] = True
ret['comment'] = 'Added rule "{0}" chain {1} in table {2} in family {3}.'.\
format(rule, chain, table, family)
else:
return False
ret['comment'] = 'Failed to add rule "{0}" chain {1} in table {2} in family {3}.'.\
format(rule, chain, table, family)
return ret
def insert(table='filter', chain=None, position=None, rule=None, family='ipv4'):
@ -730,29 +830,37 @@ def insert(table='filter', chain=None, position=None, rule=None, family='ipv4'):
IPv6:
salt '*' nftables.insert filter input \\
rule='input tcp dport 22 log accept' \\
rule='tcp dport 22 log accept' \\
family=ipv6
salt '*' nftables.insert filter input position=3 \\
rule='input tcp dport 22 log accept' \\
rule='tcp dport 22 log accept' \\
family=ipv6
'''
ret = {'comment': 'Failed to insert rule {0} to table {1}.'.format(rule, table),
'result': False}
if not chain:
return 'Error: Chain needs to be specified'
ret['comment'] = 'Error: Chain needs to be specified'
return ret
if not rule:
return 'Error: Rule needs to be specified'
ret['comment'] = 'Error: Rule needs to be specified'
return ret
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist'.\
format(table, family)
res = check_table(table, family=family)
if not res['result']:
return res
if not check_chain(table, chain, family=family):
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
format(chain, table, family)
res = check_chain(table, chain, family=family)
if not res['result']:
return res
if check(table, chain, rule, family=family):
return 'Error: rule {0} chain {1} in table {2} in family {3} already exists'.\
format(rule, chain, table, family)
res = check(table, chain, rule, family=family)
if res['result']:
ret['comment'] = 'Error: rule {0} chain {1} in table {2} in family {3} already exists'.\
format(rule, chain, table, family)
return ret
nft_family = _NFTABLES_FAMILIES[family]
if position:
@ -764,9 +872,13 @@ def insert(table='filter', chain=None, position=None, rule=None, family='ipv4'):
out = __salt__['cmd.run'](cmd, python_shell=False)
if len(out) == 0:
return True
ret['result'] = True
ret['comment'] = 'Added rule "{0}" chain {1} in table {2} in family {3}.'.\
format(rule, chain, table, family)
else:
return False
ret['comment'] = 'Failed to add rule "{0}" chain {1} in table {2} in family {3}.'.\
format(rule, chain, table, family)
return ret
def delete(table, chain=None, position=None, rule=None, family='ipv4'):
@ -795,21 +907,26 @@ def delete(table, chain=None, position=None, rule=None, family='ipv4'):
rule='input tcp dport 22 log accept' \\
family=ipv6
'''
ret = {'comment': 'Failed to delete rule {0} in table {1}.'.format(rule, table),
'result': False}
if position and rule:
return 'Error: Only specify a position or a rule, not both'
ret['comment'] = 'Error: Only specify a position or a rule, not both'
return ret
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist'.\
format(table, family)
res = check_table(table, family=family)
if not res['result']:
return res
if not check_chain(table, chain, family=family):
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
format(chain, table, family)
res = check_chain(table, chain, family=family)
if not res['result']:
return res
if not check(table, chain, rule, family=family):
return 'Error: rule {0} chain {1} in table {2} in family {3} does not exist'.\
format(rule, chain, table, family)
res = check(table, chain, rule, family=family)
if not res['result']:
ret['comment'] = 'Error: rule {0} chain {1} in table {2} in family {3} does not exist'.\
format(rule, chain, table, family)
return ret
# nftables rules can only be deleted using the handle
# if we don't have it, find it.
@ -818,13 +935,17 @@ def delete(table, chain=None, position=None, rule=None, family='ipv4'):
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} delete rule {1} {2} {3} handle {4}'.\
format(_nftables_cmd(), nft_family, table, chain, position)
format(_nftables_cmd(), nft_family, table, chain, position)
out = __salt__['cmd.run'](cmd, python_shell=False)
if len(out) == 0:
return True
ret['result'] = True
ret['comment'] = 'Deleted rule "{0}" in chain {1} in table {2} in family {3}.'.\
format(rule, chain, table, family)
else:
return False
ret['comment'] = 'Failed to delete rule "{0}" in chain {1} table {2} in family {3}'.\
format(rule, chain, table, family)
return ret
def flush(table='filter', chain='', family='ipv4'):
@ -843,24 +964,33 @@ def flush(table='filter', chain='', family='ipv4'):
IPv6:
salt '*' nftables.flush filter input family=ipv6
'''
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist'.\
format(table, family)
ret = {'comment': 'Failed to flush rules from chain {0} in table {1}.'.format(chain, table),
'result': False}
res = check_table(table, family=family)
if not res['result']:
return res
nft_family = _NFTABLES_FAMILIES[family]
if chain:
if not check_chain(table, chain, family=family):
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
format(chain, table, nft_family)
res = check_chain(table, chain, family=family)
if not res['result']:
return res
cmd = '{0} flush chain {1} {2} {3}'.\
format(_nftables_cmd(), nft_family, table, chain)
format(_nftables_cmd(), nft_family, table, chain)
comment = 'from chain {0} in table {1} in family {2}.'.\
format(chain, table, family)
else:
cmd = '{0} flush table {1} {2}'.\
format(_nftables_cmd(), nft_family, table)
comment = 'from table {0} in family {1}.'.\
format(table, family)
out = __salt__['cmd.run'](cmd, python_shell=False)
if len(out) == 0:
return True
ret['result'] = True
ret['comment'] = 'Flushed rules {0}'.format(comment)
else:
return False
ret['comment'] = 'Failed to flush rules {0}'.format(comment)
return ret

View File

@ -106,6 +106,9 @@ from __future__ import absolute_import, print_function, unicode_literals
# Import salt libs
from salt.state import STATE_INTERNAL_KEYWORDS as _STATE_INTERNAL_KEYWORDS
import logging
log = logging.getLogger(__name__)
def __virtual__():
'''
@ -258,7 +261,12 @@ def append(name, family='ipv4', **kwargs):
command.strip(),
family)
return ret
if __salt__['nftables.append'](kwargs['table'], kwargs['chain'], rule, family):
result = __salt__['nftables.append'](kwargs['table'],
kwargs['chain'],
rule,
family)
log.debug('=== result %s ===', result)
if result['result']:
ret['changes'] = {'locale': name}
ret['result'] = True
ret['comment'] = 'Set nftables rule for {0} to: {1} for {2}'.format(
@ -274,9 +282,10 @@ def append(name, family='ipv4', **kwargs):
else:
ret['result'] = False
ret['comment'] = ('Failed to set nftables rule for {0}.\n'
'Attempted rule was {1} for {2}').format(
'Attempted rule was {1} for {2}.\n'
'{3}').format(
name,
command.strip(), family)
command.strip(), family, result['comment'])
return ret

View File

@ -48,29 +48,44 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it build a well-formatted nftables rule based on kwargs.
'''
self.assertEqual(nftables.build_rule(full='True'),
'Error: Table needs to be specified')
{'result': False,
'rule': '',
'comment': 'Error: Table needs to be specified'})
self.assertEqual(nftables.build_rule(table='filter', full='True'),
'Error: Chain needs to be specified')
{'result': False,
'rule': '',
'comment': 'Error: Chain needs to be specified'})
self.assertEqual(nftables.build_rule(table='filter', chain='input',
full='True'),
'Error: Command needs to be specified')
{'result': False,
'rule': '',
'comment': 'Error: Command needs to be specified'})
self.assertEqual(nftables.build_rule(table='filter', chain='input',
command='insert', position='3',
full='True'),
'nft insert rule ip filter input position 3 ')
{'result': True,
'rule': 'nft insert rule ip filter input position 3 ',
'comment': 'Successfully built rule'})
self.assertEqual(nftables.build_rule(table='filter', chain='input',
command='insert', full='True'),
'nft insert rule ip filter input ')
{'result': True,
'rule': 'nft insert rule ip filter input ',
'comment': 'Successfully built rule'})
self.assertEqual(nftables.build_rule(table='filter', chain='input',
command='halt', full='True'),
'nft halt rule ip filter input ')
{'result': True,
'rule': 'nft halt rule ip filter input ',
'comment': 'Successfully built rule'})
self.assertEqual(nftables.build_rule(), '')
self.assertEqual(nftables.build_rule(),
{'result': True,
'rule': '',
'comment': ''})
# 'get_saved_rules' function tests: 1
@ -163,19 +178,23 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it check for the existence of a rule in the table and chain
'''
self.assertEqual(nftables.check(),
'Error: Chain needs to be specified')
{'result': False,
'comment': 'Error: Chain needs to be specified'})
self.assertEqual(nftables.check(chain='input'),
'Error: Rule needs to be specified')
{'result': False,
'comment': 'Error: Rule needs to be specified'})
_ru = 'input tcp dport 22 log accept'
ret = 'Error: table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Error: table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.check(chain='input', rule=_ru), ret)
mock = MagicMock(return_value='table ip filter')
ret = 'Error: chain input in table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Error: chain input in table filter in family ipv4 does not exist'}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.check(chain='input', rule=_ru), ret)
@ -195,15 +214,20 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it check for the existence of a chain in the table
'''
self.assertEqual(nftables.check_chain(),
'Error: Chain needs to be specified')
{'result': False,
'comment': 'Chain needs to be specified'})
mock = MagicMock(return_value='')
ret = {'comment': 'Chain input in table filter in family ipv4 does not exist',
'result': False}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertFalse(nftables.check_chain(chain='input'))
self.assertEqual(nftables.check_chain(chain='input'), ret)
mock = MagicMock(return_value='chain input {{')
ret = {'comment': 'Chain input in table filter in family ipv4 exists',
'result': True}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertTrue(nftables.check_chain(chain='input'))
self.assertEqual(nftables.check_chain(chain='input'), ret)
# 'check_table' function tests: 1
@ -212,15 +236,20 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it check for the existence of a table
'''
self.assertEqual(nftables.check_table(),
'Error: table needs to be specified')
{'result': False,
'comment': 'Table needs to be specified'})
mock = MagicMock(return_value='')
ret = {'comment': 'Table nat in family ipv4 does not exist',
'result': False}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertFalse(nftables.check_table(table='nat'))
self.assertEqual(nftables.check_table(table='nat'), ret)
mock = MagicMock(return_value='table ip nat')
ret = {'comment': 'Table nat in family ipv4 exists',
'result': True}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertTrue(nftables.check_table(table='nat'))
self.assertEqual(nftables.check_table(table='nat'), ret)
# 'new_table' function tests: 1
@ -229,16 +258,19 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it create new custom table.
'''
self.assertEqual(nftables.new_table(table=None),
'Error: table needs to be specified')
{'result': False,
'comment': 'Error: Table needs to be specified'})
mock = MagicMock(return_value='')
ret = {'comment': 'Table nat in family ipv4 created', 'result': True}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.new_table(table='nat'), True)
self.assertEqual(nftables.new_table(table='nat'), ret)
mock = MagicMock(return_value='table ip nat')
ret = {'comment': 'Table nat in family ipv4 exists', 'result': True}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.new_table(table='nat'),
'Error: table nat in family ipv4 already exists')
ret)
# 'delete_table' function tests: 1
@ -247,16 +279,35 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it delete custom table.
'''
self.assertEqual(nftables.delete_table(table=None),
'Error: table needs to be specified')
{'result': False,
'comment': 'Table needs to be specified'})
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.delete_table(table='nat'),
'Error: table nat in family ipv4 does not exist')
mock_ret = {'result': False,
'comment': 'Table nat in family ipv4 does not exist'}
with patch('salt.modules.nftables.check_table',
MagicMock(return_value=mock_ret)):
ret = nftables.delete_table(table='nat')
self.assertEqual(ret,
{'result': False,
'comment': 'Table nat in family ipv4 does not exist'})
mock = MagicMock(return_value='table ip nat')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.delete_table(table='nat'), 'table ip nat')
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
patch('salt.modules.nftables.check_table',
MagicMock(return_value={'result': True,
'comment': ''})):
self.assertEqual(nftables.delete_table(table='nat'),
{'comment': 'Table nat in family ipv4 could not be deleted',
'result': False})
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
patch('salt.modules.nftables.check_table',
MagicMock(return_value={'result': True,
'comment': ''})):
self.assertEqual(nftables.delete_table(table='nat'),
{'comment': 'Table nat in family ipv4 deleted',
'result': True})
# 'new_chain' function tests: 2
@ -265,14 +316,17 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it create new chain to the specified table.
'''
self.assertEqual(nftables.new_chain(),
'Error: Chain needs to be specified')
{'result': False,
'comment': 'Chain needs to be specified'})
ret = 'Error: table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.new_chain(chain='input'), ret)
ret = 'Error: chain input in table filter in family ipv4 already exists'
ret = {'result': False,
'comment': 'Error: chain input in table filter in family ipv4 already exists'}
mock = MagicMock(return_value='table ip filter chain input {{')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.new_chain(chain='input'), ret)
@ -283,11 +337,16 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'''
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
patch('salt.modules.nftables.check_chain', MagicMock(return_value=False)), \
patch('salt.modules.nftables.check_table', MagicMock(return_value=True)):
patch('salt.modules.nftables.check_chain',
MagicMock(return_value={'result': False,
'comment': ''})), \
patch('salt.modules.nftables.check_table',
MagicMock(return_value={'result': True,
'comment': ''})):
self.assertEqual(nftables.new_chain(chain='input',
table_type='filter'),
'Error: table_type hook and priority required')
{'result': False,
'comment': 'Error: table_type, hook, and priority required.'})
self.assertTrue(nftables.new_chain(chain='input',
table_type='filter',
@ -300,14 +359,17 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it delete the chain from the specified table.
'''
self.assertEqual(nftables.delete_chain(),
'Error: Chain needs to be specified')
{'result': False,
'comment': 'Chain needs to be specified'})
ret = 'Error: table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Error: table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.delete_chain(chain='input'), ret)
ret = 'Error: chain input in table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Error: chain input in table filter in family ipv4 already exists'}
mock = MagicMock(return_value='table ip filter')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.delete_chain(chain='input'), ret)
@ -318,9 +380,15 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'''
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
patch('salt.modules.nftables.check_chain', MagicMock(return_value=True)), \
patch('salt.modules.nftables.check_table', MagicMock(return_value=True)):
self.assertTrue(nftables.delete_chain(chain='input'))
patch('salt.modules.nftables.check_chain',
MagicMock(return_value={'result': True,
'comment': ''})), \
patch('salt.modules.nftables.check_table',
MagicMock(return_value={'result': True,
'comment': ''})):
_expected = {'comment': 'Chain input in table filter in family ipv4 deleted',
'result': True}
self.assertEqual(nftables.delete_chain(chain='input'), _expected)
# 'append' function tests: 2
@ -329,26 +397,34 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it append a rule to the specified table & chain.
'''
self.assertEqual(nftables.append(),
'Error: Chain needs to be specified')
{'result': False,
'comment': 'Error: Chain needs to be specified'})
self.assertEqual(nftables.append(chain='input'),
'Error: Rule needs to be specified')
{'result': False,
'comment': 'Error: Rule needs to be specified'})
_ru = 'input tcp dport 22 log accept'
ret = 'Error: table filter in family ipv4 does not exist'
ret = {'comment': 'Table filter in family ipv4 does not exist',
'result': False}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.append(chain='input', rule=_ru), ret)
ret = 'Error: chain input in table filter in family ipv4 does not exist'
ret = {'comment': 'Chain input in table filter in family ipv4 does not exist',
'result': False}
mock = MagicMock(return_value='table ip filter')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.append(chain='input', rule=_ru), ret)
r_val = 'table ip filter chain input {{ input tcp dport 22 log accept #'
mock = MagicMock(return_value=r_val)
_expected = {'comment': 'Error: rule input tcp dport 22 log accept chain input in table filter in family ipv4 already exists',
'result': False}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertTrue(nftables.append(chain='input', rule=_ru))
self.assertEqual(nftables.append(chain='input',
rule=_ru),
_expected)
def test_append_rule(self):
'''
@ -357,11 +433,19 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
_ru = 'input tcp dport 22 log accept'
mock = MagicMock(side_effect=['1', ''])
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
patch('salt.modules.nftables.check', MagicMock(return_value=False)), \
patch('salt.modules.nftables.check_chain', MagicMock(return_value=True)), \
patch('salt.modules.nftables.check_table', MagicMock(return_value=True)):
self.assertFalse(nftables.append(chain='input', rule=_ru))
self.assertTrue(nftables.append(chain='input', rule=_ru))
patch('salt.modules.nftables.check',
MagicMock(return_value={'result': False,
'comment': ''})), \
patch('salt.modules.nftables.check_chain',
MagicMock(return_value={'result': True,
'comment': ''})), \
patch('salt.modules.nftables.check_table',
MagicMock(return_value={'result': True,
'comment': ''})):
_expected = {'comment': 'Failed to add rule "{0}" chain input in table filter in family ipv4.'.format(_ru), 'result': False}
self.assertEqual(nftables.append(chain='input', rule=_ru), _expected)
_expected = {'comment': 'Added rule "{0}" chain input in table filter in family ipv4.'.format(_ru), 'result': True}
self.assertEqual(nftables.append(chain='input', rule=_ru), _expected)
# 'insert' function tests: 2
@ -371,18 +455,22 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
at the specified position.
'''
self.assertEqual(nftables.insert(),
'Error: Chain needs to be specified')
{'result': False,
'comment': 'Error: Chain needs to be specified'})
self.assertEqual(nftables.insert(chain='input'),
'Error: Rule needs to be specified')
{'result': False,
'comment': 'Error: Rule needs to be specified'})
_ru = 'input tcp dport 22 log accept'
ret = 'Error: table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.insert(chain='input', rule=_ru), ret)
ret = 'Error: chain input in table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Chain input in table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='table ip filter')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.insert(chain='input', rule=_ru), ret)
@ -390,6 +478,10 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
r_val = 'table ip filter chain input {{ input tcp dport 22 log accept #'
mock = MagicMock(return_value=r_val)
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
res = nftables.insert(chain='input', rule=_ru)
import logging
log = logging.getLogger(__name__)
log.debug('=== res %s ===', res)
self.assertTrue(nftables.insert(chain='input', rule=_ru))
def test_insert_rule(self):
@ -400,11 +492,23 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
_ru = 'input tcp dport 22 log accept'
mock = MagicMock(side_effect=['1', ''])
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
patch('salt.modules.nftables.check', MagicMock(return_value=False)), \
patch('salt.modules.nftables.check_chain', MagicMock(return_value=True)), \
patch('salt.modules.nftables.check_table', MagicMock(return_value=True)):
self.assertFalse(nftables.insert(chain='input', rule=_ru))
self.assertTrue(nftables.insert(chain='input', rule=_ru))
patch('salt.modules.nftables.check',
MagicMock(return_value={'result': False,
'comment': ''})), \
patch('salt.modules.nftables.check_chain',
MagicMock(return_value={'result': True,
'comment': ''})), \
patch('salt.modules.nftables.check_table',
MagicMock(return_value={'result': True,
'comment': ''})):
_expected = {'result': False,
'comment': 'Failed to add rule "{0}" chain input in table filter in family ipv4.'.format(_ru)}
self.assertEqual(nftables.insert(chain='input', rule=_ru),
_expected)
_expected = {'result': True,
'comment': 'Added rule "{0}" chain input in table filter in family ipv4.'.format(_ru)}
self.assertEqual(nftables.insert(chain='input', rule=_ru),
_expected)
# 'delete' function tests: 2
@ -415,17 +519,21 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
the rule's position in the chain.
'''
_ru = 'input tcp dport 22 log accept'
ret = {'result': False,
'comment': 'Error: Only specify a position or a rule, not both'}
self.assertEqual(nftables.delete(table='filter', chain='input',
position='3', rule=_ru),
'Error: Only specify a position or a rule, not both')
ret)
ret = 'Error: table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.delete(table='filter', chain='input',
rule=_ru), ret)
ret = 'Error: chain input in table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Chain input in table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='table ip filter')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.delete(table='filter', chain='input',
@ -444,14 +552,27 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'''
mock = MagicMock(side_effect=['1', ''])
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
patch('salt.modules.nftables.check', MagicMock(return_value=True)), \
patch('salt.modules.nftables.check_chain', MagicMock(return_value=True)), \
patch('salt.modules.nftables.check_table', MagicMock(return_value=True)):
self.assertFalse(nftables.delete(table='filter', chain='input',
position='3'))
self.assertTrue(nftables.delete(table='filter', chain='input',
position='3'))
patch('salt.modules.nftables.check',
MagicMock(return_value={'result': True,
'comment': ''})), \
patch('salt.modules.nftables.check_chain',
MagicMock(return_value={'result': True,
'comment': ''})), \
patch('salt.modules.nftables.check_table',
MagicMock(return_value={'result': True,
'comment': ''})):
_expected = {'result': False,
'comment': 'Failed to delete rule "None" in chain input table filter in family ipv4'}
self.assertEqual(nftables.delete(table='filter',
chain='input',
position='3'),
_expected)
_expected = {'result': True,
'comment': 'Deleted rule "None" in chain input in table filter in family ipv4.'}
self.assertEqual(nftables.delete(table='filter',
chain='input',
position='3'),
_expected)
# 'flush' function tests: 2
def test_flush(self):
@ -459,12 +580,14 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it flush the chain in the specified table, flush all chains
in the specified table if chain is not specified.
'''
ret = 'Error: table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.flush(table='filter', chain='input'), ret)
ret = 'Error: chain input in table filter in family ip does not exist'
ret = {'result': False,
'comment': 'Chain input in table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='table ip filter')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.flush(table='filter', chain='input'), ret)
@ -476,7 +599,19 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'''
mock = MagicMock(side_effect=['1', ''])
with patch.dict(nftables.__salt__, {'cmd.run': mock}), \
patch('salt.modules.nftables.check_chain', MagicMock(return_value=True)), \
patch('salt.modules.nftables.check_table', MagicMock(return_value=True)):
self.assertFalse(nftables.flush(table='filter', chain='input'))
self.assertTrue(nftables.flush(table='filter', chain='input'))
patch('salt.modules.nftables.check_chain',
MagicMock(return_value={'result': True,
'comment': ''})), \
patch('salt.modules.nftables.check_table',
MagicMock(return_value={'result': True,
'comment': ''})):
_expected = {'result': False,
'comment': 'Failed to flush rules from chain input in table filter in family ipv4.'}
self.assertEqual(nftables.flush(table='filter',
chain='input'),
_expected)
_expected = {'result': True,
'comment': 'Flushed rules from chain input in table filter in family ipv4.'}
self.assertEqual(nftables.flush(table='filter',
chain='input'),
_expected)