salt/tests/unit/auth_test.py

503 lines
22 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
'''
2014-05-22 00:59:02 +00:00
:codeauthor: :email:`Mike Place <mp@saltstack.com>`
'''
# Import python libs
from __future__ import absolute_import
# Import Salt Testing libs
from salttesting import TestCase, skipIf
from salttesting.helpers import ensure_in_syspath
from salttesting.mock import patch, call, NO_MOCK, NO_MOCK_REASON, MagicMock
2014-05-05 22:49:51 +00:00
2014-10-07 13:03:06 +00:00
ensure_in_syspath('../')
2014-05-05 22:49:51 +00:00
# Import Salt libraries
import salt.master
2014-05-08 22:19:07 +00:00
import integration
from salt import auth
@skipIf(NO_MOCK, NO_MOCK_REASON)
class LoadAuthTestCase(TestCase):
    '''
    Unit tests for ``auth.LoadAuth`` driven through a mocked auth loader.

    The mocked loader exposes two entries, ``pam.auth`` and ``pam.groups``,
    whose values are plain marker strings; the tests check that those
    markers are what gets handed to ``salt.utils.format_call``.
    '''

    @patch('salt.payload.Serial')
    @patch('salt.loader.auth', return_value={'pam.auth': 'fake_func_str', 'pam.groups': 'fake_groups_function_str'})
    def setUp(self, auth_mock, serial_mock):  # pylint: disable=W0221
        '''
        Build the LoadAuth instance under test from empty opts while the
        auth loader and the serializer are patched out.
        '''
        self.lauth = auth.LoadAuth({})

    def test_load_name(self):
        '''
        load_name returns '' when the load has no ``eauth`` key and
        otherwise dispatches to the ``pam.auth`` loader entry.
        '''
        full_load = dict(username='test_user',
                         show_timeout=False,
                         test_password='',
                         eauth='pam')

        # Same load, minus the key that selects the auth backend
        load_without_eauth = full_load.copy()
        del load_without_eauth['eauth']
        outcome = self.lauth.load_name(load_without_eauth)
        self.assertEqual(outcome, '', "Did not bail when the auth loader didn't have the auth type.")

        # A complete load must reach format_call with the pam.auth marker
        with patch('salt.utils.format_call') as format_call_mock:
            wanted_call = call(
                'fake_func_str',
                dict(username='test_user',
                     test_password='',
                     show_timeout=False,
                     eauth='pam'),
                expected_extra_kws=auth.AUTH_INTERNAL_KEYWORDS)
            self.lauth.load_name(full_load)
            format_call_mock.assert_has_calls((wanted_call,), any_order=True)

    def test_get_groups(self):
        '''
        get_groups dispatches to the ``pam.groups`` loader entry.
        '''
        full_load = dict(username='test_user',
                         show_timeout=False,
                         test_password='',
                         eauth='pam')
        with patch('salt.utils.format_call') as format_call_mock:
            wanted_call = call(
                'fake_groups_function_str',
                dict(username='test_user',
                     test_password='',
                     show_timeout=False,
                     eauth='pam'),
                expected_extra_kws=auth.AUTH_INTERNAL_KEYWORDS)
            self.lauth.get_groups(full_load)
            format_call_mock.assert_has_calls((wanted_call,), any_order=True)
