# salt/tests/unit/utils/test_minions.py

# 512 lines
# 19 KiB
# Python

# -*- coding: utf-8 -*-
# Import python libs
from __future__ import absolute_import, unicode_literals
import sys
# Import Salt Libs
import salt.utils.minions
# Import Salt Testing Libs
from tests.support.unit import TestCase, skipIf
from tests.support.mock import (
patch,
MagicMock,
)
# Nodegroup definitions handed to nodegroup_comp() by MinionsTestCase. The
# entries cover the input shapes exercised there: a single matcher string, a
# compound expression given as a list, references to other nodegroups (N@)
# and a plain list of minion ids.
NODEGROUPS = {
    'group1': 'L@host1,host2,host3',
    'group2': ['G@foo:bar', 'or', 'web1*'],
    'group3': ['N@group1', 'or', 'N@group2'],
    'group4': ['host4', 'host5', 'host6'],
}

# The value nodegroup_comp() is expected to return for each entry above.
EXPECTED = {
    'group1': ['L@host1,host2,host3'],
    'group2': ['G@foo:bar', 'or', 'web1*'],
    # Each referenced nodegroup is expanded in place inside its own
    # parentheses, and the whole expression is parenthesised as well.
    'group3': [
        '(',
        '(', 'L@host1,host2,host3', ')',
        'or',
        '(', 'G@foo:bar', 'or', 'web1*', ')',
        ')',
    ],
    # A bare list of minion ids is collapsed into a single L@ matcher.
    'group4': ['L@host4,host5,host6'],
}
class MinionsTestCase(TestCase):
    '''
    Tests for the module-level helpers of salt.utils.minions
    '''

    def test_nodegroup_comp(self):
        '''
        Expand every entry of NODEGROUPS with nodegroup_comp and compare
        the result with the matching entry of EXPECTED
        '''
        for name in NODEGROUPS:
            # Look the expectation up first so a missing fixture entry
            # surfaces before the code under test is called.
            wanted = EXPECTED[name]
            expanded = salt.utils.minions.nodegroup_comp(name, NODEGROUPS)
            self.assertEqual(expanded, wanted)
