Misc cleanup

This commit is contained in:
Thomas Jackson 2015-03-07 12:26:19 -08:00
parent 9d606294dd
commit 35977e42f9

View File

@ -117,6 +117,9 @@ class TCPReqChannel(salt.transport.client.ReqChannel):
host, port = parse.netloc.rsplit(':', 1)
self.master_addr = (host, int(port))
self.io_loop = tornado.ioloop.IOLoop()
self.message_client = SaltMessageClient(host, int(port), io_loop=self.io_loop)
def _package_load(self, load):
return self.serial.dumps({
'enc': self.crypt,
@ -255,8 +258,8 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra
self.payload_handler = payload_handler
self.io_loop = io_loop
self.req_server = SaltMessageServer(self.handle_message, io_loop=io_loop)
self.socket.listen(self.backlog)
self.req_server.add_socket(self.socket)
self.socket.listen(self.backlog)
self.serial = salt.payload.Serial(self.opts)
salt.transport.mixins.auth.AESReqServerMixin.post_fork(self)
@ -324,15 +327,11 @@ class SaltMessageServer(tornado.tcpserver.TCPServer):
self.clients.append((stream, address))
try:
while True:
print ('first read until')
header_len = yield stream.read_until(' ')
print ('header_len', header_len)
header_raw = yield stream.read_bytes(int(header_len))
header_raw = yield stream.read_bytes(int(header_len.strip()))
header = msgpack.loads(header_raw)
print ('header')
body_raw = yield stream.read_bytes(int(header['msgLen']))
body = msgpack.loads(body_raw)
print ('body')
self.message_handler(stream, header, body)
except tornado.iostream.StreamClosedError:
@ -353,6 +352,7 @@ class SaltMessageServer(tornado.tcpserver.TCPServer):
self.clients.remove(item)
# TODO: test and use, cleanup send/recv queue stuff
class SaltMessageClient(object):
'''
Low-level message sending client
@ -362,37 +362,23 @@ class SaltMessageClient(object):
self.port = port
self.io_loop = io_loop
self._tcp_client = tornado.tcpclient.TCPClient(io_loop=io_loop)
self.connected = False
self._connect_future = None
self._tcp_client = tornado.tcpclient.TCPClient(io_loop=self.io_loop)
self._mid = 1
self.send_queue = [] # queue of messages to be sent
self.future_map = {} # mapping of request_id -> Future
self.io_loop.add_callback(self._stream_return)
self.io_loop.add_callback(self._stream_send)
@tornado.gen.coroutine
def stream(self):
if self._connect_future:
self._stream = yield self._connect_future
elif self.connected is False:
self._connect_future = self._tcp_client.connect(self.host, self.port)
self._stream = yield self._connect_future
self.connected = True
raise tornado.gen.Return(self._stream)
self._stream = None
@tornado.gen.coroutine
def _stream_return(self):
stream = yield self.stream()
while True:
try:
header_len = yield stream.read_until(' ')
header_raw = yield stream.read_bytes(int(header_len))
header_len = yield self._stream.read_until(' ')
header_raw = yield stream.read_bytes(int(header_len.strip()))
header = msgpack.loads(header_raw)
body = yield stream.read_bytes(int(header['msgLen']))
body = yield self._stream.read_bytes(int(header['msgLen']))
message_id = header['mid']
if message_id in self.future_map:
@ -404,20 +390,21 @@ class SaltMessageClient(object):
except Exception as e:
log.error('Exception parsing response', exc_info=True)
raise
stream.close()
self.connected = False # force a reconnect
for future in self.future_map.itervalues():
future.set_exception(e)
self.future_map = {}
stream = yield self.stream()
raise tornado.gen.Return()
@tornado.gen.coroutine
def _stream_send(self):
stream = yield self.stream()
if not self._stream:
self._stream = yield self._tcp_client.connect(self.host, self.port)
while True:
try:
item = self.send_queue.pop(0)
yield stream.write(item)
yield self._stream.write(item)
self.io_loop.add_callback(self._stream_return)
raise tornado.gen.Return()
except IndexError:
yield tornado.gen.sleep(1) # TODO: remove...
@ -444,6 +431,7 @@ class SaltMessageClient(object):
self.future_map[message_id] = future
self.send_queue.append(frame_msg(msg, header=header))
self.io_loop.add_callback(self._stream_send)
return future