Major overhaul of tcp implementation

Now using the same crypto stuff as the zeromq implementation
This commit is contained in:
Thomas Jackson 2015-02-28 13:03:46 -08:00
parent 11195c4f2c
commit fc83b44fae
6 changed files with 604 additions and 510 deletions

View File

@ -720,8 +720,6 @@ class MWorker(multiprocessing.Process):
'''
key = payload['enc']
load = payload['load']
if load['cmd'] in dir(ClearFuncs):
key = 'clear'
return {'aes': self._handle_aes,
'clear': self._handle_clear}[key](load)

View File

@ -1592,7 +1592,7 @@ class Minion(MinionBase):
# TODO: rename?? Maybe do_pub_recv and take a list of them?
# for some reason, native FDs sometimes return event 5, whatever that is...
if socks.get(self.pub_channel.poll_key):
print ('got stuff from pub_channel')
print ('got stuff from pub_channel', socks)
self._do_socket_recv()
# Check the event system

View File

View File

@ -0,0 +1,470 @@
import multiprocessing
import ctypes
import logging
import os
import hashlib
from M2Crypto import RSA
# salt libs
import salt.crypt
import salt.payload
import salt.master
import salt.utils.event
log = logging.getLogger(__name__)
class AESPubClientMixin(object):
def _verify_master_signature(self, payload):
if payload.get('sig') and self.opts.get('sign_pub_messages'):
# Verify that the signature is valid
master_pubkey_path = os.path.join(self.opts['pki_dir'], 'minion_master.pub')
if not salt.crypt.verify_signature(master_pubkey_path, load, payload.get('sig')):
raise salt.crypt.AuthenticationError('Message signature failed to validate.')
def _decode_payload(self, payload):
# we need to decrypt it
if payload['enc'] == 'aes':
self._verify_master_signature(payload)
try:
payload['load'] = self.auth.crypticle.loads(payload['load'])
except salt.crypt.AuthenticationError:
self.auth.authenticate()
payload['load'] = self.auth.crypticle.loads(payload['load'])
return payload
# TODO: rename?
class AESReqServerMixin(object):
'''
Mixin to house all of the master-side auth crypto
'''
def pre_fork(self, _):
'''
Pre-fork we need to create the zmq router device
'''
salt.master.SMaster.secrets['aes'] = {'secret': multiprocessing.Array(ctypes.c_char,
salt.crypt.Crypticle.generate_key_string()),
'reload': salt.crypt.Crypticle.generate_key_string,
}
def post_fork(self):
self.serial = salt.payload.Serial(self.opts)
self.crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value)
# other things needed for _auth
# Create the event manager
self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'])
self.auto_key = salt.daemons.masterapi.AutoKey(self.opts)
# only create a con_cache-client if the con_cache is active
if self.opts['con_cache']:
self.cache_cli = CacheCli(self.opts)
else:
self.cache_cli = False
# Make an minion checker object
self.ckminions = salt.utils.minions.CkMinions(self.opts)
self.master_key = salt.crypt.MasterKeys(self.opts)
def send_clear(self, payload):
'''
Send a response to a recv()'d payload
'''
payload['enc'] = 'clear' # make sure we set enc
self._send(payload)
def send(self, payload):
'''
Send a response to a recv()'d payload
'''
self._send(self.crypticle.dumps(payload))
def send_private(self, payload, dictkey, target):
'''
Send a response to a recv()'d payload encrypted privately for target
'''
self._send(self._encrypt_private(payload, dictkey, target))
def _encrypt_private(self, ret, dictkey, target):
'''
The server equivalent of ReqChannel.crypted_transfer_decode_dictentry
'''
# encrypt with a specific AES key
pubfn = os.path.join(self.opts['pki_dir'],
'minions',
target)
key = salt.crypt.Crypticle.generate_key_string()
pcrypt = salt.crypt.Crypticle(
self.opts,
key)
try:
pub = RSA.load_pub_key(pubfn)
except RSA.RSAError:
return self.crypticle.dumps({})
pret = {}
pret['key'] = pub.public_encrypt(key, 4)
pret[dictkey] = pcrypt.dumps(
ret if ret is not False else {}
)
return pret
def _update_aes(self):
'''
Check to see if a fresh AES key is available and update the components
of the worker
'''
if salt.master.SMaster.secrets['aes']['secret'].value != self.crypticle.key_string:
self.crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value)
return True
return False
def _decode_payload(self, payload):
# we need to decrypt it
if payload['enc'] == 'aes':
try:
try:
payload['load'] = self.crypticle.loads(payload['load'])
except salt.crypt.AuthenticationError:
if not self._update_aes():
raise
payload['load'] = self.crypticle.loads(payload['load'])
except Exception:
# send something back to the client so the client so they know
# their load was malformed
self.send('bad load')
raise
# intercept the "_auth" commands, since the main daemon shouldn't know
# anything about our key auth
if payload['enc'] == 'clear' and payload['load']['cmd'] == '_auth':
self.send_clear(self._auth(payload['load']))
return None
return payload
def _auth(self, load):
'''
Authenticate the client, use the sent public key to encrypt the AES key
which was generated at start up.
This method fires an event over the master event manager. The event is
tagged "auth" and returns a dict with information about the auth
event
# Verify that the key we are receiving matches the stored key
# Store the key if it is not there
# Make an RSA key with the pub key
# Encrypt the AES key as an encrypted salt.payload
# Package the return and return it
'''
if not salt.utils.verify.valid_id(self.opts, load['id']):
log.info(
'Authentication request from invalid id {id}'.format(**load)
)
return {'enc': 'clear',
'load': {'ret': False}}
log.info('Authentication request from {id}'.format(**load))
# 0 is default which should be 'unlimited'
if self.opts['max_minions'] > 0:
# use the ConCache if enabled, else use the minion utils
if self.cache_cli:
minions = self.cache_cli.get_cached()
else:
minions = self.ckminions.connected_ids()
if len(minions) > 1000:
log.info('With large numbers of minions it is advised '
'to enable the ConCache with \'con_cache: True\' '
'in the masters configuration file.')
if not len(minions) <= self.opts['max_minions']:
# we reject new minions, minions that are already
# connected must be allowed for the mine, highstate, etc.
if load['id'] not in minions:
msg = ('Too many minions connected (max_minions={0}). '
'Rejecting connection from id '
'{1}'.format(self.opts['max_minions'],
load['id']))
log.info(msg)
eload = {'result': False,
'act': 'full',
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return {'enc': 'clear',
'load': {'ret': 'full'}}
# Check if key is configured to be auto-rejected/signed
auto_reject = self.auto_key.check_autoreject(load['id'])
auto_sign = self.auto_key.check_autosign(load['id'])
pubfn = os.path.join(self.opts['pki_dir'],
'minions',
load['id'])
pubfn_pend = os.path.join(self.opts['pki_dir'],
'minions_pre',
load['id'])
pubfn_rejected = os.path.join(self.opts['pki_dir'],
'minions_rejected',
load['id'])
pubfn_denied = os.path.join(self.opts['pki_dir'],
'minions_denied',
load['id'])
if self.opts['open_mode']:
# open mode is turned on, nuts to checks and overwrite whatever
# is there
pass
elif os.path.isfile(pubfn_rejected):
# The key has been rejected, don't place it in pending
log.info('Public key rejected for {0}. Key is present in '
'rejection key dir.'.format(load['id']))
eload = {'result': False,
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return {'enc': 'clear',
'load': {'ret': False}}
elif os.path.isfile(pubfn):
# The key has been accepted, check it
if salt.utils.fopen(pubfn, 'r').read() != load['pub']:
log.error(
'Authentication attempt from {id} failed, the public '
'keys did not match. This may be an attempt to compromise '
'the Salt cluster.'.format(**load)
)
# put denied minion key into minions_denied
with salt.utils.fopen(pubfn_denied, 'w+') as fp_:
fp_.write(load['pub'])
eload = {'result': False,
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return {'enc': 'clear',
'load': {'ret': False}}
elif not os.path.isfile(pubfn_pend):
# The key has not been accepted, this is a new minion
if os.path.isdir(pubfn_pend):
# The key path is a directory, error out
log.info(
'New public key {id} is a directory'.format(**load)
)
eload = {'result': False,
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return {'enc': 'clear',
'load': {'ret': False}}
if auto_reject:
key_path = pubfn_rejected
log.info('New public key for {id} rejected via autoreject_file'
.format(**load))
key_act = 'reject'
key_result = False
elif not auto_sign:
key_path = pubfn_pend
log.info('New public key for {id} placed in pending'
.format(**load))
key_act = 'pend'
key_result = True
else:
# The key is being automatically accepted, don't do anything
# here and let the auto accept logic below handle it.
key_path = None
if key_path is not None:
# Write the key to the appropriate location
with salt.utils.fopen(key_path, 'w+') as fp_:
fp_.write(load['pub'])
ret = {'enc': 'clear',
'load': {'ret': key_result}}
eload = {'result': key_result,
'act': key_act,
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return ret
elif os.path.isfile(pubfn_pend):
# This key is in the pending dir and is awaiting acceptance
if auto_reject:
# We don't care if the keys match, this minion is being
# auto-rejected. Move the key file from the pending dir to the
# rejected dir.
try:
shutil.move(pubfn_pend, pubfn_rejected)
except (IOError, OSError):
pass
log.info('Pending public key for {id} rejected via '
'autoreject_file'.format(**load))
ret = {'enc': 'clear',
'load': {'ret': False}}
eload = {'result': False,
'act': 'reject',
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return ret
elif not auto_sign:
# This key is in the pending dir and is not being auto-signed.
# Check if the keys are the same and error out if this is the
# case. Otherwise log the fact that the minion is still
# pending.
if salt.utils.fopen(pubfn_pend, 'r').read() != load['pub']:
log.error(
'Authentication attempt from {id} failed, the public '
'key in pending did not match. This may be an '
'attempt to compromise the Salt cluster.'
.format(**load)
)
# put denied minion key into minions_denied
with salt.utils.fopen(pubfn_denied, 'w+') as fp_:
fp_.write(load['pub'])
eload = {'result': False,
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return {'enc': 'clear',
'load': {'ret': False}}
else:
log.info(
'Authentication failed from host {id}, the key is in '
'pending and needs to be accepted with salt-key '
'-a {id}'.format(**load)
)
eload = {'result': True,
'act': 'pend',
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return {'enc': 'clear',
'load': {'ret': True}}
else:
# This key is in pending and has been configured to be
# auto-signed. Check to see if it is the same key, and if
# so, pass on doing anything here, and let it get automatically
# accepted below.
if salt.utils.fopen(pubfn_pend, 'r').read() != load['pub']:
log.error(
'Authentication attempt from {id} failed, the public '
'keys in pending did not match. This may be an '
'attempt to compromise the Salt cluster.'
.format(**load)
)
# put denied minion key into minions_denied
with salt.utils.fopen(pubfn_denied, 'w+') as fp_:
fp_.write(load['pub'])
eload = {'result': False,
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return {'enc': 'clear',
'load': {'ret': False}}
else:
pass
else:
# Something happened that I have not accounted for, FAIL!
log.warn('Unaccounted for authentication failure')
eload = {'result': False,
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return {'enc': 'clear',
'load': {'ret': False}}
log.info('Authentication accepted from {id}'.format(**load))
# only write to disk if you are adding the file, and in open mode,
# which implies we accept any key from a minion.
if not os.path.isfile(pubfn) and not self.opts['open_mode']:
with salt.utils.fopen(pubfn, 'w+') as fp_:
fp_.write(load['pub'])
elif self.opts['open_mode']:
disk_key = ''
if os.path.isfile(pubfn):
with salt.utils.fopen(pubfn, 'r') as fp_:
disk_key = fp_.read()
if load['pub'] and load['pub'] != disk_key:
log.debug('Host key change detected in open mode.')
with salt.utils.fopen(pubfn, 'w+') as fp_:
fp_.write(load['pub'])
pub = None
# the con_cache is enabled, send the minion id to the cache
if self.cache_cli:
self.cache_cli.put_cache([load['id']])
# The key payload may sometimes be corrupt when using auto-accept
# and an empty request comes in
try:
pub = RSA.load_pub_key(pubfn)
except RSA.RSAError as err:
log.error('Corrupt public key "{0}": {1}'.format(pubfn, err))
return {'enc': 'clear',
'load': {'ret': False}}
ret = {'enc': 'pub',
'pub_key': self.master_key.get_pub_str(),
'publish_port': self.opts['publish_port']}
# sign the masters pubkey (if enabled) before it is
# send to the minion that was just authenticated
if self.opts['master_sign_pubkey']:
# append the pre-computed signature to the auth-reply
if self.master_key.pubkey_signature():
log.debug('Adding pubkey signature to auth-reply')
log.debug(self.master_key.pubkey_signature())
ret.update({'pub_sig': self.master_key.pubkey_signature()})
else:
# the master has its own signing-keypair, compute the master.pub's
# signature and append that to the auth-reply
log.debug("Signing master public key before sending")
pub_sign = salt.crypt.sign_message(self.master_key.get_sign_paths()[1],
ret['pub_key'])
ret.update({'pub_sig': binascii.b2a_base64(pub_sign)})
if self.opts['auth_mode'] >= 2:
if 'token' in load:
try:
mtoken = self.master_key.key.private_decrypt(load['token'], 4)
aes = '{0}_|-{1}'.format(salt.master.SMaster.secrets['aes']['secret'].value, mtoken)
except Exception:
# Token failed to decrypt, send back the salty bacon to
# support older minions
pass
else:
aes = salt.master.SMaster.secrets['aes']['secret'].value
ret['aes'] = pub.public_encrypt(aes, 4)
else:
if 'token' in load:
try:
mtoken = self.master_key.key.private_decrypt(
load['token'], 4
)
ret['token'] = pub.public_encrypt(mtoken, 4)
except Exception:
# Token failed to decrypt, send back the salty bacon to
# support older minions
pass
aes = salt.master.SMaster.secrets['aes']['secret'].value
ret['aes'] = pub.public_encrypt(salt.master.SMaster.secrets['aes']['secret'].value, 4)
# Be aggressive about the signature
digest = hashlib.sha256(aes).hexdigest()
ret['sig'] = self.master_key.key.private_encrypt(digest, 5)
eload = {'result': True,
'act': 'accept',
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return ret

View File

@ -1,5 +1,13 @@
'''
Zeromq transport classes
TCP transport classes
Wire protocol:
#### msg
# == len of msg
'''
import socket
@ -32,18 +40,43 @@ from collections import defaultdict
import salt.transport.client
import salt.transport.server
import salt.transport.mixins.auth
# for IPC (for now)
import zmq
log = logging.getLogger(__name__)
# TODO: put in some lib?
def frame_msg(msg):
return '{0} {1}'.format(len(msg), msg)
def unframe_msg(frame):
'''
Return a tuple of (remaining_bits, msg)
'''
msg_len, msg = frame.split(' ', 1)
return (int(msg_len) - len(msg), msg)
def socket_frame_recv(socket, recv_size=4096):
'''
Retrieve a frame from socket
'''
ret_frame = socket.recv(recv_size)
remain, ret_msg = unframe_msg(ret_frame)
while remain > 0:
data = socket.recv(recv_size)
ret_msg += data
remain -= len(data)
return ret_msg
class TCPReqChannel(salt.transport.client.ReqChannel):
'''
Encapsulate sending routines to tcp.
TODO:
- add crypto-- clear for starters
- add timeouts
- keepalive?
'''
@ -53,29 +86,84 @@ class TCPReqChannel(salt.transport.client.ReqChannel):
self.serial = salt.payload.Serial(self.opts)
# crypt defaults to 'aes'
self.crypt = kwargs.get('crypt', 'aes')
if self.crypt != 'clear':
# we don't need to worry about auth as a kwarg, since its a singleton
self.auth = salt.crypt.SAuth(self.opts)
@property
def master_addr(self):
# TODO: opts...
return ('127.0.0.1',
4506)
def _package_load(self, load):
return self.serial.dumps({
'enc': self.crypt,
'load': load,
})
@property
def socket(self):
if not hasattr(self, '_socket'):
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.connect(self.master_addr)
return self._socket
def _send_recv(self, msg):
'''
Do a blocking send/recv combo
'''
self.socket.send(frame_msg(msg))
return socket_frame_recv(self.socket)
def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout=60):
return self.send(load, tries=tries, timeout=timeout)
# send msg
ret = self._send_recv(self._package_load(self.auth.crypticle.dumps(load)))
# wait for response
ret = self.serial.loads(ret)
key = self.auth.get_keys()
aes = key.private_decrypt(ret['key'], 4)
pcrypt = salt.crypt.Crypticle(self.opts, aes)
return pcrypt.loads(ret[dictkey])
def _crypted_transfer(self, load, tries=3, timeout=60):
'''
In case of authentication errors, try to renegotiate authentication
and retry the method.
Indeed, we can fail too early in case of a master restart during a
minion state execution call
'''
def _do_transfer():
data = self._send_recv(self._package_load(self.auth.crypticle.dumps(load)))
data = self.serial.loads(data)
# we may not have always data
# as for example for saltcall ret submission, this is a blind
# communication, we do not subscribe to return events, we just
# upload the results to the master
if data:
data = self.auth.crypticle.loads(data)
return data
try:
return _do_transfer()
except salt.crypt.AuthenticationError:
self.auth.authenticate()
return _do_transfer()
def _uncrypted_transfer(self, load, tries=3, timeout=60):
ret = self._send_recv(self._package_load(load))
return self.serial.loads(ret)
def send(self, load, tries=3, timeout=60):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# connect
s.connect(self.master_addr)
# send msg
s.send(self.serial.dumps(load))
# wait for response
data = s.recv(self.recv_size)
s.close()
print self.master_addr, len(self.serial.dumps(load))
return self.serial.loads(data)
if self.crypt == 'clear': # for sign-in requests
return self._uncrypted_transfer(load, tries, timeout)
else: # for just about everything else
return self._crypted_transfer(load, tries, timeout)
class TCPPubChannel(salt.transport.client.PubChannel):
class TCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.transport.client.PubChannel):
recv_size = 16384
def __init__(self,
@ -83,6 +171,7 @@ class TCPPubChannel(salt.transport.client.PubChannel):
**kwargs):
self.opts = opts
self.auth = salt.crypt.SAuth(self.opts)
self.serial = salt.payload.Serial(self.opts)
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@ -106,13 +195,6 @@ class TCPPubChannel(salt.transport.client.PubChannel):
return (self.opts['master_ip'],
4505)
def _verify_master_signature(self, payload):
if payload.get('sig') and self.opts.get('sign_pub_messages'):
# Verify that the signature is valid
master_pubkey_path = os.path.join(self.opts['pki_dir'], 'minion_master.pub')
if not salt.crypt.verify_signature(master_pubkey_path, load, payload.get('sig')):
raise salt.crypt.AuthenticationError('Message signature failed to validate.')
def recv(self, timeout=0):
'''
Get a pub job, with an optional timeout (0==forever)
@ -121,8 +203,8 @@ class TCPPubChannel(salt.transport.client.PubChannel):
timeout = None
socks = select.select([self.socket], [], [], timeout)
if self.socket in socks[0]:
data = self.socket.recv(self.recv_size)
return self.serial.loads(data)
data = socket_frame_recv(self.socket)
return self._decode_payload(self.serial.loads(data))
else:
return None
@ -134,13 +216,13 @@ class TCPPubChannel(salt.transport.client.PubChannel):
print ('noblock get??')
socks = select.select([self.socket], [], [], 0) #nonblocking select
if self.socket in socks[0]:
data = self.socket.recv(self.recv_size)
return self.serial.loads(data)
data = socket_frame_recv(self.socket)
return self._decode_payload(self.serial.loads(data))
else:
return None
class TCPReqServerChannel(salt.transport.server.ReqServerChannel):
class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.transport.server.ReqServerChannel):
# TODO: opts!
backlog = 5
size = 16384
@ -153,6 +235,7 @@ class TCPReqServerChannel(salt.transport.server.ReqServerChannel):
'''
Pre-fork we need to create the zmq router device
'''
salt.transport.mixins.auth.AESReqServerMixin.pre_fork(self, process_manager)
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._socket.bind((self.opts['interface'], int(self.opts['ret_port'])))
@ -167,16 +250,7 @@ class TCPReqServerChannel(salt.transport.server.ReqServerChannel):
self.serial = salt.payload.Serial(self.opts)
# other things needed for _auth
# Create the event manager
self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'])
self.auto_key = salt.daemons.masterapi.AutoKey(self.opts)
self.master_key = salt.crypt.MasterKeys(self.opts)
def payload_wrap(self, load):
return {'enc': 'aes',
'load': load}
salt.transport.mixins.auth.AESReqServerMixin.post_fork(self)
def recv(self, timeout=0):
'''
@ -184,13 +258,19 @@ class TCPReqServerChannel(salt.transport.server.ReqServerChannel):
'''
if timeout == 0:
timeout = None
log.error('recv')
socks = select.select([self.socket], [], [], timeout)
if self.socket in socks[0]:
self.client, address = self.socket.accept()
print (address)
data = self.client.recv(self.size)
return self.payload_wrap(self.serial.loads(data))
payload = socket_frame_recv(self.client)
payload = self.serial.loads(payload)
payload = self._decode_payload(payload)
# if timeout was 0 and we got a None, we intercepted the job,
# so just queue up another recv()
if payload is None and timeout == None:
return self.recv(timeout=timeout)
return payload
else:
return None
@ -199,12 +279,13 @@ class TCPReqServerChannel(salt.transport.server.ReqServerChannel):
Get a req job in a non-blocking manner.
Return load or None
'''
log.error('recv_noblock')
socks = select.select([self.socket], [], [], 0)
if self.socket in socks[0]:
self.client, address = self.socket.accept()
data = self.client.recv(self.size)
return self.payload_wrap(self.serial.loads(data))
package = socket_frame_recv(self.client)
payload = self.serial.loads(package)
payload = self._decode_payload(payload)
return payload
else:
return None
@ -212,28 +293,10 @@ class TCPReqServerChannel(salt.transport.server.ReqServerChannel):
'''
Helper function to serialize and send payload
'''
self.client.send(self.serial.dumps(payload))
self.client.send(frame_msg(self.serial.dumps(payload)))
self.client.close()
self.client = None
def send_clear(self, payload):
'''
Send a response to a recv()'d payload
'''
self._send(payload)
def send(self, payload):
'''
Send a response to a recv()'d payload
'''
self._send(payload)
def send_private(self, payload, dictkey, target):
'''
Send a response to a recv()'d payload encrypted privately for target
'''
self._send(payload)
class TCPPubServerChannel(salt.transport.server.PubServerChannel):
# TODO: opts!
@ -291,7 +354,7 @@ class TCPPubServerChannel(salt.transport.server.PubServerChannel):
# is it a publish job?
elif pull_sock in socks and socks[pull_sock] == zmq.POLLIN:
package = pull_sock.recv()
payload = salt.payload.unpackage(package)['payload']
payload = frame_msg(salt.payload.unpackage(package)['payload'])
print ('clients', clients)
for s in clients:
s.send(payload)
@ -325,11 +388,8 @@ class TCPPubServerChannel(salt.transport.server.PubServerChannel):
'''
payload = {'enc': 'aes'}
# TODO: re-enable
#crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value)
#payload['load'] = crypticle.dumps(load)
payload['load'] = load
crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value)
payload['load'] = crypticle.dumps(load)
if self.opts['sign_pub_messages']:
master_pem_path = os.path.join(self.opts['pki_dir'], 'master.pem')
log.debug("Signing data packet")