class CkMinionsTestCase(TestCase):
    '''
    TestCase for salt.utils.minions.CkMinions class
    '''
    def setUp(self):
        # Each test runs against a fresh CkMinions built from a minimal
        # opts dict that only sets 'minion_data_cache'.
        self.ckminions = salt.utils.minions.CkMinions({'minion_data_cache': True})

    def test_spec_check(self):
        '''
        Walk CkMinions.spec_check through the supported auth_list shapes:
        bare '@runner' / '@runners' specs, function restrictions (string,
        regex, list), args/kwargs restrictions, and the '@module' forms.
        Every call passes the auth list, the function name, the call
        arguments as a dict with 'arg' / 'kwarg' keys, and the form
        ('runner' or 'wheel').
        '''
        # Test spec-only rule
        auth_list = ['@runner']
        ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'runner')
        self.assertTrue(ret)
        ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'wheel')
        self.assertFalse(ret)
        # A function name without a '.' separator is expected to produce an
        # error dict instead of a boolean.
        ret = self.ckminions.spec_check(auth_list, 'testarg', {}, 'runner')
        mock_ret = {'error': {'name': 'SaltInvocationError',
                              'message': 'A command invocation error occurred: Check syntax.'}}
        self.assertDictEqual(mock_ret, ret)
        # Test spec in plural form
        auth_list = ['@runners']
        ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'runner')
        self.assertTrue(ret)
        ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'wheel')
        self.assertFalse(ret)
        # Test spec with module.function restriction
        auth_list = [{'@runner': 'test.arg'}]
        ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'runner')
        self.assertTrue(ret)
        ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'wheel')
        self.assertFalse(ret)
        ret = self.ckminions.spec_check(auth_list, 'tes.arg', {}, 'runner')
        self.assertFalse(ret)
        ret = self.ckminions.spec_check(auth_list, 'test.ar', {}, 'runner')
        self.assertFalse(ret)
        # Test function name is a regex
        auth_list = [{'@runner': 'test.arg.*some'}]
        ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'runner')
        self.assertFalse(ret)
        ret = self.ckminions.spec_check(auth_list, 'test.argsome', {}, 'runner')
        self.assertTrue(ret)
        ret = self.ckminions.spec_check(auth_list, 'test.arg_aaa_some', {}, 'runner')
        self.assertTrue(ret)
        # Test a list of funcs
        auth_list = [{'@runner': ['test.arg', 'jobs.active']}]
        ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'runner')
        self.assertTrue(ret)
        ret = self.ckminions.spec_check(auth_list, 'jobs.active', {}, 'runner')
        self.assertTrue(ret)
        ret = self.ckminions.spec_check(auth_list, 'test.active', {}, 'runner')
        self.assertFalse(ret)
        ret = self.ckminions.spec_check(auth_list, 'jobs.arg', {}, 'runner')
        self.assertFalse(ret)
        # Test args-kwargs rules
        auth_list = [{
            '@runner': {
                'test.arg': {
                    'args': ['1', '2'],
                    'kwargs': {
                        'aaa': 'bbb',
                        'ccc': 'ddd'
                    }
                }
            }
        }]
        # No call arguments at all: rejected
        ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'runner')
        self.assertFalse(ret)
        # Exactly the listed args and kwargs: accepted
        args = {
            'arg': ['1', '2'],
            'kwarg': {'aaa': 'bbb', 'ccc': 'ddd'}
        }
        ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
        self.assertTrue(ret)
        # An extra trailing positional argument is still accepted
        args = {
            'arg': ['1', '2', '3'],
            'kwarg': {'aaa': 'bbb', 'ccc': 'ddd'}
        }
        ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
        self.assertTrue(ret)
        # An extra keyword argument is still accepted
        args = {
            'arg': ['1', '2'],
            'kwarg': {'aaa': 'bbb', 'ccc': 'ddd', 'zzz': 'zzz'}
        }
        ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
        self.assertTrue(ret)
        # A keyword argument with the wrong value: rejected
        args = {
            'arg': ['1', '2'],
            'kwarg': {'aaa': 'bbb', 'ccc': 'ddc'}
        }
        ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
        self.assertFalse(ret)
        # A required keyword argument is missing: rejected
        args = {
            'arg': ['1', '2'],
            'kwarg': {'aaa': 'bbb'}
        }
        ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
        self.assertFalse(ret)
        # A positional argument with the wrong value: rejected
        args = {
            'arg': ['1', '3'],
            'kwarg': {'aaa': 'bbb', 'ccc': 'ddd'}
        }
        ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
        self.assertFalse(ret)
        # Too few positional arguments: rejected
        args = {
            'arg': ['1'],
            'kwarg': {'aaa': 'bbb', 'ccc': 'ddd'}
        }
        ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
        self.assertFalse(ret)
        # The 'arg' key is absent altogether: rejected
        args = {
            'kwarg': {'aaa': 'bbb', 'ccc': 'ddd'}
        }
        ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
        self.assertFalse(ret)
        # The 'kwarg' key is absent altogether: rejected
        args = {
            'arg': ['1', '2'],
        }
        ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
        self.assertFalse(ret)
        # Test kwargs only
        auth_list = [{
            '@runner': {
                'test.arg': {
                    'kwargs': {
                        'aaa': 'bbb',
                        'ccc': 'ddd'
                    }
                }
            }
        }]
        ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'runner')
        self.assertFalse(ret)
        args = {
            'arg': ['1', '2'],
            'kwarg': {'aaa': 'bbb', 'ccc': 'ddd'}
        }
        ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
        self.assertTrue(ret)
        # Test args only
        auth_list = [{
            '@runner': {
                'test.arg': {
                    'args': ['1', '2']
                }
            }
        }]
        ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'runner')
        self.assertFalse(ret)
        args = {
            'arg': ['1', '2'],
            'kwarg': {'aaa': 'bbb', 'ccc': 'ddd'}
        }
        ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
        self.assertTrue(ret)
        # Test list of args
        # The function maps to a list of alternative args/kwargs rules; a
        # call matching either alternative is asserted to pass.
        auth_list = [{'@runner': [{'test.arg': [{'args': ['1', '2'],
                                                 'kwargs': {'aaa': 'bbb',
                                                            'ccc': 'ddd'
                                                            }
                                                 },
                                                {'args': ['2', '3'],
                                                 'kwargs': {'aaa': 'aaa',
                                                            'ccc': 'ccc'
                                                            }
                                                 }]
                                   }]
                      }]
        args = {
            'arg': ['1', '2'],
            'kwarg': {'aaa': 'bbb', 'ccc': 'ddd'}
        }
        ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
        self.assertTrue(ret)
        args = {
            'arg': ['2', '3'],
            'kwarg': {'aaa': 'aaa', 'ccc': 'ccc'}
        }
        ret = self.ckminions.spec_check(auth_list, 'test.arg', args, 'runner')
        self.assertTrue(ret)
        # Test @module form
        # An '@jobs' spec names a module, so it is asserted to pass for both
        # the 'runner' and the 'wheel' form.
        auth_list = ['@jobs']
        ret = self.ckminions.spec_check(auth_list, 'jobs.active', {}, 'runner')
        self.assertTrue(ret)
        ret = self.ckminions.spec_check(auth_list, 'jobs.active', {}, 'wheel')
        self.assertTrue(ret)
        ret = self.ckminions.spec_check(auth_list, 'test.arg', {}, 'runner')
        self.assertFalse(ret)
        ret = self.ckminions.spec_check(auth_list, 'job.arg', {}, 'runner')
        self.assertFalse(ret)
        # Test @module: function
        auth_list = [{'@jobs': 'active'}]
        ret = self.ckminions.spec_check(auth_list, 'jobs.active', {}, 'runner')
        self.assertTrue(ret)
        ret = self.ckminions.spec_check(auth_list, 'jobs.active', {}, 'wheel')
        self.assertTrue(ret)
        # The function pattern is not anchored at its end: 'active' also
        # admits 'active_jobs', while the shorter 'activ' is refused.
        ret = self.ckminions.spec_check(auth_list, 'jobs.active_jobs', {}, 'runner')
        self.assertTrue(ret)
        ret = self.ckminions.spec_check(auth_list, 'jobs.activ', {}, 'runner')
        self.assertFalse(ret)
        # Test @module: [functions]
        auth_list = [{'@jobs': ['active', 'li']}]
        ret = self.ckminions.spec_check(auth_list, 'jobs.active', {}, 'runner')
        self.assertTrue(ret)
        ret = self.ckminions.spec_check(auth_list, 'jobs.list_jobs', {}, 'runner')
        self.assertTrue(ret)
        ret = self.ckminions.spec_check(auth_list, 'jobs.last_run', {}, 'runner')
        self.assertFalse(ret)
        # Test @module: function with args
        auth_list = [{'@jobs': {'active': {'args': ['1', '2'],
                                           'kwargs': {'a': 'b', 'c': 'd'}}}}]
        args = {'arg': ['1', '2'],
                'kwarg': {'a': 'b', 'c': 'd'}}
        ret = self.ckminions.spec_check(auth_list, 'jobs.active', args, 'runner')
        self.assertTrue(ret)
        ret = self.ckminions.spec_check(auth_list, 'jobs.active', {}, 'runner')
        self.assertFalse(ret)

    @patch('salt.utils.minions.CkMinions._pki_minions', MagicMock(return_value=['alpha', 'beta', 'gamma']))
    def test_auth_check(self):
        '''
        Walk CkMinions.auth_check through function-only rules, rules keyed
        on a minion target, function lists, and args/kwargs restrictions.
        CkMinions._pki_minions is patched to report the minion ids
        'alpha', 'beta' and 'gamma' for the duration of the test.
        '''
        # Test function-only rule
        auth_list = ['test.ping']
        ret = self.ckminions.auth_check(auth_list, 'test.ping', None, 'alpha')
        self.assertTrue(ret)
        ret = self.ckminions.auth_check(auth_list, 'test.arg', None, 'alpha')
        self.assertFalse(ret)
        # Test minion and function
        auth_list = [{'alpha': 'test.ping'}]
        ret = self.ckminions.auth_check(auth_list, 'test.ping', None, 'alpha')
        self.assertTrue(ret)
        ret = self.ckminions.auth_check(auth_list, 'test.arg', None, 'alpha')
        self.assertFalse(ret)
        ret = self.ckminions.auth_check(auth_list, 'test.ping', None, 'beta')
        self.assertFalse(ret)
        # Test function list
        auth_list = [{'*': ['test.*', 'saltutil.cmd']}]
        ret = self.ckminions.auth_check(auth_list, 'test.arg', None, 'alpha')
        self.assertTrue(ret)
        ret = self.ckminions.auth_check(auth_list, 'test.ping', None, 'beta')
        self.assertTrue(ret)
        ret = self.ckminions.auth_check(auth_list, 'saltutil.cmd', None, 'gamma')
        self.assertTrue(ret)
        ret = self.ckminions.auth_check(auth_list, 'saltutil.running', None, 'gamma')
        self.assertFalse(ret)
        # Test an args and kwargs rule
        # NOTE(review): from here on the target passed in is 'runner', which
        # is not one of the mocked minion ids, while the rules are keyed on
        # 'alpha' -- confirm this is intentional.
        auth_list = [{
            'alpha': {
                'test.arg': {
                    'args': ['1', '2'],
                    'kwargs': {
                        'aaa': 'bbb',
                        'ccc': 'ddd'
                    }
                }
            }
        }]
        ret = self.ckminions.auth_check(auth_list, 'test.arg', None, 'runner')
        self.assertFalse(ret)
        ret = self.ckminions.auth_check(auth_list, 'test.arg', [], 'runner')
        self.assertFalse(ret)
        # In these calls the arguments are one flat list; keyword arguments
        # ride along as a trailing dict flagged with '__kwarg__': True.
        args = ['1', '2', {'aaa': 'bbb', 'ccc': 'ddd', '__kwarg__': True}]
        ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
        self.assertTrue(ret)
        # Extra positional and keyword arguments are still accepted
        args = ['1', '2', '3', {'aaa': 'bbb', 'ccc': 'ddd', 'eee': 'fff', '__kwarg__': True}]
        ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
        self.assertTrue(ret)
        # Too few positional arguments: rejected
        args = ['1', {'aaa': 'bbb', 'ccc': 'ddd', '__kwarg__': True}]
        ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
        self.assertFalse(ret)
        # A required keyword argument is missing: rejected
        args = ['1', '2', {'aaa': 'bbb', '__kwarg__': True}]
        ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
        self.assertFalse(ret)
        # A positional argument with the wrong value: rejected
        args = ['1', '3', {'aaa': 'bbb', 'ccc': 'ddd', '__kwarg__': True}]
        ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
        self.assertFalse(ret)
        # A keyword argument with the wrong value: rejected
        args = ['1', '2', {'aaa': 'bbb', 'ccc': 'fff', '__kwarg__': True}]
        ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
        self.assertFalse(ret)
        # Test kwargs only rule
        auth_list = [{
            'alpha': {
                'test.arg': {
                    'kwargs': {
                        'aaa': 'bbb',
                        'ccc': 'ddd'
                    }
                }
            }
        }]
        args = ['1', '2', {'aaa': 'bbb', 'ccc': 'ddd', '__kwarg__': True}]
        ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
        self.assertTrue(ret)
        args = [{'aaa': 'bbb', 'ccc': 'ddd', 'eee': 'fff', '__kwarg__': True}]
        ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
        self.assertTrue(ret)
        # Test args only rule
        auth_list = [{
            'alpha': {
                'test.arg': {
                    'args': ['1', '2'],
                }
            }
        }]
        args = ['1', '2', {'aaa': 'bbb', 'ccc': 'ddd', '__kwarg__': True}]
        ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
        self.assertTrue(ret)
        args = ['1', '2']
        ret = self.ckminions.auth_check(auth_list, 'test.arg', args, 'runner')
        self.assertTrue(ret)
