Fix Windows salt-master support for ZeroMQ

The following changes were required:

salt/master.py:
- 'SMaster.aes' no longer exists. 'SMaster.secrets' now does. The
pickling code is adjusted accordingly.
- MWorker __init__ no longer uses 'self.serial'. 'self.req_channels'
was added. The pickling code is adjusted accordingly.
- On Windows, Service account home directories may not initially exist.
If this is the case, make sure the directory exists before continuing.

salt/minion.py:
- The 'Minion' __init__ function will not set certain fields that are
required in '_thread_return'. On Windows, '_thread_return' will
instantiate a new object that is derived from 'MinionBase', and this
object may be a 'Minion' object. Due to this, make sure that the
required fields are set before they are used.

salt/transport/zeromq.py:
- ZeroMQ for Windows doesn't support 'IPC' mode. Due to this, allow
'ipc_mode' to be set to 'tcp' and react accordingly.

General:
On Windows os.rename() will fail if the destination file exists. Due to
this, use 'salt.utils.atomicfile.atomic_rename' where appropriate.

Signed-off-by: Sergey Kizunov <sergey.kizunov@ni.com>
This commit is contained in:
Sergey Kizunov 2015-04-21 13:12:00 -05:00
parent ac582030e6
commit af65bba00b
5 changed files with 74 additions and 27 deletions

View File

@ -709,7 +709,8 @@ class RemoteFuncs(object):
{'grains': load['grains'],
'pillar': data})
)
os.rename(tmpfname, datap)
# On Windows, os.rename will fail if the destination file exists.
salt.utils.atomicfile.atomic_rename(tmpfname, datap)
return data
def _minion_event(self, load):

View File

@ -100,8 +100,8 @@ class SMaster(object):
self.master_key = salt.crypt.MasterKeys(self.opts)
self.key = self.__prep_key()
# We need __setstate__ and __getstate__ to also pickle 'SMaster.aes'.
# Otherwise, 'SMaster.aes' won't be copied over to the spawned process
# We need __setstate__ and __getstate__ to also pickle 'SMaster.secrets'.
# Otherwise, 'SMaster.secrets' won't be copied over to the spawned process
# on Windows since spawning processes on Windows requires pickling.
# These methods are only used when pickling so will not be used on
# non-Windows platforms.
@ -109,13 +109,13 @@ class SMaster(object):
self.opts = state['opts']
self.master_key = state['master_key']
self.key = state['key']
SMaster.aes = state['aes']
SMaster.secrets = state['secrets']
def __getstate__(self):
return {'opts': self.opts,
'master_key': self.master_key,
'key': self.key,
'aes': SMaster.aes}
'secrets': SMaster.secrets}
def __prep_key(self):
'''
@ -387,6 +387,11 @@ class Master(SMaster):
else:
home = os.path.expanduser('~' + self.opts['user'])
try:
if salt.utils.is_windows() and not os.path.isdir(home):
# On Windows, Service account home directories may not
# initially exist. If this is the case, make sure the
# directory exists before continuing.
os.mkdir(home, 0o755)
os.chdir(home)
except OSError as err:
errors.append(
@ -646,27 +651,27 @@ class MWorker(multiprocessing.Process):
self.key = key
self.k_mtime = 0
# We need __setstate__ and __getstate__ to also pickle 'SMaster.aes'.
# Otherwise, 'SMaster.aes' won't be copied over to the spawned process
# We need __setstate__ and __getstate__ to also pickle 'SMaster.secrets'.
# Otherwise, 'SMaster.secrets' won't be copied over to the spawned process
# on Windows since spawning processes on Windows requires pickling.
# These methods are only used when pickling so will not be used on
# non-Windows platforms.
def __setstate__(self, state):
multiprocessing.Process.__init__(self)
self.opts = state['opts']
self.serial = state['serial']
self.req_channels = state['req_channels']
self.mkey = state['mkey']
self.key = state['key']
self.k_mtime = state['k_mtime']
SMaster.aes = state['aes']
SMaster.secrets = state['secrets']
def __getstate__(self):
return {'opts': self.opts,
'serial': self.serial,
'req_channels': self.req_channels,
'mkey': self.mkey,
'key': self.key,
'k_mtime': self.k_mtime,
'aes': SMaster.aes}
'secrets': SMaster.secrets}
def __bind(self):
'''
@ -1132,7 +1137,8 @@ class AESFuncs(object):
{'grains': load['grains'],
'pillar': data})
)
os.rename(tmpfname, datap)
# On Windows, os.rename will fail if the destination file exists.
salt.utils.atomicfile.atomic_rename(tmpfname, datap)
return data
def _minion_event(self, load):

