Merge pull request #31823 from rallytime/bp-31364

Back-port #31364 to 2016.3
This commit is contained in:
Nicole Thomas 2016-03-11 14:46:24 -07:00
commit 349216787d
2 changed files with 16 additions and 5 deletions

View File

@ -736,7 +736,7 @@ class SaltMessageClient(object):
while not self._connecting_future.done() or self._connecting_future.result() is not True:
yield self._connecting_future
while len(self.send_queue) > 0:
message_id, item = self.send_queue.pop(0)
message_id, item = self.send_queue[0]
try:
yield self._stream.write(item)
# if the connection is dead, lets fail this send, and make sure we
@ -744,6 +744,7 @@ class SaltMessageClient(object):
except tornado.iostream.StreamClosedError as e:
self.send_future_map.pop(message_id).set_exception(Exception())
self.remove_message_timeout(message_id)
del self.send_queue[0]
if self._closing:
return
if self.disconnect_callback:
@ -752,6 +753,7 @@ class SaltMessageClient(object):
if self._connecting_future.done():
self._connecting_future = self.connect()
yield self._connecting_future
del self.send_queue[0]
def _message_id(self):
wrap = False

View File

@ -852,8 +852,12 @@ class AsyncReqMessageClient(object):
@tornado.gen.coroutine
def _internal_send_recv(self):
while len(self.send_queue) > 0:
message = self.send_queue.pop(0)
future = self.send_future_map.pop(message)
message = self.send_queue[0]
future = self.send_future_map.get(message, None)
if future is None:
# Timedout
del self.send_queue[0]
continue
# send
def mark_future(msg):
@ -866,14 +870,19 @@ class AsyncReqMessageClient(object):
ret = yield future
except: # pylint: disable=W0702
self._init_socket() # re-init the zmq socket (no other way in zmq)
del self.send_queue[0]
continue
del self.send_queue[0]
self.send_future_map.pop(message, None)
self.remove_message_timeout(message)
def remove_message_timeout(self, message):
if message not in self.send_timeout_map:
return
timeout = self.send_timeout_map.pop(message)
self.io_loop.remove_timeout(timeout)
timeout = self.send_timeout_map.pop(message, None)
if timeout is not None:
# Hasn't been already timedout
self.io_loop.remove_timeout(timeout)
def timeout_message(self, message):
'''