salt/tests/unit/daemons/test_masterapi.py
rallytime 3d2ea16c3a
Merge branch '2017.7' into '2018.3'
Conflicts:
  - requirements/base.txt
  - salt/states/archive.py
  - salt/states/file.py
  - tests/integration/runners/test_state.py
  - tests/unit/daemons/test_masterapi.py
2018-06-11 11:45:37 -04:00

609 lines
27 KiB
Python

# -*- coding: utf-8 -*-
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
from functools import wraps
import os
import io
import stat
# Import Salt libs
import salt.config
import salt.daemons.masterapi as masterapi
import salt.utils.platform
# Import Salt Testing Libs
from tests.support.paths import TMP_CONF_DIR
from tests.support.unit import TestCase, skipIf
from tests.support.mock import (
patch,
MagicMock,
NO_MOCK,
NO_MOCK_REASON
)
def gen_permissions(owner='', group='', others=''):
'''
Helper method to generate file permission bits
Usage: gen_permissions('rw', 'r', 'r')
'''
ret = 0
for c in owner:
ret |= getattr(stat, 'S_I{}USR'.format(c.upper()), 0)
for c in group:
ret |= getattr(stat, 'S_I{}GRP'.format(c.upper()), 0)
for c in others:
ret |= getattr(stat, 'S_I{}OTH'.format(c.upper()), 0)
return ret
def patch_check_permissions(uid=1, groups=None, is_windows=False, permissive_pki=False):
if not groups:
groups = [uid]
def decorator(func):
@wraps(func)
def wrapper(self):
self.auto_key.opts['permissive_pki_access'] = permissive_pki
if salt.utils.platform.is_windows():
with patch('salt.utils.platform.is_windows', MagicMock(return_value=True)):
func(self)
else:
with patch('os.stat', self.os_stat_mock), \
patch('os.getuid', MagicMock(return_value=uid)), \
patch('salt.utils.user.get_gid_list', MagicMock(return_value=groups)), \
patch('salt.utils.platform.is_windows', MagicMock(return_value=is_windows)):
func(self)
return wrapper
return decorator
@skipIf(NO_MOCK, NO_MOCK_REASON)
class AutoKeyTest(TestCase):
'''
Tests for the salt.daemons.masterapi.AutoKey class
'''
def setUp(self):
opts = salt.config.master_config(None)
opts['user'] = 'test_user'
self.auto_key = masterapi.AutoKey(opts)
self.stats = {}
def os_stat_mock(self, filename):
fmode = MagicMock()
fstats = self.stats.get(filename, {})
fmode.st_mode = fstats.get('mode', 0)
fmode.st_gid = fstats.get('gid', 0)
return fmode
@patch_check_permissions(uid=0, is_windows=True)
def test_check_permissions_windows(self):
'''
Assert that all files are accepted on windows
'''
self.stats['testfile'] = {'mode': gen_permissions('rwx', 'rwx', 'rwx'), 'gid': 2}
self.assertTrue(self.auto_key.check_permissions('testfile'))
@patch_check_permissions(permissive_pki=True)
def test_check_permissions_others_can_write(self):
'''
Assert that no file is accepted, when others can write to it
'''
self.stats['testfile'] = {'mode': gen_permissions('', '', 'w'), 'gid': 1}
if salt.utils.platform.is_windows():
self.assertTrue(self.auto_key.check_permissions('testfile'))
else:
self.assertFalse(self.auto_key.check_permissions('testfile'))
@patch_check_permissions()
def test_check_permissions_group_can_write_not_permissive(self):
'''
Assert that a file is accepted, when group can write to it and perkissive_pki_access=False
'''
self.stats['testfile'] = {'mode': gen_permissions('w', 'w', ''), 'gid': 1}
if salt.utils.platform.is_windows():
self.assertTrue(self.auto_key.check_permissions('testfile'))
else:
self.assertFalse(self.auto_key.check_permissions('testfile'))
@patch_check_permissions(permissive_pki=True)
def test_check_permissions_group_can_write_permissive(self):
'''
Assert that a file is accepted, when group can write to it and perkissive_pki_access=True
'''
self.stats['testfile'] = {'mode': gen_permissions('w', 'w', ''), 'gid': 1}
self.assertTrue(self.auto_key.check_permissions('testfile'))
@patch_check_permissions(uid=0, permissive_pki=True)
def test_check_permissions_group_can_write_permissive_root_in_group(self):
'''
Assert that a file is accepted, when group can write to it, perkissive_pki_access=False,
salt is root and in the file owning group
'''
self.stats['testfile'] = {'mode': gen_permissions('w', 'w', ''), 'gid': 0}
self.assertTrue(self.auto_key.check_permissions('testfile'))
@patch_check_permissions(uid=0, permissive_pki=True)
def test_check_permissions_group_can_write_permissive_root_not_in_group(self):
'''
Assert that no file is accepted, when group can write to it, perkissive_pki_access=False,
salt is root and **not** in the file owning group
'''
self.stats['testfile'] = {'mode': gen_permissions('w', 'w', ''), 'gid': 1}
if salt.utils.platform.is_windows():
self.assertTrue(self.auto_key.check_permissions('testfile'))
else:
self.assertFalse(self.auto_key.check_permissions('testfile'))
@patch_check_permissions()
def test_check_permissions_only_owner_can_write(self):
'''
Assert that a file is accepted, when only the owner can write to it
'''
self.stats['testfile'] = {'mode': gen_permissions('w', '', ''), 'gid': 1}
self.assertTrue(self.auto_key.check_permissions('testfile'))
@patch_check_permissions(uid=0)
def test_check_permissions_only_owner_can_write_root(self):
'''
Assert that a file is accepted, when only the owner can write to it and salt is root
'''
self.stats['testfile'] = {'mode': gen_permissions('w', '', ''), 'gid': 0}
self.assertTrue(self.auto_key.check_permissions('testfile'))
def _test_check_autosign_grains(self,
test_func,
file_content='test_value',
file_name='test_grain',
autosign_grains_dir='test_dir',
permissions_ret=True):
'''
Helper function for testing autosign_grains().
Patches ``os.walk`` to return only ``file_name`` and ``salt.utils.files.fopen`` to open a
mock file with ``file_content`` as content. Optionally sets ``opts`` values.
Then executes test_func. The ``os.walk`` and ``salt.utils.files.fopen`` mock objects
are passed to the function as arguments.
'''
if autosign_grains_dir:
self.auto_key.opts['autosign_grains_dir'] = autosign_grains_dir
mock_file = io.StringIO(file_content)
mock_dirs = [(None, None, [file_name])]
with patch('os.walk', MagicMock(return_value=mock_dirs)) as mock_walk, \
patch('salt.utils.files.fopen', MagicMock(return_value=mock_file)) as mock_open, \
patch('salt.daemons.masterapi.AutoKey.check_permissions',
MagicMock(return_value=permissions_ret)) as mock_permissions:
test_func(mock_walk, mock_open, mock_permissions)
def test_check_autosign_grains_no_grains(self):
'''
Asserts that autosigning from grains fails when no grain values are passed.
'''
def test_func(mock_walk, mock_open, mock_permissions):
self.assertFalse(self.auto_key.check_autosign_grains(None))
self.assertEqual(mock_walk.call_count, 0)
self.assertEqual(mock_open.call_count, 0)
self.assertEqual(mock_permissions.call_count, 0)
self.assertFalse(self.auto_key.check_autosign_grains({}))
self.assertEqual(mock_walk.call_count, 0)
self.assertEqual(mock_open.call_count, 0)
self.assertEqual(mock_permissions.call_count, 0)
self._test_check_autosign_grains(test_func)
def test_check_autosign_grains_no_autosign_grains_dir(self):
'''
Asserts that autosigning from grains fails when the \'autosign_grains_dir\' config option
is undefined.
'''
def test_func(mock_walk, mock_open, mock_permissions):
self.assertFalse(self.auto_key.check_autosign_grains({'test_grain': 'test_value'}))
self.assertEqual(mock_walk.call_count, 0)
self.assertEqual(mock_open.call_count, 0)
self.assertEqual(mock_permissions.call_count, 0)
self._test_check_autosign_grains(test_func, autosign_grains_dir=None)
def test_check_autosign_grains_accept(self):
'''
Asserts that autosigning from grains passes when a matching grain value is in an
autosign_grain file.
'''
def test_func(*args):
self.assertTrue(self.auto_key.check_autosign_grains({'test_grain': 'test_value'}))
file_content = '#test_ignore\ntest_value'
self._test_check_autosign_grains(test_func, file_content=file_content)
def test_check_autosign_grains_accept_not(self):
'''
Asserts that autosigning from grains fails when the grain value is not in the
autosign_grain files.
'''
def test_func(*args):
self.assertFalse(self.auto_key.check_autosign_grains({'test_grain': 'test_invalid'}))
file_content = '#test_invalid\ntest_value'
self._test_check_autosign_grains(test_func, file_content=file_content)
def test_check_autosign_grains_invalid_file_permissions(self):
'''
Asserts that autosigning from grains fails when the grain file has the wrong permissions.
'''
def test_func(*args):
self.assertFalse(self.auto_key.check_autosign_grains({'test_grain': 'test_value'}))
file_content = '#test_ignore\ntest_value'
self._test_check_autosign_grains(test_func, file_content=file_content, permissions_ret=False)
@skipIf(NO_MOCK, NO_MOCK_REASON)
class LocalFuncsTestCase(TestCase):
'''
TestCase for salt.daemons.masterapi.LocalFuncs class
'''
def setUp(self):
opts = salt.config.master_config(None)
self.local_funcs = masterapi.LocalFuncs(opts, 'test-key')
# runner tests
def test_runner_token_not_authenticated(self):
'''
Asserts that a TokenAuthenticationError is returned when the token can't authenticate.
'''
mock_ret = {'error': {'name': 'TokenAuthenticationError',
'message': 'Authentication failure of type "token" occurred.'}}
ret = self.local_funcs.runner({'token': 'asdfasdfasdfasdf'})
self.assertDictEqual(mock_ret, ret)
def test_runner_token_authorization_error(self):
'''
Asserts that a TokenAuthenticationError is returned when the token authenticates, but is
not authorized.
'''
token = 'asdfasdfasdfasdf'
load = {'token': token, 'fun': 'test.arg', 'kwarg': {}}
mock_token = {'token': token, 'eauth': 'foo', 'name': 'test'}
mock_ret = {'error': {'name': 'TokenAuthenticationError',
'message': 'Authentication failure of type "token" occurred '
'for user test.'}}
with patch('salt.auth.LoadAuth.authenticate_token', MagicMock(return_value=mock_token)), \
patch('salt.auth.LoadAuth.get_auth_list', MagicMock(return_value=[])):
ret = self.local_funcs.runner(load)
self.assertDictEqual(mock_ret, ret)
def test_runner_token_salt_invocation_error(self):
'''
Asserts that a SaltInvocationError is returned when the token authenticates, but the
command is malformed.
'''
token = 'asdfasdfasdfasdf'
load = {'token': token, 'fun': 'badtestarg', 'kwarg': {}}
mock_token = {'token': token, 'eauth': 'foo', 'name': 'test'}
mock_ret = {'error': {'name': 'SaltInvocationError',
'message': 'A command invocation error occurred: Check syntax.'}}
with patch('salt.auth.LoadAuth.authenticate_token', MagicMock(return_value=mock_token)), \
patch('salt.auth.LoadAuth.get_auth_list', MagicMock(return_value=['testing'])):
ret = self.local_funcs.runner(load)
self.assertDictEqual(mock_ret, ret)
def test_runner_eauth_not_authenticated(self):
'''
Asserts that an EauthAuthenticationError is returned when the user can't authenticate.
'''
mock_ret = {'error': {'name': 'EauthAuthenticationError',
'message': 'Authentication failure of type "eauth" occurred for '
'user UNKNOWN.'}}
ret = self.local_funcs.runner({'eauth': 'foo'})
self.assertDictEqual(mock_ret, ret)
def test_runner_eauth_authorization_error(self):
'''
Asserts that an EauthAuthenticationError is returned when the user authenticates, but is
not authorized.
'''
load = {'eauth': 'foo', 'username': 'test', 'fun': 'test.arg', 'kwarg': {}}
mock_ret = {'error': {'name': 'EauthAuthenticationError',
'message': 'Authentication failure of type "eauth" occurred for '
'user test.'}}
with patch('salt.auth.LoadAuth.authenticate_eauth', MagicMock(return_value=True)), \
patch('salt.auth.LoadAuth.get_auth_list', MagicMock(return_value=[])):
ret = self.local_funcs.runner(load)
self.assertDictEqual(mock_ret, ret)
def test_runner_eauth_salt_invocation_error(self):
'''
Asserts that an EauthAuthenticationError is returned when the user authenticates, but the
command is malformed.
'''
load = {'eauth': 'foo', 'username': 'test', 'fun': 'bad.test.arg.func', 'kwarg': {}}
mock_ret = {'error': {'name': 'SaltInvocationError',
'message': 'A command invocation error occurred: Check syntax.'}}
with patch('salt.auth.LoadAuth.authenticate_eauth', MagicMock(return_value=True)), \
patch('salt.auth.LoadAuth.get_auth_list', MagicMock(return_value=['testing'])):
ret = self.local_funcs.runner(load)
self.assertDictEqual(mock_ret, ret)
# wheel tests
def test_wheel_token_not_authenticated(self):
'''
Asserts that a TokenAuthenticationError is returned when the token can't authenticate.
'''
mock_ret = {'error': {'name': 'TokenAuthenticationError',
'message': 'Authentication failure of type "token" occurred.'}}
ret = self.local_funcs.wheel({'token': 'asdfasdfasdfasdf'})
self.assertDictEqual(mock_ret, ret)
def test_wheel_token_authorization_error(self):
'''
Asserts that a TokenAuthenticationError is returned when the token authenticates, but is
not authorized.
'''
token = 'asdfasdfasdfasdf'
load = {'token': token, 'fun': 'test.arg', 'kwarg': {}}
mock_token = {'token': token, 'eauth': 'foo', 'name': 'test'}
mock_ret = {'error': {'name': 'TokenAuthenticationError',
'message': 'Authentication failure of type "token" occurred '
'for user test.'}}
with patch('salt.auth.LoadAuth.authenticate_token', MagicMock(return_value=mock_token)), \
patch('salt.auth.LoadAuth.get_auth_list', MagicMock(return_value=[])):
ret = self.local_funcs.wheel(load)
self.assertDictEqual(mock_ret, ret)
def test_wheel_token_salt_invocation_error(self):
'''
Asserts that a SaltInvocationError is returned when the token authenticates, but the
command is malformed.
'''
token = 'asdfasdfasdfasdf'
load = {'token': token, 'fun': 'badtestarg', 'kwarg': {}}
mock_token = {'token': token, 'eauth': 'foo', 'name': 'test'}
mock_ret = {'error': {'name': 'SaltInvocationError',
'message': 'A command invocation error occurred: Check syntax.'}}
with patch('salt.auth.LoadAuth.authenticate_token', MagicMock(return_value=mock_token)), \
patch('salt.auth.LoadAuth.get_auth_list', MagicMock(return_value=['testing'])):
ret = self.local_funcs.wheel(load)
self.assertDictEqual(mock_ret, ret)
def test_wheel_eauth_not_authenticated(self):
'''
Asserts that an EauthAuthenticationError is returned when the user can't authenticate.
'''
mock_ret = {'error': {'name': 'EauthAuthenticationError',
'message': 'Authentication failure of type "eauth" occurred for '
'user UNKNOWN.'}}
ret = self.local_funcs.wheel({'eauth': 'foo'})
self.assertDictEqual(mock_ret, ret)
def test_wheel_eauth_authorization_error(self):
'''
Asserts that an EauthAuthenticationError is returned when the user authenticates, but is
not authorized.
'''
load = {'eauth': 'foo', 'username': 'test', 'fun': 'test.arg', 'kwarg': {}}
mock_ret = {'error': {'name': 'EauthAuthenticationError',
'message': 'Authentication failure of type "eauth" occurred for '
'user test.'}}
with patch('salt.auth.LoadAuth.authenticate_eauth', MagicMock(return_value=True)), \
patch('salt.auth.LoadAuth.get_auth_list', MagicMock(return_value=[])):
ret = self.local_funcs.wheel(load)
self.assertDictEqual(mock_ret, ret)
def test_wheel_eauth_salt_invocation_error(self):
'''
Asserts that an EauthAuthenticationError is returned when the user authenticates, but the
command is malformed.
'''
load = {'eauth': 'foo', 'username': 'test', 'fun': 'bad.test.arg.func', 'kwarg': {}}
mock_ret = {'error': {'name': 'SaltInvocationError',
'message': 'A command invocation error occurred: Check syntax.'}}
with patch('salt.auth.LoadAuth.authenticate_eauth', MagicMock(return_value=True)), \
patch('salt.auth.LoadAuth.get_auth_list', MagicMock(return_value=['testing'])):
ret = self.local_funcs.wheel(load)
self.assertDictEqual(mock_ret, ret)
def test_wheel_user_not_authenticated(self):
'''
Asserts that an UserAuthenticationError is returned when the user can't authenticate.
'''
mock_ret = {'error': {'name': 'UserAuthenticationError',
'message': 'Authentication failure of type "user" occurred for '
'user UNKNOWN.'}}
ret = self.local_funcs.wheel({})
self.assertDictEqual(mock_ret, ret)
# publish tests
def test_publish_user_is_blacklisted(self):
'''
Asserts that an AuthorizationError is returned when the user has been blacklisted.
'''
mock_ret = {'error': {'name': 'AuthorizationError',
'message': 'Authorization error occurred.'}}
with patch('salt.acl.PublisherACL.user_is_blacklisted', MagicMock(return_value=True)):
self.assertEqual(mock_ret, self.local_funcs.publish({'user': 'foo', 'fun': 'test.arg'}))
def test_publish_cmd_blacklisted(self):
'''
Asserts that an AuthorizationError is returned when the command has been blacklisted.
'''
mock_ret = {'error': {'name': 'AuthorizationError',
'message': 'Authorization error occurred.'}}
with patch('salt.acl.PublisherACL.user_is_blacklisted', MagicMock(return_value=False)), \
patch('salt.acl.PublisherACL.cmd_is_blacklisted', MagicMock(return_value=True)):
self.assertEqual(mock_ret, self.local_funcs.publish({'user': 'foo', 'fun': 'test.arg'}))
def test_publish_token_not_authenticated(self):
'''
Asserts that an AuthenticationError is returned when the token can't authenticate.
'''
load = {'user': 'foo', 'fun': 'test.arg', 'tgt': 'test_minion',
'kwargs': {'token': 'asdfasdfasdfasdf'}}
mock_ret = {'error': {'name': 'AuthenticationError',
'message': 'Authentication error occurred.'}}
with patch('salt.acl.PublisherACL.user_is_blacklisted', MagicMock(return_value=False)), \
patch('salt.acl.PublisherACL.cmd_is_blacklisted', MagicMock(return_value=False)):
self.assertEqual(mock_ret, self.local_funcs.publish(load))
def test_publish_token_authorization_error(self):
'''
Asserts that an AuthorizationError is returned when the token authenticates, but is not
authorized.
'''
token = 'asdfasdfasdfasdf'
load = {'user': 'foo', 'fun': 'test.arg', 'tgt': 'test_minion',
'arg': 'bar', 'kwargs': {'token': token}}
mock_token = {'token': token, 'eauth': 'foo', 'name': 'test'}
mock_ret = {'error': {'name': 'AuthorizationError',
'message': 'Authorization error occurred.'}}
with patch('salt.acl.PublisherACL.user_is_blacklisted', MagicMock(return_value=False)), \
patch('salt.acl.PublisherACL.cmd_is_blacklisted', MagicMock(return_value=False)), \
patch('salt.auth.LoadAuth.authenticate_token', MagicMock(return_value=mock_token)), \
patch('salt.auth.LoadAuth.get_auth_list', MagicMock(return_value=[])):
self.assertEqual(mock_ret, self.local_funcs.publish(load))
def test_publish_eauth_not_authenticated(self):
'''
Asserts that an AuthenticationError is returned when the user can't authenticate.
'''
load = {'user': 'test', 'fun': 'test.arg', 'tgt': 'test_minion',
'kwargs': {'eauth': 'foo'}}
mock_ret = {'error': {'name': 'AuthenticationError',
'message': 'Authentication error occurred.'}}
with patch('salt.acl.PublisherACL.user_is_blacklisted', MagicMock(return_value=False)), \
patch('salt.acl.PublisherACL.cmd_is_blacklisted', MagicMock(return_value=False)):
self.assertEqual(mock_ret, self.local_funcs.publish(load))
def test_publish_eauth_authorization_error(self):
'''
Asserts that an AuthorizationError is returned when the user authenticates, but is not
authorized.
'''
load = {'user': 'test', 'fun': 'test.arg', 'tgt': 'test_minion',
'kwargs': {'eauth': 'foo'}, 'arg': 'bar'}
mock_ret = {'error': {'name': 'AuthorizationError',
'message': 'Authorization error occurred.'}}
with patch('salt.acl.PublisherACL.user_is_blacklisted', MagicMock(return_value=False)), \
patch('salt.acl.PublisherACL.cmd_is_blacklisted', MagicMock(return_value=False)), \
patch('salt.auth.LoadAuth.authenticate_eauth', MagicMock(return_value=True)), \
patch('salt.auth.LoadAuth.get_auth_list', MagicMock(return_value=[])):
self.assertEqual(mock_ret, self.local_funcs.publish(load))
def test_publish_user_not_authenticated(self):
'''
Asserts that an AuthenticationError is returned when the user can't authenticate.
'''
load = {'user': 'test', 'fun': 'test.arg', 'tgt': 'test_minion'}
mock_ret = {'error': {'name': 'AuthenticationError',
'message': 'Authentication error occurred.'}}
with patch('salt.acl.PublisherACL.user_is_blacklisted', MagicMock(return_value=False)), \
patch('salt.acl.PublisherACL.cmd_is_blacklisted', MagicMock(return_value=False)):
self.assertEqual(mock_ret, self.local_funcs.publish(load))
def test_publish_user_authenticated_missing_auth_list(self):
'''
Asserts that an AuthenticationError is returned when the user has an effective user id and is
authenticated, but the auth_list is empty.
'''
load = {'user': 'test', 'fun': 'test.arg', 'tgt': 'test_minion',
'kwargs': {'user': 'test'}, 'arg': 'foo'}
mock_ret = {'error': {'name': 'AuthenticationError',
'message': 'Authentication error occurred.'}}
with patch('salt.acl.PublisherACL.user_is_blacklisted', MagicMock(return_value=False)), \
patch('salt.acl.PublisherACL.cmd_is_blacklisted', MagicMock(return_value=False)), \
patch('salt.auth.LoadAuth.authenticate_key', MagicMock(return_value='fake-user-key')), \
patch('salt.utils.master.get_values_of_matching_keys', MagicMock(return_value=[])):
self.assertEqual(mock_ret, self.local_funcs.publish(load))
def test_publish_user_authorization_error(self):
'''
Asserts that an AuthorizationError is returned when the user authenticates, but is not
authorized.
'''
load = {'user': 'test', 'fun': 'test.arg', 'tgt': 'test_minion',
'kwargs': {'user': 'test'}, 'arg': 'foo'}
mock_ret = {'error': {'name': 'AuthorizationError',
'message': 'Authorization error occurred.'}}
with patch('salt.acl.PublisherACL.user_is_blacklisted', MagicMock(return_value=False)), \
patch('salt.acl.PublisherACL.cmd_is_blacklisted', MagicMock(return_value=False)), \
patch('salt.auth.LoadAuth.authenticate_key', MagicMock(return_value='fake-user-key')), \
patch('salt.utils.master.get_values_of_matching_keys', MagicMock(return_value=['test'])), \
patch('salt.utils.minions.CkMinions.auth_check', MagicMock(return_value=False)):
self.assertEqual(mock_ret, self.local_funcs.publish(load))
class FakeCache(object):
def __init__(self):
self.data = {}
def store(self, bank, key, value):
self.data[bank, key] = value
def fetch(self, bank, key):
return self.data[bank, key]
class RemoteFuncsTestCase(TestCase):
'''
TestCase for salt.daemons.masterapi.RemoteFuncs class
'''
def setUp(self):
opts = salt.config.master_config(os.path.join(TMP_CONF_DIR, 'master'))
self.funcs = masterapi.RemoteFuncs(opts)
self.funcs.cache = FakeCache()
def test_mine_get(self, tgt_type_key='tgt_type'):
'''
Asserts that ``mine_get`` gives the expected results.
Actually this only tests that:
- the correct check minions method is called
- the correct cache key is subsequently used
'''
self.funcs.cache.store('minions/webserver', 'mine',
dict(ip_addr='2001:db8::1:3'))
with patch('salt.utils.minions.CkMinions._check_compound_minions',
MagicMock(return_value=(dict(
minions=['webserver'],
missing=[])))):
ret = self.funcs._mine_get(
{
'id': 'requester_minion',
'tgt': 'G@roles:web',
'fun': 'ip_addr',
tgt_type_key: 'compound',
}
)
self.assertDictEqual(ret, dict(webserver='2001:db8::1:3'))
def test_mine_get_pre_nitrogen_compat(self):
'''
Asserts that pre-Nitrogen API key ``expr_form`` is still accepted.
This is what minions before Nitrogen would issue.
'''
self.test_mine_get(tgt_type_key='expr_form')