salt/tests/unit/states/test_ldap.py
Erik Johnson 3184168365 Use explicit unicode strings + break up salt.utils
This PR is part of what will be an ongoing effort to use explicit
unicode strings in Salt. Because Python 3 does not suport Python 2's raw
unicode string syntax (i.e. `ur'\d+'`), we must use
`salt.utils.locales.sdecode()` to ensure that the raw string is unicode.
However, because of how `salt/utils/__init__.py` has evolved into the
hulking monstrosity it is today, this means importing a large module in
places where it is not needed, which could negatively impact
performance. For this reason, this PR also breaks out some of the
functions from `salt/utils/__init__.py` into new/existing modules under
`salt/utils/`. The long term goal will be that the modules within this
directory do not depend on importing `salt.utils`.

A summary of the changes in this PR is as follows:

* Moves the following functions from `salt.utils` to new locations
  (including a deprecation warning if invoked from `salt.utils`):
  `to_bytes`, `to_str`, `to_unicode`, `str_to_num`, `is_quoted`,
  `dequote`, `is_hex`, `is_bin_str`, `rand_string`,
  `contains_whitespace`, `clean_kwargs`, `invalid_kwargs`, `which`,
  `which_bin`, `path_join`, `shlex_split`, `rand_str`, `is_windows`,
  `is_proxy`, `is_linux`, `is_darwin`, `is_sunos`, `is_smartos`,
  `is_smartos_globalzone`, `is_smartos_zone`, `is_freebsd`, `is_netbsd`,
  `is_openbsd`, `is_aix`
* Moves the functions already deprecated by @rallytime to the bottom of
  `salt/utils/__init__.py` for better organization, so we can keep the
  deprecated ones separate from the ones yet to be deprecated as we
  continue to break up `salt.utils`
* Updates `salt/*.py` and all files under `salt/client/` to use explicit
  unicode string literals.
* Gets rid of implicit imports of `salt.utils` (e.g. `from salt.utils
  import foo` becomes `import salt.utils.foo as foo`).
* Renames the `test.rand_str` function to `test.random_hash` to more
  accurately reflect what it does
* Modifies `salt.utils.stringutils.random()` (née `salt.utils.rand_string()`)
  such that it returns a string matching the passed size. Previously
  this function would get `size` bytes from `os.urandom()`,
  base64-encode it, and return the result, which would in most cases not
  be equal to the passed size.
2017-08-08 13:33:43 -05:00

314 lines
9.5 KiB
Python

