More updates to nftables module, state modules, and unit tests.

This commit is contained in:
Gareth J. Greenaway 2019-02-19 17:42:19 -08:00
parent 3ec96e36e2
commit dc4afbb8b4
No known key found for this signature in database
GPG Key ID: 10B62F8A7CAD7A41
4 changed files with 241 additions and 148 deletions

View File

@ -231,15 +231,15 @@ def build_rule(table=None, chain=None, command=None, position='', full=None, fam
if full in ['True', 'true']:
if not table:
ret['comment'] = 'Error: Table needs to be specified'
ret['comment'] = 'Table needs to be specified'
return ret
if not chain:
ret['comment'] = 'Error: Chain needs to be specified'
ret['comment'] = 'Chain needs to be specified'
return ret
if not command:
ret['comment'] = 'Error: Command needs to be specified'
ret['comment'] = 'Command needs to be specified'
return ret
if command in ['Insert', 'insert', 'INSERT']:
@ -379,35 +379,36 @@ def get_rule_handle(table='filter', chain=None, rule=None, family='ipv4'):
.. code-block:: bash
salt '*' nftables.get_rule_handle filter input \\
rule='input tcp dport 22 log accept'
rule='tcp dport 22 log accept'
IPv6:
salt '*' nftables.get_rule_handle filter input \\
rule='input tcp dport 22 log accept' \\
rule='tcp dport 22 log accept' \\
family=ipv6
'''
ret = {'comment': '',
'result': False}
if not chain:
ret['comment'] = 'Error: Chain needs to be specified'
ret['comment'] = 'Chain needs to be specified'
return ret
if not rule:
ret['comment'] = 'Error: Rule needs to be specified'
ret['comment'] = 'Rule needs to be specified'
return ret
if not check_table(table, family=family):
return 'Error: table {0} in family {1} does not exist'.\
format(table, family)
res = check_table(table, family=family)
log.debug('=== res %s ===', res)
if not res['result']:
return res
if not check_chain(table, chain, family=family):
return 'Error: chain {0} in table {1} in family {2} does not exist'.\
format(chain, table, family)
res = check_chain(table, chain, family=family)
if not res['result']:
return res
if not check(table, chain, rule, family=family):
return 'Error: rule {0} chain {1} in table {2} in family {3} does not exist'.\
format(rule, chain, table, family)
res = check(table, chain, rule, family=family)
if not res['result']:
return res
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} --numeric --numeric --numeric --handle list chain {1} {2} {3}'.\
@ -419,8 +420,9 @@ def get_rule_handle(table='filter', chain=None, rule=None, family='ipv4'):
for r in rules:
match = pat.search(r)
if match:
return match.group('handle')
return 'Error: could not find rule {0}'.format(rule)
return {'result': True, 'handle': match.group('handle')}
return {'result': False,
'comment': 'Could not find rule {0}'.format(rule)}
def check(table='filter', chain=None, rule=None, family='ipv4'):
@ -437,33 +439,31 @@ def check(table='filter', chain=None, rule=None, family='ipv4'):
.. code-block:: bash
salt '*' nftables.check filter input \\
rule='input tcp dport 22 log accept'
rule='tcp dport 22 log accept'
IPv6:
salt '*' nftables.check filter input \\
rule='input tcp dport 22 log accept' \\
rule='tcp dport 22 log accept' \\
family=ipv6
'''
ret = {'comment': '',
'result': False}
if not chain:
ret['comment'] = 'Error: Chain needs to be specified'
ret['comment'] = 'Chain needs to be specified'
return ret
if not rule:
ret['comment'] = 'Error: Rule needs to be specified'
ret['comment'] = 'Rule needs to be specified'
return ret
if not check_table(table, family=family):
ret['comment'] = 'Table {0} in family {1} does not exist'.\
format(table, family)
return ret
res = check_table(table, family=family)
if not res['result']:
return res
if not check_chain(table, chain, family=family):
ret['comment'] = 'Chain {0} in table {1} in family {2} does not exist'.\
format(chain, table, family)
return ret
res = check_chain(table, chain, family=family)
if not res['result']:
return res
nft_family = _NFTABLES_FAMILIES[family]
cmd = '{0} --handle --numeric --numeric --numeric list chain {1} {2} {3}'.\
@ -472,10 +472,10 @@ def check(table='filter', chain=None, rule=None, family='ipv4'):
out = __salt__['cmd.run'](cmd, python_shell=False).find(search_rule)
if out == -1:
ret['comment'] = 'Rule {0} chain {1} in table {2} in family {3} does not exist'.\
ret['comment'] = 'Rule {0} in chain {1} in table {2} in family {3} does not exist'.\
format(rule, chain, table, family)
else:
ret['comment'] = 'Rule {0} chain {1} in table {2} in family {3} exists'.\
ret['comment'] = 'Rule {0} in chain {1} in table {2} in family {3} exists'.\
format(rule, chain, table, family)
ret['result'] = True
return ret
@ -566,7 +566,7 @@ def new_table(table, family='ipv4'):
'result': False}
if not table:
ret['comment'] = 'Error: Table needs to be specified'
ret['comment'] = 'Table needs to be specified'
return ret
res = check_table(table, family=family)
@ -665,7 +665,7 @@ def new_chain(table='filter', chain=None, table_type=None, hook=None, priority=N
res = check_chain(table, chain, family=family)
if res['result']:
ret['comment'] = 'Error: chain {0} in table {1} in family {2} already exists'.\
ret['comment'] = 'Chain {0} in table {1} in family {2} already exists'.\
format(chain, table, family)
return ret
@ -678,7 +678,7 @@ def new_chain(table='filter', chain=None, table_type=None, hook=None, priority=N
format(cmd, table_type, hook, priority)
else:
# Specify one, require all
ret['comment'] = 'Error: table_type, hook, and priority required.'
ret['comment'] = 'Table_type, hook, and priority required.'
return ret
out = __salt__['cmd.run'](cmd, python_shell=False)
@ -721,13 +721,13 @@ def delete_chain(table='filter', chain=None, family='ipv4'):
res = check_table(table, family=family)
if not res['result']:
ret['comment'] = 'Error: table {0} in family {1} does not exist'.\
ret['comment'] = 'Table {0} in family {1} does not exist'.\
format(table, family)
return ret
res = check_chain(table, chain, family=family)
if not res['result']:
ret['comment'] = 'Error: chain {0} in table {1} in family {2} already exists'.\
ret['comment'] = 'Chain {0} in table {1} in family {2} already exists'.\
format(chain, table, family)
return ret
@ -771,11 +771,11 @@ def append(table='filter', chain=None, rule=None, family='ipv4'):
'result': False}
if not chain:
ret['comment'] = 'Error: Chain needs to be specified'
ret['comment'] = 'Chain needs to be specified'
return ret
if not rule:
ret['comment'] = 'Error: Rule needs to be specified'
ret['comment'] = 'Rule needs to be specified'
return ret
res = check_table(table, family=family)
@ -788,7 +788,7 @@ def append(table='filter', chain=None, rule=None, family='ipv4'):
res = check(table, chain, rule, family=family)
if res['result']:
ret['comment'] = 'Error: rule {0} chain {1} in table {2} in family {3} already exists'.\
ret['comment'] = 'Rule {0} chain {1} in table {2} in family {3} already exists'.\
format(rule, chain, table, family)
return ret
@ -823,10 +823,10 @@ def insert(table='filter', chain=None, position=None, rule=None, family='ipv4'):
.. code-block:: bash
salt '*' nftables.insert filter input \\
rule='input tcp dport 22 log accept'
rule='tcp dport 22 log accept'
salt '*' nftables.insert filter input position=3 \\
rule='input tcp dport 22 log accept'
rule='tcp dport 22 log accept'
IPv6:
salt '*' nftables.insert filter input \\
@ -841,11 +841,11 @@ def insert(table='filter', chain=None, position=None, rule=None, family='ipv4'):
'result': False}
if not chain:
ret['comment'] = 'Error: Chain needs to be specified'
ret['comment'] = 'Chain needs to be specified'
return ret
if not rule:
ret['comment'] = 'Error: Rule needs to be specified'
ret['comment'] = 'Rule needs to be specified'
return ret
res = check_table(table, family=family)
@ -858,7 +858,7 @@ def insert(table='filter', chain=None, position=None, rule=None, family='ipv4'):
res = check(table, chain, rule, family=family)
if res['result']:
ret['comment'] = 'Error: rule {0} chain {1} in table {2} in family {3} already exists'.\
ret['comment'] = 'Rule {0} chain {1} in table {2} in family {3} already exists'.\
format(rule, chain, table, family)
return ret
@ -898,20 +898,20 @@ def delete(table, chain=None, position=None, rule=None, family='ipv4'):
salt '*' nftables.delete filter input position=3
salt '*' nftables.delete filter input \\
rule='input tcp dport 22 log accept'
rule='tcp dport 22 log accept'
IPv6:
salt '*' nftables.delete filter input position=3 family=ipv6
salt '*' nftables.delete filter input \\
rule='input tcp dport 22 log accept' \\
rule='tcp dport 22 log accept' \\
family=ipv6
'''
ret = {'comment': 'Failed to delete rule {0} in table {1}.'.format(rule, table),
'result': False}
if position and rule:
ret['comment'] = 'Error: Only specify a position or a rule, not both'
ret['comment'] = 'Only specify a position or a rule, not both'
return ret
res = check_table(table, family=family)
@ -924,7 +924,7 @@ def delete(table, chain=None, position=None, rule=None, family='ipv4'):
res = check(table, chain, rule, family=family)
if not res['result']:
ret['comment'] = 'Error: rule {0} chain {1} in table {2} in family {3} does not exist'.\
ret['comment'] = 'Rule {0} chain {1} in table {2} in family {3} does not exist'.\
format(rule, chain, table, family)
return ret

View File

@ -139,13 +139,13 @@ def chain_present(name, table='filter', table_type=None, hook=None, priority=Non
'comment': ''}
chain_check = __salt__['nftables.check_chain'](table, name, family=family)
if chain_check is True:
if chain_check['result'] is True:
ret['result'] = True
ret['comment'] = ('nftables {0} chain is already exist in {1} table for {2}'
.format(name, table, family))
return ret
command = __salt__['nftables.new_chain'](
res = __salt__['nftables.new_chain'](
table,
name,
table_type=table_type,
@ -154,7 +154,7 @@ def chain_present(name, table='filter', table_type=None, hook=None, priority=Non
family=family
)
if command is True:
if res['result'] is True:
ret['changes'] = {'locale': name}
ret['result'] = True
ret['comment'] = ('nftables {0} chain in {1} table create success for {2}'
@ -165,7 +165,7 @@ def chain_present(name, table='filter', table_type=None, hook=None, priority=Non
ret['comment'] = 'Failed to create {0} chain in {1} table: {2} for {3}'.format(
name,
table,
command.strip(),
res['comment'].strip(),
family
)
return ret
@ -242,31 +242,38 @@ def append(name, family='ipv4', **kwargs):
for ignore in _STATE_INTERNAL_KEYWORDS:
if ignore in kwargs:
del kwargs[ignore]
rule = __salt__['nftables.build_rule'](family=family, **kwargs)
command = __salt__['nftables.build_rule'](full=True, family=family, command='add', **kwargs)
res = __salt__['nftables.build_rule'](family=family, **kwargs)
if not res['result']:
return res
rule = res['rule']
if __salt__['nftables.check'](kwargs['table'],
kwargs['chain'],
rule,
family) is True:
res = __salt__['nftables.build_rule'](full=True, family=family, command='add', **kwargs)
if not res['result']:
return res
command = res['rule']
res = __salt__['nftables.check'](kwargs['table'],
kwargs['chain'],
rule,
family)
if res['result']:
ret['result'] = True
ret['comment'] = 'nftables rule for {0} already set ({1}) for {2}'.format(
name,
command.strip(),
family)
return ret
if __opts__['test']:
if 'test' in __opts__ and __opts__['test']:
ret['comment'] = 'nftables rule for {0} needs to be set ({1}) for {2}'.format(
name,
command.strip(),
family)
return ret
result = __salt__['nftables.append'](kwargs['table'],
kwargs['chain'],
rule,
family)
log.debug('=== result %s ===', result)
if result['result']:
res = __salt__['nftables.append'](kwargs['table'],
kwargs['chain'],
rule,
family)
if res['result']:
ret['changes'] = {'locale': name}
ret['result'] = True
ret['comment'] = 'Set nftables rule for {0} to: {1} for {2}'.format(
@ -285,7 +292,7 @@ def append(name, family='ipv4', **kwargs):
'Attempted rule was {1} for {2}.\n'
'{3}').format(
name,
command.strip(), family, result['comment'])
command.strip(), family, res['comment'])
return ret
@ -315,25 +322,42 @@ def insert(name, family='ipv4', **kwargs):
for ignore in _STATE_INTERNAL_KEYWORDS:
if ignore in kwargs:
del kwargs[ignore]
rule = __salt__['nftables.build_rule'](family=family, **kwargs)
command = __salt__['nftables.build_rule'](full=True, family=family, command='insert', **kwargs)
if __salt__['nftables.check'](kwargs['table'],
kwargs['chain'],
rule,
family) is True:
res = __salt__['nftables.build_rule'](family=family, **kwargs)
if not res['result']:
return res
rule = res['rule']
res = __salt__['nftables.build_rule'](full=True,
family=family,
command='insert',
**kwargs)
if not res['result']:
return res
command = res['rule']
res = __salt__['nftables.check'](kwargs['table'],
kwargs['chain'],
rule,
family)
if res['result']:
ret['result'] = True
ret['comment'] = 'nftables rule for {0} already set for {1} ({2})'.format(
name,
family,
command.strip())
return ret
if __opts__['test']:
if 'test' in __opts__ and __opts__['test']:
ret['comment'] = 'nftables rule for {0} needs to be set for {1} ({2})'.format(
name,
family,
command.strip())
return ret
if __salt__['nftables.insert'](kwargs['table'], kwargs['chain'], kwargs['position'], rule, family):
res = __salt__['nftables.insert'](kwargs['table'],
kwargs['chain'],
kwargs['position'],
rule,
family)
if res['result']:
ret['changes'] = {'locale': name}
ret['result'] = True
ret['comment'] = 'Set nftables rule for {0} to: {1} for {2}'.format(
@ -381,19 +405,29 @@ def delete(name, family='ipv4', **kwargs):
for ignore in _STATE_INTERNAL_KEYWORDS:
if ignore in kwargs:
del kwargs[ignore]
rule = __salt__['nftables.build_rule'](family=family, **kwargs)
command = __salt__['nftables.build_rule'](full=True, family=family, command='D', **kwargs)
if not __salt__['nftables.check'](kwargs['table'],
kwargs['chain'],
rule,
family) is True:
res = __salt__['nftables.build_rule'](family=family, **kwargs)
if not res['result']:
return res
rule = res['rule']
res = __salt__['nftables.build_rule'](full=True, family=family, command='D', **kwargs)
if not res['result']:
return res
command = res['rule']
res = __salt__['nftables.check'](kwargs['table'],
kwargs['chain'],
rule,
family)
if not res['result']:
ret['result'] = True
ret['comment'] = 'nftables rule for {0} already absent for {1} ({2})'.format(
name,
family,
command.strip())
return ret
if __opts__['test']:
if 'test' in __opts__ and __opts__['test']:
ret['comment'] = 'nftables rule for {0} needs to be deleted for {1} ({2})'.format(
name,
family,
@ -401,19 +435,19 @@ def delete(name, family='ipv4', **kwargs):
return ret
if 'position' in kwargs:
result = __salt__['nftables.delete'](
res = __salt__['nftables.delete'](
kwargs['table'],
kwargs['chain'],
family=family,
position=kwargs['position'])
else:
result = __salt__['nftables.delete'](
res = __salt__['nftables.delete'](
kwargs['table'],
kwargs['chain'],
family=family,
rule=rule)
if result:
if res['result']:
ret['changes'] = {'locale': name}
ret['result'] = True
ret['comment'] = 'Delete nftables rule for {0} {1}'.format(
@ -456,7 +490,8 @@ def flush(name, family='ipv4', **kwargs):
if 'table' not in kwargs:
kwargs['table'] = 'filter'
if not __salt__['nftables.check_table'](kwargs['table'], family=family):
res = __salt__['nftables.check_table'](kwargs['table'], family=family)
if not res['result']:
ret['result'] = False
ret['comment'] = 'Failed to flush table {0} in family {1}, table does not exist.'.format(
kwargs['table'],
@ -467,7 +502,10 @@ def flush(name, family='ipv4', **kwargs):
if 'chain' not in kwargs:
kwargs['chain'] = ''
else:
if not __salt__['nftables.check_chain'](kwargs['table'], kwargs['chain'], family=family):
res = __salt__['nftables.check_chain'](kwargs['table'],
kwargs['chain'],
family=family)
if not res['result']:
ret['result'] = False
ret['comment'] = 'Failed to flush chain {0} in table {1} in family {2}, chain does not exist.'.format(
kwargs['chain'],
@ -476,7 +514,10 @@ def flush(name, family='ipv4', **kwargs):
)
return ret
if __salt__['nftables.flush'](kwargs['table'], kwargs['chain'], family):
res = __salt__['nftables.flush'](kwargs['table'],
kwargs['chain'],
family)
if res['result']:
ret['changes'] = {'locale': name}
ret['result'] = True
ret['comment'] = 'Flush nftables rules in {0} table {1} chain {2} family'.format(

View File

@ -50,18 +50,18 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
self.assertEqual(nftables.build_rule(full='True'),
{'result': False,
'rule': '',
'comment': 'Error: Table needs to be specified'})
'comment': 'Table needs to be specified'})
self.assertEqual(nftables.build_rule(table='filter', full='True'),
{'result': False,
'rule': '',
'comment': 'Error: Chain needs to be specified'})
'comment': 'Chain needs to be specified'})
self.assertEqual(nftables.build_rule(table='filter', chain='input',
full='True'),
{'result': False,
'rule': '',
'comment': 'Error: Command needs to be specified'})
'comment': 'Command needs to be specified'})
self.assertEqual(nftables.build_rule(table='filter', chain='input',
command='insert', position='3',
@ -134,33 +134,46 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
Test if it get the handle for a particular rule
'''
self.assertEqual(nftables.get_rule_handle(),
'Error: Chain needs to be specified')
{'result': False,
'comment': 'Chain needs to be specified'})
self.assertEqual(nftables.get_rule_handle(chain='input'),
'Error: Rule needs to be specified')
{'result': False,
'comment': 'Rule needs to be specified'})
_ru = 'input tcp dport 22 log accept'
ret = 'Error: table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.get_rule_handle(chain='input', rule=_ru),
ret)
ret = 'Error: chain input in table filter in family ipv4 does not exist'
ret = {'result': False,
'comment': 'Chain input in table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='table ip filter')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.get_rule_handle(chain='input', rule=_ru),
ret)
ret = ('Error: rule input tcp dport 22 log accept chain input'
' in table filter in family ipv4 does not exist')
ret1 = 'Error: could not find rule input tcp dport 22 log accept'
ret = {'result': False,
'comment': ('Rule input tcp dport 22 log accept chain'
' input in table filter in family ipv4 does not exist')}
ret1 = {'result': False,
'comment': 'Could not find rule input tcp dport 22 log accept'}
with patch.object(nftables, 'check_table',
MagicMock(return_value=True)):
MagicMock(return_value={'result': True,
'comment': ''})):
with patch.object(nftables, 'check_chain',
MagicMock(return_value=True)):
MagicMock(return_value={'result': True,
'comment': ''})):
_ret1 = {'result': False,
'comment': ('Rule input tcp dport 22 log accept'
' chain input in table filter in'
' family ipv4 does not exist')}
_ret2 = {'result': True, 'comment': ''}
with patch.object(nftables, 'check',
MagicMock(side_effect=[False, True])):
MagicMock(side_effect=[_ret1, _ret2])):
self.assertEqual(nftables.get_rule_handle(chain='input',
rule=_ru), ret)
@ -179,33 +192,37 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'''
self.assertEqual(nftables.check(),
{'result': False,
'comment': 'Error: Chain needs to be specified'})
'comment': 'Chain needs to be specified'})
self.assertEqual(nftables.check(chain='input'),
{'result': False,
'comment': 'Error: Rule needs to be specified'})
'comment': 'Rule needs to be specified'})
_ru = 'input tcp dport 22 log accept'
_ru = 'tcp dport 22 log accept'
ret = {'result': False,
'comment': 'Error: table filter in family ipv4 does not exist'}
'comment': 'Table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.check(chain='input', rule=_ru), ret)
mock = MagicMock(return_value='table ip filter')
ret = {'result': False,
'comment': 'Error: chain input in table filter in family ipv4 does not exist'}
'comment': 'Chain input in table filter in family ipv4 does not exist'}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.check(chain='input', rule=_ru), ret)
mock = MagicMock(return_value='table ip filter chain input {{')
ret = {'result': False, 'comment':
'Rule tcp dport 22 log accept in chain input in table filter in family ipv4 does not exist'}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertFalse(nftables.check(chain='input', rule=_ru))
self.assertEqual(nftables.check(chain='input', rule=_ru), ret)
r_val = 'table ip filter chain input {{ input tcp dport 22 log accept #'
mock = MagicMock(return_value=r_val)
ret = {'result': True,
'comment': 'Rule tcp dport 22 log accept in chain input in table filter in family ipv4 exists'}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertTrue(nftables.check(chain='input', rule=_ru))
self.assertEqual(nftables.check(chain='input', rule=_ru), ret)
# 'check_chain' function tests: 1
@ -259,7 +276,7 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'''
self.assertEqual(nftables.new_table(table=None),
{'result': False,
'comment': 'Error: Table needs to be specified'})
'comment': 'Table needs to be specified'})
mock = MagicMock(return_value='')
ret = {'comment': 'Table nat in family ipv4 created', 'result': True}
@ -326,7 +343,7 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
self.assertEqual(nftables.new_chain(chain='input'), ret)
ret = {'result': False,
'comment': 'Error: chain input in table filter in family ipv4 already exists'}
'comment': 'Chain input in table filter in family ipv4 already exists'}
mock = MagicMock(return_value='table ip filter chain input {{')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.new_chain(chain='input'), ret)
@ -346,7 +363,7 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
self.assertEqual(nftables.new_chain(chain='input',
table_type='filter'),
{'result': False,
'comment': 'Error: table_type, hook, and priority required.'})
'comment': 'Table_type, hook, and priority required.'})
self.assertTrue(nftables.new_chain(chain='input',
table_type='filter',
@ -363,13 +380,13 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'comment': 'Chain needs to be specified'})
ret = {'result': False,
'comment': 'Error: table filter in family ipv4 does not exist'}
'comment': 'Table filter in family ipv4 does not exist'}
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.delete_chain(chain='input'), ret)
ret = {'result': False,
'comment': 'Error: chain input in table filter in family ipv4 already exists'}
'comment': 'Chain input in table filter in family ipv4 already exists'}
mock = MagicMock(return_value='table ip filter')
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.delete_chain(chain='input'), ret)
@ -398,11 +415,11 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'''
self.assertEqual(nftables.append(),
{'result': False,
'comment': 'Error: Chain needs to be specified'})
'comment': 'Chain needs to be specified'})
self.assertEqual(nftables.append(chain='input'),
{'result': False,
'comment': 'Error: Rule needs to be specified'})
'comment': 'Rule needs to be specified'})
_ru = 'input tcp dport 22 log accept'
ret = {'comment': 'Table filter in family ipv4 does not exist',
@ -419,7 +436,7 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
r_val = 'table ip filter chain input {{ input tcp dport 22 log accept #'
mock = MagicMock(return_value=r_val)
_expected = {'comment': 'Error: rule input tcp dport 22 log accept chain input in table filter in family ipv4 already exists',
_expected = {'comment': 'Rule input tcp dport 22 log accept chain input in table filter in family ipv4 already exists',
'result': False}
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
self.assertEqual(nftables.append(chain='input',
@ -456,11 +473,11 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'''
self.assertEqual(nftables.insert(),
{'result': False,
'comment': 'Error: Chain needs to be specified'})
'comment': 'Chain needs to be specified'})
self.assertEqual(nftables.insert(chain='input'),
{'result': False,
'comment': 'Error: Rule needs to be specified'})
'comment': 'Rule needs to be specified'})
_ru = 'input tcp dport 22 log accept'
ret = {'result': False,
@ -520,7 +537,7 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'''
_ru = 'input tcp dport 22 log accept'
ret = {'result': False,
'comment': 'Error: Only specify a position or a rule, not both'}
'comment': 'Only specify a position or a rule, not both'}
self.assertEqual(nftables.delete(table='filter', chain='input',
position='3', rule=_ru),
ret)

View File

@ -36,14 +36,17 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'changes': {},
'result': True,
'comment': ''}
mock = MagicMock(side_effect=[True, False, False])
with patch.dict(nftables.__salt__, {"nftables.check_chain": mock}):
mock = MagicMock(side_effect=[{'result': True, 'comment': ''},
{'result': False, 'comment': ''},
{'result': False, 'comment': ''}])
with patch.dict(nftables.__salt__, {'nftables.check_chain': mock}):
ret.update({'comment': 'nftables salt chain is already'
' exist in filter table for ipv4'})
self.assertDictEqual(nftables.chain_present('salt'), ret)
mock = MagicMock(side_effect=[True, ''])
with patch.dict(nftables.__salt__, {"nftables.new_chain": mock}):
mock = MagicMock(side_effect=[{'result': True, 'comment': ''},
{'result': False, 'comment': ''}])
with patch.dict(nftables.__salt__, {'nftables.new_chain': mock}):
ret.update({'changes': {'locale': 'salt'},
'comment': 'nftables salt chain in filter'
' table create success for ipv4'})
@ -64,13 +67,13 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'result': True,
'comment': ''}
mock = MagicMock(side_effect=[False, True])
with patch.dict(nftables.__salt__, {"nftables.check_chain": mock}):
with patch.dict(nftables.__salt__, {'nftables.check_chain': mock}):
ret.update({'comment': 'nftables salt chain is already absent'
' in filter table for ipv4'})
self.assertDictEqual(nftables.chain_absent('salt'), ret)
mock = MagicMock(return_value='')
with patch.dict(nftables.__salt__, {"nftables.flush": mock}):
with patch.dict(nftables.__salt__, {'nftables.flush': mock}):
ret.update({'result': False,
'comment': 'Failed to flush salt chain'
' in filter table: for ipv4'})
@ -86,26 +89,34 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'comment': ''}
mock = MagicMock(return_value=[])
with patch.object(nftables, '_STATE_INTERNAL_KEYWORDS', mock):
mock = MagicMock(return_value='a')
with patch.dict(nftables.__salt__, {"nftables.build_rule": mock}):
mock = MagicMock(side_effect=[True, False, False, False])
with patch.dict(nftables.__salt__, {"nftables.check": mock}):
mock = MagicMock(return_value={'result': True,
'comment': '',
'rule': 'a'})
with patch.dict(nftables.__salt__, {'nftables.build_rule': mock}):
mock = MagicMock(side_effect=[{'result': True, 'comment': ''},
{'result': False, 'comment': ''},
{'result': False, 'comment': ''},
{'result': False, 'comment': ''}])
with patch.dict(nftables.__salt__, {'nftables.check': mock}):
ret.update({'comment': 'nftables rule for salt'
' already set (a) for ipv4'})
self.assertDictEqual(nftables.append('salt', table='',
chain=''), ret)
with patch.dict(nftables.__opts__, {"test": True}):
with patch.dict(nftables.__opts__, {'test': True}):
ret.update({'result': None,
'comment': 'nftables rule for salt needs'
' to be set (a) for ipv4'})
self.assertDictEqual(nftables.append('salt', table='',
chain=''), ret)
with patch.dict(nftables.__opts__, {"test": False}):
mock = MagicMock(side_effect=[True, False])
with patch.dict(nftables.__opts__, {'test': False}):
mock = MagicMock(side_effect=[{'result': True,
'comment': ''},
{'result': False,
'comment': ''}])
with patch.dict(nftables.__salt__,
{"nftables.append": mock}):
{'nftables.append': mock}):
ret.update({'changes': {'locale': 'salt'},
'comment': 'Set nftables rule for salt'
' to: a for ipv4',
@ -118,7 +129,7 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
ret.update({'changes': {},
'comment': 'Failed to set nftables'
' rule for salt.\nAttempted rule was'
' a for ipv4', 'result': False})
' a for ipv4.\n', 'result': False})
self.assertDictEqual(nftables.append('salt',
table='',
chain=''),
@ -134,26 +145,34 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'comment': ''}
mock = MagicMock(return_value=[])
with patch.object(nftables, '_STATE_INTERNAL_KEYWORDS', mock):
mock = MagicMock(return_value='a')
with patch.dict(nftables.__salt__, {"nftables.build_rule": mock}):
mock = MagicMock(side_effect=[True, False, False, False])
with patch.dict(nftables.__salt__, {"nftables.check": mock}):
mock = MagicMock(return_value={'result': True,
'comment': '',
'rule': 'a'})
with patch.dict(nftables.__salt__, {'nftables.build_rule': mock}):
mock = MagicMock(side_effect=[{'result': True, 'comment': ''},
{'result': False, 'comment': ''},
{'result': False, 'comment': ''},
{'result': False, 'comment': ''}])
with patch.dict(nftables.__salt__, {'nftables.check': mock}):
ret.update({'comment': 'nftables rule for salt already'
' set for ipv4 (a)'})
self.assertDictEqual(nftables.insert('salt', table='',
chain=''), ret)
with patch.dict(nftables.__opts__, {"test": True}):
with patch.dict(nftables.__opts__, {'test': True}):
ret.update({'result': None,
'comment': 'nftables rule for salt'
' needs to be set for ipv4 (a)'})
self.assertDictEqual(nftables.insert('salt', table='',
chain=''), ret)
with patch.dict(nftables.__opts__, {"test": False}):
mock = MagicMock(side_effect=[True, False])
with patch.dict(nftables.__opts__, {'test': False}):
mock = MagicMock(side_effect=[{'result': True,
'comment': ''},
{'result': False,
'comment': ''}])
with patch.dict(nftables.__salt__,
{"nftables.insert": mock}):
{'nftables.insert': mock}):
ret.update({'changes': {'locale': 'salt'},
'comment': 'Set nftables rule for'
' salt to: a for ipv4',
@ -185,9 +204,14 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
mock = MagicMock(return_value=[])
with patch.object(nftables, '_STATE_INTERNAL_KEYWORDS', mock):
mock = MagicMock(return_value='a')
mock = MagicMock(return_value={'result': True,
'comment': '',
'rule': 'a'})
with patch.dict(nftables.__salt__, {'nftables.build_rule': mock}):
mock = MagicMock(side_effect=[False, True, True, True])
mock = MagicMock(side_effect=[{'result': False, 'comment': ''},
{'result': True, 'comment': ''},
{'result': True, 'comment': ''},
{'result': True, 'comment': ''}])
with patch.dict(nftables.__salt__, {'nftables.check': mock}):
ret.update({'comment': 'nftables rule for salt'
' already absent for ipv4 (a)',
@ -205,7 +229,10 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
chain=''), ret)
with patch.dict(nftables.__opts__, {'test': False}):
mock = MagicMock(side_effect=[True, False])
mock = MagicMock(side_effect=[{'result': True,
'comment': ''},
{'result': False,
'comment': ''}])
with patch.dict(nftables.__salt__,
{'nftables.delete': mock}):
ret.update({'result': True,
@ -239,7 +266,10 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
'comment': ''}
mock = MagicMock(return_value=[])
with patch.object(nftables, '_STATE_INTERNAL_KEYWORDS', mock):
mock = MagicMock(side_effect=[False, True, True, True])
mock = MagicMock(side_effect=[{'result': False, 'comment': ''},
{'result': True, 'comment': ''},
{'result': True, 'comment': ''},
{'result': True, 'comment': ''}])
with patch.dict(nftables.__salt__, {'nftables.check_table': mock}):
ret.update({'comment': 'Failed to flush table in family'
' ipv4, table does not exist.',
@ -248,7 +278,9 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
table='', chain=''),
ret)
mock = MagicMock(side_effect=[False, True, True])
mock = MagicMock(side_effect=[{'result': False, 'comment': ''},
{'result': True, 'comment': ''},
{'result': True, 'comment': ''}])
with patch.dict(nftables.__salt__,
{'nftables.check_chain': mock}):
ret.update({'comment': 'Failed to flush chain in table'
@ -256,7 +288,10 @@ class NftablesTestCase(TestCase, LoaderModuleMockMixin):
self.assertDictEqual(nftables.flush('salt', table='',
chain=''), ret)
mock = MagicMock(side_effect=[True, False])
mock = MagicMock(side_effect=[{'result': True,
'comment': ''},
{'result': False,
'comment': ''}])
with patch.dict(nftables.__salt__,
{'nftables.flush': mock}):
ret.update({'changes': {'locale': 'salt'},