Tornado 5.0 compatibility fixes

This commit is contained in:
Dmitry Kuzmenko 2018-04-16 22:41:44 +03:00
parent 7390b72808
commit 34058c181e
No known key found for this signature in database
GPG Key ID: 4C7CAD30C95651DA
7 changed files with 56 additions and 58 deletions

View File

@ -97,14 +97,15 @@ class IRCClient(object):
self.allow_nicks = allow_nicks
self.disable_query = disable_query
self.io_loop = tornado.ioloop.IOLoop(make_current=False)
self.io_loop.make_current()
self._connect()
def _connect(self):
_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
if self.ssl is True:
self._stream = tornado.iostream.SSLIOStream(_sock, ssl_options={'cert_reqs': ssl.CERT_NONE}, io_loop=self.io_loop)
self._stream = tornado.iostream.SSLIOStream(_sock, ssl_options={'cert_reqs': ssl.CERT_NONE})
else:
self._stream = tornado.iostream.IOStream(_sock, io_loop=self.io_loop)
self._stream = tornado.iostream.IOStream(_sock)
self._stream.set_close_callback(self.on_closed)
self._stream.connect((self.host, self.port), self.on_connect)

View File

@ -81,6 +81,7 @@ def start(address=None, port=5000, ssl_crt=None, ssl_key=None):
if all([ssl_crt, ssl_key]):
ssl_options = {"certfile": ssl_crt, "keyfile": ssl_key}
io_loop = tornado.ioloop.IOLoop(make_current=False)
http_server = tornado.httpserver.HTTPServer(application, ssl_options=ssl_options, io_loop=io_loop)
io_loop.make_current()
http_server = tornado.httpserver.HTTPServer(application, ssl_options=ssl_options)
http_server.listen(port, address=address)
io_loop.start()

View File

@ -2495,13 +2495,15 @@ class Minion(MinionBase):
if beacons and self.connected:
self._fire_master(events=beacons)
new_periodic_callbacks['beacons'] = tornado.ioloop.PeriodicCallback(handle_beacons, loop_interval * 1000, io_loop=self.io_loop)
new_periodic_callbacks['beacons'] = tornado.ioloop.PeriodicCallback(
handle_beacons, loop_interval * 1000)
if before_connect:
# Make sure there is a chance for one iteration to occur before connect
handle_beacons()
if 'cleanup' not in self.periodic_callbacks:
new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(self._fallback_cleanups, loop_interval * 1000, io_loop=self.io_loop)
new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(
self._fallback_cleanups, loop_interval * 1000)
# start all the other callbacks
for periodic_cb in six.itervalues(new_periodic_callbacks):
@ -2554,14 +2556,15 @@ class Minion(MinionBase):
# TODO: actually listen to the return and change period
def handle_schedule():
self.process_schedule(self, loop_interval)
new_periodic_callbacks['schedule'] = tornado.ioloop.PeriodicCallback(handle_schedule, 1000, io_loop=self.io_loop)
new_periodic_callbacks['schedule'] = tornado.ioloop.PeriodicCallback(handle_schedule, 1000)
if before_connect:
# Make sure there is a chance for one iteration to occur before connect
handle_schedule()
if 'cleanup' not in self.periodic_callbacks:
new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(self._fallback_cleanups, loop_interval * 1000, io_loop=self.io_loop)
new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(
self._fallback_cleanups, loop_interval * 1000)
# start all the other callbacks
for periodic_cb in six.itervalues(new_periodic_callbacks):
@ -2618,7 +2621,7 @@ class Minion(MinionBase):
self._fire_master('ping', 'minion_ping', sync=False, timeout_handler=ping_timeout_handler)
except Exception:
log.warning('Attempt to ping master failed.', exc_on_loglevel=logging.DEBUG)
self.periodic_callbacks['ping'] = tornado.ioloop.PeriodicCallback(ping_master, ping_interval * 1000, io_loop=self.io_loop)
self.periodic_callbacks['ping'] = tornado.ioloop.PeriodicCallback(ping_master, ping_interval * 1000)
self.periodic_callbacks['ping'].start()
# add handler to subscriber
@ -3087,7 +3090,7 @@ class SyndicManager(MinionBase):
# forward events every syndic_event_forward_timeout
self.forward_events = tornado.ioloop.PeriodicCallback(self._forward_events,
self.opts['syndic_event_forward_timeout'] * 1000,
io_loop=self.io_loop)
)
self.forward_events.start()
# Make sure to gracefully handle SIGUSR1

View File

@ -128,6 +128,6 @@ def start():
raise SystemExit(1)
try:
tornado.ioloop.IOLoop.instance().start()
tornado.ioloop.IOLoop.current().start()
except KeyboardInterrupt:
raise SystemExit(0)

View File

@ -202,14 +202,12 @@ import tornado.ioloop
import tornado.web
import tornado.gen
from tornado.concurrent import Future
from zmq.eventloop import ioloop
from salt.ext import six
# pylint: enable=import-error
# instantiate the zmq IOLoop (specialized poller)
ioloop.install()
import salt.utils
salt.utils.install_zmq()
# salt imports
import salt.ext.six as six
import salt.netapi
import salt.utils.args
import salt.utils.event

View File