View File

@ -938,6 +938,21 @@ class Minion(MinionBase):
# multiprocessing communication.
if not minion_instance:
minion_instance = cls(opts)
if not hasattr(minion_instance, 'functions'):
functions, returners, function_errors = (
minion_instance._load_modules()
)
minion_instance.functions = functions
minion_instance.returners = returners
minion_instance.function_errors = function_errors
if not hasattr(minion_instance, 'serial'):
minion_instance.serial = salt.payload.Serial(opts)
if not hasattr(minion_instance, 'proc_dir'):
uid = salt.utils.get_uid(user=opts.get('user', None))
minion_instance.proc_dir = (
get_proc_dir(opts['cachedir'], uid=uid)
)
fn_ = os.path.join(minion_instance.proc_dir, data['jid'])
if opts['multiprocessing']:
salt.utils.daemonize_if(opts)

View File

@ -292,9 +292,15 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
self.clients.setsockopt(zmq.IPV4ONLY, 0)
self.workers = self.context.socket(zmq.DEALER)
self.w_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'workers.ipc')
)
if self.opts.get('ipc_mode', '') == 'tcp':
self.w_uri = 'tcp://127.0.0.1:{0}'.format(
self.opts.get('tcp_master_workers', 4515)
)
else:
self.w_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'workers.ipc')
)
log.info('Setting up the master communication server')
self.clients.bind(self.uri)
@ -331,9 +337,14 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
self.context = zmq.Context(1)
self._socket = self.context.socket(zmq.REP)
self.w_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'workers.ipc')
)
if self.opts.get('ipc_mode', '') == 'tcp':
self.w_uri = 'tcp://127.0.0.1:{0}'.format(
self.opts.get('tcp_master_workers', 4515)
)
else:
self.w_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'workers.ipc')
)
log.info('Worker binding to socket {0}'.format(self.w_uri))
self._socket.connect(self.w_uri)
@ -423,9 +434,15 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
pub_uri = 'tcp://{interface}:{publish_port}'.format(**self.opts)
# Prepare minion pull socket
pull_sock = context.socket(zmq.PULL)
pull_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
)
if self.opts.get('ipc_mode', '') == 'tcp':
pull_uri = 'tcp://127.0.0.1:{0}'.format(
self.opts.get('tcp_master_publish_pull', 4514)
)
else:
pull_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
)
salt.utils.zeromq.check_ipc_path_max_len(pull_uri)
# Start the minion command publisher
@ -502,9 +519,14 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
# Send 0MQ to the publisher
context = zmq.Context(1)
pub_sock = context.socket(zmq.PUSH)
pull_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
)
if self.opts.get('ipc_mode', '') == 'tcp':
pull_uri = 'tcp://127.0.0.1:{0}'.format(
self.opts.get('tcp_master_publish_pull', 4514)
)
else:
pull_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
)
pub_sock.connect(pull_uri)
int_payload = {'payload': self.serial.dumps(payload)}

View File

@ -21,6 +21,7 @@ import salt.log
import salt.client
import salt.pillar
import salt.utils
import salt.utils.atomicfile
import salt.utils.minions
import salt.payload
from salt.exceptions import SaltException
@ -383,13 +384,13 @@ class MasterPillarUtil(object):
os.close(tmpfh)
with salt.utils.fopen(tmpfname, 'w+b') as fp_:
fp_.write(self.serial.dumps({'grains': minion_grains}))
os.rename(tmpfname, data_file)
salt.utils.atomicfile.atomic_rename(tmpfname, data_file)
elif clear_grains and minion_pillar:
tmpfh, tmpfname = tempfile.mkstemp(dir=cdir)
os.close(tmpfh)
with salt.utils.fopen(tmpfname, 'w+b') as fp_:
fp_.write(self.serial.dumps({'pillar': minion_pillar}))
os.rename(tmpfname, data_file)
salt.utils.atomicfile.atomic_rename(tmpfname, data_file)
if clear_mine:
# Delete the whole mine file
os.remove(os.path.join(mine_file))
@ -403,7 +404,9 @@ class MasterPillarUtil(object):
os.close(tmpfh)
with salt.utils.fopen(tmpfname, 'w+b') as fp_:
fp_.write(self.serial.dumps(mine_data))
os.rename(tmpfname, mine_file)
salt.utils.atomicfile.atomic_rename(
tmpfname,
mine_file)
except (OSError, IOError):
return True
return True