Merge pull request #11007 from SmithSamuelM/sam_raet_16

Sam raet 16  Support for msgpack raet bodies.  Preliminary support for persisting Raet estate and key data
This commit is contained in:
Samuel Smith 2014-03-06 10:20:12 -07:00
commit 589eba5488
12 changed files with 1217 additions and 342 deletions

View File

@ -56,7 +56,7 @@ class StackUdpRaet(deeding.Deed): # pylint: disable=W0232
stack='stack',
txmsgs=odict(ipath='txmsgs', ival=deque()),
rxmsgs=odict(ipath='rxmsgs', ival=deque()),
local=odict(ipath='local', ival=odict( name='minion',
local=odict(ipath='local', ival=odict( name='master',
eid=0,
host='0.0.0.0',
port=raeting.RAET_PORT,

View File

@ -95,7 +95,7 @@ class LocalEstate(Estate):
RAET protocol endpoint local estate object
Maintains signer for signing and privateer for encrypt/decript
'''
def __init__(self, sigkey=None, prikey=None, **kwa):
def __init__(self, main=False, sigkey=None, prikey=None, **kwa):
'''
Setup Estate instance
@ -103,6 +103,7 @@ class LocalEstate(Estate):
prikey is either nacl PrivateKey or hex encoded key
'''
super(LocalEstate, self).__init__(**kwa)
self.main = True if main else False # main estate for road
self.signer = nacling.Signer(sigkey)
self.priver = nacling.Privateer(prikey) # Long term key

View File

@ -32,16 +32,20 @@ class Keep(object):
'''
RAET protocol base class for estate data persistence
'''
def __init__(self, dirpath='', prefix='estate', ext='json', **kwa):
def __init__(self, dirpath='', stackname='stack', prefix='estate', ext='json', **kwa):
'''
Setup Keep instance
Create directories for saving associated estate data files
keep/
local/
remote/
stackname/
local/
prefix.uid.ext
remote/
prefix.uid.ext
prefix.uid.ext
'''
if not dirpath:
dirpath = "/tmp/raet/keep"
dirpath = os.path.join("/tmp/raet/keep", stackname)
self.dirpath = os.path.abspath(dirpath)
if not os.path.exists(self.dirpath):
os.makedirs(self.dirpath)
@ -102,7 +106,7 @@ class Keep(object):
return None
return (self.load(self.localfilepath))
def removeLocalData(self):
def clearLocalData(self):
'''
Load and Return the data from the local estate
'''
@ -135,7 +139,7 @@ class Keep(object):
return None
return (self.load(filepath))
def removeRemoteData(self, uid):
def clearRemoteData(self, uid):
'''
Load and Return the data from the remote estate file named with uid
'''
@ -160,7 +164,7 @@ class Keep(object):
data[uid] = self.load(filepath)
return data
def removeAllRemoteData(self):
def clearAllRemoteData(self):
'''
Remove all the remote estate files
'''
@ -176,10 +180,9 @@ class Keep(object):
if os.path.exists(filepath):
os.remove(filepath)
def dumpAllRemoteEstates(self, estates):
'''
Dump the data from the remote estate
Dump the data from all the remote estates
'''
for estate in estates:
self.dumpRemoteEstate(estate)
@ -215,17 +218,25 @@ class Keep(object):
uid = estate.eid
return (self.loadRemoteData(uid))
def removeRemoteEstate(self, estate):
def clearRemoteEstate(self, estate):
'''
Load and Return the data from the remote estate file
Override this in sub class to change uid
'''
uid = estate.eid
self.removeRemoteData(uid)
self.clearRemoteData(uid)
class RoadKeep(Keep):
'''
RAET protocol estate road (channel) data persistence
keep/
stackname/
local/
estate.eid.ext
remote/
estate.eid.ext
estate.eid.ext
'''
def __init__(self, prefix='estate', **kwa):
'''
@ -240,6 +251,7 @@ class RoadKeep(Keep):
data = odict([
('eid', estate.eid),
('name', estate.name),
('main', estate.main),
('host', estate.host),
('port', estate.port),
('sid', estate.sid)
@ -270,6 +282,18 @@ class SafeKeep(Keep):
def __init__(self, prefix='key', **kwa):
'''
Setup SafeKeep instance
keep/
local/
key.eid.ext
remote/
key.eid.ext
key.eid.ext
pended/
key.eid.ext
rejected/
key.eid.ext
'''
super(SafeKeep, self).__init__(prefix=prefix, **kwa)
@ -299,7 +323,7 @@ class SafeKeep(Keep):
'''
Dump the data from the remote estate
'''
uid = estate.name
uid = estate.eid
data = odict([
('eid', estate.eid),
('name', estate.name),
@ -323,11 +347,22 @@ class SafeKeep(Keep):
Override this in sub class to change uid
'''
uid = estate.name
self.removeRemoteData(uid)
self.clearRemoteData(uid)
def remoteAcceptStatus(self, estate):
def statusRemote(self, estate):
'''
Evaluate acceptance status of estate per its keys
persist key data differentially based on status
'''
return (raeting.acceptance.accepted)
def clearAllRoadSafe(dirpath):
'''
Convenience function to clear all road and safe keep data in dirpath
'''
road = RoadKeep(dirpath=dirpath)
road.clearLocalData()
road.clearAllRemoteData()
safe = SafeKeep(dirpath=dirpath)
safe.clearLocalData()
safe.clearAllRemoteData()

View File

@ -11,7 +11,11 @@ try:
import simplejson as json
except ImportError:
import json
import msgpack
try:
import msgpack
except ImportError:
mspack = None
# Import ioflo libs
from ioflo.base.odicting import odict
@ -193,9 +197,11 @@ class TxBody(Body):
self.packed = json.dumps(self.data, separators=(',', ':'))
elif bk == raeting.bodyKinds.msgpack:
if self.data:
if not msgpack:
emsg = "Msgpack not installed."
raise raeting.PacketError(emsg)
self.packed = msgpack.dumps(self.data)
if bk == raeting.bodyKinds.raw:
elif bk == raeting.bodyKinds.raw:
self.packed = self.data # data is already formatted string
class RxBody(Body):
@ -223,10 +229,18 @@ class RxBody(Body):
emsg = "Packet body not a mapping."
raise raeting.PacketError(emsg)
self.data = kit
if bk == raeting.bodyKinds.raw:
elif bk == raeting.bodyKinds.msgpack:
if self.packed:
if not msgpack:
emsg = "Msgpack not installed."
raise raeting.PacketError(emsg)
kit = msgpack.loads(self.packed, object_pairs_hook=odict)
if not isinstance(kit, Mapping):
emsg = "Packet body not a mapping."
raise raeting.PacketError(emsg)
self.data = kit
elif bk == raeting.bodyKinds.raw:
self.data = self.packed # return as string
elif bk == raeting.bodyKinds.nada:
pass
@ -452,16 +466,16 @@ class TxPacket(Packet):
data = self.data
le = data['se']
if le == 0:
host = data['sh']
if host == '0.0.0.0':
host = '127.0.0.1'
le = (host, data['sp'])
#host = data['sh']
#if host == '0.0.0.0':
#host = '127.0.0.1'
le = (data['sh'], data['sp'])
re = data['de']
if re == 0:
host = data['dh']
if host == '0.0.0.0':
host = '127.0.0.1'
re = (host, data['dp'])
#host = data['dh']
#if host == '0.0.0.0':
#host = '127.0.0.1'
re = (data['dh'], data['dp'])
return ((data['cf'], le, re, data['si'], data['ti'], data['bf']))
def signature(self, msg):
@ -564,16 +578,16 @@ class RxPacket(Packet):
data = self.data
le = data['de']
if le == 0:
host = data['dh']
if host == '0.0.0.0':
host = '127.0.0.1'
le = (host, data['dp'])
#host = data['dh']
#if host == '0.0.0.0':
#host = '127.0.0.1'
le = (data['dh'], data['dp'])
re = data['se']
if re == 0:
host = data['sh']
if host == '0.0.0.0':
host = '127.0.0.1'
re = (host, data['sp'])
#host = data['sh']
#if host == '0.0.0.0':
#host = '127.0.0.1'
re = (data['sh'], data['sp'])
return ((not data['cf'], le, re, data['si'], data['ti'], data['bf']))
def verify(self, signature, msg):

View File

@ -42,17 +42,19 @@ class StackUdp(object):
def __init__(self,
name='',
main=False,
version=raeting.VERSION,
store=None,
estate=None,
eid=None,
ha=("", raeting.RAET_PORT),
rxMsgs = None,
txMsgs = None,
udpRxes = None,
udpTxes = None,
road = None,
safe = None,
rxMsgs=None,
txMsgs=None,
udpRxes=None,
udpTxes=None,
road=None,
safe=None,
dirpath=None,
):
'''
Setup StackUdp instance
@ -65,22 +67,30 @@ class StackUdp(object):
self.store = store or storing.Store(stamp=0.0)
self.estates = odict() # remote estates attached to this stack by eid
self.eids = odict() # reverse lookup eid by estate.name
# local estate for this stack
self.estate = estate or estating.LocalEstate(stack=self, eid=eid, ha=ha)
self.transactions = odict() #transactions
self.rxMsgs = rxMsgs if rxMsgs is not None else deque() # messages received
self.txMsgs = txMsgs if txMsgs is not None else deque() # messages to transmit
#(msg, deid) deid=0 is broadcast
self.udpRxes = udpRxes if udpRxes is not None else deque() # udp packets received
self.udpTxes = udpTxes if udpTxes is not None else deque() # udp packet to transmit
self.road = road or keeping.RoadKeep()
self.safe = safe or keeping.SafeKeep()
self.road = road or keeping.RoadKeep(dirpath=dirpath, stackname=self.name)
self.safe = safe or keeping.SafeKeep(dirpath=dirpath, stackname=self.name)
kept = self.loadLocal() # local estate from saved data
# local estate for this stack
self.estate = kept or estate or estating.LocalEstate(stack=self,
eid=eid,
main=main,
ha=ha)
self.estate.stack = self
self.serverUdp = aiding.SocketUdpNb(ha=self.estate.ha, bufsize=raeting.MAX_MESSAGE_SIZE)
self.serverUdp.reopen() # open socket
self.estate.ha = self.serverUdp.ha # update estate host address after open
self.dumpLocal() # save local estate data
#self.road.dumpLocalEstate(self.estate)
#self.safe.dumpLocalEstate(self.estate)
kepts = self.loadAllRemote() # remote estates from saved data
for kept in kepts:
self.addRemoteEstate(kept)
self.dumpAllRemote() # save remote estate data
def fetchRemoteEstateByHostPort(self, host, port):
'''
@ -93,6 +103,18 @@ class StackUdp(object):
return None
def fetchRemoteEstateByKeys(self, sighex, prihex):
'''
Search for remote estate with matching (name, sighex, prihex)
Return estate if found Otherwise return None
'''
for estate in self.estates.values():
if (estate.signer.keyhex == sighex or
estate.priver.keyhex == prihex):
return estate
return None
def fetchRemoteEstateByName(self, name):
'''
Search for remote estate with matching name
@ -167,6 +189,108 @@ class StackUdp(object):
del self.estates[eid]
del self.eids[estate.name]
def clearLocal(self):
'''
Clear local keeps
'''
self.road.clearLocalData()
self.safe.clearLocalData()
def clearRemote(self, estate):
'''
Clear remote keeps of estate
'''
self.road.clearRemoteEstate()
self.safe.clearRemoteEstate()
def clearAllRemote(self):
'''
Clear all remote keeps
'''
self.road.clearAllRemoteData()
self.safe.clearAllRemoteData()
def dumpLocal(self):
'''
Dump keeps of local estate
'''
self.road.dumpLocalEstate(self.estate)
self.safe.dumpLocalEstate(self.estate)
def dumpRemote(self, estate):
'''
Dump keeps of estate
'''
self.road.dumpRemoteEstate(estate)
self.safe.dumpRemoteEstate(estate)
def dumpRemoteByEid(self, eid):
'''
Dump keeps of estate given by eid
'''
estate = self.estates.get(eid)
if estate:
self.dumpRemote(estate)
def dumpAllRemote(self):
'''
Dump all remotes estates to keeps'''
self.road.dumpAllRemoteEstates(self.estates.values())
self.safe.dumpAllRemoteEstates(self.estates.values())
def loadLocal(self):
'''
Load and Return local estate if keeps found
'''
road = self.road.loadLocalData()
safe = self.safe.loadLocalData()
if not road or not safe:
return None
estate = estating.LocalEstate(stack=self,
eid=road['eid'],
name=road['name'],
main=road['main'],
host=road['host'],
port=road['port'],
sid=road['sid'],
sigkey=safe['sighex'],
prikey=safe['prihex'],)
return estate
def loadAllRemote(self):
'''
Load and Return list of remote estates
remote = estating.RemoteEstate( stack=self.stack,
name=name,
host=data['sh'],
port=data['sp'],
verkey=verhex,
pubkey=pubhex,
rsid=self.sid,
rtid=self.tid, )
self.stack.addRemoteEstate(remote)
'''
estates = []
roads = self.road.loadAllRemoteData()
safes = self.safe.loadAllRemoteData()
if not roads or not safes:
return []
for key, road in roads.items():
if key not in safes:
continue
safe = safes[key]
estate = estating.RemoteEstate( stack=self,
eid=road['eid'],
name=road['name'],
host=road['host'],
port=road['port'],
sid=road['sid'],
rsid=road['rsid'],
verkey=safe['verhex'],
pubkey=safe['pubhex'],)
estates.append(estate)
return estates
def addTransaction(self, index, transaction):
'''
Safely add transaction at index If not already there
@ -258,6 +382,8 @@ class StackUdp(object):
raise raeting.StackError(emsg)
self.txMsgs.append((msg, deid))
transmit = txMsg
def serviceTxMsg(self):
'''
Service .udpTxMsgs queue of outgoint udp messages for message transactions

View File

@ -9,7 +9,7 @@ import os
from ioflo.base.odicting import odict
from salt.transport.road.raet import (raeting, nacling, estating, keeping)
from salt.transport.road.raet import (raeting, nacling, estating, keeping, stacking)
def test():
@ -25,51 +25,120 @@ def test():
masterPubKeyHex = privateer.pubhex
signer = nacling.Signer()
minionSignKeyHex = signer.keyhex
minionVerKeyHex = signer.verhex
m1SignKeyHex = signer.keyhex
m1VerKeyHex = signer.verhex
privateer = nacling.Privateer()
minionPriKeyHex = privateer.keyhex
minionPubKeyHex = privateer.pubhex
m1PriKeyHex = privateer.keyhex
m1PubKeyHex = privateer.pubhex
signer = nacling.Signer()
m2SignKeyHex = signer.keyhex
m2VerKeyHex = signer.verhex
privateer = nacling.Privateer()
m2PriKeyHex = privateer.keyhex
m2PubKeyHex = privateer.pubhex
signer = nacling.Signer()
m3SignKeyHex = signer.keyhex
m3VerKeyHex = signer.verhex
privateer = nacling.Privateer()
m3PriKeyHex = privateer.keyhex
m3PubKeyHex = privateer.pubhex
#master stack
estate = estating.LocalEstate(eid=1,
sigkey=masterSignKeyHex,
prikey=masterPriKeyHex)
dirpath = os.path.join(os.getcwd(), 'keep', 'master')
estate = estating.LocalEstate( eid=1,
name='master',
sigkey=masterSignKeyHex,
prikey=masterPriKeyHex,)
stack0 = stacking.StackUdp(estate=estate, dirpath=dirpath)
remote0 = estating.RemoteEstate(eid=2,
stack0.addRemoteEstate(estating.RemoteEstate(eid=2,
ha=('127.0.0.1', 7532),
verkey=minionVerKeyHex,
pubkey=minionPubKeyHex,)
verkey=m1VerKeyHex,
pubkey=m1PubKeyHex,))
remote1 = estating.RemoteEstate(eid=3,
stack0.addRemoteEstate(estating.RemoteEstate(eid=3,
ha=('127.0.0.1', 7533),
verkey=minionVerKeyHex,
pubkey=minionPubKeyHex,)
verkey=m2VerKeyHex,
pubkey=m2PubKeyHex,))
pond = keeping.RoadKeep(dirpath=os.getcwd())
safe = keeping.SafeKeep(dirpath=os.getcwd())
pond.dumpLocalEstate(estate)
pond.dumpRemoteEstate(remote0)
pond.dumpRemoteEstate(remote1)
safe.dumpLocalEstate(estate)
safe.dumpRemoteEstate(remote0)
safe.dumpRemoteEstate(remote1)
data = pond.loadLocalData()
print data
data = pond.loadAllRemoteData()
print data
data = safe.loadLocalData()
print data
data = safe.loadAllRemoteData()
print data
#minion stack
dirpath = os.path.join(os.getcwd(), 'keep', 'minion1')
estate = estating.LocalEstate( eid=2,
name='minion1',
ha=("", raeting.RAET_TEST_PORT),
sigkey=m1SignKeyHex,
prikey=m1PriKeyHex,)
stack1 = stacking.StackUdp(estate=estate, dirpath=dirpath)
stack1.addRemoteEstate(estating.RemoteEstate(eid=1,
ha=('127.0.0.1', 7532),
verkey=masterVerKeyHex,
pubkey=masterPubKeyHex,))
stack1.addRemoteEstate(estating.RemoteEstate(eid=4,
ha=('127.0.0.1', 7534),
verkey=m3VerKeyHex,
pubkey=m3PubKeyHex,))
stack0.clearLocal()
stack0.clearAllRemote()
stack1.clearLocal()
stack1.clearAllRemote()
stack0.dumpLocal()
stack0.dumpAllRemote()
stack1.dumpLocal()
stack1.dumpAllRemote()
print "Road {0}".format(stack0.name)
print stack0.road.loadLocalData()
print stack0.road.loadAllRemoteData()
print "Safe {0}".format(stack0.name)
print stack0.safe.loadLocalData()
print stack0.safe.loadAllRemoteData()
print
print "Road {0}".format(stack1.name)
print stack1.road.loadLocalData()
print stack1.road.loadAllRemoteData()
print "Safe {0}".format(stack1.name)
print stack1.safe.loadLocalData()
print stack1.safe.loadAllRemoteData()
stack0.serverUdp.close()
stack1.serverUdp.close()
#master stack
dirpath = os.path.join(os.getcwd(), 'keep', 'master')
estate = estating.LocalEstate( eid=1,
name='master',
sigkey=masterSignKeyHex,
prikey=masterPriKeyHex,)
stack0 = stacking.StackUdp(estate=estate, dirpath=dirpath)
#minion stack
dirpath = os.path.join(os.getcwd(), 'keep', 'minion1')
estate = estating.LocalEstate( eid=2,
name='minion1',
ha=("", raeting.RAET_TEST_PORT),
sigkey=m1SignKeyHex,
prikey=m1PriKeyHex,)
stack1 = stacking.StackUdp(estate=estate, dirpath=dirpath)
estate0 = stack0.loadLocal()
print estate0.name, estate0.eid, estate0.sid, estate0.ha, estate0.signer, estate0.priver
estate1 = stack1.loadLocal()
print estate1.name, estate1.eid, estate1.sid, estate1.ha, estate1.signer, estate1.priver
stack0.clearLocal()
stack0.clearAllRemote()
stack1.clearLocal()
stack1.clearAllRemote()
if __name__ == "__main__":

View File

@ -6,76 +6,96 @@ Tests to try out packeting. Potentially ephemeral
# pylint: skip-file
# pylint: disable=C0103
import os
from ioflo.base.odicting import odict
from salt.transport.road.raet import (raeting, nacling, packeting,
from salt.transport.road.raet import (raeting, nacling, packeting, keeping,
estating, transacting, stacking)
def test():
def test( bk = raeting.bodyKinds.json):
'''
Test packeting.
'''
data = odict(hk=1, bk=raeting.bodyKinds.json)
data = odict(hk=1, bk=bk)
body = odict(msg='Hello Raet World', extra='what is this')
packet0 = packeting.TxPacket(embody=body, data=data, )
print packet0.body.data
packet0.pack()
print packet0.packed
packet1 = packeting.RxPacket(packed=packet0.packed)
packet1.parse()
print packet1.data
print packet1.body.data
stuff = []
for i in range(300):
stuff.append(str(i).rjust(4, " "))
stuff = "".join(stuff)
data.update(bk=raeting.bodyKinds.raw)
packet0 = packeting.TxPacket(embody=stuff, data=data, )
packet0.pack()
print packet0.packed
packet1 = packeting.RxPacket(packed=packet0.packed)
packet1.parse()
print packet1.data
print packet1.body.data
rejoin = []
if packet0.segmented:
for index, segment in packet0.segments.items():
print index, segment.packed
rejoin.append(segment.body.packed)
rejoin = "".join(rejoin)
print stuff == rejoin
#master stack
masterName = "master"
signer = nacling.Signer()
masterSignKeyHex = signer.keyhex
masterVerKeyHex = signer.verhex
privateer = nacling.Privateer()
masterPriKeyHex = privateer.keyhex
masterPubKeyHex = privateer.pubhex
dirpathMaster = os.path.join(os.getcwd(), 'keep', masterName)
#minion stack
minionName = "minion"
signer = nacling.Signer()
minionSignKeyHex = signer.keyhex
minionVerKeyHex = signer.verhex
privateer = nacling.Privateer()
minionPriKeyHex = privateer.keyhex
minionPubKeyHex = privateer.pubhex
dirpathMinion = os.path.join(os.getcwd(), 'keep', minionName)
#master stack
estate = estating.LocalEstate(eid=1,
sigkey=masterSignKeyHex,
prikey=masterPriKeyHex)
stack0 = stacking.StackUdp(estate=estate)
keeping.clearAllRoadSafe(dirpathMaster)
keeping.clearAllRoadSafe(dirpathMinion)
remote1 = estating.RemoteEstate( eid=2,
estate = estating.LocalEstate( eid=1,
name=masterName,
sigkey=masterSignKeyHex,
prikey=masterPriKeyHex)
stack0 = stacking.StackUdp(estate=estate, main=True, dirpath=dirpathMaster)
remote1 = estating.RemoteEstate( eid=2,
name=minionName,
ha=("127.0.0.1", raeting.RAET_TEST_PORT),
verkey=minionVerKeyHex,
pubkey=minionPubKeyHex,)
stack0.addRemoteEstate(remote1)
#minion stack
estate = estating.LocalEstate( eid=2,
name=minionName,
ha=("", raeting.RAET_TEST_PORT),
sigkey=minionSignKeyHex,
prikey=minionPriKeyHex,)
stack1 = stacking.StackUdp(estate=estate)
remote0 = estating.RemoteEstate( eid=1,
name=masterName,
ha=('127.0.0.1', raeting.RAET_PORT),
verkey=masterVerKeyHex,
pubkey=masterPubKeyHex,)
@ -84,10 +104,11 @@ def test():
remote0.publee = nacling.Publican(key=remote1.privee.pubhex)
remote1.publee = nacling.Publican(key=remote0.privee.pubhex)
print "\n___________Raw Body Test"
data.update(se=1, de=2, bk=raeting.bodyKinds.raw, fk=raeting.footKinds.nacl)
packet0 = packeting.TxPacket(stack=stack0, embody=stuff, data=data, )
packet0.pack()
print packet0.packed
print packet0.packed #not signed if segmented each segment is signed
rejoin = []
if packet0.segmented:
@ -121,7 +142,9 @@ def test():
body = odict(stuff=stuff)
print body
data.update(se=1, de=2, bk=raeting.bodyKinds.json, fk=raeting.footKinds.nacl)
print "\n_____________ Packed Body Test"
data.update(se=1, de=2, bk=bk, fk=raeting.footKinds.nacl)
packet0 = packeting.TxPacket(stack=stack0, embody=body, data=data, )
packet0.pack()
print packet0.packed
@ -149,6 +172,8 @@ def test():
body = odict(stuff=stuff)
print body
print "\n___________ Encrypted Coat Test "
data.update(se=1, de=2,
bk=raeting.bodyKinds.json,
ck=raeting.coatKinds.nacl,
@ -186,5 +211,7 @@ def test():
print segmentage.body.data
print segmentage.body.packed == packet0.body.packed
if __name__ == "__main__":
test()
test(bk=raeting.bodyKinds.msgpack)

View File

@ -0,0 +1,307 @@
# -*- coding: utf-8 -*-
'''
Tests to try out stacking. Potentially ephemeral
'''
# pylint: skip-file
import os
from ioflo.base.odicting import odict
from ioflo.base.aiding import Timer
from ioflo.base.consoling import getConsole
console = getConsole()
from salt.transport.road.raet import (raeting, nacling, packeting, keeping,
estating, yarding, transacting, stacking)
def test():
'''
initially
master on port 7530 with eid of 1
minion on port 7531 with eid of 0
eventually
master eid of 1
minion eid of 2
'''
console.reinit(verbosity=console.Wordage.concise)
#master stack
masterName = "master"
signer = nacling.Signer()
masterSignKeyHex = signer.keyhex
privateer = nacling.Privateer()
masterPriKeyHex = privateer.keyhex
dirpathMaster = os.path.join(os.getcwd(), 'keep', masterName)
road = keeping.RoadKeep(dirpath=dirpathMaster)
road.clearLocalData()
road.clearAllRemoteData()
safe = keeping.SafeKeep(dirpath=dirpathMaster)
safe.clearLocalData()
safe.clearAllRemoteData()
estate = estating.LocalEstate( eid=1,
name=masterName,
sigkey=masterSignKeyHex,
prikey=masterPriKeyHex,)
stack0 = stacking.StackUdp(estate=estate, main=True, dirpath=dirpathMaster)
#minion0 stack
minionName0 = "minion0"
signer = nacling.Signer()
minionSignKeyHex = signer.keyhex
privateer = nacling.Privateer()
minionPriKeyHex = privateer.keyhex
dirpathMinion0 = os.path.join(os.getcwd(), 'keep', minionName0)
road = keeping.RoadKeep(dirpath=dirpathMinion0)
road.clearLocalData()
road.clearAllRemoteData()
safe = keeping.SafeKeep(dirpath=dirpathMinion0)
safe.clearLocalData()
safe.clearAllRemoteData()
estate = estating.LocalEstate( eid=0,
name=minionName0,
ha=("", raeting.RAET_TEST_PORT),
sigkey=minionSignKeyHex,
prikey=minionPriKeyHex,)
stack1 = stacking.StackUdp(estate=estate, dirpath=dirpathMinion0)
#minion1 stack
minionName1 = "minion1"
signer = nacling.Signer()
minionSignKeyHex = signer.keyhex
privateer = nacling.Privateer()
minionPriKeyHex = privateer.keyhex
dirpathMinion1 = os.path.join(os.getcwd(), 'keep', minionName1)
road = keeping.RoadKeep(dirpath=dirpathMinion1)
road.clearLocalData()
road.clearAllRemoteData()
safe = keeping.SafeKeep(dirpath=dirpathMinion1)
safe.clearLocalData()
safe.clearAllRemoteData()
estate = estating.LocalEstate( eid=0,
name=minionName1,
ha=("", 7532),
sigkey=minionSignKeyHex,
prikey=minionPriKeyHex,)
stack2 = stacking.StackUdp(estate=estate, dirpath=dirpathMinion1)
print "\n********* Join Transaction **********"
stack1.join()
timer = Timer(duration=2)
timer.restart(duration=2)
while not timer.expired:
stack1.serviceAll()
stack0.serviceAll()
for estate in stack0.estates.values():
print "Remote Estate {0} joined= {1}".format(estate.eid, estate.joined)
for estate in stack1.estates.values():
print "Remote Estate {0} joined= {1}".format(estate.eid, estate.joined)
print "\n********* Allow Transaction **********"
stack1.allow()
timer.restart(duration=2)
while not timer.expired:
stack1.serviceAll()
stack0.serviceAll()
for estate in stack0.estates.values():
print "Remote Estate {0} allowed= {1}".format(estate.eid, estate.allowed)
for estate in stack1.estates.values():
print "Remote Estate {0} allowed= {1}".format(estate.eid, estate.allowed)
print "\n********* Message Transactions Both Ways **********"
stack1.transmit(odict(house="Oh Boy1", queue="Nice"))
stack1.transmit(odict(house="Oh Boy2", queue="Mean"))
stack1.transmit(odict(house="Oh Boy3", queue="Ugly"))
stack1.transmit(odict(house="Oh Boy4", queue="Pretty"))
stack0.transmit(odict(house="Yeah Baby1", queue="Good"))
stack0.transmit(odict(house="Yeah Baby2", queue="Bad"))
stack0.transmit(odict(house="Yeah Baby3", queue="Fast"))
stack0.transmit(odict(house="Yeah Baby4", queue="Slow"))
#segmented packets
stuff = []
for i in range(300):
stuff.append(str(i).rjust(4, " "))
stuff = "".join(stuff)
stack1.transmit(odict(house="Snake eyes", queue="near stuff", stuff=stuff))
stack0.transmit(odict(house="Craps", queue="far stuff", stuff=stuff))
timer.restart(duration=2)
while not timer.expired:
stack1.serviceAll()
stack0.serviceAll()
print "{0} Received Messages".format(stack0.name)
for msg in stack0.rxMsgs:
print msg
print
print "{0} Received Messages".format(stack1.name)
for msg in stack1.rxMsgs:
print msg
print
print "{0} eid={1}".format(stack0.name, stack0.estate.eid)
print "{0} estates=\n{1}".format(stack0.name, stack0.estates)
print "{0} transactions=\n{1}".format(stack0.name, stack0.transactions)
print "{0} eid={1}".format(stack1.name, stack1.estate.eid)
print "{0} estates=\n{1}".format(stack1.name, stack1.estates)
print "{0} transactions=\n{1}".format(stack1.name, stack1.transactions)
print "Road {0}".format(stack0.name)
print stack0.road.loadLocalData()
print stack0.road.loadAllRemoteData()
print "Safe {0}".format(stack0.name)
print stack0.safe.loadLocalData()
print stack0.safe.loadAllRemoteData()
print
print "Road {0}".format(stack1.name)
print stack1.road.loadLocalData()
print stack1.road.loadAllRemoteData()
print "Safe {0}".format(stack1.name)
print stack1.safe.loadLocalData()
print stack1.safe.loadAllRemoteData()
print
stack0.serverUdp.close()
stack1.serverUdp.close()
stack2.serverUdp.close()
#estate0 = stack0.loadLocal()
#print estate0.name, estate0.eid, estate0.sid, estate0.ha, estate0.signer, estate0.priver
#estate1 = stack1.loadLocal()
#print estate1.name, estate1.eid, estate1.sid, estate1.ha, estate1.signer, estate1.priver
#master stack
stack0 = stacking.StackUdp(dirpath=dirpathMaster, main=True)
#minion0 stack
stack1 = stacking.StackUdp(dirpath=dirpathMinion0)
#minion1 stack
stack2 = stacking.StackUdp(dirpath=dirpathMinion1)
print "\n********* Join Transaction **********"
stack1.join()
stack2.join()
timer = Timer(duration=2)
timer.restart(duration=2)
while not timer.expired:
stack1.serviceAll()
stack2.serviceAll()
stack0.serviceAll()
for estate in stack0.estates.values():
print "{0} Remote Estate {1} joined= {2}".format(
stack0.name, estate.eid, estate.joined)
for estate in stack1.estates.values():
print "{0} Remote Estate {1} joined= {2}".format(
stack1.name, estate.eid, estate.joined)
for estate in stack2.estates.values():
print "{0} Remote Estate {1} joined= {2}".format(
stack2.name, estate.eid, estate.joined)
print "\n********* Allow Transaction **********"
stack1.allow()
stack2.allow()
timer.restart(duration=2)
while not timer.expired:
stack1.serviceAll()
stack2.serviceAll()
stack0.serviceAll()
for estate in stack0.estates.values():
print "{0} Remote Estate {1} allowed= {2}".format(
stack1.name, estate.eid, estate.allowed)
for estate in stack1.estates.values():
print "{0} Remote Estate {1} allowed= {2}".format(
stack1.name, estate.eid, estate.allowed)
for estate in stack2.estates.values():
print "{0} Remote Estate {1} allowed= {2}".format(
stack2.name, estate.eid, estate.allowed)
print "\n********* Message Transactions Both Ways **********"
#console.reinit(verbosity=console.Wordage.verbose)
stack1.transmit(odict(house="Oh Boy1", queue="Nice"))
stack1.transmit(odict(house="Oh Boy2", queue="Mean"))
stack1.transmit(odict(house="Oh Boy3", queue="Ugly"))
stack1.transmit(odict(house="Oh Boy4", queue="Pretty"))
stack2.transmit(odict(house="Really 1", queue="blue"))
stack2.transmit(odict(house="Really 2", queue="green"))
stack2.transmit(odict(house="Really 3", queue="red"))
stack2.transmit(odict(house="Really 4", queue="yello"))
stack0.transmit(odict(house="Yeah Baby1", queue="Good"))
stack0.transmit(odict(house="Yeah Baby2", queue="Bad"))
stack0.transmit(odict(house="Yeah Baby3", queue="Fast"))
stack0.transmit(odict(house="Yeah Baby4", queue="Slow"))
stack0.transmit(odict(house="Yeah Momma 1", queue="host"), stack2.estate.eid)
stack0.transmit(odict(house="Yeah Momma 1", queue="cold"), stack2.estate.eid)
stack0.transmit(odict(house="Yeah Momma 1", queue="boiling"), stack2.estate.eid)
stack0.transmit(odict(house="Yeah Momma 1", queue="tepid"), stack2.estate.eid)
# segmented packets
stuff = []
for i in range(300):
stuff.append(str(i).rjust(4, " "))
stuff = "".join(stuff)
stack1.transmit(odict(house="Snake eyes", queue="near stuff", stuff=stuff))
stack0.transmit(odict(house="Craps", queue="far stuff", stuff=stuff))
stack2.transmit(odict(house="Lucky duck", queue="medium stuff", stuff=stuff))
stack0.transmit(odict(house="Boogle", queue="hight stuff", stuff=stuff), stack2.estate.eid)
timer.restart(duration=4)
while not timer.expired:
stack1.serviceAll()
stack2.serviceAll()
stack0.serviceAll()
print "{0} Received Messages".format(stack0.name)
for msg in stack0.rxMsgs:
print msg
print
print "{0} Received Messages".format(stack1.name)
for msg in stack1.rxMsgs:
print msg
print
print "{0} Received Messages".format(stack2.name)
for msg in stack2.rxMsgs:
print msg
print
print "{0} eid={1}".format(stack0.name, stack0.estate.eid)
print "{0} estates=\n{1}".format(stack0.name, stack0.estates)
print "{0} transactions=\n{1}".format(stack0.name, stack0.transactions)
print "{0} eid={1}".format(stack1.name, stack1.estate.eid)
print "{0} estates=\n{1}".format(stack1.name, stack1.estates)
print "{0} transactions=\n{1}".format(stack1.name, stack1.transactions)
print "{0} eid={1}".format(stack2.name, stack2.estate.eid)
print "{0} estates=\n{1}".format(stack2.name, stack2.estates)
print "{0} transactions=\n{1}".format(stack2.name, stack2.transactions)
stack0.serverUdp.close()
stack1.serverUdp.close()
stack2.serverUdp.close()
#stack0.clearLocal()
#stack0.clearAllRemote()
stack1.clearLocal()
stack1.clearAllRemote()
stack2.clearLocal()
stack2.clearAllRemote()
if __name__ == "__main__":
test()

View File

@ -3,6 +3,8 @@
Tests to try out stacking. Potentially ephemeral
'''
import os
# pylint: skip-file
from ioflo.base.odicting import odict
from ioflo.base.aiding import Timer
@ -10,11 +12,11 @@ from ioflo.base.aiding import Timer
from ioflo.base.consoling import getConsole
console = getConsole()
from salt.transport.road.raet import (raeting, nacling, packeting,
from salt.transport.road.raet import (raeting, nacling, packeting, keeping,
estating, yarding, transacting, stacking)
def testStackUdp():
def testStackUdp(bk=raeting.bodyKinds.json):
'''
initially
master on port 7530 with eid of 1
@ -25,31 +27,50 @@ def testStackUdp():
'''
console.reinit(verbosity=console.Wordage.concise)
stacking.StackUdp.Bk = bk #set class body kind for serialization
#master stack
masterName = "master"
signer = nacling.Signer()
masterSignKeyHex = signer.keyhex
privateer = nacling.Privateer()
masterPriKeyHex = privateer.keyhex
dirpathMaster = os.path.join(os.getcwd(), 'keep', masterName)
road = keeping.RoadKeep(dirpath=dirpathMaster)
road.clearLocalData()
road.clearAllRemoteData()
safe = keeping.SafeKeep(dirpath=dirpathMaster)
safe.clearLocalData()
safe.clearAllRemoteData()
estate = estating.LocalEstate( eid=1,
name=masterName,
sigkey=masterSignKeyHex,
prikey=masterPriKeyHex,)
stack0 = stacking.StackUdp(estate=estate, main=True, dirpath=dirpathMaster)
#minion stack
minionName = "minion1"
signer = nacling.Signer()
minionSignKeyHex = signer.keyhex
privateer = nacling.Privateer()
minionPriKeyHex = privateer.keyhex
#master stack
estate = estating.LocalEstate( eid=1,
name='master',
sigkey=masterSignKeyHex,
prikey=masterPriKeyHex,)
stack0 = stacking.StackUdp(estate=estate)
dirpathMinion = os.path.join(os.getcwd(), 'keep', minionName)
road = keeping.RoadKeep(dirpath=dirpathMinion)
road.clearLocalData()
road.clearAllRemoteData()
safe = keeping.SafeKeep(dirpath=dirpathMinion)
safe.clearLocalData()
safe.clearAllRemoteData()
#minion stack
estate = estating.LocalEstate( eid=0,
name='minion1',
name=minionName,
ha=("", raeting.RAET_TEST_PORT),
sigkey=minionSignKeyHex,
prikey=minionPriKeyHex,)
stack1 = stacking.StackUdp(estate=estate)
stack1 = stacking.StackUdp(estate=estate, dirpath=dirpathMinion)
print "\n********* Join Transaction **********"
stack1.join()
@ -288,195 +309,17 @@ def testStackUdp():
for msg in stack1.rxMsgs:
print msg
stack0.serverUdp.close()
stack1.serverUdp.close()
def testStackUxd():
'''
initially
'''
console.reinit(verbosity=console.Wordage.concise)
#lord stack
#yard0 = yarding.Yard(name='lord')
stack0 = stacking.StackUxd()
#serf stack
#yard1 = yarding.Yard(name='serf', yid=1)
stack1 = stacking.StackUxd()
stack0.addRemoteYard(stack1.yard)
stack1.addRemoteYard(stack0.yard)
print "{0} yard name={1} ha={2}".format(stack0.name, stack0.yard.name, stack0.yard.ha)
print "{0} yards=\n{1}".format(stack0.name, stack0.yards)
print "{0} names=\n{1}".format(stack0.name, stack0.names)
print "{0} yard name={1} ha={2}".format(stack1.name, stack1.yard.name, stack1.yard.ha)
print "{0} yards=\n{1}".format(stack1.name, stack1.yards)
print "{0} names=\n{1}".format(stack1.name, stack1.names)
print "\n********* UXD Message lord to serf serf to lord **********"
msg = odict(what="This is a message to the serf. Get to Work", extra="Fix the fence.")
stack0.transmit(msg=msg)
msg = odict(what="This is a message to the lord. Let me be", extra="Go away.")
stack1.transmit(msg=msg)
timer = Timer(duration=0.5)
timer.restart()
while not timer.expired:
stack0.serviceAll()
stack1.serviceAll()
print "{0} Received Messages".format(stack0.name)
for msg in stack0.rxMsgs:
print msg
print
print "{0} Received Messages".format(stack1.name)
for msg in stack1.rxMsgs:
print msg
print
print "\n********* Multiple Messages Both Ways **********"
stack1.transmit(odict(house="Mama mia1", queue="fix me"), None)
stack1.transmit(odict(house="Mama mia2", queue="help me"), None)
stack1.transmit(odict(house="Mama mia3", queue="stop me"), None)
stack1.transmit(odict(house="Mama mia4", queue="run me"), None)
stack0.transmit(odict(house="Papa pia1", queue="fix me"), None)
stack0.transmit(odict(house="Papa pia2", queue="help me"), None)
stack0.transmit(odict(house="Papa pia3", queue="stop me"), None)
stack0.transmit(odict(house="Papa pia4", queue="run me"), None)
#big packets
stuff = []
for i in range(300):
stuff.append(str(i).rjust(4, " "))
stuff = "".join(stuff)
stack1.transmit(odict(house="Mama mia1", queue="big stuff", stuff=stuff), None)
stack0.transmit(odict(house="Papa pia4", queue="gig stuff", stuff=stuff), None)
timer.restart(duration=2)
while not timer.expired:
stack1.serviceAll()
stack0.serviceAll()
print "{0} Received Messages".format(stack0.name)
for msg in stack0.rxMsgs:
print msg
print
print "{0} Received Messages".format(stack1.name)
for msg in stack1.rxMsgs:
print msg
print
src = ('minion', 'serf', None)
dst = ('master', None, None)
route = odict(src=src, dst=dst)
msg = odict(route=route, stuff="Hey buddy what is up?")
stack0.transmit(msg)
timer.restart(duration=2)
while not timer.expired:
stack1.serviceAll()
stack0.serviceAll()
print "{0} Received Messages".format(stack0.name)
for msg in stack0.rxMsgs:
print msg
print
print "{0} Received Messages".format(stack1.name)
for msg in stack1.rxMsgs:
print msg
print
estate = 'minion1'
#lord stack yard0
stack0 = stacking.StackUxd(name='lord', lanename='cherry')
#serf stack yard1
stack1 = stacking.StackUxd(name='serf', lanename='cherry')
print "Yid", yarding.Yard.Yid
print "\n********* Attempt Auto Accept ************"
#stack0.addRemoteYard(stack1.yard)
yard = yarding.Yard( name=stack0.yard.name, prefix='cherry')
stack1.addRemoteYard(yard)
print "{0} yard name={1} ha={2}".format(stack0.name, stack0.yard.name, stack0.yard.ha)
print "{0} yards=\n{1}".format(stack0.name, stack0.yards)
print "{0} names=\n{1}".format(stack0.name, stack0.names)
print "{0} yard name={1} ha={2}".format(stack1.name, stack1.yard.name, stack1.yard.ha)
print "{0} yards=\n{1}".format(stack1.name, stack1.yards)
print "{0} names=\n{1}".format(stack1.name, stack1.names)
print "\n********* UXD Message serf to lord **********"
src = (estate, stack1.yard.name, None)
dst = (estate, stack0.yard.name, None)
route = odict(src=src, dst=dst)
msg = odict(route=route, stuff="Serf to my lord. Feed me!")
stack1.transmit(msg=msg)
timer = Timer(duration=0.5)
timer.restart()
while not timer.expired:
stack0.serviceAll()
stack1.serviceAll()
print "{0} Received Messages".format(stack0.name)
for msg in stack0.rxMsgs:
print msg
print
print "{0} Received Messages".format(stack1.name)
for msg in stack1.rxMsgs:
print msg
print
print "\n********* UXD Message lord to serf **********"
src = (estate, stack0.yard.name, None)
dst = (estate, stack1.yard.name, None)
route = odict(src=src, dst=dst)
msg = odict(route=route, stuff="Lord to serf. Feed yourself!")
stack0.transmit(msg=msg)
timer = Timer(duration=0.5)
timer.restart()
while not timer.expired:
stack0.serviceAll()
stack1.serviceAll()
print "{0} Received Messages".format(stack0.name)
for msg in stack0.rxMsgs:
print msg
print
print "{0} Received Messages".format(stack1.name)
for msg in stack1.rxMsgs:
print msg
print
print "{0} yard name={1} ha={2}".format(stack0.name, stack0.yard.name, stack0.yard.ha)
print "{0} yards=\n{1}".format(stack0.name, stack0.yards)
print "{0} names=\n{1}".format(stack0.name, stack0.names)
print "{0} yard name={1} ha={2}".format(stack1.name, stack1.yard.name, stack1.yard.ha)
print "{0} yards=\n{1}".format(stack1.name, stack1.yards)
print "{0} names=\n{1}".format(stack1.name, stack1.names)
stack0.clearLocal()
stack0.clearAllRemote()
stack1.clearLocal()
stack1.clearAllRemote()
if __name__ == "__main__":
testStackUdp()
testStackUxd()
testStackUdp(bk=raeting.bodyKinds.msgpack)

View File

@ -0,0 +1,158 @@
# -*- coding: utf-8 -*-
'''
Tests to try out stacking. Potentially ephemeral
'''
# pylint: skip-file
import os
from ioflo.base.odicting import odict
from ioflo.base.aiding import Timer
from ioflo.base.consoling import getConsole
console = getConsole()
from salt.transport.road.raet import (raeting, nacling, packeting, keeping,
estating, yarding, transacting, stacking)
def test():
'''
initially
master on port 7530 with eid of 1
minion on port 7531 with eid of 0
eventually
master eid of 1
minion eid of 2
'''
console.reinit(verbosity=console.Wordage.concise)
#master stack
masterName = "master"
signer = nacling.Signer()
masterSignKeyHex = signer.keyhex
privateer = nacling.Privateer()
masterPriKeyHex = privateer.keyhex
dirpathMaster = os.path.join(os.getcwd(), 'keep', masterName)
road = keeping.RoadKeep(dirpath=dirpathMaster)
#road.clearLocalData()
#road.clearAllRemoteData()
safe = keeping.SafeKeep(dirpath=dirpathMaster)
#safe.clearLocalData()
#safe.clearAllRemoteData()
estate = estating.LocalEstate( eid=1,
name=masterName,
sigkey=masterSignKeyHex,
prikey=masterPriKeyHex,)
stack0 = stacking.StackUdp(estate=estate, main=True, dirpath=dirpathMaster)
#minion0 stack
minionName0 = "minion0"
signer = nacling.Signer()
minionSignKeyHex = signer.keyhex
privateer = nacling.Privateer()
minionPriKeyHex = privateer.keyhex
dirpathMinion0 = os.path.join(os.getcwd(), 'keep', minionName0)
road = keeping.RoadKeep(dirpath=dirpathMinion0)
#road.clearLocalData()
#road.clearAllRemoteData()
safe = keeping.SafeKeep(dirpath=dirpathMinion0)
#safe.clearLocalData()
#safe.clearAllRemoteData()
estate = estating.LocalEstate( eid=0,
name=minionName0,
ha=("", raeting.RAET_TEST_PORT),
sigkey=minionSignKeyHex,
prikey=minionPriKeyHex,)
stack1 = stacking.StackUdp(estate=estate, dirpath=dirpathMinion0)
print "\n********* Join Transaction **********"
stack1.join()
timer = Timer(duration=2)
timer.restart(duration=2)
while not timer.expired:
stack1.serviceAll()
stack0.serviceAll()
for estate in stack0.estates.values():
print "Remote Estate {0} joined= {1}".format(estate.eid, estate.joined)
for estate in stack1.estates.values():
print "Remote Estate {0} joined= {1}".format(estate.eid, estate.joined)
print "\n********* Allow Transaction **********"
stack1.allow()
timer.restart(duration=2)
while not timer.expired:
stack1.serviceAll()
stack0.serviceAll()
for estate in stack0.estates.values():
print "Remote Estate {0} allowed= {1}".format(estate.eid, estate.allowed)
for estate in stack1.estates.values():
print "Remote Estate {0} allowed= {1}".format(estate.eid, estate.allowed)
print "\n********* Message Transactions Both Ways **********"
stack1.transmit(odict(house="Oh Boy1", queue="Nice"))
stack0.transmit(odict(house="Yeah Baby1", queue="Good"))
#segmented packets
stuff = []
for i in range(300):
stuff.append(str(i).rjust(4, " "))
stuff = "".join(stuff)
stack1.transmit(odict(house="Snake eyes", queue="near stuff", stuff=stuff))
stack0.transmit(odict(house="Craps", queue="far stuff", stuff=stuff))
timer.restart(duration=2)
while not timer.expired:
stack1.serviceAll()
stack0.serviceAll()
print "{0} Received Messages".format(stack0.name)
for msg in stack0.rxMsgs:
print msg
print
print "{0} Received Messages".format(stack1.name)
for msg in stack1.rxMsgs:
print msg
print
print "{0} eid={1}".format(stack0.name, stack0.estate.eid)
print "{0} estates=\n{1}".format(stack0.name, stack0.estates)
print "{0} transactions=\n{1}".format(stack0.name, stack0.transactions)
print "{0} eid={1}".format(stack1.name, stack1.estate.eid)
print "{0} estates=\n{1}".format(stack1.name, stack1.estates)
print "{0} transactions=\n{1}".format(stack1.name, stack1.transactions)
print "Road {0}".format(stack0.name)
print stack0.road.loadLocalData()
print stack0.road.loadAllRemoteData()
print "Safe {0}".format(stack0.name)
print stack0.safe.loadLocalData()
print stack0.safe.loadAllRemoteData()
print
print "Road {0}".format(stack1.name)
print stack1.road.loadLocalData()
print stack1.road.loadAllRemoteData()
print "Safe {0}".format(stack1.name)
print stack1.safe.loadLocalData()
print stack1.safe.loadAllRemoteData()
print
stack0.serverUdp.close()
stack1.serverUdp.close()
#stack0.clearLocal()
#stack0.clearAllRemote()
#stack1.clearLocal()
#stack1.clearAllRemote()
if __name__ == "__main__":
test()

View File

@ -0,0 +1,206 @@
# -*- coding: utf-8 -*-
'''
Tests to try out stacking. Potentially ephemeral
'''
# pylint: skip-file
from ioflo.base.odicting import odict
from ioflo.base.aiding import Timer
from ioflo.base.consoling import getConsole
console = getConsole()
from salt.transport.road.raet import (raeting, nacling, packeting,
estating, yarding, transacting, stacking)
def testStackUxd():
'''
initially
'''
console.reinit(verbosity=console.Wordage.concise)
#lord stack
#yard0 = yarding.Yard(name='lord')
stack0 = stacking.StackUxd()
#serf stack
#yard1 = yarding.Yard(name='serf', yid=1)
stack1 = stacking.StackUxd()
stack0.addRemoteYard(stack1.yard)
stack1.addRemoteYard(stack0.yard)
print "{0} yard name={1} ha={2}".format(stack0.name, stack0.yard.name, stack0.yard.ha)
print "{0} yards=\n{1}".format(stack0.name, stack0.yards)
print "{0} names=\n{1}".format(stack0.name, stack0.names)
print "{0} yard name={1} ha={2}".format(stack1.name, stack1.yard.name, stack1.yard.ha)
print "{0} yards=\n{1}".format(stack1.name, stack1.yards)
print "{0} names=\n{1}".format(stack1.name, stack1.names)
print "\n********* UXD Message lord to serf serf to lord **********"
msg = odict(what="This is a message to the serf. Get to Work", extra="Fix the fence.")
stack0.transmit(msg=msg)
msg = odict(what="This is a message to the lord. Let me be", extra="Go away.")
stack1.transmit(msg=msg)
timer = Timer(duration=0.5)
timer.restart()
while not timer.expired:
stack0.serviceAll()
stack1.serviceAll()
print "{0} Received Messages".format(stack0.name)
for msg in stack0.rxMsgs:
print msg
print
print "{0} Received Messages".format(stack1.name)
for msg in stack1.rxMsgs:
print msg
print
print "\n********* Multiple Messages Both Ways **********"
stack1.transmit(odict(house="Mama mia1", queue="fix me"), None)
stack1.transmit(odict(house="Mama mia2", queue="help me"), None)
stack1.transmit(odict(house="Mama mia3", queue="stop me"), None)
stack1.transmit(odict(house="Mama mia4", queue="run me"), None)
stack0.transmit(odict(house="Papa pia1", queue="fix me"), None)
stack0.transmit(odict(house="Papa pia2", queue="help me"), None)
stack0.transmit(odict(house="Papa pia3", queue="stop me"), None)
stack0.transmit(odict(house="Papa pia4", queue="run me"), None)
#big packets
stuff = []
for i in range(300):
stuff.append(str(i).rjust(4, " "))
stuff = "".join(stuff)
stack1.transmit(odict(house="Mama mia1", queue="big stuff", stuff=stuff), None)
stack0.transmit(odict(house="Papa pia4", queue="gig stuff", stuff=stuff), None)
timer.restart(duration=2)
while not timer.expired:
stack1.serviceAll()
stack0.serviceAll()
print "{0} Received Messages".format(stack0.name)
for msg in stack0.rxMsgs:
print msg
print
print "{0} Received Messages".format(stack1.name)
for msg in stack1.rxMsgs:
print msg
print
src = ('minion', 'serf', None)
dst = ('master', None, None)
route = odict(src=src, dst=dst)
msg = odict(route=route, stuff="Hey buddy what is up?")
stack0.transmit(msg)
timer.restart(duration=2)
while not timer.expired:
stack1.serviceAll()
stack0.serviceAll()
print "{0} Received Messages".format(stack0.name)
for msg in stack0.rxMsgs:
print msg
print
print "{0} Received Messages".format(stack1.name)
for msg in stack1.rxMsgs:
print msg
print
estate = 'minion1'
#lord stack yard0
stack0 = stacking.StackUxd(name='lord', lanename='cherry')
#serf stack yard1
stack1 = stacking.StackUxd(name='serf', lanename='cherry')
print "Yid", yarding.Yard.Yid
print "\n********* Attempt Auto Accept ************"
#stack0.addRemoteYard(stack1.yard)
yard = yarding.Yard( name=stack0.yard.name, prefix='cherry')
stack1.addRemoteYard(yard)
print "{0} yard name={1} ha={2}".format(stack0.name, stack0.yard.name, stack0.yard.ha)
print "{0} yards=\n{1}".format(stack0.name, stack0.yards)
print "{0} names=\n{1}".format(stack0.name, stack0.names)
print "{0} yard name={1} ha={2}".format(stack1.name, stack1.yard.name, stack1.yard.ha)
print "{0} yards=\n{1}".format(stack1.name, stack1.yards)
print "{0} names=\n{1}".format(stack1.name, stack1.names)
print "\n********* UXD Message serf to lord **********"
src = (estate, stack1.yard.name, None)
dst = (estate, stack0.yard.name, None)
route = odict(src=src, dst=dst)
msg = odict(route=route, stuff="Serf to my lord. Feed me!")
stack1.transmit(msg=msg)
timer = Timer(duration=0.5)
timer.restart()
while not timer.expired:
stack0.serviceAll()
stack1.serviceAll()
print "{0} Received Messages".format(stack0.name)
for msg in stack0.rxMsgs:
print msg
print
print "{0} Received Messages".format(stack1.name)
for msg in stack1.rxMsgs:
print msg
print
print "\n********* UXD Message lord to serf **********"
src = (estate, stack0.yard.name, None)
dst = (estate, stack1.yard.name, None)
route = odict(src=src, dst=dst)
msg = odict(route=route, stuff="Lord to serf. Feed yourself!")
stack0.transmit(msg=msg)
timer = Timer(duration=0.5)
timer.restart()
while not timer.expired:
stack0.serviceAll()
stack1.serviceAll()
print "{0} Received Messages".format(stack0.name)
for msg in stack0.rxMsgs:
print msg
print
print "{0} Received Messages".format(stack1.name)
for msg in stack1.rxMsgs:
print msg
print
print "{0} yard name={1} ha={2}".format(stack0.name, stack0.yard.name, stack0.yard.ha)
print "{0} yards=\n{1}".format(stack0.name, stack0.yards)
print "{0} names=\n{1}".format(stack0.name, stack0.names)
print "{0} yard name={1} ha={2}".format(stack1.name, stack1.yard.name, stack1.yard.ha)
print "{0} yards=\n{1}".format(stack1.name, stack1.yards)
print "{0} names=\n{1}".format(stack1.name, stack1.names)
if __name__ == "__main__":
testStackUxd()

View File

@ -71,18 +71,10 @@ class Transaction(object):
'''
le = self.stack.estate.eid
if le == 0: #bootstapping onto channel use ha
host = self.stack.estate.host
if host == '0.0.0.0':
host = '127.0.0.1'
le = (host, self.stack.estate.port)
#le = self.stack.estate.ha
le = self.stack.estate.ha
re = self.reid
if re == 0:
host = self.stack.estates[self.reid].host
if host == '0.0.0.0':
host = '127.0.0.1'
re = (host, self.stack.estates[self.reid].port)
#re = self.stack.estates[self.reid].ha
if re == 0: #bootstapping onto channel use ha
re = self.stack.estates[self.reid].ha
return ((self.rmt, le, re, self.sid, self.tid, self.bcst,))
def process(self):
@ -93,7 +85,7 @@ class Transaction(object):
def receive(self, packet):
'''
Process received packet
Process received packet Subclasses should super call this
'''
self.rxPacket = packet
@ -197,6 +189,14 @@ class Joiner(Initiator):
elif packet.data['pk'] == raeting.pcktKinds.response:
self.accept()
def process(self):
'''
Perform time based processing of transaction
'''
# need keep sending join until accepted or timed out
#self.join()
def prep(self):
'''
Prepare .txData
@ -283,19 +283,56 @@ class Joiner(Initiator):
emsg = "Missing remote crypt key in accept packet"
raise raeting.TransactionError(emsg)
index = self.index # save before we change it
#index = self.index # save before we change it
self.stack.estate.eid = leid
self.stack.dumpLocal()
remote = self.stack.estates[self.reid]
remote.verfer = nacling.Verifier(key=verhex)
remote.pubber = nacling.Publican(key=pubhex)
if remote.verfer.keyhex != verhex:
remote.verfer = nacling.Verifier(key=verhex)
if remote.pubber.keyhex != pubhex:
remote.pubber = nacling.Publican(key=pubhex)
if remote.eid != reid: #move remote estate to new index
self.stack.moveRemoteEstate(old=remote.eid, new=reid)
if remote.name != name: # rename remote estate to new name
self.stack.renameRemoteEstate(old=remote.name, new=name)
self.reid = reid
# we are assuming for now that the joiner cannot talk peer to peer only
# to main estate otherwise we need to ensure unique eid, name, and ha on road
# Need to verify if remote keys are accepted here
remote.joined = True #accepted
remote.nextSid()
self.remove(index)
self.stack.dumpRemote(remote)
self.ackAccept() #need to ack before we remove as index as changed
def ackAccept(self):
'''
Send ack to accept response
'''
if self.reid not in self.stack.estates:
emsg = "Invalid remote destination estate id '{0}'".format(self.reid)
raise raeting.TransactionError(emsg)
body = odict()
packet = packeting.TxPacket(stack=self.stack,
kind=raeting.pcktKinds.ack,
embody=body,
data=self.txData)
try:
packet.pack()
except raeting.PacketError as ex:
print ex
self.remove(self.rxPacket.index)
return
self.transmit(packet)
self.remove(self.rxPacket.index) # since index changed
class Joinent(Correspondent):
'''
@ -311,14 +348,26 @@ class Joinent(Correspondent):
# Since corresponding bootstrap transaction use packet.index not self.index
self.add(self.rxPacket.index)
def receive(self, packet):
"""
Process received packet belonging to this transaction
"""
super(Joinent, self).receive(packet) # self.rxPacket = packet
if packet.data['tk'] == raeting.trnsKinds.join:
if packet.data['pk'] == raeting.pcktKinds.ack: #pended
self.joined()
def process(self):
'''
Perform time based processing of transaction
'''
# need to perform the check for accepted status somewhere
# need to perform the check for accepted status and then send accept
self.accept()
# need to retry accept packet until get ackAccept transaction ends
def prep(self):
'''
Prepare .txData
@ -340,6 +389,32 @@ class Joinent(Correspondent):
'''
Process join packet
Perform pend operation of pending remote estate being accepted onto channel
Apply the rules to ensure no colliding estates on (host, port)
If matching name estate found then return
Rules:
Only one estate with given eid is allowed on road
Only one estate with given name is allowed on road.
Only one estate with given ha on road is allowed on road.
Are multiple estates with same keys but different name (ha) allowed?
Current logic ignores same keys or not
Since creating new estate assigns unique eid,
we are looking for preexisting estates with any eid.
Processing steps:
I) Search remote estates for matching name
A) Found remote
1) HA not match
Search remotes for other matching HA but different name
If found other delete
Reuse found remote to be updated and joined
B) Not found
Search remotes for other matching HA
If found delete for now
Create new remote and update
'''
if not self.stack.parseInner(self.rxPacket):
return
@ -361,31 +436,37 @@ class Joinent(Correspondent):
emsg = "Missing remote crypt key in join packet"
raise raeting.TransactionError(emsg)
remote = self.stack.fetchRemoteEstateByHostPort(host=data['sh'], port=data['sp'])
host = data['sh']
port = data['sp']
remote = self.stack.fetchRemoteEstateByName(name)
if remote:
if (name != remote.name or
verhex != remote.verfer.keyhex or
pubhex != remote.pubber.keyhex):
# nack join request another estate at same address but different credentials
# and return
pass
# accept here and return
if not (host == remote.host and port == remote.port):
other = self.stack.fetchRemoteEstateByHostPort(host, port)
if other and other is not remote: #may need to terminate transactions
self.stack.removeRemoteEstate(other.eid)
remote.host = host
remote.port = port
if remote.verfer.keyhex != verhex:
remote.verfer = nacling.Verifier(verhex)
if remote.pubber.keyhex != pubhex:
remote.pubber = nacling.Publican(pubhex)
remote.rsid = self.sid
remote.rtid = self.tid
else:
other = self.stack.fetchRemoteEstateByHostPort(host, port)
if other: #may need to terminate transactions
self.stack.removeRemoteEstate(other.eid)
remote = estating.RemoteEstate( stack=self.stack,
name=name,
host=host,
port=port,
verkey=verhex,
pubkey=pubhex,
rsid=self.sid,
rtid=self.tid, )
self.stack.addRemoteEstate(remote) #provisionally add .accepted is None
if name in self.stack.eids:
emsg = "Another estate with name already exists"
raise raeting.TransactionError(emsg)
remote = estating.RemoteEstate( stack=self.stack,
name=name,
host=data['sh'],
port=data['sp'],
verkey=verhex,
pubkey=pubhex,
rsid=self.sid,
rtid=self.tid, )
self.stack.addRemoteEstate(remote) #provisionally add .accepted is None
self.stack.dumpRemote(remote)
self.reid = remote.eid # auto generated at instance creation above
self.ackJoin()
@ -398,7 +479,7 @@ class Joinent(Correspondent):
emsg = "Invalid remote destination estate id '{0}'".format(self.reid)
raise raeting.TransactionError(emsg)
#since bootstrap transaction use updated self.rid
#since bootstrap transaction use updated self.reid changed in self.join()
self.txData.update( dh=self.stack.estates[self.reid].host,
dp=self.stack.estates[self.reid].port,)
body = odict()
@ -440,9 +521,17 @@ class Joinent(Correspondent):
print ex
self.remove(self.rxPacket.index)
return
self.transmit(packet)
def joined(self):
'''
process ack to accept response
'''
remote = self.stack.estates[self.reid]
remote.joined = True # accepted
remote.nextSid()
self.transmit(packet)
self.stack.dumpRemote(remote)
self.remove(self.rxPacket.index)
class Allower(Initiator):