2014-05-05 22:49:51 +00:00
@patch('zmq.Context', MagicMock())
@patch('salt.payload.Serial.dumps', MagicMock())
@patch('salt.master.tagify', MagicMock())
@patch('salt.utils.event.SaltEvent.fire_event', return_value='dummy_tag')
@patch('salt.auth.LoadAuth.time_auth', MagicMock(return_value=True))
class MasterACLTestCase(integration.ModuleCase):
    '''
    A class to check various aspects of the publisher ACL system

    Every test publishes a clear load through ``salt.master.ClearFuncs``
    and then inspects the patched ``SaltEvent.fire_event`` mock: a recorded
    call means the publish was let through, an empty call list means it was
    denied. The class-level ``fire_event`` patch has no explicit replacement
    object, so the created mock is passed to each ``test_*`` method as
    ``fire_event_mock``.
    '''
    @patch('salt.minion.MasterMinion', MagicMock())
    @patch('salt.utils.verify.check_path_traversal', MagicMock())
    def setUp(self):
        '''
        Build a ClearFuncs instance with a pam external_auth ACL and a
        baseline publish load (``self.valid_clear_load``) that the tests
        tweak before publishing.
        '''
        opts = self.get_config('minion', from_scratch=True)
        opts['client_acl'] = {}
        opts['publisher_acl'] = {}
        opts['client_acl_blacklist'] = {}
        opts['publisher_acl_blacklist'] = {}
        opts['master_job_cache'] = ''
        opts['sign_pub_messages'] = False
        opts['con_cache'] = ''
        opts['external_auth'] = {}
        opts['external_auth']['pam'] = \
            {'test_user': [{'*': ['test.ping']},
                           {'minion_glob*': ['foo.bar']},
                           {'minion_func_test': ['func_test.*']}],
             'test_group%': [{'*': ['test.echo']}],
             'test_user_mminion': [{'target_minion': ['test.ping']}],
             '*': [{'my_minion': ['my_mod.my_func']}],
             'test_user_func': [{'*': [{'test.echo': {'args': ['MSG:.*']}},
                                       {'test.echo': {'kwargs': {'text': 'KWMSG:.*',
                                                                 'anything': '.*',
                                                                 'none': None}}},
                                       {'my_mod.*': {'args': ['a.*', 'b.*'],
                                                     'kwargs': {'kwa': 'kwa.*',
                                                                'kwb': 'kwb'}}}]},
                                {'minion1': [{'test.echo': {'args': ['TEST',
                                                                     None,
                                                                     'TEST.*']}},
                                             {'test.empty': {}}]}
                                ]
             }
        self.clear = salt.master.ClearFuncs(opts, MagicMock())

        # overwrite the _send_pub method so we don't have to serialize MagicMock
        self.clear._send_pub = lambda payload: True

        # make sure to return a JID, instead of a mock
        self.clear.mminion.returners = {'.prep_jid': lambda x: 1}

        self.valid_clear_load = {'tgt_type': 'glob',
                                 'jid': '',
                                 'cmd': 'publish',
                                 'tgt': 'test_minion',
                                 'kwargs':
                                     {'username': 'test_user',
                                      'password': 'test_password',
                                      'show_timeout': False,
                                      'eauth': 'pam',
                                      'show_jid': False},
                                 'ret': '',
                                 'user': 'test_user',
                                 'key': '',
                                 'arg': '',
                                 'fun': 'test.ping',
                                 }

    @patch('salt.utils.minions.CkMinions.check_minions', MagicMock(return_value='some_minions'))
    def test_master_publish_name(self, fire_event_mock):
        '''
        Test to ensure a simple name can auth against a given function.
        This tests to ensure test_user can access test.ping but *not* sys.doc
        '''
        # Can we access test.ping?
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.call_args[0][0]['fun'], 'test.ping')

        # Are we denied access to sys.doc?
        # The fixture is rebuilt by setUp for every test, so it is modified
        # in place rather than through an alias that looks like a copy.
        self.valid_clear_load['fun'] = 'sys.doc'
        self.clear.publish(self.valid_clear_load)
        self.assertNotEqual(fire_event_mock.call_args[0][0]['fun'], 'sys.doc')  # If sys.doc were to fire, this would match

    @patch('salt.utils.minions.CkMinions.check_minions', MagicMock(return_value='some_minions'))
    def test_master_publish_group(self, fire_event_mock):
        '''
        Tests to ensure test_group can access test.echo but *not* sys.doc
        '''
        self.valid_clear_load['kwargs']['user'] = 'new_user'
        self.valid_clear_load['fun'] = 'test.echo'
        self.valid_clear_load['arg'] = 'hello'
        with patch('salt.auth.LoadAuth.get_groups', return_value=['test_group', 'second_test_group']):
            self.clear.publish(self.valid_clear_load)
            # Did we fire test.echo?
            self.assertEqual(fire_event_mock.call_args[0][0]['fun'], 'test.echo')

            # Request sys.doc. The request has to be published (with the
            # same group membership still patched in), otherwise the check
            # below only re-reads the test.echo event and can never fail.
            self.valid_clear_load['fun'] = 'sys.doc'
            self.clear.publish(self.valid_clear_load)
        # Did we fire it?
        self.assertNotEqual(fire_event_mock.call_args[0][0]['fun'], 'sys.doc')

    def test_master_publish_some_minions(self, fire_event_mock):
        '''
        Tests to ensure we can only target minions for which we
        have permission with publisher acl.

        Note that in order for these sorts of tests to run correctly that
        you should NOT patch check_minions!
        '''
        self.valid_clear_load['kwargs']['username'] = 'test_user_mminion'
        self.valid_clear_load['user'] = 'test_user_mminion'
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.mock_calls, [])

    def test_master_not_user_glob_all(self, fire_event_mock):
        '''
        Test to ensure that we DO NOT grant access to a given
        function to all users with publisher acl. ex:

        '*':
            my_minion:
                - my_func

        Yes, this seems like a bit of a no-op test but it's
        here to document that this functionality
        is NOT supported currently.

        WARNING: the original warning here was cut off ("Do not patch
        this wit"); presumably it meant "do not patch check_minions for
        this test" -- TODO confirm.
        '''
        self.valid_clear_load['kwargs']['username'] = 'NOT_A_VALID_USERNAME'
        self.valid_clear_load['user'] = 'NOT_A_VALID_USERNAME'
        self.valid_clear_load['fun'] = 'test.ping'
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.mock_calls, [])

    def test_master_minion_glob(self, fire_event_mock):
        '''
        Test to ensure we can allow access to a given
        function for a user to a subset of minions
        selected by a glob. ex:

        test_user:
            'minion_glob*':
              - foo.bar

        This test is a bit tricky, because ultimately the real functionality
        lies in what's returned from check_minions, but this checks a limited
        amount of logic on the way there as well. Note the inline patch.
        '''
        requested_function = 'foo.bar'
        requested_tgt = 'minion_glob1'
        self.valid_clear_load['tgt'] = requested_tgt
        self.valid_clear_load['fun'] = requested_function
        with patch('salt.utils.minions.CkMinions.check_minions', MagicMock(return_value=['minion_glob1'])):  # Assume that there is a listening minion match
            self.clear.publish(self.valid_clear_load)
        self.assertTrue(fire_event_mock.called, 'Did not fire {0} for minion tgt {1}'.format(requested_function, requested_tgt))
        self.assertEqual(fire_event_mock.call_args[0][0]['fun'], requested_function, 'Did not fire {0} for minion glob'.format(requested_function))

    def test_master_function_glob(self, fire_event_mock):
        '''
        Test to ensure that we can allow access to a given
        set of functions in an execution module as selected
        by a glob. ex:

        my_user:
            my_minion:
                'test.*'
        '''
        # Unimplemented
        pass

    @patch('salt.utils.minions.CkMinions.check_minions',
           MagicMock(return_value='minion1'))
    def test_args_empty_spec(self, fire_event_mock):
        '''
        Test that a function listed with an empty restriction spec is
        allowed when called with arguments.

        'test_user_func':
            minion1:
                - test.empty:
        '''
        self.valid_clear_load['kwargs'].update({'username': 'test_user_func'})
        self.valid_clear_load.update({'user': 'test_user_func',
                                      'tgt': 'minion1',
                                      'fun': 'test.empty',
                                      'arg': ['TEST']})
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.call_args[0][0]['fun'], 'test.empty')

    @patch('salt.utils.minions.CkMinions.check_minions',
           MagicMock(return_value='minion1'))
    def test_args_simple_match(self, fire_event_mock):
        '''
        Test simple arg restriction allowed.

        'test_user_func':
            minion1:
                - test.echo:
                    args:
                        - 'TEST'
                        - None (second position; an arbitrary value is sent)
                        - 'TEST.*'
        '''
        self.valid_clear_load['kwargs'].update({'username': 'test_user_func'})
        self.valid_clear_load.update({'user': 'test_user_func',
                                      'tgt': 'minion1',
                                      'fun': 'test.echo',
                                      'arg': ['TEST', 'any', 'TEST ABC']})
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.call_args[0][0]['fun'], 'test.echo')

    @patch('salt.utils.minions.CkMinions.check_minions',
           MagicMock(return_value='minion1'))
    def test_args_more_args(self, fire_event_mock):
        '''
        Test simple arg restriction allowed to pass unlisted args.

        'test_user_func':
            minion1:
                - test.echo:
                    args:
                        - 'TEST'
                        - None (second position; an arbitrary value is sent)
                        - 'TEST.*'
        '''
        self.valid_clear_load['kwargs'].update({'username': 'test_user_func'})
        self.valid_clear_load.update({'user': 'test_user_func',
                                      'tgt': 'minion1',
                                      'fun': 'test.echo',
                                      'arg': ['TEST',
                                              'any',
                                              'TEST ABC',
                                              'arg 3',
                                              {'kwarg1': 'val1',
                                               '__kwarg__': True}]})
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.call_args[0][0]['fun'], 'test.echo')

    @patch('salt.utils.minions.CkMinions.check_minions',
           MagicMock(return_value='minion1'))
    def test_args_simple_forbidden(self, fire_event_mock):
        '''
        Test simple arg restriction forbidden.

        'test_user_func':
            minion1:
                - test.echo:
                    args:
                        - 'TEST'
                        - None (second position)
                        - 'TEST.*'
        '''
        self.valid_clear_load['kwargs'].update({'username': 'test_user_func'})
        # Wrong last arg
        self.valid_clear_load.update({'user': 'test_user_func',
                                      'tgt': 'minion1',
                                      'fun': 'test.echo',
                                      'arg': ['TEST', 'any', 'TESLA']})
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.mock_calls, [])
        # Wrong first arg
        self.valid_clear_load['arg'] = ['TES', 'any', 'TEST1234']
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.mock_calls, [])
        # Missing the last arg
        self.valid_clear_load['arg'] = ['TEST', 'any']
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.mock_calls, [])
        # No args
        self.valid_clear_load['arg'] = []
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.mock_calls, [])

    @patch('salt.utils.minions.CkMinions.check_minions',
           MagicMock(return_value='some_minions'))
    def test_args_kwargs_match(self, fire_event_mock):
        '''
        Test simple kwargs restriction allowed.

        'test_user_func':
            '*':
                - test.echo:
                    kwargs:
                        text: 'KWMSG:.*'
        '''
        self.valid_clear_load['kwargs'].update({'username': 'test_user_func'})
        self.valid_clear_load.update({'user': 'test_user_func',
                                      'tgt': '*',
                                      'fun': 'test.echo',
                                      'arg': [{'text': 'KWMSG: a message',
                                               'anything': 'hello all',
                                               'none': 'hello none',
                                               '__kwarg__': True}]})
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.call_args[0][0]['fun'], 'test.echo')

    @patch('salt.utils.minions.CkMinions.check_minions',
           MagicMock(return_value='some_minions'))
    def test_args_kwargs_mismatch(self, fire_event_mock):
        '''
        Test simple kwargs restriction forbidden.

        'test_user_func':
            '*':
                - test.echo:
                    kwargs:
                        text: 'KWMSG:.*'
        '''
        self.valid_clear_load['kwargs'].update({'username': 'test_user_func'})
        self.valid_clear_load.update({'user': 'test_user_func',
                                      'tgt': '*',
                                      'fun': 'test.echo'})
        # Wrong kwarg value
        self.valid_clear_load['arg'] = [{'text': 'KWMSG a message',
                                         'anything': 'hello all',
                                         'none': 'hello none',
                                         '__kwarg__': True}]
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.mock_calls, [])
        # Missing kwarg value
        self.valid_clear_load['arg'] = [{'anything': 'hello all',
                                         'none': 'hello none',
                                         '__kwarg__': True}]
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.mock_calls, [])
        self.valid_clear_load['arg'] = [{'__kwarg__': True}]
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.mock_calls, [])
        self.valid_clear_load['arg'] = [{}]
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.mock_calls, [])
        self.valid_clear_load['arg'] = []
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.mock_calls, [])
        # Missing kwarg allowing any value
        self.valid_clear_load['arg'] = [{'text': 'KWMSG: a message',
                                         'none': 'hello none',
                                         '__kwarg__': True}]
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.mock_calls, [])
        self.valid_clear_load['arg'] = [{'text': 'KWMSG: a message',
                                         'anything': 'hello all',
                                         '__kwarg__': True}]
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.mock_calls, [])

    @patch('salt.utils.minions.CkMinions.check_minions',
           MagicMock(return_value='some_minions'))
    def test_args_mixed_match(self, fire_event_mock):
        '''
        Test mixed args and kwargs restriction allowed.

        'test_user_func':
            '*':
                - 'my_mod.*':
                    args:
                        - 'a.*'
                        - 'b.*'
                    kwargs:
                        'kwa': 'kwa.*'
                        'kwb': 'kwb'
        '''
        self.valid_clear_load['kwargs'].update({'username': 'test_user_func'})
        self.valid_clear_load.update({'user': 'test_user_func',
                                      'tgt': '*',
                                      'fun': 'my_mod.some_func',
                                      'arg': ['alpha',
                                              'beta',
                                              'gamma',
                                              {'kwa': 'kwarg #1',
                                               'kwb': 'kwb',
                                               'one_more': 'just one more',
                                               '__kwarg__': True}]})
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.call_args[0][0]['fun'],
                         'my_mod.some_func')

    @patch('salt.utils.minions.CkMinions.check_minions',
           MagicMock(return_value='some_minions'))
    def test_args_mixed_mismatch(self, fire_event_mock):
        '''
        Test mixed args and kwargs restriction forbidden.

        'test_user_func':
            '*':
                - 'my_mod.*':
                    args:
                        - 'a.*'
                        - 'b.*'
                    kwargs:
                        'kwa': 'kwa.*'
                        'kwb': 'kwb'
        '''
        self.valid_clear_load['kwargs'].update({'username': 'test_user_func'})
        self.valid_clear_load.update({'user': 'test_user_func',
                                      'tgt': '*',
                                      'fun': 'my_mod.some_func'})
        # Wrong arg value
        self.valid_clear_load['arg'] = ['alpha',
                                        'gamma',
                                        {'kwa': 'kwarg #1',
                                         'kwb': 'kwb',
                                         'one_more': 'just one more',
                                         '__kwarg__': True}]
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.mock_calls, [])
        # Wrong kwarg value
        self.valid_clear_load['arg'] = ['alpha',
                                        'beta',
                                        'gamma',
                                        {'kwa': 'kkk',
                                         'kwb': 'kwb',
                                         'one_more': 'just one more',
                                         '__kwarg__': True}]
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.mock_calls, [])
        # Missing arg
        self.valid_clear_load['arg'] = ['alpha',
                                        {'kwa': 'kwarg #1',
                                         'kwb': 'kwb',
                                         'one_more': 'just one more',
                                         '__kwarg__': True}]
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.mock_calls, [])
        # Missing kwarg
        self.valid_clear_load['arg'] = ['alpha',
                                        'beta',
                                        'gamma',
                                        {'kwa': 'kwarg #1',
                                         'one_more': 'just one more',
                                         '__kwarg__': True}]
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(fire_event_mock.mock_calls, [])
2014-01-30 04:16:02 +00:00
if __name__ == '__main__':
    # Script entry point: run both test cases in this module through the
    # integration runner, with needs_daemon=False.
    from integration import run_tests
    run_tests(LoadAuthTestCase, MasterACLTestCase, needs_daemon=False)