Performance refactor

salt '*' test.ping...now with 40,000 fewer function calls!
This commit is contained in:
Mike Place 2014-12-09 17:43:00 -07:00
parent ea7af4fb65
commit d7c904c161
4 changed files with 206 additions and 186 deletions

View File

@ -12,16 +12,8 @@ import sys
from glob import glob
# Import salt libs
import salt.cli.caller
import salt.cli.cp
import salt.cli.batch
import salt.client
import salt.client.ssh
import salt.client.netapi
import salt.output
import salt.runner
import salt.auth
import salt.key
from salt.config import _expand_glob_path
from salt.utils import parsers, print_cli
@ -42,6 +34,8 @@ class SaltCMD(parsers.SaltCMDOptionParser):
'''
def run(self):
import salt.auth
import salt.cli.batch
'''
Execute the salt command line
'''
@ -318,6 +312,7 @@ class SaltCP(parsers.SaltCPOptionParser):
'''
Run the salt-cp command line client
'''
import salt.cli.cp
def run(self):
'''
@ -351,6 +346,8 @@ class SaltKey(parsers.SaltKeyOptionParser):
'''
Execute salt-key
'''
import salt.key
self.parse_args()
if self.config['verify_env']:
@ -402,6 +399,7 @@ class SaltCall(parsers.SaltCallOptionParser):
'''
Execute the salt call!
'''
import salt.cli.caller
self.parse_args()
if self.config['verify_env']:
@ -457,10 +455,12 @@ class SaltRun(parsers.SaltRunOptionParser):
'''
Used to execute Salt runners
'''
def run(self):
'''
Execute salt-run
'''
import salt.runner
self.parse_args()
if self.config['verify_env']:
@ -502,7 +502,9 @@ class SaltSSH(parsers.SaltSSHOptionParser):
'''
Used to Execute the salt ssh routine
'''
def run(self):
import salt.client.ssh
self.parse_args()
ssh = salt.client.ssh.SSH(self.config)
@ -531,6 +533,7 @@ class SaltAPI(six.with_metaclass(parsers.OptionParserMeta, # pylint: disable=W0
'''
Run the api
'''
import salt.client.netapi
self.parse_args()
try:
if self.config['verify_env']:

View File

@ -40,6 +40,7 @@ import salt.daemons.masterapi
import salt.defaults.exitcodes
import salt.utils.atomicfile
import salt.utils.event
import salt.utils.reactor
import salt.utils.verify
import salt.utils.minions
import salt.utils.gzip_util
@ -400,7 +401,7 @@ class Master(SMaster):
if self.opts.get('reactor'):
log.info('Creating master reactor process')
process_manager.add_process(salt.utils.event.Reactor, args=(self.opts,))
process_manager.add_process(salt.utils.reactor.Reactor, args=(self.opts,))
if self.opts.get('event_return'):
log.info('Creating master event return process')

View File

@ -51,8 +51,6 @@ from __future__ import absolute_import
# Import python libs
import os
import fnmatch
import glob
import hashlib
import errno
import logging
@ -67,17 +65,13 @@ try:
except ImportError:
# Local mode does not need zmq
pass
import yaml
# Import salt libs
import salt.payload
import salt.loader
import salt.state
import salt.utils
import salt.utils.cache
from salt.ext.six import string_types
import salt.utils.process
from salt._compat import string_types
log = logging.getLogger(__name__)
# The SUB_EVENT set is for functions that require events fired based on
@ -672,177 +666,6 @@ class EventReturn(multiprocessing.Process):
return True
class Reactor(multiprocessing.Process, salt.state.Compiler):
'''
Read in the reactor configuration variable and compare it to events
processed on the master.
The reactor has the capability to execute pre-programmed executions
as reactions to events
'''
def __init__(self, opts):
multiprocessing.Process.__init__(self)
salt.state.Compiler.__init__(self, opts)
self.wrap = ReactWrap(self.opts)
local_minion_opts = self.opts.copy()
local_minion_opts['file_client'] = 'local'
self.minion = salt.minion.MasterMinion(local_minion_opts)
def render_reaction(self, glob_ref, tag, data):
'''
Execute the render system against a single reaction file and return
the data structure
'''
react = {}
if glob_ref.startswith('salt://'):
glob_ref = self.minion.functions['cp.cache_file'](glob_ref)
for fn_ in glob.glob(glob_ref):
try:
react.update(self.render_template(
fn_,
tag=tag,
data=data))
except Exception:
log.error('Failed to render "{0}"'.format(fn_))
return react
def list_reactors(self, tag):
'''
Take in the tag from an event and return a list of the reactors to
process
'''
log.debug('Gathering reactors for tag {0}'.format(tag))
reactors = []
if isinstance(self.opts['reactor'], string_types):
try:
with salt.utils.fopen(self.opts['reactor']) as fp_:
react_map = yaml.safe_load(fp_.read())
except (OSError, IOError):
log.error(
'Failed to read reactor map: "{0}"'.format(
self.opts['reactor']
)
)
except Exception:
log.error(
'Failed to parse YAML in reactor map: "{0}"'.format(
self.opts['reactor']
)
)
else:
react_map = self.opts['reactor']
for ropt in react_map:
if not isinstance(ropt, dict):
continue
if len(ropt) != 1:
continue
key = next(iter(ropt.keys()))
val = ropt[key]
if fnmatch.fnmatch(tag, key):
if isinstance(val, string_types):
reactors.append(val)
elif isinstance(val, list):
reactors.extend(val)
return reactors
def reactions(self, tag, data, reactors):
'''
Render a list of reactor files and returns a reaction struct
'''
log.debug('Compiling reactions for tag {0}'.format(tag))
high = {}
chunks = []
for fn_ in reactors:
high.update(self.render_reaction(fn_, tag, data))
if high:
errors = self.verify_high(high)
if errors:
return errors
chunks = self.order_chunks(self.compile_high_data(high))
return chunks
def call_reactions(self, chunks):
'''
Execute the reaction state
'''
for chunk in chunks:
self.wrap.run(chunk)
def run(self):
'''
Enter into the server loop
'''
salt.utils.appendproctitle(self.__class__.__name__)
self.event = SaltEvent('master', self.opts['sock_dir'])
events = self.event.iter_events(full=True)
self.event.fire_event({}, 'salt/reactor/start')
for data in events:
reactors = self.list_reactors(data['tag'])
if not reactors:
continue
chunks = self.reactions(data['tag'], data['data'], reactors)
if chunks:
self.call_reactions(chunks)
class ReactWrap(object):
'''
Create a wrapper that executes low data for the reaction system
'''
# class-wide cache of clients
client_cache = None
def __init__(self, opts):
self.opts = opts
if ReactWrap.client_cache is None:
ReactWrap.client_cache = salt.utils.cache.CacheDict(opts['reactor_refresh_interval'])
def run(self, low):
'''
Execute the specified function in the specified state by passing the
LowData
'''
l_fun = getattr(self, low['state'])
try:
f_call = salt.utils.format_call(l_fun, low)
ret = l_fun(*f_call.get('args', ()), **f_call.get('kwargs', {}))
except Exception:
log.error(
'Failed to execute {0}: {1}\n'.format(low['state'], l_fun),
exc_info=True
)
return False
return ret
def local(self, *args, **kwargs):
'''
Wrap LocalClient for running :ref:`execution modules <all-salt.modules>`
'''
if 'local' not in self.client_cache:
self.client_cache['local'] = salt.client.LocalClient(self.opts['conf_file'])
return self.client_cache['local'].cmd_async(*args, **kwargs)
cmd = local
def runner(self, fun, **kwargs):
'''
Wrap RunnerClient for executing :ref:`runner modules <all-salt.runners>`
'''
if 'runner' not in self.client_cache:
self.client_cache['runner'] = salt.runner.RunnerClient(self.opts)
return self.client_cache['runner'].async(fun, kwargs, fire_event=False)
def wheel(self, fun, **kwargs):
'''
Wrap Wheel to enable executing :ref:`wheel modules <all-salt.wheel>`
'''
if 'wheel' not in self.client_cache:
self.client_cache['wheel'] = salt.wheel.Wheel(self.opts)
return self.client_cache['wheel'].async(fun, kwargs, fire_event=False)
class StateFire(object):
'''
Evaluate the data from a state run and fire events on the master and minion

193
salt/utils/reactor.py Normal file
View File

@ -0,0 +1,193 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
# Import python libs
import fnmatch
import glob
import logging
import multiprocessing
import yaml
# Import salt libs
import salt.state
import salt.utils
import salt.utils.cache
from salt.ext.six import string_types
import salt.utils.process
from salt._compat import string_types
log = logging.getLogger(__name__)
class Reactor(multiprocessing.Process, salt.state.Compiler):
'''
Read in the reactor configuration variable and compare it to events
processed on the master.
The reactor has the capability to execute pre-programmed executions
as reactions to events
'''
def __init__(self, opts):
multiprocessing.Process.__init__(self)
salt.state.Compiler.__init__(self, opts)
self.wrap = ReactWrap(self.opts)
local_minion_opts = self.opts.copy()
local_minion_opts['file_client'] = 'local'
self.minion = salt.minion.MasterMinion(local_minion_opts)
def render_reaction(self, glob_ref, tag, data):
'''
Execute the render system against a single reaction file and return
the data structure
'''
react = {}
if glob_ref.startswith('salt://'):
glob_ref = self.minion.functions['cp.cache_file'](glob_ref)
for fn_ in glob.glob(glob_ref):
try:
react.update(self.render_template(
fn_,
tag=tag,
data=data))
except Exception:
log.error('Failed to render "{0}"'.format(fn_))
return react
def list_reactors(self, tag):
'''
Take in the tag from an event and return a list of the reactors to
process
'''
log.debug('Gathering reactors for tag {0}'.format(tag))
reactors = []
if isinstance(self.opts['reactor'], string_types):
try:
with salt.utils.fopen(self.opts['reactor']) as fp_:
react_map = yaml.safe_load(fp_.read())
except (OSError, IOError):
log.error(
'Failed to read reactor map: "{0}"'.format(
self.opts['reactor']
)
)
except Exception:
log.error(
'Failed to parse YAML in reactor map: "{0}"'.format(
self.opts['reactor']
)
)
else:
react_map = self.opts['reactor']
for ropt in react_map:
if not isinstance(ropt, dict):
continue
if len(ropt) != 1:
continue
key = next(iter(ropt.keys()))
val = ropt[key]
if fnmatch.fnmatch(tag, key):
if isinstance(val, string_types):
reactors.append(val)
elif isinstance(val, list):
reactors.extend(val)
return reactors
def reactions(self, tag, data, reactors):
'''
Render a list of reactor files and returns a reaction struct
'''
log.debug('Compiling reactions for tag {0}'.format(tag))
high = {}
chunks = []
for fn_ in reactors:
high.update(self.render_reaction(fn_, tag, data))
if high:
errors = self.verify_high(high)
if errors:
return errors
chunks = self.order_chunks(self.compile_high_data(high))
return chunks
def call_reactions(self, chunks):
'''
Execute the reaction state
'''
for chunk in chunks:
self.wrap.run(chunk)
def run(self):
'''
Enter into the server loop
'''
salt.utils.appendproctitle(self.__class__.__name__)
self.event = salt.utils.event.SaltEvent('master', self.opts['sock_dir'])
events = self.event.iter_events(full=True)
self.event.fire_event({}, 'salt/reactor/start')
for data in events:
reactors = self.list_reactors(data['tag'])
if not reactors:
continue
chunks = self.reactions(data['tag'], data['data'], reactors)
if chunks:
self.call_reactions(chunks)
class ReactWrap(object):
'''
Create a wrapper that executes low data for the reaction system
'''
# class-wide cache of clients
client_cache = None
def __init__(self, opts):
self.opts = opts
if ReactWrap.client_cache is None:
ReactWrap.client_cache = salt.utils.cache.CacheDict(opts['reactor_refresh_interval'])
self.pool = salt.utils.process.ThreadPool(
self.opts['reactor_worker_threads'], # number of workers for runner/wheel
queue_size=self.opts['reactor_worker_hwm'] # queue size for those workers
)
def run(self, low):
'''
Execute the specified function in the specified state by passing the
LowData
'''
l_fun = getattr(self, low['state'])
try:
f_call = salt.utils.format_call(l_fun, low)
l_fun(*f_call.get('args', ()), **f_call.get('kwargs', {}))
except Exception:
log.error(
'Failed to execute {0}: {1}\n'.format(low['state'], l_fun),
exc_info=True
)
def local(self, *args, **kwargs):
'''
Wrap LocalClient for running :ref:`execution modules <all-salt.modules>`
'''
if 'local' not in self.client_cache:
self.client_cache['local'] = salt.client.LocalClient(self.opts['conf_file'])
self.client_cache['local'].cmd_async(*args, **kwargs)
cmd = local
def runner(self, _, **kwargs):
'''
Wrap RunnerClient for executing :ref:`runner modules <all-salt.runners>`
'''
if 'runner' not in self.client_cache:
self.client_cache['runner'] = salt.runner.RunnerClient(self.opts)
self.pool.fire_async(self.client_cache['runner'].low, kwargs)
def wheel(self, _, **kwargs):
'''
Wrap Wheel to enable executing :ref:`wheel modules <all-salt.wheel>`
'''
if 'wheel' not in self.client_cache:
self.client_cache['wheel'] = salt.wheel.Wheel(self.opts)
self.pool.fire_async(self.client_cache['wheel'].low, kwargs)