# -*- coding: utf-8 -*-
'''Test cases for the ``ldap`` state module
This code is gross. I started out trying to remove some of the
duplicate code in the test cases, and before I knew it the test code
was an ugly second implementation.
I'm leaving it for now, but this should really be gutted and replaced
with something sensible.
'''
from __future__ import absolute_import
import copy
from salt.ext import six
import salt.states.ldap
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.unit import skipIf, TestCase
from tests.support.mock import NO_MOCK, NO_MOCK_REASON
# emulates the LDAP database. each key is the DN of an entry and it
# maps to a dict which maps attribute names to sets of values.
db = {}
def _init_db(newdb=None):
if newdb is None:
newdb = {}
global db
db = newdb
def _complex_db():
return {
'dnfoo': {
'attrfoo1': set((
'valfoo1.1',
'valfoo1.2',
)),
'attrfoo2': set((
'valfoo2.1',
)),
},
'dnbar': {
'attrbar1': set((
'valbar1.1',
'valbar1.2',
)),
'attrbar2': set((
'valbar2.1',
)),
},
}
class _dummy_ctx(object):
def __init__(self):
pass
def __enter__(self):
return self
def __exit__(self, *exc):
pass
def _dummy_connect(connect_spec):
return _dummy_ctx()
def _dummy_search(connect_spec, base, scope):
if base not in db:
return {}
return {base: dict(((attr, sorted(db[base][attr]))
for attr in db[base]
if len(db[base][attr])))}
def _dummy_add(connect_spec, dn, attributes):
assert dn not in db
assert len(attributes)
db[dn] = {}
for attr, vals in six.iteritems(attributes):
assert len(vals)
db[dn][attr] = set(vals)
return True
def _dummy_delete(connect_spec, dn):
assert dn in db
del db[dn]
return True
def _dummy_change(connect_spec, dn, before, after):
assert before != after
assert len(before)
assert len(after)
assert dn in db
e = db[dn]
assert e == before
all_attrs = set()
all_attrs.update(before)
all_attrs.update(after)
directives = []
for attr in all_attrs:
if attr not in before:
assert attr in after
assert len(after[attr])
directives.append(('add', attr, after[attr]))
elif attr not in after:
assert attr in before
assert len(before[attr])
directives.append(('delete', attr, ()))
else:
assert len(before[attr])
assert len(after[attr])
to_del = before[attr] - after[attr]
if len(to_del):
directives.append(('delete', attr, to_del))
to_add = after[attr] - before[attr]
if len(to_add):
directives.append(('add', attr, to_add))
return _dummy_modify(connect_spec, dn, directives)
def _dummy_modify(connect_spec, dn, directives):
assert dn in db
e = db[dn]
for op, attr, vals in directives:
if op == 'add':
assert len(vals)
existing_vals = e.setdefault(attr, set())
for val in vals:
assert val not in existing_vals
existing_vals.add(val)
elif op == 'delete':
assert attr in e
existing_vals = e[attr]
assert len(existing_vals)
if not len(vals):
del e[attr]
continue
for val in vals:
assert val in existing_vals
existing_vals.remove(val)
if not len(existing_vals):
del e[attr]
elif op == 'replace':
e.pop(attr, None)
e[attr] = set(vals)
else:
raise ValueError()
return True
def _dump_db(d=None):
if d is None:
d = db
return dict(((dn, dict(((attr, sorted(d[dn][attr]))
for attr in d[dn])))
for dn in d))
@skipIf(NO_MOCK, NO_MOCK_REASON)
class LDAPTestCase(TestCase, LoaderModuleMockMixin):
def setup_loader_modules(self):
salt_dunder = {}
for fname in ('connect', 'search', 'add', 'delete', 'change', 'modify'):
salt_dunder['ldap3.{}'.format(fname)] = globals()['_dummy_' + fname]
return {
salt.states.ldap: {
'__opts__': {'test': False},
'__salt__': salt_dunder
}
}
def _test_helper(self, init_db, expected_ret, replace,
delete_others=False):
_init_db(copy.deepcopy(init_db))
old = _dump_db()
new = _dump_db()
expected_db = copy.deepcopy(init_db)
for dn, attrs in six.iteritems(replace):
for attr, vals in six.iteritems(attrs):
if len(vals):
new.setdefault(dn, {})[attr] = sorted(set(vals))
expected_db.setdefault(dn, {})[attr] = set(vals)
elif dn in expected_db:
new[dn].pop(attr, None)
expected_db[dn].pop(attr, None)
if not len(expected_db.get(dn, {})):
new.pop(dn, None)
expected_db.pop(dn, None)
if delete_others:
dn_to_delete = set()
for dn, attrs in six.iteritems(expected_db):
if dn in replace:
to_delete = set()
for attr, vals in six.iteritems(attrs):
if attr not in replace[dn]:
to_delete.add(attr)
for attr in to_delete:
del attrs[attr]
del new[dn][attr]
if not len(attrs):
dn_to_delete.add(dn)
for dn in dn_to_delete:
del new[dn]
del expected_db[dn]
name = 'ldapi:///'
expected_ret['name'] = name
expected_ret.setdefault('result', True)
expected_ret.setdefault('comment', 'Successfully updated LDAP entries')
expected_ret.setdefault('changes', dict(
((dn, {'old': dict((attr, vals)
for attr, vals in six.iteritems(old[dn])
if vals != new.get(dn, {}).get(attr, ()))
if dn in old else None,
'new': dict((attr, vals)
for attr, vals in six.iteritems(new[dn])
if vals != old.get(dn, {}).get(attr, ()))
if dn in new else None})
for dn in replace
if old.get(dn, {}) != new.get(dn, {}))))
entries = [{dn: [{'replace': attrs},
{'delete_others': delete_others}]}
for dn, attrs in six.iteritems(replace)]
actual = salt.states.ldap.managed(name, entries)
self.assertDictEqual(expected_ret, actual)
self.assertDictEqual(expected_db, db)
def _test_helper_success(self, init_db, replace, delete_others=False):
self._test_helper(init_db, {}, replace, delete_others)
def _test_helper_nochange(self, init_db, replace, delete_others=False):
expected = {
'changes': {},
'comment': 'LDAP entries already set',
}
self._test_helper(init_db, expected, replace, delete_others)
def test_managed_empty(self):
_init_db()
name = 'ldapi:///'
expected = {
'name': name,
'changes': {},
'result': True,
'comment': 'LDAP entries already set',
}
actual = salt.states.ldap.managed(name, {})
self.assertDictEqual(expected, actual)
def test_managed_add_entry(self):
self._test_helper_success(
{},
{'dummydn': {'foo': ['bar', 'baz']}})
def test_managed_add_attr(self):
self._test_helper_success(
_complex_db(),
{'dnfoo': {'attrfoo3': ['valfoo3.1']}})
def test_managed_simplereplace(self):
self._test_helper_success(
_complex_db(),
{'dnfoo': {'attrfoo1': ['valfoo1.3']}})
def test_managed_deleteattr(self):
self._test_helper_success(
_complex_db(),
{'dnfoo': {'attrfoo1': []}})
def test_managed_deletenonexistattr(self):
self._test_helper_nochange(
_complex_db(),
{'dnfoo': {'dummyattr': []}})
def test_managed_deleteentry(self):
self._test_helper_success(
_complex_db(),
{'dnfoo': {}},
True)
def test_managed_deletenonexistentry(self):
self._test_helper_nochange(
_complex_db(),
{'dummydn': {}},
True)
def test_managed_deletenonexistattrinnonexistentry(self):
self._test_helper_nochange(
_complex_db(),
{'dummydn': {'dummyattr': []}})
def test_managed_add_attr_delete_others(self):
self._test_helper_success(
_complex_db(),
{'dnfoo': {'dummyattr': ['dummyval']}},
True)
def test_managed_no_net_change(self):
self._test_helper_nochange(
_complex_db(),
{'dnfoo': {'attrfoo1': ['valfoo1.2', 'valfoo1.1']}})
def test_managed_repeated_values(self):
self._test_helper_success(
{},
{'dummydn': {'dummyattr': ['dummyval', 'dummyval']}})