Merge pull request #14057 from SmithSamuelM/sam_raet_50

LaneStack rxMsgs queue now returns duple (msg, sender)
This commit is contained in:
Samuel Smith 2014-07-09 11:01:08 -06:00
commit 3cf0452dff
5 changed files with 17 additions and 13 deletions

View File

@ -73,7 +73,7 @@ class LocalClient(salt.client.LocalClient):
while True:
time.sleep(0.01)
stack.serviceAll()
for msg in stack.rxMsgs:
for msg, sender in stack.rxMsgs:
ret = msg.get('return', {})
if 'ret' in ret:
stack.server.close()

View File

@ -509,7 +509,7 @@ class LoadPillar(ioflo.base.deeding.Deed):
while True:
time.sleep(0.01)
if self.udp_stack.value.rxMsgs:
for msg, rnmid in self.udp_stack.value.rxMsgs:
for msg, sender in self.udp_stack.value.rxMsgs:
self.pillar.value = msg.get('return', {})
self.opts.value['pillar'] = self.pillar.value
return
@ -719,11 +719,11 @@ class Router(ioflo.base.deeding.Deed):
'uxd_stack': '.salt.uxd.stack.stack',
'udp_stack': '.raet.udp.stack.stack'}
def _process_udp_rxmsg(self, msg, rnmid):
def _process_udp_rxmsg(self, msg, sender):
'''
Send to the right queue
msg is the message body dict
rnmid is the unique name identifyer of the remote estate that sent the message
sender is the unique name of the remote estate that sent the message
'''
try:
d_estate = msg['route']['dst'][0]
@ -755,16 +755,19 @@ class Router(ioflo.base.deeding.Deed):
elif d_share == 'remote_cmd':
# Send it to a remote worker
if 'load' in msg:
msg['load']['id'] = rnmid
msg['load']['id'] = sender
self.uxd_stack.value.transmit(msg,
self.uxd_stack.value.uids.get(next(self.workers.value)))
elif d_share == 'fun':
self.fun.value.append(msg)
def _process_uxd_rxmsg(self, msg):
def _process_uxd_rxmsg(self, msg, sender):
'''
Send uxd messages tot he right queue or forward them to the correct
yard etc.
msg is message body dict
sender is unique name of remote that sent the message
'''
try:
d_estate = msg['route']['dst'][0]
@ -808,10 +811,11 @@ class Router(ioflo.base.deeding.Deed):
Process the messages!
'''
while self.udp_stack.value.rxMsgs:
msg, name = self.udp_stack.value.rxMsgs.popleft()
self._process_udp_rxmsg(msg=msg, rnmid=name)
msg, sender = self.udp_stack.value.rxMsgs.popleft()
self._process_udp_rxmsg(msg=msg, sender=sender)
while self.uxd_stack.value.rxMsgs:
self._process_uxd_rxmsg(self.uxd_stack.value.rxMsgs.popleft())
msg, sender = self.uxd_stack.value.rxMsgs.popleft()
self._process_uxd_rxmsg(msg=msg, sender=sender)
class Eventer(ioflo.base.deeding.Deed):

View File

@ -167,7 +167,7 @@ class WorkerRouter(ioflo.base.deeding.Deed):
'''
self.uxd_stack.value.serviceAll()
while self.uxd_stack.value.rxMsgs:
msg = self.uxd_stack.value.rxMsgs.popleft()
msg, sender = self.uxd_stack.value.rxMsgs.popleft()
if 'load' in msg:
cmd = msg['load'].get('cmd')
if not cmd:

View File

@ -106,7 +106,7 @@ class RAETChannel(Channel):
time.sleep(0.01)
self.stack.serviceAll()
if self.stack.rxMsgs:
for msg in self.stack.rxMsgs:
for msg, sender in self.stack.rxMsgs:
return msg.get('return', {})
if time.time() - start > timeout:
if tried >= tries:

View File

@ -115,7 +115,7 @@ class SaltEvent(object):
while True:
self.stack.serviceAll()
if self.stack.rxMsgs:
msg = self.stack.rxMsgs.popleft()
msg, sender = self.stack.rxMsgs.popleft()
event = msg.get('event', {})
if 'tag' not in event and 'data' not in event:
# Invalid event, how did this get here?
@ -138,7 +138,7 @@ class SaltEvent(object):
self.connect_pub()
self.stack.serviceAll()
if self.stack.rxMsgs:
event = self.stack.rxMsgs.popleft()
event, sender = self.stack.rxMsgs.popleft()
if 'tag' not in event and 'data' not in event:
# Invalid event, how did this get here?
return None