Merge pull request #47106 from DSRCorporation/bugs/tornado50

Tornado50 compatibility fixes
This commit is contained in:
Nicole Thomas 2018-04-25 11:32:36 -04:00 committed by GitHub
commit 0d9d55e013
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 100 additions and 94 deletions

View File

@ -1,4 +1,4 @@
-r base.txt
-r base-py2.txt
mock>=2.0.0
apache-libcloud>=0.14.0
@ -6,6 +6,6 @@ boto>=2.32.1
boto3>=1.2.1
moto>=0.3.6
SaltPyLint>=v2017.3.6
pytest
pytest>=3.5.0
git+https://github.com/eisensheng/pytest-catchlog.git@develop#egg=Pytest-catchlog
git+https://github.com/saltstack/pytest-salt.git@master#egg=pytest-salt

View File

@ -1,4 +1,4 @@
-r base.txt
-r base-py3.txt
mock>=2.0.0
apache-libcloud>=0.14.0
@ -11,5 +11,5 @@ moto>=0.3.6
# prevent it from being successfully installed (at least on Python 3.4).
httpretty
SaltPyLint>=v2017.2.29
pytest
pytest>=3.5.0
git+https://github.com/saltstack/pytest-salt.git@master#egg=pytest-salt

View File

@ -1,3 +1,3 @@
pytest
pytest>=3.5.0
pytest-helpers-namespace
pytest-tempdir

View File

@ -94,14 +94,15 @@ class IRCClient(object):
self.allow_nicks = allow_nicks
self.disable_query = disable_query
self.io_loop = tornado.ioloop.IOLoop(make_current=False)
self.io_loop.make_current()
self._connect()
def _connect(self):
_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
if self.ssl is True:
self._stream = tornado.iostream.SSLIOStream(_sock, ssl_options={'cert_reqs': ssl.CERT_NONE}, io_loop=self.io_loop)
self._stream = tornado.iostream.SSLIOStream(_sock, ssl_options={'cert_reqs': ssl.CERT_NONE})
else:
self._stream = tornado.iostream.IOStream(_sock, io_loop=self.io_loop)
self._stream = tornado.iostream.IOStream(_sock)
self._stream.set_close_callback(self.on_closed)
self._stream.connect((self.host, self.port), self.on_connect)

View File

@ -81,6 +81,7 @@ def start(address=None, port=5000, ssl_crt=None, ssl_key=None):
if all([ssl_crt, ssl_key]):
ssl_options = {"certfile": ssl_crt, "keyfile": ssl_key}
io_loop = tornado.ioloop.IOLoop(make_current=False)
http_server = tornado.httpserver.HTTPServer(application, ssl_options=ssl_options, io_loop=io_loop)
io_loop.make_current()
http_server = tornado.httpserver.HTTPServer(application, ssl_options=ssl_options)
http_server.listen(port, address=address)
io_loop.start()

View File

