Add un-tested SaltMessageClient

This commit is contained in:
Thomas Jackson 2015-03-07 11:08:01 -08:00
parent 5bee5649db
commit 9d606294dd

View File

@ -44,6 +44,7 @@ import tornado
import tornado.tcpserver
import tornado.gen
import tornado.concurrent
import tornado.tcpclient
log = logging.getLogger(__name__)
@ -268,15 +269,16 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra
try:
payload = self._decode_payload(payload)
except Exception:
stream.write(frame_msg(self.serial.dumps('bad load')))
stream.write(frame_msg(self.serial.dumps('bad load'), header=header))
raise tornado.gen.Return()
# intercept the "_auth" commands, since the main daemon shouldn't know
# anything about our key auth
if payload['enc'] == 'clear' and payload['load']['cmd'] == '_auth':
stream.write(frame_msg(self.serial.dumps(self._auth(payload['load']))))
yield stream.write(frame_msg(self.serial.dumps(self._auth(payload['load'])), header=header))
raise tornado.gen.Return()
# TODO: handle exceptions
try:
ret, req_opts = self.payload_handler(payload) # TODO: check if a future
@ -301,23 +303,6 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra
stream.close()
def _send(self, payload):
'''
Helper function to serialize and send payload
'''
print ('sending??')
try:
self.client.write(frame_msg(self.serial.dumps(payload)))
# if there was an error, close the socket out
except socket.error as e:
self.client.close()
raise salt.exceptions.SaltClientError(e)
# always reset self.client
finally:
self.client = None
print ('sent')
class SaltMessageServer(tornado.tcpserver.TCPServer):
'''
Raw TCP server which will recieve all of the TCP streams and re-assemble
@ -339,15 +324,24 @@ class SaltMessageServer(tornado.tcpserver.TCPServer):
self.clients.append((stream, address))
try:
while True:
print ('first read until')
header_len = yield stream.read_until(' ')
print ('header_len', header_len)
header_raw = yield stream.read_bytes(int(header_len))
header = msgpack.loads(header_raw)
print ('header')
body_raw = yield stream.read_bytes(int(header['msgLen']))
body = msgpack.loads(body_raw)
print ('body')
self.message_handler(stream, header, body)
except tornado.iostream.StreamClosedError:
print ('req client disconnected {0}'.format(address))
self.clients.remove((stream, address))
except Exception as e:
print ('other master-side exception??', e, e.__module__, e.extra)
self.clients.remove((stream, address))
stream.close()
def shutdown(self):
'''
@ -359,10 +353,99 @@ class SaltMessageServer(tornado.tcpserver.TCPServer):
self.clients.remove(item)
class SaltMessageClient(object):
'''
Low-level message sending client
'''
def __init__(self, host, port, io_loop):
self.host = host
self.port = port
self.io_loop = io_loop
# TODO: subclass tcpclient
#class SaltMessageClient()
self._tcp_client = tornado.tcpclient.TCPClient(io_loop=io_loop)
self.connected = False
self._connect_future = None
self._mid = 1
self.send_queue = [] # queue of messages to be sent
self.future_map = {} # mapping of request_id -> Future
self.io_loop.add_callback(self._stream_return)
self.io_loop.add_callback(self._stream_send)
@tornado.gen.coroutine
def stream(self):
if self._connect_future:
self._stream = yield self._connect_future
elif self.connected is False:
self._connect_future = self._tcp_client.connect(self.host, self.port)
self._stream = yield self._connect_future
self.connected = True
raise tornado.gen.Return(self._stream)
@tornado.gen.coroutine
def _stream_return(self):
stream = yield self.stream()
while True:
try:
header_len = yield stream.read_until(' ')
header_raw = yield stream.read_bytes(int(header_len))
header = msgpack.loads(header_raw)
body = yield stream.read_bytes(int(header['msgLen']))
message_id = header['mid']
if message_id in self.future_map:
self.future_map[message_id].set_result(body)
del self.future_map[message_id]
else:
log.error('Got response for message_id {0} that we are not tracking'.format(message_id))
# TODO: if we know what message_id it is, we can save the stream
except Exception as e:
log.error('Exception parsing response', exc_info=True)
raise
stream.close()
self.connected = False # force a reconnect
for future in self.future_map.itervalues():
future.set_exception(e)
self.future_map = {}
stream = yield self.stream()
@tornado.gen.coroutine
def _stream_send(self):
stream = yield self.stream()
while True:
try:
item = self.send_queue.pop(0)
yield stream.write(item)
except IndexError:
yield tornado.gen.sleep(1) # TODO: remove...
# TODO: wrap? or use UUID?
def _message_id(self):
ret = self._mid
self._mid += 1
return ret
def send(self, msg, callback=None):
'''
Send given message, and return a future
'''
message_id = self._message_id()
header = {'mid': message_id}
future = tornado.concurrent.Future()
if callback is not None:
def handle_future(future):
response = future.result()
self.io_loop.add_callback(callback, response)
future.add_done_callback(handle_future)
# Add this future to the mapping
self.future_map[message_id] = future
self.send_queue.append(frame_msg(msg, header=header))
return future
class PubServer(tornado.tcpserver.TCPServer):