mirror of
https://github.com/valitydev/salt.git
synced 2024-11-07 17:09:03 +00:00
Added testing and fuller support for unix domain socket stack
This commit is contained in:
parent
c034dde502
commit
df45194b94
@ -260,14 +260,19 @@ class StackUxdRaet(deeding.Deed): # pylint: disable=W0232
|
||||
stack='stack',
|
||||
txmsgs=odict(ipath='txmsgs', ival=deque()),
|
||||
rxmsgs=odict(ipath='rxmsgs', ival=deque()),
|
||||
local=odict(ipath='name', ival=odict(name='minion', yid=0)),)
|
||||
local=odict(ipath='local', ival=odict(name='minion',
|
||||
yardname="",
|
||||
yid=0,
|
||||
ane="maple")),)
|
||||
|
||||
def postinitio(self):
|
||||
'''
|
||||
Setup stack instance
|
||||
'''
|
||||
name = self.local.data.name
|
||||
yardname = self.local.data.yardname
|
||||
yid = self.local.data.yid
|
||||
lane = self.local.data.lane
|
||||
txMsgs = self.txmsgs.value
|
||||
rxMsgs = self.rxmsgs.value
|
||||
|
||||
@ -275,6 +280,8 @@ class StackUxdRaet(deeding.Deed): # pylint: disable=W0232
|
||||
store=self.store,
|
||||
name=name,
|
||||
yid=yid,
|
||||
yardname=yardname,
|
||||
lanename=lane,
|
||||
txMsgs=txMsgs,
|
||||
rxMsgs=rxMsgs, )
|
||||
|
||||
@ -306,15 +313,57 @@ class AddYardStackUxdRaet(deeding.Deed): # pylint: disable=W0232
|
||||
Ioinits = odict(
|
||||
inode=".raet.uxd.stack.",
|
||||
stack='stack',
|
||||
yard='yard,')
|
||||
yard='yard',
|
||||
local=odict(ipath='local', ival=odict(name='serf', yid=0, lane="maple")),)
|
||||
|
||||
def action(self, yid=None, **kwa):
|
||||
def action(self, lane="lane", yid=None, **kwa):
|
||||
'''
|
||||
Adds new yard to stack with yid
|
||||
Adds new yard to stack on lane with yid
|
||||
'''
|
||||
stack = self.stack.value
|
||||
if stack and isinstance(stack, stacking.StackUxd):
|
||||
yard = yarding.Yard(stack=stack, yid=yid)
|
||||
yard = yarding.Yard(stack=stack, prefix=lane, yid=yid)
|
||||
stack.addRemoteYard(yard)
|
||||
self.yard.value = yard
|
||||
|
||||
class TransmitStackUxdRaet(deeding.Deed): # pylint: disable=W0232
|
||||
'''
|
||||
Puts message on txMsgs deque sent to ddid
|
||||
Message is composed fields that are parameters to action method
|
||||
and is sent to remote device ddid
|
||||
'''
|
||||
Ioinits = odict(
|
||||
inode=".raet.uxd.stack.",
|
||||
stack="stack",
|
||||
dest="dest",)
|
||||
|
||||
def action(self, **kwa):
|
||||
'''
|
||||
Queue up message
|
||||
'''
|
||||
if kwa:
|
||||
msg = odict(kwa)
|
||||
stack = self.stack.value
|
||||
if stack and isinstance(stack, stacking.StackUxd):
|
||||
name = self.dest.value #destination yard name
|
||||
stack.transmit(msg=msg, name=name)
|
||||
|
||||
|
||||
class PrinterStackUxdRaet(deeding.Deed): # pylint: disable=W0232
|
||||
'''
|
||||
Prints out messages on rxMsgs queue
|
||||
'''
|
||||
Ioinits = odict(
|
||||
inode=".raet.uxd.stack.",
|
||||
stack="stack",
|
||||
rxmsgs=odict(ipath='rxmsgs', ival=deque()),)
|
||||
|
||||
def action(self, **kwa):
|
||||
'''
|
||||
Queue up message
|
||||
'''
|
||||
rxMsgs = self.rxmsgs.value
|
||||
stack = self.stack.value
|
||||
while rxMsgs:
|
||||
msg = rxMsgs.popleft()
|
||||
console.terse("\n{0} Received....\n{1}\n".format(stack.name, msg))
|
||||
|
@ -275,7 +275,7 @@ class StackUdp(object):
|
||||
Otherwise return None
|
||||
'''
|
||||
try:
|
||||
raw, ra, da = self.udpRxes.popleft()
|
||||
raw, sa, da = self.udpRxes.popleft()
|
||||
except IndexError:
|
||||
return None
|
||||
|
||||
@ -294,7 +294,7 @@ class StackUdp(object):
|
||||
print emsg
|
||||
return None
|
||||
|
||||
sh, sp = ra
|
||||
sh, sp = sa
|
||||
dh, dp = da
|
||||
packet.data.update(sh=sh, sp=sp, dh=dh, dp=dp)
|
||||
|
||||
@ -441,11 +441,13 @@ class StackUxd(object):
|
||||
'''
|
||||
Count = 0
|
||||
PackKind = raeting.bodyKinds.json
|
||||
Accept = True # accept any uxd messages if True from yards not already in lanes
|
||||
|
||||
def __init__(self,
|
||||
name='',
|
||||
version=raeting.VERSION,
|
||||
store=None,
|
||||
lanename='lane',
|
||||
yard=None,
|
||||
yid=None,
|
||||
yardname='',
|
||||
@ -455,6 +457,7 @@ class StackUxd(object):
|
||||
uxdRxes = None,
|
||||
uxdTxes = None,
|
||||
lane=None,
|
||||
accept=None,
|
||||
):
|
||||
'''
|
||||
Setup StackUxd instance
|
||||
@ -462,21 +465,22 @@ class StackUxd(object):
|
||||
if not name:
|
||||
name = "stackUxd{0}".format(StackUxd.Count)
|
||||
StackUxd.Count += 1
|
||||
if " " in name:
|
||||
emsg = "Invalid Stack name '{0}'".format(name)
|
||||
raise raeting.StackError(emsg)
|
||||
self.name = name
|
||||
self.version = version
|
||||
self.store = store or storing.Store(stamp=0.0)
|
||||
self.yards = odict() # remote uxd yards attached to this stack by name
|
||||
self.names = odict() # remote uxd yard names by ha
|
||||
self.yard = yard or yarding.Yard(stack=self, name=yardname, yid=yid, ha=ha)
|
||||
self.yard = yard or yarding.Yard(stack=self,
|
||||
name=yardname,
|
||||
yid=yid,
|
||||
ha=ha,
|
||||
prefix=lanename)
|
||||
self.rxMsgs = rxMsgs if rxMsgs is not None else deque() # messages received
|
||||
self.txMsgs = txMsgs if txMsgs is not None else deque() # messages to transmit
|
||||
#(msg, ddid) ddid=0 is broadcast
|
||||
self.uxdRxes = uxdRxes if uxdRxes is not None else deque() # uxd packets received
|
||||
self.uxdTxes = uxdTxes if uxdTxes is not None else deque() # uxd packets to transmit
|
||||
self.lane = lane # or keeping.LaneKeep()
|
||||
self.accept = self.Accept if accept is None else accept #accept uxd msg if not in lane
|
||||
self.serverUxd = aiding.SocketUxdNb(ha=self.yard.ha, bufsize=raeting.MAX_MESSAGE_SIZE)
|
||||
self.serverUxd.reopen() # open socket
|
||||
self.yard.ha = self.serverUxd.ha # update device host address after open
|
||||
@ -674,17 +678,24 @@ class StackUxd(object):
|
||||
Otherwise return None
|
||||
'''
|
||||
try:
|
||||
raw, ra, da = self.uxdRxes.popleft()
|
||||
raw, sa, da = self.uxdRxes.popleft()
|
||||
except IndexError:
|
||||
return None
|
||||
|
||||
console.verbose("{0} received raw message \n{1}\n".format(self.name, raw))
|
||||
|
||||
if ra not in self.names:
|
||||
emsg = "Invalid destination ha = {0}. Dropping packet.".format(ra)
|
||||
if sa not in self.names:
|
||||
if not self.accept:
|
||||
emsg = "Unaccepted source ha = {0}. Dropping packet.".format(sa)
|
||||
print emsg
|
||||
return None
|
||||
|
||||
name = yarding.Yard.nameFromHa(sa)
|
||||
yard = yarding.Yard(stack=self,
|
||||
name=name,
|
||||
ha=sa)
|
||||
self.addRemoteYard(yard)
|
||||
|
||||
return self.parseUxdRx(raw) # deserialize
|
||||
|
||||
def parseUxdRx(self, packed):
|
||||
|
@ -340,7 +340,7 @@ def testStackUxd():
|
||||
print msg
|
||||
print
|
||||
|
||||
print "\n********* Message Transactions Both Ways **********"
|
||||
print "\n********* Multiple Messages Both Ways **********"
|
||||
|
||||
stack1.transmit(odict(house="Mama mia1", queue="fix me"), None)
|
||||
stack1.transmit(odict(house="Mama mia2", queue="help me"), None)
|
||||
@ -397,7 +397,84 @@ def testStackUxd():
|
||||
print msg
|
||||
print
|
||||
|
||||
estate = 'minion1'
|
||||
#lord stack yard0
|
||||
stack0 = stacking.StackUxd(name='lord', lanename='cherry', yid=0)
|
||||
|
||||
#serf stack yard1
|
||||
stack1 = stacking.StackUxd(name='serf', lanename='cherry', yid=1)
|
||||
|
||||
print "\n********* Attempt Auto Accept ************"
|
||||
#stack0.addRemoteYard(stack1.yard)
|
||||
yard = yarding.Yard( name=stack0.yard.name, prefix='cherry')
|
||||
stack1.addRemoteYard(yard)
|
||||
|
||||
print "{0} yard name={1} ha={2}".format(stack0.name, stack0.yard.name, stack0.yard.ha)
|
||||
print "{0} yards=\n{1}".format(stack0.name, stack0.yards)
|
||||
print "{0} names=\n{1}".format(stack0.name, stack0.names)
|
||||
|
||||
print "{0} yard name={1} ha={2}".format(stack1.name, stack1.yard.name, stack1.yard.ha)
|
||||
print "{0} yards=\n{1}".format(stack1.name, stack1.yards)
|
||||
print "{0} names=\n{1}".format(stack1.name, stack1.names)
|
||||
|
||||
print "\n********* UXD Message serf to lord **********"
|
||||
src = (estate, stack1.yard.name, None)
|
||||
dst = (estate, stack0.yard.name, None)
|
||||
route = odict(src=src, dst=dst)
|
||||
msg = odict(route=route, stuff="Serf to my lord. Feed me!")
|
||||
stack1.transmit(msg=msg)
|
||||
|
||||
timer = Timer(duration=0.5)
|
||||
timer.restart()
|
||||
while not timer.expired:
|
||||
stack0.serviceAll()
|
||||
stack1.serviceAll()
|
||||
|
||||
|
||||
print "{0} Received Messages".format(stack0.name)
|
||||
for msg in stack0.rxMsgs:
|
||||
print msg
|
||||
print
|
||||
|
||||
print "{0} Received Messages".format(stack1.name)
|
||||
for msg in stack1.rxMsgs:
|
||||
print msg
|
||||
print
|
||||
|
||||
print "\n********* UXD Message lord to serf **********"
|
||||
src = (estate, stack0.yard.name, None)
|
||||
dst = (estate, stack1.yard.name, None)
|
||||
route = odict(src=src, dst=dst)
|
||||
msg = odict(route=route, stuff="Lord to serf. Feed yourself!")
|
||||
stack0.transmit(msg=msg)
|
||||
|
||||
|
||||
timer = Timer(duration=0.5)
|
||||
timer.restart()
|
||||
while not timer.expired:
|
||||
stack0.serviceAll()
|
||||
stack1.serviceAll()
|
||||
|
||||
print "{0} Received Messages".format(stack0.name)
|
||||
for msg in stack0.rxMsgs:
|
||||
print msg
|
||||
print
|
||||
|
||||
print "{0} Received Messages".format(stack1.name)
|
||||
for msg in stack1.rxMsgs:
|
||||
print msg
|
||||
print
|
||||
|
||||
print "{0} yard name={1} ha={2}".format(stack0.name, stack0.yard.name, stack0.yard.ha)
|
||||
print "{0} yards=\n{1}".format(stack0.name, stack0.yards)
|
||||
print "{0} names=\n{1}".format(stack0.name, stack0.names)
|
||||
|
||||
print "{0} yard name={1} ha={2}".format(stack1.name, stack1.yard.name, stack1.yard.ha)
|
||||
print "{0} yards=\n{1}".format(stack1.name, stack1.yards)
|
||||
print "{0} names=\n{1}".format(stack1.name, stack1.names)
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
testStackUdp()
|
||||
#testStackUdp()
|
||||
testStackUxd()
|
||||
|
@ -20,6 +20,8 @@ console = getConsole()
|
||||
|
||||
YARD_UXD_DIR = os.path.join('/tmp', 'raet')
|
||||
|
||||
|
||||
|
||||
class Yard(object):
|
||||
'''
|
||||
RAET protocol Yard
|
||||
@ -29,10 +31,10 @@ class Yard(object):
|
||||
def __init__(self,
|
||||
stack=None,
|
||||
yid=None,
|
||||
name="",
|
||||
ha="",
|
||||
name='',
|
||||
ha='',
|
||||
dirpath=YARD_UXD_DIR,
|
||||
prefix='yard'):
|
||||
prefix='lane'):
|
||||
'''
|
||||
Initialize instance
|
||||
'''
|
||||
@ -42,17 +44,43 @@ class Yard(object):
|
||||
Yard.Yid += 1
|
||||
|
||||
self.yid = yid # yard ID
|
||||
self.name = name or "{0}{1}".format(prefix, self.yid)
|
||||
self.name = name or "yard{0}".format(self.yid)
|
||||
if " " in self.name:
|
||||
emsg = "Invalid Yard name '{0}'".format(self.name)
|
||||
raise raeting.YardError(emsg)
|
||||
|
||||
if self.stack:
|
||||
stackname = self.stack.name
|
||||
else:
|
||||
stackname = stack
|
||||
self.dirpath = dirpath
|
||||
if " " in prefix:
|
||||
emsg = "Invalid prefix '{0}'".format(prefix)
|
||||
raise raeting.YardError(emsg)
|
||||
self.prefix = prefix
|
||||
|
||||
self.ha = ha or os.path.join(dirpath, "{0}.uxd.{1}".format(
|
||||
stackname, self.name))
|
||||
if ha and Yard.nameFromHa(ha) != self.name:
|
||||
emsg = "Incompatible Yard name '{0}' and ha '{1}'".format(self.name, ha)
|
||||
raise raeting.YardError(emsg)
|
||||
|
||||
self.ha = ha or os.path.join(dirpath, "{0}.{1}.uxd".format(prefix, self.name))
|
||||
|
||||
|
||||
@staticmethod
|
||||
def nameFromHa(ha):
|
||||
'''
|
||||
Extract and return the yard name from yard host address ha
|
||||
'''
|
||||
head, tail = os.path.split(ha)
|
||||
if not tail:
|
||||
emsg = "Invalid format for ha '{0}'. No file".format(ha)
|
||||
raise raeting.YardError(emsg)
|
||||
|
||||
root, ext = os.path.splitext(tail)
|
||||
|
||||
if ext != ".uxd":
|
||||
emsg = "Invalid format for ha '{0}'. Ext not 'uxd'".format(ha)
|
||||
raise raeting.YardError(emsg)
|
||||
|
||||
lane, sep, name = root.rpartition('.')
|
||||
if not sep:
|
||||
emsg = "Invalid format for ha '{0}'. Not lane.name".format(ha)
|
||||
raise raeting.YardError(emsg)
|
||||
|
||||
return name
|
||||
|
@ -3,7 +3,7 @@
|
||||
house master
|
||||
|
||||
init .raet.udp.stack.local to did 1 name "master" host "" port 7530
|
||||
init .raet.uxd.stack.local to name "boss" yid 0
|
||||
init .raet.uxd.stack.local to name "boss" lane "ash" yid 0
|
||||
|
||||
framer masterudpstack be active first start
|
||||
frame start
|
||||
@ -11,18 +11,6 @@ framer masterudpstack be active first start
|
||||
exit
|
||||
do raet udp stack closer per inode ".raet.udp.stack."
|
||||
|
||||
framer bossuxdstack be active first start
|
||||
frame start
|
||||
do raet uxd stack per inode ".raet.uxd.stack"
|
||||
exit
|
||||
do raet uxd stack closer per inode ".raet.uxd.stack."
|
||||
|
||||
framer setupuxdyard be active first start
|
||||
frame start
|
||||
enter
|
||||
do raet uxd stack yard add to yid 1
|
||||
bid stop me
|
||||
|
||||
framer receiver be active first start
|
||||
frame start
|
||||
do raet udp stack printer per inode ".raet.udp.stack."
|
||||
@ -31,9 +19,37 @@ framer receiver be active first start
|
||||
frame abort
|
||||
bid stop all
|
||||
|
||||
framer bossuxdstack be active first start
|
||||
frame start
|
||||
do raet uxd stack per inode ".raet.uxd.stack"
|
||||
exit
|
||||
do raet uxd stack closer per inode ".raet.uxd.stack."
|
||||
|
||||
framer uxdreceiver be active first start
|
||||
frame start
|
||||
do raet uxd stack printer per inode ".raet.udp.stack."
|
||||
timeout 5
|
||||
|
||||
frame abort
|
||||
bid stop me
|
||||
|
||||
framer setupuxdyard be active first start
|
||||
frame start
|
||||
enter
|
||||
do raet uxd stack yard add to lane "ash" yid 1
|
||||
|
||||
frame send
|
||||
enter
|
||||
do raet uxd stack transmit to stuff "Lord Hello" \
|
||||
per inode ".raet.uxd.stack."
|
||||
|
||||
frame stop
|
||||
bid stop me
|
||||
|
||||
|
||||
house serf
|
||||
|
||||
init .raet.uxd.stack.local to name "serf" yid 1
|
||||
init .raet.uxd.stack.local to name "serf" lane "ash" yid 1
|
||||
|
||||
framer serfuxdstack be active first start
|
||||
frame start
|
||||
@ -41,19 +57,29 @@ framer serfuxdstack be active first start
|
||||
exit
|
||||
do raet uxd stack closer per inode ".raet.uxd.stack."
|
||||
|
||||
framer setupuxdyard be active first start
|
||||
framer uxdreceiver be active first start
|
||||
frame start
|
||||
enter
|
||||
do raet uxd stack yard add to yid 0
|
||||
bid stop me
|
||||
|
||||
framer receiver be active first start
|
||||
frame start
|
||||
#do raet udp stack printer per inode ".raet.udp.stack."
|
||||
do raet uxd stack printer per inode ".raet.udp.stack."
|
||||
timeout 5
|
||||
|
||||
frame abort
|
||||
bid stop all
|
||||
bid stop me
|
||||
|
||||
|
||||
framer setupuxdyard be active first start
|
||||
frame start
|
||||
enter
|
||||
do raet uxd stack yard add to lane "ash" yid 0
|
||||
timeout 0.5
|
||||
|
||||
frame send
|
||||
enter
|
||||
do raet uxd stack transmit to stuff "Serf Hello" \
|
||||
per inode ".raet.uxd.stack."
|
||||
|
||||
frame stop
|
||||
bid stop me
|
||||
|
||||
|
||||
house minion1
|
||||
|
||||
|
65
salt/transport/road/route1.flo
Normal file
65
salt/transport/road/route1.flo
Normal file
@ -0,0 +1,65 @@
|
||||
# Raet Test FloScript
|
||||
|
||||
house master
|
||||
|
||||
init .raet.uxd.stack.local to name "lord" lane "ash" yid 0
|
||||
init .raet.uxd.stack.dest to "yard1"
|
||||
|
||||
framer bossuxdstack be active first start
|
||||
frame start
|
||||
do raet uxd stack per inode ".raet.uxd.stack"
|
||||
exit
|
||||
do raet uxd stack closer per inode ".raet.uxd.stack."
|
||||
timeout 5
|
||||
|
||||
frame abort
|
||||
bid stop all
|
||||
|
||||
framer uxdreceiver be active first start
|
||||
frame start
|
||||
do raet uxd stack printer per inode ".raet.uxd.stack."
|
||||
|
||||
framer setupuxdyard be active first start
|
||||
frame start
|
||||
enter
|
||||
do raet uxd stack yard add to lane "ash" yid 1
|
||||
timeout 1
|
||||
|
||||
frame send
|
||||
enter
|
||||
do raet uxd stack transmit to stuff "Lord Hello" \
|
||||
per inode ".raet.uxd.stack."
|
||||
|
||||
|
||||
house serf
|
||||
|
||||
init .raet.uxd.stack.local to name "serf" lane "ash" yid 1
|
||||
init .raet.uxd.stack.dest to "yard0"
|
||||
|
||||
framer serfuxdstack be active first start
|
||||
frame start
|
||||
do raet uxd stack per inode ".raet.uxd.stack"
|
||||
exit
|
||||
do raet uxd stack closer per inode ".raet.uxd.stack."
|
||||
timeout 5
|
||||
|
||||
frame abort
|
||||
bid stop all
|
||||
|
||||
framer uxdreceiver be active first start
|
||||
frame start
|
||||
do raet uxd stack printer per inode ".raet.uxd.stack."
|
||||
|
||||
|
||||
framer setupuxdyard be active first start
|
||||
frame start
|
||||
enter
|
||||
do raet uxd stack yard add to lane "ash" yid 0
|
||||
timeout 1
|
||||
|
||||
frame send
|
||||
enter
|
||||
do raet uxd stack transmit to stuff "Serf Hello" \
|
||||
per inode ".raet.uxd.stack."
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user