Merge pull request #11295 from SmithSamuelM/sam_raet_25

Sam raet 25 Refactored the service methods of the Udp Uxd stacks.
This commit is contained in:
Samuel Smith 2014-03-17 12:47:34 -06:00
commit 5eb6396ad4
3 changed files with 177 additions and 139 deletions

View File

@ -239,7 +239,7 @@ class MessengerStackUdpRaet(deeding.Deed): # pylint: disable=W0232
stack = self.stack.value
if stack and isinstance(stack, stacking.StackUdp):
deid = self.destination.value
stack.txMsg(msg=msg, deid=deid)
stack.transmit(msg=msg, deid=deid)
class PrinterStackUdpRaet(deeding.Deed): # pylint: disable=W0232

View File

@ -357,9 +357,9 @@ class StackUdp(object):
'''
self.stats[key] = value
def serviceUdp(self):
def serviceUdpRx(self):
'''
Service the UDP receive and transmit queues
Service the UDP receive and fill the rxes deque
'''
if self.server:
while True:
@ -369,11 +369,69 @@ class StackUdp(object):
# triple = ( packet, source address, destination address)
self.rxes.append((rx, ra, self.server.ha))
return None
def serviceRxes(self):
'''
Process all messages in .rxes deque
'''
while self.rxes:
self.processUdpRx()
def serviceTxMsgs(self):
'''
Service .txMsgs queue of outgoing messages and start message transactions
'''
while self.txMsgs:
body, deid = self.txMsgs.popleft() # duple (body dict, destination eid)
self.message(body, deid)
console.verbose("{0} sending\n{1}\n".format(self.name, body))
def serviceTxes(self):
'''
Service the .txes deque to send Udp messages
'''
if self.server:
laters = deque()
while self.txes:
tx, ta = self.txes.popleft() # duple = (packet, destination address)
self.server.send(tx, ta)
try:
self.server.send(tx, ta)
except socket.error as ex:
if ex.errno == errno.EAGAIN or ex.errno == errno.EWOULDBLOCK:
#busy with last message save it for later
laters.append((tx, ta))
else:
console.terse("socket.error = {0}\n".format(ex))
raise
while laters:
self.txes.append(laters.popleft())
return None
def serviceUdp(self):
'''
Service the UDP receive and transmit queues
'''
self.serviceUdpRx()
self.serviceTxes()
def serviceRx(self):
'''
Service:
UDP Socket receive
rxes queue
process
'''
self.serviceUdpRx()
self.serviceRxes()
self.process()
def serviceTx(self):
'''
Service:
txMsgs queue
txes queue and UDP Socket send
'''
self.serviceTxMsgs()
self.serviceTxes()
def serviceAll(self):
'''
@ -382,42 +440,16 @@ class StackUdp(object):
rxes queue
process
txMsgs queue
txes queue
UDP Socket send
txes queue and UDP Socket send
'''
if self.server:
while True:
rx, ra = self.server.receive() # if no data the duple is ('',None)
if not rx: # no received data so break
break
# triple = ( packet, source address, destination address)
self.rxes.append((rx, ra, self.server.ha))
self.serviceUdpRx()
self.serviceRxes()
self.process()
self.serviceUdpRx()
self.serviceTxMsgs()
self.serviceTxes()
self.process()
self.serviceTxMsg()
while self.txes:
tx, ta = self.txes.popleft() # duple = (packet, destination address)
self.server.send(tx, ta)
return None
def txUdp(self, packed, deid):
'''
Queue duple of (packed, da) on stack transmit queue
Where da is the ip destination (host,port) address associated with
the estate with deid
'''
if deid not in self.estates:
msg = "Invalid destination estate id '{0}'".format(deid)
raise raeting.StackError(msg)
self.txes.append((packed, self.estates[deid].ha))
def txMsg(self, msg, deid=None):
def transmit(self, msg, deid=None):
'''
Append duple (msg,deid) to .txMsgs deque
If msg is not mapping then raises exception
@ -431,16 +463,44 @@ class StackUdp(object):
#raise raeting.StackError(emsg)
self.txMsgs.append((msg, deid))
transmit = txMsg
txMsg = transmit
def serviceTxMsg(self):
def txUdp(self, packed, deid):
'''
Service .udpTxMsgs queue of outgoint udp messages for message transactions
Queue duple of (packed, da) on stack transmit queue
Where da is the ip destination (host,port) address associated with
the estate with deid
'''
while self.txMsgs:
body, deid = self.txMsgs.popleft() # duple (body dict, destination eid)
self.message(body, deid)
console.verbose("{0} sending\n{1}\n".format(self.name, body))
if deid not in self.estates:
msg = "Invalid destination estate id '{0}'".format(deid)
raise raeting.StackError(msg)
self.txes.append((packed, self.estates[deid].ha))
def processUdpRx(self):
'''
Retrieve next packet from stack receive queue if any and parse
Process associated transaction or reply with new correspondent transaction
'''
packet = self.fetchParseUdpRx()
if not packet:
return
console.verbose("{0} received packet data\n{1}\n".format(self.name, packet.data))
console.verbose("{0} received packet index = '{1}'\n".format(self.name, packet.index))
trans = self.transactions.get(packet.index, None)
if trans:
trans.receive(packet)
return
if packet.data['cf']: #correspondent to stale transaction so drop
emsg = "{0} Stale Transaction, dropping ...".format(self.name)
console.terse(emsg + '\n')
self.incStat('stale_correspondent_attempt')
# Should send abort nack to drop transaction on other side
return
self.reply(packet)
def fetchParseUdpRx(self):
'''
@ -476,39 +536,6 @@ class StackUdp(object):
return packet # outer only has been parsed
def processUdpRx(self):
'''
Retrieve next packet from stack receive queue if any and parse
Process associated transaction or reply with new correspondent transaction
'''
packet = self.fetchParseUdpRx()
if not packet:
return
console.verbose("{0} received packet data\n{1}\n".format(self.name, packet.data))
console.verbose("{0} received packet index = '{1}'\n".format(self.name, packet.index))
trans = self.transactions.get(packet.index, None)
if trans:
trans.receive(packet)
return
if packet.data['cf']: #correspondent to stale transaction so drop
emsg = "{0} Stale Transaction, dropping ...".format(self.name)
console.terse(emsg + '\n')
self.incStat('stale_correspondent_attempt')
# Should send abort nack to drop transaction on other side
return
self.reply(packet)
def serviceUdpRx(self):
'''
Process all packets in .udpRxes deque
'''
while self.rxes:
self.processUdpRx()
def reply(self, packet):
'''
Reply to packet with corresponding transaction or action
@ -795,7 +822,7 @@ class StackUxd(object):
def serviceUxdRx(self):
'''
Service the Uxd recieves and fill the .rxes deque
Service the Uxd receive and fill the .rxes deque
'''
if self.server:
while True:
@ -807,10 +834,21 @@ class StackUxd(object):
def serviceRxes(self):
'''
Process all messages in .uxdRxes deque
Process all messages in .rxes deque
'''
while self.rxes:
self.processUdpRx()
self.processUxdRx()
def serviceTxMsgs(self):
'''
Service .txMsgs queue of outgoing messages
'''
while self.txMsgs:
body, name = self.txMsgs.popleft() # duple (body dict, destination name)
packed = self.packUxdTx(body)
if packed:
console.verbose("{0} sending\n{1}\n".format(self.name, body))
self.txUxd(packed, name)
def serviceTxes(self):
'''
@ -839,18 +877,6 @@ class StackUxd(object):
while laters:
self.txes.append(laters.popleft())
def serviceTxMsgs(self):
'''
Service .txMsgs queue of outgoing messages
'''
while self.txMsgs:
body, name = self.txMsgs.popleft() # duple (body dict, destination name)
packed = self.packUxdTx(body)
if packed:
console.verbose("{0} sending\n{1}\n".format(self.name, body))
self.txUxd(packed, name)
def serviceUxd(self):
'''
Service the UXD receive and transmit queues
@ -858,7 +884,23 @@ class StackUxd(object):
self.serviceUxdRx()
self.serviceTxes()
return None
def serviceRx(self):
'''
Service:
Uxd Socket receive
rxes queue
'''
self.serviceUxdRx()
self.serviceRxes()
def serviceTx(self):
'''
Service:
txMsgs deque
txes deque and send Uxd messages
'''
self.serviceTxMsgs()
self.serviceTxes()
def serviceAll(self):
'''
@ -866,20 +908,16 @@ class StackUxd(object):
Uxd Socket receive
rxes queue
txMsgs queue
txes queue
Uxd Socket send
txes queue and Uxd Socket send
'''
self.serviceUxdRx()
self.serviceRxes()
self.serviceTxMsgs()
self.serviceTxes()
return None
def txUxd(self, packed, name):
'''
Queue duple of (packed, da) on stack transmit queue
Queue duple of (packed, da) on stack .txes queue
Where da is the ip destination address associated with
the estate with name
If name is None then it will default to the first entry in .yards
@ -900,7 +938,7 @@ class StackUxd(object):
#raise raeting.StackError(msg)
self.txes.append((packed, self.yards[name].ha))
def txMsg(self, msg, name=None):
def transmit(self, msg, name=None):
'''
Append duple (msg, name) to .txMsgs deque
If msg is not mapping then raises exception
@ -914,7 +952,7 @@ class StackUxd(object):
#raise raeting.StackError(emsg)
self.txMsgs.append((msg, name))
transmit = txMsg # alias
txMsg = transmit # alias
def packUxdTx(self, body=None, name=None, kind=None):
'''
@ -954,6 +992,17 @@ class StackUxd(object):
return packed
def processUxdRx(self):
'''
Retrieve next message from stack receive queue if any and parse
'''
body = self.fetchParseUxdRx()
if not body:
return
console.verbose("{0} received message data\n{1}\n".format(self.name, body))
self.rxMsgs.append(body)
def fetchParseUxdRx(self):
'''
@ -1046,17 +1095,6 @@ class StackUxd(object):
return body
def processUdpRx(self):
'''
Retrieve next message from stack receive queue if any and parse
'''
body = self.fetchParseUxdRx()
if not body:
return
console.verbose("{0} received message data\n{1}\n".format(self.name, body))
self.rxMsgs.append(body)

