Merge pull request #27192 from DSRCompany/issues/26885_salt_call_zmq_monitor_fix

Correclty install zeromq ioloop.
This commit is contained in:
Mike Place 2015-09-17 11:26:18 -06:00
commit 3717330d04
3 changed files with 25 additions and 12 deletions

View File

@ -603,6 +603,7 @@ class MultiMinion(MinionBase):
self.auth_wait = self.opts['acceptance_wait_time']
self.max_auth_wait = self.opts['acceptance_wait_time_max']
zmq.eventloop.ioloop.install()
self.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
def _spawn_minions(self):
@ -681,9 +682,11 @@ class Minion(MinionBase):
self.win_proc = []
self.loaded_base_name = loaded_base_name
self.io_loop = io_loop or zmq.eventloop.ioloop.ZMQIOLoop()
if not self.io_loop.initialized():
self.io_loop.install()
if io_loop is None:
zmq.eventloop.ioloop.install()
self.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
else:
self.io_loop = io_loop
# Warn if ZMQ < 3.2
if HAS_ZMQ:
@ -1971,10 +1974,10 @@ class MultiSyndic(MinionBase):
self.jid_forward_cache = set()
if io_loop is None:
zmq.eventloop.ioloop.install()
self.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
else:
self.io_loop = io_loop
self.io_loop.install()
def _spawn_syndics(self):
'''

View File

@ -192,7 +192,7 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran
self.serial = salt.payload.Serial(self.opts)
self.io_loop = kwargs['io_loop'] or tornado.ioloop.IOLoop.current()
self.io_loop = kwargs.get('io_loop') or tornado.ioloop.IOLoop.current()
self.connected = False
def __del__(self):

View File

@ -63,7 +63,10 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
'''
# do we have any mapping for this io_loop
io_loop = kwargs.get('io_loop') or tornado.ioloop.IOLoop.current()
io_loop = kwargs.get('io_loop')
if io_loop is None:
zmq.eventloop.ioloop.install()
io_loop = tornado.ioloop.IOLoop.current()
if io_loop not in cls.instance_map:
cls.instance_map[io_loop] = weakref.WeakValueDictionary()
loop_instance_map = cls.instance_map[io_loop]
@ -114,7 +117,10 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
if 'master_uri' in kwargs:
self.opts['master_uri'] = kwargs['master_uri']
self._io_loop = kwargs.get('io_loop') or tornado.ioloop.IOLoop.current()
self._io_loop = kwargs.get('io_loop')
if self._io_loop is None:
zmq.eventloop.ioloop.install()
self._io_loop = tornado.ioloop.IOLoop.current()
if self.crypt != 'clear':
# we don't need to worry about auth as a kwarg, since its a singleton
@ -232,10 +238,10 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
self.opts = opts
self.ttype = 'zeromq'
if 'io_loop' in kwargs:
self.io_loop = kwargs['io_loop']
else:
self.io_loop = tornado.ioloop.IOLoop()
self.io_loop = kwargs.get('io_loop')
if self.io_loop is None:
zmq.eventloop.ioloop.install()
self.io_loop = tornado.ioloop.IOLoop.current()
self.hexid = hashlib.sha1(self.opts['id']).hexdigest()
@ -697,7 +703,11 @@ class AsyncReqMessageClient(object):
self.opts = opts
self.addr = addr
self.linger = linger
self.io_loop = io_loop or zmq.eventloop.ioloop.ZMQIOLoop.current()
if io_loop is None:
zmq.eventloop.ioloop.install()
tornado.ioloop.IOLoop.current()
else:
self.io_loop = io_loop
self.serial = salt.payload.Serial(self.opts)