Hacking to fix various things broken in rebase

This commit is contained in:
Thomas Jackson 2015-02-10 08:18:09 -08:00
parent 8e0316d613
commit 0a0caf6304
5 changed files with 34 additions and 26 deletions

View File

@ -28,7 +28,7 @@ from salt.ext.six.moves import input
# Import salt libs
import salt.config
import salt.loader
import salt.transport.channel
import salt.transport.client
import salt.utils
import salt.utils.minions
import salt.payload
@ -340,13 +340,13 @@ class Resolver(object):
if self.opts['transport'] == 'zeromq':
master_uri = 'tcp://' + salt.utils.ip_bracket(self.opts['interface']) + \
':' + str(self.opts['ret_port'])
channel = salt.transport.channel.ReqChannel.factory(self.opts,
channel = salt.transport.client.ReqChannel.factory(self.opts,
crypt='clear',
master_uri=master_uri)
return channel.send(load)
elif self.opts['transport'] == 'raet':
channel = salt.transport.channel.ReqChannel.factory(self.opts)
channel = salt.transport.client.ReqChannel.factory(self.opts)
channel.dst = (None, None, 'local_cmd')
return channel.send(load)

View File

@ -28,7 +28,7 @@ except ImportError:
import salt.defaults.exitcodes
import salt.utils
import salt.payload
import salt.transport.channel
import salt.transport.client
import salt.utils.verify
import salt.version
from salt.exceptions import (
@ -686,7 +686,7 @@ class SAuth(object):
auth['master_uri'] = self.opts['master_uri']
channel = salt.transport.channel.ReqChannel.factory(self.opts, crypt='clear')
channel = salt.transport.client.ReqChannel.factory(self.opts, crypt='clear')
try:
payload = channel.send(

View File

@ -88,7 +88,7 @@ class SMaster(object):
'''
Create a simple salt-master, this will generate the top-level master
'''
aes = None
secrets = {} # mapping of key -> {'secret': multiprocessing type, 'reload': FUNCTION}
def __init__(self, opts):
'''
@ -97,7 +97,6 @@ class SMaster(object):
:param dict opts: The salt options dictionary
'''
self.opts = opts
SMaster.aes = multiprocessing.Array(ctypes.c_char, salt.crypt.Crypticle.generate_key_string())
self.master_key = salt.crypt.MasterKeys(self.opts)
self.key = self.__prep_key()
@ -243,15 +242,16 @@ class Maintenance(multiprocessing.Process):
if to_rotate:
log.info('Rotating master AES key')
# should be unecessary-- since no one else should be modifying
with SMaster.aes.get_lock():
SMaster.aes.value = salt.crypt.Crypticle.generate_key_string()
self.event.fire_event({'rotate_aes_key': True}, tag='key')
for secret_key, secret_map in SMaster.secrets.iteritems():
# should be unecessary-- since no one else should be modifying
with secret_map['secret'].get_lock():
secret_map['secret'].value = secret_map['reload']()
self.event.fire_event({'rotate_{0}_key'.format(secret_key): True}, tag='key')
self.rotate = now
if self.opts.get('ping_on_rotate'):
# Ping all minions to get them to pick up the new key
log.debug('Pinging all connected minions '
'due to AES key rotation')
'due to key rotation')
salt.utils.master.ping_all_connected_minions(self.opts)
def handle_pillargit(self):
@ -442,15 +442,17 @@ class Master(SMaster):
log.info('Creating master process manager')
process_manager = salt.utils.process.ProcessManager()
log.info('Creating master maintenance process')
process_manager.add_process(Maintenance, args=(self.opts,))
log.info('Creating master publisher process')
publish_channel = salt.transport.server.PubServerChannel.factory(self.opts)
publish_channel.pre_fork(process_manager)
log.info('Creating master event publisher process')
process_manager.add_process(salt.utils.event.EventPublisher, args=(self.opts,))
salt.engines.start_engines(self.opts, process_manager)
# must be after channels
process_manager.add_process(Maintenance, args=(self.opts,))
log.info('Creating master publisher process')
if self.opts.get('reactor'):
log.info('Creating master reactor process')
process_manager.add_process(salt.utils.reactor.Reactor, args=(self.opts,))

View File

@ -759,7 +759,7 @@ class Minion(MinionBase):
self.opts['master_list'] = local_masters
try:
self.pub_channel = salt.transport.channel.PubChannel.factory(self.opts, timeout=timeout, safe=safe)
self.pub_channel = salt.transport.client.PubChannel.factory(self.opts, timeout=timeout, safe=safe)
conn = True
break
except SaltClientError:
@ -781,7 +781,7 @@ class Minion(MinionBase):
else:
opts.update(resolve_dns(opts))
super(Minion, self).__init__(opts)
self.pub_channel = salt.transport.channel.PubChannel.factory(self.opts, timeout=timeout, safe=safe)
self.pub_channel = salt.transport.client.PubChannel.factory(self.opts, timeout=timeout, safe=safe)
# TODO: remove? What is this used for...
self.tok = self.pub_channel.auth.gen_token('salt')
self.connected = True
@ -1456,7 +1456,7 @@ class Minion(MinionBase):
'master {0}'.format(self.opts['master']))
del self.pub_channel
del self.poller
self.pub_channel = salt.transport.channel.PubChannel.factory(self.opts, timeout=timeout, safe=safe)
self.pub_channel = salt.transport.client.PubChannel.factory(self.opts, timeout=timeout, safe=safe)
self.poller.register(self.pub_channel.socket, zmq.POLLIN)
self.poller.register(self.epull_sock, zmq.POLLIN)
self._fire_master_minion_start()

View File

@ -6,6 +6,8 @@ import os
import threading
import errno
import hashlib
import ctypes
import multiprocessing
from M2Crypto import RSA
@ -323,6 +325,10 @@ class ZeroMQReqServerChannel(salt.transport.server.ReqServerChannel):
'''
Pre-fork we need to create the zmq router device
'''
salt.master.SMaster.secrets['aes'] = {'secret': multiprocessing.Array(ctypes.c_char,
salt.crypt.Crypticle.generate_key_string()),
'reload': salt.crypt.Crypticle.generate_key_string,
}
process_manager.add_process(self.zmq_device)
def post_fork(self):
@ -339,7 +345,7 @@ class ZeroMQReqServerChannel(salt.transport.server.ReqServerChannel):
self._socket.connect(self.w_uri)
self.serial = salt.payload.Serial(self.opts)
self.crypticle = salt.crypt.Crypticle(self.opts, self.opts['aes'].value)
self.crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value)
# other things needed for _auth
# Create the event manager
@ -361,8 +367,8 @@ class ZeroMQReqServerChannel(salt.transport.server.ReqServerChannel):
Check to see if a fresh AES key is available and update the components
of the worker
'''
if self.opts['aes'].value != self.crypticle.key_string:
self.crypticle = salt.crypt.Crypticle(self.opts, self.opts['aes'].value)
if salt.master.SMaster.secrets['aes']['secret'].value != self.crypticle.key_string:
self.crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value)
return True
return False
@ -771,13 +777,13 @@ class ZeroMQReqServerChannel(salt.transport.server.ReqServerChannel):
if 'token' in load:
try:
mtoken = self.master_key.key.private_decrypt(load['token'], 4)
aes = '{0}_|-{1}'.format(self.opts['aes'].value, mtoken)
aes = '{0}_|-{1}'.format(salt.master.SMaster.secrets['aes']['secret'].value, mtoken)
except Exception:
# Token failed to decrypt, send back the salty bacon to
# support older minions
pass
else:
aes = self.opts['aes'].value
aes = salt.master.SMaster.secrets['aes']['secret'].value
ret['aes'] = pub.public_encrypt(aes, 4)
else:
@ -792,8 +798,8 @@ class ZeroMQReqServerChannel(salt.transport.server.ReqServerChannel):
# support older minions
pass
aes = self.opts['aes'].value
ret['aes'] = pub.public_encrypt(self.opts['aes'].value, 4)
aes = salt.master.SMaster.secrets['aes']['secret'].value
ret['aes'] = pub.public_encrypt(salt.master.SMaster.secrets['aes']['secret'].value, 4)
# Be aggressive about the signature
digest = hashlib.sha256(aes).hexdigest()
ret['sig'] = self.master_key.key.private_encrypt(digest, 5)
@ -902,7 +908,7 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
'''
payload = {'enc': 'aes'}
crypticle = salt.crypt.Crypticle(self.opts, self.opts['aes'].value)
crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value)
payload['load'] = crypticle.dumps(load)
if self.opts['sign_pub_messages']:
master_pem_path = os.path.join(self.opts['pki_dir'], 'master.pem')