update minion to connect to master

The RAET minion now connects to the master and waits for data
This commit is contained in:
Thomas S Hatch 2014-03-05 15:22:10 -07:00
parent d06667cb6d
commit aadb4a8d84

View File

@ -21,6 +21,8 @@ import salt.utils.schedule
from salt.exceptions import (
CommandExecutionError, CommandNotFoundError, SaltInvocationError)
from salt.transport.road.raet import yarding
from salt.transport.road.raet import stacking
# Import ioflo libs
import ioflo.base.deeding
@ -41,25 +43,24 @@ except ImportError:
log = logging.getLogger(__name__)
class Router(ioflo.base.deeding.Deed):
class RouterMinion(ioflo.base.deeding.Deed): # pylint: disable=W0232
'''
Route packaets from raet into minion proessing bins
'''
Ioinits = {'opts_store': '.salt.opts',
Ioinits = {'opts': '.salt.opts',
'udp_stack': '.raet.udp.stack.stack',
'uxd_stack': '.raet.uxd.stack.stack',
'uxd_stack': '.salt.uxd.stack.stack',
'fun_in': '.salt.net.fun_in',
}
def __init__(self):
ioflo.base.deeding.Deed.__init__(self)
def postinitio(self):
'''
Map opts for convenience
'''
# TODO: Setup RAET and UXD
self.opts = self.opts_store.value
self.uxd_stack.value = stacking.StackUxd(
lanename=self.opts.value['id'],
yid=0,
dirpath=self.opts.value['sock_dir'])
def action(self):
'''
@ -67,31 +68,25 @@ class Router(ioflo.base.deeding.Deed):
'''
# Start on the udp_in:
# TODO: Route UXD messages
while True:
try:
data = self.udp_stack.rxMsgs.value.pop()
# Check if the PID is not the default of 0 and pass directly to
# the raet socket handler
if data['dest'][1]:
self.raet_sock_out.value.append(data)
if data['dest'][3] == 'fun':
self.fun_in.value.append(data)
except IndexError:
break
while self.udp_stack.value.rxMsgs:
print 'process udp'
data = self.udp_stack.rxMsgs.value.pop()
# Check if the PID is not the default of 0 and pass directly to
# the raet socket handler
if data['dst'][2] == 'fun':
self.fun_in.value.append(data)
class PillarLoad(ioflo.base.deeding.Deed):
class PillarLoad(ioflo.base.deeding.Deed): # pylint: disable=W0232
'''
Load up the pillar in the data store
'''
Ioinits = {'opts_store': '.salt.opts',
'grains': '.salt.loader.grains'}
def __init__(self):
ioflo.base.deeding.Deed.__init__(self)
class ModulesLoad(ioflo.base.deeding.Deed):
class ModulesLoad(ioflo.base.deeding.Deed): # pylint: disable=W0232
'''
Reload the minion modules
'''
@ -100,9 +95,6 @@ class ModulesLoad(ioflo.base.deeding.Deed):
'modules': '.salt.loader.modules',
'returners': '.salt.loader.returners'}
def __init__(self):
ioflo.base.deeding.Deed.__init__(self)
def postinitio(self):
'''
Map opts for convenience
@ -144,7 +136,7 @@ class ModulesLoad(ioflo.base.deeding.Deed):
resource.setrlimit(resource.RLIMIT_AS, old_mem_limit)
class Schedule(ioflo.base.deeding.Deed):
class Schedule(ioflo.base.deeding.Deed): # pylint: disable=W0232
'''
Evaluates the scedule
'''
@ -154,9 +146,6 @@ class Schedule(ioflo.base.deeding.Deed):
'returners': '.salt.loader.returners',
'master_ret': '.salt.net.master_out'}
def __init__(self):
ioflo.base.deeding.Deed.__init__(self)
def postinitio(self):
'''
Map opts and make the scedule object
@ -173,140 +162,7 @@ class Schedule(ioflo.base.deeding.Deed):
self.scedule.eval()
class FunctionInline(ioflo.base.deeding.Deed):
'''
Execute a function without threading
'''
Ioinits = {'opts_store': '.salt.opts',
'grains': '.salt.grains',
'modules': '.salt.loader.modules',
'returners': '.salt.loader.returners',
'fun_ack': '.salt.net.fun_ack',
'fun_in': '.salt.net.fun_in',
'master_ret': '.salt.net.master_out'}
def __init__(self):
ioflo.base.deeding.Deed.__init__(self)
def postinitio(self):
'''
Map opts for convenience
'''
self.opts = self.opts_store.value
self.matcher = salt.minion.Matcher(
self.opts,
self.modules.value)
self.proc_dir = salt.minion.get_proc_dir(self.opts['cachedir'])
self.serial = salt.payload.Serial(self.opts)
def action(self):
'''
Pull the queue and exeute
'''
if not self.fun_in.value:
return
exchange = self.fun_in.value.pop()
data = exchange['load']
match = getattr(
self.matcher,
'{0}_match'.format(
data.get('tgt_type', 'glob')
)
)(data['tgt'])
if not match:
return
self.fun_ack.value.append(exchange)
if 'user' in data:
log.info(
'User {0[user]} Executing command {0[fun]} with jid '
'{0[jid]}'.format(data))
else:
log.info(
'Executing command {0[fun]} with jid {0[jid]}'.format(data)
)
log.debug('Command details {0}'.format(data))
ret = {'success': False}
function_name = data['fun']
if function_name in self.modules.value:
try:
func = self.modules.value[data['fun']]
args, kwargs = salt.minion.parse_args_and_kwargs(func, data['arg'], data)
sys.modules[func.__module__].__context__['retcode'] = 0
return_data = func(*args, **kwargs)
if isinstance(return_data, types.GeneratorType):
ind = 0
iret = {}
for single in return_data:
if isinstance(single, dict) and isinstance(iret, list):
iret.update(single)
else:
if not iret:
iret = []
iret.append(single)
tag = salt.utils.event.tagify(
[data['jid'], 'prog', self.opts['id'], str(ind)],
'job')
event_data = {'return': single}
self._fire_master(event_data, tag) # Need to look into this
ind += 1
ret['return'] = iret
else:
ret['return'] = return_data
ret['retcode'] = sys.modules[func.__module__].__context__.get(
'retcode',
0
)
ret['success'] = True
except CommandNotFoundError as exc:
msg = 'Command required for {0!r} not found'.format(
function_name
)
log.debug(msg, exc_info=True)
ret['return'] = '{0}: {1}'.format(msg, exc)
except CommandExecutionError as exc:
log.error(
'A command in {0!r} had a problem: {1}'.format(
function_name,
exc
),
exc_info=log.isEnabledFor(logging.DEBUG)
)
ret['return'] = 'ERROR: {0}'.format(exc)
except SaltInvocationError as exc:
log.error(
'Problem executing {0!r}: {1}'.format(
function_name,
exc
),
exc_info=log.isEnabledFor(logging.DEBUG)
)
ret['return'] = 'ERROR executing {0!r}: {1}'.format(
function_name, exc
)
except TypeError as exc:
aspec = salt.utils.get_function_argspec(
self.modules.value[data['fun']]
)
msg = ('TypeError encountered executing {0}: {1}. See '
'debug log for more info. Possibly a missing '
'arguments issue: {2}').format(function_name,
exc,
aspec)
log.warning(msg, exc_info=log.isEnabledFor(logging.DEBUG))
ret['return'] = msg
except Exception:
msg = 'The minion function caused an exception'
log.warning(msg, exc_info=log.isEnabledFor(logging.DEBUG))
ret['return'] = '{0}: {1}'.format(msg, traceback.format_exc())
else:
ret['return'] = '{0!r} is not available.'.format(function_name)
ret['jid'] = data['jid']
ret['fun'] = data['fun']
ret['fun_args'] = data['arg']
class FunctionNix(ioflo.base.deeding.Deed):
class FunctionNix(ioflo.base.deeding.Deed): # pylint: disable=W0232
'''
Execute a function call
'''
@ -320,9 +176,6 @@ class FunctionNix(ioflo.base.deeding.Deed):
'uxd_stack': '.raet.uxd.stack.stack',
'executors': '.salt.track.executors'}
def __init__(self):
ioflo.base.deeding.Deed.__init__(self)
def postinitio(self):
'''
Map opts for convenience