diff --git a/salt/daemons/flo/master.py b/salt/daemons/flo/master.py index 02f57bc74e..1148461616 100644 --- a/salt/daemons/flo/master.py +++ b/salt/daemons/flo/master.py @@ -4,10 +4,12 @@ The behaviors to run the salt master via ioflo ''' # Import python libs -#import collections +from collections import deque # Import salt libs import salt.daemons.masterapi +from salt.transport.road.raet import stacking +from salt.transport.road.raet import yarding # Import ioflo libs import ioflo.base.deeding @@ -45,6 +47,88 @@ def fileserver_update(self): salt.daemons.masterapi.fileserver_update(self.opts.value) +class UXDRouter(ioflo.base.deeding.Deed): + ''' + Routes the communication in and out of uxd connections + ''' + Ioinits = {'opts': '.salt.etc.opts', + 'event_yards': '.salt.uxd.yards.event', + 'com_yards': '.salt.uxd.yards.com', + 'local_cmd': '.salt.uxd.local_cmd', + 'local_ret': '.salt.uxd.local_ret', + 'events': '.salt.uxd.events', + 'stack': '.salt.uxd.stack.stack'} + + def __init__(self): + ioflo.base.deeding.Deed.__init__(self) + + def postioinit(self): + ''' + Set up required objects + ''' + self.stack.value = stacking.StackUxd( + name='router', + lanename='com', + yid=0, + dirpath=self.opts.value['sock_dir']) + self.event_yards.value = set() + self.com_yards.value = set() + self.local_cmd.value = deque() + self.local_ret.value = deque() + self.events.value = deque() + + def _register_event_yard(self, msg): + ''' + register an incoming event request with the requesting yard id + ''' + try: + ev_yard = yarding.Yard( + yid=msg['load']['yid'], + prefix='com', + dirpath=msg['load']['dirpath']) + except Exception: + return + self.stack.value.addRemoteYard(ev_yard) + self.event_yards.value.add(ev_yard.name) + + def _fire_event(self, event): + ''' + Fire an event to all subscribed yards + ''' + for y_name in self.event_yards.value: + route = {'src': ('router', self.stack.value.yard.name, None), + 'dst': ('router', y_name, None)} + msg = {'route': route, 'event': event} + self.stack.value.transmit(msg) + + def _process_rxmsg(self, msg): + ''' + Send the message to the correct location + ''' + try: + if msg['route']['src'][0] == 'router' and msg['route']['src'][2] == 'local_cmd': + self.local_cmd.append(msg) + elif msg['route']['src'][0] == 'router' and msg['route']['src'][2] == 'event_req': + # Register the event interface + self._register_event_yard(msg) + except Exception: + return + + def action(self): + ''' + Process the messages! + ''' + self.stack.value.serviceAll() + # Process inboud communication stack + for msg in self.stack.value.rxMsgs: + self._process_msg(msg) + for event in self.events.value: + self._fire_event(event) + for ret in self.local_ret.value: + self.stack.value.transmit(ret) + self.stack.value.serviceAll() + + class RemoteMaster(ioflo.base.deeding.Deed): ''' Abstract access to the core salt master api @@ -83,7 +167,8 @@ class LocalMaster(ioflo.base.deeding.Deed): Abstract access to the core salt master api ''' Ioinits = {'opts': '.salt.etc.opts', - 'rxmsgs': '.raet.udp.stack'} + 'local_cmd': '.salt.uxd.local_cmd', + 'local_ret': '.salt.uxd.local_ret'} def __init__(self): ioflo.base.deeding.Deed.__init__(self) @@ -93,23 +178,22 @@ class LocalMaster(ioflo.base.deeding.Deed): Set up required objects ''' self.local = salt.daemons.masterapi.LocalFuncs(self.opts.value) - self.remote = salt.daemons.masterapi.RemoteFuncs(self.opts.value) def action(self): ''' Perform an action ''' - while self.rxmsgs.value: - exchange = self.rxmsgs.value.pop(0) - load = exchange.get('load') + for cmd in self.local_cmd.value: + ret = {} + load = cmd.get('load') # If the load is invalid, just ignore the request if not 'cmd' in load: - return False + return if load['cmd'].startswith('__'): - return False - if hasattr(self.remote, load['cmd']): - exchange['ret'] = getattr(self.remote, load['cmd'])(load) - elif hasattr(self.local, load['cmd']): - exchange['ret'] = getattr(self.local, load['cmd'])(load) + return + if hasattr(self.local, load['cmd']): + ret['return'] = getattr(self.local, load['cmd'])(load) + ret['route'] = {'src': ('router', self.stack.value.yard.name, None), + 'dst': cmd['route']('src')} - self.local_out.value.append(exchange) + self.local_ret.value.append(ret)