View File

@ -27,6 +27,7 @@ from collections import defaultdict
import salt.transport.client
import salt.transport.server
import salt.transport.mixins.auth
import zmq
@ -140,7 +141,7 @@ class ZeroMQReqChannel(salt.transport.client.ReqChannel):
return self._crypted_transfer(load, tries, timeout)
class ZeroMQPubChannel(salt.transport.client.PubChannel):
class ZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.transport.client.PubChannel):
def __init__(self,
opts,
**kwargs):
@ -221,13 +222,6 @@ class ZeroMQPubChannel(salt.transport.client.PubChannel):
return 'tcp://{ip}:{port}'.format(ip=self.opts['master_ip'],
port=self.publish_port)
def _verify_master_signature(self, payload):
if payload.get('sig') and self.opts.get('sign_pub_messages'):
# Verify that the signature is valid
master_pubkey_path = os.path.join(self.opts['pki_dir'], 'minion_master.pub')
if not salt.crypt.verify_signature(master_pubkey_path, load, payload.get('sig')):
raise salt.crypt.AuthenticationError('Message signature failed to validate.')
def _decode_messages(self, messages):
'''
Take the zmq messages, decrypt/decode them into a payload
@ -242,17 +236,7 @@ class ZeroMQPubChannel(salt.transport.client.PubChannel):
else:
raise Exception(('Invalid number of messages ({0}) in zeromq pub'
'message from master').format(len(messages_len)))
# we need to decrypt it
if payload['enc'] == 'aes':
self._verify_master_signature(payload)
try:
payload['load'] = self.auth.crypticle.loads(payload['load'])
except salt.crypt.AuthenticationError:
self.auth.authenticate()
payload['load'] = self.auth.crypticle.loads(payload['load'])
return payload
return self._decode_payload(payload)
def recv(self, timeout=0):
'''
@ -287,7 +271,7 @@ class ZeroMQPubChannel(salt.transport.client.PubChannel):
return self.socket
class ZeroMQReqServerChannel(salt.transport.server.ReqServerChannel):
class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.transport.server.ReqServerChannel):
def zmq_device(self):
'''
Multiprocessing target for the zmq queue device
@ -329,10 +313,7 @@ class ZeroMQReqServerChannel(salt.transport.server.ReqServerChannel):
'''
Pre-fork we need to create the zmq router device
'''
salt.master.SMaster.secrets['aes'] = {'secret': multiprocessing.Array(ctypes.c_char,
salt.crypt.Crypticle.generate_key_string()),
'reload': salt.crypt.Crypticle.generate_key_string,
}
salt.transport.mixins.auth.AESReqServerMixin.pre_fork(self, process_manager)
process_manager.add_process(self.zmq_device)
def post_fork(self):
@ -348,56 +329,7 @@ class ZeroMQReqServerChannel(salt.transport.server.ReqServerChannel):
log.info('Worker binding to socket {0}'.format(self.w_uri))
self._socket.connect(self.w_uri)
self.serial = salt.payload.Serial(self.opts)
self.crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value)
# other things needed for _auth
# Create the event manager
self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'])
self.auto_key = salt.daemons.masterapi.AutoKey(self.opts)
# only create a con_cache-client if the con_cache is active
if self.opts['con_cache']:
self.cache_cli = CacheCli(self.opts)
else:
self.cache_cli = False
# Make an minion checker object
self.ckminions = salt.utils.minions.CkMinions(self.opts)
self.master_key = salt.crypt.MasterKeys(self.opts)
def _update_aes(self):
'''
Check to see if a fresh AES key is available and update the components
of the worker
'''
if salt.master.SMaster.secrets['aes']['secret'].value != self.crypticle.key_string:
self.crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value)
return True
return False
def _decode_payload(self, payload):
# we need to decrypt it
if payload['enc'] == 'aes':
try:
try:
payload['load'] = self.crypticle.loads(payload['load'])
except salt.crypt.AuthenticationError:
if not self._update_aes():
raise
payload['load'] = self.crypticle.loads(payload['load'])
except Exception:
# send something back to the client so the client so they know
# their load was malformed
self.send('bad load')
raise
# intercept the "_auth" commands, since the main daemon shouldn't know
# anything about our key auth
if payload['enc'] == 'clear' and payload['load']['cmd'] == '_auth':
self.send_clear(self._auth(payload['load']))
return None
return payload
salt.transport.mixins.auth.AESReqServerMixin.post_fork(self)
def recv(self, timeout=0):
'''
@ -437,49 +369,6 @@ class ZeroMQReqServerChannel(salt.transport.server.ReqServerChannel):
'''
self._socket.send(self.serial.dumps(payload))
def send_clear(self, payload):
'''
Send a response to a recv()'d payload
'''
payload['enc'] = 'clear' # make sure we set enc
self._send(payload)
def send(self, payload):
'''
Send a response to a recv()'d payload
'''
self._send(self.crypticle.dumps(payload))
def send_private(self, payload, dictkey, target):
'''
Send a response to a recv()'d payload encrypted privately for target
'''
self._send(self._encrypt_private(payload, dictkey, target))
def _encrypt_private(self, ret, dictkey, target):
'''
The server equivalent of ReqChannel.crypted_transfer_decode_dictentry
'''
# encrypt with a specific AES key
pubfn = os.path.join(self.opts['pki_dir'],
'minions',
target)
key = salt.crypt.Crypticle.generate_key_string()
pcrypt = salt.crypt.Crypticle(
self.opts,
key)
try:
pub = RSA.load_pub_key(pubfn)
except RSA.RSAError:
return self.crypticle.dumps({})
pret = {}
pret['key'] = pub.public_encrypt(key, 4)
pret[dictkey] = pcrypt.dumps(
ret if ret is not False else {}
)
return pret
@property
def socket(self):
'''
@ -487,329 +376,6 @@ class ZeroMQReqServerChannel(salt.transport.server.ReqServerChannel):
'''
self._socket
def _auth(self, load):
'''
Authenticate the client, use the sent public key to encrypt the AES key
which was generated at start up.
This method fires an event over the master event manager. The event is
tagged "auth" and returns a dict with information about the auth
event
# Verify that the key we are receiving matches the stored key
# Store the key if it is not there
# Make an RSA key with the pub key
# Encrypt the AES key as an encrypted salt.payload
# Package the return and return it
'''
if not salt.utils.verify.valid_id(self.opts, load['id']):
log.info(
'Authentication request from invalid id {id}'.format(**load)
)
return {'enc': 'clear',
'load': {'ret': False}}
log.info('Authentication request from {id}'.format(**load))
# 0 is default which should be 'unlimited'
if self.opts['max_minions'] > 0:
# use the ConCache if enabled, else use the minion utils
if self.cache_cli:
minions = self.cache_cli.get_cached()
else:
minions = self.ckminions.connected_ids()
if len(minions) > 1000:
log.info('With large numbers of minions it is advised '
'to enable the ConCache with \'con_cache: True\' '
'in the masters configuration file.')
if not len(minions) <= self.opts['max_minions']:
# we reject new minions, minions that are already
# connected must be allowed for the mine, highstate, etc.
if load['id'] not in minions:
msg = ('Too many minions connected (max_minions={0}). '
'Rejecting connection from id '
'{1}'.format(self.opts['max_minions'],
load['id']))
log.info(msg)
eload = {'result': False,
'act': 'full',
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return {'enc': 'clear',
'load': {'ret': 'full'}}
# Check if key is configured to be auto-rejected/signed
auto_reject = self.auto_key.check_autoreject(load['id'])
auto_sign = self.auto_key.check_autosign(load['id'])
pubfn = os.path.join(self.opts['pki_dir'],
'minions',
load['id'])
pubfn_pend = os.path.join(self.opts['pki_dir'],
'minions_pre',
load['id'])
pubfn_rejected = os.path.join(self.opts['pki_dir'],
'minions_rejected',
load['id'])
pubfn_denied = os.path.join(self.opts['pki_dir'],
'minions_denied',
load['id'])
if self.opts['open_mode']:
# open mode is turned on, nuts to checks and overwrite whatever
# is there
pass
elif os.path.isfile(pubfn_rejected):
# The key has been rejected, don't place it in pending
log.info('Public key rejected for {0}. Key is present in '
'rejection key dir.'.format(load['id']))
eload = {'result': False,
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return {'enc': 'clear',
'load': {'ret': False}}
elif os.path.isfile(pubfn):
# The key has been accepted, check it
if salt.utils.fopen(pubfn, 'r').read() != load['pub']:
log.error(
'Authentication attempt from {id} failed, the public '
'keys did not match. This may be an attempt to compromise '
'the Salt cluster.'.format(**load)
)
# put denied minion key into minions_denied
with salt.utils.fopen(pubfn_denied, 'w+') as fp_:
fp_.write(load['pub'])
eload = {'result': False,
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return {'enc': 'clear',
'load': {'ret': False}}
elif not os.path.isfile(pubfn_pend):
# The key has not been accepted, this is a new minion
if os.path.isdir(pubfn_pend):
# The key path is a directory, error out
log.info(
'New public key {id} is a directory'.format(**load)
)
eload = {'result': False,
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return {'enc': 'clear',
'load': {'ret': False}}
if auto_reject:
key_path = pubfn_rejected
log.info('New public key for {id} rejected via autoreject_file'
.format(**load))
key_act = 'reject'
key_result = False
elif not auto_sign:
key_path = pubfn_pend
log.info('New public key for {id} placed in pending'
.format(**load))
key_act = 'pend'
key_result = True
else:
# The key is being automatically accepted, don't do anything
# here and let the auto accept logic below handle it.
key_path = None
if key_path is not None:
# Write the key to the appropriate location
with salt.utils.fopen(key_path, 'w+') as fp_:
fp_.write(load['pub'])
ret = {'enc': 'clear',
'load': {'ret': key_result}}
eload = {'result': key_result,
'act': key_act,
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return ret
elif os.path.isfile(pubfn_pend):
# This key is in the pending dir and is awaiting acceptance
if auto_reject:
# We don't care if the keys match, this minion is being
# auto-rejected. Move the key file from the pending dir to the
# rejected dir.
try:
shutil.move(pubfn_pend, pubfn_rejected)
except (IOError, OSError):
pass
log.info('Pending public key for {id} rejected via '
'autoreject_file'.format(**load))
ret = {'enc': 'clear',
'load': {'ret': False}}
eload = {'result': False,
'act': 'reject',
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return ret
elif not auto_sign:
# This key is in the pending dir and is not being auto-signed.
# Check if the keys are the same and error out if this is the
# case. Otherwise log the fact that the minion is still
# pending.
if salt.utils.fopen(pubfn_pend, 'r').read() != load['pub']:
log.error(
'Authentication attempt from {id} failed, the public '
'key in pending did not match. This may be an '
'attempt to compromise the Salt cluster.'
.format(**load)
)
# put denied minion key into minions_denied
with salt.utils.fopen(pubfn_denied, 'w+') as fp_:
fp_.write(load['pub'])
eload = {'result': False,
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return {'enc': 'clear',
'load': {'ret': False}}
else:
log.info(
'Authentication failed from host {id}, the key is in '
'pending and needs to be accepted with salt-key '
'-a {id}'.format(**load)
)
eload = {'result': True,
'act': 'pend',
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return {'enc': 'clear',
'load': {'ret': True}}
else:
# This key is in pending and has been configured to be
# auto-signed. Check to see if it is the same key, and if
# so, pass on doing anything here, and let it get automatically
# accepted below.
if salt.utils.fopen(pubfn_pend, 'r').read() != load['pub']:
log.error(
'Authentication attempt from {id} failed, the public '
'keys in pending did not match. This may be an '
'attempt to compromise the Salt cluster.'
.format(**load)
)
# put denied minion key into minions_denied
with salt.utils.fopen(pubfn_denied, 'w+') as fp_:
fp_.write(load['pub'])
eload = {'result': False,
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return {'enc': 'clear',
'load': {'ret': False}}
else:
pass
else:
# Something happened that I have not accounted for, FAIL!
log.warn('Unaccounted for authentication failure')
eload = {'result': False,
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return {'enc': 'clear',
'load': {'ret': False}}
log.info('Authentication accepted from {id}'.format(**load))
# only write to disk if you are adding the file, and in open mode,
# which implies we accept any key from a minion.
if not os.path.isfile(pubfn) and not self.opts['open_mode']:
with salt.utils.fopen(pubfn, 'w+') as fp_:
fp_.write(load['pub'])
elif self.opts['open_mode']:
disk_key = ''
if os.path.isfile(pubfn):
with salt.utils.fopen(pubfn, 'r') as fp_:
disk_key = fp_.read()
if load['pub'] and load['pub'] != disk_key:
log.debug('Host key change detected in open mode.')
with salt.utils.fopen(pubfn, 'w+') as fp_:
fp_.write(load['pub'])
pub = None
# the con_cache is enabled, send the minion id to the cache
if self.cache_cli:
self.cache_cli.put_cache([load['id']])
# The key payload may sometimes be corrupt when using auto-accept
# and an empty request comes in
try:
pub = RSA.load_pub_key(pubfn)
except RSA.RSAError as err:
log.error('Corrupt public key "{0}": {1}'.format(pubfn, err))
return {'enc': 'clear',
'load': {'ret': False}}
ret = {'enc': 'pub',
'pub_key': self.master_key.get_pub_str(),
'publish_port': self.opts['publish_port']}
# sign the masters pubkey (if enabled) before it is
# send to the minion that was just authenticated
if self.opts['master_sign_pubkey']:
# append the pre-computed signature to the auth-reply
if self.master_key.pubkey_signature():
log.debug('Adding pubkey signature to auth-reply')
log.debug(self.master_key.pubkey_signature())
ret.update({'pub_sig': self.master_key.pubkey_signature()})
else:
# the master has its own signing-keypair, compute the master.pub's
# signature and append that to the auth-reply
log.debug("Signing master public key before sending")
pub_sign = salt.crypt.sign_message(self.master_key.get_sign_paths()[1],
ret['pub_key'])
ret.update({'pub_sig': binascii.b2a_base64(pub_sign)})
if self.opts['auth_mode'] >= 2:
if 'token' in load:
try:
mtoken = self.master_key.key.private_decrypt(load['token'], 4)
aes = '{0}_|-{1}'.format(salt.master.SMaster.secrets['aes']['secret'].value, mtoken)
except Exception:
# Token failed to decrypt, send back the salty bacon to
# support older minions
pass
else:
aes = salt.master.SMaster.secrets['aes']['secret'].value
ret['aes'] = pub.public_encrypt(aes, 4)
else:
if 'token' in load:
try:
mtoken = self.master_key.key.private_decrypt(
load['token'], 4
)
ret['token'] = pub.public_encrypt(mtoken, 4)
except Exception:
# Token failed to decrypt, send back the salty bacon to
# support older minions
pass
aes = salt.master.SMaster.secrets['aes']['secret'].value
ret['aes'] = pub.public_encrypt(salt.master.SMaster.secrets['aes']['secret'].value, 4)
# Be aggressive about the signature
digest = hashlib.sha256(aes).hexdigest()
ret['sig'] = self.master_key.key.private_encrypt(digest, 5)
eload = {'result': True,
'act': 'accept',
'id': load['id'],
'pub': load['pub']}
self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth'))
return ret
class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):