From af65bba00b72b38218a5a8409d35f7ae526a5937 Mon Sep 17 00:00:00 2001 From: Sergey Kizunov Date: Tue, 21 Apr 2015 13:12:00 -0500 Subject: [PATCH] Fix Windows salt-master support for ZeroMQ The following changes were required: salt/master.py: - 'SMaster.aes' no longer exists. 'SMaster.secrets' now does. The pickling code is adjusted accordingly. - MWorker __init__ no longer uses 'self.serial'. 'self.req_channels' was added. The pickling code is adjusted accordingly. - On Windows, Service account home directories may not initially exist. If this is the case, make sure the directory exists before continuing. salt/minion.py: - The 'Minion' __init__ function will not set certain fields that are required in '_thread_return'. On Windows, '_thread_return' will instantiate a new object that is derived from 'MinionBase', and this object may be a 'Minion' object. Due to this, make sure that the required fields are set before they are used. salt/transport/zeromq.py: - ZeroMQ for Windows doesn't support 'IPC' mode. Due to this, allow 'ipc_mode' to be set to 'tcp' and react accordingly. General: On Windows os.rename() will fail if the destination file exists. Due to this, use 'salt.utils.atomicfile.atomic_rename' where appropriate. Signed-off-by: Sergey Kizunov --- salt/daemons/masterapi.py | 3 ++- salt/master.py | 28 ++++++++++++++---------- salt/minion.py | 15 +++++++++++++ salt/transport/zeromq.py | 46 +++++++++++++++++++++++++++++---------- salt/utils/master.py | 9 +++++--- 5 files changed, 74 insertions(+), 27 deletions(-) diff --git a/salt/daemons/masterapi.py b/salt/daemons/masterapi.py index 5127c920d7..3ab8ae1b64 100644 --- a/salt/daemons/masterapi.py +++ b/salt/daemons/masterapi.py @@ -709,7 +709,8 @@ class RemoteFuncs(object): {'grains': load['grains'], 'pillar': data}) ) - os.rename(tmpfname, datap) + # On Windows, os.rename will fail if the destination file exists. + salt.utils.atomicfile.atomic_rename(tmpfname, datap) return data def _minion_event(self, load): diff --git a/salt/master.py b/salt/master.py index fcaf814eef..8274f2ab9b 100644 --- a/salt/master.py +++ b/salt/master.py @@ -100,8 +100,8 @@ class SMaster(object): self.master_key = salt.crypt.MasterKeys(self.opts) self.key = self.__prep_key() - # We need __setstate__ and __getstate__ to also pickle 'SMaster.aes'. - # Otherwise, 'SMaster.aes' won't be copied over to the spawned process + # We need __setstate__ and __getstate__ to also pickle 'SMaster.secrets'. + # Otherwise, 'SMaster.secrets' won't be copied over to the spawned process # on Windows since spawning processes on Windows requires pickling. # These methods are only used when pickling so will not be used on # non-Windows platforms. @@ -109,13 +109,13 @@ class SMaster(object): self.opts = state['opts'] self.master_key = state['master_key'] self.key = state['key'] - SMaster.aes = state['aes'] + SMaster.secrets = state['secrets'] def __getstate__(self): return {'opts': self.opts, 'master_key': self.master_key, 'key': self.key, - 'aes': SMaster.aes} + 'secrets': SMaster.secrets} def __prep_key(self): ''' @@ -387,6 +387,11 @@ class Master(SMaster): else: home = os.path.expanduser('~' + self.opts['user']) try: + if salt.utils.is_windows() and not os.path.isdir(home): + # On Windows, Service account home directories may not + # initially exist. If this is the case, make sure the + # directory exists before continuing. + os.mkdir(home, 0o755) os.chdir(home) except OSError as err: errors.append( @@ -646,27 +651,27 @@ class MWorker(multiprocessing.Process): self.key = key self.k_mtime = 0 - # We need __setstate__ and __getstate__ to also pickle 'SMaster.aes'. - # Otherwise, 'SMaster.aes' won't be copied over to the spawned process + # We need __setstate__ and __getstate__ to also pickle 'SMaster.secrets'. + # Otherwise, 'SMaster.secrets' won't be copied over to the spawned process # on Windows since spawning processes on Windows requires pickling. # These methods are only used when pickling so will not be used on # non-Windows platforms. def __setstate__(self, state): multiprocessing.Process.__init__(self) self.opts = state['opts'] - self.serial = state['serial'] + self.req_channels = state['req_channels'] self.mkey = state['mkey'] self.key = state['key'] self.k_mtime = state['k_mtime'] - SMaster.aes = state['aes'] + SMaster.secrets = state['secrets'] def __getstate__(self): return {'opts': self.opts, - 'serial': self.serial, + 'req_channels': self.req_channels, 'mkey': self.mkey, 'key': self.key, 'k_mtime': self.k_mtime, - 'aes': SMaster.aes} + 'secrets': SMaster.secrets} def __bind(self): ''' @@ -1132,7 +1137,8 @@ class AESFuncs(object): {'grains': load['grains'], 'pillar': data}) ) - os.rename(tmpfname, datap) + # On Windows, os.rename will fail if the destination file exists. + salt.utils.atomicfile.atomic_rename(tmpfname, datap) return data def _minion_event(self, load): diff --git a/salt/minion.py b/salt/minion.py index fe20577367..98e59bda54 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -938,6 +938,21 @@ class Minion(MinionBase): # multiprocessing communication. if not minion_instance: minion_instance = cls(opts) + if not hasattr(minion_instance, 'functions'): + functions, returners, function_errors = ( + minion_instance._load_modules() + ) + minion_instance.functions = functions + minion_instance.returners = returners + minion_instance.function_errors = function_errors + if not hasattr(minion_instance, 'serial'): + minion_instance.serial = salt.payload.Serial(opts) + if not hasattr(minion_instance, 'proc_dir'): + uid = salt.utils.get_uid(user=opts.get('user', None)) + minion_instance.proc_dir = ( + get_proc_dir(opts['cachedir'], uid=uid) + ) + fn_ = os.path.join(minion_instance.proc_dir, data['jid']) if opts['multiprocessing']: salt.utils.daemonize_if(opts) diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py index dcd94a4466..af2fa29416 100644 --- a/salt/transport/zeromq.py +++ b/salt/transport/zeromq.py @@ -292,9 +292,15 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt. self.clients.setsockopt(zmq.IPV4ONLY, 0) self.workers = self.context.socket(zmq.DEALER) - self.w_uri = 'ipc://{0}'.format( - os.path.join(self.opts['sock_dir'], 'workers.ipc') - ) + + if self.opts.get('ipc_mode', '') == 'tcp': + self.w_uri = 'tcp://127.0.0.1:{0}'.format( + self.opts.get('tcp_master_workers', 4515) + ) + else: + self.w_uri = 'ipc://{0}'.format( + os.path.join(self.opts['sock_dir'], 'workers.ipc') + ) log.info('Setting up the master communication server') self.clients.bind(self.uri) @@ -331,9 +337,14 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt. self.context = zmq.Context(1) self._socket = self.context.socket(zmq.REP) - self.w_uri = 'ipc://{0}'.format( - os.path.join(self.opts['sock_dir'], 'workers.ipc') - ) + if self.opts.get('ipc_mode', '') == 'tcp': + self.w_uri = 'tcp://127.0.0.1:{0}'.format( + self.opts.get('tcp_master_workers', 4515) + ) + else: + self.w_uri = 'ipc://{0}'.format( + os.path.join(self.opts['sock_dir'], 'workers.ipc') + ) log.info('Worker binding to socket {0}'.format(self.w_uri)) self._socket.connect(self.w_uri) @@ -423,9 +434,15 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel): pub_uri = 'tcp://{interface}:{publish_port}'.format(**self.opts) # Prepare minion pull socket pull_sock = context.socket(zmq.PULL) - pull_uri = 'ipc://{0}'.format( - os.path.join(self.opts['sock_dir'], 'publish_pull.ipc') - ) + + if self.opts.get('ipc_mode', '') == 'tcp': + pull_uri = 'tcp://127.0.0.1:{0}'.format( + self.opts.get('tcp_master_publish_pull', 4514) + ) + else: + pull_uri = 'ipc://{0}'.format( + os.path.join(self.opts['sock_dir'], 'publish_pull.ipc') + ) salt.utils.zeromq.check_ipc_path_max_len(pull_uri) # Start the minion command publisher @@ -502,9 +519,14 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel): # Send 0MQ to the publisher context = zmq.Context(1) pub_sock = context.socket(zmq.PUSH) - pull_uri = 'ipc://{0}'.format( - os.path.join(self.opts['sock_dir'], 'publish_pull.ipc') - ) + if self.opts.get('ipc_mode', '') == 'tcp': + pull_uri = 'tcp://127.0.0.1:{0}'.format( + self.opts.get('tcp_master_publish_pull', 4514) + ) + else: + pull_uri = 'ipc://{0}'.format( + os.path.join(self.opts['sock_dir'], 'publish_pull.ipc') + ) pub_sock.connect(pull_uri) int_payload = {'payload': self.serial.dumps(payload)} diff --git a/salt/utils/master.py b/salt/utils/master.py index 1666c4cb12..81262802d1 100644 --- a/salt/utils/master.py +++ b/salt/utils/master.py @@ -21,6 +21,7 @@ import salt.log import salt.client import salt.pillar import salt.utils +import salt.utils.atomicfile import salt.utils.minions import salt.payload from salt.exceptions import SaltException @@ -383,13 +384,13 @@ class MasterPillarUtil(object): os.close(tmpfh) with salt.utils.fopen(tmpfname, 'w+b') as fp_: fp_.write(self.serial.dumps({'grains': minion_grains})) - os.rename(tmpfname, data_file) + salt.utils.atomicfile.atomic_rename(tmpfname, data_file) elif clear_grains and minion_pillar: tmpfh, tmpfname = tempfile.mkstemp(dir=cdir) os.close(tmpfh) with salt.utils.fopen(tmpfname, 'w+b') as fp_: fp_.write(self.serial.dumps({'pillar': minion_pillar})) - os.rename(tmpfname, data_file) + salt.utils.atomicfile.atomic_rename(tmpfname, data_file) if clear_mine: # Delete the whole mine file os.remove(os.path.join(mine_file)) @@ -403,7 +404,9 @@ class MasterPillarUtil(object): os.close(tmpfh) with salt.utils.fopen(tmpfname, 'w+b') as fp_: fp_.write(self.serial.dumps(mine_data)) - os.rename(tmpfname, mine_file) + salt.utils.atomicfile.atomic_rename( + tmpfname, + mine_file) except (OSError, IOError): return True return True