@ -36,13 +36,15 @@ try:
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
LOOP_CLASS = zmq.eventloop.ioloop.ZMQIOLoop
HAS_ZMQ = True
except ImportError:
import tornado.ioloop
LOOP_CLASS = tornado.ioloop.IOLoop
HAS_ZMQ = False
import tornado
TORNADO_50 = tornado.version_info >= (5,)
from salt.utils.async import LOOP_CLASS
import tornado.gen # pylint: disable=F0401
# Import salt libs
@ -856,7 +858,7 @@ class MWorker(SignalHandlingMultiprocessingProcess):
Bind to the local port
'''
# using ZMQIOLoop since we *might* need zmq in there
if HAS_ZMQ:
if HAS_ZMQ and not TORNADO_50:
zmq.eventloop.ioloop.install()
self.io_loop = LOOP_CLASS()
self.io_loop.make_current()

View File

@ -31,6 +31,7 @@ else:
import salt.ext.ipaddress as ipaddress
from salt.ext.six.moves import range
# pylint: enable=no-name-in-module,redefined-builtin
from salt.utils.async import LOOP_CLASS
# Import third party libs
try:
@ -40,13 +41,13 @@ try:
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
LOOP_CLASS = zmq.eventloop.ioloop.ZMQIOLoop
HAS_ZMQ = True
except ImportError:
import tornado.ioloop
LOOP_CLASS = tornado.ioloop.IOLoop
HAS_ZMQ = False
import tornado
TORNADO_50 = tornado.version_info >= (5,)
HAS_RANGE = False
try:
import seco.range
@ -656,7 +657,7 @@ class SMinion(MinionBase):
# Clean out the proc directory (default /var/cache/salt/minion/proc)
if (self.opts.get('file_client', 'remote') == 'remote'
or self.opts.get('use_master_when_local', False)):
if self.opts['transport'] == 'zeromq' and HAS_ZMQ:
if self.opts['transport'] == 'zeromq' and HAS_ZMQ and not TORNADO_50:
io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
else:
io_loop = LOOP_CLASS.current()
@ -805,7 +806,7 @@ class MinionManager(MinionBase):
self.minions = []
self.jid_queue = []
if HAS_ZMQ:
if HAS_ZMQ and not TORNADO_50:
zmq.eventloop.ioloop.install()
self.io_loop = LOOP_CLASS.current()
self.process_manager = ProcessManager(name='MultiMinionProcessManager')
@ -954,7 +955,7 @@ class Minion(MinionBase):
self.periodic_callbacks = {}
if io_loop is None:
if HAS_ZMQ:
if HAS_ZMQ and not TORNADO_50:
zmq.eventloop.ioloop.install()
self.io_loop = LOOP_CLASS.current()
else:
@ -2250,13 +2251,15 @@ class Minion(MinionBase):
if beacons and self.connected:
self._fire_master(events=beacons)
new_periodic_callbacks['beacons'] = tornado.ioloop.PeriodicCallback(handle_beacons, loop_interval * 1000, io_loop=self.io_loop)
new_periodic_callbacks['beacons'] = tornado.ioloop.PeriodicCallback(
handle_beacons, loop_interval * 1000)
if before_connect:
# Make sure there is a chance for one iteration to occur before connect
handle_beacons()
if 'cleanup' not in self.periodic_callbacks:
new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(self._fallback_cleanups, loop_interval * 1000, io_loop=self.io_loop)
new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(
self._fallback_cleanups, loop_interval * 1000)
# start all the other callbacks
for periodic_cb in six.itervalues(new_periodic_callbacks):
@ -2309,14 +2312,15 @@ class Minion(MinionBase):
# TODO: actually listen to the return and change period
def handle_schedule():
self.process_schedule(self, loop_interval)
new_periodic_callbacks['schedule'] = tornado.ioloop.PeriodicCallback(handle_schedule, 1000, io_loop=self.io_loop)
new_periodic_callbacks['schedule'] = tornado.ioloop.PeriodicCallback(handle_schedule, 1000)
if before_connect:
# Make sure there is a chance for one iteration to occur before connect
handle_schedule()
if 'cleanup' not in self.periodic_callbacks:
new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(self._fallback_cleanups, loop_interval * 1000, io_loop=self.io_loop)
new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(
self._fallback_cleanups, loop_interval * 1000)
# start all the other callbacks
for periodic_cb in six.itervalues(new_periodic_callbacks):
@ -2372,7 +2376,7 @@ class Minion(MinionBase):
self._fire_master('ping', 'minion_ping', sync=False, timeout_handler=ping_timeout_handler)
except Exception:
log.warning('Attempt to ping master failed.', exc_on_loglevel=logging.DEBUG)
self.periodic_callbacks['ping'] = tornado.ioloop.PeriodicCallback(ping_master, ping_interval * 1000, io_loop=self.io_loop)
self.periodic_callbacks['ping'] = tornado.ioloop.PeriodicCallback(ping_master, ping_interval * 1000)
self.periodic_callbacks['ping'].start()
# add handler to subscriber
@ -2632,7 +2636,7 @@ class SyndicManager(MinionBase):
self.jid_forward_cache = set()
if io_loop is None:
if HAS_ZMQ:
if HAS_ZMQ and not TORNADO_50:
zmq.eventloop.ioloop.install()
self.io_loop = LOOP_CLASS.current()
else:
@ -2816,7 +2820,7 @@ class SyndicManager(MinionBase):
# forward events every syndic_event_forward_timeout
self.forward_events = tornado.ioloop.PeriodicCallback(self._forward_events,
self.opts['syndic_event_forward_timeout'] * 1000,
io_loop=self.io_loop)
)
self.forward_events.start()
# Make sure to gracefully handle SIGUSR1

View File

@ -128,6 +128,6 @@ def start():
raise SystemExit(1)
try:
tornado.ioloop.IOLoop.instance().start()
tornado.ioloop.IOLoop.current().start()
except KeyboardInterrupt:
raise SystemExit(0)

View File

@ -205,14 +205,17 @@ import tornado.ioloop
import tornado.web
import tornado.gen
from tornado.concurrent import Future
from zmq.eventloop import ioloop
import salt.ext.six as six
import tornado
# pylint: enable=import-error
TORNADO_50 = tornado.version_info >= (5,)
# instantiate the zmq IOLoop (specialized poller)
ioloop.install()
if not TORNADO_50:
import zmq.eventloop.ioloop
# instantiate the zmq IOLoop (specialized poller)
zmq.eventloop.ioloop.install()
# salt imports
import salt.ext.six as six
import salt.netapi
import salt.utils
import salt.utils.event

View File

@ -130,11 +130,11 @@ class IPCServer(object):
else:
self.sock = tornado.netutil.bind_unix_socket(self.socket_path)
tornado.netutil.add_accept_handler(
self.sock,
self.handle_connection,
io_loop=self.io_loop,
)
with salt.utils.async.current_ioloop(self.io_loop):
tornado.netutil.add_accept_handler(
self.sock,
self.handle_connection,
)
self._started = True
@tornado.gen.coroutine
@ -197,10 +197,10 @@ class IPCServer(object):
log.trace('IPCServer: Handling connection '
'to address: {0}'.format(address))
try:
stream = IOStream(
connection,
io_loop=self.io_loop,
)
with salt.utils.async.current_ioloop(self.io_loop):
stream = IOStream(
connection,
)
self.io_loop.spawn_callback(self.handle_stream, stream)
except Exception as exc:
log.error('IPC streaming error: {0}'.format(exc))
@ -330,10 +330,10 @@ class IPCClient(object):
break
if self.stream is None:
self.stream = IOStream(
socket.socket(sock_type, socket.SOCK_STREAM),
io_loop=self.io_loop,
)
with salt.utils.async.current_ioloop(self.io_loop):
self.stream = IOStream(
socket.socket(sock_type, socket.SOCK_STREAM),
)
try:
log.trace('IPCClient: Connecting to socket: {0}'.format(self.socket_path))
@ -511,11 +511,11 @@ class IPCMessagePublisher(object):
else:
self.sock = tornado.netutil.bind_unix_socket(self.socket_path)
tornado.netutil.add_accept_handler(
self.sock,
self.handle_connection,
io_loop=self.io_loop,
)
with salt.utils.async.current_ioloop(self.io_loop):
tornado.netutil.add_accept_handler(
self.sock,
self.handle_connection,
)
self._started = True
@tornado.gen.coroutine
@ -546,17 +546,14 @@ class IPCMessagePublisher(object):
def handle_connection(self, connection, address):
log.trace('IPCServer: Handling connection to address: {0}'.format(address))
try:
kwargs = {}
if self.opts['ipc_write_buffer'] > 0:
kwargs['max_write_buffer_size'] = self.opts['ipc_write_buffer']
log.trace('Setting IPC connection write buffer: {0}'.format((self.opts['ipc_write_buffer'])))
with salt.utils.async.current_ioloop(self.io_loop):
stream = IOStream(
connection,
io_loop=self.io_loop,
max_write_buffer_size=self.opts['ipc_write_buffer']
)
else:
stream = IOStream(
connection,
io_loop=self.io_loop
**kwargs
)
self.streams.add(stream)

View File

@ -764,10 +764,9 @@ class TCPClientKeepAlive(tornado.tcpclient.TCPClient):
'''
Override _create_stream() in TCPClient to enable keep alive support.
'''
def __init__(self, opts, resolver=None, io_loop=None):
def __init__(self, opts, resolver=None):
self.opts = opts
super(TCPClientKeepAlive, self).__init__(
resolver=resolver, io_loop=io_loop)
super(TCPClientKeepAlive, self).__init__(resolver=resolver)
def _create_stream(self, max_buffer_size, af, addr, **kwargs): # pylint: disable=unused-argument
'''
@ -783,7 +782,6 @@ class TCPClientKeepAlive(tornado.tcpclient.TCPClient):
_set_tcp_keepalive(sock, self.opts)
stream = tornado.iostream.IOStream(
sock,
io_loop=self.io_loop,
max_buffer_size=max_buffer_size)
return stream.connect(addr)
@ -842,8 +840,8 @@ class SaltMessageClient(object):
self.io_loop = io_loop or tornado.ioloop.IOLoop.current()
self._tcp_client = TCPClientKeepAlive(
opts, io_loop=self.io_loop, resolver=resolver)
with salt.utils.async.current_ioloop(self.io_loop):
self._tcp_client = TCPClientKeepAlive(opts, resolver=resolver)
self._mid = 1
self._max_messages = int((1 << 31) - 2) # number of IDs before we wrap
@ -932,9 +930,10 @@ class SaltMessageClient(object):
if self._closing:
break
try:
self._stream = yield self._tcp_client.connect(self.host,
self.port,
ssl_options=self.opts.get('ssl'))
with salt.utils.async.current_ioloop(self.io_loop):
self._stream = yield self._tcp_client.connect(self.host,
self.port,
ssl_options=self.opts.get('ssl'))
self._connecting_future.set_result(True)
break
except Exception as e:
@ -1137,7 +1136,8 @@ class PubServer(tornado.tcpserver.TCPServer, object):
TCP publisher
'''
def __init__(self, opts, io_loop=None):
super(PubServer, self).__init__(io_loop=io_loop, ssl_options=opts.get('ssl'))
super(PubServer, self).__init__(ssl_options=opts.get('ssl'))
self.io_loop = io_loop
self.opts = opts
self._closing = False
self.clients = set()

View File

@ -50,6 +50,7 @@ PYZMQ_VERSION = tuple(map(int, zmq.pyzmq_version().split('.')))
import tornado
import tornado.gen
import tornado.concurrent
TORNADO_50 = tornado.version_info >= (5,)
# Import third party libs
try:
@ -78,7 +79,8 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
# do we have any mapping for this io_loop
io_loop = kwargs.get('io_loop')
if io_loop is None:
zmq.eventloop.ioloop.install()
if not TORNADO_50:
zmq.eventloop.ioloop.install()
io_loop = tornado.ioloop.IOLoop.current()
if io_loop not in cls.instance_map:
cls.instance_map[io_loop] = weakref.WeakValueDictionary()
@ -146,7 +148,8 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
self._io_loop = kwargs.get('io_loop')
if self._io_loop is None:
zmq.eventloop.ioloop.install()
if not TORNADO_50:
zmq.eventloop.ioloop.install()
self._io_loop = tornado.ioloop.IOLoop.current()
if self.crypt != 'clear':
@ -290,7 +293,8 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
self.io_loop = kwargs.get('io_loop')
if self.io_loop is None:
zmq.eventloop.ioloop.install()
if not TORNADO_50:
zmq.eventloop.ioloop.install()
self.io_loop = tornado.ioloop.IOLoop.current()
self.hexid = hashlib.sha1(six.b(self.opts['id'])).hexdigest()
@ -897,7 +901,8 @@ class AsyncReqMessageClient(object):
self.addr = addr
self.linger = linger
if io_loop is None:
zmq.eventloop.ioloop.install()
if not TORNADO_50:
zmq.eventloop.ioloop.install()
tornado.ioloop.IOLoop.current()
else:
self.io_loop = io_loop

View File

@ -13,12 +13,19 @@ try:
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
LOOP_CLASS = zmq.eventloop.ioloop.ZMQIOLoop
HAS_ZMQ = True
except ImportError:
LOOP_CLASS = tornado.ioloop.IOLoop
HAS_ZMQ = False
import tornado
TORNADO_50 = tornado.version_info >= (5,)
if HAS_ZMQ and not TORNADO_50:
LOOP_CLASS = zmq.eventloop.ioloop.ZMQIOLoop
else:
import tornado.ioloop
LOOP_CLASS = tornado.ioloop.IOLoop
import contextlib

View File

@ -54,13 +54,17 @@ import salt.log.setup
from salt.utils.odict import OrderedDict
# Define the pytest plugins we rely on
pytest_plugins = ['pytest_catchlog', 'tempdir', 'helpers_namespace'] # pylint: disable=invalid-name
pytest_plugins = ['tempdir', 'helpers_namespace'] # pylint: disable=invalid-name
# Define where not to collect tests from
collect_ignore = ['setup.py']
log = logging.getLogger('salt.testsuite')
# Reset logging root handlers
for handler in logging.root.handlers:
logging.root.removeHandler(handler)
def pytest_tempdir_basename():
'''
@ -196,25 +200,6 @@ def pytest_configure(config):
called after command line options have been parsed
and all plugins and initial conftest files been loaded.
'''
# Configure the console logger based on the catch_log settings.
# Most importantly, shutdown Salt's null, store and temporary logging queue handlers
catch_log = config.pluginmanager.getplugin('_catch_log')
cli_logging_handler = catch_log.log_cli_handler
# Add the pytest_catchlog CLI log handler to the logging root
logging.root.addHandler(cli_logging_handler)
cli_level = cli_logging_handler.level
cli_level = config._catchlog_log_cli_level
cli_format = cli_logging_handler.formatter._fmt
cli_date_format = cli_logging_handler.formatter.datefmt
# Setup the console logger which shuts down the null and the temporary queue handlers
salt.log.setup_console_logger(
log_level=salt.log.setup.LOG_VALUES_TO_LEVELS.get(cli_level, 'error'),
log_format=cli_format,
date_format=cli_date_format
)
# Disable the store logging queue handler
salt.log.setup.setup_extended_logging({'extension_modules': ''})
config.addinivalue_line('norecursedirs', os.path.join(CODE_DIR, 'templates'))
config.addinivalue_line(
'markers',

View File

@ -11,6 +11,7 @@ import os
# Import Salt Testing libs
from tests.support.unit import TestCase, skipIf
from tests.support.mock import NO_MOCK, NO_MOCK_REASON, patch, MagicMock
from tests.support.mixins import AdaptedConfigurationTestCaseMixin
from tests.support.helpers import skip_if_not_root
# Import salt libs
from salt import minion
@ -24,7 +25,7 @@ __opts__ = {}
@skipIf(NO_MOCK, NO_MOCK_REASON)
class MinionTestCase(TestCase, tornado.testing.AsyncTestCase):
class MinionTestCase(TestCase, tornado.testing.AsyncTestCase, AdaptedConfigurationTestCaseMixin):
def test_invalid_master_address(self):
with patch.dict(__opts__, {'ipv6': False, 'master': float('127.0'), 'master_port': '4555', 'retry_dns': False}):
self.assertRaises(SaltSystemExit, minion.resolve_dns, __opts__)
@ -145,7 +146,7 @@ class MinionTestCase(TestCase, tornado.testing.AsyncTestCase):
patch('salt.minion.Minion.sync_connect_master', MagicMock(side_effect=RuntimeError('stop execution'))), \
patch('salt.utils.process.SignalHandlingMultiprocessingProcess.start', MagicMock(return_value=True)), \
patch('salt.utils.process.SignalHandlingMultiprocessingProcess.join', MagicMock(return_value=True)):
mock_opts = copy.copy(salt.config.DEFAULT_MINION_OPTS)
mock_opts = self.get_config('minion', from_scratch=True)
mock_opts['beacons_before_connect'] = True
try:
minion = salt.minion.Minion(mock_opts, io_loop=tornado.ioloop.IOLoop())
@ -169,7 +170,7 @@ class MinionTestCase(TestCase, tornado.testing.AsyncTestCase):
patch('salt.minion.Minion.sync_connect_master', MagicMock(side_effect=RuntimeError('stop execution'))), \
patch('salt.utils.process.SignalHandlingMultiprocessingProcess.start', MagicMock(return_value=True)), \
patch('salt.utils.process.SignalHandlingMultiprocessingProcess.join', MagicMock(return_value=True)):
mock_opts = copy.copy(salt.config.DEFAULT_MINION_OPTS)
mock_opts = self.get_config('minion', from_scratch=True)
mock_opts['scheduler_before_connect'] = True
try:
minion = salt.minion.Minion(mock_opts, io_loop=tornado.ioloop.IOLoop())