mirror of
https://github.com/valitydev/salt.git
synced 2024-11-07 17:09:03 +00:00
Merge pull request #10670 from SmithSamuelM/sam_raet_9
Sam raet 9 Added name attributed to devices and .dids to stack so can reverse lookup did by name
This commit is contained in:
commit
e98347a668
@ -23,7 +23,7 @@ class Device(object):
|
||||
'''
|
||||
Did = 2 # class attribute
|
||||
|
||||
def __init__(self, stack=None, did=None, sid=0, tid=0,
|
||||
def __init__(self, stack=None, did=None, name="", sid=0, tid=0,
|
||||
host="", port=raeting.RAET_PORT, ha=None, ):
|
||||
'''
|
||||
Setup Device instance
|
||||
@ -37,6 +37,7 @@ class Device(object):
|
||||
else:
|
||||
did = 0
|
||||
self.did = did # device ID
|
||||
self.name = name or "device{0}".format(self.did)
|
||||
|
||||
self.sid = sid # current session ID
|
||||
self.tid = tid # current transaction ID
|
||||
|
@ -481,7 +481,7 @@ class TxPacket(Packet):
|
||||
if self.size <= raeting.MAX_SEGMENT_SIZE:
|
||||
self.sign()
|
||||
else:
|
||||
print "****Segmentize**** packet size = {0}".format(self.size)
|
||||
#print "****Segmentize**** packet size = {0}".format(self.size)
|
||||
self.segmentize()
|
||||
|
||||
def segmentize(self):
|
||||
@ -646,7 +646,9 @@ class RxPacket(Packet):
|
||||
When done ready to call parseInner on self
|
||||
'''
|
||||
hauls = []
|
||||
for segment in self.segments.values():
|
||||
sc = self.data['sc']
|
||||
for i in range(0, sc):
|
||||
segment = self.segments[i]
|
||||
hl = segment.data['hl']
|
||||
fl = segment.data['fl']
|
||||
haul = segment.packed[hl:segment.size - fl]
|
||||
|
@ -12,6 +12,7 @@ from collections import deque
|
||||
# Import ioflo libs
|
||||
from ioflo.base.odicting import odict
|
||||
from ioflo.base import aiding
|
||||
from ioflo.base import storing
|
||||
|
||||
from . import raeting
|
||||
from . import nacling
|
||||
@ -35,11 +36,12 @@ class StackUdp(object):
|
||||
def __init__(self,
|
||||
name='',
|
||||
version=raeting.VERSION,
|
||||
store=None,
|
||||
device=None,
|
||||
did=None,
|
||||
ha=("", raeting.RAET_PORT),
|
||||
udpRxMsgs = None,
|
||||
udpTxMsgs = None,
|
||||
rxMsgs = None,
|
||||
txMsgs = None,
|
||||
udpRxes = None,
|
||||
udpTxes = None,
|
||||
):
|
||||
@ -51,12 +53,14 @@ class StackUdp(object):
|
||||
StackUdp.Count += 1
|
||||
self.name = name
|
||||
self.version = version
|
||||
self.devices = odict() # remote devices attached to this stack
|
||||
self.store = store or storing.Store(stamp=0.0)
|
||||
self.devices = odict() # remote devices attached to this stack by did
|
||||
self.dids = odict() # reverse lookup did by device.name
|
||||
# local device for this stack
|
||||
self.device = device or devicing.LocalDevice(stack=self, did=did, ha=ha)
|
||||
self.transactions = odict() #transactions
|
||||
self.udpRxMsgs = udpRxMsgs or deque() # messages received
|
||||
self.udpTxMsgs = udpTxMsgs or deque() # messages to transmit (msg, ddid) ddid=0 is broadcast
|
||||
self.rxMsgs = rxMsgs or deque() # messages received
|
||||
self.txMsgs = txMsgs or deque() # messages to transmit (msg, ddid) ddid=0 is broadcast
|
||||
self.udpRxes = udpRxes or deque() # udp packets received
|
||||
self.udpTxes = udpTxes or deque() # udp packet to transmit
|
||||
self.serverUdp = aiding.SocketUdpNb(ha=self.device.ha)
|
||||
@ -71,35 +75,72 @@ class StackUdp(object):
|
||||
did = device.did
|
||||
|
||||
if did in self.devices:
|
||||
msg = "Device with id '{0}' alreadys exists".format(did)
|
||||
raise raeting.StackError(msg)
|
||||
emsg = "Device with id '{0}' alreadys exists".format(did)
|
||||
raise raeting.StackError(emsg)
|
||||
device.stack = self
|
||||
self.devices[did] = device
|
||||
if device.name in self.dids:
|
||||
emsg = "Device with name '{0}' alreadys exists".format(device.name)
|
||||
raise raeting.StackError(emsg)
|
||||
self.dids[device.name] = device.did
|
||||
|
||||
def moveRemoteDevice(self, odid, ndid):
|
||||
def moveRemoteDevice(self, old, new):
|
||||
'''
|
||||
Move device at odid to ndid
|
||||
Move device at key old did to key new did but keep same index
|
||||
'''
|
||||
if ndid in self.devices:
|
||||
msg = "Cannot move, '{0}' already exists".format(ndid)
|
||||
raise raeting.StackError(msg)
|
||||
if new in self.devices:
|
||||
emsg = "Cannot move, '{0}' already exists".format(new)
|
||||
raise raeting.StackError(emsg)
|
||||
|
||||
if odid not in self.devices:
|
||||
msg = "Cannot move '{0}' does not exist".format(odid)
|
||||
raise raeting.StackError(msg)
|
||||
if old not in self.devices:
|
||||
emsg = "Cannot move '{0}' does not exist".format(old)
|
||||
raise raeting.StackError(emsg)
|
||||
|
||||
device = self.devices[odid]
|
||||
del self.devices[odid]
|
||||
device.did = ndid
|
||||
self.devices.insert(0, device.did, device)
|
||||
device = self.devices[old]
|
||||
index = self.devices.keys().index(old)
|
||||
device.did = new
|
||||
self.dids[device.name] = new
|
||||
del self.devices[old]
|
||||
self.devices.insert(index, device.did, device)
|
||||
|
||||
def renameRemoteDevice(self, old, new):
|
||||
'''
|
||||
rename device with old name to new name but keep same index
|
||||
'''
|
||||
if new in self.dids:
|
||||
emsg = "Cannot rename, '{0}' already exists".format(new)
|
||||
raise raeting.StackError(emsg)
|
||||
|
||||
if old not in self.dids:
|
||||
emsg = "Cannot rename '{0}' does not exist".format(old)
|
||||
raise raeting.StackError(emsg)
|
||||
|
||||
did = self.dids[old]
|
||||
device = self.devices[did]
|
||||
device.name = new
|
||||
index = self.dids.keys().index(old)
|
||||
del self.dids[old]
|
||||
self.dids.insert(index, device.name, device.did)
|
||||
|
||||
def removeRemoteDevice(self, did):
|
||||
'''
|
||||
Remove device at key did
|
||||
'''
|
||||
if did not in self.devices:
|
||||
emsg = "Cannot remove, '{0}' does not exist".format(did)
|
||||
raise raeting.StackError(emsg)
|
||||
|
||||
device = self.devices[did]
|
||||
del self.devices[did]
|
||||
del self.dids[device.name]
|
||||
|
||||
def addTransaction(self, index, transaction):
|
||||
'''
|
||||
Safely add transaction at index If not already there
|
||||
'''
|
||||
self.transactions[index] = transaction
|
||||
print "Added {0} transaction to {1} at '{2}'".format(
|
||||
transaction.__class__.__name__, self.name, index)
|
||||
console.verbose( "Added {0} transaction to {1} at '{2}'".format(
|
||||
transaction.__class__.__name__, self.name, index))
|
||||
|
||||
def removeTransaction(self, index, transaction=None):
|
||||
'''
|
||||
@ -146,10 +187,10 @@ class StackUdp(object):
|
||||
'''
|
||||
Service .udpTxMsgs queue of outgoint udp messages for message transactions
|
||||
'''
|
||||
while self.udpTxMsgs:
|
||||
body, ddid = self.udpTxMsgs.popleft() # duple (body dict, destination did)
|
||||
while self.txMsgs:
|
||||
body, ddid = self.txMsgs.popleft() # duple (body dict, destination did)
|
||||
self.message(body, ddid)
|
||||
print "{0} sending\n{1}".format(self.name, body)
|
||||
console.verbose("{0} sending\n{1}".format(self.name, body))
|
||||
|
||||
def fetchParseUdpRx(self):
|
||||
'''
|
||||
@ -163,7 +204,7 @@ class StackUdp(object):
|
||||
except IndexError:
|
||||
return None
|
||||
|
||||
print "{0} received packet\n{1}".format(self.name, raw)
|
||||
console.verbose("{0} received packet\n{1}".format(self.name, raw))
|
||||
|
||||
packet = packeting.RxPacket(stack=self, packed=raw)
|
||||
try:
|
||||
@ -191,7 +232,7 @@ class StackUdp(object):
|
||||
'''
|
||||
try:
|
||||
packet.parseInner()
|
||||
print "{0} received packet body\n{1}".format(self.name, packet.body.data)
|
||||
console.verbose("{0} received packet body\n{1}".format(self.name, packet.body.data))
|
||||
except raeting.PacketError as ex:
|
||||
print ex
|
||||
return None
|
||||
@ -206,8 +247,8 @@ class StackUdp(object):
|
||||
if not packet:
|
||||
return
|
||||
|
||||
print "{0} received packet data\n{1}".format(self.name, packet.data)
|
||||
print "{0} received packet index = '{1}'".format(self.name, packet.index)
|
||||
console.verbose("{0} received packet data\n{1}".format(self.name, packet.data))
|
||||
console.verbose("{0} received packet index = '{1}'".format(self.name, packet.index))
|
||||
|
||||
trans = self.transactions.get(packet.index, None)
|
||||
if trans:
|
||||
|
@ -33,12 +33,14 @@ def test():
|
||||
|
||||
#master stack
|
||||
device = devicing.LocalDevice( did=1,
|
||||
name='master',
|
||||
signkey=masterSignKeyHex,
|
||||
prikey=masterPriKeyHex,)
|
||||
stack0 = stacking.StackUdp(device=device)
|
||||
|
||||
#minion stack
|
||||
device = devicing.LocalDevice( did=0,
|
||||
name='minion1',
|
||||
ha=("", raeting.RAET_TEST_PORT),
|
||||
signkey=minionSignKeyHex,
|
||||
prikey=minionPriKeyHex,)
|
||||
@ -67,12 +69,14 @@ def test():
|
||||
|
||||
print "{0} did={1}".format(stack0.name, stack0.device.did)
|
||||
print "{0} devices=\n{1}".format(stack0.name, stack0.devices)
|
||||
print "{0} dids=\n{1}".format(stack0.name, stack0.dids)
|
||||
print "{0} transactions=\n{1}".format(stack0.name, stack0.transactions)
|
||||
for device in stack0.devices.values():
|
||||
print "Remote Device {0} joined= {1}".format(device.did, device.joined)
|
||||
|
||||
print "{0} did={1}".format(stack1.name, stack1.device.did)
|
||||
print "{0} devices=\n{1}".format(stack1.name, stack1.devices)
|
||||
print "{0} dids=\n{1}".format(stack1.name, stack1.dids)
|
||||
print "{0} transactions=\n{1}".format(stack1.name, stack1.transactions)
|
||||
for device in stack1.devices.values():
|
||||
print "Remote Device {0} joined= {1}".format(device.did, device.joined)
|
||||
@ -115,12 +119,14 @@ def test():
|
||||
|
||||
print "{0} did={1}".format(stack0.name, stack0.device.did)
|
||||
print "{0} devices=\n{1}".format(stack0.name, stack0.devices)
|
||||
print "{0} dids=\n{1}".format(stack0.name, stack0.dids)
|
||||
print "{0} transactions=\n{1}".format(stack0.name, stack0.transactions)
|
||||
for device in stack0.devices.values():
|
||||
print "Remote Device {0} allowed= {1}".format(device.did, device.allowed)
|
||||
|
||||
print "{0} did={1}".format(stack1.name, stack1.device.did)
|
||||
print "{0} devices=\n{1}".format(stack1.name, stack1.devices)
|
||||
print "{0} dids=\n{1}".format(stack1.name, stack1.dids)
|
||||
print "{0} transactions=\n{1}".format(stack1.name, stack1.transactions)
|
||||
for device in stack1.devices.values():
|
||||
print "Remote Device {0} allowed= {1}".format(device.did, device.allowed)
|
||||
@ -148,13 +154,15 @@ def test():
|
||||
|
||||
print "{0} did={1}".format(stack0.name, stack0.device.did)
|
||||
print "{0} devices=\n{1}".format(stack0.name, stack0.devices)
|
||||
print "{0} dids=\n{1}".format(stack0.name, stack0.dids)
|
||||
print "{0} transactions=\n{1}".format(stack0.name, stack0.transactions)
|
||||
print "{0} Received Messages =\n{1}".format(stack0.name, stack0.udpRxMsgs)
|
||||
print "{0} Received Messages =\n{1}".format(stack0.name, stack0.rxMsgs)
|
||||
|
||||
print "{0} did={1}".format(stack1.name, stack1.device.did)
|
||||
print "{0} devices=\n{1}".format(stack1.name, stack1.devices)
|
||||
print "{0} dids=\n{1}".format(stack1.name, stack1.dids)
|
||||
print "{0} transactions=\n{1}".format(stack1.name, stack1.transactions)
|
||||
print "{0} Received Messages =\n{1}".format(stack1.name, stack1.udpRxMsgs)
|
||||
print "{0} Received Messages =\n{1}".format(stack1.name, stack1.rxMsgs)
|
||||
|
||||
print "\n********* Message Transaction Master to Minion **********"
|
||||
body = odict(what="This is a message to the minion. Get to Work", extra="Fix the fence.")
|
||||
@ -178,25 +186,27 @@ def test():
|
||||
|
||||
print "{0} did={1}".format(stack0.name, stack0.device.did)
|
||||
print "{0} devices=\n{1}".format(stack0.name, stack0.devices)
|
||||
print "{0} dids=\n{1}".format(stack0.name, stack0.dids)
|
||||
print "{0} transactions=\n{1}".format(stack0.name, stack0.transactions)
|
||||
print "{0} Received Messages =\n{1}".format(stack0.name, stack0.udpRxMsgs)
|
||||
print "{0} Received Messages =\n{1}".format(stack0.name, stack0.rxMsgs)
|
||||
|
||||
print "{0} did={1}".format(stack1.name, stack1.device.did)
|
||||
print "{0} devices=\n{1}".format(stack1.name, stack1.devices)
|
||||
print "{0} dids=\n{1}".format(stack1.name, stack1.dids)
|
||||
print "{0} transactions=\n{1}".format(stack1.name, stack1.transactions)
|
||||
print "{0} Received Messages =\n{1}".format(stack1.name, stack1.udpRxMsgs)
|
||||
print "{0} Received Messages =\n{1}".format(stack1.name, stack1.rxMsgs)
|
||||
|
||||
print "\n********* Message Transaction Minion to Master **********"
|
||||
print "\n********* Message Transactions Both Ways **********"
|
||||
|
||||
stack1.udpTxMsgs.append((odict(house="Mama mia1", queue="fix me"), None))
|
||||
stack1.udpTxMsgs.append((odict(house="Mama mia2", queue="help me"), None))
|
||||
stack1.udpTxMsgs.append((odict(house="Mama mia3", queue="stop me"), None))
|
||||
stack1.udpTxMsgs.append((odict(house="Mama mia4", queue="run me"), None))
|
||||
stack1.txMsgs.append((odict(house="Mama mia1", queue="fix me"), None))
|
||||
stack1.txMsgs.append((odict(house="Mama mia2", queue="help me"), None))
|
||||
stack1.txMsgs.append((odict(house="Mama mia3", queue="stop me"), None))
|
||||
stack1.txMsgs.append((odict(house="Mama mia4", queue="run me"), None))
|
||||
|
||||
stack0.udpTxMsgs.append((odict(house="Papa pia1", queue="fix me"), None))
|
||||
stack0.udpTxMsgs.append((odict(house="Papa pia2", queue="help me"), None))
|
||||
stack0.udpTxMsgs.append((odict(house="Papa pia3", queue="stop me"), None))
|
||||
stack0.udpTxMsgs.append((odict(house="Papa pia4", queue="run me"), None))
|
||||
stack0.txMsgs.append((odict(house="Papa pia1", queue="fix me"), None))
|
||||
stack0.txMsgs.append((odict(house="Papa pia2", queue="help me"), None))
|
||||
stack0.txMsgs.append((odict(house="Papa pia3", queue="stop me"), None))
|
||||
stack0.txMsgs.append((odict(house="Papa pia4", queue="run me"), None))
|
||||
|
||||
#segmented packets
|
||||
stuff = []
|
||||
@ -204,8 +214,8 @@ def test():
|
||||
stuff.append(str(i).rjust(4, " "))
|
||||
stuff = "".join(stuff)
|
||||
|
||||
stack1.udpTxMsgs.append((odict(house="Mama mia1", queue="big stuff", stuff=stuff), None))
|
||||
stack0.udpTxMsgs.append((odict(house="Papa pia4", queue="gig stuff", stuff=stuff), None))
|
||||
stack1.txMsgs.append((odict(house="Mama mia1", queue="big stuff", stuff=stuff), None))
|
||||
stack0.txMsgs.append((odict(house="Papa pia4", queue="gig stuff", stuff=stuff), None))
|
||||
|
||||
stack1.serviceUdpTxMsg()
|
||||
stack0.serviceUdpTxMsg()
|
||||
@ -234,14 +244,14 @@ def test():
|
||||
print "{0} devices=\n{1}".format(stack0.name, stack0.devices)
|
||||
print "{0} transactions=\n{1}".format(stack0.name, stack0.transactions)
|
||||
print "{0} Received Messages".format(stack0.name)
|
||||
for msg in stack0.udpRxMsgs:
|
||||
for msg in stack0.rxMsgs:
|
||||
print msg
|
||||
print
|
||||
print "{0} did={1}".format(stack1.name, stack1.device.did)
|
||||
print "{0} devices=\n{1}".format(stack1.name, stack1.devices)
|
||||
print "{0} transactions=\n{1}".format(stack1.name, stack1.transactions)
|
||||
print "{0} Received Messages".format(stack1.name)
|
||||
for msg in stack1.udpRxMsgs:
|
||||
for msg in stack1.rxMsgs:
|
||||
print msg
|
||||
|
||||
|
||||
|
@ -47,7 +47,7 @@ class Transaction(object):
|
||||
if timeout is None:
|
||||
timeout = self.Timeout
|
||||
self.timeout = timeout
|
||||
self.timer = aiding.Timer(duration=self.timeout)
|
||||
self.timer = aiding.StoreTimer(self.stack.store, duration=self.timeout)
|
||||
if start: #enables synchronized starts not just current time
|
||||
self.timer.restart(start=start)
|
||||
|
||||
@ -209,7 +209,8 @@ class Joiner(Initiator):
|
||||
emsg = "Invalid remote destination device id '{0}'".format(self.rdid)
|
||||
raise raeting.TransactionError(emsg)
|
||||
|
||||
body = odict([('verhex', self.stack.device.signer.verhex),
|
||||
body = odict([('name', self.stack.device.name),
|
||||
('verhex', self.stack.device.signer.verhex),
|
||||
('pubhex', self.stack.device.priver.pubhex)])
|
||||
packet = packeting.TxPacket(stack=self.stack,
|
||||
kind=raeting.pcktKinds.request,
|
||||
@ -253,6 +254,11 @@ class Joiner(Initiator):
|
||||
emsg = "Missing remote device id in accept packet"
|
||||
raise raeting.TransactionError(emsg)
|
||||
|
||||
name = body.get('name')
|
||||
if not name:
|
||||
emsg = "Missing remote name in accept packet"
|
||||
raise raeting.TransactionError(emsg)
|
||||
|
||||
verhex = body.get('verhex')
|
||||
if not verhex:
|
||||
emsg = "Missing remote verifier key in accept packet"
|
||||
@ -270,7 +276,9 @@ class Joiner(Initiator):
|
||||
remote.verfer = nacling.Verifier(key=verhex)
|
||||
remote.pubber = nacling.Publican(key=pubhex)
|
||||
if remote.did != rdid: #move remote device to new index
|
||||
self.stack.moveRemoteDevice(remote.did, rdid)
|
||||
self.stack.moveRemoteDevice(old=remote.did, new=rdid)
|
||||
if remote.name != name: # rename remote device to new name
|
||||
self.stack.renameRemoteDevice(old=remote.name, new=name)
|
||||
remote.joined = True #accepted
|
||||
remote.nextSid()
|
||||
self.remove(index)
|
||||
@ -318,14 +326,10 @@ class Joinent(Correspondent):
|
||||
|
||||
# need to add search for existing device with same host,port address
|
||||
|
||||
remote = devicing.RemoteDevice(stack=self.stack,
|
||||
host=data['sh'],
|
||||
port=data['sp'],
|
||||
rsid=self.sid,
|
||||
rtid=self.tid, )
|
||||
self.stack.addRemoteDevice(remote) #provisionally add .accepted is None
|
||||
|
||||
self.rdid = remote.did
|
||||
name = body.get('name')
|
||||
if not name:
|
||||
emsg = "Missing remote name in join packet"
|
||||
raise raeting.TransactionError(emsg)
|
||||
|
||||
verhex = body.get('verhex')
|
||||
if not verhex:
|
||||
@ -337,8 +341,22 @@ class Joinent(Correspondent):
|
||||
emsg = "Missing remote crypt key in join packet"
|
||||
raise raeting.TransactionError(emsg)
|
||||
|
||||
remote.verfer = nacling.Verifier(key=verhex)
|
||||
remote.pubber = nacling.Publican(key=pubhex)
|
||||
if name in self.stack.dids:
|
||||
emsg = "Another device with name already exists"
|
||||
raise raeting.TransactionError(emsg)
|
||||
|
||||
remote = devicing.RemoteDevice( stack=self.stack,
|
||||
name=name,
|
||||
host=data['sh'],
|
||||
port=data['sp'],
|
||||
verkey=verhex,
|
||||
pubkey=pubhex,
|
||||
rsid=self.sid,
|
||||
rtid=self.tid, )
|
||||
self.stack.addRemoteDevice(remote) #provisionally add .accepted is None
|
||||
self.rdid = remote.did # auto generated at instance creation above
|
||||
#remote.verfer = nacling.Verifier(key=verhex)
|
||||
#remote.pubber = nacling.Publican(key=pubhex)
|
||||
|
||||
self.ackJoin()
|
||||
|
||||
@ -379,6 +397,7 @@ class Joinent(Correspondent):
|
||||
|
||||
body = odict([ ('ldid', self.rdid),
|
||||
('rdid', self.stack.device.did),
|
||||
('name', self.stack.device.name),
|
||||
('verhex', self.stack.device.signer.verhex),
|
||||
('pubhex', self.stack.device.priver.pubhex)])
|
||||
packet = packeting.TxPacket(stack=self.stack,
|
||||
@ -908,8 +927,8 @@ class Messengent(Correspondent):
|
||||
'''
|
||||
Process message packet
|
||||
'''
|
||||
print "segment count =", self.rxPacket.data['sc']
|
||||
print "tid", self.tid
|
||||
console.verbose("segment count = {0} tid={1}".format(
|
||||
self.rxPacket.data['sc'], self.tid))
|
||||
if self.rxPacket.segmentive:
|
||||
if not self.segmentage:
|
||||
self.segmentage = packeting.RxPacket(stack=self.stack,
|
||||
@ -926,7 +945,7 @@ class Messengent(Correspondent):
|
||||
return
|
||||
body = self.rxPacket.body.data
|
||||
|
||||
self.stack.udpRxMsgs.append(body)
|
||||
self.stack.rxMsgs.append(body)
|
||||
self.ackMessage()
|
||||
|
||||
def ackMessage(self):
|
||||
|
Loading…
Reference in New Issue
Block a user