Since we're shutting down. Set ZMQ socket's linger to 1.

This discards unsent messages as fast as it can. The value needs to be bigger than 0.
This commit is contained in:
Pedro Algarvio 2013-01-12 12:22:27 +00:00
parent 85eaeb0da5
commit 68a12646d4
3 changed files with 14 additions and 14 deletions

View File

@ -334,10 +334,10 @@ class Publisher(multiprocessing.Process):
except KeyboardInterrupt:
if pub_sock.closed is False:
pub_sock.setsockopt(zmq.LINGER, 2500)
pub_sock.setsockopt(zmq.LINGER, 1)
pub_sock.close()
if pull_sock.closed is False:
pull_sock.setsockopt(zmq.LINGER, 2500)
pull_sock.setsockopt(zmq.LINGER, 1)
pull_sock.close()
finally:
if context.closed is False:
@ -424,10 +424,10 @@ class ReqServer(object):
def destroy(self):
if self.clients.closed is False:
self.clients.setsockopt(zmq.LINGER, 2500)
self.clients.setsockopt(zmq.LINGER, 1)
self.clients.close()
if self.workers.closed is False:
self.workers.setsockopt(zmq.LINGER, 2500)
self.workers.setsockopt(zmq.LINGER, 1)
self.workers.close()
if self.context.closed is False:
self.context.term()
@ -1017,7 +1017,7 @@ class AESFuncs(object):
)
finally:
if pub_sock.closed is False:
pub_sock.setsockopt(zmq.LINGER, 2500)
pub_sock.setsockopt(zmq.LINGER, 1)
pub_sock.close()
if context.closed is False:
context.term()
@ -1035,7 +1035,7 @@ class AESFuncs(object):
return ret
finally:
if pub_sock.closed is False:
pub_sock.setsockopt(zmq.LINGER, 2500)
pub_sock.setsockopt(zmq.LINGER, 1)
pub_sock.close()
if context.closed is False:
context.term()
@ -1652,7 +1652,7 @@ class ClearFuncs(object):
}
finally:
if pub_sock.closed is False:
pub_sock.setsockopt(zmq.LINGER, 2500)
pub_sock.setsockopt(zmq.LINGER, 1)
pub_sock.close()
if context.closed is False:
context.term()

View File

@ -172,11 +172,11 @@ class SREQ(object):
def destroy(self):
for socket in self.poller.sockets.keys():
if socket.closed is False:
socket.setsockopt(zmq.LINGER, 2500)
socket.setsockopt(zmq.LINGER, 1)
socket.close()
self.poller.unregister(socket)
if self.socket.closed is False:
self.socket.setsockopt(zmq.LINGER, 2500)
self.socket.setsockopt(zmq.LINGER, 1)
self.socket.close()
if self.context.closed is False:
self.context.term()

View File

@ -175,10 +175,10 @@ class SaltEvent(object):
# Wait at most 2.5 secs to send any remaining messages in the
# socket or the context.term() bellow will hang indefinitely.
# See https://github.com/zeromq/pyzmq/issues/102
self.sub.setsockopt(zmq.LINGER, 2500)
self.sub.setsockopt(zmq.LINGER, 1)
self.sub.close()
if self.cpush is True and self.push.closed is False:
self.push.setsockopt(zmq.LINGER, 2500)
self.push.setsockopt(zmq.LINGER, 1)
self.push.close()
# If socket's are not unregistered from a poller, nothing which touches
# that poller get's garbage collected. The Poller itself, it's
@ -186,7 +186,7 @@ class SaltEvent(object):
for socket in self.poller.sockets.keys():
if socket.closed is False:
# Should already be closed from above, but....
socket.setsockopt(zmq.LINGER, 2500)
socket.setsockopt(zmq.LINGER, 1)
socket.close()
self.poller.unregister(socket)
if self.context.closed is False:
@ -276,10 +276,10 @@ class EventPublisher(Process):
raise exc
except KeyboardInterrupt:
if self.epub_sock.closed is False:
self.epub_sock.setsockopt(zmq.LINGER, 2500)
self.epub_sock.setsockopt(zmq.LINGER, 1)
self.epub_sock.close()
if self.epull_sock.closed is False:
self.epull_sock.setsockopt(zmq.LINGER, 2500)
self.epull_sock.setsockopt(zmq.LINGER, 1)
self.epull_sock.close()
finally:
if self.context.closed is False: