Merge pull request #28614 from skizunov/develop2

Fixed memory leak in AsyncTCPReqChannel
This commit is contained in:
Mike Place 2015-11-05 14:16:25 -07:00
commit 64e881e634
4 changed files with 57 additions and 27 deletions

View File

@ -43,9 +43,9 @@ class IPCServer(object):
self.payload_handler = payload_handler
# Placeholders for attributes to be populated by method calls
self.stream = None
self.sock = None
self.io_loop = io_loop or IOLoop.current()
self._closing = False
def start(self):
'''
@ -137,8 +137,9 @@ class IPCServer(object):
Sockets and filehandles should be closed explicitely, to prevent
leaks.
'''
if hasattr(self.stream, 'close'):
self.stream.close()
if self._closing:
return
self._closing = True
if hasattr(self.sock, 'close'):
self.sock.close()
@ -260,6 +261,8 @@ class IPCClient(object):
Sockets and filehandles should be closed explicitely, to prevent
leaks.
'''
if self._closing:
return
self._closing = True
if hasattr(self, 'stream'):
self.stream.close()

View File

@ -113,11 +113,17 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
parse = urlparse.urlparse(self.opts['master_uri'])
host, port = parse.netloc.rsplit(':', 1)
self.master_addr = (host, int(port))
self._closing = False
self.message_client = SaltMessageClient(host, int(port), io_loop=self.io_loop, resolver=resolver)
def close(self):
if self._closing:
return
self._closing = True
self.message_client.close()
def __del__(self):
self.message_client.destroy()
self.close()
def _package_load(self, load):
return {
@ -194,10 +200,17 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran
self.io_loop = kwargs.get('io_loop') or tornado.ioloop.IOLoop.current()
self.connected = False
self._closing = False
def close(self):
if self._closing:
return
self._closing = True
if hasattr(self, 'message_client'):
self.message_client.close()
def __del__(self):
if hasattr(self, 'message_client'):
self.message_client.destroy()
self.close()
@tornado.gen.coroutine
def connect(self):
@ -403,29 +416,18 @@ class SaltMessageClient(object):
self.send_future_map = {} # mapping of request_id -> Future
self.send_timeout_map = {} # request_id -> timeout_callback
self._connecting_future = self.connect()
self._read_until_future = None
self.io_loop.spawn_callback(self._stream_return)
self._on_recv = None
self._closing = False
self._connecting_future = self.connect()
self.io_loop.spawn_callback(self._stream_return)
# TODO: timeout inflight sessions
def destroy(self):
def close(self):
if self._closing:
return
self._closing = True
if hasattr(self, '_stream') and not self._stream.closed():
if (self._read_until_future is not None and
hasattr(self._stream.io_loop, '_closing') and
self._stream.io_loop._closing):
# The io_loop is closed probably due to
# 'salt.utils.async.SyncWrapper.__del__'
# The call to set_exception(StreamClosedError()) on this read
# future (that will be called when closing the stream) will
# thus cause:
# 'raise RuntimeError("IOLoop is closing")'
# We want to prevent this.
if hasattr(self._stream, '_read_future'):
self._stream._read_future = None
self._stream.close()
if self._read_until_future is not None:
# This will prevent this message from showing up:
@ -438,7 +440,7 @@ class SaltMessageClient(object):
self._tcp_client.close()
def __del__(self):
self.destroy()
self.close()
def connect(self, callback=None):
'''
@ -478,9 +480,11 @@ class SaltMessageClient(object):
@tornado.gen.coroutine
def _stream_return(self):
while not self._connecting_future.done() or self._connecting_future.result() is not True:
while not self._closing and (
not self._connecting_future.done() or
self._connecting_future.result() is not True):
yield self._connecting_future
while True:
while not self._closing:
try:
self._read_until_future = self._stream.read_until(' ')
framed_msg_len = yield self._read_until_future
@ -503,6 +507,8 @@ class SaltMessageClient(object):
for future in self.send_future_map.itervalues():
future.set_exception(e)
self.send_future_map = {}
if self._closing:
return
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()
@ -512,6 +518,8 @@ class SaltMessageClient(object):
for future in self.send_future_map.itervalues():
future.set_exception(e)
self.send_future_map = {}
if self._closing:
return
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()
@ -529,6 +537,8 @@ class SaltMessageClient(object):
except tornado.iostream.StreamClosedError as e:
self.send_future_map.pop(message_id).set_exception(Exception())
self.remove_message_timeout(message_id)
if self._closing:
return
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()

View File

@ -406,6 +406,11 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.transport.server.ReqServerChannel):
def __init__(self, opts):
salt.transport.server.ReqServerChannel.__init__(self, opts)
self._closing = False
def zmq_device(self):
'''
Multiprocessing target for the zmq queue device
@ -453,6 +458,9 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
'''
Cleanly shutdown the router socket
'''
if self._closing:
return
self._closing = True
if hasattr(self, '_monitor') and self._monitor is not None:
self._monitor.stop()
self._monitor = None

View File

@ -86,6 +86,15 @@ class SyncWrapper(object):
'''
On deletion of the async wrapper, make sure to clean up the async stuff
'''
self.io_loop.close()
if hasattr(self, 'async'):
if hasattr(self.async, 'close'):
# Certain things such as streams should be closed before
# their associated io_loop is closed to allow for proper
# cleanup.
self.async.close()
self.io_loop.close()
# Other things should be deallocated after the io_loop closes.
# See Issue #26889.
del self.async
else:
self.io_loop.close()