mirror of
https://github.com/valitydev/salt.git
synced 2024-11-09 01:36:48 +00:00
Add the zmq pub packet setup routines
This commit is contained in:
parent
a5196144e7
commit
4e6fad26b0
@ -90,13 +90,16 @@ class SaltZmqPublisher(ioflo.base.deeding.Deed):
|
|||||||
'''
|
'''
|
||||||
Ioinits = {'opts': '.salt.opts',
|
Ioinits = {'opts': '.salt.opts',
|
||||||
'publish': '.salt.var.publish',
|
'publish': '.salt.var.publish',
|
||||||
'zmq_behavior': '.salt.etc.zmq_behavior'}
|
'zmq_behavior': '.salt.etc.zmq_behavior',
|
||||||
|
'aes': '.salt.var.zmq.aes',
|
||||||
|
'crypticle': '.salt.var.zmq.crypticle'}
|
||||||
|
|
||||||
def postinitio(self):
|
def postinitio(self):
|
||||||
'''
|
'''
|
||||||
Set up tracking value(s)
|
Set up tracking value(s)
|
||||||
'''
|
'''
|
||||||
self.created = False
|
self.created = False
|
||||||
|
self.crypticle.value = salt.crypt.Crypticle(self.opts.value, self.aes.value)
|
||||||
|
|
||||||
def action(self):
|
def action(self):
|
||||||
'''
|
'''
|
||||||
@ -125,7 +128,18 @@ class SaltZmqPublisher(ioflo.base.deeding.Deed):
|
|||||||
# Don't pop the publish messages! The raet behavior still needs them
|
# Don't pop the publish messages! The raet behavior still needs them
|
||||||
try:
|
try:
|
||||||
for package in self.publish.value:
|
for package in self.publish.value:
|
||||||
payload = package['return']
|
payload = {'enc': 'aes'}
|
||||||
|
load = package['return']
|
||||||
|
payload['load'] = self.crypticle.value.dumps(package['return'])
|
||||||
|
if self.opts['sign_pub_messages']:
|
||||||
|
master_pem_path = os.path.join(self.opts.value['pki_dir'], 'master.pem')
|
||||||
|
log.debug('Signing data packet for publish')
|
||||||
|
payload['sig'] = salt.crypt.sign_message(master_pem_path, payload['load'])
|
||||||
|
int_payload = {'payload': self.serial.dumps(payload)}
|
||||||
|
|
||||||
|
if load['tgt_type'] == 'list':
|
||||||
|
int_payload['topic_lst'] = load['tgt']
|
||||||
|
send_payload = self.serial.dumps(int_payload)
|
||||||
if self.opts.value['zmq_filtering']:
|
if self.opts.value['zmq_filtering']:
|
||||||
# if you have a specific topic list, use that
|
# if you have a specific topic list, use that
|
||||||
if 'topic_lst' in package:
|
if 'topic_lst' in package:
|
||||||
@ -134,13 +148,13 @@ class SaltZmqPublisher(ioflo.base.deeding.Deed):
|
|||||||
# to avoid collisions
|
# to avoid collisions
|
||||||
htopic = hashlib.sha1(topic).hexdigest()
|
htopic = hashlib.sha1(topic).hexdigest()
|
||||||
self.pub_sock.send(htopic, flags=zmq.SNDMORE)
|
self.pub_sock.send(htopic, flags=zmq.SNDMORE)
|
||||||
self.pub_sock.send(payload)
|
self.pub_sock.send(send_payload)
|
||||||
# otherwise its a broadcast
|
# otherwise its a broadcast
|
||||||
else:
|
else:
|
||||||
self.pub_sock.send('broadcast', flags=zmq.SNDMORE)
|
self.pub_sock.send('broadcast', flags=zmq.SNDMORE)
|
||||||
self.pub_sock.send(payload)
|
self.pub_sock.send(send_payload)
|
||||||
else:
|
else:
|
||||||
self.pub_sock.send(payload)
|
self.pub_sock.send(send_payload)
|
||||||
except zmq.ZMQError as exc:
|
except zmq.ZMQError as exc:
|
||||||
if exc.errno == errno.EINTR:
|
if exc.errno == errno.EINTR:
|
||||||
return
|
return
|
||||||
|
Loading…
Reference in New Issue
Block a user