ZMQ topics

Salt targeting is done using the zmq pub/sub mechanism. No filtering is done so all requests go to all minions. This is an attempt to make the requests more target-able. ZeroMQ supports filters on these sockets which are processed publisher side. So the thought is to set it to the minion id and a shared "broadcast" one. Only requests of tgt_type == list will use this. The thought is if we like this we can use other data (mine etc) to determine a more specific target on the master (such as a list) from a glob match etc.

Syndication masters do not have zmq filters applied, but will send out their syndicated requests with a filter as well. So all targeted messages aren't gauranteed to go to only the minion, its just best effort
This commit is contained in:
Thomas Jackson 2014-02-21 12:25:20 -08:00
parent 655da0bd5c
commit 83a725ffb0
3 changed files with 56 additions and 5 deletions

View File

@ -27,6 +27,11 @@ class SaltMasterError(SaltException):
Problem reading the master root key
'''
class SaltSyndicMasterError(SaltException):
'''
Problem while proxying a request in the syndication master
'''
class MasterExit(SystemExit):
'''

View File

@ -411,7 +411,21 @@ class Publisher(multiprocessing.Process):
# SIGUSR1 gracefully so we don't choke and die horribly
try:
package = pull_sock.recv()
pub_sock.send(package)
unpacked_package = salt.payload.unpackage(package)
payload = unpacked_package['payload']
# if you have a specific topic list, use that
if 'topic_lst' in unpacked_package:
for topic in unpacked_package['topic_lst']:
# zmq filters are substring match, hash the topic
# to avoid collisions
htopic = hashlib.sha1(topic).hexdigest()
pub_sock.send(htopic, flags=zmq.SNDMORE)
pub_sock.send(payload)
# otherwise its a broadcast
else:
pub_sock.send('broadcast', flags=zmq.SNDMORE)
pub_sock.send(payload)
except zmq.ZMQError as exc:
if exc.errno == errno.EINTR:
continue
@ -2638,7 +2652,14 @@ class ClearFuncs(object):
os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
)
pub_sock.connect(pull_uri)
pub_sock.send(self.serial.dumps(payload))
int_payload = {'payload': self.serial.dumps(payload)}
# add some targeting stuff for lists only (for now)
if load['tgt_type'] == 'list':
int_payload['topic_lst'] = load['tgt']
pub_sock.send(self.serial.dumps(int_payload))
return {
'enc': 'clear',
'load': {

View File

@ -607,6 +607,10 @@ class Minion(MinionBase):
self.grains_cache = self.opts['grains']
# store your hexid to subscribe to zmq, hash since zmq filters are prefix
# matches this way we can avoid collisions
self.hexid = hashlib.sha1(self.opts['id']).hexdigest()
if 'proxy' in self.opts['pillar']:
log.debug('I am {0} and I need to start some proxies for {0}'.format(self.opts['id'],
self.opts['pillar']['proxy']))
@ -1129,7 +1133,8 @@ class Minion(MinionBase):
)
def _setsockopts(self):
self.socket.setsockopt(zmq.SUBSCRIBE, '')
self.socket.setsockopt(zmq.SUBSCRIBE, 'broadcast')
self.socket.setsockopt(zmq.SUBSCRIBE, self.hexid)
self.socket.setsockopt(zmq.IDENTITY, self.opts['id'])
self._set_ipv4only()
self._set_reconnect_ivl_max()
@ -1381,7 +1386,12 @@ class Minion(MinionBase):
def _do_socket_recv(self, socks):
if socks.get(self.socket) == zmq.POLLIN:
payload = self.serial.loads(self.socket.recv(zmq.NOBLOCK))
# topic filtering is done at the zmq level, so we just strip it
recv_str = self.socket.recv(zmq.NOBLOCK)
# if you have a header, then you have another one coming down the pipe
if recv_str in (self.hexid, 'broadcast'):
recv_str = self.socket.recv(zmq.NOBLOCK)
payload = self.serial.loads(recv_str)
log.trace('Handling payload')
self._handle_payload(payload)
@ -1500,6 +1510,8 @@ class Syndic(Minion):
# Share the poller with the event object
self.poller = self.local.event.poller
self.socket = self.context.socket(zmq.SUB)
# no filters for syndication masters, unless we want to maintain a
# list of all connected minions and update the filter
self.socket.setsockopt(zmq.SUBSCRIBE, '')
self.socket.setsockopt(zmq.IDENTITY, self.opts['id'])
if hasattr(zmq, 'RECONNECT_IVL_MAX'):
@ -1577,7 +1589,20 @@ class Syndic(Minion):
def _process_cmd_socket(self):
try:
payload = self.serial.loads(self.socket.recv(zmq.NOBLOCK))
messages = self.socket.recv_multipart(zmq.NOBLOCK)
messages_len = len(messages)
idx = None
if messages_len == 1:
idx = 0
elif messages_len == 2:
idx = 1
else:
raise SaltSyndicMasterError('Syndication master recieved message of invalid len ({0}/2)'.format(messages_len))
payload = self.serial.loads(messages[idx])
except zmq.ZMQError as e:
# Swallow errors for bad wakeups or signals needing processing
if e.errno != errno.EAGAIN and e.errno != errno.EINTR: