Merge pull request #2046 from FireHost/windows_multiprocessing

Windows multiprocessing support and Esky bugfixes
Thomas S Hatch 2012-09-20 16:49:03 -07:00
commit 25f9c5386a
10 changed files with 101 additions and 101 deletions

View File

@@ -19,6 +19,7 @@ except ImportError as e:
     if e.args[0] != 'No module named _msgpack':
         raise
 class Master(parsers.MasterOptionParser):
     '''
     Creates a master server
@@ -50,6 +51,7 @@ class Master(parsers.MasterOptionParser):
             sys.exit(err.errno)
         self.setup_logfile_logger()
+        log = logging.getLogger('salt.master')
         # Late import so logging works correctly
         if not verify_socket(self.config['interface'],
@@ -58,6 +60,7 @@ class Master(parsers.MasterOptionParser):
             self.exit(4, 'The ports are not available to bind')
         import salt.master
+        log.warn('Starting the Salt Master')
         master = salt.master.Master(self.config)
         self.daemonize_if_required()
         self.set_pidfile()
@@ -96,6 +99,7 @@ class Minion(parsers.MinionOptionParser):
             sys.exit(err.errno)
         self.setup_logfile_logger()
+        log = logging.getLogger('salt.minion')
         # Late import so logging works correctly
         import salt.minion
@@ -105,12 +109,13 @@ class Minion(parsers.MinionOptionParser):
         # This is the latest safe place to daemonize
         self.daemonize_if_required()
         try:
+            log.warn('Starting the Salt Minion "{0}"'.format(self.config['id']))
             minion = salt.minion.Minion(self.config)
             self.set_pidfile()
             if check_user(self.config['user']):
                 minion.tune_in()
         except KeyboardInterrupt:
-            logging.getLogger(__name__).warn('Stopping the Salt Minion')
+            log.warn('Stopping the Salt Minion')
             raise SystemExit('\nExiting on Ctrl-c')
@@ -137,8 +142,8 @@ class Syndic(parsers.SyndicOptionParser):
             except OSError, err:
                 sys.exit(err.errno)
         self.setup_logfile_logger()
+        log = logging.getLogger('salt.syndic')
         # Late import so logging works correctly
         import salt.minion
@@ -147,10 +152,10 @@ class Syndic(parsers.SyndicOptionParser):
         if check_user(self.config['user']):
             try:
+                log.warn('Starting the Salt Syndic Minion "{0}"'.format(
+                    self.config['id']))
                 syndic = salt.minion.Syndic(self.config)
                 syndic.tune_in()
             except KeyboardInterrupt:
-                logging.getLogger(__name__).warn(
-                    'Stopping the Salt Syndic Minion'
-                )
+                log.warn('Stopping the Salt Syndic Minion')
                 raise SystemExit('\nExiting on Ctrl-c')

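The net effect of the scripts changes above: the "Starting the Salt ..." banners move out of the daemon classes and into the CLI entry points, logged through a per-daemon logger fetched right after `setup_logfile_logger()` runs, so the message reaches the daemon's own logfile rather than being emitted before logging is configured. A minimal sketch of the pattern, using a stand-in `setup_logfile_logger()` rather than Salt's real parser mixin:

```python
import logging


def setup_logfile_logger(path, level=logging.WARNING):
    # Stand-in for the parser mixin's setup_logfile_logger(): attach a file
    # handler to the root 'salt' logger before any daemon messages are emitted.
    handler = logging.FileHandler(path)
    handler.setFormatter(logging.Formatter('%(asctime)s [%(name)s] %(message)s'))
    logging.getLogger('salt').addHandler(handler)
    logging.getLogger('salt').setLevel(level)


setup_logfile_logger('/tmp/salt-master.log')
log = logging.getLogger('salt.master')    # child of 'salt', inherits the handler
log.warn('Starting the Salt Master')      # lands in the daemon's own logfile
```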
View File

@@ -138,6 +138,7 @@ class Client(object):
         '''
         ret = []
         path = self._check_proto(path)
+        log.info("Caching directory '%s' for environment '%s'" % (path, env))
         for fn_ in self.file_list(env):
             if fn_.startswith(path):
                 local = self.cache_file('salt://{0}'.format(fn_), env)
@@ -224,9 +225,9 @@ class Client(object):
             if path.endswith('.sls'):
                 # is an sls module!
                 if path.endswith('{0}init.sls'.format(os.sep)):
-                    states.append(path.replace(os.sep, '.')[:-9])
+                    states.append(path.replace('/', '.')[:-9])
                 else:
-                    states.append(path.replace(os.sep, '.')[:-4])
+                    states.append(path.replace('/', '.')[:-4])
         return states
     def get_state(self, sls, env):
@@ -532,6 +533,7 @@ class RemoteClient(Client):
         dest is ommited, then the downloaded file will be placed in the minion
         cache
         '''
+        log.info("Fetching file '%s'" % path)
         path = self._check_proto(path)
         load = {'path': path,
                 'env': env,

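The `os.sep` to `'/'` switch above matters on Windows: paths returned by the Salt fileserver always use forward slashes, so replacing `os.sep` (`'\\'`) left the slashes in place and produced invalid dotted state names. A standalone illustration of the difference (not Salt code):

```python
# Illustration only: why the state-name conversion must replace '/', not os.sep.
import ntpath  # stand-in for os.path on Windows, so this runs anywhere

path = 'apache/conf/init.sls'        # fileserver paths always use '/'
win_sep = ntpath.sep                 # '\\'

broken = path.replace(win_sep, '.')[:-9]   # old code on Windows: the '/' survives
fixed = path.replace('/', '.')[:-9]        # new code: platform-independent

print(broken)   # apache/conf  -> not a valid dotted sls name
print(fixed)    # apache.conf
```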
View File

@@ -219,7 +219,6 @@ class Master(SMaster):
         '''
         enable_sigusr1_handler()
-        log.warn('Starting the Salt Master')
         self.__set_max_open_files()
         clear_old_jobs_proc = multiprocessing.Process(
             target=self._clear_old_jobs)

View File

@@ -14,6 +14,7 @@ import re
 import threading
 import time
 import traceback
+import sys
 # Import third party libs
 import zmq
@@ -132,12 +133,6 @@ class Minion(object):
         self.functions, self.returners = self.__load_modules()
         self.matcher = Matcher(self.opts, self.functions)
         self.proc_dir = get_proc_dir(opts['cachedir'])
-        if hasattr(self, '_syndic') and self._syndic:
-            log.warn(
-                'Starting the Salt Syndic Minion "{0}"'.format(self.opts['id'])
-            )
-        else:
-            log.warn('Starting the Salt Minion "{0}"'.format(self.opts['id']))
         self.authenticate()
         opts['pillar'] = salt.pillar.get_pillar(
             opts,
@@ -233,36 +228,39 @@ class Minion(object):
         if isinstance(data['fun'], string_types):
             if data['fun'] == 'sys.reload_modules':
                 self.functions, self.returners = self.__load_modules()
-        if self.opts['multiprocessing']:
-            if isinstance(data['fun'], tuple) or isinstance(data['fun'], list):
-                multiprocessing.Process(
-                    target=lambda: self._thread_multi_return(data)
-                ).start()
-            else:
-                multiprocessing.Process(
-                    target=lambda: self._thread_return(data)
-                ).start()
-        else:
-            if isinstance(data['fun'], tuple) or isinstance(data['fun'], list):
-                threading.Thread(
-                    target=lambda: self._thread_multi_return(data)
-                ).start()
-            else:
-                threading.Thread(
-                    target=lambda: self._thread_return(data)
-                ).start()
+        if isinstance(data['fun'], tuple) or isinstance(data['fun'], list):
+            target = Minion._thread_multi_return
+        else:
+            target = Minion._thread_return
+        # We stash an instance references to allow for the socket
+        # communication in Windows. You can't pickle functions, and thus
+        # python needs to be able to reconstruct the reference on the other
+        # side.
+        instance = self
+        if self.opts['multiprocessing']:
+            if sys.platform.startswith('win'):
+                # let python reconstruct the minion on the other side if we're
+                # running on windows
+                instance = None
+            multiprocessing.Process(target=target, args=(instance, self.opts, data)).start()
+        else:
+            threading.Thread(target=target, args=(instance, self.opts, data)).start()
-    def _thread_return(self, data):
+    @classmethod
+    def _thread_return(class_, minion_instance, opts, data):
         '''
         This method should be used as a threading target, start the actual
         minion side execution.
         '''
-        if self.opts['multiprocessing']:
-            fn_ = os.path.join(self.proc_dir, data['jid'])
+        # this seems awkward at first, but it's a workaround for Windows
+        # multiprocessing communication.
+        if not minion_instance:
+            minion_instance = class_(opts)
+        if opts['multiprocessing']:
+            fn_ = os.path.join(minion_instance.proc_dir, data['jid'])
             sdata = {'pid': os.getpid()}
             sdata.update(data)
-            open(fn_, 'w+').write(self.serial.dumps(sdata))
+            open(fn_, 'w+').write(minion_instance.serial.dumps(sdata))
         ret = {}
         for ind in range(0, len(data['arg'])):
             try:
@@ -277,10 +275,10 @@ class Minion(object):
                 pass
         function_name = data['fun']
-        if function_name in self.functions:
+        if function_name in minion_instance.functions:
             ret['success'] = False
             try:
-                func = self.functions[data['fun']]
+                func = minion_instance.functions[data['fun']]
                 args, kw = detect_kwargs(func, data['arg'], data)
                 ret['return'] = func(*args, **kw)
                 ret['success'] = True
@@ -306,12 +304,12 @@ class Minion(object):
         ret['jid'] = data['jid']
         ret['fun'] = data['fun']
-        self._return_pub(ret)
+        minion_instance._return_pub(ret)
         if data['ret']:
             for returner in set(data['ret'].split(',')):
-                ret['id'] = self.opts['id']
+                ret['id'] = opts['id']
                 try:
-                    self.returners[returner](ret)
+                    minion_instance.returners[returner](ret)
                 except Exception as exc:
                     log.error(
                         'The return failed for job {0} {1}'.format(
@@ -320,11 +318,16 @@ class Minion(object):
                     )
                 )
-    def _thread_multi_return(self, data):
+    @classmethod
+    def _thread_multi_return(class_, minion_instance, opts, data):
         '''
         This method should be used as a threading target, start the actual
         minion side execution.
         '''
+        # this seems awkward at first, but it's a workaround for Windows
+        # multiprocessing communication.
+        if not minion_instance:
+            minion_instance = class_(opts)
         ret = {'return': {}}
         for ind in range(0, len(data['fun'])):
             for index in range(0, len(data['arg'][ind])):
@@ -341,7 +344,7 @@ class Minion(object):
                 ret['success'][data['fun'][ind]] = False
                 try:
-                    func = self.functions[data['fun'][ind]]
+                    func = minion_instance.functions[data['fun'][ind]]
                     args, kw = detect_kwargs(func, data['arg'][ind], data)
                     ret['return'][data['fun'][ind]] = func(*args, **kw)
                     ret['success'][data['fun'][ind]] = True
@@ -354,12 +357,12 @@ class Minion(object):
                     )
                 ret['return'][data['fun'][ind]] = trb
         ret['jid'] = data['jid']
-        self._return_pub(ret)
+        minion_instance._return_pub(ret)
         if data['ret']:
             for returner in set(data['ret'].split(',')):
-                ret['id'] = self.opts['id']
+                ret['id'] = opts['id']
                 try:
-                    self.returners[returner](ret)
+                    minion_instance.returners[returner](ret)
                 except Exception as exc:
                     log.error(
                         'The return failed for job {0} {1}'.format(

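The core of the Windows change is in the minion diff above: `multiprocessing` on Windows has no `fork()`, so the child process is spawned fresh and the `target` plus its arguments must be picklable. Bound methods and `lambda`s are not, which is why `_thread_return` and `_thread_multi_return` become classmethods that take the instance (or `None`, in which case the child rebuilds one from `opts`) as an explicit argument. A stripped-down sketch of the same pattern; the class and field names here are illustrative, not Salt's:

```python
import multiprocessing
import sys


class Worker(object):
    def __init__(self, opts):
        self.opts = opts

    @classmethod
    def _run_job(cls, instance, opts, data):
        # On Windows the parent passes instance=None; rebuild it in the child
        # from the (picklable) opts dict instead of trying to pickle `self`.
        if instance is None:
            instance = cls(opts)
        print('handling', data, 'on', instance.opts['id'])

    def dispatch(self, data):
        instance = self
        if sys.platform.startswith('win'):
            instance = None          # `self` would not survive pickling
        multiprocessing.Process(
            target=Worker._run_job,
            args=(instance, self.opts, data),
        ).start()


if __name__ == '__main__':          # required for spawn-based start on Windows
    Worker({'id': 'minion1'}).dispatch({'fun': 'test.ping'})
```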
View File

@@ -3,11 +3,14 @@ Minion side functions for salt-cp
 '''
 # Import python libs
 import os
+import logging
 # Import salt libs
 import salt.minion
 import salt.fileclient
+log = logging.getLogger(__name__)
 def recv(files, dest):
     '''

View File

@@ -44,9 +44,12 @@ def _sync(form, env=None):
     source = os.path.join('salt://_{0}'.format(form))
     mod_dir = os.path.join(__opts__['extension_modules'], '{0}'.format(form))
     if not os.path.isdir(mod_dir):
+        log.info("Creating module dir '%s'" % mod_dir)
         os.makedirs(mod_dir)
     for sub_env in env:
+        log.info("Syncing %s for environment '%s'" % (form, sub_env))
         cache = []
+        log.info('Loading cache from %s,for %s)' % (source, sub_env))
         cache.extend(__salt__['cp.cache_dir'](source, sub_env))
         local_cache_dir=os.path.join(
             __opts__['cachedir'],
@@ -54,11 +57,13 @@ def _sync(form, env=None):
             sub_env,
             '_{0}'.format(form)
         )
+        log.debug("Local cache dir: '%s'" % local_cache_dir)
         for fn_ in cache:
-            relpath=os.path.relpath(fn_, local_cache_dir)
-            relname=os.path.splitext(relpath)[0].replace('/','.')
+            relpath = os.path.relpath(fn_, local_cache_dir)
+            relname = os.path.splitext(relpath)[0].replace(os.sep, '.')
             remote.add(relpath)
             dest = os.path.join(mod_dir, relpath)
+            log.info("Copying '%s' to '%s'" % (fn_, dest))
             if os.path.isfile(dest):
                 # The file is present, if the sum differs replace it
                 srch = hashlib.md5(open(fn_, 'r').read()).hexdigest()
@@ -131,10 +136,11 @@ def update(version=None):
     if not has_esky:
         return "Esky not available as import"
     if not getattr(sys, "frozen", False):
-        return "Instance is not a frozen instance"
+        return "Minion is not running an Esky build"
     if not __opts__['update_url']:
         return "'update_url' not configured on this minion"
     app = esky.Esky(sys.executable, __opts__['update_url'])
+    oldversion = __grains__['saltversion']
     try:
         if not version:
             version = app.find_update()
@@ -145,10 +151,10 @@ def update(version=None):
         app.cleanup()
     except Exception as e:
         return e
-    restarted = []
+    restarted = {}
     for service in __opts__['update_restart_services']:
-        restarted.append(__salt__['service.restart'](service))
+        restarted[service] = __salt__['service.restart'](service)
-    return {'comment': "Updated from %s to %s" % (__version__, version),
+    return {'comment': "Updated from %s to %s" % (oldversion, version),
             'restarted': restarted}
 def sync_modules(env=None):
@@ -230,6 +236,7 @@ def sync_all(env=None):
         salt '*' saltutil.sync_all
     '''
+    logging.debug("Syncing all")
     ret = []
     ret.append(sync_modules(env))
     ret.append(sync_states(env))

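On the Esky side, `saltutil.update` now reports the version it upgraded from using the `saltversion` grain (the in-memory `__version__` may already describe the new code) and returns per-service restart results keyed by service name. A rough sketch of the surrounding Esky calls, assuming the standard `esky` auto-update API and a frozen build:

```python
import sys

import esky  # third-party auto-update library used by frozen Salt builds


def update(update_url, version=None):
    # Only meaningful inside a frozen (esky/bbfreeze) build.
    if not getattr(sys, 'frozen', False):
        return 'Minion is not running an Esky build'
    app = esky.Esky(sys.executable, update_url)
    if version is None:
        version = app.find_update()      # newest version on the update server
    if version is None:
        return 'No update available'
    app.fetch_version(version)           # download the new frozen app
    app.install_version(version)         # swap it in
    app.cleanup()                        # drop the old version directory
    return {'comment': 'Updated to %s' % version}
```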
View File

@@ -114,49 +114,6 @@ def daemonize():
     '''
     Daemonize a process
     '''
-    if 'os' in os.environ:
-        if os.environ['os'].startswith('Windows'):
-            import ctypes
-            if ctypes.windll.shell32.IsUserAnAdmin() == 0:
-                import win32api
-                executablepath = sys.executable
-                pypath = executablepath.split('\\')
-                win32api.ShellExecute(
-                    0,
-                    'runas',
-                    executablepath,
-                    os.path.join(
-                        pypath[0],
-                        os.sep,
-                        pypath[1],
-                        'Lib\\site-packages\\salt\\utils\\saltminionservice.py'
-                    ),
-                    os.path.join(pypath[0], os.sep, pypath[1]),
-                    0
-                )
-                sys.exit(0)
-            else:
-                from . import saltminionservice
-                import win32serviceutil
-                import win32service
-                import winerror
-                servicename = 'salt-minion'
-                try:
-                    status = win32serviceutil.QueryServiceStatus(servicename)
-                except win32service.error as details:
-                    if details[0] == winerror.ERROR_SERVICE_DOES_NOT_EXIST:
-                        saltminionservice.instart(
-                            saltminionservice.MinionService,
-                            servicename,
-                            'Salt Minion'
-                        )
-                        sys.exit(0)
-                if status[1] == win32service.SERVICE_RUNNING:
-                    win32serviceutil.StopServiceWithDeps(servicename)
-                    win32serviceutil.StartService(servicename)
-                else:
-                    win32serviceutil.StartService(servicename)
-                sys.exit(0)
     try:
         pid = os.fork()
         if pid > 0:
@@ -201,6 +158,8 @@ def daemonize_if(opts, **kwargs):
         return
     if not opts['multiprocessing']:
         return
+    if sys.platform.startswith('win'):
+        return
     # Daemonizing breaks the proc dir, so the proc needs to be rewritten
     data = {}
     for key, val in kwargs.items():

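The pywin32 service-registration branch is dropped from `daemonize()`, and `daemonize_if()` simply declines to daemonize on Windows, because the whole routine is built on `os.fork()`, which does not exist there. A minimal sketch of that guard, assuming only the standard library:

```python
import os
import sys


def daemonize():
    # Classic POSIX double fork; os.fork() does not exist on Windows, which is
    # why the caller below bails out early on that platform.
    if os.fork() > 0:
        sys.exit(0)          # parent exits, child continues
    os.setsid()              # become session leader, detach from the tty
    if os.fork() > 0:
        sys.exit(0)          # first child exits, grandchild is the daemon


def daemonize_if(opts):
    if not opts.get('multiprocessing'):
        return
    if sys.platform.startswith('win'):
        return               # no fork(); run the worker in the foreground
    daemonize()
```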
View File

@@ -3,10 +3,10 @@ Jinja loading utils to enable a more powerful backend for jinja templates
 '''
 # Import python libs
 from os import path
+import logging
 # Import third-party libs
 from jinja2 import Template, BaseLoader, Environment, StrictUndefined
-from jinja2.loaders import split_template_path
 from jinja2.exceptions import TemplateNotFound
 # Import Salt libs
@@ -14,6 +14,9 @@ import salt
 import salt.fileclient
+log = logging.getLogger(__name__)
 def get_template(filename, opts, env):
     loader = SaltCacheLoader(opts, env)
     if filename.startswith(loader.searchpath):
@@ -44,6 +47,7 @@ class SaltCacheLoader(BaseLoader):
         self.env = env
         self.encoding = encoding
         self.searchpath = path.join(opts['cachedir'], 'files', env)
+        log.debug("Jinja search path: '%s'" % self.searchpath)
         self._file_client = None
         self.cached = []
@@ -72,7 +76,10 @@ class SaltCacheLoader(BaseLoader):
     def get_source(self, environment, template):
         # checks for relative '..' paths
-        template = path.join(*split_template_path(template))
+        if '..' in template:
+            log.warning("Discarded template path '%s', relative paths are"
+                        "prohibited" % template)
+            raise TemplateNotFound(template)
         self.check_cache(template)
         filepath = path.join(self.searchpath, template)
         with open(filepath, 'rb') as f:

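In the Jinja loader, the `split_template_path` normalisation (and its now-unused import) is replaced by an explicit rejection of any template path containing `..` before the name is joined under the cache directory, so a template name can no longer escape `searchpath`. A condensed sketch of a loader `get_source` with that guard, using only jinja2's public `BaseLoader` and `TemplateNotFound`:

```python
from os import path

from jinja2 import BaseLoader, TemplateNotFound


class CacheLoader(BaseLoader):
    def __init__(self, searchpath):
        self.searchpath = searchpath

    def get_source(self, environment, template):
        # Refuse relative traversal before the path is joined below; a name
        # like '../../etc/passwd' must never escape searchpath.
        if '..' in template:
            raise TemplateNotFound(template)
        filepath = path.join(self.searchpath, template)
        with open(filepath, 'rb') as f:
            contents = f.read().decode('utf-8')
        mtime = path.getmtime(filepath)
        return contents, filepath, lambda: path.getmtime(filepath) == mtime
```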
View File

@@ -9,7 +9,10 @@ import stat
 import socket
 import getpass
 import logging
-import resource
+if sys.platform.startswith('win'):
+    import win32file
+else:
+    import resource
 from salt.log import is_console_configured
 from salt.exceptions import SaltClientError
@@ -268,7 +271,14 @@ def check_parent_dirs(fname, user='root'):
 def check_max_open_files(opts):
     mof_c = opts.get('max_open_files', 100000)
-    mof_s, mof_h = resource.getrlimit(resource.RLIMIT_NOFILE)
+    if sys.platform.startswith('win'):
+        # Check the windows api for more detail on this
+        # http://msdn.microsoft.com/en-us/library/xt874334(v=vs.71).aspx
+        # and the python binding http://timgolden.me.uk/pywin32-docs/win32file.html
+        mof_s = mof_h = win32file._getmaxstdio()
+    else:
+        mof_s, mof_h = resource.getrlimit(resource.RLIMIT_NOFILE)
     accepted_keys_dir = os.path.join(opts.get('pki_dir'), 'minions')
     accepted_count = len([
         key for key in os.listdir(accepted_keys_dir) if

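`check_max_open_files()` compares the configured `max_open_files` value against what the OS will actually grant; on POSIX that comes from `RLIMIT_NOFILE`, while on Windows the nearest equivalent is the CRT stdio handle limit that pywin32 exposes as `win32file._getmaxstdio()`. A small cross-platform sketch of the probe:

```python
import sys


def current_open_file_limits():
    """Return (soft, hard) limits on open file handles for this process."""
    if sys.platform.startswith('win'):
        import win32file                      # pywin32; wraps the CRT _getmaxstdio()
        limit = win32file._getmaxstdio()      # Windows exposes a single limit
        return limit, limit
    import resource
    return resource.getrlimit(resource.RLIMIT_NOFILE)


soft, hard = current_open_file_limits()
print('soft=%d hard=%d' % (soft, hard))
```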
View File

@@ -144,13 +144,18 @@ freezer_includes = [
     'zmq.core.*',
     'zmq.utils.*',
     'ast',
+    'difflib',
+    'distutils'
 ]
-if sys.platform == 'win32':
+if sys.platform.startswith('win'):
     freezer_includes.extend([
+        'win32api',
+        'win32file',
         'win32con',
         'win32security',
-        'ntsecuritycon'
+        'ntsecuritycon',
+        '_winreg'
     ])
 elif sys.platform.startswith('linux'):
     freezer_includes.extend([