Don't send REQ while another one is waiting for response.

The message has to be removed from the queue the only *after* it's
already processed to don't confuse send() functionality that expects
empty queue means: there's no active sendings.
This commit is contained in:
Dmitry Kuzmenko 2016-02-02 18:23:53 +03:00
parent cb2f252252
commit 950098cf9c
2 changed files with 16 additions and 5 deletions

View File

@ -736,7 +736,7 @@ class SaltMessageClient(object):
while not self._connecting_future.done() or self._connecting_future.result() is not True: while not self._connecting_future.done() or self._connecting_future.result() is not True:
yield self._connecting_future yield self._connecting_future
while len(self.send_queue) > 0: while len(self.send_queue) > 0:
message_id, item = self.send_queue.pop(0) message_id, item = self.send_queue[0]
try: try:
yield self._stream.write(item) yield self._stream.write(item)
# if the connection is dead, lets fail this send, and make sure we # if the connection is dead, lets fail this send, and make sure we
@ -744,6 +744,7 @@ class SaltMessageClient(object):
except tornado.iostream.StreamClosedError as e: except tornado.iostream.StreamClosedError as e:
self.send_future_map.pop(message_id).set_exception(Exception()) self.send_future_map.pop(message_id).set_exception(Exception())
self.remove_message_timeout(message_id) self.remove_message_timeout(message_id)
del self.send_queue[0]
if self._closing: if self._closing:
return return
if self.disconnect_callback: if self.disconnect_callback:
@ -752,6 +753,7 @@ class SaltMessageClient(object):
if self._connecting_future.done(): if self._connecting_future.done():
self._connecting_future = self.connect() self._connecting_future = self.connect()
yield self._connecting_future yield self._connecting_future
del self.send_queue[0]
def _message_id(self): def _message_id(self):
wrap = False wrap = False

View File

@ -865,8 +865,12 @@ class AsyncReqMessageClient(object):
@tornado.gen.coroutine @tornado.gen.coroutine
def _internal_send_recv(self): def _internal_send_recv(self):
while len(self.send_queue) > 0: while len(self.send_queue) > 0:
message = self.send_queue.pop(0) message = self.send_queue[0]
future = self.send_future_map.pop(message) future = self.send_future_map.get(message, None)
if future is None:
# Timedout
del self.send_queue[0]
continue
# send # send
def mark_future(msg): def mark_future(msg):
@ -879,14 +883,19 @@ class AsyncReqMessageClient(object):
ret = yield future ret = yield future
except: # pylint: disable=W0702 except: # pylint: disable=W0702
self._init_socket() # re-init the zmq socket (no other way in zmq) self._init_socket() # re-init the zmq socket (no other way in zmq)
del self.send_queue[0]
continue continue
del self.send_queue[0]
self.send_future_map.pop(message, None)
self.remove_message_timeout(message) self.remove_message_timeout(message)
def remove_message_timeout(self, message): def remove_message_timeout(self, message):
if message not in self.send_timeout_map: if message not in self.send_timeout_map:
return return
timeout = self.send_timeout_map.pop(message) timeout = self.send_timeout_map.pop(message, None)
self.io_loop.remove_timeout(timeout) if timeout is not None:
# Hasn't been already timedout
self.io_loop.remove_timeout(timeout)
def timeout_message(self, message): def timeout_message(self, message):
''' '''