salt/tests/unit/states/iptables_test.py
Alessandro Ghedini 882c6c9c86 Do not force 'filter' table when flushing
The "table" argument is already part of the function signature, this means
that flush() will always force the "filter" table even when the user sets
a different one.
2016-06-28 15:40:50 +01:00

384 lines
18 KiB
Python

# -*- coding: utf-8 -*-
'''
:codeauthor: :email:`Rahul Handay <rahulha@saltstack.com>`
'''
# Import Python Libs
from __future__ import absolute_import
# Import Salt Testing Libs
from salttesting import TestCase, skipIf
from salttesting.helpers import ensure_in_syspath
from salttesting.mock import (
MagicMock,
patch,
NO_MOCK,
NO_MOCK_REASON
)
ensure_in_syspath('../../')
# Import Salt Libs
from salt.states import iptables
# Globals
iptables.__salt__ = {}
iptables.__opts__ = {}
@skipIf(NO_MOCK, NO_MOCK_REASON)
class IptablesTestCase(TestCase):
'''
Validate the iptables state
'''
def test_chain_present(self):
'''
Test to verify the chain is exist.
'''
ret = {'name': 'salt',
'changes': {},
'result': True,
'comment': ''
}
mock = MagicMock(side_effect=[True, False, False, False])
with patch.dict(iptables.__salt__, {'iptables.check_chain': mock}):
ret.update({'comment': 'iptables salt chain is already'
' exist in filter table for ipv4'})
self.assertDictEqual(iptables.chain_present("salt"), ret)
with patch.dict(iptables.__opts__, {'test': True}):
ret.update({'comment': 'iptables salt chain in filter'
' table needs to be set for ipv4',
'result': None})
self.assertDictEqual(iptables.chain_present("salt"), ret)
with patch.dict(iptables.__opts__, {'test': False}):
mock = MagicMock(side_effect=[True, ''])
with patch.dict(iptables.__salt__,
{'iptables.new_chain': mock}):
ret.update({'result': True,
'comment': 'iptables salt chain in filter'
' table create success for ipv4',
'changes': {'locale': 'salt'}})
self.assertDictEqual(iptables.chain_present('salt'), ret)
ret.update({'changes': {}, 'result': False,
'comment': 'Failed to create salt chain'
' in filter table: for ipv4'})
self.assertDictEqual(iptables.chain_present('salt'), ret)
def test_chain_absent(self):
'''
Test to verify the chain is absent.
'''
ret = {'name': 'salt',
'changes': {},
'result': True,
'comment': ''
}
mock = MagicMock(side_effect=[False, True, True, True])
with patch.dict(iptables.__salt__, {'iptables.check_chain': mock}):
ret.update({'comment': 'iptables salt chain is already'
' absent in filter table for ipv4'})
self.assertDictEqual(iptables.chain_absent("salt"), ret)
with patch.dict(iptables.__opts__, {'test': True}):
ret.update({'comment': 'iptables salt chain in filter'
' table needs to be removed ipv4',
'result': None})
self.assertDictEqual(iptables.chain_absent("salt"), ret)
with patch.dict(iptables.__opts__, {'test': False}):
mock = MagicMock(side_effect=[False, 'a'])
with patch.dict(iptables.__salt__, {'iptables.flush': mock}):
mock = MagicMock(return_value=True)
with patch.dict(iptables.__salt__,
{'iptables.delete_chain': mock}):
ret.update({'changes': {'locale': 'salt'},
'comment': 'iptables salt chain in filter'
' table delete success for ipv4',
'result': True})
self.assertDictEqual(iptables.chain_absent("salt"),
ret)
ret.update({'changes': {}, 'result': False,
'comment': 'Failed to flush salt chain'
' in filter table: a for ipv4'})
self.assertDictEqual(iptables.chain_absent("salt"), ret)
def test_append(self):
'''
Test to append a rule to a chain
'''
ret = {'name': 'salt',
'changes': {},
'result': None,
'comment': ''
}
self.assertDictEqual(iptables.append('salt', rules=[]), ret)
mock = MagicMock(return_value=[])
with patch.object(iptables, '_STATE_INTERNAL_KEYWORDS', mock):
mock = MagicMock(return_value='a')
with patch.dict(iptables.__salt__, {'iptables.build_rule': mock}):
mock = MagicMock(side_effect=[True, False, False, False])
with patch.dict(iptables.__salt__, {'iptables.check': mock}):
ret.update({'comment': 'iptables rule for salt'
' already set (a) for ipv4',
'result': True})
self.assertDictEqual(iptables.append('salt',
table='', chain=''),
ret)
with patch.dict(iptables.__opts__, {'test': True}):
ret.update({'result': None,
'comment': 'iptables rule for salt'
' needs to be set (a) for ipv4'})
self.assertDictEqual(iptables.append('salt',
table='',
chain=''), ret)
with patch.dict(iptables.__opts__, {'test': False}):
mock = MagicMock(side_effect=[True, False])
with patch.dict(iptables.__salt__,
{'iptables.append': mock}):
ret.update({'changes': {'locale': 'salt'},
'result': True,
'comment': 'Set iptables rule'
' for salt to: a for ipv4'})
self.assertDictEqual(iptables.append('salt',
table='',
chain=''),
ret)
ret.update({'changes': {},
'result': False,
'comment': 'Failed to set iptables'
' rule for salt.\nAttempted rule was'
' a for ipv4'})
self.assertDictEqual(iptables.append('salt',
table='',
chain=''),
ret)
def test_insert(self):
'''
Test to insert a rule into a chain
'''
ret = {'name': 'salt',
'changes': {},
'result': None,
'comment': ''}
self.assertDictEqual(iptables.insert('salt', rules=[]), ret)
mock = MagicMock(return_value=[])
with patch.object(iptables, '_STATE_INTERNAL_KEYWORDS', mock):
mock = MagicMock(return_value='a')
with patch.dict(iptables.__salt__, {'iptables.build_rule': mock}):
mock = MagicMock(side_effect=[True, False, False, False])
with patch.dict(iptables.__salt__, {'iptables.check': mock}):
ret.update({'comment': 'iptables rule for salt'
' already set for ipv4 (a)',
'result': True})
self.assertDictEqual(iptables.insert('salt',
table='', chain=''),
ret)
with patch.dict(iptables.__opts__, {'test': True}):
ret.update({'result': None,
'comment': 'iptables rule for salt'
' needs to be set for ipv4 (a)'})
self.assertDictEqual(iptables.insert('salt',
table='',
chain=''), ret)
with patch.dict(iptables.__opts__, {'test': False}):
mock = MagicMock(side_effect=[False, True])
with patch.dict(iptables.__salt__,
{'iptables.insert': mock}):
ret.update({'changes': {'locale': 'salt'},
'result': True,
'comment': 'Set iptables rule'
' for salt to: a for ipv4'})
self.assertDictEqual(iptables.insert('salt',
table='',
chain='',
position=''),
ret)
ret.update({'changes': {},
'result': False,
'comment': 'Failed to set iptables'
' rule for salt.\nAttempted rule was a'
})
self.assertDictEqual(iptables.insert('salt',
table='',
chain='',
position=''),
ret)
def test_delete(self):
'''
Test to delete a rule to a chain
'''
ret = {'name': 'salt',
'changes': {},
'result': None,
'comment': ''}
self.assertDictEqual(iptables.delete('salt', rules=[]), ret)
mock = MagicMock(return_value=[])
with patch.object(iptables, '_STATE_INTERNAL_KEYWORDS', mock):
mock = MagicMock(return_value='a')
with patch.dict(iptables.__salt__, {'iptables.build_rule': mock}):
mock = MagicMock(side_effect=[False, True, True, True])
with patch.dict(iptables.__salt__, {'iptables.check': mock}):
ret.update({'comment': 'iptables rule for salt'
' already absent for ipv4 (a)',
'result': True})
self.assertDictEqual(iptables.delete('salt',
table='', chain=''),
ret)
with patch.dict(iptables.__opts__, {'test': True}):
ret.update({'result': None,
'comment': 'iptables rule for salt needs'
' to be deleted for ipv4 (a)'})
self.assertDictEqual(iptables.delete('salt',
table='',
chain=''), ret)
with patch.dict(iptables.__opts__, {'test': False}):
mock = MagicMock(side_effect=[False, True])
with patch.dict(iptables.__salt__,
{'iptables.delete': mock}):
ret.update({'result': True,
'changes': {'locale': 'salt'},
'comment': 'Delete iptables rule'
' for salt a'})
self.assertDictEqual(iptables.delete('salt',
table='',
chain='',
position=''),
ret)
ret.update({'result': False,
'changes': {},
'comment': 'Failed to delete iptables'
' rule for salt.\nAttempted rule was a'
})
self.assertDictEqual(iptables.delete('salt',
table='',
chain='',
position=''),
ret)
def test_set_policy(self):
'''
Test to sets the default policy for iptables firewall tables
'''
ret = {'name': 'salt',
'changes': {},
'result': True,
'comment': ''}
mock = MagicMock(return_value=[])
with patch.object(iptables, '_STATE_INTERNAL_KEYWORDS', mock):
mock = MagicMock(return_value='stack')
with patch.dict(iptables.__salt__, {'iptables.get_policy': mock}):
ret.update({'comment': 'iptables default policy for chain'
' on table for ipv4 already set to stack'})
self.assertDictEqual(iptables.set_policy('salt',
table='',
chain='',
policy='stack'), ret)
with patch.dict(iptables.__opts__, {'test': True}):
ret.update({'comment': 'iptables default policy for chain'
' on table for ipv4 needs to be set'
' to sal', 'result': None})
self.assertDictEqual(iptables.set_policy('salt',
table='',
chain='',
policy='sal'),
ret)
with patch.dict(iptables.__opts__, {'test': False}):
mock = MagicMock(side_effect=[False, True])
with patch.dict(iptables.__salt__,
{'iptables.set_policy': mock}):
ret.update({'changes': {'locale': 'salt'},
'comment': 'Set default policy for'
' to sal family ipv4',
'result': True})
self.assertDictEqual(iptables.set_policy('salt',
table='',
chain='',
policy='sal'),
ret)
ret.update({'comment': 'Failed to set iptables'
' default policy',
'result': False,
'changes': {}})
self.assertDictEqual(iptables.set_policy('salt',
table='',
chain='',
policy='sal'),
ret)
def test_flush(self):
'''
Test to flush current iptables state
'''
ret = {'name': 'salt',
'changes': {},
'result': None,
'comment': ''}
mock = MagicMock(return_value=[])
with patch.object(iptables, '_STATE_INTERNAL_KEYWORDS', mock):
with patch.dict(iptables.__opts__, {'test': True}):
ret.update({'comment': 'iptables rules in salt table filter'
' chain ipv4 family needs to be flushed'})
self.assertDictEqual(iptables.flush('salt'), ret)
with patch.dict(iptables.__opts__, {'test': False}):
mock = MagicMock(side_effect=[False, True])
with patch.dict(iptables.__salt__,
{'iptables.flush': mock}):
ret.update({'changes': {'locale': 'salt'},
'comment': 'Flush iptables rules in '
'table chain ipv4 family',
'result': True})
self.assertDictEqual(iptables.flush('salt',
table='', chain=''),
ret)
ret.update({'changes': {},
'comment': 'Failed to flush iptables rules',
'result': False})
self.assertDictEqual(iptables.flush('salt',
table='', chain=''),
ret)
def test_mod_aggregate(self):
'''
Test to mod_aggregate function
'''
self.assertDictEqual(iptables.mod_aggregate({'fun': 'salt'}, [], []),
{'fun': 'salt'})
self.assertDictEqual(iptables.mod_aggregate({'fun': 'append'}, [], []),
{'fun': 'append'})
if __name__ == '__main__':
from integration import run_tests
run_tests(IptablesTestCase, needs_daemon=False)