diff --git a/salt/daemons/flo/master.py b/salt/daemons/flo/master.py index 8400698d57..ef63ac999a 100644 --- a/salt/daemons/flo/master.py +++ b/salt/daemons/flo/master.py @@ -57,7 +57,9 @@ class RouterMaster(ioflo.base.deeding.Deed): # pylint: disable=W0232 'local_cmd': '.salt.uxd.local_cmd', 'local_ret': '.salt.uxd.local_ret', 'events': '.salt.uxd.events', - 'stack': '.salt.uxd.stack.stack'} + 'publish': '.salt.net.publish', + 'stack': '.salt.uxd.stack.stack', + 'udp_stack': '.raet.udp.stack.stack'} def postinitio(self): ''' @@ -73,6 +75,8 @@ class RouterMaster(ioflo.base.deeding.Deed): # pylint: disable=W0232 self.local_cmd.value = deque() self.local_ret.value = deque() self.events.value = deque() + if not self.publish.value: + self.publish.value = deque() def _register_event_yard(self, msg): ''' @@ -112,6 +116,18 @@ class RouterMaster(ioflo.base.deeding.Deed): # pylint: disable=W0232 except Exception: return + def _publish(self, pub_msg): + ''' + Publish the message out to the targetted minions + ''' + import pprint + pprint.pprint(self.udp_stack.value.eids) + pprint.pprint(pub_msg) + for minion in self.udp_stack.value.eids: + eid = self.udp_stack.value.eids.get(minion) + if eid: + self.udp_stack.value.message(pub_msg['pub'], eid) + def action(self): ''' Process the messages! @@ -125,6 +141,9 @@ class RouterMaster(ioflo.base.deeding.Deed): # pylint: disable=W0232 while self.local_ret.value: msg = self.local_ret.value.popleft() self.stack.value.transmit(msg, msg['route']['dst'][1]) + while self.publish.value: + pub_msg = self.publish.value.popleft() + self._publish(pub_msg) self.stack.value.serviceAll() @@ -165,6 +184,7 @@ class LocalCmd(ioflo.base.deeding.Deed): # pylint: disable=W0232 Ioinits = {'opts': '.salt.opts', 'local_cmd': '.salt.uxd.local_cmd', 'local_ret': '.salt.uxd.local_ret', + 'publish': '.salt.net.publish', 'stack': '.salt.uxd.stack.stack'} def postinitio(self): @@ -173,6 +193,8 @@ class LocalCmd(ioflo.base.deeding.Deed): # pylint: disable=W0232 ''' self.access_keys = salt.daemons.masterapi.access_keys(self.opts.value) self.local = salt.daemons.masterapi.LocalFuncs(self.opts.value, self.access_keys) + if not self.publish.value: + self.publish.value = deque() def action(self): ''' @@ -191,5 +213,6 @@ class LocalCmd(ioflo.base.deeding.Deed): # pylint: disable=W0232 ret['return'] = getattr(self.local, load['cmd'])(load) ret['route'] = {'src': ('router', self.stack.value.yard.name, None), 'dst': cmd['route']['src']} - + if load['cmd'] == 'publish': + self.publish.value.append(ret['return']) self.local_ret.value.append(ret)