@ -130,11 +130,11 @@ class IPCServer(object):
else:
self.sock = tornado.netutil.bind_unix_socket(self.socket_path)
tornado.netutil.add_accept_handler(
self.sock,
self.handle_connection,
io_loop=self.io_loop,
)
with salt.utils.async.current_ioloop(self.io_loop):
tornado.netutil.add_accept_handler(
self.sock,
self.handle_connection,
)
self._started = True
@tornado.gen.coroutine
@ -196,10 +196,10 @@ class IPCServer(object):
log.trace('IPCServer: Handling connection '
'to address: %s', address)
try:
stream = IOStream(
connection,
io_loop=self.io_loop,
)
with salt.utils.async.current_ioloop(self.io_loop):
stream = IOStream(
connection,
)
self.io_loop.spawn_callback(self.handle_stream, stream)
except Exception as exc:
log.error('IPC streaming error: %s', exc)
@ -329,10 +329,10 @@ class IPCClient(object):
break
if self.stream is None:
self.stream = IOStream(
socket.socket(sock_type, socket.SOCK_STREAM),
io_loop=self.io_loop,
)
with salt.utils.async.current_ioloop(self.io_loop):
self.stream = IOStream(
socket.socket(sock_type, socket.SOCK_STREAM),
)
try:
log.trace('IPCClient: Connecting to socket: %s', self.socket_path)
@ -510,11 +510,11 @@ class IPCMessagePublisher(object):
else:
self.sock = tornado.netutil.bind_unix_socket(self.socket_path)
tornado.netutil.add_accept_handler(
self.sock,
self.handle_connection,
io_loop=self.io_loop,
)
with salt.utils.async.current_ioloop(self.io_loop):
tornado.netutil.add_accept_handler(
self.sock,
self.handle_connection,
)
self._started = True
@tornado.gen.coroutine
@ -545,17 +545,14 @@ class IPCMessagePublisher(object):
def handle_connection(self, connection, address):
log.trace('IPCServer: Handling connection to address: %s', address)
try:
kwargs = {}
if self.opts['ipc_write_buffer'] > 0:
kwargs['max_write_buffer_size'] = self.opts['ipc_write_buffer']
log.trace('Setting IPC connection write buffer: %s', (self.opts['ipc_write_buffer']))
with salt.utils.async.current_ioloop(self.io_loop):
stream = IOStream(
connection,
io_loop=self.io_loop,
max_write_buffer_size=self.opts['ipc_write_buffer']
)
else:
stream = IOStream(
connection,
io_loop=self.io_loop
**kwargs
)
self.streams.add(stream)

View File

@ -775,10 +775,9 @@ class TCPClientKeepAlive(tornado.tcpclient.TCPClient):
'''
Override _create_stream() in TCPClient to enable keep alive support.
'''
def __init__(self, opts, resolver=None, io_loop=None):
def __init__(self, opts, resolver=None):
self.opts = opts
super(TCPClientKeepAlive, self).__init__(
resolver=resolver, io_loop=io_loop)
super(TCPClientKeepAlive, self).__init__(resolver=resolver)
def _create_stream(self, max_buffer_size, af, addr, **kwargs): # pylint: disable=unused-argument
'''
@ -794,7 +793,6 @@ class TCPClientKeepAlive(tornado.tcpclient.TCPClient):
_set_tcp_keepalive(sock, self.opts)
stream = tornado.iostream.IOStream(
sock,
io_loop=self.io_loop,
max_buffer_size=max_buffer_size)
return stream.connect(addr)
@ -856,8 +854,8 @@ class SaltMessageClient(object):
self.io_loop = io_loop or tornado.ioloop.IOLoop.current()
self._tcp_client = TCPClientKeepAlive(
opts, io_loop=self.io_loop, resolver=resolver)
with salt.utils.async.current_ioloop(self.io_loop):
self._tcp_client = TCPClientKeepAlive(opts, resolver=resolver)
self._mid = 1
self._max_messages = int((1 << 31) - 2) # number of IDs before we wrap
@ -946,18 +944,17 @@ class SaltMessageClient(object):
if self._closing:
break
try:
if (self.source_ip or self.source_port) and tornado.version_info >= (4, 5):
### source_ip and source_port are supported only in Tornado >= 4.5
# See http://www.tornadoweb.org/en/stable/releases/v4.5.0.html
# Otherwise will just ignore these args
self._stream = yield self._tcp_client.connect(self.host,
self.port,
ssl_options=self.opts.get('ssl'),
source_ip=self.source_ip,
source_port=self.source_port)
else:
if self.source_ip or self.source_port:
kwargs = {}
if (self.source_ip or self.source_port):
if tornado.version_info >= (4, 5):
### source_ip and source_port are supported only in Tornado >= 4.5
# See http://www.tornadoweb.org/en/stable/releases/v4.5.0.html
# Otherwise will just ignore these args
kwargs = {'source_ip': self.source_ip,
'source_port': self.source_port}
else:
log.warning('If you need a certain source IP/port, consider upgrading Tornado >= 4.5')
with salt.utils.async.current_ioloop(self.io_loop):
self._stream = yield self._tcp_client.connect(self.host,
self.port,
ssl_options=self.opts.get('ssl'))
@ -1163,7 +1160,8 @@ class PubServer(tornado.tcpserver.TCPServer, object):
TCP publisher
'''
def __init__(self, opts, io_loop=None):
super(PubServer, self).__init__(io_loop=io_loop, ssl_options=opts.get('ssl'))
super(PubServer, self).__init__(ssl_options=opts.get('ssl'))
self.io_loop = io_loop
self.opts = opts
self._closing = False
self.clients = set()