Merge pull request #38212 from disaster123/develop_zmq_backlog

ZMQ: add an option for zmq.BACKLOG to salt master (zmq_backlog)
This commit is contained in:
Mike Place 2016-12-20 07:41:42 -07:00 committed by GitHub
commit df462cbe3e
4 changed files with 11 additions and 0 deletions

View File

@ -234,6 +234,9 @@
# Set the ZeroMQ high water marks
# http://api.zeromq.org/3-2:zmq-setsockopt
# The listen queue size / backlog
#zmq_backlog: 1000
# The publisher interface ZeroMQPubServerChannel
#pub_hwm: 1000

View File

@ -508,6 +508,9 @@ VALID_OPTS = {
# TODO unknown option!
'auth_mode': int,
# listen queue size / backlog
'zmq_backlog': int,
# Set the zeromq high water mark on the publisher interface.
# http://api.zeromq.org/3-2:zmq-setsockopt
'pub_hwm': int,
@ -1213,6 +1216,7 @@ DEFAULT_MINION_OPTS = {
DEFAULT_MASTER_OPTS = {
'interface': '0.0.0.0',
'publish_port': 4505,
'zmq_backlog': 1000,
'pub_hwm': 1000,
# ZMQ HWM for SaltEvent pub socket - different for minion vs. master
'salt_event_pub_hwm': 2000,

View File

@ -95,6 +95,7 @@ class ZmqRet(multiprocessing.Process):
except AttributeError:
self.clients.setsockopt(zmq.SNDHWM, self.opts['rep_hwm'])
self.clients.setsockopt(zmq.RCVHWM, self.opts['rep_hwm'])
self.clients.setsockopt(zmq.BACKLOG, self.opts['zmq_backlog'])
self.workers = self.context.socket(zmq.DEALER)
self.w_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'workers.ipc')
@ -182,6 +183,7 @@ class SaltZmqPublisher(ioflo.base.deeding.Deed):
if self.opts.value['ipv6'] is True and hasattr(zmq, 'IPV4ONLY'):
# IPv6 sockets work for both IPv6 and IPv4 addresses
self.pub_sock.setsockopt(zmq.IPV4ONLY, 0)
self.pub_sock.setsockopt(zmq.BACKLOG, self.opts.get('zmq_backlog', 1000))
self.pub_uri = 'tcp://{interface}:{publish_port}'.format(**self.opts.value)
log.info('Starting the Salt ZeroMQ Publisher on {0}'.format(self.pub_uri))
self.pub_sock.bind(self.pub_uri)

View File

@ -461,6 +461,7 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
if self.opts['ipv6'] is True and hasattr(zmq, 'IPV4ONLY'):
# IPv6 sockets work for both IPv6 and IPv4 addresses
self.clients.setsockopt(zmq.IPV4ONLY, 0)
self.clients.setsockopt(zmq.BACKLOG, self.opts.get('zmq_backlog', 1000))
if HAS_ZMQ_MONITOR and self.opts['zmq_monitor']:
# Socket monitor shall be used the only for debug purposes so using threading doesn't look too bad here
import threading
@ -715,6 +716,7 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
if self.opts['ipv6'] is True and hasattr(zmq, 'IPV4ONLY'):
# IPv6 sockets work for both IPv6 and IPv4 addresses
pub_sock.setsockopt(zmq.IPV4ONLY, 0)
pub_sock.setsockopt(zmq.BACKLOG, self.opts.get('zmq_backlog', 1000))
pub_uri = 'tcp://{interface}:{publish_port}'.format(**self.opts)
# Prepare minion pull socket
pull_sock = context.socket(zmq.PULL)