salt/tests/unit/utils/test_verify.py
2019-08-15 23:09:08 +00:00

326 lines
12 KiB
Python

# -*- coding: utf-8 -*-
'''
Test the verification routines
'''
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import getpass
import os
import sys
import stat
import shutil
import tempfile
import socket
# Import third party libs
if sys.platform.startswith('win'):
import win32file
else:
import resource
# Import Salt Testing libs
from tests.support.unit import skipIf, TestCase
from tests.support.paths import TMP
from tests.support.helpers import (
requires_network,
TestsLoggingHandler
)
from tests.support.mock import (
MagicMock,
patch,
NO_MOCK,
NO_MOCK_REASON
)
# Import salt libs
import salt.utils.files
import salt.utils.platform
from salt.utils.verify import (
check_user,
verify_env,
verify_socket,
zmq_version,
check_max_open_files,
valid_id,
log,
verify_log,
verify_logs_filter,
verify_log_files,
)
# Import 3rd-party libs
from salt.ext import six
from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
class TestVerify(TestCase):
'''
Verify module tests
'''
def test_valid_id_exception_handler(self):
'''
Ensure we just return False if we pass in invalid or undefined paths.
Refs #8259
'''
opts = {'pki_dir': '/tmp/whatever'}
self.assertFalse(valid_id(opts, None))
def test_valid_id_pathsep(self):
'''
Path separators in id should make it invalid
'''
opts = {'pki_dir': '/tmp/whatever'}
# We have to test both path separators because os.path.normpath will
# convert forward slashes to backslashes on Windows.
for pathsep in ('/', '\\'):
self.assertFalse(valid_id(opts, pathsep.join(('..', 'foobar'))))
def test_zmq_verify(self):
self.assertTrue(zmq_version())
def test_zmq_verify_insufficient(self):
import zmq
with patch.object(zmq, '__version__', '2.1.0'):
self.assertFalse(zmq_version())
def test_user(self):
self.assertTrue(check_user(getpass.getuser()))
def test_no_user(self):
# Catch sys.stderr here since no logging is configured and
# check_user WILL write to sys.stderr
class FakeWriter(object):
def __init__(self):
self.output = ""
def write(self, data):
self.output += data
stderr = sys.stderr
writer = FakeWriter()
sys.stderr = writer
# Now run the test
if sys.platform.startswith('win'):
self.assertTrue(check_user('nouser'))
else:
self.assertFalse(check_user('nouser'))
# Restore sys.stderr
sys.stderr = stderr
if writer.output != 'CRITICAL: User not found: "nouser"\n':
# If there's a different error catch, write it to sys.stderr
sys.stderr.write(writer.output)
@skipIf(salt.utils.platform.is_windows(), 'No verify_env Windows')
def test_verify_env(self):
root_dir = tempfile.mkdtemp(dir=TMP)
var_dir = os.path.join(root_dir, 'var', 'log', 'salt')
key_dir = os.path.join(root_dir, 'key_dir')
verify_env([var_dir], getpass.getuser(), root_dir=root_dir)
self.assertTrue(os.path.exists(var_dir))
dir_stat = os.stat(var_dir)
self.assertEqual(dir_stat.st_uid, os.getuid())
self.assertEqual(dir_stat.st_mode & stat.S_IRWXU, stat.S_IRWXU)
self.assertEqual(dir_stat.st_mode & stat.S_IRWXG, 40)
self.assertEqual(dir_stat.st_mode & stat.S_IRWXO, 5)
@requires_network(only_local_network=True)
def test_verify_socket(self):
self.assertTrue(verify_socket('', 18000, 18001))
if socket.has_ipv6:
# Only run if Python is built with IPv6 support; otherwise
# this will just fail.
try:
self.assertTrue(verify_socket('::', 18000, 18001))
except socket.error as serr:
# Python has IPv6 enabled, but the system cannot create
# IPv6 sockets (otherwise the test would return a bool)
# - skip the test
#
# FIXME - possibly emit a message that the system does
# not support IPv6.
pass
def test_max_open_files(self):
with TestsLoggingHandler() as handler:
logmsg_dbg = (
'DEBUG:This salt-master instance has accepted {0} minion keys.'
)
logmsg_chk = (
'{0}:The number of accepted minion keys({1}) should be lower '
'than 1/4 of the max open files soft setting({2}). According '
'to the system\'s hard limit, there\'s still a margin of {3} '
'to raise the salt\'s max_open_files setting. Please consider '
'raising this value.'
)
logmsg_crash = (
'{0}:The number of accepted minion keys({1}) should be lower '
'than 1/4 of the max open files soft setting({2}). '
'salt-master will crash pretty soon! According to the '
'system\'s hard limit, there\'s still a margin of {3} to '
'raise the salt\'s max_open_files setting. Please consider '
'raising this value.'
)
if sys.platform.startswith('win'):
logmsg_crash = (
'{0}:The number of accepted minion keys({1}) should be lower '
'than 1/4 of the max open files soft setting({2}). '
'salt-master will crash pretty soon! Please consider '
'raising this value.'
)
if sys.platform.startswith('win'):
# Check the Windows API for more detail on this
# http://msdn.microsoft.com/en-us/library/xt874334(v=vs.71).aspx
# and the python binding http://timgolden.me.uk/pywin32-docs/win32file.html
mof_s = mof_h = win32file._getmaxstdio()
else:
mof_s, mof_h = resource.getrlimit(resource.RLIMIT_NOFILE)
tempdir = tempfile.mkdtemp(prefix='fake-keys')
keys_dir = os.path.join(tempdir, 'minions')
os.makedirs(keys_dir)
mof_test = 256
if sys.platform.startswith('win'):
win32file._setmaxstdio(mof_test)
else:
resource.setrlimit(resource.RLIMIT_NOFILE, (mof_test, mof_h))
try:
prev = 0
for newmax, level in ((24, None), (66, 'INFO'),
(127, 'WARNING'), (196, 'CRITICAL')):
for n in range(prev, newmax):
kpath = os.path.join(keys_dir, six.text_type(n))
with salt.utils.files.fopen(kpath, 'w') as fp_:
fp_.write(str(n)) # future lint: disable=blacklisted-function
opts = {
'max_open_files': newmax,
'pki_dir': tempdir
}
check_max_open_files(opts)
if level is None:
# No log message is triggered, only the DEBUG one which
# tells us how many minion keys were accepted.
self.assertEqual(
[logmsg_dbg.format(newmax)], handler.messages
)
else:
self.assertIn(
logmsg_dbg.format(newmax), handler.messages
)
self.assertIn(
logmsg_chk.format(
level,
newmax,
mof_test,
mof_test - newmax if sys.platform.startswith('win') else mof_h - newmax,
),
handler.messages
)
handler.clear()
prev = newmax
newmax = mof_test
for n in range(prev, newmax):
kpath = os.path.join(keys_dir, six.text_type(n))
with salt.utils.files.fopen(kpath, 'w') as fp_:
fp_.write(str(n)) # future lint: disable=blacklisted-function
opts = {
'max_open_files': newmax,
'pki_dir': tempdir
}
check_max_open_files(opts)
self.assertIn(logmsg_dbg.format(newmax), handler.messages)
self.assertIn(
logmsg_crash.format(
'CRITICAL',
newmax,
mof_test,
mof_test - newmax if sys.platform.startswith('win') else mof_h - newmax,
),
handler.messages
)
handler.clear()
except IOError as err:
if err.errno == 24:
# Too many open files
self.skipTest('We\'ve hit the max open files setting')
raise
finally:
if sys.platform.startswith('win'):
win32file._setmaxstdio(mof_h)
else:
resource.setrlimit(resource.RLIMIT_NOFILE, (mof_s, mof_h))
shutil.rmtree(tempdir)
@skipIf(NO_MOCK, NO_MOCK_REASON)
def test_verify_log(self):
'''
Test that verify_log works as expected
'''
message = 'Insecure logging configuration detected! Sensitive data may be logged.'
mock_cheese = MagicMock()
with patch.object(log, 'warning', mock_cheese):
verify_log({'log_level': 'cheeseshop'})
mock_cheese.assert_called_once_with(message)
mock_trace = MagicMock()
with patch.object(log, 'warning', mock_trace):
verify_log({'log_level': 'trace'})
mock_trace.assert_called_once_with(message)
mock_none = MagicMock()
with patch.object(log, 'warning', mock_none):
verify_log({})
mock_none.assert_called_once_with(message)
mock_info = MagicMock()
with patch.object(log, 'warning', mock_info):
verify_log({'log_level': 'info'})
self.assertTrue(mock_info.call_count == 0)
class TestVerifyLog(TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmpdir)
def test_verify_logs_filter(self):
filtered = verify_logs_filter(
['udp://foo', 'tcp://bar', '/tmp/foo', 'file://tmp/bar']
)
assert filtered == ['/tmp/foo'], filtered
@skipIf(salt.utils.platform.is_windows(), 'Not applicable on Windows')
def test_verify_log_files_udp_scheme(self):
verify_log_files(['udp://foo'], getpass.getuser())
self.assertFalse(os.path.isdir(os.path.join(os.getcwd(), 'udp:')))
@skipIf(salt.utils.platform.is_windows(), 'Not applicable on Windows')
def test_verify_log_files_tcp_scheme(self):
verify_log_files(['udp://foo'], getpass.getuser())
self.assertFalse(os.path.isdir(os.path.join(os.getcwd(), 'tcp:')))
@skipIf(salt.utils.platform.is_windows(), 'Not applicable on Windows')
def test_verify_log_files_file_scheme(self):
verify_log_files(['file://{}'], getpass.getuser())
self.assertFalse(os.path.isdir(os.path.join(os.getcwd(), 'file:')))
@skipIf(salt.utils.platform.is_windows(), 'Not applicable on Windows')
def test_verify_log_files(self):
path = os.path.join(self.tmpdir, 'foo', 'bar.log')
self.assertFalse(os.path.exists(path))
verify_log_files([path], getpass.getuser())
self.assertTrue(os.path.exists(path))