mirror of
https://github.com/valitydev/salt.git
synced 2024-11-07 08:58:59 +00:00
commit
9457506e7d
@ -12,16 +12,9 @@ import sys
|
||||
from glob import glob
|
||||
|
||||
# Import salt libs
|
||||
import salt.cli.caller
|
||||
import salt.cli.cp
|
||||
import salt.cli.batch
|
||||
import salt.client
|
||||
import salt.client.ssh
|
||||
import salt.client.netapi
|
||||
import salt.output
|
||||
import salt.runner
|
||||
import salt.auth
|
||||
import salt.key
|
||||
import salt.client.ssh
|
||||
from salt.config import _expand_glob_path
|
||||
|
||||
from salt.utils import parsers, print_cli
|
||||
@ -45,6 +38,8 @@ class SaltCMD(parsers.SaltCMDOptionParser):
|
||||
'''
|
||||
Execute the salt command line
|
||||
'''
|
||||
import salt.auth
|
||||
import salt.cli.batch
|
||||
self.parse_args()
|
||||
|
||||
if self.config['verify_env']:
|
||||
@ -323,6 +318,7 @@ class SaltCP(parsers.SaltCPOptionParser):
|
||||
'''
|
||||
Execute salt-cp
|
||||
'''
|
||||
import salt.cli.cp
|
||||
self.parse_args()
|
||||
|
||||
if self.config['verify_env']:
|
||||
@ -351,6 +347,8 @@ class SaltKey(parsers.SaltKeyOptionParser):
|
||||
'''
|
||||
Execute salt-key
|
||||
'''
|
||||
|
||||
import salt.key
|
||||
self.parse_args()
|
||||
|
||||
if self.config['verify_env']:
|
||||
@ -397,6 +395,7 @@ class SaltCall(parsers.SaltCallOptionParser):
|
||||
'''
|
||||
Used to locally execute a salt command
|
||||
'''
|
||||
import salt.cli.caller
|
||||
|
||||
def run(self):
|
||||
'''
|
||||
@ -457,10 +456,12 @@ class SaltRun(parsers.SaltRunOptionParser):
|
||||
'''
|
||||
Used to execute Salt runners
|
||||
'''
|
||||
|
||||
def run(self):
|
||||
'''
|
||||
Execute salt-run
|
||||
'''
|
||||
import salt.runner
|
||||
self.parse_args()
|
||||
|
||||
if self.config['verify_env']:
|
||||
@ -502,6 +503,7 @@ class SaltSSH(parsers.SaltSSHOptionParser):
|
||||
'''
|
||||
Used to Execute the salt ssh routine
|
||||
'''
|
||||
|
||||
def run(self):
|
||||
self.parse_args()
|
||||
|
||||
@ -531,6 +533,7 @@ class SaltAPI(six.with_metaclass(parsers.OptionParserMeta, # pylint: disable=W0
|
||||
'''
|
||||
Run the api
|
||||
'''
|
||||
import salt.client.netapi
|
||||
self.parse_args()
|
||||
try:
|
||||
if self.config['verify_env']:
|
||||
|
@ -24,7 +24,6 @@ except Exception:
|
||||
pass
|
||||
|
||||
# Import salt libs
|
||||
import salt.crypt
|
||||
import salt.utils
|
||||
import salt.utils.network
|
||||
import salt.syspaths
|
||||
@ -2043,6 +2042,7 @@ def apply_master_config(overrides=None, defaults=None):
|
||||
'''
|
||||
Returns master configurations dict.
|
||||
'''
|
||||
import salt.crypt
|
||||
if defaults is None:
|
||||
defaults = DEFAULT_MASTER_OPTS
|
||||
|
||||
|
@ -32,7 +32,6 @@ import salt.utils
|
||||
import salt.payload
|
||||
import salt.utils.verify
|
||||
import salt.version
|
||||
import salt.minion
|
||||
from salt.exceptions import (
|
||||
AuthenticationError, SaltClientError, SaltReqTimeoutError
|
||||
)
|
||||
|
@ -40,10 +40,12 @@ import salt.daemons.masterapi
|
||||
import salt.defaults.exitcodes
|
||||
import salt.utils.atomicfile
|
||||
import salt.utils.event
|
||||
import salt.utils.reactor
|
||||
import salt.utils.verify
|
||||
import salt.utils.minions
|
||||
import salt.utils.gzip_util
|
||||
import salt.utils.process
|
||||
import salt.utils.zeromq
|
||||
from salt.defaults import DEFAULT_TARGET_DELIM
|
||||
from salt.utils.debug import enable_sigusr1_handler, enable_sigusr2_handler, inspect_stack
|
||||
from salt.utils.event import tagify
|
||||
@ -400,7 +402,7 @@ class Master(SMaster):
|
||||
|
||||
if self.opts.get('reactor'):
|
||||
log.info('Creating master reactor process')
|
||||
process_manager.add_process(salt.utils.event.Reactor, args=(self.opts,))
|
||||
process_manager.add_process(salt.utils.reactor.Reactor, args=(self.opts,))
|
||||
|
||||
if self.opts.get('event_return'):
|
||||
log.info('Creating master event return process')
|
||||
@ -510,7 +512,7 @@ class Publisher(multiprocessing.Process):
|
||||
pull_uri = 'ipc://{0}'.format(
|
||||
os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
|
||||
)
|
||||
salt.utils.check_ipc_path_max_len(pull_uri)
|
||||
salt.utils.zeromq.check_ipc_path_max_len(pull_uri)
|
||||
|
||||
# Start the minion command publisher
|
||||
log.info('Starting the Salt Publisher on {0}'.format(pub_uri))
|
||||
|
@ -67,6 +67,7 @@ import salt.utils.args
|
||||
import salt.utils.event
|
||||
import salt.utils.minion
|
||||
import salt.utils.schedule
|
||||
import salt.utils.zeromq
|
||||
import salt.defaults.exitcodes
|
||||
|
||||
from salt.defaults import DEFAULT_TARGET_DELIM
|
||||
@ -170,7 +171,7 @@ def load_args_and_kwargs(func, args, data=None):
|
||||
Detect the args and kwargs that need to be passed to a function call, and
|
||||
check them against what was passed.
|
||||
'''
|
||||
argspec = salt.utils.get_function_argspec(func)
|
||||
argspec = salt.utils.args.get_function_argspec(func)
|
||||
_args = []
|
||||
_kwargs = {}
|
||||
invalid_kwargs = []
|
||||
@ -331,9 +332,9 @@ class MinionBase(object):
|
||||
)
|
||||
else:
|
||||
epub_uri = 'ipc://{0}'.format(epub_sock_path)
|
||||
salt.utils.check_ipc_path_max_len(epub_uri)
|
||||
salt.utils.zeromq.check_ipc_path_max_len(epub_uri)
|
||||
epull_uri = 'ipc://{0}'.format(epull_sock_path)
|
||||
salt.utils.check_ipc_path_max_len(epull_uri)
|
||||
salt.utils.zeromq.check_ipc_path_max_len(epull_uri)
|
||||
|
||||
log.debug(
|
||||
'{0} PUB socket URI: {1}'.format(
|
||||
|
@ -11,7 +11,6 @@ import logging
|
||||
|
||||
# Import salt libs
|
||||
import salt.log
|
||||
import salt.crypt
|
||||
import salt.ext.six as six
|
||||
|
||||
from salt.exceptions import SaltReqTimeoutError
|
||||
|
@ -827,7 +827,7 @@ class State(object):
|
||||
)
|
||||
else:
|
||||
# First verify that the parameters are met
|
||||
aspec = salt.utils.get_function_argspec(self.states[full])
|
||||
aspec = salt.utils.args.get_function_argspec(self.states[full])
|
||||
arglen = 0
|
||||
deflen = 0
|
||||
if isinstance(aspec.args, list):
|
||||
|
@ -130,7 +130,7 @@ def run(name, **kwargs):
|
||||
ret['comment'] = 'Module function {0} is set to execute'.format(name)
|
||||
return ret
|
||||
|
||||
aspec = salt.utils.get_function_argspec(__salt__[name])
|
||||
aspec = salt.utils.args.get_function_argspec(__salt__[name])
|
||||
args = []
|
||||
defaults = {}
|
||||
|
||||
|
@ -16,7 +16,6 @@ import fnmatch
|
||||
import hashlib
|
||||
import imp
|
||||
import io
|
||||
import inspect
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
@ -32,7 +31,6 @@ import tempfile
|
||||
import time
|
||||
import types
|
||||
import warnings
|
||||
import yaml
|
||||
import string
|
||||
import locale
|
||||
from calendar import month_abbr as months
|
||||
@ -76,12 +74,6 @@ try:
|
||||
except ImportError:
|
||||
HAS_WIN32API = False
|
||||
|
||||
try:
|
||||
import zmq
|
||||
except ImportError:
|
||||
# Running as purely local
|
||||
pass
|
||||
|
||||
try:
|
||||
import grp
|
||||
HAS_GRP = True
|
||||
@ -142,27 +134,6 @@ log = logging.getLogger(__name__)
|
||||
_empty = object()
|
||||
|
||||
|
||||
def get_function_argspec(func):
|
||||
'''
|
||||
A small wrapper around getargspec that also supports callable classes
|
||||
'''
|
||||
if not callable(func):
|
||||
raise TypeError('{0} is not a callable'.format(func))
|
||||
|
||||
if inspect.isfunction(func):
|
||||
aspec = inspect.getargspec(func)
|
||||
elif inspect.ismethod(func):
|
||||
aspec = inspect.getargspec(func)
|
||||
del aspec.args[0] # self
|
||||
elif isinstance(func, object):
|
||||
aspec = inspect.getargspec(func.__call__)
|
||||
del aspec.args[0] # self
|
||||
else:
|
||||
raise TypeError('Cannot inspect argument list for {0!r}'.format(func))
|
||||
|
||||
return aspec
|
||||
|
||||
|
||||
def safe_rm(tgt):
|
||||
'''
|
||||
Safely remove a file
|
||||
@ -826,7 +797,7 @@ def format_call(fun,
|
||||
ret['args'] = []
|
||||
ret['kwargs'] = {}
|
||||
|
||||
aspec = get_function_argspec(fun)
|
||||
aspec = salt.utils.args.get_function_argspec(fun)
|
||||
|
||||
args, kwargs = iter(arg_lookup(fun).values())
|
||||
|
||||
@ -940,7 +911,7 @@ def arg_lookup(fun):
|
||||
function.
|
||||
'''
|
||||
ret = {'kwargs': {}}
|
||||
aspec = get_function_argspec(fun)
|
||||
aspec = salt.utils.args.get_function_argspec(fun)
|
||||
if aspec.defaults:
|
||||
ret['kwargs'] = dict(zip(aspec.args[::-1], aspec.defaults[::-1]))
|
||||
ret['args'] = [arg for arg in aspec.args if arg not in ret['kwargs']]
|
||||
@ -1427,22 +1398,6 @@ def check_include_exclude(path_str, include_pat=None, exclude_pat=None):
|
||||
return ret
|
||||
|
||||
|
||||
def check_ipc_path_max_len(uri):
|
||||
# The socket path is limited to 107 characters on Solaris and
|
||||
# Linux, and 103 characters on BSD-based systems.
|
||||
ipc_path_max_len = getattr(zmq, 'IPC_PATH_MAX_LEN', 103)
|
||||
if ipc_path_max_len and len(uri) > ipc_path_max_len:
|
||||
raise SaltSystemExit(
|
||||
'The socket path is longer than allowed by OS. '
|
||||
'{0!r} is longer than {1} characters. '
|
||||
'Either try to reduce the length of this setting\'s '
|
||||
'path or switch to TCP; in the configuration file, '
|
||||
'set "ipc_mode: tcp".'.format(
|
||||
uri, ipc_path_max_len
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def gen_state_tag(low):
|
||||
'''
|
||||
Generate the running dict tag string from the low data structure
|
||||
@ -1807,49 +1762,6 @@ def date_format(date=None, format="%Y-%m-%d"):
|
||||
return date_cast(date).strftime(format)
|
||||
|
||||
|
||||
def yaml_dquote(text):
|
||||
"""Make text into a double-quoted YAML string with correct escaping
|
||||
for special characters. Includes the opening and closing double
|
||||
quote characters.
|
||||
"""
|
||||
with io.StringIO() as ostream:
|
||||
yemitter = yaml.emitter.Emitter(ostream)
|
||||
yemitter.write_double_quoted(six.text_type(text))
|
||||
return ostream.getvalue()
|
||||
|
||||
|
||||
def yaml_squote(text):
|
||||
"""Make text into a single-quoted YAML string with correct escaping
|
||||
for special characters. Includes the opening and closing single
|
||||
quote characters.
|
||||
"""
|
||||
with io.StringIO() as ostream:
|
||||
yemitter = yaml.emitter.Emitter(ostream)
|
||||
yemitter.write_single_quoted(six.text_type(text))
|
||||
return ostream.getvalue()
|
||||
|
||||
|
||||
def yaml_encode(data):
|
||||
"""A simple YAML encode that can take a single-element datatype and return
|
||||
a string representation.
|
||||
"""
|
||||
yrepr = yaml.representer.SafeRepresenter()
|
||||
ynode = yrepr.represent_data(data)
|
||||
if not isinstance(ynode, yaml.ScalarNode):
|
||||
raise TypeError(
|
||||
"yaml_encode() only works with YAML scalar data;"
|
||||
" failed for {0}".format(type(data))
|
||||
)
|
||||
|
||||
tag = ynode.tag.rsplit(':', 1)[-1]
|
||||
ret = ynode.value
|
||||
|
||||
if tag == "str":
|
||||
ret = yaml_dquote(ynode.value)
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
def warn_until(version,
|
||||
message,
|
||||
category=DeprecationWarning,
|
||||
@ -1900,6 +1812,7 @@ def warn_until(version,
|
||||
_version_ = salt.version.SaltStackVersion(*_version_info_)
|
||||
|
||||
if _version_ >= version:
|
||||
import inspect
|
||||
caller = inspect.getframeinfo(sys._getframe(stacklevel - 1))
|
||||
raise RuntimeError(
|
||||
'The warning triggered on filename {filename!r}, line number '
|
||||
@ -2098,7 +2011,7 @@ def argspec_report(functions, module=''):
|
||||
if _use_fnmatch:
|
||||
for fun in fnmatch.filter(functions, target_mod):
|
||||
try:
|
||||
aspec = get_function_argspec(functions[fun])
|
||||
aspec = salt.utils.args.get_function_argspec(functions[fun])
|
||||
except TypeError:
|
||||
# this happens if not callable
|
||||
continue
|
||||
@ -2115,7 +2028,7 @@ def argspec_report(functions, module=''):
|
||||
for fun in functions:
|
||||
if fun == module or fun.startswith(target_module):
|
||||
try:
|
||||
aspec = get_function_argspec(functions[fun])
|
||||
aspec = salt.utils.args.get_function_argspec(functions[fun])
|
||||
except TypeError:
|
||||
# this happens if not callable
|
||||
continue
|
||||
@ -2227,6 +2140,7 @@ def repack_dictlist(data):
|
||||
'''
|
||||
if isinstance(data, string_types):
|
||||
try:
|
||||
import yaml
|
||||
data = yaml.safe_load(data)
|
||||
except yaml.parser.ParserError as err:
|
||||
log.error(err)
|
||||
|
@ -6,6 +6,7 @@ from __future__ import absolute_import
|
||||
|
||||
# Import python libs
|
||||
import re
|
||||
import inspect
|
||||
|
||||
# Import salt libs
|
||||
from salt.ext.six import string_types, integer_types
|
||||
@ -138,3 +139,24 @@ def yamlify_arg(arg):
|
||||
except Exception:
|
||||
# In case anything goes wrong...
|
||||
return original_arg
|
||||
|
||||
|
||||
def get_function_argspec(func):
|
||||
'''
|
||||
A small wrapper around getargspec that also supports callable classes
|
||||
'''
|
||||
if not callable(func):
|
||||
raise TypeError('{0} is not a callable'.format(func))
|
||||
|
||||
if inspect.isfunction(func):
|
||||
aspec = inspect.getargspec(func)
|
||||
elif inspect.ismethod(func):
|
||||
aspec = inspect.getargspec(func)
|
||||
del aspec.args[0] # self
|
||||
elif isinstance(func, object):
|
||||
aspec = inspect.getargspec(func.__call__)
|
||||
del aspec.args[0] # self
|
||||
else:
|
||||
raise TypeError('Cannot inspect argument list for {0!r}'.format(func))
|
||||
|
||||
return aspec
|
||||
|
@ -51,8 +51,6 @@ from __future__ import absolute_import
|
||||
|
||||
# Import python libs
|
||||
import os
|
||||
import fnmatch
|
||||
import glob
|
||||
import hashlib
|
||||
import errno
|
||||
import logging
|
||||
@ -67,17 +65,14 @@ try:
|
||||
except ImportError:
|
||||
# Local mode does not need zmq
|
||||
pass
|
||||
import yaml
|
||||
|
||||
# Import salt libs
|
||||
import salt.payload
|
||||
import salt.loader
|
||||
import salt.state
|
||||
import salt.utils
|
||||
import salt.utils.cache
|
||||
from salt.ext.six import string_types
|
||||
import salt.utils.process
|
||||
from salt._compat import string_types
|
||||
import salt.utils.zeromq
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# The SUB_EVENT set is for functions that require events fired based on
|
||||
@ -198,12 +193,12 @@ class SaltEvent(object):
|
||||
sock_dir,
|
||||
'master_event_pub.ipc'
|
||||
))
|
||||
salt.utils.check_ipc_path_max_len(puburi)
|
||||
salt.utils.zeromq.check_ipc_path_max_len(puburi)
|
||||
pulluri = 'ipc://{0}'.format(os.path.join(
|
||||
sock_dir,
|
||||
'master_event_pull.ipc'
|
||||
))
|
||||
salt.utils.check_ipc_path_max_len(pulluri)
|
||||
salt.utils.zeromq.check_ipc_path_max_len(pulluri)
|
||||
else:
|
||||
if self.opts.get('ipc_mode', '') == 'tcp':
|
||||
puburi = 'tcp://127.0.0.1:{0}'.format(
|
||||
@ -217,12 +212,12 @@ class SaltEvent(object):
|
||||
sock_dir,
|
||||
'minion_event_{0}_pub.ipc'.format(id_hash)
|
||||
))
|
||||
salt.utils.check_ipc_path_max_len(puburi)
|
||||
salt.utils.zeromq.check_ipc_path_max_len(puburi)
|
||||
pulluri = 'ipc://{0}'.format(os.path.join(
|
||||
sock_dir,
|
||||
'minion_event_{0}_pull.ipc'.format(id_hash)
|
||||
))
|
||||
salt.utils.check_ipc_path_max_len(pulluri)
|
||||
salt.utils.zeromq.check_ipc_path_max_len(pulluri)
|
||||
log.debug(
|
||||
'{0} PUB socket URI: {1}'.format(self.__class__.__name__, puburi)
|
||||
)
|
||||
@ -573,13 +568,13 @@ class EventPublisher(multiprocessing.Process):
|
||||
epub_uri = 'ipc://{0}'.format(
|
||||
os.path.join(self.opts['sock_dir'], 'master_event_pub.ipc')
|
||||
)
|
||||
salt.utils.check_ipc_path_max_len(epub_uri)
|
||||
salt.utils.zeromq.check_ipc_path_max_len(epub_uri)
|
||||
# Prepare master event pull socket
|
||||
self.epull_sock = self.context.socket(zmq.PULL)
|
||||
epull_uri = 'ipc://{0}'.format(
|
||||
os.path.join(self.opts['sock_dir'], 'master_event_pull.ipc')
|
||||
)
|
||||
salt.utils.check_ipc_path_max_len(epull_uri)
|
||||
salt.utils.zeromq.check_ipc_path_max_len(epull_uri)
|
||||
|
||||
# Start the master event publisher
|
||||
old_umask = os.umask(0o177)
|
||||
@ -672,177 +667,6 @@ class EventReturn(multiprocessing.Process):
|
||||
return True
|
||||
|
||||
|
||||
class Reactor(multiprocessing.Process, salt.state.Compiler):
|
||||
'''
|
||||
Read in the reactor configuration variable and compare it to events
|
||||
processed on the master.
|
||||
The reactor has the capability to execute pre-programmed executions
|
||||
as reactions to events
|
||||
'''
|
||||
def __init__(self, opts):
|
||||
multiprocessing.Process.__init__(self)
|
||||
salt.state.Compiler.__init__(self, opts)
|
||||
self.wrap = ReactWrap(self.opts)
|
||||
|
||||
local_minion_opts = self.opts.copy()
|
||||
local_minion_opts['file_client'] = 'local'
|
||||
self.minion = salt.minion.MasterMinion(local_minion_opts)
|
||||
|
||||
def render_reaction(self, glob_ref, tag, data):
|
||||
'''
|
||||
Execute the render system against a single reaction file and return
|
||||
the data structure
|
||||
'''
|
||||
react = {}
|
||||
|
||||
if glob_ref.startswith('salt://'):
|
||||
glob_ref = self.minion.functions['cp.cache_file'](glob_ref)
|
||||
|
||||
for fn_ in glob.glob(glob_ref):
|
||||
try:
|
||||
react.update(self.render_template(
|
||||
fn_,
|
||||
tag=tag,
|
||||
data=data))
|
||||
except Exception:
|
||||
log.error('Failed to render "{0}"'.format(fn_))
|
||||
return react
|
||||
|
||||
def list_reactors(self, tag):
|
||||
'''
|
||||
Take in the tag from an event and return a list of the reactors to
|
||||
process
|
||||
'''
|
||||
log.debug('Gathering reactors for tag {0}'.format(tag))
|
||||
reactors = []
|
||||
if isinstance(self.opts['reactor'], string_types):
|
||||
try:
|
||||
with salt.utils.fopen(self.opts['reactor']) as fp_:
|
||||
react_map = yaml.safe_load(fp_.read())
|
||||
except (OSError, IOError):
|
||||
log.error(
|
||||
'Failed to read reactor map: "{0}"'.format(
|
||||
self.opts['reactor']
|
||||
)
|
||||
)
|
||||
except Exception:
|
||||
log.error(
|
||||
'Failed to parse YAML in reactor map: "{0}"'.format(
|
||||
self.opts['reactor']
|
||||
)
|
||||
)
|
||||
else:
|
||||
react_map = self.opts['reactor']
|
||||
for ropt in react_map:
|
||||
if not isinstance(ropt, dict):
|
||||
continue
|
||||
if len(ropt) != 1:
|
||||
continue
|
||||
key = next(iter(ropt.keys()))
|
||||
val = ropt[key]
|
||||
if fnmatch.fnmatch(tag, key):
|
||||
if isinstance(val, string_types):
|
||||
reactors.append(val)
|
||||
elif isinstance(val, list):
|
||||
reactors.extend(val)
|
||||
return reactors
|
||||
|
||||
def reactions(self, tag, data, reactors):
|
||||
'''
|
||||
Render a list of reactor files and returns a reaction struct
|
||||
'''
|
||||
log.debug('Compiling reactions for tag {0}'.format(tag))
|
||||
high = {}
|
||||
chunks = []
|
||||
for fn_ in reactors:
|
||||
high.update(self.render_reaction(fn_, tag, data))
|
||||
if high:
|
||||
errors = self.verify_high(high)
|
||||
if errors:
|
||||
return errors
|
||||
chunks = self.order_chunks(self.compile_high_data(high))
|
||||
return chunks
|
||||
|
||||
def call_reactions(self, chunks):
|
||||
'''
|
||||
Execute the reaction state
|
||||
'''
|
||||
for chunk in chunks:
|
||||
self.wrap.run(chunk)
|
||||
|
||||
def run(self):
|
||||
'''
|
||||
Enter into the server loop
|
||||
'''
|
||||
salt.utils.appendproctitle(self.__class__.__name__)
|
||||
self.event = SaltEvent('master', self.opts['sock_dir'])
|
||||
events = self.event.iter_events(full=True)
|
||||
self.event.fire_event({}, 'salt/reactor/start')
|
||||
for data in events:
|
||||
reactors = self.list_reactors(data['tag'])
|
||||
if not reactors:
|
||||
continue
|
||||
chunks = self.reactions(data['tag'], data['data'], reactors)
|
||||
if chunks:
|
||||
self.call_reactions(chunks)
|
||||
|
||||
|
||||
class ReactWrap(object):
|
||||
'''
|
||||
Create a wrapper that executes low data for the reaction system
|
||||
'''
|
||||
# class-wide cache of clients
|
||||
client_cache = None
|
||||
|
||||
def __init__(self, opts):
|
||||
self.opts = opts
|
||||
if ReactWrap.client_cache is None:
|
||||
ReactWrap.client_cache = salt.utils.cache.CacheDict(opts['reactor_refresh_interval'])
|
||||
|
||||
def run(self, low):
|
||||
'''
|
||||
Execute the specified function in the specified state by passing the
|
||||
LowData
|
||||
'''
|
||||
l_fun = getattr(self, low['state'])
|
||||
try:
|
||||
f_call = salt.utils.format_call(l_fun, low)
|
||||
ret = l_fun(*f_call.get('args', ()), **f_call.get('kwargs', {}))
|
||||
except Exception:
|
||||
log.error(
|
||||
'Failed to execute {0}: {1}\n'.format(low['state'], l_fun),
|
||||
exc_info=True
|
||||
)
|
||||
return False
|
||||
return ret
|
||||
|
||||
def local(self, *args, **kwargs):
|
||||
'''
|
||||
Wrap LocalClient for running :ref:`execution modules <all-salt.modules>`
|
||||
'''
|
||||
if 'local' not in self.client_cache:
|
||||
self.client_cache['local'] = salt.client.LocalClient(self.opts['conf_file'])
|
||||
return self.client_cache['local'].cmd_async(*args, **kwargs)
|
||||
|
||||
cmd = local
|
||||
|
||||
def runner(self, fun, **kwargs):
|
||||
'''
|
||||
Wrap RunnerClient for executing :ref:`runner modules <all-salt.runners>`
|
||||
'''
|
||||
if 'runner' not in self.client_cache:
|
||||
self.client_cache['runner'] = salt.runner.RunnerClient(self.opts)
|
||||
return self.client_cache['runner'].async(fun, kwargs, fire_event=False)
|
||||
|
||||
def wheel(self, fun, **kwargs):
|
||||
'''
|
||||
Wrap Wheel to enable executing :ref:`wheel modules <all-salt.wheel>`
|
||||
'''
|
||||
if 'wheel' not in self.client_cache:
|
||||
self.client_cache['wheel'] = salt.wheel.Wheel(self.opts)
|
||||
return self.client_cache['wheel'].async(fun, kwargs, fire_event=False)
|
||||
|
||||
|
||||
class StateFire(object):
|
||||
'''
|
||||
Evaluate the data from a state run and fire events on the master and minion
|
||||
|
191
salt/utils/reactor.py
Normal file
191
salt/utils/reactor.py
Normal file
@ -0,0 +1,191 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import absolute_import
|
||||
|
||||
# Import python libs
|
||||
import fnmatch
|
||||
import glob
|
||||
import logging
|
||||
import multiprocessing
|
||||
|
||||
import yaml
|
||||
|
||||
# Import salt libs
|
||||
import salt.state
|
||||
import salt.utils
|
||||
import salt.utils.cache
|
||||
import salt.utils.event
|
||||
from salt.ext.six import string_types
|
||||
import salt.utils.process
|
||||
from salt._compat import string_types
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Reactor(multiprocessing.Process, salt.state.Compiler):
|
||||
'''
|
||||
Read in the reactor configuration variable and compare it to events
|
||||
processed on the master.
|
||||
The reactor has the capability to execute pre-programmed executions
|
||||
as reactions to events
|
||||
'''
|
||||
def __init__(self, opts):
|
||||
multiprocessing.Process.__init__(self)
|
||||
salt.state.Compiler.__init__(self, opts)
|
||||
self.wrap = ReactWrap(self.opts)
|
||||
|
||||
local_minion_opts = self.opts.copy()
|
||||
local_minion_opts['file_client'] = 'local'
|
||||
self.minion = salt.minion.MasterMinion(local_minion_opts)
|
||||
|
||||
def render_reaction(self, glob_ref, tag, data):
|
||||
'''
|
||||
Execute the render system against a single reaction file and return
|
||||
the data structure
|
||||
'''
|
||||
react = {}
|
||||
|
||||
if glob_ref.startswith('salt://'):
|
||||
glob_ref = self.minion.functions['cp.cache_file'](glob_ref)
|
||||
|
||||
for fn_ in glob.glob(glob_ref):
|
||||
try:
|
||||
react.update(self.render_template(
|
||||
fn_,
|
||||
tag=tag,
|
||||
data=data))
|
||||
except Exception:
|
||||
log.error('Failed to render "{0}"'.format(fn_))
|
||||
return react
|
||||
|
||||
def list_reactors(self, tag):
|
||||
'''
|
||||
Take in the tag from an event and return a list of the reactors to
|
||||
process
|
||||
'''
|
||||
log.debug('Gathering reactors for tag {0}'.format(tag))
|
||||
reactors = []
|
||||
if isinstance(self.opts['reactor'], string_types):
|
||||
try:
|
||||
with salt.utils.fopen(self.opts['reactor']) as fp_:
|
||||
react_map = yaml.safe_load(fp_.read())
|
||||
except (OSError, IOError):
|
||||
log.error(
|
||||
'Failed to read reactor map: "{0}"'.format(
|
||||
self.opts['reactor']
|
||||
)
|
||||
)
|
||||
except Exception:
|
||||
log.error(
|
||||
'Failed to parse YAML in reactor map: "{0}"'.format(
|
||||
self.opts['reactor']
|
||||
)
|
||||
)
|
||||
else:
|
||||
react_map = self.opts['reactor']
|
||||
for ropt in react_map:
|
||||
if not isinstance(ropt, dict):
|
||||
continue
|
||||
if len(ropt) != 1:
|
||||
continue
|
||||
key = next(iter(ropt.keys()))
|
||||
val = ropt[key]
|
||||
if fnmatch.fnmatch(tag, key):
|
||||
if isinstance(val, string_types):
|
||||
reactors.append(val)
|
||||
elif isinstance(val, list):
|
||||
reactors.extend(val)
|
||||
return reactors
|
||||
|
||||
def reactions(self, tag, data, reactors):
|
||||
'''
|
||||
Render a list of reactor files and returns a reaction struct
|
||||
'''
|
||||
log.debug('Compiling reactions for tag {0}'.format(tag))
|
||||
high = {}
|
||||
chunks = []
|
||||
for fn_ in reactors:
|
||||
high.update(self.render_reaction(fn_, tag, data))
|
||||
if high:
|
||||
errors = self.verify_high(high)
|
||||
if errors:
|
||||
return errors
|
||||
chunks = self.order_chunks(self.compile_high_data(high))
|
||||
return chunks
|
||||
|
||||
def call_reactions(self, chunks):
|
||||
'''
|
||||
Execute the reaction state
|
||||
'''
|
||||
for chunk in chunks:
|
||||
self.wrap.run(chunk)
|
||||
|
||||
def run(self):
|
||||
'''
|
||||
Enter into the server loop
|
||||
'''
|
||||
salt.utils.appendproctitle(self.__class__.__name__)
|
||||
self.event = salt.utils.event.SaltEvent('master', self.opts['sock_dir'])
|
||||
events = self.event.iter_events(full=True)
|
||||
self.event.fire_event({}, 'salt/reactor/start')
|
||||
for data in events:
|
||||
reactors = self.list_reactors(data['tag'])
|
||||
if not reactors:
|
||||
continue
|
||||
chunks = self.reactions(data['tag'], data['data'], reactors)
|
||||
if chunks:
|
||||
self.call_reactions(chunks)
|
||||
|
||||
|
||||
class ReactWrap(object):
|
||||
'''
|
||||
Create a wrapper that executes low data for the reaction system
|
||||
'''
|
||||
# class-wide cache of clients
|
||||
client_cache = None
|
||||
|
||||
def __init__(self, opts):
|
||||
self.opts = opts
|
||||
if ReactWrap.client_cache is None:
|
||||
ReactWrap.client_cache = salt.utils.cache.CacheDict(opts['reactor_refresh_interval'])
|
||||
|
||||
def run(self, low):
|
||||
'''
|
||||
Execute the specified function in the specified state by passing the
|
||||
LowData
|
||||
'''
|
||||
l_fun = getattr(self, low['state'])
|
||||
try:
|
||||
f_call = salt.utils.format_call(l_fun, low)
|
||||
ret = l_fun(*f_call.get('args', ()), **f_call.get('kwargs', {}))
|
||||
except Exception:
|
||||
log.error(
|
||||
'Failed to execute {0}: {1}\n'.format(low['state'], l_fun),
|
||||
exc_info=True
|
||||
)
|
||||
return False
|
||||
return ret
|
||||
|
||||
def local(self, *args, **kwargs):
|
||||
'''
|
||||
Wrap LocalClient for running :ref:`execution modules <all-salt.modules>`
|
||||
'''
|
||||
if 'local' not in self.client_cache:
|
||||
self.client_cache['local'] = salt.client.LocalClient(self.opts['conf_file'])
|
||||
return self.client_cache['local'].cmd_async(*args, **kwargs)
|
||||
|
||||
cmd = local
|
||||
|
||||
def runner(self, fun, **kwargs):
|
||||
'''
|
||||
Wrap RunnerClient for executing :ref:`runner modules <all-salt.runners>`
|
||||
'''
|
||||
if 'runner' not in self.client_cache:
|
||||
self.client_cache['runner'] = salt.runner.RunnerClient(self.opts)
|
||||
return self.client_cache['runner'].async(fun, kwargs, fire_event=False)
|
||||
|
||||
def wheel(self, fun, **kwargs):
|
||||
'''
|
||||
Wrap Wheel to enable executing :ref:`wheel modules <all-salt.wheel>`
|
||||
'''
|
||||
if 'wheel' not in self.client_cache:
|
||||
self.client_cache['wheel'] = salt.wheel.Wheel(self.opts)
|
||||
return self.client_cache['wheel'].async(fun, kwargs, fire_event=False)
|
@ -20,6 +20,7 @@ import jinja2.ext
|
||||
|
||||
# Import salt libs
|
||||
import salt.utils
|
||||
import salt.utils.yamlencoding
|
||||
from salt.exceptions import (
|
||||
SaltRenderError, CommandExecutionError, SaltInvocationError
|
||||
)
|
||||
@ -264,9 +265,9 @@ def render_jinja_tmpl(tmplstr, context, tmplpath=None):
|
||||
|
||||
jinja_env.filters['strftime'] = salt.utils.date_format
|
||||
jinja_env.filters['sequence'] = ensure_sequence_filter
|
||||
jinja_env.filters['yaml_dquote'] = salt.utils.yaml_dquote
|
||||
jinja_env.filters['yaml_squote'] = salt.utils.yaml_squote
|
||||
jinja_env.filters['yaml_encode'] = salt.utils.yaml_encode
|
||||
jinja_env.filters['yaml_dquote'] = salt.utils.yamlencoding.yaml_dquote
|
||||
jinja_env.filters['yaml_squote'] = salt.utils.yamlencoding.yaml_squote
|
||||
jinja_env.filters['yaml_encode'] = salt.utils.yamlencoding.yaml_encode
|
||||
|
||||
jinja_env.globals['odict'] = OrderedDict
|
||||
jinja_env.globals['show_full_context'] = show_full_context
|
||||
|
50
salt/utils/yamlencoding.py
Normal file
50
salt/utils/yamlencoding.py
Normal file
@ -0,0 +1,50 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import io
|
||||
import yaml
|
||||
import six
|
||||
|
||||
|
||||
def yaml_dquote(text):
|
||||
'''
|
||||
Make text into a double-quoted YAML string with correct escaping
|
||||
for special characters. Includes the opening and closing double
|
||||
quote characters.
|
||||
'''
|
||||
with io.StringIO() as ostream:
|
||||
yemitter = yaml.emitter.Emitter(ostream)
|
||||
yemitter.write_double_quoted(six.text_type(text))
|
||||
return ostream.getvalue()
|
||||
|
||||
|
||||
def yaml_squote(text):
|
||||
'''
|
||||
Make text into a single-quoted YAML string with correct escaping
|
||||
for special characters. Includes the opening and closing single
|
||||
quote characters.
|
||||
'''
|
||||
with io.StringIO() as ostream:
|
||||
yemitter = yaml.emitter.Emitter(ostream)
|
||||
yemitter.write_single_quoted(six.text_type(text))
|
||||
return ostream.getvalue()
|
||||
|
||||
|
||||
def yaml_encode(data):
|
||||
'''
|
||||
A simple YAML encode that can take a single-element datatype and return
|
||||
a string representation.
|
||||
'''
|
||||
yrepr = yaml.representer.SafeRepresenter()
|
||||
ynode = yrepr.represent_data(data)
|
||||
if not isinstance(ynode, yaml.ScalarNode):
|
||||
raise TypeError(
|
||||
"yaml_encode() only works with YAML scalar data;"
|
||||
" failed for {0}".format(type(data))
|
||||
)
|
||||
|
||||
tag = ynode.tag.rsplit(':', 1)[-1]
|
||||
ret = ynode.value
|
||||
|
||||
if tag == "str":
|
||||
ret = yaml_dquote(ynode.value)
|
||||
|
||||
return ret
|
26
salt/utils/zeromq.py
Normal file
26
salt/utils/zeromq.py
Normal file
@ -0,0 +1,26 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
try:
|
||||
import zmq
|
||||
HAS_ZMQ = True
|
||||
except ImportError:
|
||||
HAS_ZMQ = False
|
||||
|
||||
from salt.exceptions import SaltSystemExit
|
||||
|
||||
|
||||
def check_ipc_path_max_len(uri):
|
||||
# The socket path is limited to 107 characters on Solaris and
|
||||
# Linux, and 103 characters on BSD-based systems.
|
||||
if not HAS_ZMQ:
|
||||
return
|
||||
ipc_path_max_len = getattr(zmq, 'IPC_PATH_MAX_LEN', 103)
|
||||
if ipc_path_max_len and len(uri) > ipc_path_max_len:
|
||||
raise SaltSystemExit(
|
||||
'The socket path is longer than allowed by OS. '
|
||||
'{0!r} is longer than {1} characters. '
|
||||
'Either try to reduce the length of this setting\'s '
|
||||
'path or switch to TCP; in the configuration file, '
|
||||
'set "ipc_mode: tcp".'.format(
|
||||
uri, ipc_path_max_len
|
||||
)
|
||||
)
|
@ -411,18 +411,7 @@ __saltstack_version__ = SaltStackVersion.from_last_named_version()
|
||||
|
||||
|
||||
# ----- Dynamic/Runtime Salt Version Information -------------------------------------------------------------------->
|
||||
def __get_version(saltstack_version):
|
||||
'''
|
||||
If we can get a version provided at installation time or from Git, use
|
||||
that instead, otherwise we carry on.
|
||||
'''
|
||||
try:
|
||||
# Try to import the version information provided at install time
|
||||
from salt._version import __saltstack_version__ # pylint: disable=E0611,F0401
|
||||
return __saltstack_version__
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
def __discover_version(saltstack_version):
|
||||
# This might be a 'python setup.py develop' installation type. Let's
|
||||
# discover the version information at runtime.
|
||||
import os
|
||||
@ -487,6 +476,19 @@ def __get_version(saltstack_version):
|
||||
return saltstack_version
|
||||
|
||||
|
||||
def __get_version(saltstack_version):
|
||||
'''
|
||||
If we can get a version provided at installation time or from Git, use
|
||||
that instead, otherwise we carry on.
|
||||
'''
|
||||
try:
|
||||
# Try to import the version information provided at install time
|
||||
from salt._version import __saltstack_version__ # pylint: disable=E0611,F0401
|
||||
return __saltstack_version__
|
||||
except ImportError:
|
||||
return __discover_version(saltstack_version)
|
||||
|
||||
|
||||
# Get additional version information if available
|
||||
__saltstack_version__ = __get_version(__saltstack_version__)
|
||||
# This function has executed once, we're done with it. Delete it!
|
||||
|
@ -59,7 +59,7 @@ class ModuleStateTest(TestCase):
|
||||
comment = 'Module function {0} is set to execute'.format(CMD)
|
||||
self.assertEqual(ret['comment'], comment)
|
||||
|
||||
@patch('salt.utils.get_function_argspec', MagicMock(return_value=aspec))
|
||||
@patch('salt.utils.args.get_function_argspec', MagicMock(return_value=aspec))
|
||||
def test_module_run_missing_arg(self):
|
||||
'''
|
||||
Tests the return of module.run state when arguments are missing
|
||||
|
@ -115,7 +115,7 @@ class UtilsTestCase(TestCase):
|
||||
|
||||
expected_argspec = namedtuple('ArgSpec', 'args varargs keywords defaults')(
|
||||
args=['first', 'second', 'third', 'fourth'], varargs=None, keywords=None, defaults=('fifth',))
|
||||
ret = utils.get_function_argspec(dummy_func)
|
||||
ret = utils.args.get_function_argspec(dummy_func)
|
||||
|
||||
self.assertEqual(ret, expected_argspec)
|
||||
|
||||
@ -141,12 +141,12 @@ class UtilsTestCase(TestCase):
|
||||
self.assertTrue(False, "utils.safe_rm raised exception when it should not have")
|
||||
|
||||
@skipIf(NO_MOCK, NO_MOCK_REASON)
|
||||
@patch.multiple('salt.utils', get_function_argspec=DEFAULT, arg_lookup=DEFAULT)
|
||||
def test_format_call(self, arg_lookup, get_function_argspec):
|
||||
@patch('salt.utils.arg_lookup')
|
||||
def test_format_call(self, arg_lookup):
|
||||
def dummy_func(first=None, second=None, third=None):
|
||||
pass
|
||||
|
||||
arg_lookup.return_value = {'args': ['first', 'second', 'third'], 'kwargs': {}}
|
||||
get_function_argspec = DEFAULT
|
||||
get_function_argspec.return_value = namedtuple('ArgSpec', 'args varargs keywords defaults')(
|
||||
args=['first', 'second', 'third', 'fourth'], varargs=None, keywords=None, defaults=('fifth',))
|
||||
|
||||
@ -435,7 +435,7 @@ class UtilsTestCase(TestCase):
|
||||
Ensure we throw an exception if we have a too-long IPC URI
|
||||
'''
|
||||
with patch('zmq.IPC_PATH_MAX_LEN', 1):
|
||||
self.assertRaises(SaltSystemExit, utils.check_ipc_path_max_len, '1' * 1024)
|
||||
self.assertRaises(SaltSystemExit, utils.zeromq.check_ipc_path_max_len, '1' * 1024)
|
||||
|
||||
def test_test_mode(self):
|
||||
self.assertTrue(utils.test_mode(test=True))
|
||||
@ -523,18 +523,18 @@ class UtilsTestCase(TestCase):
|
||||
|
||||
def test_yaml_dquote(self):
|
||||
for teststr in (r'"\ []{}"',):
|
||||
self.assertEqual(teststr, yaml.safe_load(utils.yaml_dquote(teststr)))
|
||||
self.assertEqual(teststr, yaml.safe_load(utils.yamlencoding.yaml_dquote(teststr)))
|
||||
|
||||
def test_yaml_squote(self):
|
||||
ret = utils.yaml_squote(r'"')
|
||||
ret = utils.yamlencoding.yaml_squote(r'"')
|
||||
self.assertEqual(ret, r"""'"'""")
|
||||
|
||||
def test_yaml_encode(self):
|
||||
for testobj in (None, True, False, '[7, 5]', '"monkey"', 5, 7.5, "2014-06-02 15:30:29.7"):
|
||||
self.assertEqual(testobj, yaml.safe_load(utils.yaml_encode(testobj)))
|
||||
self.assertEqual(testobj, yaml.safe_load(utils.yamlencoding.yaml_encode(testobj)))
|
||||
|
||||
for testobj in ({}, [], set()):
|
||||
self.assertRaises(TypeError, utils.yaml_encode, testobj)
|
||||
self.assertRaises(TypeError, utils.yamlencoding.yaml_encode, testobj)
|
||||
|
||||
def test_compare_dicts(self):
|
||||
ret = utils.compare_dicts(old={'foo': 'bar'}, new={'foo': 'bar'})
|
||||
|
Loading…
Reference in New Issue
Block a user