Merge branch '2017.7' into merge-2017

Conflicts:
	tests/unit/transport/test_zeromq.py
This commit is contained in:
Gareth J. Greenaway 2018-11-26 11:37:07 -08:00
commit 45a1aa3bdc
No known key found for this signature in database
GPG Key ID: 10B62F8A7CAD7A41
2 changed files with 289 additions and 28 deletions

View File

@ -13,6 +13,7 @@ import signal
import hashlib
import logging
import weakref
import threading
from random import randint
# Import Salt Libs
@ -760,6 +761,9 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
'''
Encapsulate synchronous operations for a publisher channel
'''
_sock_data = threading.local()
def __init__(self, opts):
self.opts = opts
self.serial = salt.payload.Serial(self.opts) # TODO: in init?
@ -795,9 +799,11 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
# IPv6 sockets work for both IPv6 and IPv4 addresses
pub_sock.setsockopt(zmq.IPV4ONLY, 0)
pub_sock.setsockopt(zmq.BACKLOG, self.opts.get('zmq_backlog', 1000))
pub_sock.setsockopt(zmq.LINGER, -1)
pub_uri = 'tcp://{interface}:{publish_port}'.format(**self.opts)
# Prepare minion pull socket
pull_sock = context.socket(zmq.PULL)
pull_sock.setsockopt(zmq.LINGER, -1)
if self.opts.get('ipc_mode', '') == 'tcp':
pull_uri = 'tcp://127.0.0.1:{0}'.format(
@ -860,15 +866,14 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
raise exc
except KeyboardInterrupt:
# Cleanly close the sockets if we're shutting down
if pub_sock.closed is False:
pub_sock.setsockopt(zmq.LINGER, 1)
pub_sock.close()
if pull_sock.closed is False:
pull_sock.setsockopt(zmq.LINGER, 1)
pull_sock.close()
if context.closed is False:
context.term()
log.trace('Publish daemon caught Keyboard interupt, tearing down')
# Cleanly close the sockets if we're shutting down
if pub_sock.closed is False:
pub_sock.close()
if pull_sock.closed is False:
pull_sock.close()
if context.closed is False:
context.term()
def pre_fork(self, process_manager, kwargs=None):
'''
@ -880,23 +885,29 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
'''
process_manager.add_process(self._publish_daemon, kwargs=kwargs)
def publish(self, load):
@property
def pub_sock(self):
'''
Publish "load" to minions
:param dict load: A load to be sent across the wire to minions
This thread's zmq publisher socket. This socket is stored on the class
so that multiple instantiations in the same thread will re-use a single
zmq socket.
'''
payload = {'enc': 'aes'}
try:
return self._sock_data.sock
except AttributeError:
pass
crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value)
payload['load'] = crypticle.dumps(load)
if self.opts['sign_pub_messages']:
master_pem_path = os.path.join(self.opts['pki_dir'], 'master.pem')
log.debug("Signing data packet")
payload['sig'] = salt.crypt.sign_message(master_pem_path, payload['load'])
# Send 0MQ to the publisher
context = zmq.Context(1)
pub_sock = context.socket(zmq.PUSH)
def pub_connect(self):
'''
Create and connect this thread's zmq socket. If a publisher socket
already exists "pub_close" is called before creating and connecting a
new socket.
'''
if self.pub_sock:
self.pub_close()
ctx = zmq.Context.instance()
self._sock_data.sock = ctx.socket(zmq.PUSH)
self.pub_sock.setsockopt(zmq.LINGER, -1)
if self.opts.get('ipc_mode', '') == 'tcp':
pull_uri = 'tcp://127.0.0.1:{0}'.format(
self.opts.get('tcp_master_publish_pull', 4514)
@ -905,7 +916,33 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
pull_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
)
pub_sock.connect(pull_uri)
log.debug("Connecting to pub server: %s", pull_uri)
self.pub_sock.connect(pull_uri)
return self._sock_data.sock
def pub_close(self):
'''
Disconnect an existing publisher socket and remove it from the local
thread's cache.
'''
if hasattr(self._sock_data, 'sock'):
self._sock_data.sock.close()
delattr(self._sock_data, 'sock')
def publish(self, load):
'''
Publish "load" to minions. This send the load to the publisher daemon
process with does the actual sending to minions.
:param dict load: A load to be sent across the wire to minions
'''
payload = {'enc': 'aes'}
crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value)
payload['load'] = crypticle.dumps(load)
if self.opts['sign_pub_messages']:
master_pem_path = os.path.join(self.opts['pki_dir'], 'master.pem')
log.debug("Signing data packet")
payload['sig'] = salt.crypt.sign_message(master_pem_path, payload['load'])
int_payload = {'payload': self.serial.dumps(payload)}
# add some targeting stuff for lists only (for now)
@ -928,12 +965,11 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
'Sending payload to publish daemon. jid=%s size=%d',
load.get('jid', None), len(payload),
)
pub_sock.send(payload)
if not self.pub_sock:
self.pub_connect()
self.pub_sock.send(payload)
log.debug('Sent payload to publish daemon.')
pub_sock.close()
context.term()
class AsyncReqMessageClientPool(salt.transport.MessageClientPool):
'''

View File

@ -8,6 +8,9 @@ from __future__ import absolute_import, print_function, unicode_literals
import os
import time
import threading
import multiprocessing
import ctypes
from concurrent.futures.thread import ThreadPoolExecutor
# linux_distribution deprecated in py3.7
try:
@ -25,6 +28,7 @@ import tornado.gen
# Import Salt libs
import salt.config
import salt.log.setup
from salt.ext import six
import salt.utils.process
import salt.transport.server
@ -338,3 +342,224 @@ class ZMQConfigTest(TestCase):
assert salt.transport.zeromq._get_master_uri(master_ip=m_ip,
master_port=m_port,
source_port=s_port) == 'tcp://0.0.0.0:{0};{1}:{2}'.format(s_port, m_ip, m_port)
class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
@classmethod
def setUpClass(cls):
ret_port = get_unused_localhost_port()
publish_port = get_unused_localhost_port()
tcp_master_pub_port = get_unused_localhost_port()
tcp_master_pull_port = get_unused_localhost_port()
tcp_master_publish_pull = get_unused_localhost_port()
tcp_master_workers = get_unused_localhost_port()
cls.master_config = cls.get_temp_config(
'master',
**{'transport': 'zeromq',
'auto_accept': True,
'ret_port': ret_port,
'publish_port': publish_port,
'tcp_master_pub_port': tcp_master_pub_port,
'tcp_master_pull_port': tcp_master_pull_port,
'tcp_master_publish_pull': tcp_master_publish_pull,
'tcp_master_workers': tcp_master_workers,
'sign_pub_messages': False,
}
)
salt.master.SMaster.secrets['aes'] = {
'secret': multiprocessing.Array(
ctypes.c_char,
six.b(salt.crypt.Crypticle.generate_key_string()),
),
}
cls.minion_config = cls.get_temp_config(
'minion',
**{'transport': 'zeromq',
'master_ip': '127.0.0.1',
'master_port': ret_port,
'auth_timeout': 5,
'auth_tries': 1,
'master_uri': 'tcp://127.0.0.1:{0}'.format(ret_port)}
)
@classmethod
def tearDownClass(cls):
del cls.minion_config
del cls.master_config
def setUp(self):
# Start the event loop, even though we dont directly use this with
# ZeroMQPubServerChannel, having it running seems to increase the
# likely hood of dropped messages.
self.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
self.io_loop.make_current()
self.io_loop_thread = threading.Thread(target=self.io_loop.start)
self.io_loop_thread.start()
self.process_manager = salt.utils.process.ProcessManager(name='PubServer_ProcessManager')
def tearDown(self):
self.io_loop.add_callback(self.io_loop.stop)
self.io_loop_thread.join()
self.process_manager.stop_restarting()
self.process_manager.kill_children()
del self.io_loop
del self.io_loop_thread
del self.process_manager
@staticmethod
def _gather_results(opts, pub_uri, results, timeout=120):
'''
Gather results until then number of seconds specified by timeout passes
without reveiving a message
'''
ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.setsockopt(zmq.LINGER, -1)
sock.setsockopt(zmq.SUBSCRIBE, b'')
sock.connect(pub_uri)
last_msg = time.time()
serial = salt.payload.Serial(opts)
crypticle = salt.crypt.Crypticle(opts, salt.master.SMaster.secrets['aes']['secret'].value)
while time.time() - last_msg < timeout:
try:
payload = sock.recv(zmq.NOBLOCK)
except zmq.ZMQError:
time.sleep(.01)
else:
payload = crypticle.loads(serial.loads(payload)['load'])
if 'stop' in payload:
break
last_msg = time.time()
results.append(payload['jid'])
return results
@skipIf(salt.utils.is_windows(), 'Skip on Windows OS')
def test_publish_to_pubserv_ipc(self):
'''
Test sending 10K messags to ZeroMQPubServerChannel using IPC transport
ZMQ's ipc transport not supported on Windows
'''
opts = dict(self.master_config, ipc_mode='ipc', pub_hwm=0)
server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
server_channel.pre_fork(self.process_manager, kwargs={
'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
})
pub_uri = 'tcp://{interface}:{publish_port}'.format(**server_channel.opts)
send_num = 10000
expect = []
results = []
gather = threading.Thread(target=self._gather_results, args=(self.minion_config, pub_uri, results,))
gather.start()
# Allow time for server channel to start, especially on windows
time.sleep(2)
for i in range(send_num):
expect.append(i)
load = {'tgt_type': 'glob', 'tgt': '*', 'jid': i}
server_channel.publish(load)
server_channel.publish(
{'tgt_type': 'glob', 'tgt': '*', 'stop': True}
)
gather.join()
server_channel.pub_close()
assert len(results) == send_num, (len(results), set(expect).difference(results))
def test_publish_to_pubserv_tcp(self):
'''
Test sending 10K messags to ZeroMQPubServerChannel using TCP transport
'''
opts = dict(self.master_config, ipc_mode='tcp', pub_hwm=0)
server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
server_channel.pre_fork(self.process_manager, kwargs={
'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
})
pub_uri = 'tcp://{interface}:{publish_port}'.format(**server_channel.opts)
send_num = 10000
expect = []
results = []
gather = threading.Thread(target=self._gather_results, args=(self.minion_config, pub_uri, results,))
gather.start()
# Allow time for server channel to start, especially on windows
time.sleep(2)
for i in range(send_num):
expect.append(i)
load = {'tgt_type': 'glob', 'tgt': '*', 'jid': i}
server_channel.publish(load)
gather.join()
server_channel.pub_close()
assert len(results) == send_num, (len(results), set(expect).difference(results))
@staticmethod
def _send_small(opts, sid, num=10):
server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
for i in range(num):
load = {'tgt_type': 'glob', 'tgt': '*', 'jid': '{}-{}'.format(sid, i)}
server_channel.publish(load)
@staticmethod
def _send_large(opts, sid, num=10, size=250000 * 3):
server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
for i in range(num):
load = {'tgt_type': 'glob', 'tgt': '*', 'jid': '{}-{}'.format(sid, i), 'xdata': '0' * size}
server_channel.publish(load)
def test_issue_36469_tcp(self):
'''
Test sending both large and small messags to publisher using TCP
https://github.com/saltstack/salt/issues/36469
'''
opts = dict(self.master_config, ipc_mode='tcp', pub_hwm=0)
server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
server_channel.pre_fork(self.process_manager, kwargs={
'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
})
send_num = 10 * 4
expect = []
results = []
pub_uri = 'tcp://{interface}:{publish_port}'.format(**opts)
# Allow time for server channel to start, especially on windows
time.sleep(2)
gather = threading.Thread(target=self._gather_results, args=(self.minion_config, pub_uri, results,))
gather.start()
with ThreadPoolExecutor(max_workers=4) as executor:
executor.submit(self._send_small, opts, 1)
executor.submit(self._send_small, opts, 2)
executor.submit(self._send_small, opts, 3)
executor.submit(self._send_large, opts, 4)
expect = ['{}-{}'.format(a, b) for a in range(10) for b in (1, 2, 3, 4)]
server_channel.publish({'tgt_type': 'glob', 'tgt': '*', 'stop': True})
gather.join()
server_channel.pub_close()
assert len(results) == send_num, (len(results), set(expect).difference(results))
@skipIf(salt.utils.is_windows(), 'Skip on Windows OS')
def test_issue_36469_udp(self):
'''
Test sending both large and small messags to publisher using UDP
https://github.com/saltstack/salt/issues/36469
'''
opts = dict(self.master_config, ipc_mode='udp', pub_hwm=0)
server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
server_channel.pre_fork(self.process_manager, kwargs={
'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
})
send_num = 10 * 4
expect = []
results = []
pub_uri = 'tcp://{interface}:{publish_port}'.format(**opts)
# Allow time for server channel to start, especially on windows
time.sleep(2)
gather = threading.Thread(target=self._gather_results, args=(self.minion_config, pub_uri, results,))
gather.start()
with ThreadPoolExecutor(max_workers=4) as executor:
executor.submit(self._send_small, opts, 1)
executor.submit(self._send_small, opts, 2)
executor.submit(self._send_small, opts, 3)
executor.submit(self._send_large, opts, 4)
expect = ['{}-{}'.format(a, b) for a in range(10) for b in (1, 2, 3, 4)]
server_channel.publish({'tgt_type': 'glob', 'tgt': '*', 'stop': True})
gather.join()
server_channel.pub_close()
assert len(results) == send_num, (len(results), set(expect).difference(results))