Merge branch '2018.3' into 2018.3

Lex Vella 2018-11-27 15:10:29 -05:00 committed by GitHub
commit 42e4ab3c53
13 changed files with 660 additions and 51 deletions

View File

@@ -1247,7 +1247,7 @@ def user_exists(user,
args['password'] = password_hash
if run_verify:
if not verify_login(user, host, password):
if not verify_login(user, password, **connection_args):
return False
try:
_execute(cur, qry, args)
@@ -2235,7 +2235,7 @@ def showglobal(**connection_args):
return rtnv
def verify_login(user, host='localhost', password=None, **connection_args):
def verify_login(user, password=None, **connection_args):
'''
Attempt to login using the provided credentials.
If successful, return true. Otherwise, return False.
@@ -2244,11 +2244,10 @@ def verify_login(user, host='localhost', password=None, **connection_args):
.. code-block:: bash
salt '*' mysql.verify_login root localhost password
salt '*' mysql.verify_login root password
'''
# Override the connection args
# Override the connection args for username and password
connection_args['connection_user'] = user
connection_args['connection_host'] = host
connection_args['connection_pass'] = password
dbc = _connect(**connection_args)
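The net effect of this hunk: verify_login no longer takes host positionally; callers pass any host through connection_args, and only the user and password keys are forced. A minimal sketch of that override pattern, with connect_to_mysql as a hypothetical stand-in for the module's _connect helper:

def connect_to_mysql(**connection_args):
    # Hypothetical stand-in for salt.modules.mysql._connect.
    print('connecting with {0}'.format(sorted(connection_args)))
    return object()

def verify_login(user, password=None, **connection_args):
    # Caller-supplied args (connection_host, connection_port, ...) pass
    # through untouched; only the username and password are overridden.
    connection_args['connection_user'] = user
    connection_args['connection_pass'] = password
    return connect_to_mysql(**connection_args) is not None

verify_login('root', 's3cr3t', connection_host='db.example.com')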

View File

@@ -1736,14 +1736,15 @@ class State(object):
try:
ret = self.states[cdata['full']](*cdata['args'],
**cdata['kwargs'])
except Exception:
except Exception as exc:
log.debug('An exception occurred in this state: %s', exc,
exc_info_on_loglevel=logging.DEBUG)
trb = traceback.format_exc()
ret = {
'result': False,
'name': name,
'changes': {},
'comment': 'An exception occurred in this state: {0}'.format(
trb)
'comment': 'An exception occurred in this state: {0}'.format(trb)
}
utc_finish_time = datetime.datetime.utcnow()
@@ -1919,7 +1920,9 @@ class State(object):
self.states.inject_globals = {}
if 'check_cmd' in low and '{0[state]}.mod_run_check_cmd'.format(low) not in self.states:
ret.update(self._run_check_cmd(low))
except Exception:
except Exception as exc:
log.debug('An exception occurred in this state: %s', exc,
exc_info_on_loglevel=logging.DEBUG)
trb = traceback.format_exc()
# There are a number of possibilities to not have the cdata
# populated with what we might have expected, so just be smart
@@ -1934,8 +1937,7 @@ class State(object):
'result': False,
'name': name,
'changes': {},
'comment': 'An exception occurred in this state: {0}'.format(
trb)
'comment': 'An exception occurred in this state: {0}'.format(trb)
}
finally:
if low.get('__prereq__'):
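Both hunks in this file apply the same fix: bind the exception, log it so the traceback is available when debugging, and return the standardized failure dictionary. A condensed, self-contained sketch of the pattern (stdlib logging only; Salt's own logger additionally accepts the exc_info_on_loglevel keyword used above):

import logging
import traceback

log = logging.getLogger(__name__)

def call_state_fn(fn, name, *args, **kwargs):
    # Illustrative names, not Salt's API.
    try:
        return fn(*args, **kwargs)
    except Exception as exc:  # broad catch mirrors State.call
        log.debug('An exception occurred in this state: %s', exc, exc_info=True)
        trb = traceback.format_exc()
        return {
            'result': False,
            'name': name,
            'changes': {},
            'comment': 'An exception occurred in this state: {0}'.format(trb),
        }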

View File

@@ -13,6 +13,7 @@ import signal
import hashlib
import logging
import weakref
import threading
from random import randint
# Import Salt Libs
@@ -760,6 +761,9 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
'''
Encapsulate synchronous operations for a publisher channel
'''
_sock_data = threading.local()
def __init__(self, opts):
self.opts = opts
self.serial = salt.payload.Serial(self.opts) # TODO: in init?
@@ -795,9 +799,11 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
# IPv6 sockets work for both IPv6 and IPv4 addresses
pub_sock.setsockopt(zmq.IPV4ONLY, 0)
pub_sock.setsockopt(zmq.BACKLOG, self.opts.get('zmq_backlog', 1000))
pub_sock.setsockopt(zmq.LINGER, -1)
pub_uri = 'tcp://{interface}:{publish_port}'.format(**self.opts)
# Prepare minion pull socket
pull_sock = context.socket(zmq.PULL)
pull_sock.setsockopt(zmq.LINGER, -1)
if self.opts.get('ipc_mode', '') == 'tcp':
pull_uri = 'tcp://127.0.0.1:{0}'.format(
@@ -860,15 +866,14 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
raise exc
except KeyboardInterrupt:
# Cleanly close the sockets if we're shutting down
if pub_sock.closed is False:
pub_sock.setsockopt(zmq.LINGER, 1)
pub_sock.close()
if pull_sock.closed is False:
pull_sock.setsockopt(zmq.LINGER, 1)
pull_sock.close()
if context.closed is False:
context.term()
log.trace('Publish daemon caught Keyboard interrupt, tearing down')
# Cleanly close the sockets if we're shutting down
if pub_sock.closed is False:
pub_sock.close()
if pull_sock.closed is False:
pull_sock.close()
if context.closed is False:
context.term()
def pre_fork(self, process_manager, kwargs=None):
'''
@@ -880,23 +885,29 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
'''
process_manager.add_process(self._publish_daemon, kwargs=kwargs)
def publish(self, load):
@property
def pub_sock(self):
'''
Publish "load" to minions
:param dict load: A load to be sent across the wire to minions
This thread's zmq publisher socket. This socket is stored on the class
so that multiple instantiations in the same thread will re-use a single
zmq socket.
'''
payload = {'enc': 'aes'}
try:
return self._sock_data.sock
except AttributeError:
pass
crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value)
payload['load'] = crypticle.dumps(load)
if self.opts['sign_pub_messages']:
master_pem_path = os.path.join(self.opts['pki_dir'], 'master.pem')
log.debug("Signing data packet")
payload['sig'] = salt.crypt.sign_message(master_pem_path, payload['load'])
# Send 0MQ to the publisher
context = zmq.Context(1)
pub_sock = context.socket(zmq.PUSH)
def pub_connect(self):
'''
Create and connect this thread's zmq socket. If a publisher socket
already exists "pub_close" is called before creating and connecting a
new socket.
'''
if self.pub_sock:
self.pub_close()
ctx = zmq.Context.instance()
self._sock_data.sock = ctx.socket(zmq.PUSH)
self.pub_sock.setsockopt(zmq.LINGER, -1)
if self.opts.get('ipc_mode', '') == 'tcp':
pull_uri = 'tcp://127.0.0.1:{0}'.format(
self.opts.get('tcp_master_publish_pull', 4514)
@@ -905,7 +916,33 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
pull_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
)
pub_sock.connect(pull_uri)
log.debug("Connecting to pub server: %s", pull_uri)
self.pub_sock.connect(pull_uri)
return self._sock_data.sock
def pub_close(self):
'''
Disconnect an existing publisher socket and remove it from the local
thread's cache.
'''
if hasattr(self._sock_data, 'sock'):
self._sock_data.sock.close()
delattr(self._sock_data, 'sock')
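The pub_sock property, pub_connect, and pub_close above implement a per-thread cached-socket pattern: _sock_data lives on the class, so every instance in a given thread shares one lazily created PUSH socket. A stripped-down sketch of the same pattern (plain pyzmq, names are illustrative):

import threading
import zmq

class ThreadLocalPush(object):
    # Stored on the class so all instances in one thread share a socket.
    _sock_data = threading.local()

    def __init__(self, uri):
        self.uri = uri

    @property
    def sock(self):
        return getattr(self._sock_data, 'sock', None)

    def connect(self):
        if self.sock is not None:
            self.close()
        ctx = zmq.Context.instance()
        self._sock_data.sock = ctx.socket(zmq.PUSH)
        # LINGER -1: close() blocks until queued messages are delivered.
        self._sock_data.sock.setsockopt(zmq.LINGER, -1)
        self._sock_data.sock.connect(self.uri)
        return self._sock_data.sock

    def close(self):
        if hasattr(self._sock_data, 'sock'):
            self._sock_data.sock.close()
            delattr(self._sock_data, 'sock')

    def send(self, payload):
        if self.sock is None:
            self.connect()
        self.sock.send(payload)

The infinite LINGER is also why the explicit close step matters: a socket holding undelivered messages would otherwise block context termination.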
def publish(self, load):
'''
Publish "load" to minions. This send the load to the publisher daemon
process with does the actual sending to minions.
:param dict load: A load to be sent across the wire to minions
'''
payload = {'enc': 'aes'}
crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value)
payload['load'] = crypticle.dumps(load)
if self.opts['sign_pub_messages']:
master_pem_path = os.path.join(self.opts['pki_dir'], 'master.pem')
log.debug("Signing data packet")
payload['sig'] = salt.crypt.sign_message(master_pem_path, payload['load'])
int_payload = {'payload': self.serial.dumps(payload)}
# add some targeting stuff for lists only (for now)
@@ -928,12 +965,11 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
'Sending payload to publish daemon. jid=%s size=%d',
load.get('jid', None), len(payload),
)
pub_sock.send(payload)
if not self.pub_sock:
self.pub_connect()
self.pub_sock.send(payload)
log.debug('Sent payload to publish daemon.')
pub_sock.close()
context.term()
class AsyncReqMessageClientPool(salt.transport.MessageClientPool):
'''

View File

@@ -5,9 +5,16 @@ Decorators for salt.state
:codeauthor: :email:`Bo Maryniuk (bo@suse.de)`
'''
# Import Python libs
from __future__ import absolute_import, unicode_literals
import logging
# Import salt libs
import salt.utils.stringutils
from salt.exceptions import SaltException
log = logging.getLogger(__name__)
class OutputUnifier(object):
def __init__(self, *policies):
@@ -24,12 +31,14 @@ class OutputUnifier(object):
for pls in self.policies:
try:
result = pls(result)
except Exception as ex:
except Exception as exc:
log.debug('An exception occurred in this state: %s', exc,
exc_info_on_loglevel=logging.DEBUG)
result = {
'result': False,
'name': 'later',
'changes': {},
'comment': 'An exception occurred in this state: {0}'.format(ex)
'comment': 'An exception occurred in this state: {0}'.format(exc)
}
return result
return _func
@@ -75,7 +84,9 @@ class OutputUnifier(object):
:return:
'''
if isinstance(result.get('comment'), list):
result['comment'] = '\n'.join([str(elm) for elm in result['comment']])
result['comment'] = u'\n'.join([
salt.utils.stringutils.to_unicode(elm) for elm in result['comment']
])
if result.get('result') is not None:
result['result'] = bool(result['result'])
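The to_unicode switch matters when a comment list mixes byte strings and text: under Python 2, str() on a unicode element containing non-ASCII characters raises UnicodeEncodeError, and under Python 3 it renders bytes as "b'...'". A sketch of the unification, with a minimal substitute for salt.utils.stringutils.to_unicode:

def unify_comment(result):
    # Simplified stand-in for OutputUnifier; to_unicode below is a minimal
    # substitute for salt.utils.stringutils.to_unicode.
    def to_unicode(elm, encoding='utf-8'):
        if isinstance(elm, bytes):
            return elm.decode(encoding)
        return u'{0}'.format(elm)

    if isinstance(result.get('comment'), list):
        result['comment'] = u'\n'.join(to_unicode(elm) for elm in result['comment'])
    if result.get('result') is not None:
        result['result'] = bool(result['result'])
    return result

print(unify_comment({'comment': [b'step one', u'step tw\xf6'], 'result': 1}))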

View File

@@ -39,6 +39,98 @@ from salt.utils.zeromq import zmq
log = logging.getLogger(__name__)
def get_running_jobs(opts):
'''
Return the running jobs on this master
'''
ret = []
proc_dir = os.path.join(opts['cachedir'], 'proc')
if not os.path.isdir(proc_dir):
return ret
for fn_ in os.listdir(proc_dir):
path = os.path.join(proc_dir, fn_)
try:
data = _read_proc_file(path, opts)
if data is not None:
ret.append(data)
except (IOError, OSError):
# proc files may be removed at any time during this process by
# the master process that is executing the JID in question, so
# we must ignore ENOENT during this process
log.trace('%s removed during processing by master process', path)
return ret
def _read_proc_file(path, opts):
'''
Return a dict of JID metadata, or None
'''
serial = salt.payload.Serial(opts)
with salt.utils.files.fopen(path, 'rb') as fp_:
buf = fp_.read()
fp_.close()
if buf:
data = serial.loads(buf)
else:
# Proc file is empty, remove
try:
os.remove(path)
except IOError:
log.debug('Unable to remove proc file %s.', path)
return None
if not isinstance(data, dict):
# Invalid serial object
return None
if not salt.utils.process.os_is_running(data['pid']):
# The process is no longer running, clear out the file and
# continue
try:
os.remove(path)
except IOError:
log.debug('Unable to remove proc file %s.', path)
return None
if not _check_cmdline(data):
pid = data.get('pid')
if pid:
log.warning(
'PID %s exists but does not appear to be a salt process.', pid
)
try:
os.remove(path)
except IOError:
log.debug('Unable to remove proc file %s.', path)
return None
return data
def _check_cmdline(data):
'''
In some cases, where an insane number of processes are being created
on a system, a PID can get recycled or assigned to a non-Salt process.
On Linux this function checks to make sure the PID we are checking is
actually a Salt process.
For non-Linux systems we punt and just return True
'''
if not salt.utils.platform.is_linux():
return True
pid = data.get('pid')
if not pid:
return False
if not os.path.isdir('/proc'):
return True
path = os.path.join('/proc/{0}/cmdline'.format(pid))
if not os.path.isfile(path):
return False
try:
with salt.utils.files.fopen(path, 'rb') as fp_:
return b'salt' in fp_.read()
except (OSError, IOError):
return False
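For reference, /proc/<pid>/cmdline holds the process's argv as NUL-separated bytes, which is why the simple substring test b'salt' in fp_.read() is enough to detect a recycled PID. To see the raw layout on a Linux machine:

import os

# Read this process's own cmdline (Linux only) to see the format
# _check_cmdline scans: argv entries joined by NUL bytes.
with open('/proc/{0}/cmdline'.format(os.getpid()), 'rb') as fp_:
    raw = fp_.read()
print(raw)                 # e.g. b'/usr/bin/python\x00demo.py\x00'
print(raw.split(b'\x00'))  # the individual argv items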
class MasterPillarUtil(object):
'''
Helper utility for easy access to targeted minion grain and
@@ -721,6 +813,7 @@ def get_values_of_matching_keys(pattern_dict, user_name):
ret.extend(pattern_dict[expr])
return ret
# test code for the ConCache class
if __name__ == '__main__':

View File

@@ -42,6 +42,9 @@ def running(opts):
def cache_jobs(opts, jid, ret):
'''
Write job information to cache
'''
serial = salt.payload.Serial(opts=opts)
fn_ = os.path.join(
@@ -73,7 +76,7 @@ def _read_proc_file(path, opts):
try:
os.remove(path)
except IOError:
pass
log.debug('Unable to remove proc file %s.', path)
return None
if not isinstance(data, dict):
# Invalid serial object
@@ -84,7 +87,7 @@ def _read_proc_file(path, opts):
try:
os.remove(path)
except IOError:
pass
log.debug('Unable to remove proc file %s.', path)
return None
if opts.get('multiprocessing'):
if data.get('pid') == pid:
@@ -94,7 +97,7 @@ def _read_proc_file(path, opts):
try:
os.remove(path)
except IOError:
pass
log.debug('Unable to remove proc file %s.', path)
return None
if data.get('jid') == current_thread:
return None
@@ -102,7 +105,7 @@ def _read_proc_file(path, opts):
try:
os.remove(path)
except IOError:
pass
log.debug('Unable to remove proc file %s.', path)
return None
if not _check_cmdline(data):
@@ -114,7 +117,7 @@ def _read_proc_file(path, opts):
try:
os.remove(path)
except IOError:
pass
log.debug('Unable to remove proc file %s.', path)
return None
return data

View File

@@ -32,6 +32,7 @@ import salt.utils.error
import salt.utils.event
import salt.utils.files
import salt.utils.jid
import salt.utils.master
import salt.utils.minion
import salt.utils.platform
import salt.utils.process
@@ -177,7 +178,11 @@ class Schedule(object):
data['run'] = True
if 'jid_include' not in data or data['jid_include']:
jobcount = 0
for job in salt.utils.minion.running(self.opts):
if self.opts['__role'] == 'master':
current_jobs = salt.utils.master.get_running_jobs(self.opts)
else:
current_jobs = salt.utils.minion.running(self.opts)
for job in current_jobs:
if 'schedule' in job:
log.debug(
'schedule.handle_func: Checking job against fun '

View File

@@ -15,6 +15,8 @@ import os
import sys
import glob
import time
import operator
import platform
try:
from urllib2 import urlopen
except ImportError:
@@ -136,6 +138,74 @@ exec(compile(open(SALT_VERSION).read(), SALT_VERSION, 'exec'))
# ----- Helper Functions -------------------------------------------------------------------------------------------->
def _parse_op(op):
'''
>>> _parse_op('>')
'gt'
>>> _parse_op('>=')
'ge'
>>> _parse_op('=>')
'ge'
>>> _parse_op('=> ')
'ge'
>>> _parse_op('<')
'lt'
>>> _parse_op('<=')
'le'
>>> _parse_op('==')
'eq'
>>> _parse_op(' <= ')
'le'
'''
op = op.strip()
if '>' in op:
if '=' in op:
return 'ge'
else:
return 'gt'
elif '<' in op:
if '=' in op:
return 'le'
else:
return 'lt'
elif '!' in op:
return 'ne'
else:
return 'eq'
def _parse_ver(ver):
'''
>>> _parse_ver("'3.4' # pyzmq 17.1.0 stopped building wheels for python3.4")
'3.4'
>>> _parse_ver('"3.4"')
'3.4'
>>> _parse_ver('"2.6.17"')
'2.6.17'
'''
if '#' in ver:
ver, _ = ver.split('#', 1)
ver = ver.strip()
return ver.strip('\'').strip('"')
def _check_ver(pyver, op, wanted):
'''
>>> _check_ver('2.7.15', 'gt', '2.7')
True
>>> _check_ver('2.7.15', 'gt', '2.7.15')
False
>>> _check_ver('2.7.15', 'ge', '2.7.15')
True
>>> _check_ver('2.7.15', 'eq', '2.7.15')
True
'''
pyver = distutils.version.LooseVersion(pyver)
wanted = distutils.version.LooseVersion(wanted)
return getattr(operator, '__{}__'.format(op))(pyver, wanted)
def _parse_requirements_file(requirements_file):
parsed_requirements = []
with open(requirements_file) as rfh:
@@ -150,7 +220,16 @@ def _parse_requirements_file(requirements_file):
# Python 3 already has futures; installing it will only break
# the current Python installation whenever futures is imported
continue
parsed_requirements.append(line)
try:
pkg, pyverspec = line.rsplit(';', 1)
except ValueError:
pkg, pyverspec = line, ''
pyverspec = pyverspec.strip()
if pyverspec:
_, op, ver = pyverspec.split(' ', 2)
if not _check_ver(platform.python_version(), _parse_op(op), _parse_ver(ver)):
continue
parsed_requirements.append(pkg)
return parsed_requirements
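Together these helpers let a requirements line carry a pip-style environment marker such as pycrypto>=2.6.1 ; python_version >= '3.4' (the package spec here is illustrative). The parser splits on the rightmost ';', normalizes the operator and version, and drops the line when the running interpreter fails the check:

# Worked example, assuming _parse_op, _parse_ver and _check_ver are in scope.
line = "pycrypto>=2.6.1 ; python_version >= '3.4'"
pkg, pyverspec = line.rsplit(';', 1)
_, op, ver = pyverspec.strip().split(' ', 2)
print(pkg.strip())                                           # pycrypto>=2.6.1
print(_parse_op(op), _parse_ver(ver))                        # ge 3.4
print(_check_ver('3.6.8', _parse_op(op), _parse_ver(ver)))   # True
print(_check_ver('2.7.15', _parse_op(op), _parse_ver(ver)))  # False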
# <---- Helper Functions ---------------------------------------------------------------------------------------------
@@ -363,7 +442,6 @@ class DownloadWindowsDlls(Command):
if getattr(self.distribution, 'salt_download_windows_dlls', None) is None:
print('This command is not meant to be called on its own')
exit(1)
import platform
import pip
# pip has moved many things to `_internal` starting with pip 10
if LooseVersion(pip.__version__) < LooseVersion('10.0'):
@@ -941,7 +1019,6 @@ class SaltDistribution(distutils.dist.Distribution):
if IS_WINDOWS_PLATFORM:
install_requires = _parse_requirements_file(SALT_WINDOWS_REQS)
return install_requires
@property

View File

@@ -219,6 +219,7 @@ salt/utils/schedule.py:
- integration.scheduler.test_eval
- integration.scheduler.test_postpone
- integration.scheduler.test_skip
- integration.scheduler.test_maxrunning
salt/utils/vt.py:
- integration.cli.test_custom_module

View File

@@ -0,0 +1,142 @@
# -*- coding: utf-8 -*-
# Import Python libs
from __future__ import absolute_import
import copy
import logging
import os
# Import Salt Testing libs
from tests.support.case import ModuleCase
from tests.support.mixins import SaltReturnAssertsMixin
# Import Salt Testing Libs
from tests.support.mock import MagicMock, patch
import tests.integration as integration
# Import Salt libs
import salt.config
import salt.utils.schedule
from salt.modules.test import ping as ping
try:
import croniter # pylint: disable=W0611
HAS_CRONITER = True
except ImportError:
HAS_CRONITER = False
log = logging.getLogger(__name__)
ROOT_DIR = os.path.join(integration.TMP, 'schedule-unit-tests')
SOCK_DIR = os.path.join(ROOT_DIR, 'test-socks')
DEFAULT_CONFIG = salt.config.minion_config(None)
DEFAULT_CONFIG['conf_dir'] = ROOT_DIR
DEFAULT_CONFIG['root_dir'] = ROOT_DIR
DEFAULT_CONFIG['sock_dir'] = SOCK_DIR
DEFAULT_CONFIG['pki_dir'] = os.path.join(ROOT_DIR, 'pki')
DEFAULT_CONFIG['cachedir'] = os.path.join(ROOT_DIR, 'cache')
class SchedulerMaxRunningTest(ModuleCase, SaltReturnAssertsMixin):
'''
Validate the Schedule class maxrunning handling
'''
def setUp(self):
with patch('salt.utils.schedule.clean_proc_dir', MagicMock(return_value=None)):
functions = {'test.ping': ping}
self.schedule = salt.utils.schedule.Schedule(copy.deepcopy(DEFAULT_CONFIG), functions, returners={})
self.schedule.opts['loop_interval'] = 1
def tearDown(self):
self.schedule.reset()
def test_maxrunning_minion(self):
'''
verify that a job is skipped when maxrunning is reached (minion role)
'''
self.schedule.opts['__role'] = 'minion'
job = {
'schedule': {
'maxrunning_minion': {
'function': 'test.ping',
'seconds': 10,
'maxrunning': 1
}
}
}
job_data = {'function': 'test.ping',
'run': False,
'name': 'maxrunning_minion',
'seconds': 10,
'_seconds': 10,
'jid_include': True,
'maxrunning': 1}
# Add the job to the scheduler
self.schedule.opts.update(job)
running_data = [{'fun_args': [],
'jid': '20181018165923360935',
'schedule': 'maxrunning_minion',
'pid': 15338,
'fun': 'test.sleep',
'id': 'host'}]
with patch('salt.utils.minion.running',
MagicMock(return_value=running_data)):
with patch('salt.utils.process.os_is_running',
MagicMock(return_value=True)):
ret = self.schedule._check_max_running('test.ping',
job_data,
self.schedule.opts)
self.assertIn('_skip_reason', ret)
self.assertEqual('maxrunning', ret['_skip_reason'])
self.assertEqual(False, ret['run'])
def test_maxrunning_master(self):
'''
verify that a job is skipped when maxrunning is reached (master role)
'''
self.schedule.opts['__role'] = 'master'
job = {
'schedule': {
'maxrunning_master': {
'function': 'state.orch',
'args': ['test.orch_test'],
'minutes': 1,
'maxrunning': 1
}
}
}
job_data = {'function': 'state.orch',
'fun_args': ['test.orch_test'],
'run': False,
'name': 'maxrunning_master',
'minutes': 1,
'jid_include': True,
'maxrunning': 1}
# Add the job to the scheduler
self.schedule.opts.update(job)
running_data = [{'fun_args': ['test.orch_test'],
'jid': '20181018165923360935',
'schedule': 'maxrunning_master',
'pid': 15338,
'fun': 'state.orch',
'id': 'host'}]
with patch('salt.utils.master.get_running_jobs',
MagicMock(return_value=running_data)):
with patch('salt.utils.process.os_is_running',
MagicMock(return_value=True)):
ret = self.schedule._check_max_running('state.orch',
job_data,
self.schedule.opts)
self.assertIn('_skip_reason', ret)
self.assertEqual('maxrunning', ret['_skip_reason'])
self.assertEqual(False, ret['run'])
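Both tests patch the running-jobs source (salt.utils.minion.running for minions, salt.utils.master.get_running_jobs for masters) and assert that _check_max_running refuses the run. The check itself is not part of this diff; a deliberately simplified sketch of the decision being exercised (hypothetical, not Salt's implementation) looks like:

def check_max_running(data, running_jobs):
    # Hypothetical, simplified version of Schedule._check_max_running:
    # count running instances of this schedule entry and skip the new
    # run once 'maxrunning' is reached.
    jobcount = sum(1 for job in running_jobs
                   if job.get('schedule') == data['name'])
    if jobcount >= data.get('maxrunning', 1):
        data['run'] = False
        data['_skip_reason'] = 'maxrunning'
    return data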

View File

@@ -66,6 +66,19 @@ class MySQLTestCase(TestCase, LoaderModuleMockMixin):
password='BLUECOW'
)
with patch.object(mysql, 'version', return_value='8.0.11'):
self._test_call(mysql.user_exists,
{'sql': ('SELECT User,Host FROM mysql.user WHERE '
'User = %(user)s AND Host = %(host)s'),
'sql_args': {'host': '%',
'user': 'mytestuser'
}
},
user='mytestuser',
host='%',
password='BLUECOW'
)
# test_user_create_when_user_exists(self):
# ensure we don't try to create a user when one already exists
# mock the version of MySQL

View File

@@ -146,6 +146,7 @@ class BadTestModuleNamesTestCase(TestCase):
'integration.scheduler.test_eval',
'integration.scheduler.test_postpone',
'integration.scheduler.test_skip',
'integration.scheduler.test_maxrunning',
'integration.shell.test_spm',
'integration.shell.test_cp',
'integration.shell.test_syndic',

View File

@@ -8,6 +8,9 @@ from __future__ import absolute_import, print_function, unicode_literals
import os
import time
import threading
import multiprocessing
import ctypes
from concurrent.futures.thread import ThreadPoolExecutor
# linux_distribution deprecated in py3.7
try:
@@ -25,6 +28,7 @@ import tornado.gen
# Import Salt libs
import salt.config
import salt.log.setup
from salt.ext import six
import salt.utils.process
import salt.transport.server
@@ -338,3 +342,225 @@ class ZMQConfigTest(TestCase):
assert salt.transport.zeromq._get_master_uri(master_ip=m_ip,
master_port=m_port,
source_port=s_port) == 'tcp://0.0.0.0:{0};{1}:{2}'.format(s_port, m_ip, m_port)
class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
@classmethod
def setUpClass(cls):
ret_port = get_unused_localhost_port()
publish_port = get_unused_localhost_port()
tcp_master_pub_port = get_unused_localhost_port()
tcp_master_pull_port = get_unused_localhost_port()
tcp_master_publish_pull = get_unused_localhost_port()
tcp_master_workers = get_unused_localhost_port()
cls.master_config = cls.get_temp_config(
'master',
**{'transport': 'zeromq',
'auto_accept': True,
'ret_port': ret_port,
'publish_port': publish_port,
'tcp_master_pub_port': tcp_master_pub_port,
'tcp_master_pull_port': tcp_master_pull_port,
'tcp_master_publish_pull': tcp_master_publish_pull,
'tcp_master_workers': tcp_master_workers,
'sign_pub_messages': False,
}
)
salt.master.SMaster.secrets['aes'] = {
'secret': multiprocessing.Array(
ctypes.c_char,
six.b(salt.crypt.Crypticle.generate_key_string()),
),
}
cls.minion_config = cls.get_temp_config(
'minion',
**{'transport': 'zeromq',
'master_ip': '127.0.0.1',
'master_port': ret_port,
'auth_timeout': 5,
'auth_tries': 1,
'master_uri': 'tcp://127.0.0.1:{0}'.format(ret_port)}
)
@classmethod
def tearDownClass(cls):
del cls.minion_config
del cls.master_config
def setUp(self):
# Start the event loop, even though we don't directly use this with
# ZeroMQPubServerChannel, having it running seems to increase the
# likelihood of dropped messages.
self.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
self.io_loop.make_current()
self.io_loop_thread = threading.Thread(target=self.io_loop.start)
self.io_loop_thread.start()
self.process_manager = salt.utils.process.ProcessManager(name='PubServer_ProcessManager')
def tearDown(self):
self.io_loop.add_callback(self.io_loop.stop)
self.io_loop_thread.join()
self.process_manager.stop_restarting()
self.process_manager.kill_children()
del self.io_loop
del self.io_loop_thread
del self.process_manager
@staticmethod
def _gather_results(opts, pub_uri, results, timeout=120):
'''
Gather results until the number of seconds specified by timeout passes
without receiving a message
'''
ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.setsockopt(zmq.LINGER, -1)
sock.setsockopt(zmq.SUBSCRIBE, b'')
sock.connect(pub_uri)
last_msg = time.time()
serial = salt.payload.Serial(opts)
crypticle = salt.crypt.Crypticle(opts, salt.master.SMaster.secrets['aes']['secret'].value)
while time.time() - last_msg < timeout:
try:
payload = sock.recv(zmq.NOBLOCK)
except zmq.ZMQError:
time.sleep(.01)
else:
payload = crypticle.loads(serial.loads(payload)['load'])
if 'stop' in payload:
break
last_msg = time.time()
results.append(payload['jid'])
return results
@skipIf(salt.utils.is_windows(), 'Skip on Windows OS')
def test_publish_to_pubserv_ipc(self):
'''
Test sending 10K messages to ZeroMQPubServerChannel using IPC transport
ZMQ's ipc transport is not supported on Windows
'''
opts = dict(self.master_config, ipc_mode='ipc', pub_hwm=0)
server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
server_channel.pre_fork(self.process_manager, kwargs={
'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
})
pub_uri = 'tcp://{interface}:{publish_port}'.format(**server_channel.opts)
send_num = 10000
expect = []
results = []
gather = threading.Thread(target=self._gather_results, args=(self.minion_config, pub_uri, results,))
gather.start()
# Allow time for server channel to start, especially on windows
time.sleep(2)
for i in range(send_num):
expect.append(i)
load = {'tgt_type': 'glob', 'tgt': '*', 'jid': i}
server_channel.publish(load)
server_channel.publish(
{'tgt_type': 'glob', 'tgt': '*', 'stop': True}
)
gather.join()
server_channel.pub_close()
assert len(results) == send_num, (len(results), set(expect).difference(results))
def test_publish_to_pubserv_tcp(self):
'''
Test sending 10K messages to ZeroMQPubServerChannel using TCP transport
'''
opts = dict(self.master_config, ipc_mode='tcp', pub_hwm=0)
server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
server_channel.pre_fork(self.process_manager, kwargs={
'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
})
pub_uri = 'tcp://{interface}:{publish_port}'.format(**server_channel.opts)
send_num = 10000
expect = []
results = []
gather = threading.Thread(target=self._gather_results, args=(self.minion_config, pub_uri, results,))
gather.start()
# Allow time for server channel to start, especially on windows
time.sleep(2)
for i in range(send_num):
expect.append(i)
load = {'tgt_type': 'glob', 'tgt': '*', 'jid': i}
server_channel.publish(load)
gather.join()
server_channel.pub_close()
assert len(results) == send_num, (len(results), set(expect).difference(results))
@staticmethod
def _send_small(opts, sid, num=10):
server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
for i in range(num):
load = {'tgt_type': 'glob', 'tgt': '*', 'jid': '{}-{}'.format(sid, i)}
server_channel.publish(load)
@staticmethod
def _send_large(opts, sid, num=10, size=250000 * 3):
server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
for i in range(num):
load = {'tgt_type': 'glob', 'tgt': '*', 'jid': '{}-{}'.format(sid, i), 'xdata': '0' * size}
server_channel.publish(load)
def test_issue_36469_tcp(self):
'''
Test sending both large and small messages to the publisher using TCP
https://github.com/saltstack/salt/issues/36469
'''
opts = dict(self.master_config, ipc_mode='tcp', pub_hwm=0)
server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
server_channel.pre_fork(self.process_manager, kwargs={
'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
})
send_num = 10 * 4
expect = []
results = []
pub_uri = 'tcp://{interface}:{publish_port}'.format(**opts)
# Allow time for server channel to start, especially on windows
time.sleep(2)
gather = threading.Thread(target=self._gather_results, args=(self.minion_config, pub_uri, results,))
gather.start()
with ThreadPoolExecutor(max_workers=4) as executor:
executor.submit(self._send_small, opts, 1)
executor.submit(self._send_small, opts, 2)
executor.submit(self._send_small, opts, 3)
executor.submit(self._send_large, opts, 4)
expect = ['{}-{}'.format(sid, i) for sid in (1, 2, 3, 4) for i in range(10)]
server_channel.publish({'tgt_type': 'glob', 'tgt': '*', 'stop': True})
gather.join()
server_channel.pub_close()
assert len(results) == send_num, (len(results), set(expect).difference(results))
@skipIf(salt.utils.is_windows(), 'Skip on Windows OS')
def test_issue_36469_udp(self):
'''
Test sending both large and small messages to the publisher using UDP
https://github.com/saltstack/salt/issues/36469
'''
opts = dict(self.master_config, ipc_mode='udp', pub_hwm=0)
server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
server_channel.pre_fork(self.process_manager, kwargs={
'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
})
send_num = 10 * 4
expect = []
results = []
pub_uri = 'tcp://{interface}:{publish_port}'.format(**opts)
# Allow time for server channel to start, especially on windows
time.sleep(2)
gather = threading.Thread(target=self._gather_results, args=(self.minion_config, pub_uri, results,))
gather.start()
with ThreadPoolExecutor(max_workers=4) as executor:
executor.submit(self._send_small, opts, 1)
executor.submit(self._send_small, opts, 2)
executor.submit(self._send_small, opts, 3)
executor.submit(self._send_large, opts, 4)
expect = ['{}-{}'.format(sid, i) for sid in (1, 2, 3, 4) for i in range(10)]
server_channel.publish({'tgt_type': 'glob', 'tgt': '*', 'stop': True})
gather.join()
server_channel.pub_close()
assert len(results) == send_num, (len(results), set(expect).difference(results))