mirror of
https://github.com/valitydev/salt.git
synced 2024-11-07 17:09:03 +00:00
Merge branch '2014.7' into develop
Conflicts: salt/config.py
This commit is contained in:
commit
3db44821e4
@ -494,8 +494,10 @@ class LoadPillar(ioflo.base.deeding.Deed):
|
||||
'''
|
||||
Initial pillar
|
||||
'''
|
||||
route = {'src': (self.opts.value['id'], 0, None),
|
||||
'dst': ('master', None, 'remote_cmd')}
|
||||
# default master is the first remote
|
||||
# this default destination will not work with multiple masters
|
||||
route = {'src': (self.udp_stack.value.local.name, 0, None),
|
||||
'dst': (self.udp_stack.value.remotes.values()[0].name, None, 'remote_cmd')}
|
||||
load = {'id': self.opts.value['id'],
|
||||
'grains': self.grains.value,
|
||||
'saltenv': self.opts.value['environment'],
|
||||
@ -723,18 +725,22 @@ class Router(ioflo.base.deeding.Deed):
|
||||
sender is the unique name of the remote estate that sent the message
|
||||
'''
|
||||
try:
|
||||
d_estate = msg['route']['dst'][0]
|
||||
d_yard = msg['route']['dst'][1]
|
||||
d_share = msg['route']['dst'][2]
|
||||
s_estate, s_yard, s_share = msg['route']['src']
|
||||
d_estate, d_yard, d_share = msg['route']['dst']
|
||||
except (ValueError, IndexError):
|
||||
log.error('Received invalid message: {0}'.format(msg))
|
||||
return
|
||||
|
||||
if s_estate is None: # drop
|
||||
return
|
||||
|
||||
if d_estate is None:
|
||||
pass
|
||||
elif d_estate != self.udp_stack.value.local.name:
|
||||
log.error(
|
||||
'Received message for wrong estate: {0}'.format(d_estate))
|
||||
return
|
||||
|
||||
if d_yard is not None:
|
||||
# Meant for another yard, send it off!
|
||||
if d_yard in self.uxd_stack.value.nameRemotes:
|
||||
@ -769,22 +775,32 @@ class Router(ioflo.base.deeding.Deed):
|
||||
sender is unique name of remote that sent the message
|
||||
'''
|
||||
try:
|
||||
d_estate = msg['route']['dst'][0]
|
||||
d_yard = msg['route']['dst'][1]
|
||||
d_share = msg['route']['dst'][2]
|
||||
s_estate, s_yard, s_share = msg['route']['src']
|
||||
d_estate, d_yard, d_share = msg['route']['dst']
|
||||
except (ValueError, IndexError):
|
||||
log.error('Received invalid message: {0}'.format(msg))
|
||||
return
|
||||
|
||||
if s_yard is None:
|
||||
return # drop message
|
||||
|
||||
if s_estate is None: # substitute local estate
|
||||
s_estate = self.udp_stack.value.local.name
|
||||
msg['route']['src'] = (s_estate, s_yard, s_share)
|
||||
|
||||
if d_estate is None:
|
||||
pass
|
||||
elif d_estate != self.udp_stack.value.local:
|
||||
elif d_estate != self.udp_stack.value.local.name:
|
||||
# Forward to the correct estate
|
||||
uid = self.udp_stack.value.fetchUidByName(d_estate)
|
||||
self.udp_stack.value.message(msg, uid)
|
||||
if d_estate in self.udp_stack.value.nameRemotes:
|
||||
self.udp_stack.value.message(msg,
|
||||
self.udp_stack.value.nameRemotes[d_estate].uid)
|
||||
return
|
||||
|
||||
if d_share == 'pub_ret':
|
||||
if msg.get('__worker_verify') == self.worker_verify.value:
|
||||
self.publish.value.append(msg)
|
||||
|
||||
if d_yard is None:
|
||||
pass
|
||||
elif d_yard != self.uxd_stack.value.local.name:
|
||||
@ -931,7 +947,8 @@ class NixExecutor(ioflo.base.deeding.Deed):
|
||||
'modules': '.salt.loader.modules',
|
||||
'returners': '.salt.loader.returners',
|
||||
'fun': '.salt.var.fun',
|
||||
'executors': '.salt.track.executors'}
|
||||
'executors': '.salt.track.executors',
|
||||
'udp_stack': '.raet.udp.stack.stack', }
|
||||
|
||||
def postinitio(self):
|
||||
'''
|
||||
@ -971,9 +988,9 @@ class NixExecutor(ioflo.base.deeding.Deed):
|
||||
'''
|
||||
Send the return data back via the uxd socket
|
||||
'''
|
||||
route = {'src': (self.udp_stack.value.local.name, stack.local.name, 'jid_ret'),
|
||||
'dst': (msg['route']['src'][0], None, 'remote_cmd')}
|
||||
mid = self.opts['id']
|
||||
route = {'src': (mid, stack.local.name, 'jid_ret'),
|
||||
'dst': (msg['route']['src'][0], None, 'remote_cmd')}
|
||||
ret['cmd'] = '_return'
|
||||
ret['id'] = mid
|
||||
try:
|
||||
|
@ -8,6 +8,7 @@ The core behaviors used by minion and master
|
||||
import time
|
||||
import os
|
||||
import multiprocessing
|
||||
import logging
|
||||
|
||||
# Import salt libs
|
||||
import salt.daemons.masterapi
|
||||
@ -19,6 +20,8 @@ from raet.lane.yarding import RemoteYard
|
||||
# Import ioflo libs
|
||||
import ioflo.base.deeding
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class WorkerFork(ioflo.base.deeding.Deed):
|
||||
'''
|
||||
@ -153,6 +156,7 @@ class WorkerRouter(ioflo.base.deeding.Deed):
|
||||
'''
|
||||
Ioinits = {
|
||||
'uxd_stack': '.salt.uxd.stack.stack',
|
||||
'udp_stack': '.raet.udp.stack.stack',
|
||||
'opts': '.salt.opts',
|
||||
'yid': '.salt.yid',
|
||||
'worker_verify': '.salt.var.worker_verify',
|
||||
@ -168,6 +172,13 @@ class WorkerRouter(ioflo.base.deeding.Deed):
|
||||
self.uxd_stack.value.serviceAll()
|
||||
while self.uxd_stack.value.rxMsgs:
|
||||
msg, sender = self.uxd_stack.value.rxMsgs.popleft()
|
||||
try:
|
||||
s_estate, s_yard, s_share = msg['route']['src']
|
||||
d_estate, d_yard, d_share = msg['route']['dst']
|
||||
except (ValueError, IndexError):
|
||||
log.error('Received invalid message: {0}'.format(msg))
|
||||
return
|
||||
|
||||
if 'load' in msg:
|
||||
cmd = msg['load'].get('cmd')
|
||||
if not cmd:
|
||||
@ -175,10 +186,10 @@ class WorkerRouter(ioflo.base.deeding.Deed):
|
||||
elif cmd.startswith('__'):
|
||||
continue
|
||||
ret = {}
|
||||
if msg['route']['dst'][2] == 'remote_cmd':
|
||||
if d_share == 'remote_cmd':
|
||||
if hasattr(self.remote.value, cmd):
|
||||
ret['return'] = getattr(self.remote.value, cmd)(msg['load'])
|
||||
elif msg['route']['dst'][2] == 'local_cmd':
|
||||
elif d_share == 'local_cmd':
|
||||
if hasattr(self.local.value, cmd):
|
||||
ret['return'] = getattr(self.local.value, cmd)(msg['load'])
|
||||
else:
|
||||
@ -187,10 +198,10 @@ class WorkerRouter(ioflo.base.deeding.Deed):
|
||||
r_share = 'pub_ret'
|
||||
ret['__worker_verify'] = self.worker_verify.value
|
||||
else:
|
||||
r_share = msg['route']['src'][2]
|
||||
r_share = s_share
|
||||
ret['route'] = {
|
||||
'src': (self.opts.value.get('id', 'master'), self.uxd_stack.value.local.name, None),
|
||||
'dst': (msg['route']['src'][0], msg['route']['src'][1], r_share)
|
||||
'src': (None, self.uxd_stack.value.local.name, None),
|
||||
'dst': (s_estate, s_yard, r_share)
|
||||
}
|
||||
self.uxd_stack.value.transmit(ret,
|
||||
self.uxd_stack.value.fetchUidByName('manor'))
|
||||
|
@ -331,7 +331,7 @@ class CallTest(integration.ShellCase, integration.ShellCaseCommonTestsMixIn):
|
||||
|
||||
self.run_script(
|
||||
'salt-call',
|
||||
'-c {0} --output-file={1} --output-file-append -g'.format(
|
||||
'-c {0} --output-file={1} -g'.format(
|
||||
self.get_config_dir(),
|
||||
output_file
|
||||
),
|
||||
@ -340,27 +340,7 @@ class CallTest(integration.ShellCase, integration.ShellCaseCommonTestsMixIn):
|
||||
)
|
||||
stat2 = os.stat(output_file)
|
||||
self.assertEqual(stat1.st_mode, stat2.st_mode)
|
||||
# Data was appeneded to file
|
||||
self.assertTrue(stat1.st_size < stat2.st_size)
|
||||
|
||||
# Let's remove the output file
|
||||
os.unlink(output_file)
|
||||
|
||||
# Not appending data
|
||||
self.run_script(
|
||||
'salt-call',
|
||||
'-c {0} --output-file={1} -g'.format(
|
||||
self.get_config_dir(),
|
||||
output_file
|
||||
),
|
||||
catch_stderr=True,
|
||||
with_retcode=True
|
||||
)
|
||||
stat3 = os.stat(output_file)
|
||||
# Mode must have changed since we're creating a new log file
|
||||
self.assertNotEqual(stat1.st_mode, stat3.st_mode)
|
||||
# Data was appended to file
|
||||
self.assertEqual(stat1.st_size, stat3.st_size)
|
||||
# self.assertEqual(stat1.st_ctime, stat2.st_ctime)
|
||||
finally:
|
||||
if os.path.exists(output_file):
|
||||
os.unlink(output_file)
|
||||
|
Loading…
Reference in New Issue
Block a user