Merge pull request #19544 from saltstack/revert-19318-develop

Revert "Cleanup and performance tweaks"
This commit is contained in:
Mike Place 2015-01-08 14:57:57 -07:00
commit 372cd762c6
5 changed files with 44 additions and 45 deletions

View File

@ -610,8 +610,6 @@ class Auth(object):
m_pub_fn = os.path.join(self.opts['pki_dir'], self.mpub)
auth['master_uri'] = self.opts['master_uri']
sreq = salt.payload.SREQ(
self.opts['master_uri'],
)
@ -820,5 +818,4 @@ class SAuth(Auth):
log.debug('Authentication wait time is {0}'.format(acceptance_wait_time))
continue
break
self.creds = creds
return Crypticle(self.opts, creds['aes'])

View File

@ -812,9 +812,7 @@ class MWorker(multiprocessing.Process):
try:
data = self.crypticle.loads(load)
except Exception:
# return something not encrypted so the minions know that they aren't
# encrypting correctly.
return 'bad load'
return ''
if 'cmd' not in data:
log.error('Received malformed command {0}'.format(data))
return {}

View File

@ -284,7 +284,8 @@ class SMinion(object):
self.opts['environment']
).compile_pillar()
self.functions = salt.loader.minion_mods(self.opts, include_errors=True)
self.function_errors = self.functions.pop('_errors') # Keep the funcs clean
self.function_errors = self.functions['_errors']
self.functions.pop('_errors') # Keep the funcs clean
self.returners = salt.loader.returners(self.opts, self.functions)
self.states = salt.loader.states(self.opts, self.functions)
self.rend = salt.loader.render(self.opts, self.functions)
@ -1413,13 +1414,34 @@ class Minion(MinionBase):
self.opts['master_ip']
)
)
auth = salt.crypt.SAuth(self.opts)
auth = salt.crypt.Auth(self.opts)
self.tok = auth.gen_token('salt')
self.crypticle = auth.crypticle
acceptance_wait_time = self.opts['acceptance_wait_time']
acceptance_wait_time_max = self.opts['acceptance_wait_time_max']
if not acceptance_wait_time_max:
acceptance_wait_time_max = acceptance_wait_time
while True:
creds = auth.sign_in(timeout, safe)
if creds == 'full':
return creds
elif creds != 'retry':
log.info('Authentication with master at {0} successful!'.format(self.opts['master_ip']))
break
log.info('Waiting for minion key to be accepted by the master.')
if acceptance_wait_time:
log.info('Waiting {0} seconds before retry.'.format(acceptance_wait_time))
time.sleep(acceptance_wait_time)
if acceptance_wait_time < acceptance_wait_time_max:
acceptance_wait_time += acceptance_wait_time
log.debug('Authentication wait time is {0}'.format(acceptance_wait_time))
self.aes = creds['aes']
if self.opts.get('syndic_master_publish_port'):
self.publish_port = self.opts.get('syndic_master_publish_port')
else:
self.publish_port = auth.creds['publish_port']
self.publish_port = creds['publish_port']
self.crypticle = salt.crypt.Crypticle(self.opts, self.aes)
def module_refresh(self, force_refresh=False):
'''
@ -2112,10 +2134,7 @@ class Syndic(Minion):
self.event_forward_timeout = (
time.time() + self.opts['syndic_event_forward_timeout']
)
tag_parts = event['tag'].split('/')
if len(tag_parts) >= 4 and tag_parts[1] == 'job' and \
salt.utils.jid.is_jid(tag_parts[2]) and tag_parts[3] == 'ret' and \
'return' in event['data']:
if salt.utils.jid.is_jid(event['tag']) and 'return' in event['data']:
if 'jid' not in event['data']:
# Not a job return
continue
@ -2129,8 +2148,7 @@ class Syndic(Minion):
self.mminion.returners[fstr](event['data']['jid'])
)
if 'master_id' in event['data']:
# __'s to make sure it doesn't print out on the master cli
jdict['__master_id__'] = event['data']['master_id']
jdict['master_id'] = event['data']['master_id']
jdict[event['data']['id']] = event['data']['return']
else:
# Add generic event aggregation here
@ -2348,16 +2366,12 @@ class MultiSyndic(MinionBase):
if e.errno == errno.EAGAIN or e.errno == errno.EINTR:
break
raise
log.trace('Got event {0}'.format(event['tag']))
if self.event_forward_timeout is None:
self.event_forward_timeout = (
time.time() + self.opts['syndic_event_forward_timeout']
)
tag_parts = event['tag'].split('/')
if len(tag_parts) >= 4 and tag_parts[1] == 'job' and \
salt.utils.jid.is_jid(tag_parts[2]) and tag_parts[3] == 'ret' and \
'return' in event['data']:
if salt.utils.jid.is_jid(event['tag']) and 'return' in event['data']:
if 'jid' not in event['data']:
# Not a job return
continue

View File

@ -74,7 +74,9 @@ class RemotePillar(object):
self.ext = ext
self.grains = grains
self.id_ = id_
self.serial = salt.payload.Serial(self.opts)
self.channel = salt.transport.Channel.factory(opts)
# self.auth = salt.crypt.SAuth(opts)
def compile_pillar(self):
'''
@ -87,11 +89,12 @@ class RemotePillar(object):
'cmd': '_pillar'}
if self.ext:
load['ext'] = self.ext
ret_pillar = self.channel.crypted_transfer_decode_dictentry(load,
dictkey='pillar',
tries=3,
timeout=7200,
)
ret_pillar = self.channel.crypted_transfer_decode_dictentry(load, dictkey='pillar', tries=3, timeout=7200)
# key = self.auth.get_keys()
# aes = key.private_decrypt(ret['key'], 4)
# pcrypt = salt.crypt.Crypticle(self.opts, aes)
# ret_pillar = pcrypt.loads(ret['pillar'])
if not isinstance(ret_pillar, dict):
log.error(

View File

@ -213,7 +213,6 @@ class ZeroMQChannel(Channel):
# the sreq is the zmq connection, since those are relatively expensive to
# set up, we are going to reuse them as much as possible.
sreq_cache = defaultdict(dict)
auth_cache = defaultdict(dict)
@property
def sreq_key(self):
@ -225,19 +224,6 @@ class ZeroMQChannel(Channel):
threading.current_thread().name, # per per-thread
)
# TODO: change SAuth to return a singleton, so we don't have to do this
@property
def auth(self):
'''
Return the appropriate "auth" for this channel
Note: auth is only cached keyed by master_uri, this means we assume that
a given master_uri has ONE auth mechanism (which seems reasonable enough)
'''
if self.master_uri not in ZeroMQChannel.auth_cache:
ZeroMQChannel.auth_cache[self.master_uri] = salt.crypt.SAuth(self.opts)
return ZeroMQChannel.auth_cache[self.master_uri]
@property
def sreq(self):
# When using threading, like on Windows, don't cache.
@ -272,15 +258,16 @@ class ZeroMQChannel(Channel):
# crypt defaults to 'aes'
self.crypt = kwargs.get('crypt', 'aes')
if self.crypt != 'clear':
if 'auth' in kwargs:
self.auth = kwargs['auth']
else:
self.auth = salt.crypt.SAuth(opts)
if 'master_uri' in kwargs:
self.master_uri = kwargs['master_uri']
else:
self.master_uri = opts['master_uri']
if self.crypt != 'clear':
if 'auth' in kwargs and self.master_uri not in self.auth_cache:
self.auth_cache[self.master_uri] = kwargs['auth']
def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout=60):
ret = self.sreq.send('aes', self.auth.crypticle.dumps(load), tries, timeout)
key = self.auth.get_keys()
@ -311,7 +298,7 @@ class ZeroMQChannel(Channel):
try:
return _do_transfer()
except salt.crypt.AuthenticationError:
del ZeroMQChannel.auth_cache[self.master_uri]
self.auth = salt.crypt.SAuth(self.opts)
return _do_transfer()
def _uncrypted_transfer(self, load, tries=3, timeout=60):