From bd6c46dfe392a1f5046c78c6a601454eb99a7dd8 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Mon, 7 Jan 2019 19:13:35 +0000 Subject: [PATCH] Cleanup the singleton instances map. This prevents closed resources from being reused. --- salt/transport/ipc.py | 12 +++++++----- salt/transport/tcp.py | 12 ++++++++++++ salt/transport/zeromq.py | 12 ++++++++++++ 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 2bc1030750..8081349acf 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -257,6 +257,7 @@ class IPCClient(object): client = object.__new__(cls) # FIXME client.__singleton_init__(io_loop=io_loop, socket_path=socket_path) + client._instance_key = key loop_instance_map[key] = client client._refcount = 1 client._refcount_lock = threading.RLock() @@ -400,11 +401,12 @@ class IPCClient(object): # that a closed entry may not be reused. # This forces this operation even if the reference # count of the entry has not yet gone to zero. - if self.io_loop in IPCClient.instance_map: - loop_instance_map = IPCClient.instance_map[self.io_loop] - key = six.text_type(self.socket_path) - if key in loop_instance_map: - del loop_instance_map[key] + if self.io_loop in self.__class__.instance_map: + loop_instance_map = self.__class__.instance_map[self.io_loop] + if self._instance_key in loop_instance_map: + del loop_instance_map[self._instance_key] + if not loop_instance_map: + del self.__class__.instance_map[self.io_loop] class IPCMessageClient(IPCClient): diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index 82753a21fe..5215aef827 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -241,6 +241,7 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel): # references it-- this forces a reference while we return to the caller obj = object.__new__(cls) obj.__singleton_init__(opts, **kwargs) + obj._instance_key = key loop_instance_map[key] = obj obj._refcount = 1 obj._refcount_lock = threading.RLock() @@ -308,6 +309,17 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel): self._closing = True self.message_client.close() + # Remove the entry from the instance map so that a closed entry may not + # be reused. + # This forces this operation even if the reference count of the entry + # has not yet gone to zero. + if self.io_loop in self.__class__.instance_map: + loop_instance_map = self.__class__.instance_map[self.io_loop] + if self._instance_key in loop_instance_map: + del loop_instance_map[self._instance_key] + if not loop_instance_map: + del self.__class__.instance_map[self.io_loop] + def __del__(self): with self._refcount_lock: # Make sure we actually close no matter if something diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py index d603fe553b..eb8922ffb2 100644 --- a/salt/transport/zeromq.py +++ b/salt/transport/zeromq.py @@ -135,6 +135,7 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel): # references it-- this forces a reference while we return to the caller obj = object.__new__(cls) obj.__singleton_init__(opts, **kwargs) + obj._instance_key = key loop_instance_map[key] = obj obj._refcount = 1 obj._refcount_lock = threading.RLock() @@ -230,6 +231,17 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel): else: log.debug('No message_client attr for AsyncZeroMQReqChannel found. Not destroying sockets.') + # Remove the entry from the instance map so that a closed entry may not + # be reused. + # This forces this operation even if the reference count of the entry + # has not yet gone to zero. + if self._io_loop in self.__class__.instance_map: + loop_instance_map = self.__class__.instance_map[self._io_loop] + if self._instance_key in loop_instance_map: + del loop_instance_map[self._instance_key] + if not loop_instance_map: + del self.__class__.instance_map[self._io_loop] + def __del__(self): with self._refcount_lock: # Make sure we actually close no matter if something