@skipIf(sys.version_info < (2, 7), 'Python 2.7 needed for dictionary equality assertions')
class TargetParseTestCase(TestCase):
    '''
    Checks that salt.utils.minions.parse_target splits an ``X@pattern``
    target into its engine letter, its pattern and its delimiter
    '''

    def _assert_parsed(self, target, engine):
        '''
        Parse ``target`` and require the result to consist of exactly the
        given engine letter, the pattern ``a:b`` and no delimiter
        '''
        parsed = salt.utils.minions.parse_target(target)
        wanted = {'engine': engine, 'pattern': 'a:b', 'delimiter': None}
        self.assertDictEqual(parsed, wanted)

    def test_parse_grains_target(self):
        '''
        G@ targets (grains) are parsed correctly
        '''
        self._assert_parsed('G@a:b', 'G')

    def test_parse_grains_pcre_target(self):
        '''
        P@ targets (grains PCRE) are parsed correctly
        '''
        self._assert_parsed('P@a:b', 'P')

    def test_parse_pillar_pcre_target(self):
        '''
        J@ targets (pillar PCRE) are parsed correctly
        '''
        self._assert_parsed('J@a:b', 'J')

    def test_parse_list_target(self):
        '''
        L@ targets (list) are parsed correctly
        '''
        self._assert_parsed('L@a:b', 'L')

    def test_parse_nodegroup_target(self):
        '''
        N@ targets (nodegroup) are parsed correctly
        '''
        self._assert_parsed('N@a:b', 'N')

    def test_parse_subnet_target(self):
        '''
        S@ targets (subnet) are parsed correctly
        '''
        self._assert_parsed('S@a:b', 'S')

    def test_parse_minion_pcre_target(self):
        '''
        E@ targets (minion PCRE) are parsed correctly
        '''
        self._assert_parsed('E@a:b', 'E')

    def test_parse_range_target(self):
        '''
        R@ targets (range) are parsed correctly
        '''
        self._assert_parsed('R@a:b', 'R')

    def test_parse_multiword_target(self):
        '''
        A pattern that contains a space must be kept in one piece
        Refs https://github.com/saltstack/salt/issues/37231
        '''
        parsed = salt.utils.minions.parse_target('G@a:b c')
        self.assertEqual(parsed['pattern'], 'a:b c')
class NodegroupCompTest(TestCase):
    '''
    Exercises the nodegroup expansion performed by
    salt.utils.minions.nodegroup_comp()
    '''

    def test_simple_nodegroup(self):
        '''
        A compound string without nodegroup references comes back split
        into its matcher and operator words
        '''
        nodegroups = {'group1': 'L@foo.domain.com,bar.domain.com,baz.domain.com or bl*.domain.com'}
        expanded = salt.utils.minions.nodegroup_comp('group1', nodegroups)
        self.assertListEqual(
            expanded,
            ['L@foo.domain.com,bar.domain.com,baz.domain.com', 'or', 'bl*.domain.com']
        )

    def test_simple_expression_nodegroup(self):
        '''
        A bare bracket expression without nodegroup references is
        expected to come back as a single E@ matcher
        '''
        nodegroups = {'group1': '[foo,bar,baz].domain.com'}
        expanded = salt.utils.minions.nodegroup_comp('group1', nodegroups)
        self.assertListEqual(expanded, ['E@[foo,bar,baz].domain.com'])

    def test_simple_recurse(self):
        '''
        A nodegroup that references another one gets the referenced
        definition expanded in place, wrapped in parentheses
        '''
        nodegroups = {
            'group1': 'L@foo.domain.com,bar.domain.com,baz.domain.com or bl*.domain.com',
            'group2': 'G@os:Debian and N@group1'
        }
        wanted = [
            '(',
            'G@os:Debian',
            'and',
            # expansion of N@group1
            '(',
            'L@foo.domain.com,bar.domain.com,baz.domain.com',
            'or',
            'bl*.domain.com',
            ')',
            ')'
        ]
        expanded = salt.utils.minions.nodegroup_comp('group2', nodegroups)
        self.assertListEqual(expanded, wanted)

    def test_circular_nodegroup_reference(self):
        '''
        Two nodegroups that refer to each other must expand to an empty
        list
        '''
        nodegroups = {
            'group1': 'N@group2',
            'group2': 'N@group1'
        }
        # When the cycle is detected an error should also be printed to
        # the console; only the return value is checked here.
        expanded = salt.utils.minions.nodegroup_comp('group1', nodegroups)
        self.assertEqual(expanded, [])