Re-work req_channel in the master daemons, now we don't pass it down the stack, the master-side funcs need to return a dict of opts which specify which send_func to use

This commit is contained in:
Thomas Jackson 2015-01-27 08:12:56 -08:00
parent 8c66c4e857
commit 422d1c28a3
3 changed files with 53 additions and 27 deletions

View File

@ -654,8 +654,19 @@ class MWorker(multiprocessing.Process):
while True:
try:
payload = self.req_channel.recv()
ret = self._handle_payload(payload)
self.req_channel.send(ret)
ret, req_opts = self._handle_payload(payload)
# req_opts defines our response
# req_fun: default to send_clear
req_fun = req_opts.get('fun', 'send_clear')
if req_fun == 'send_clear':
self.req_channel.send_clear(ret)
elif req_fun == 'send':
self.req_channel.send(ret)
elif req_fun == 'send_private':
self.req_channel.send_private(ret, req_opts['key'], req_opts['tgt'])
else:
log.error('Unknown req_fun {0}'.format(req_fun))
# don't catch keyboard interrupts, just re-raise them
except KeyboardInterrupt:
raise
@ -719,7 +730,7 @@ class MWorker(multiprocessing.Process):
log.info('Clear payload received with command {cmd}'.format(**load))
if load['cmd'].startswith('__'):
return False
return getattr(self.clear_funcs, load['cmd'])(load)
return getattr(self.clear_funcs, load['cmd'])(load), {'fun': 'send_clear'}
def _handle_aes(self, data):
'''
@ -746,7 +757,7 @@ class MWorker(multiprocessing.Process):
self.opts,
self.key,
self.mkey)
self.aes_funcs = AESFuncs(self.opts, self.req_channel)
self.aes_funcs = AESFuncs(self.opts)
self.__bind()
@ -757,7 +768,7 @@ class AESFuncs(object):
'''
# The AES Functions:
#
def __init__(self, opts, req_channel):
def __init__(self, opts):
'''
Create a new AESFuncs
@ -770,7 +781,6 @@ class AESFuncs(object):
self.opts = opts
self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'])
self.serial = salt.payload.Serial(opts)
self.req_channel = req_channel
self.ckminions = salt.utils.minions.CkMinions(opts)
# Make a client
self.local = salt.client.get_local_client(self.opts['conf_file'])
@ -1344,7 +1354,7 @@ class AESFuncs(object):
# Don't honor private functions
if func.startswith('__'):
# TODO: return some error? Seems odd to return {}
return self.req_channel.encrypt({})
return {}, {'fun': 'send'}
# Run the func
if hasattr(self, func):
try:
@ -1368,18 +1378,18 @@ class AESFuncs(object):
func
)
)
return self.req_channel.encrypt(False)
return False, {'fun': 'send'}
# Don't encrypt the return value for the _return func
# (we don't care about the return value, so why encrypt it?)
if func == '_return':
return ret
return ret, {'fun': 'send'}
if func == '_pillar' and 'id' in load:
if load.get('ver') != '2' and self.opts['pillar_version'] == 1:
# Authorized to return old pillar proto
return self.req_channel.encrypt(ret)
return self.req_channel.encrypt_private(ret, 'pillar', load['id'])
return ret, {'fun': 'send'}
return ret, {'fun': 'send_private', 'key': 'pillar', 'tgt': load['id']}
# Encrypt the return
return self.req_channel.encrypt(ret)
return ret, {'fun': 'send'}
class ClearFuncs(object):

View File

@ -71,18 +71,22 @@ class ReqServerChannel(object):
'''
raise NotImplementedError()
# TODO: use a send method? or have the recv return one?
def send(self, load, tries=3, timeout=60):
def send_clear(self, payload):
'''
Send "load" to the master.
Send a response to a recv()'d payload
'''
raise NotImplementedError()
# TODO:
def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout=60):
def send(self, payload):
'''
Send "load" to the master in a way that the load is only readable by
the minion and the master (not other minions etc.)
Send a response to a recv()'d payload
'''
raise NotImplementedError()
def send_private(self, payload, dictkey, target):
'''
Send a response to a recv()'d payload encrypted privately for target
'''
raise NotImplementedError()

View File

@ -345,14 +345,19 @@ class ZeroMQReqServerChannel(salt.transport.server.ReqServerChannel):
'''
if self.opts['aes'].value != self.crypticle.key_string:
self.crypticle = salt.crypt.Crypticle(self.opts, self.opts['aes'].value)
return True
return False
def _decode_payload(self, payload):
# we need to decrypt it
if payload['enc'] == 'aes':
self._update_aes() # check if you need to update the aes key
try:
payload['load'] = self.crypticle.loads(payload['load'])
try:
payload['load'] = self.crypticle.loads(payload['load'])
except salt.crypt.AuthenticationError:
if not self._update_aes():
raise
payload['load'] = self.crypticle.loads(payload['load'])
except Exception:
# send something back to the client so the client so they know
# their load was malformed
@ -390,19 +395,26 @@ class ZeroMQReqServerChannel(salt.transport.server.ReqServerChannel):
# TODO? maybe have recv() return this function, so this class isn't tied to
# a send/recv order
def send(self, payload):
def send_clear(self, payload):
'''
Send a response to a recv()'d payload
'''
self._socket.send(self.serial.dumps(payload))
def encrypt(self, payload):
def send(self, payload):
'''
Regular encryption
Send a response to a recv()'d payload
'''
return self.crypticle.dumps(payload)
self.send_clear(self.crypticle.dumps(payload))
def encrypt_private(self, ret, dictkey, target):
def send_private(self, payload, dictkey, target):
'''
Send a response to a recv()'d payload encrypted privately for target
'''
self.send_clear(self._encrypt_private(payload, dictkey, target))
def _encrypt_private(self, ret, dictkey, target):
'''
The server equivalent of ReqChannel.crypted_transfer_decode_dictentry
'''