Initial alpha of the UXDRouter on the master

This commit is contained in:
Thomas S Hatch 2014-03-03 13:23:28 -07:00
parent 909286ed0e
commit fa593c9284

View File

@ -4,10 +4,12 @@ The behaviors to run the salt master via ioflo
'''
# Import python libs
#import collections
from collections import deque
# Import salt libs
import salt.daemons.masterapi
from salt.transport.road.raet import stacking
from salt.transport.road.raet import yarding
# Import ioflo libs
import ioflo.base.deeding
@ -45,6 +47,88 @@ def fileserver_update(self):
salt.daemons.masterapi.fileserver_update(self.opts.value)
class UXDRouter(ioflo.base.deeding.Deed):
'''
Routes the communication in and out of uxd connections
'''
Ioinits = {'opts': '.salt.etc.opts',
'event_yards': '.salt.uxd.yards.event',
'com_yards': '.salt.uxd.yards.com',
'local_cmd': '.salt.uxd.local_cmd',
'local_ret': '.salt.uxd.local_ret',
'events': '.salt.uxd.events',
'stack': '.salt.uxd.stack.stack'}
def __init__(self):
ioflo.base.deeding.Deed.__init__(self)
def postioinit(self):
'''
Set up required objects
'''
self.stack.value = stacking.StackUxd(
name='router',
lanename='com',
yid=0,
dirpath=self.opts.value['sock_dir'])
self.event_yards.value = set()
self.com_yards.value = set()
self.local_cmd.value = deque()
self.local_ret.value = deque()
self.events.value = deque()
def _register_event_yard(self, msg):
'''
register an incoming event request with the requesting yard id
'''
try:
ev_yard = yarding.Yard(
yid=msg['load']['yid'],
prefix='com',
dirpath=msg['load']['dirpath'])
except Exception:
return
self.stack.value.addRemoteYard(ev_yard)
self.event_yards.value.add(ev_yard.name)
def _fire_event(self, event):
'''
Fire an event to all subscribed yards
'''
for y_name in self.event_yards.value:
route = {'src': ('router', self.stack.value.yard.name, None),
'dst': ('router', y_name, None)}
msg = {'route': route, 'event': event}
self.stack.value.transmit(msg)
def _process_rxmsg(self, msg):
'''
Send the message to the correct location
'''
try:
if msg['route']['src'][0] == 'router' and msg['route']['src'][2] == 'local_cmd':
self.local_cmd.append(msg)
elif msg['route']['src'][0] == 'router' and msg['route']['src'][2] == 'event_req':
# Register the event interface
self._register_event_yard(msg)
except Exception:
return
def action(self):
'''
Process the messages!
'''
self.stack.value.serviceAll()
# Process inboud communication stack
for msg in self.stack.value.rxMsgs:
self._process_msg(msg)
for event in self.events.value:
self._fire_event(event)
for ret in self.local_ret.value:
self.stack.value.transmit(ret)
self.stack.value.serviceAll()
class RemoteMaster(ioflo.base.deeding.Deed):
'''
Abstract access to the core salt master api
@ -83,7 +167,8 @@ class LocalMaster(ioflo.base.deeding.Deed):
Abstract access to the core salt master api
'''
Ioinits = {'opts': '.salt.etc.opts',
'rxmsgs': '.raet.udp.stack'}
'local_cmd': '.salt.uxd.local_cmd',
'local_ret': '.salt.uxd.local_ret'}
def __init__(self):
ioflo.base.deeding.Deed.__init__(self)
@ -93,23 +178,22 @@ class LocalMaster(ioflo.base.deeding.Deed):
Set up required objects
'''
self.local = salt.daemons.masterapi.LocalFuncs(self.opts.value)
self.remote = salt.daemons.masterapi.RemoteFuncs(self.opts.value)
def action(self):
'''
Perform an action
'''
while self.rxmsgs.value:
exchange = self.rxmsgs.value.pop(0)
load = exchange.get('load')
for cmd in self.local_cmd.value:
ret = {}
load = cmd.get('load')
# If the load is invalid, just ignore the request
if not 'cmd' in load:
return False
return
if load['cmd'].startswith('__'):
return False
if hasattr(self.remote, load['cmd']):
exchange['ret'] = getattr(self.remote, load['cmd'])(load)
elif hasattr(self.local, load['cmd']):
exchange['ret'] = getattr(self.local, load['cmd'])(load)
return
if hasattr(self.local, load['cmd']):
ret['return'] = getattr(self.local, load['cmd'])(load)
ret['route'] = {'src': ('router', self.stack.value.yard.name, None),
'dst': cmd['route']('src')}
self.local_out.value.append(exchange)
self.local_ret.value.append(ret)