mirror of
https://github.com/valitydev/salt.git
synced 2024-11-08 01:18:58 +00:00
Basic support in packeting for segmentation and desegmentation
Now need to add to message transaction
This commit is contained in:
parent
69ebab1dac
commit
fb6a01c22d
@ -65,8 +65,9 @@ class TxHead(Head):
|
||||
'''
|
||||
self.packed = ''
|
||||
data = self.packet.data # for speed
|
||||
data['bl'] = self.packet.body.size
|
||||
data['cl'] = self.packet.coat.size
|
||||
if not self.packet.segmentive: #only set these if packet is not a segment
|
||||
data['bl'] = self.packet.body.size
|
||||
data['cl'] = self.packet.coat.size
|
||||
data['fl'] = self.packet.foot.size
|
||||
|
||||
data['fg'] = "{:02x}".format(self.packFlags())
|
||||
@ -366,18 +367,20 @@ class Packet(object):
|
||||
'''
|
||||
RAET protocol packet object
|
||||
'''
|
||||
def __init__(self, stack=None, kind=None):
|
||||
def __init__(self, stack=None, data=None, kind=None):
|
||||
''' Setup Packet instance. Meta data for a packet. '''
|
||||
self.stack = stack
|
||||
self.packed = '' # packed string
|
||||
self.segments = None # subpackets when segmented
|
||||
self.data = odict(raeting.PACKET_DEFAULTS)
|
||||
if data:
|
||||
self.data.update(data)
|
||||
if kind:
|
||||
if kind not in raeting.PCKT_KIND_NAMES:
|
||||
self.data['pk'] = raeting.pcktKinds.unknown
|
||||
emsg = "Unrecognizible packet kind."
|
||||
raise raeting.PacketError(emsg)
|
||||
self.data.update(pk=kind)
|
||||
self.segments = None
|
||||
|
||||
@property
|
||||
def size(self):
|
||||
@ -386,6 +389,20 @@ class Packet(object):
|
||||
'''
|
||||
return len(self.packed)
|
||||
|
||||
@property
|
||||
def segmented(self):
|
||||
'''
|
||||
Property is True if packet has at least one entry in .segments
|
||||
'''
|
||||
return (True if self.segments else False)
|
||||
|
||||
@property
|
||||
def segmentive(self):
|
||||
'''
|
||||
Property is True if packet segment count is > 1
|
||||
'''
|
||||
return (True if self.data.get('sc', 1) > 1 else False)
|
||||
|
||||
def refresh(self, data=None):
|
||||
'''
|
||||
Refresh .data to defaults and update if data
|
||||
@ -399,7 +416,7 @@ class TxPacket(Packet):
|
||||
'''
|
||||
RAET Protocol Transmit Packet object
|
||||
'''
|
||||
def __init__(self, embody=None, data=None, **kwa):
|
||||
def __init__(self, embody=None, **kwa):
|
||||
'''
|
||||
Setup TxPacket instance
|
||||
'''
|
||||
@ -408,8 +425,6 @@ class TxPacket(Packet):
|
||||
self.body = TxBody(packet=self, data=embody)
|
||||
self.coat = TxCoat(packet=self)
|
||||
self.foot = TxFoot(packet=self)
|
||||
if data:
|
||||
self.data.update(data)
|
||||
|
||||
@property
|
||||
def index(self):
|
||||
@ -483,7 +498,6 @@ class TxPacket(Packet):
|
||||
haul = full[i * haulsize: (i+1) * haulsize]
|
||||
|
||||
segment = TxPacket( stack=self.stack,
|
||||
kind=raeting.pcktKinds.message,
|
||||
data=self.data)
|
||||
segment.data.update(sn=i, sc=segcount, )
|
||||
segment.body.packed = haul
|
||||
@ -506,6 +520,7 @@ class RxPacket(Packet):
|
||||
self.body = RxBody(packet=self)
|
||||
self.coat = RxCoat(packet=self)
|
||||
self.foot = RxFoot(packet=self)
|
||||
self.segments = odict() # desegmentize assumes odict()
|
||||
self.packed = packed or ''
|
||||
|
||||
@property
|
||||
@ -599,6 +614,14 @@ class RxPacket(Packet):
|
||||
self.unpack()
|
||||
self.foot.parse()
|
||||
|
||||
def parseSegment(self, packet):
|
||||
'''
|
||||
If packet is segmentive then adds to .segments at index of segment number
|
||||
Assumes packet parseOuter has already happened
|
||||
'''
|
||||
if packet.segmentive:
|
||||
self.segments[packet.data['sn']] = packet
|
||||
|
||||
def parseInner(self):
|
||||
'''
|
||||
Assumes the head as already been parsed so self.data is valid
|
||||
@ -613,8 +636,31 @@ class RxPacket(Packet):
|
||||
self.coat.parse()
|
||||
self.body.parse()
|
||||
|
||||
def unsegmentize(self):
|
||||
def desegmentable(self):
|
||||
'''
|
||||
Create packeted segments from existing packet
|
||||
Return True of .segments is complete
|
||||
'''
|
||||
pass
|
||||
if not self.segmentive or not self.segmented:
|
||||
return False
|
||||
|
||||
sc = self.data['sc']
|
||||
for i in range(0, sc):
|
||||
if i not in self.segments:
|
||||
return False
|
||||
if not self.segments[i]:
|
||||
return False
|
||||
return True
|
||||
|
||||
def desegmentize(self):
|
||||
'''
|
||||
Reassemble packet from segments
|
||||
'''
|
||||
hauls = []
|
||||
for segment in self.segments.values():
|
||||
hauls.append(segment.body.packed)
|
||||
packed = "".join(hauls)
|
||||
if self.data['ck'] != raeting.coatKinds.nada:
|
||||
self.coat.packed = packed
|
||||
else:
|
||||
self.body.packed = packed
|
||||
self.parseInner()
|
||||
|
@ -7,8 +7,8 @@ Tests to try out packeting. Potentially ephemeral
|
||||
|
||||
from ioflo.base.odicting import odict
|
||||
|
||||
from salt.transport.road.raet import raeting
|
||||
from salt.transport.road.raet import packeting
|
||||
from salt.transport.road.raet import (raeting, nacling, packeting,
|
||||
devicing, transacting, stacking)
|
||||
|
||||
|
||||
|
||||
@ -28,13 +28,76 @@ def test():
|
||||
|
||||
data.update(bk=raeting.bodyKinds.raw)
|
||||
packet1 = packeting.TxPacket(embody=stuff, data=data, )
|
||||
#print packet1.body.data
|
||||
packet1.pack()
|
||||
print packet1.packed
|
||||
|
||||
if packet1.segments:
|
||||
rejoin = []
|
||||
if packet1.segmented:
|
||||
for index, segment in packet1.segments.items():
|
||||
print index, segment.packed
|
||||
rejoin.append(segment.body.packed)
|
||||
|
||||
rejoin = "".join(rejoin)
|
||||
print stuff == rejoin
|
||||
|
||||
|
||||
signer = nacling.Signer()
|
||||
masterSignKeyHex = signer.keyhex
|
||||
privateer = nacling.Privateer()
|
||||
masterPriKeyHex = privateer.keyhex
|
||||
|
||||
#master stack
|
||||
device = devicing.LocalDevice( did=1,
|
||||
signkey=masterSignKeyHex,
|
||||
prikey=masterPriKeyHex,)
|
||||
stack0 = stacking.StackUdp(device=device)
|
||||
|
||||
data.update(fk=raeting.footKinds.nacl)
|
||||
packet1 = packeting.TxPacket(stack=stack0, embody=stuff, data=data, )
|
||||
packet1.pack()
|
||||
print packet1.packed
|
||||
|
||||
rejoin = []
|
||||
if packet1.segmented:
|
||||
for index, segment in packet1.segments.items():
|
||||
print index, segment.packed
|
||||
rejoin.append(segment.body.packed)
|
||||
|
||||
rejoin = "".join(rejoin)
|
||||
print stuff == rejoin
|
||||
|
||||
segmentage = None
|
||||
if packet1.segmented:
|
||||
for segment in packet1.segments.values():
|
||||
if segment.segmentive:
|
||||
if not segmentage:
|
||||
segmentage = packeting.RxPacket(stack=segment.stack,
|
||||
data=segment.data)
|
||||
segmentage.parseSegment(segment)
|
||||
if segmentage.desegmentable():
|
||||
segmentage.desegmentize()
|
||||
break
|
||||
|
||||
if segmentage:
|
||||
print segmentage.body.packed
|
||||
print segmentage.body.data
|
||||
|
||||
|
||||
#minion stack
|
||||
signer = nacling.Signer()
|
||||
minionSignKeyHex = signer.keyhex
|
||||
privateer = nacling.Privateer()
|
||||
minionPriKeyHex = privateer.keyhex
|
||||
|
||||
#minion stack
|
||||
device = devicing.LocalDevice( did=0,
|
||||
ha=("", raeting.RAET_TEST_PORT),
|
||||
signkey=minionSignKeyHex,
|
||||
prikey=minionPriKeyHex,)
|
||||
stack1 = stacking.StackUdp(device=device)
|
||||
|
||||
# exchange keys
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -12,6 +12,14 @@ from salt.transport.road.raet import (raeting, nacling, packeting,
|
||||
|
||||
|
||||
def test():
|
||||
'''
|
||||
initially
|
||||
master on port 7530 with did of 1
|
||||
minion on port 7531 with did of 0
|
||||
eventually
|
||||
master did of 1
|
||||
minion did of 2
|
||||
'''
|
||||
|
||||
signer = nacling.Signer()
|
||||
masterSignKeyHex = signer.keyhex
|
||||
@ -21,13 +29,7 @@ def test():
|
||||
signer = nacling.Signer()
|
||||
minionSignKeyHex = signer.keyhex
|
||||
privateer = nacling.Privateer()
|
||||
masterPriKeyHex = privateer.keyhex
|
||||
|
||||
# initially
|
||||
# master on port 7530 with did of 1
|
||||
# minion on port 7531 with did of 0
|
||||
# eventually
|
||||
# minion did of 2
|
||||
minionPriKeyHex = privateer.keyhex
|
||||
|
||||
#master stack
|
||||
device = devicing.LocalDevice( did=1,
|
||||
@ -39,7 +41,7 @@ def test():
|
||||
device = devicing.LocalDevice( did=0,
|
||||
ha=("", raeting.RAET_TEST_PORT),
|
||||
signkey=minionSignKeyHex,
|
||||
prikey=masterPriKeyHex,)
|
||||
prikey=minionPriKeyHex,)
|
||||
stack1 = stacking.StackUdp(device=device)
|
||||
|
||||
|
||||
|
@ -305,21 +305,21 @@ class Joinent(Correspondent):
|
||||
def join(self):
|
||||
'''
|
||||
Process join packet
|
||||
Perform pend operation of pending device being accepted onto channel
|
||||
Perform pend operation of pending remote device being accepted onto channel
|
||||
'''
|
||||
data = self.rxPacket.data
|
||||
body = self.rxPacket.body.data
|
||||
|
||||
# need to add search for existing device with same host,port address
|
||||
|
||||
device = devicing.RemoteDevice(stack=self.stack,
|
||||
remote = devicing.RemoteDevice(stack=self.stack,
|
||||
host=data['sh'],
|
||||
port=data['sp'],
|
||||
rsid=self.sid,
|
||||
rtid=self.tid, )
|
||||
self.stack.addRemoteDevice(device) #provisionally add .accepted is None
|
||||
self.stack.addRemoteDevice(remote) #provisionally add .accepted is None
|
||||
|
||||
self.rdid = device.did
|
||||
self.rdid = remote.did
|
||||
|
||||
verhex = body.get('verhex')
|
||||
if not verhex:
|
||||
@ -331,8 +331,8 @@ class Joinent(Correspondent):
|
||||
emsg = "Missing remote crypt key in join packet"
|
||||
raise raeting.TransactionError(emsg)
|
||||
|
||||
device.verfer = nacling.Verifier(key=verhex)
|
||||
device.pubber = nacling.Publican(key=pubhex)
|
||||
remote.verfer = nacling.Verifier(key=verhex)
|
||||
remote.pubber = nacling.Publican(key=pubhex)
|
||||
|
||||
self.ackJoin()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user