mirror of
https://github.com/valitydev/salt.git
synced 2024-11-06 16:45:27 +00:00
Run transsport test loops in single thread
This commit is contained in:
parent
a13cb3eae6
commit
22b9d383d3
@ -613,10 +613,17 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra
|
|||||||
raise exc
|
raise exc
|
||||||
self._socket.close()
|
self._socket.close()
|
||||||
self._socket = None
|
self._socket = None
|
||||||
if hasattr(self.req_server, 'stop'):
|
if hasattr(self.req_server, 'shutdown'):
|
||||||
|
try:
|
||||||
|
self.req_server.shutdown()
|
||||||
|
except Exception as exc:
|
||||||
|
log.exception('TCPReqServerChannel close generated an exception: %s', str(exc))
|
||||||
|
elif hasattr(self.req_server, 'stop'):
|
||||||
try:
|
try:
|
||||||
self.req_server.stop()
|
self.req_server.stop()
|
||||||
except Exception as exc:
|
except socket.error as exc:
|
||||||
|
if exc.errno != 9:
|
||||||
|
raise
|
||||||
log.exception('TCPReqServerChannel close generated an exception: %s', str(exc))
|
log.exception('TCPReqServerChannel close generated an exception: %s', str(exc))
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
@ -662,7 +669,8 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra
|
|||||||
self._socket.setblocking(0)
|
self._socket.setblocking(0)
|
||||||
self._socket.bind((self.opts['interface'], int(self.opts['ret_port'])))
|
self._socket.bind((self.opts['interface'], int(self.opts['ret_port'])))
|
||||||
self.req_server = SaltMessageServer(self.handle_message,
|
self.req_server = SaltMessageServer(self.handle_message,
|
||||||
ssl_options=self.opts.get('ssl'))
|
ssl_options=self.opts.get('ssl'),
|
||||||
|
io_loop=self.io_loop)
|
||||||
self.req_server.add_socket(self._socket)
|
self.req_server.add_socket(self._socket)
|
||||||
self._socket.listen(self.backlog)
|
self._socket.listen(self.backlog)
|
||||||
salt.transport.mixins.auth.AESReqServerMixin.post_fork(self, payload_handler, io_loop)
|
salt.transport.mixins.auth.AESReqServerMixin.post_fork(self, payload_handler, io_loop)
|
||||||
@ -748,11 +756,12 @@ class SaltMessageServer(tornado.tcpserver.TCPServer, object):
|
|||||||
messages that are sent through to us
|
messages that are sent through to us
|
||||||
'''
|
'''
|
||||||
def __init__(self, message_handler, *args, **kwargs):
|
def __init__(self, message_handler, *args, **kwargs):
|
||||||
|
io_loop = kwargs.pop('io_loop', None) or tornado.ioloop.IOLoop.current()
|
||||||
super(SaltMessageServer, self).__init__(*args, **kwargs)
|
super(SaltMessageServer, self).__init__(*args, **kwargs)
|
||||||
self.io_loop = tornado.ioloop.IOLoop.current()
|
self.io_loop = io_loop
|
||||||
|
|
||||||
self.clients = []
|
self.clients = []
|
||||||
self.message_handler = message_handler
|
self.message_handler = message_handler
|
||||||
|
self._shutting_down = False
|
||||||
|
|
||||||
@tornado.gen.coroutine
|
@tornado.gen.coroutine
|
||||||
def handle_stream(self, stream, address):
|
def handle_stream(self, stream, address):
|
||||||
@ -776,20 +785,34 @@ class SaltMessageServer(tornado.tcpserver.TCPServer, object):
|
|||||||
|
|
||||||
except tornado.iostream.StreamClosedError:
|
except tornado.iostream.StreamClosedError:
|
||||||
log.trace('req client disconnected %s', address)
|
log.trace('req client disconnected %s', address)
|
||||||
self.clients.remove((stream, address))
|
self.remove_client((stream, address))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.trace('other master-side exception: %s', e)
|
log.trace('other master-side exception: %s', e)
|
||||||
self.clients.remove((stream, address))
|
self.remove_client((stream, address))
|
||||||
stream.close()
|
stream.close()
|
||||||
|
|
||||||
|
def remove_client(self, client):
|
||||||
|
try:
|
||||||
|
self.clients.remove(client)
|
||||||
|
except ValueError:
|
||||||
|
log.trace("Message server client was not in list to remove")
|
||||||
|
|
||||||
def shutdown(self):
|
def shutdown(self):
|
||||||
'''
|
'''
|
||||||
Shutdown the whole server
|
Shutdown the whole server
|
||||||
'''
|
'''
|
||||||
|
if self._shutting_down:
|
||||||
|
return
|
||||||
|
self._shutting_down = True
|
||||||
for item in self.clients:
|
for item in self.clients:
|
||||||
client, address = item
|
client, address = item
|
||||||
client.close()
|
client.close()
|
||||||
self.clients.remove(item)
|
self.remove_client(item)
|
||||||
|
try:
|
||||||
|
self.stop()
|
||||||
|
except socket.error as exc:
|
||||||
|
if exc.errno != 9:
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
if USE_LOAD_BALANCER:
|
if USE_LOAD_BALANCER:
|
||||||
|
@ -8,9 +8,30 @@ import salt.transport.client
|
|||||||
|
|
||||||
# Import 3rd-party libs
|
# Import 3rd-party libs
|
||||||
from salt.ext import six
|
from salt.ext import six
|
||||||
|
import tornado.gen
|
||||||
|
|
||||||
|
|
||||||
|
def run_loop_in_thread(loop, evt):
|
||||||
|
'''
|
||||||
|
Run the provided loop until an event is set
|
||||||
|
'''
|
||||||
|
loop.make_current()
|
||||||
|
@tornado.gen.coroutine
|
||||||
|
def stopper():
|
||||||
|
while True:
|
||||||
|
if evt.is_set():
|
||||||
|
loop.stop()
|
||||||
|
break
|
||||||
|
yield tornado.gen.sleep(.3)
|
||||||
|
loop.add_callback(stopper)
|
||||||
|
try:
|
||||||
|
loop.start()
|
||||||
|
finally:
|
||||||
|
loop.close()
|
||||||
|
|
||||||
|
|
||||||
class ReqChannelMixin(object):
|
class ReqChannelMixin(object):
|
||||||
|
|
||||||
def test_basic(self):
|
def test_basic(self):
|
||||||
'''
|
'''
|
||||||
Test a variety of messages, make sure we get the expected responses
|
Test a variety of messages, make sure we get the expected responses
|
||||||
|
@ -29,7 +29,7 @@ from tests.support.unit import TestCase, skipIf
|
|||||||
from tests.support.helpers import get_unused_localhost_port, flaky
|
from tests.support.helpers import get_unused_localhost_port, flaky
|
||||||
from tests.support.mixins import AdaptedConfigurationTestCaseMixin
|
from tests.support.mixins import AdaptedConfigurationTestCaseMixin
|
||||||
from tests.support.mock import MagicMock, patch
|
from tests.support.mock import MagicMock, patch
|
||||||
from tests.unit.transport.mixins import PubChannelMixin, ReqChannelMixin
|
from tests.unit.transport.mixins import PubChannelMixin, ReqChannelMixin, run_loop_in_thread
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -72,28 +72,21 @@ class BaseTCPReqCase(TestCase, AdaptedConfigurationTestCaseMixin):
|
|||||||
|
|
||||||
cls.server_channel = salt.transport.server.ReqServerChannel.factory(cls.master_config)
|
cls.server_channel = salt.transport.server.ReqServerChannel.factory(cls.master_config)
|
||||||
cls.server_channel.pre_fork(cls.process_manager)
|
cls.server_channel.pre_fork(cls.process_manager)
|
||||||
|
|
||||||
cls.io_loop = tornado.ioloop.IOLoop()
|
cls.io_loop = tornado.ioloop.IOLoop()
|
||||||
|
cls.stop = threading.Event()
|
||||||
def run_loop_in_thread(loop):
|
|
||||||
loop.make_current()
|
|
||||||
loop.start()
|
|
||||||
|
|
||||||
cls.server_channel.post_fork(cls._handle_payload, io_loop=cls.io_loop)
|
cls.server_channel.post_fork(cls._handle_payload, io_loop=cls.io_loop)
|
||||||
|
cls.server_thread = threading.Thread(
|
||||||
cls.server_thread = threading.Thread(target=run_loop_in_thread, args=(cls.io_loop,))
|
target=run_loop_in_thread,
|
||||||
cls.server_thread.daemon = True
|
args=(cls.io_loop, cls.stop,),
|
||||||
|
)
|
||||||
cls.server_thread.start()
|
cls.server_thread.start()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def tearDownClass(cls):
|
def tearDownClass(cls):
|
||||||
if not hasattr(cls, '_handle_payload'):
|
cls.server_channel.close()
|
||||||
return
|
cls.stop.set()
|
||||||
if hasattr(cls, 'io_loop'):
|
|
||||||
cls.io_loop.add_callback(cls.io_loop.stop)
|
|
||||||
cls.server_thread.join()
|
cls.server_thread.join()
|
||||||
cls.process_manager.kill_children()
|
cls.process_manager.kill_children()
|
||||||
cls.server_channel.close()
|
|
||||||
del cls.server_channel
|
del cls.server_channel
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@ -195,16 +188,13 @@ class BaseTCPPubCase(AsyncTestCase, AdaptedConfigurationTestCaseMixin):
|
|||||||
# we also require req server for auth
|
# we also require req server for auth
|
||||||
cls.req_server_channel = salt.transport.server.ReqServerChannel.factory(cls.master_config)
|
cls.req_server_channel = salt.transport.server.ReqServerChannel.factory(cls.master_config)
|
||||||
cls.req_server_channel.pre_fork(cls.process_manager)
|
cls.req_server_channel.pre_fork(cls.process_manager)
|
||||||
|
cls.io_loop = tornado.ioloop.IOLoop()
|
||||||
cls._server_io_loop = tornado.ioloop.IOLoop()
|
cls.stop = threading.Event()
|
||||||
cls.req_server_channel.post_fork(cls._handle_payload, io_loop=cls._server_io_loop)
|
cls.req_server_channel.post_fork(cls._handle_payload, io_loop=cls.io_loop)
|
||||||
|
cls.server_thread = threading.Thread(
|
||||||
def run_loop_in_thread(loop):
|
target=run_loop_in_thread,
|
||||||
loop.make_current()
|
args=(cls.io_loop, cls.stop,),
|
||||||
loop.start()
|
)
|
||||||
|
|
||||||
cls.server_thread = threading.Thread(target=run_loop_in_thread, args=(cls._server_io_loop,))
|
|
||||||
cls.server_thread.daemon = True
|
|
||||||
cls.server_thread.start()
|
cls.server_thread.start()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@ -216,10 +206,11 @@ class BaseTCPPubCase(AsyncTestCase, AdaptedConfigurationTestCaseMixin):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def tearDownClass(cls):
|
def tearDownClass(cls):
|
||||||
cls._server_io_loop.add_callback(cls._server_io_loop.stop)
|
cls.req_server_channel.close()
|
||||||
|
cls.server_channel.close()
|
||||||
|
cls.stop.set()
|
||||||
cls.server_thread.join()
|
cls.server_thread.join()
|
||||||
cls.process_manager.kill_children()
|
cls.process_manager.kill_children()
|
||||||
cls.req_server_channel.close()
|
|
||||||
del cls.req_server_channel
|
del cls.req_server_channel
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
@ -44,7 +44,7 @@ from tests.support.unit import TestCase, skipIf
|
|||||||
from tests.support.helpers import flaky, get_unused_localhost_port
|
from tests.support.helpers import flaky, get_unused_localhost_port
|
||||||
from tests.support.mixins import AdaptedConfigurationTestCaseMixin
|
from tests.support.mixins import AdaptedConfigurationTestCaseMixin
|
||||||
from tests.support.mock import MagicMock, patch
|
from tests.support.mock import MagicMock, patch
|
||||||
from tests.unit.transport.mixins import PubChannelMixin, ReqChannelMixin
|
from tests.unit.transport.mixins import PubChannelMixin, ReqChannelMixin, run_loop_in_thread
|
||||||
|
|
||||||
ON_SUSE = False
|
ON_SUSE = False
|
||||||
if 'SuSE' in linux_distribution(full_distribution_name=False):
|
if 'SuSE' in linux_distribution(full_distribution_name=False):
|
||||||
@ -93,11 +93,9 @@ class BaseZMQReqCase(TestCase, AdaptedConfigurationTestCaseMixin):
|
|||||||
cls.server_channel.pre_fork(cls.process_manager)
|
cls.server_channel.pre_fork(cls.process_manager)
|
||||||
|
|
||||||
cls.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
|
cls.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
|
||||||
cls.io_loop.make_current()
|
cls.evt = threading.Event()
|
||||||
cls.server_channel.post_fork(cls._handle_payload, io_loop=cls.io_loop)
|
cls.server_channel.post_fork(cls._handle_payload, io_loop=cls.io_loop)
|
||||||
|
cls.server_thread = threading.Thread(target=run_loop_in_thread, args=(cls.io_loop, cls.evt))
|
||||||
cls.server_thread = threading.Thread(target=cls.io_loop.start)
|
|
||||||
cls.server_thread.daemon = True
|
|
||||||
cls.server_thread.start()
|
cls.server_thread.start()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@ -108,7 +106,7 @@ class BaseZMQReqCase(TestCase, AdaptedConfigurationTestCaseMixin):
|
|||||||
# Let the test suite handle this instead.
|
# Let the test suite handle this instead.
|
||||||
cls.process_manager.stop_restarting()
|
cls.process_manager.stop_restarting()
|
||||||
cls.process_manager.kill_children()
|
cls.process_manager.kill_children()
|
||||||
cls.io_loop.add_callback(cls.io_loop.stop)
|
cls.evt.set()
|
||||||
cls.server_thread.join()
|
cls.server_thread.join()
|
||||||
time.sleep(2) # Give the procs a chance to fully close before we stop the io_loop
|
time.sleep(2) # Give the procs a chance to fully close before we stop the io_loop
|
||||||
cls.server_channel.close()
|
cls.server_channel.close()
|
||||||
@ -238,10 +236,9 @@ class BaseZMQPubCase(AsyncTestCase, AdaptedConfigurationTestCaseMixin):
|
|||||||
cls.req_server_channel.pre_fork(cls.process_manager)
|
cls.req_server_channel.pre_fork(cls.process_manager)
|
||||||
|
|
||||||
cls._server_io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
|
cls._server_io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
|
||||||
|
cls.evt = threading.Event()
|
||||||
cls.req_server_channel.post_fork(cls._handle_payload, io_loop=cls._server_io_loop)
|
cls.req_server_channel.post_fork(cls._handle_payload, io_loop=cls._server_io_loop)
|
||||||
|
cls.server_thread = threading.Thread(target=run_loop_in_thread, args=(cls._server_io_loop, cls.evt))
|
||||||
cls.server_thread = threading.Thread(target=cls._server_io_loop.start)
|
|
||||||
cls.server_thread.daemon = True
|
|
||||||
cls.server_thread.start()
|
cls.server_thread.start()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@ -249,7 +246,7 @@ class BaseZMQPubCase(AsyncTestCase, AdaptedConfigurationTestCaseMixin):
|
|||||||
cls.process_manager.kill_children()
|
cls.process_manager.kill_children()
|
||||||
cls.process_manager.stop_restarting()
|
cls.process_manager.stop_restarting()
|
||||||
time.sleep(2) # Give the procs a chance to fully close before we stop the io_loop
|
time.sleep(2) # Give the procs a chance to fully close before we stop the io_loop
|
||||||
cls.io_loop.add_callback(cls.io_loop.stop)
|
cls.evt.set()
|
||||||
cls.server_thread.join()
|
cls.server_thread.join()
|
||||||
cls.req_server_channel.close()
|
cls.req_server_channel.close()
|
||||||
cls.server_channel.close()
|
cls.server_channel.close()
|
||||||
|
Loading…
Reference in New Issue
Block a user