View File

@ -72,7 +72,7 @@ def testStackUdp(bk=raeting.bodyKinds.json):
stack1.serviceUdp()
stack0.serviceUdp()
stack0.serviceUdpRx()
stack0.serviceRxes()
stack0.process()
timer.restart()
@ -80,7 +80,7 @@ def testStackUdp(bk=raeting.bodyKinds.json):
stack0.serviceUdp()
stack1.serviceUdp()
stack1.serviceUdpRx()
stack1.serviceRxes()
print "{0} eid={1}".format(stack0.name, stack0.estate.eid)
@ -106,28 +106,28 @@ def testStackUdp(bk=raeting.bodyKinds.json):
stack1.serviceUdp()
stack0.serviceUdp()
stack0.serviceUdpRx()
stack0.serviceRxes()
timer.restart()
while not timer.expired:
stack0.serviceUdp()
stack1.serviceUdp()
stack1.serviceUdpRx()
stack1.serviceRxes()
timer.restart()
while not timer.expired:
stack0.serviceUdp()
stack1.serviceUdp()
stack0.serviceUdpRx()
stack0.serviceRxes()
timer.restart()
while not timer.expired:
stack0.serviceUdp()
stack1.serviceUdp()
stack1.serviceUdpRx()
stack1.serviceRxes()
print "{0} eid={1}".format(stack0.name, stack0.estate.eid)
print "{0} estates=\n{1}".format(stack0.name, stack0.estates)
@ -153,14 +153,14 @@ def testStackUdp(bk=raeting.bodyKinds.json):
stack1.serviceUdp()
stack0.serviceUdp()
stack0.serviceUdpRx()
stack0.serviceRxes()
timer.restart()
while not timer.expired:
stack0.serviceUdp()
stack1.serviceUdp()
stack1.serviceUdpRx()
stack1.serviceRxes()
print "{0} eid={1}".format(stack0.name, stack0.estate.eid)
print "{0} estates=\n{1}".format(stack0.name, stack0.estates)
@ -183,14 +183,14 @@ def testStackUdp(bk=raeting.bodyKinds.json):
stack0.serviceUdp()
stack1.serviceUdp()
stack1.serviceUdpRx()
stack1.serviceRxes()
timer.restart()
while not timer.expired:
stack1.serviceUdp()
stack0.serviceUdp()
stack0.serviceUdpRx()
stack0.serviceRxes()
print "{0} eid={1}".format(stack0.name, stack0.estate.eid)
print "{0} estates=\n{1}".format(stack0.name, stack0.estates)
@ -225,24 +225,24 @@ def testStackUdp(bk=raeting.bodyKinds.json):
stack1.txMsgs.append((odict(house="Mama mia1", queue="big stuff", stuff=stuff), None))
stack0.txMsgs.append((odict(house="Papa pia4", queue="gig stuff", stuff=stuff), None))
stack1.serviceTxMsg()
stack0.serviceTxMsg()
stack1.serviceTxMsgs()
stack0.serviceTxMsgs()
timer.restart()
while not timer.expired:
stack1.serviceUdp()
stack0.serviceUdp()
stack0.serviceUdpRx()
stack1.serviceUdpRx()
stack0.serviceRxes()
stack1.serviceRxes()
timer.restart()
while not timer.expired:
stack0.serviceUdp()
stack1.serviceUdp()
stack1.serviceUdpRx()
stack0.serviceUdpRx()
stack1.serviceRxes()
stack0.serviceRxes()
print "{0} eid={1}".format(stack0.name, stack0.estate.eid)
@ -262,15 +262,15 @@ def testStackUdp(bk=raeting.bodyKinds.json):
print "\n********* Message Transactions Both Ways Again **********"
stack1.txMsg(odict(house="Oh Boy1", queue="Nice"))
stack1.txMsg(odict(house="Oh Boy2", queue="Mean"))
stack1.txMsg(odict(house="Oh Boy3", queue="Ugly"))
stack1.txMsg(odict(house="Oh Boy4", queue="Pretty"))
stack1.transmit(odict(house="Oh Boy1", queue="Nice"))
stack1.transmit(odict(house="Oh Boy2", queue="Mean"))
stack1.transmit(odict(house="Oh Boy3", queue="Ugly"))
stack1.transmit(odict(house="Oh Boy4", queue="Pretty"))
stack0.txMsg(odict(house="Yeah Baby1", queue="Good"))
stack0.txMsg(odict(house="Yeah Baby2", queue="Bad"))
stack0.txMsg(odict(house="Yeah Baby3", queue="Fast"))
stack0.txMsg(odict(house="Yeah Baby4", queue="Slow"))
stack0.transmit(odict(house="Yeah Baby1", queue="Good"))
stack0.transmit(odict(house="Yeah Baby2", queue="Bad"))
stack0.transmit(odict(house="Yeah Baby3", queue="Fast"))
stack0.transmit(odict(house="Yeah Baby4", queue="Slow"))
#segmented packets
stuff = []
@ -278,8 +278,8 @@ def testStackUdp(bk=raeting.bodyKinds.json):
stuff.append(str(i).rjust(4, " "))
stuff = "".join(stuff)
stack1.txMsg(odict(house="Snake eyes", queue="near stuff", stuff=stuff))
stack0.txMsg(odict(house="Craps", queue="far stuff", stuff=stuff))
stack1.transmit(odict(house="Snake eyes", queue="near stuff", stuff=stuff))
stack0.transmit(odict(house="Craps", queue="far stuff", stuff=stuff))
timer.restart(duration=2)
while not timer.expired: