mirror of
https://github.com/valitydev/salt.git
synced 2024-11-09 01:36:48 +00:00
c0ac0c0096
The output of 'nft list tables' has the family included in the output
492 lines
20 KiB
Python
492 lines
20 KiB
Python
# -*- coding: utf-8 -*-
|
|
'''
|
|
:codeauthor: :email:`Jayesh Kariya <jayeshk@saltstack.com>`
|
|
'''
|
|
|
|
# Import Python Libs
|
|
from __future__ import absolute_import
|
|
|
|
# Import Salt Testing Libs
|
|
from salttesting import TestCase, skipIf
|
|
from salttesting.mock import (
|
|
MagicMock,
|
|
mock_open,
|
|
patch,
|
|
NO_MOCK,
|
|
NO_MOCK_REASON
|
|
)
|
|
|
|
from salttesting.helpers import ensure_in_syspath
|
|
|
|
ensure_in_syspath('../../')
|
|
|
|
# Import Salt Libs
|
|
from salt.modules import nftables
|
|
import salt.utils
|
|
from salt.exceptions import CommandExecutionError
|
|
|
|
# Globals
|
|
nftables.__grains__ = {}
|
|
nftables.__salt__ = {}
|
|
|
|
|
|
@skipIf(NO_MOCK, NO_MOCK_REASON)
|
|
class NftablesTestCase(TestCase):
|
|
'''
|
|
Test cases for salt.modules.nftables
|
|
'''
|
|
# 'version' function tests: 1
|
|
|
|
def test_version(self):
|
|
'''
|
|
Test if it return version from nftables --version
|
|
'''
|
|
mock = MagicMock(return_value='nf_tables 0.3-1')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.version(), '0.3-1')
|
|
|
|
# 'build_rule' function tests: 1
|
|
|
|
def test_build_rule(self):
|
|
'''
|
|
Test if it build a well-formatted nftables rule based on kwargs.
|
|
'''
|
|
self.assertEqual(nftables.build_rule(full='True'),
|
|
'Error: Table needs to be specified')
|
|
|
|
self.assertEqual(nftables.build_rule(table='filter', full='True'),
|
|
'Error: Chain needs to be specified')
|
|
|
|
self.assertEqual(nftables.build_rule(table='filter', chain='input',
|
|
full='True'),
|
|
'Error: Command needs to be specified')
|
|
|
|
self.assertEqual(nftables.build_rule(table='filter', chain='input',
|
|
command='insert', position='3',
|
|
full='True'),
|
|
'nft insert rule ip filter input position 3 ')
|
|
|
|
self.assertEqual(nftables.build_rule(table='filter', chain='input',
|
|
command='insert', full='True'),
|
|
'nft insert rule ip filter input ')
|
|
|
|
self.assertEqual(nftables.build_rule(table='filter', chain='input',
|
|
command='halt', full='True'),
|
|
'nft halt rule ip filter input ')
|
|
|
|
self.assertEqual(nftables.build_rule(), '')
|
|
|
|
# 'get_saved_rules' function tests: 1
|
|
|
|
def test_get_saved_rules(self):
|
|
'''
|
|
Test if it return a data structure of the rules in the conf file
|
|
'''
|
|
with patch.dict(nftables.__grains__, {'os_family': 'Debian'}):
|
|
with patch.object(salt.utils, 'fopen', MagicMock(mock_open())):
|
|
self.assertListEqual(nftables.get_saved_rules(), [])
|
|
|
|
# 'get_rules' function tests: 1
|
|
|
|
def test_get_rules(self):
|
|
'''
|
|
Test if it return a data structure of the current, in-memory rules
|
|
'''
|
|
mock = MagicMock(return_value='SALT STACK')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertListEqual(nftables.get_rules(), ['SALT STACK'])
|
|
|
|
mock = MagicMock(return_value=False)
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertListEqual(nftables.get_rules(), [])
|
|
|
|
# 'save' function tests: 1
|
|
|
|
def test_save(self):
|
|
'''
|
|
Test if it save the current in-memory rules to disk
|
|
'''
|
|
with patch.dict(nftables.__grains__, {'os_family': 'Debian'}):
|
|
mock = MagicMock(return_value=False)
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
with patch.object(salt.utils, 'fopen', MagicMock(mock_open())):
|
|
self.assertEqual(nftables.save(), '#! nft -f\n\n')
|
|
|
|
with patch.object(salt.utils, 'fopen',
|
|
MagicMock(side_effect=IOError)):
|
|
self.assertRaises(CommandExecutionError, nftables.save)
|
|
|
|
# 'get_rule_handle' function tests: 1
|
|
|
|
def test_get_rule_handle(self):
|
|
'''
|
|
Test if it get the handle for a particular rule
|
|
'''
|
|
self.assertEqual(nftables.get_rule_handle(),
|
|
'Error: Chain needs to be specified')
|
|
|
|
self.assertEqual(nftables.get_rule_handle(chain='input'),
|
|
'Error: Rule needs to be specified')
|
|
|
|
_ru = 'input tcp dport 22 log accept'
|
|
ret = 'Error: table filter in family ipv4 does not exist'
|
|
mock = MagicMock(return_value='')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.get_rule_handle(chain='input', rule=_ru),
|
|
ret)
|
|
|
|
ret = 'Error: chain input in table filter in family ipv4 does not exist'
|
|
mock = MagicMock(return_value='table ip filter')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.get_rule_handle(chain='input', rule=_ru),
|
|
ret)
|
|
|
|
ret = ('Error: rule input tcp dport 22 log accept chain input'
|
|
' in table filter in family ipv4 does not exist')
|
|
ret1 = 'Error: could not find rule input tcp dport 22 log accept'
|
|
with patch.object(nftables, 'check_table',
|
|
MagicMock(return_value=True)):
|
|
with patch.object(nftables, 'check_chain',
|
|
MagicMock(return_value=True)):
|
|
with patch.object(nftables, 'check',
|
|
MagicMock(side_effect=[False, True])):
|
|
self.assertEqual(nftables.get_rule_handle(chain='input',
|
|
rule=_ru), ret)
|
|
|
|
_ru = 'input tcp dport 22 log accept'
|
|
mock = MagicMock(return_value='')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.get_rule_handle(chain='input',
|
|
rule=_ru),
|
|
ret1)
|
|
|
|
# 'check' function tests: 1
|
|
|
|
def test_check(self):
|
|
'''
|
|
Test if it check for the existence of a rule in the table and chain
|
|
'''
|
|
self.assertEqual(nftables.check(),
|
|
'Error: Chain needs to be specified')
|
|
|
|
self.assertEqual(nftables.check(chain='input'),
|
|
'Error: Rule needs to be specified')
|
|
|
|
_ru = 'input tcp dport 22 log accept'
|
|
ret = 'Error: table filter in family ipv4 does not exist'
|
|
mock = MagicMock(return_value='')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.check(chain='input', rule=_ru), ret)
|
|
|
|
mock = MagicMock(return_value='table ip filter')
|
|
ret = 'Error: chain input in table filter in family ipv4 does not exist'
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.check(chain='input', rule=_ru), ret)
|
|
|
|
mock = MagicMock(return_value='table ip filter chain input {{')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertFalse(nftables.check(chain='input', rule=_ru))
|
|
|
|
r_val = 'table ip filter chain input {{ input tcp dport 22 log accept #'
|
|
mock = MagicMock(return_value=r_val)
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertTrue(nftables.check(chain='input', rule=_ru))
|
|
|
|
# 'check_chain' function tests: 1
|
|
|
|
def test_check_chain(self):
|
|
'''
|
|
Test if it check for the existence of a chain in the table
|
|
'''
|
|
self.assertEqual(nftables.check_chain(),
|
|
'Error: Chain needs to be specified')
|
|
|
|
mock = MagicMock(return_value='')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertFalse(nftables.check_chain(chain='input'))
|
|
|
|
mock = MagicMock(return_value='chain input {{')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertTrue(nftables.check_chain(chain='input'))
|
|
|
|
# 'check_table' function tests: 1
|
|
|
|
def test_check_table(self):
|
|
'''
|
|
Test if it check for the existence of a table
|
|
'''
|
|
self.assertEqual(nftables.check_table(),
|
|
'Error: table needs to be specified')
|
|
|
|
mock = MagicMock(return_value='')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertFalse(nftables.check_table(table='nat'))
|
|
|
|
mock = MagicMock(return_value='table ip nat')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertTrue(nftables.check_table(table='nat'))
|
|
|
|
# 'new_table' function tests: 1
|
|
|
|
def test_new_table(self):
|
|
'''
|
|
Test if it create new custom table.
|
|
'''
|
|
self.assertEqual(nftables.new_table(table=None),
|
|
'Error: table needs to be specified')
|
|
|
|
mock = MagicMock(return_value='')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.new_table(table='nat'), True)
|
|
|
|
mock = MagicMock(return_value='table ip nat')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.new_table(table='nat'),
|
|
'Error: table nat in family ipv4 already exists')
|
|
|
|
# 'delete_table' function tests: 1
|
|
|
|
def test_delete_table(self):
|
|
'''
|
|
Test if it delete custom table.
|
|
'''
|
|
self.assertEqual(nftables.delete_table(table=None),
|
|
'Error: table needs to be specified')
|
|
|
|
mock = MagicMock(return_value='')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.delete_table(table='nat'),
|
|
'Error: table nat in family ipv4 does not exist')
|
|
|
|
mock = MagicMock(return_value='table ip nat')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.delete_table(table='nat'), 'table ip nat')
|
|
|
|
# 'new_chain' function tests: 2
|
|
|
|
def test_new_chain(self):
|
|
'''
|
|
Test if it create new chain to the specified table.
|
|
'''
|
|
self.assertEqual(nftables.new_chain(),
|
|
'Error: Chain needs to be specified')
|
|
|
|
ret = 'Error: table filter in family ipv4 does not exist'
|
|
mock = MagicMock(return_value='')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.new_chain(chain='input'), ret)
|
|
|
|
ret = 'Error: chain input in table filter in family ipv4 already exists'
|
|
mock = MagicMock(return_value='table ip filter chain input {{')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.new_chain(chain='input'), ret)
|
|
|
|
@patch('salt.modules.nftables.check_chain', MagicMock(return_value=False))
|
|
@patch('salt.modules.nftables.check_table', MagicMock(return_value=True))
|
|
def test_new_chain_variable(self):
|
|
'''
|
|
Test if it create new chain to the specified table.
|
|
'''
|
|
mock = MagicMock(return_value='')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.new_chain(chain='input',
|
|
table_type='filter'),
|
|
'Error: table_type hook and priority required')
|
|
|
|
self.assertTrue(nftables.new_chain(chain='input',
|
|
table_type='filter',
|
|
hook='input', priority=0))
|
|
|
|
# 'delete_chain' function tests: 1
|
|
|
|
def test_delete_chain(self):
|
|
'''
|
|
Test if it delete the chain from the specified table.
|
|
'''
|
|
self.assertEqual(nftables.delete_chain(),
|
|
'Error: Chain needs to be specified')
|
|
|
|
ret = 'Error: table filter in family ipv4 does not exist'
|
|
mock = MagicMock(return_value='')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.delete_chain(chain='input'), ret)
|
|
|
|
ret = 'Error: chain input in table filter in family ipv4 does not exist'
|
|
mock = MagicMock(return_value='table ip filter')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.delete_chain(chain='input'), ret)
|
|
|
|
@patch('salt.modules.nftables.check_chain', MagicMock(return_value=True))
|
|
@patch('salt.modules.nftables.check_table', MagicMock(return_value=True))
|
|
def test_delete_chain_variables(self):
|
|
'''
|
|
Test if it delete the chain from the specified table.
|
|
'''
|
|
mock = MagicMock(return_value='')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertTrue(nftables.delete_chain(chain='input'))
|
|
|
|
# 'append' function tests: 2
|
|
|
|
def test_append(self):
|
|
'''
|
|
Test if it append a rule to the specified table & chain.
|
|
'''
|
|
self.assertEqual(nftables.append(),
|
|
'Error: Chain needs to be specified')
|
|
|
|
self.assertEqual(nftables.append(chain='input'),
|
|
'Error: Rule needs to be specified')
|
|
|
|
_ru = 'input tcp dport 22 log accept'
|
|
ret = 'Error: table filter in family ipv4 does not exist'
|
|
mock = MagicMock(return_value='')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.append(chain='input', rule=_ru), ret)
|
|
|
|
ret = 'Error: chain input in table filter in family ipv4 does not exist'
|
|
mock = MagicMock(return_value='table ip filter')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.append(chain='input', rule=_ru), ret)
|
|
|
|
r_val = 'table ip filter chain input {{ input tcp dport 22 log accept #'
|
|
mock = MagicMock(return_value=r_val)
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertTrue(nftables.append(chain='input', rule=_ru))
|
|
|
|
@patch('salt.modules.nftables.check', MagicMock(return_value=False))
|
|
@patch('salt.modules.nftables.check_chain', MagicMock(return_value=True))
|
|
@patch('salt.modules.nftables.check_table', MagicMock(return_value=True))
|
|
def test_append_rule(self):
|
|
'''
|
|
Test if it append a rule to the specified table & chain.
|
|
'''
|
|
_ru = 'input tcp dport 22 log accept'
|
|
mock = MagicMock(side_effect=['1', ''])
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertFalse(nftables.append(chain='input', rule=_ru))
|
|
self.assertTrue(nftables.append(chain='input', rule=_ru))
|
|
|
|
# 'insert' function tests: 2
|
|
|
|
def test_insert(self):
|
|
'''
|
|
Test if it insert a rule into the specified table & chain,
|
|
at the specified position.
|
|
'''
|
|
self.assertEqual(nftables.insert(),
|
|
'Error: Chain needs to be specified')
|
|
|
|
self.assertEqual(nftables.insert(chain='input'),
|
|
'Error: Rule needs to be specified')
|
|
|
|
_ru = 'input tcp dport 22 log accept'
|
|
ret = 'Error: table filter in family ipv4 does not exist'
|
|
mock = MagicMock(return_value='')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.insert(chain='input', rule=_ru), ret)
|
|
|
|
ret = 'Error: chain input in table filter in family ipv4 does not exist'
|
|
mock = MagicMock(return_value='table ip filter')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.insert(chain='input', rule=_ru), ret)
|
|
|
|
r_val = 'table ip filter chain input {{ input tcp dport 22 log accept #'
|
|
mock = MagicMock(return_value=r_val)
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertTrue(nftables.insert(chain='input', rule=_ru))
|
|
|
|
@patch('salt.modules.nftables.check', MagicMock(return_value=False))
|
|
@patch('salt.modules.nftables.check_chain', MagicMock(return_value=True))
|
|
@patch('salt.modules.nftables.check_table', MagicMock(return_value=True))
|
|
def test_insert_rule(self):
|
|
'''
|
|
Test if it insert a rule into the specified table & chain,
|
|
at the specified position.
|
|
'''
|
|
_ru = 'input tcp dport 22 log accept'
|
|
mock = MagicMock(side_effect=['1', ''])
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertFalse(nftables.insert(chain='input', rule=_ru))
|
|
self.assertTrue(nftables.insert(chain='input', rule=_ru))
|
|
|
|
# 'delete' function tests: 2
|
|
|
|
def test_delete(self):
|
|
'''
|
|
Test if it delete a rule from the specified table & chain,
|
|
specifying either the rule in its entirety, or
|
|
the rule's position in the chain.
|
|
'''
|
|
_ru = 'input tcp dport 22 log accept'
|
|
self.assertEqual(nftables.delete(table='filter', chain='input',
|
|
position='3', rule=_ru),
|
|
'Error: Only specify a position or a rule, not both')
|
|
|
|
ret = 'Error: table filter in family ipv4 does not exist'
|
|
mock = MagicMock(return_value='')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.delete(table='filter', chain='input',
|
|
rule=_ru), ret)
|
|
|
|
ret = 'Error: chain input in table filter in family ipv4 does not exist'
|
|
mock = MagicMock(return_value='table ip filter')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.delete(table='filter', chain='input',
|
|
rule=_ru), ret)
|
|
|
|
mock = MagicMock(return_value='table ip filter chain input {{')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertTrue(nftables.delete(table='filter', chain='input',
|
|
rule=_ru))
|
|
|
|
@patch('salt.modules.nftables.check', MagicMock(return_value=True))
|
|
@patch('salt.modules.nftables.check_chain', MagicMock(return_value=True))
|
|
@patch('salt.modules.nftables.check_table', MagicMock(return_value=True))
|
|
def test_delete_rule(self):
|
|
'''
|
|
Test if it delete a rule from the specified table & chain,
|
|
specifying either the rule in its entirety, or
|
|
the rule's position in the chain.
|
|
'''
|
|
mock = MagicMock(side_effect=['1', ''])
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertFalse(nftables.delete(table='filter', chain='input',
|
|
position='3'))
|
|
self.assertTrue(nftables.delete(table='filter', chain='input',
|
|
position='3'))
|
|
|
|
# 'flush' function tests: 2
|
|
|
|
def test_flush(self):
|
|
'''
|
|
Test if it flush the chain in the specified table, flush all chains
|
|
in the specified table if chain is not specified.
|
|
'''
|
|
ret = 'Error: table filter in family ipv4 does not exist'
|
|
mock = MagicMock(return_value='')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.flush(table='filter', chain='input'), ret)
|
|
|
|
ret = 'Error: chain input in table filter in family ip does not exist'
|
|
mock = MagicMock(return_value='table ip filter')
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertEqual(nftables.flush(table='filter', chain='input'), ret)
|
|
|
|
@patch('salt.modules.nftables.check_chain', MagicMock(return_value=True))
|
|
@patch('salt.modules.nftables.check_table', MagicMock(return_value=True))
|
|
def test_flush_chain(self):
|
|
'''
|
|
Test if it flush the chain in the specified table, flush all chains
|
|
in the specified table if chain is not specified.
|
|
'''
|
|
mock = MagicMock(side_effect=['1', ''])
|
|
with patch.dict(nftables.__salt__, {'cmd.run': mock}):
|
|
self.assertFalse(nftables.flush(table='filter', chain='input'))
|
|
self.assertTrue(nftables.flush(table='filter', chain='input'))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
from integration import run_tests
|
|
run_tests(NftablesTestCase, needs_daemon=False)
|