Initial move of publisher into PubServerChannel

This commit is contained in:
Thomas Jackson 2015-01-19 11:16:03 -08:00
parent f128e47240
commit 8c66c4e857
3 changed files with 152 additions and 146 deletions

View File

@ -444,7 +444,9 @@ class Master(SMaster):
log.info('Creating master maintenance process')
process_manager.add_process(Maintenance, args=(self.opts,))
log.info('Creating master publisher process')
process_manager.add_process(Publisher, args=(self.opts,))
publish_channel = salt.transport.server.PubServerChannel.factory(self.opts)
publish_channel.pre_fork(process_manager)
log.info('Creating master event publisher process')
process_manager.add_process(salt.utils.event.EventPublisher, args=(self.opts,))
salt.engines.start_engines(self.opts, process_manager)
@ -519,122 +521,6 @@ class Halite(multiprocessing.Process):
halite.start(self.hopts)
class Publisher(multiprocessing.Process):
'''
The publishing interface, a simple zeromq publisher that sends out the
commands.
'''
def __init__(self, opts):
'''
Create a publisher instance
:param dict opts: The salt options
'''
super(Publisher, self).__init__()
self.opts = opts
def run(self):
'''
Bind to the interface specified in the configuration file
Override of multiprocessing.Process.run()
'''
salt.utils.appendproctitle(self.__class__.__name__)
# Set up the context
context = zmq.Context(1)
# Prepare minion publish socket
pub_sock = context.socket(zmq.PUB)
# if 2.1 >= zmq < 3.0, we only have one HWM setting
try:
pub_sock.setsockopt(zmq.HWM, self.opts.get('pub_hwm', 1000))
# in zmq >= 3.0, there are separate send and receive HWM settings
except AttributeError:
pub_sock.setsockopt(zmq.SNDHWM, self.opts.get('pub_hwm', 1000))
pub_sock.setsockopt(zmq.RCVHWM, self.opts.get('pub_hwm', 1000))
if self.opts['ipv6'] is True and hasattr(zmq, 'IPV4ONLY'):
# IPv6 sockets work for both IPv6 and IPv4 addresses
pub_sock.setsockopt(zmq.IPV4ONLY, 0)
pub_uri = 'tcp://{interface}:{publish_port}'.format(**self.opts)
# Prepare minion pull socket
pull_sock = context.socket(zmq.PULL)
if self.opts.get('ipc_mode', '') == 'tcp':
pull_uri = 'tcp://127.0.0.1:{0}'.format(
self.opts.get('tcp_master_publish_pull', 4514)
)
else:
pull_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
)
salt.utils.zeromq.check_ipc_path_max_len(pull_uri)
# Start the minion command publisher
log.info('Starting the Salt Publisher on {0}'.format(pub_uri))
pub_sock.bind(pub_uri)
# Securely create socket
log.info('Starting the Salt Puller on {0}'.format(pull_uri))
old_umask = os.umask(0o177)
try:
pull_sock.bind(pull_uri)
finally:
os.umask(old_umask)
try:
while True:
# Catch and handle EINTR from when this process is sent
# SIGUSR1 gracefully so we don't choke and die horribly
try:
package = pull_sock.recv()
unpacked_package = salt.payload.unpackage(package)
try:
payload = unpacked_package['payload']
except (KeyError,) as exc:
# somehow not packaged !?
if 'enc' in payload and 'load' in payload:
payload = package
else:
try:
log.error(
"Invalid payload: {0}".format(
pformat(unpacked_package), exc_info=True))
except Exception:
# dont fail on a format error here !
# but log something as it is hard to track down
log.error("Received invalid payload", exc_info=True)
raise exc
if self.opts['zmq_filtering']:
# if you have a specific topic list, use that
if 'topic_lst' in unpacked_package:
for topic in unpacked_package['topic_lst']:
# zmq filters are substring match, hash the topic
# to avoid collisions
htopic = hashlib.sha1(topic).hexdigest()
pub_sock.send(htopic, flags=zmq.SNDMORE)
pub_sock.send(payload)
# otherwise its a broadcast
else:
# TODO: constants file for "broadcast"
pub_sock.send('broadcast', flags=zmq.SNDMORE)
pub_sock.send(payload)
else:
pub_sock.send(payload)
except zmq.ZMQError as exc:
if exc.errno == errno.EINTR:
continue
raise exc
except KeyboardInterrupt:
if pub_sock.closed is False:
pub_sock.setsockopt(zmq.LINGER, 1)
pub_sock.close()
if pull_sock.closed is False:
pull_sock.setsockopt(zmq.LINGER, 1)
pull_sock.close()
if context.closed is False:
context.term()
class ReqServer(object):
'''
Starts up the master request server, minions send results to this
@ -2538,19 +2424,15 @@ class ClearFuncs(object):
)
log.debug('Published command details {0}'.format(load))
# TODO: push this into publisher channel
crypticle = salt.crypt.Crypticle(self.opts, self.opts['aes'].value)
payload['load'] = crypticle.dumps(load)
if self.opts['sign_pub_messages']:
master_pem_path = os.path.join(self.opts['pki_dir'], 'master.pem')
log.debug("Signing data packet")
payload['sig'] = salt.crypt.sign_message(master_pem_path, payload['load'])
int_payload = {'payload': self.serial.dumps(payload)}
# add some targeting stuff for lists only (for now)
if load['tgt_type'] == 'list':
int_payload['topic_lst'] = load['tgt']
return int_payload
publish_channel = salt.transport.server.PubServerChannel.factory(self.opts)
publish_channel.publish(load)
return {
'enc': 'clear',
'load': {
'jid': clear_load['jid'],
'minions': minions
}
}
class FloMWorker(MWorker):

View File

@ -105,34 +105,28 @@ class PubServerChannel(object):
# switch on available ttypes
if ttype == 'zeromq':
import salt.transport.zeromq
return salt.transport.zeromq.ZeroMQPubChannel(opts, **kwargs)
return salt.transport.zeromq.ZeroMQPubServerChannel(opts, **kwargs)
elif ttype == 'raet': # TODO:
import salt.transport.raet
return salt.transport.raet.RAETPubChannel(opts, **kwargs)
return salt.transport.raet.RAETPubServerChannel(opts, **kwargs)
elif ttype == 'local': # TODO:
import salt.transport.local
return salt.transport.local.LocalPubChannel(opts, **kwargs)
return salt.transport.local.LocalPubServerChannel(opts, **kwargs)
else:
raise Exception('Channels are only defined for ZeroMQ and raet')
# return NewKindOfChannel(opts, **kwargs)
def recv(self, timeout=0):
def pre_fork(self, process_manager):
'''
Get a pub job, with an optional timeout (0==forever)
Do anything necessary pre-fork. Since this is on the master side this will
primarily be used to create IPC channels and create our daemon process to
do the actual publishing
'''
raise NotImplementedError()
pass
def recv_noblock(self):
def publish(self, load):
'''
Get a pub job in a non-blocking manner.
Return pub or None
'''
raise NotImplementedError()
@property
def socket(self):
'''
Return a socket (or fd) which can be used for poll mechanisms
Publish "load" to minions
'''
raise NotImplementedError()

View File

@ -432,3 +432,133 @@ class ZeroMQReqServerChannel(salt.transport.server.ReqServerChannel):
Return a socket (or fd) which can be used for poll mechanisms
'''
self._socket
class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
def __init__(self, opts):
self.opts = opts
self.serial = salt.payload.Serial(self.opts) # TODO: in init?
def _publish_daemon(self):
'''
Bind to the interface specified in the configuration file
'''
salt.utils.appendproctitle(self.__class__.__name__)
# Set up the context
context = zmq.Context(1)
# Prepare minion publish socket
pub_sock = context.socket(zmq.PUB)
# if 2.1 >= zmq < 3.0, we only have one HWM setting
try:
pub_sock.setsockopt(zmq.HWM, self.opts.get('pub_hwm', 1000))
# in zmq >= 3.0, there are separate send and receive HWM settings
except AttributeError:
pub_sock.setsockopt(zmq.SNDHWM, self.opts.get('pub_hwm', 1000))
pub_sock.setsockopt(zmq.RCVHWM, self.opts.get('pub_hwm', 1000))
if self.opts['ipv6'] is True and hasattr(zmq, 'IPV4ONLY'):
# IPv6 sockets work for both IPv6 and IPv4 addresses
pub_sock.setsockopt(zmq.IPV4ONLY, 0)
pub_uri = 'tcp://{interface}:{publish_port}'.format(**self.opts)
# Prepare minion pull socket
pull_sock = context.socket(zmq.PULL)
pull_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
)
salt.utils.zeromq.check_ipc_path_max_len(pull_uri)
# Start the minion command publisher
log.info('Starting the Salt Publisher on {0}'.format(pub_uri))
pub_sock.bind(pub_uri)
# Securely create socket
log.info('Starting the Salt Puller on {0}'.format(pull_uri))
old_umask = os.umask(0o177)
try:
pull_sock.bind(pull_uri)
finally:
os.umask(old_umask)
try:
while True:
# Catch and handle EINTR from when this process is sent
# SIGUSR1 gracefully so we don't choke and die horribly
try:
package = pull_sock.recv()
unpacked_package = salt.payload.unpackage(package)
payload = unpacked_package['payload']
if self.opts['zmq_filtering']:
# if you have a specific topic list, use that
if 'topic_lst' in unpacked_package:
for topic in unpacked_package['topic_lst']:
# zmq filters are substring match, hash the topic
# to avoid collisions
htopic = hashlib.sha1(topic).hexdigest()
pub_sock.send(htopic, flags=zmq.SNDMORE)
pub_sock.send(payload)
# otherwise its a broadcast
else:
# TODO: constants file for "broadcast"
pub_sock.send('broadcast', flags=zmq.SNDMORE)
pub_sock.send(payload)
else:
pub_sock.send(payload)
except zmq.ZMQError as exc:
if exc.errno == errno.EINTR:
continue
raise exc
except KeyboardInterrupt:
if pub_sock.closed is False:
pub_sock.setsockopt(zmq.LINGER, 1)
pub_sock.close()
if pull_sock.closed is False:
pull_sock.setsockopt(zmq.LINGER, 1)
pull_sock.close()
if context.closed is False:
context.term()
def pre_fork(self, process_manager):
'''
Do anything necessary pre-fork. Since this is on the master side this will
primarily be used to create IPC channels and create our daemon process to
do the actual publishing
'''
process_manager.add_process(self._publish_daemon)
def publish(self, load):
'''
Publish "load" to minions
'''
payload = {'enc': 'aes'}
crypticle = salt.crypt.Crypticle(self.opts, self.opts['aes'].value)
payload['load'] = crypticle.dumps(load)
if self.opts['sign_pub_messages']:
master_pem_path = os.path.join(self.opts['pki_dir'], 'master.pem')
log.debug("Signing data packet")
payload['sig'] = salt.crypt.sign_message(master_pem_path, payload['load'])
# Send 0MQ to the publisher
context = zmq.Context(1)
pub_sock = context.socket(zmq.PUSH)
pull_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
)
pub_sock.connect(pull_uri)
int_payload = {'payload': self.serial.dumps(payload)}
# add some targeting stuff for lists only (for now)
if load['tgt_type'] == 'list':
int_payload['topic_lst'] = load['tgt']
pub_sock.send(self.serial.dumps(int_payload))
# EOF