Merge pull request #19253 from jacksontj/runner_event

Refactor of evented runners (#17283)
This commit is contained in:
Thomas S Hatch 2015-01-12 10:51:16 -07:00
commit 7406b778f1
22 changed files with 520 additions and 416 deletions

View File

@ -194,7 +194,7 @@ dummy-variables-rgx=_|dummy
# List of additional names supposed to be defined in builtins. Remember that
# you should avoid to define new builtins when possible.
additional-builtins=__opts__,__salt__,__pillar__,__grains__,__context__,__ret__,__env__,__low__,__states__,__lowstate__,__running__,__active_provider_name__,__master_opts__,__progress__
additional-builtins=__opts__,__salt__,__pillar__,__grains__,__context__,__ret__,__env__,__low__,__states__,__lowstate__,__running__,__active_provider_name__,__master_opts__,__jid_event__
[SIMILARITIES]

View File

@ -154,7 +154,7 @@ indent-string=' '
# List of additional names supposed to be defined in builtins. Remember that
# you should avoid to define new builtins when possible.
additional-builtins=__opts__,__salt__,__pillar__,__grains__,__context__,__ret__,__env__,__low__,__states__,__lowstate__,__running__,__active_provider_name__,__master_opts__,__progress__
additional-builtins=__opts__,__salt__,__pillar__,__grains__,__context__,__ret__,__env__,__low__,__states__,__lowstate__,__running__,__active_provider_name__,__master_opts__,__jid_event__
[IMPORTS]

View File

@ -4,12 +4,18 @@ A collection of mixins useful for the various *Client interfaces
'''
from __future__ import print_function
from __future__ import absolute_import
import datetime
import __builtin__
import collections
import logging
import time
import multiprocessing
import salt.exceptions
import salt.utils
import salt.utils.event
import salt.utils.jid
import salt.transport
from salt.utils.error import raise_error
from salt.utils.event import tagify
from salt.utils.doc import strip_rst as _strip_rst
@ -33,15 +39,209 @@ class SyncClientMixin(object):
err = 'Function {0!r} is unavailable'.format(fun)
raise salt.exceptions.CommandExecutionError(err)
def master_call(self, **kwargs):
'''
Execute a function through the master network interface.
'''
load = kwargs
load['cmd'] = self.client
channel = salt.transport.Channel.factory(self.opts,
crypt='clear',
usage='master_call')
ret = channel.send(load)
if isinstance(ret, collections.Mapping):
if 'error' in ret:
raise_error(**ret['error'])
return ret
def cmd_sync(self, low, timeout=None):
'''
Execute a runner function synchronously; eauth is respected
This function requires that :conf_master:`external_auth` is configured
and the user is authorized to execute runner functions: (``@runner``).
.. code-block:: python
runner.eauth_sync({
'fun': 'jobs.list_jobs',
'username': 'saltdev',
'password': 'saltdev',
'eauth': 'pam',
})
'''
event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'])
job = self.master_call(**low)
ret_tag = salt.utils.event.tagify('ret', base=job['tag'])
if timeout is None:
timeout = 300
ret = event.get_event(tag=ret_tag, full=True, wait=timeout)
if ret is None:
raise salt.exceptions.SaltClientTimeout(
"RunnerClient job '{0}' timed out".format(job['jid']),
jid=job['jid'])
return ret['data']['return']
def cmd(self, fun, arg=None, pub_data=None, kwarg=None):
'''
Execute a function
.. code-block:: python
>>> opts = salt.config.master_config('/etc/salt/master')
>>> runner = salt.runner.RunnerClient(opts)
>>> runner.cmd('jobs.list_jobs', [])
{
'20131219215650131543': {
'Arguments': [300],
'Function': 'test.sleep',
'StartTime': '2013, Dec 19 21:56:50.131543',
'Target': '*',
'Target-type': 'glob',
'User': 'saltdev'
},
'20131219215921857715': {
'Arguments': [300],
'Function': 'test.sleep',
'StartTime': '2013, Dec 19 21:59:21.857715',
'Target': '*',
'Target-type': 'glob',
'User': 'saltdev'
},
}
'''
if arg is None:
arg = tuple()
if not isinstance(arg, list) and not isinstance(arg, tuple):
raise salt.exceptions.SaltInvocationError(
'arg must be formatted as a list/tuple'
)
if pub_data is None:
pub_data = {}
if not isinstance(pub_data, dict):
raise salt.exceptions.SaltInvocationError(
'pub_data must be formatted as a dictionary'
)
if kwarg is None:
kwarg = {}
if not isinstance(kwarg, dict):
raise salt.exceptions.SaltInvocationError(
'kwarg must be formatted as a dictionary'
)
arglist = salt.utils.args.parse_input(arg)
# if you were passed kwarg, add it to arglist
if kwarg:
kwarg['__kwarg__'] = True
arglist.append(kwarg)
args, kwargs = salt.minion.load_args_and_kwargs(
self.functions[fun], arglist, pub_data
)
low = {'fun': fun,
'args': args,
'kwargs': kwargs}
return self.low(fun, low)
def low(self, fun, low):
'''
Execute a function from low data
Low data includes:
required:
- fun: the name of the function to run
optional:
- args: a list of args to pass to fun
- kwargs: kwargs for fun
- __user__: user who is running the command
- __jid__: jid to run under
- __tag__: tag to run under
'''
self._verify_fun(fun)
l_fun = self.functions[fun]
f_call = salt.utils.format_call(l_fun, low)
ret = l_fun(*f_call.get('args', ()), **f_call.get('kwargs', {}))
return ret
jid = low.get('__jid__', salt.utils.jid.gen_jid())
tag = low.get('__tag__', tagify(jid, prefix=self.tag_prefix))
data = {'fun': '{0}.{1}'.format(self.client, fun),
'jid': jid,
'user': low.get('__user__', 'UNKNOWN'),
}
event = salt.utils.event.get_event(
'master',
self.opts['sock_dir'],
self.opts['transport'],
opts=self.opts,
listen=False)
event.fire_event(data, tagify('new', base=tag))
# TODO: document these, and test that they exist
# TODO: Other things to inject??
func_globals = {'__jid__': jid,
'__user__': data['user'],
'__tag__': tag,
'__jid_event__': salt.utils.event.NamespacedEvent(event, tag),
}
def over_print(output):
'''
Print and duplicate the print to an event
'''
print_event = {'data': output,
'outputter': 'pprint'}
func_globals['__jid_event__'].fire_event(print_event, 'print')
__builtin__.print(output) # and do the old style printout
func_globals['print'] = over_print
# Inject some useful globals to the funciton's global namespace
for global_key, value in func_globals.iteritems():
self.functions[fun].func_globals[global_key] = value
try:
self._verify_fun(fun)
# There are some descrepencies of what a "low" structure is
# in the publisher world it is a dict including stuff such as jid,
# fun, arg (a list of args, with kwargs packed in). Historically
# this particular one has had no "arg" and just has had all the
# kwargs packed into the top level object. The plan is to move away
# from that since the caller knows what is an arg vs a kwarg, but
# while we make the transition we will load "kwargs" using format_call
# if there are no kwargs in the low object passed in
f_call = None
if 'args' not in low:
f_call = salt.utils.format_call(self.functions[fun], low)
args = f_call.get('args', ())
else:
args = low['args']
if 'kwargs' not in low:
if f_call is None:
f_call = salt.utils.format_call(self.functions[fun], low)
kwargs = f_call.get('kwargs', {})
# throw a warning for the badly formed low data if we found
# kwargs using the old mechanism
if kwargs:
salt.utils.warn_until(
'Boron',
'kwargs must be passed inside the low under "kwargs"'
)
else:
kwargs = low['kwargs']
data['return'] = self.functions[fun](*args, **kwargs)
data['success'] = True
except Exception as exc:
data['return'] = 'Exception occurred in {0} {1}: {2}: {3}'.format(
self.client,
fun,
exc.__class__.__name__,
exc,
)
data['success'] = False
event.fire_event(data, tagify('ret', base=tag))
# if we fired an event, make sure to delete the event object.
# This will ensure that we call destroy, which will do the 0MQ linger
del event
return data['return']
def get_docs(self, arg=None):
'''
@ -72,42 +272,40 @@ class AsyncClientMixin(object):
multiprocess and fire the return data on the event bus
'''
salt.utils.daemonize()
data = {'fun': '{0}.{1}'.format(self.client, fun),
'jid': jid,
'user': user,
}
event = salt.utils.event.get_event(
'master',
self.opts['sock_dir'],
self.opts['transport'],
opts=self.opts,
listen=False)
event.fire_event(data, tagify('new', base=tag))
try:
data['return'] = self.low(fun, low)
data['success'] = True
except Exception as exc:
data['return'] = 'Exception occurred in {0} {1}: {2}: {3}'.format(
self.client,
fun,
exc.__class__.__name__,
exc,
)
data['success'] = False
data['user'] = user
# pack a few things into low
low['__jid__'] = jid
low['__user__'] = user
low['__tag__'] = tag
event.fire_event(data, tagify('ret', base=tag))
# if we fired an event, make sure to delete the event object.
# This will ensure that we call destroy, which will do the 0MQ linger
del event
self.low(fun, low)
def cmd_async(self, low):
'''
Execute a function asynchronously; eauth is respected
This function requires that :conf_master:`external_auth` is configured
and the user is authorized
.. code-block:: python
>>> wheel.cmd_async({
'fun': 'key.finger',
'match': 'jerry',
'eauth': 'auto',
'username': 'saltdev',
'password': 'saltdev',
})
{'jid': '20131219224744416681', 'tag': 'salt/wheel/20131219224744416681'}
'''
return self.master_call(**low)
def async(self, fun, low, user='UNKNOWN'):
'''
Execute the function in a multiprocess and return the event tag to use
to watch for the return
'''
jid = '{0:%Y%m%d%H%M%S%f}'.format(datetime.datetime.now())
jid = salt.utils.jid.gen_jid()
tag = tagify(jid, prefix=self.tag_prefix)
proc = multiprocessing.Process(
@ -116,3 +314,61 @@ class AsyncClientMixin(object):
proc.start()
proc.join() # MUST join, otherwise we leave zombies all over
return {'tag': tag, 'jid': jid}
def print_async_event(self, suffix, event):
'''
Print all of the events with the prefix 'tag'
'''
# some suffixes we don't want to print
if suffix in ('new', ):
return
# TODO: clean up this event print out. We probably want something
# more general, since this will get *really* messy as
# people use more events that don't quite fit into this mold
if suffix == 'ret': # for "ret" just print out return
salt.output.display_output(event['return'], '', self.opts)
elif isinstance(event, dict) and 'outputter' in event and event['outputter'] is not None:
print(self.outputters[event['outputter']](event['data']))
# otherwise fall back on basic printing
else:
event.pop('_stamp') # remove the timestamp before printing
print('{tag}: {event}'.format(tag=suffix,
event=event))
def get_async_returns(self, tag, timeout=None, event=None):
'''
Yield all events from a given tag until "ret" is recieved or timeout is
reached.
'''
if timeout is None:
timeout = self.opts['timeout'] * 2
if event is None:
event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'])
timeout_at = time.time() + timeout
last_progress_timestamp = time.time()
basetag_depth = tag.count('/') + 1
# no need to have a sleep, get_event has one inside
while True:
raw = event.get_event(timeout, tag=tag, full=True)
# If we saw no events in the event bus timeout
# OR
# we have reached the total timeout
# AND
# have not seen any progress events for the length of the timeout.
now = time.time()
if raw is None and (now > timeout_at and
now - last_progress_timestamp > timeout):
# Timeout reached
break
try:
tag_parts = raw['tag'].split('/')
suffix = '/'.join(tag_parts[basetag_depth:])
last_progress_timestamp = now
yield suffix, raw['data']
if tag_parts[3] == 'ret':
raise StopIteration() # we are done, we got return
except (IndexError, KeyError):
continue

View File

@ -999,7 +999,8 @@ class Minion(MinionBase):
)
else:
process = threading.Thread(
target=target, args=(instance, self.opts, data),
target=target,
args=(instance, self.opts, data),
name=data['jid']
)
process.start()

View File

@ -6,24 +6,16 @@ Execute salt convenience routines
# Import python libs
from __future__ import print_function
from __future__ import absolute_import
import collections
import logging
import time
import sys
import multiprocessing
# Import salt libs
import salt.exceptions
import salt.loader
import salt.minion
import salt.utils
import salt.utils.args
import salt.utils.event
from salt.client import mixins
from salt.output import display_output
from salt.utils.error import raise_error
from salt.utils.event import tagify
import salt.ext.six as six
log = logging.getLogger(__name__)
@ -49,165 +41,6 @@ class RunnerClient(mixins.SyncClientMixin, mixins.AsyncClientMixin, object):
def __init__(self, opts):
self.opts = opts
self.functions = salt.loader.runner(opts) # Must be self.functions for mixin to work correctly :-/
self.returners = salt.loader.returners(opts, self.functions)
self.outputters = salt.loader.outputters(opts)
self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'])
def cmd(self, fun, arg, pub_data=None, kwarg=None):
'''
Execute a runner function
.. code-block:: python
>>> opts = salt.config.master_config('/etc/salt/master')
>>> runner = salt.runner.RunnerClient(opts)
>>> runner.cmd('jobs.list_jobs', [])
{
'20131219215650131543': {
'Arguments': [300],
'Function': 'test.sleep',
'StartTime': '2013, Dec 19 21:56:50.131543',
'Target': '*',
'Target-type': 'glob',
'User': 'saltdev'
},
'20131219215921857715': {
'Arguments': [300],
'Function': 'test.sleep',
'StartTime': '2013, Dec 19 21:59:21.857715',
'Target': '*',
'Target-type': 'glob',
'User': 'saltdev'
},
}
'''
if kwarg is None:
kwarg = {}
if not isinstance(kwarg, dict):
raise salt.exceptions.SaltInvocationError(
'kwarg must be formatted as a dictionary'
)
if pub_data is None:
pub_data = {}
if not isinstance(pub_data, dict):
raise salt.exceptions.SaltInvocationError(
'pub_data must be formatted as a dictionary'
)
arglist = salt.utils.args.parse_input(arg)
def _append_kwarg(arglist, kwarg):
'''
Append the kwarg dict to the arglist
'''
kwarg['__kwarg__'] = True
arglist.append(kwarg)
if kwarg:
try:
if isinstance(arglist[-1], dict) \
and '__kwarg__' in arglist[-1]:
for key, val in six.iteritems(kwarg):
if key in arglist[-1]:
log.warning(
'Overriding keyword argument {0!r}'.format(key)
)
arglist[-1][key] = val
else:
# No kwargs yet present in arglist
_append_kwarg(arglist, kwarg)
except IndexError:
# arglist is empty, just append
_append_kwarg(arglist, kwarg)
self._verify_fun(fun)
args, kwargs = salt.minion.load_args_and_kwargs(
self.functions[fun], arglist, pub_data
)
fstr = '{0}.prep_jid'.format(self.opts['master_job_cache'])
jid = self.returners[fstr]()
log.debug('Runner starting with jid {0}'.format(jid))
self.event.fire_event({'runner_job': fun}, tagify([jid, 'new'], 'job'))
target = RunnerClient._thread_return
data = {'fun': fun, 'jid': jid, 'args': args, 'kwargs': kwargs}
args = (self, self.opts, data)
ret = jid
if self.opts.get('async', False):
process = multiprocessing.Process(
target=target, args=args
)
process.start()
else:
ret = target(*args)
return ret
@classmethod
def _thread_return(cls, instance, opts, data):
'''
The multiprocessing process calls back here
to stream returns
'''
# Runners modules runtime injection:
# - the progress event system with the correct jid
# - Provide JID if the runner wants to access it directly
done = {}
if opts.get('async', False):
progress = salt.utils.event.get_runner_event(opts, data['jid'], listen=False).fire_progress
else:
progress = _progress_print
for func_name, func in instance.functions.items():
if func.__module__ in done:
continue
mod = sys.modules[func.__module__]
mod.__jid__ = data['jid']
mod.__progress__ = progress
done[func.__module__] = mod
ret = instance.functions[data['fun']](*data['args'], **data['kwargs'])
# Sleep for just a moment to let any progress events return
time.sleep(0.1)
ret_load = {'return': ret, 'fun': data['fun'], 'fun_args': data['args']}
# Don't use the invoking processes' event socket because it could be closed down by the time we arrive here.
# Create another, for safety's sake.
master_event = salt.utils.event.get_master_event(opts, opts['sock_dir'], listen=False)
master_event.fire_event(ret_load, tagify([data['jid'], 'return'], 'runner'))
master_event.destroy()
try:
fstr = '{0}.save_runner_load'.format(opts['master_job_cache'])
instance.returners[fstr](data['jid'], ret_load)
except KeyError:
log.debug(
'The specified returner used for the master job cache '
'"{0}" does not have a save_runner_load function! The results '
'of this runner execution will not be stored.'.format(
opts['master_job_cache']
)
)
except Exception:
log.critical(
'The specified returner threw a stack trace:\n',
exc_info=True
)
if opts.get('async', False):
return data['jid']
else:
return ret
def master_call(self, **kwargs):
'''
Execute a runner function through the master network interface (eauth).
'''
load = kwargs
load['cmd'] = 'runner'
channel = salt.transport.Channel.factory(self.opts,
crypt='clear',
usage='master_call')
ret = channel.send(load)
if isinstance(ret, collections.Mapping):
if 'error' in ret:
raise_error(**ret['error'])
return ret
def _reformat_low(self, low):
'''
@ -243,7 +76,8 @@ class RunnerClient(mixins.SyncClientMixin, mixins.AsyncClientMixin, object):
})
'''
reformatted_low = self._reformat_low(low)
return self.master_call(**reformatted_low)
return mixins.AsyncClientMixin.cmd_async(self, reformatted_low)
def cmd_sync(self, low, timeout=None):
'''
@ -262,24 +96,19 @@ class RunnerClient(mixins.SyncClientMixin, mixins.AsyncClientMixin, object):
})
'''
reformatted_low = self._reformat_low(low)
job = self.master_call(**reformatted_low)
ret_tag = tagify('ret', base=job['tag'])
if timeout is None:
timeout = 300
ret = self.event.get_event(tag=ret_tag, full=True, wait=timeout)
if ret is None:
raise salt.exceptions.SaltClientTimeout(
"RunnerClient job '{0}' timed out".format(job['jid']),
jid=job['jid'])
return ret['data']['return']
return mixins.SyncClientMixin.cmd_sync(self, reformatted_low)
class Runner(RunnerClient):
'''
Execute the salt runner interface
'''
def __init__(self, opts):
super(Runner, self).__init__(opts)
self.returners = salt.loader.returners(opts, self.functions)
self.outputters = salt.loader.outputters(opts)
self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'])
def print_docs(self):
'''
Print out the documentation!
@ -290,6 +119,7 @@ class Runner(RunnerClient):
display_output('{0}:'.format(fun), 'text', self.opts)
print(docs[fun])
# TODO: move to mixin whenever we want a salt-wheel cli
def run(self):
'''
Execute the runner sequence
@ -299,70 +129,34 @@ class Runner(RunnerClient):
self.print_docs()
else:
try:
low = {'fun': self.opts['fun']}
args, kwargs = salt.minion.load_args_and_kwargs(
self.functions[low['fun']],
salt.utils.args.parse_input(self.opts['arg']),
)
low['args'] = args
low['kwargs'] = kwargs
async_pub = self.async(self.opts['fun'], low)
# Run the runner!
jid = super(Runner, self).cmd(
self.opts['fun'], self.opts['arg'], self.opts)
if self.opts.get('async', False):
log.info('Running in async mode. Results of this execution may '
'be collected by attaching to the master event bus or '
'by examing the master job cache, if configured.')
rets = self.get_runner_returns(jid)
else:
rets = [jid]
# Gather the returns
for ret in rets:
'by examing the master job cache, if configured. '
'This execution is running under tag {tag}'.format(**async_pub))
return async_pub['jid'] # return the jid
# output rets if you have some
for suffix, event in self.get_async_returns(async_pub['tag'], event=self.event):
if not self.opts.get('quiet', False):
if isinstance(ret, dict) and 'outputter' in ret and ret['outputter'] is not None:
print(self.outputters[ret['outputter']](ret['data']))
else:
salt.output.display_output(ret, '', self.opts)
self.print_async_event(suffix, event)
if suffix == 'ret':
ret = event['return']
except salt.exceptions.SaltException as exc:
ret = str(exc)
print(ret)
if not self.opts.get('quiet', False):
print(ret)
return ret
log.debug('Runner return: {0}'.format(ret))
return ret
def get_runner_returns(self, jid, timeout=None):
'''
Gather the return data from the event system, break hard when timeout
is reached.
'''
if timeout is None:
timeout = self.opts['timeout'] * 2
timeout_at = time.time() + timeout
last_progress_timestamp = time.time()
while True:
raw = self.event.get_event(timeout, full=True)
time.sleep(0.1)
# If we saw no events in the event bus timeout
# OR
# we have reached the total timeout
# AND
# have not seen any progress events for the length of the timeout.
if raw is None and (time.time() > timeout_at and
time.time() - last_progress_timestamp > timeout):
# Timeout reached
break
try:
if not raw['tag'].split('/')[1] == 'runner' and raw['tag'].split('/')[2] == jid:
continue
elif raw['tag'].split('/')[3] == 'progress' and raw['tag'].split('/')[2] == jid:
last_progress_timestamp = time.time()
yield {'data': raw['data']['data'], 'outputter': raw['data']['outputter']}
elif raw['tag'].split('/')[3] == 'return' and raw['tag'].split('/')[2] == jid:
yield raw['data']['return']
break
# Handle a findjob that might have been kicked off under the covers
elif raw['data']['fun'] == 'saltutil.findjob':
timeout_at = timeout_at + 10
continue
except (IndexError, KeyError):
continue
def _progress_print(text, *args, **kwargs):
print(text)

View File

@ -39,10 +39,10 @@ def active(outputter=None, display_progress=False):
client = salt.client.get_local_client(__opts__['conf_file'])
active_ = client.cmd('*', 'saltutil.running', timeout=__opts__['timeout'])
if display_progress:
__progress__('Attempting to contact minions: {0}'.format(list(active_.keys())))
__jid_event__.fire_event({'message': 'Attempting to contact minions: {0}'.format(active_.keys())}, 'progress')
for minion, data in active_.items():
if display_progress:
__progress__('Received reply from minion {0}'.format(minion))
__jid_event__.fire_event({'message': 'Received reply from minion {0}'.format(minion)}, 'progress')
if not isinstance(data, list):
continue
for job in data:
@ -105,7 +105,7 @@ def lookup_jid(jid,
mminion = salt.minion.MasterMinion(__opts__)
returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache']))
if display_progress:
__progress__('Querying returner: {0}'.format(returner))
__jid_event__.fire_event({'message': 'Querying returner: {0}'.format(returner)}, 'progress')
try:
data = mminion.returners['{0}.get_jid'.format(returner)](jid)
@ -113,7 +113,7 @@ def lookup_jid(jid,
return 'Requested returner could not be loaded. No JIDs could be retrieved.'
for minion in data:
if display_progress:
__progress__(minion)
__jid_event__.fire_event({'message': minion}, 'progress')
if u'return' in data[minion]:
ret[minion] = data[minion].get(u'return')
else:
@ -170,7 +170,7 @@ def list_jobs(ext_source=None,
'''
returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache']))
if display_progress:
__progress__('Querying returner {0} for jobs.'.format(returner))
__jid_event__.fire_event({'message': 'Querying returner {0} for jobs.'.format(returner)}, 'progress')
mminion = salt.minion.MasterMinion(__opts__)
try:
@ -314,5 +314,5 @@ def _walk_through(job_dir, display_progress=False):
job = serial.load(salt.utils.fopen(load_path, 'rb'))
jid = job['jid']
if display_progress:
__progress__('Found JID {0}'.format(jid))
__jid_event__.fire_event({'message': 'Found JID {0}'.format(jid)}, 'progress')
yield jid, job, t_path, final

View File

@ -93,7 +93,7 @@ def find_guest(name, quiet=False):
for x in 'running', 'frozen', 'stopped':
if name in l[x]:
if not quiet:
__progress__(host, outputter='lxc_find_host')
__jid_event__.fire_event({'data': host, 'outputter': 'lxc_find_host'}, 'progress')
return host
return None
@ -332,7 +332,7 @@ def init(names, host=None, saltcloud_mode=False, quiet=False, **kwargs):
if not done:
ret['result'] = False
if not quiet:
__progress__(ret)
__jid_event__.fire_event({'message': ret}, 'progress')
return ret
@ -395,7 +395,7 @@ def list_(host=None, quiet=False):
for chunk in it:
ret.update(chunk)
if not quiet:
__progress__(chunk, outputter='lxc_list')
__jid_event__.fire_event({'data': chunk, 'outputter': 'lxc_list'}, 'progress')
return ret
@ -420,7 +420,7 @@ def purge(name, delete_key=True, quiet=False):
return
if not quiet:
__progress__(data, outputter='lxc_purge')
__jid_event__.fire_event({'data': data, 'outputter': 'lxc_purge'}, 'progress')
return data
@ -434,7 +434,7 @@ def start(name, quiet=False):
'''
data = _do_names(name, 'start')
if data and not quiet:
__progress__(data, outputter='lxc_start')
__jid_event__.fire_event({'data': data, 'outputter': 'lxc_start'}, 'progress')
return data
@ -448,7 +448,7 @@ def stop(name, quiet=False):
'''
data = _do_names(name, 'stop')
if data and not quiet:
__progress__(data, outputter='lxc_force_off')
__jid_event__.fire_event({'data': data, 'outputter': 'lxc_force_off'}, 'progress')
return data
@ -462,7 +462,7 @@ def freeze(name, quiet=False):
'''
data = _do_names(name, 'freeze')
if data and not quiet:
__progress__(data, outputter='lxc_pause')
__jid_event__.fire_event({'data': data, 'outputter': 'lxc_pause'}, 'progress')
return data
@ -476,7 +476,7 @@ def unfreeze(name, quiet=False):
'''
data = _do_names(name, 'unfreeze')
if data and not quiet:
__progress__(data, outputter='lxc_resume')
__jid_event__.fire_event({'data': data, 'outputter': 'lxc_resume'}, 'progress')
return data
@ -490,5 +490,5 @@ def info(name, quiet=False):
'''
data = _do_names(name, 'info')
if data and not quiet:
__progress__(data, outputter='lxc_info')
__jid_event__.fire_event({'data': data, 'outputter': 'lxc_info'}, 'progress')
return data

View File

@ -48,7 +48,7 @@ def status(output=True):
ret['up'] = sorted(minions)
ret['down'] = sorted(set(keys['minions']) - set(minions))
if output:
__progress__(ret)
__jid_event__.fire_event({'message': ret}, 'progress')
return ret
@ -467,7 +467,7 @@ def safe_accept(target, expr_form='glob'):
print(message)
print('')
__progress__('Accepted {0:d} keys'.format(len(ret)))
__jid_event__.fire_event({'message': 'Accepted {0:d} keys'.format(len(ret))}, 'progress')
return ret, failures

View File

@ -33,7 +33,7 @@ def wollist(maclist, bcast='255.255.255.255', destport=9):
print('Waking up {0}'.format(mac.strip()))
ret.append(mac)
except Exception as err:
__progress__('Failed to open the MAC file. Error: {0}'.format(err))
__jid_event__.fire_event({'error': 'Failed to open the MAC file. Error: {0}'.format(err)}, 'progress')
return []
return ret
@ -66,5 +66,4 @@ def wol(mac, bcast='255.255.255.255', destport=9):
('\\x' + mac[8:10]).decode('string_escape') + \
('\\x' + mac[10:12]).decode('string_escape')
sock.sendto('\xff' * 6 + dest * 16, (bcast, int(destport)))
__progress__('Sent magic packet to minion.')
return True

View File

@ -30,7 +30,7 @@ def show_top(minion=None, saltenv='base'):
top, errors = pillar.get_top()
if errors:
__progress__(errors, outputter='nested')
__jid_event__.fire_event({'data': errors, 'outputter': 'nested'}, 'progress')
return errors
return top

View File

@ -182,7 +182,7 @@ def process_queue(queue, quantity=1, backend='sqlite'):
items = pop(queue=queue, quantity=quantity, backend=backend)
except SaltInvocationError as exc:
error_txt = '{0}'.format(exc)
__progress__(error_txt)
__jid_event__.fire_event({'errors': error_txt}, 'progress')
return False
data = {'items': items,

View File

@ -56,19 +56,19 @@ def over(saltenv='base', os_fn=None):
for stage in overstate.stages_iter():
if isinstance(stage, dict):
# This is highstate data
__progress__('Stage execution results:')
__jid_event__.fire_event({'message': 'Stage execution results:'}, 'progress')
for key, val in stage.items():
if '_|-' in key:
__progress__({'error': {key: val}}, outputter='highstate')
__jid_event__.fire_event({'data': {'error': {key: val}}, 'outputter': 'highstate'}, 'progress')
else:
__progress__({key: val}, outputter='highstate')
__jid_event__.fire_event({'data': {key: val}, 'outputter': 'highstate'}, 'progress')
elif isinstance(stage, list):
# This is a stage
if stage_num == 0:
__progress__('Executing the following Over State:')
__jid_event__.fire_event({'message': 'Executing the following Over State:'}, 'progress')
else:
__progress__('Executed Stage:')
__progress__(stage, outputter='overstatestage')
__jid_event__.fire_event({'message': 'Executed Stage:'}, 'progress')
__jid_event__.fire_event({'data': stage, 'outputter': 'overstatestage'}, 'progress')
stage_num += 1
return overstate.over_run
@ -113,7 +113,7 @@ def orchestrate(mods, saltenv='base', test=None, exclude=None, pillar=None):
exclude,
pillar=pillar)
ret = {minion.opts['id']: running}
__progress__(ret, outputter='highstate')
__jid_event__.fire_event({'data': ret, 'outputter': 'highstate'}, 'progress')
return ret
# Aliases for orchestrate runner
@ -135,7 +135,7 @@ def show_stages(saltenv='base', os_fn=None):
salt-run state.show_stages saltenv=dev /root/overstate.sls
'''
overstate = salt.overstate.OverState(__opts__, saltenv, os_fn)
__progress__(overstate.over, outputter='overstatestage')
__jid_event__.fire_event({'data': overstate.over, 'outputter': 'overstatestage'}, 'progress')
return overstate.over

View File

@ -3,6 +3,7 @@
This runner is used only for test purposes and servers no production purpose
'''
from __future__ import absolute_import
from __future__ import print_function
# Import python libs
import time
import salt.ext.six as six
@ -36,12 +37,29 @@ def raw_arg(*args, **kwargs):
return ret
def stdout_print():
'''
Print 'foo' and return 'bar'
'''
print ('foo')
return 'bar'
def sleep(s_time=10):
'''
Sleep t seconds, then return True
'''
print (s_time)
time.sleep(s_time)
return True
def stream():
'''
Return True
'''
ret = True
for i in range(1, 100):
__progress__('Runner is {0}% done'.format(i), outputter='pprint')
__jid_event__.fire_event({'message': 'Runner is {0}% done'.format(i)}, 'progress')
time.sleep(0.1)
return ret

View File

@ -30,4 +30,4 @@ def generate(extra_mods='', overwrite=False, so_mods=''):
salt-run thin.generate mako,wempy 1
salt-run thin.generate overwrite=1
'''
__progress__(salt.utils.thin.gen_thin(__opts__['cachedir'], extra_mods, overwrite, so_mods))
return salt.utils.thin.gen_thin(__opts__['cachedir'], extra_mods, overwrite, so_mods)

View File

@ -47,7 +47,7 @@ def _find_vm(name, data, quiet=False):
if name in data[hv_].get('vm_info', {}):
ret = {hv_: {name: data[hv_]['vm_info'][name]}}
if not quiet:
__progress__(ret, outputter='nested')
__jid_event__.fire_event({'data': ret, 'outputter': 'nested'}, 'progress')
return ret
return {}
@ -82,7 +82,7 @@ def query(hyper=None, quiet=False):
chunk[id_] = info[id_]['ret']
ret.update(chunk)
if not quiet:
__progress__(chunk, outputter='virt_query')
__jid_event__.fire_event({'data': chunk, 'outputter': 'virt_query'}, 'progress')
return ret
@ -124,7 +124,7 @@ def list(hyper=None, quiet=False): # pylint: disable=redefined-builtin
chunk[id_] = data
ret.update(chunk)
if not quiet:
__progress__(chunk, outputter='virt_list')
__jid_event__.fire_event({'data': chunk, 'outputter': 'virt_list'}, 'progress')
return ret
@ -148,7 +148,7 @@ def hyper_info(hyper=None):
for id_ in data:
if 'vm_info' in data[id_]:
data[id_].pop('vm_info')
__progress__(data, outputter='nested')
__jid_event__.fire_event({'data': data, 'outputter': 'nested'}, 'progress')
return data
@ -197,30 +197,30 @@ def init(
Set to False to prevent Salt from installing a minion on the new vm
before it spins up.
'''
__progress__('Searching for Hypervisors')
__jid_event__.fire_event({'message': 'Searching for Hypervisors'}, 'progress')
data = query(hyper, quiet=True)
# Check if the name is already deployed
for hyper in data:
if 'vm_info' in data[hyper]:
if name in data[hyper]['vm_info']:
__progress__('Virtual machine {0} is already deployed'.format(name))
__jid_event__.fire_event({'message': 'Virtual machine {0} is already deployed'.format(name)}, 'progress')
return 'fail'
if hyper is None:
hyper = _determine_hyper(data)
if hyper not in data or not hyper:
__progress__('Hypervisor {0} was not found'.format(hyper))
__jid_event__.fire_event({'message': 'Hypervisor {0} was not found'.format(hyper)}, 'progress')
return 'fail'
if seed:
__progress__('Minion will be preseeded')
__jid_event__.fire_event({'message': 'Minion will be preseeded'}, 'progress')
kv_ = salt.utils.virt.VirtKey(hyper, name, __opts__)
kv_.authorize()
client = salt.client.get_local_client(__opts__['conf_file'])
__progress__('Creating VM {0} on hypervisor {1}'.format(name, hyper))
__jid_event__.fire_event({'message': 'Creating VM {0} on hypervisor {1}'.format(name, hyper)}, 'progress')
cmd_ret = client.cmd_iter(
hyper,
'virt.init',
@ -237,10 +237,10 @@ def init(
ret = next(cmd_ret)
if not ret:
__progress__('VM {0} was not initialized.'.format(name))
__jid_event__.fire_event({'message': 'VM {0} was not initialized.'.format(name)}, 'progress')
return 'fail'
__progress__('VM {0} initialized on hypervisor {1}'.format(name, hyper))
__jid_event__.fire_event({'message': 'VM {0} initialized on hypervisor {1}'.format(name, hyper)}, 'progress')
return 'good'
@ -260,7 +260,7 @@ def reset(name):
client = salt.client.get_local_client(__opts__['conf_file'])
data = vm_info(name, quiet=True)
if not data:
__progress__('Failed to find vm {0} to reset'.format(name))
__jid_event__.fire_event({'message': 'Failed to find vm {0} to reset'.format(name)}, 'progress')
return 'fail'
hyper = next(data.iterkeys())
cmd_ret = client.cmd_iter(
@ -270,7 +270,7 @@ def reset(name):
timeout=600)
for comp in cmd_ret:
ret.update(comp)
__progress__('Reset VM {0}'.format(name))
__jid_event__.fire_event({'message': 'Reset VM {0}'.format(name)}, 'progress')
return ret
@ -282,7 +282,7 @@ def start(name):
client = salt.client.get_local_client(__opts__['conf_file'])
data = vm_info(name, quiet=True)
if not data:
__progress__('Failed to find vm {0} to start'.format(name))
__jid_event__.fire_event({'message': 'Failed to find vm {0} to start'.format(name)}, 'progress')
return 'fail'
hyper = next(data.iterkeys())
if data[hyper][name]['state'] == 'running':
@ -295,7 +295,7 @@ def start(name):
timeout=600)
for comp in cmd_ret:
ret.update(comp)
__progress__('Started VM {0}'.format(name))
__jid_event__.fire_event({'message': 'Started VM {0}'.format(name)}, 'progress')
return 'good'
@ -320,7 +320,7 @@ def force_off(name):
timeout=600)
for comp in cmd_ret:
ret.update(comp)
__progress__('Powered off VM {0}'.format(name))
__jid_event__.fire_event({'message': 'Powered off VM {0}'.format(name)}, 'progress')
return 'good'
@ -332,7 +332,7 @@ def purge(name, delete_key=True):
client = salt.client.get_local_client(__opts__['conf_file'])
data = vm_info(name, quiet=True)
if not data:
__progress__('Failed to find vm {0} to purge'.format(name))
__jid_event__.fire_event({'error': 'Failed to find vm {0} to purge'.format(name)}, 'progress')
return 'fail'
hyper = next(data.iterkeys())
cmd_ret = client.cmd_iter(
@ -346,7 +346,7 @@ def purge(name, delete_key=True):
if delete_key:
skey = salt.key.Key(__opts__)
skey.delete_key(name)
__progress__('Purged VM {0}'.format(name))
__jid_event__.fire_event({'message': 'Purged VM {0}'.format(name)}, 'progress')
return 'good'
@ -359,11 +359,11 @@ def pause(name):
data = vm_info(name, quiet=True)
if not data:
__progress__('Failed to find VM {0} to pause'.format(name))
__jid_event__.fire_event({'error': 'Failed to find VM {0} to pause'.format(name)}, 'progress')
return 'fail'
hyper = next(data.iterkeys())
if data[hyper][name]['state'] == 'paused':
__progress__('VM {0} is already paused'.format(name))
__jid_event__.fire_event({'error': 'VM {0} is already paused'.format(name)}, 'progress')
return 'bad state'
cmd_ret = client.cmd_iter(
hyper,
@ -372,7 +372,7 @@ def pause(name):
timeout=600)
for comp in cmd_ret:
ret.update(comp)
__progress__('Paused VM {0}'.format(name))
__jid_event__.fire_event({'message': 'Paused VM {0}'.format(name)}, 'progress')
return 'good'
@ -384,11 +384,11 @@ def resume(name):
client = salt.client.get_local_client(__opts__['conf_file'])
data = vm_info(name, quiet=True)
if not data:
__progress__('Failed to find VM {0} to pause'.format(name))
__jid_event__.fire_event({'error': 'Failed to find VM {0} to pause'.format(name)}, 'progress')
return 'not found'
hyper = next(data.iterkeys())
if data[hyper][name]['state'] != 'paused':
__progress__('VM {0} is not paused'.format(name))
__jid_event__.fire_event({'error': 'VM {0} is not paused'.format(name)}, 'progress')
return 'bad state'
cmd_ret = client.cmd_iter(
hyper,
@ -397,7 +397,7 @@ def resume(name):
timeout=600)
for comp in cmd_ret:
ret.update(comp)
__progress__('Resumed VM {0}'.format(name))
__jid_event__.fire_event({'message': 'Resumed VM {0}'.format(name)}, 'progress')
return 'good'
@ -412,16 +412,16 @@ def migrate(name, target=''):
try:
origin_hyper = list(origin_data.keys())[0]
except IndexError:
__progress__('Named vm {0} was not found to migrate'.format(name))
__jid_event__.fire_event({'error': 'Named vm {0} was not found to migrate'.format(name)}, 'progress')
return ''
disks = origin_data[origin_hyper][name]['disks']
if not origin_data:
__progress__('Named vm {0} was not found to migrate'.format(name))
__jid_event__.fire_event({'error': 'Named vm {0} was not found to migrate'.format(name)}, 'progress')
return ''
if not target:
target = _determine_hyper(data, origin_hyper)
if target not in data:
__progress__('Target hypervisor {0} not found'.format(origin_data))
__jid_event__.fire_event({'error': 'Target hypervisor {0} not found'.format(origin_data)}, 'progress')
return ''
client.cmd(target, 'virt.seed_non_shared_migrate', [disks, True])
jid = client.cmd_async(origin_hyper,
@ -432,4 +432,4 @@ def migrate(name, target=''):
'and can be tracked via jid {2}. The ``salt-run virt.query`` '
'runner can also be used, the target vm will be shown as paused '
'until the migration is complete.').format(name, target, jid)
__progress__(msg)
__jid_event__.fire_event({'message': msg}, 'progress')

View File

@ -53,7 +53,7 @@ def genrepo():
# when log.debug works
log.debug('Failed to compile'
'{0}: {1}'.format(os.path.join(root, name), exc))
__progress__('Failed to compile {0}: {1}'.format(os.path.join(root, name), exc))
__jid_event__.fire_event({'error': 'Failed to compile {0}: {1}'.format(os.path.join(root, name), exc)}, 'progress')
if config:
revmap = {}
for pkgname, versions in config.items():
@ -64,7 +64,7 @@ def genrepo():
if not isinstance(repodata, dict):
log.debug('Failed to compile'
'{0}.'.format(os.path.join(root, name)))
__progress__('Failed to compile {0}.'.format(os.path.join(root, name)))
__jid_event__.fire_event({'error': 'Failed to compile {0}.'.format(os.path.join(root, name))}, 'progress')
continue
revmap[repodata['full_name']] = pkgname
ret.setdefault('repo', {}).update(config)

View File

@ -1492,7 +1492,8 @@ class State(object):
state_func_name = '{0[state]}.{0[fun]}'.format(low)
cdata = salt.utils.format_call(
self.states[state_func_name], low,
self.states[state_func_name],
low,
initial_ret={'full': state_func_name},
expected_extra_kws=STATE_INTERNAL_KEYWORDS
)

View File

@ -133,17 +133,6 @@ def get_master_event(opts, sock_dir, listen=True):
opts=opts, sock_dir=sock_dir, listen=listen)
def get_runner_event(opts, jid, listen=True):
'''
Return an event object suitable for the named transport
'''
if opts['transport'] == 'zeromq':
return RunnerEvent(opts, jid)
elif opts['transport'] == 'raet':
import salt.utils.raetevent
return salt.utils.raetevent.RunnerEvent(opts, jid, listen=listen)
def tagify(suffix='', prefix='', base=SALT):
'''
convenience function to build a namespaced event tag string
@ -186,6 +175,10 @@ class SaltEvent(object):
self.pending_events = []
self.__load_cache_regex()
# since ZMQ connect() has no guarantees about the socket actually being
# connected this is a hack to attempt to do so.
self.get_event(wait=1)
@classmethod
def __load_cache_regex(cls):
'''
@ -573,7 +566,12 @@ class SaltEvent(object):
pass
def __del__(self):
self.destroy()
# skip exceptions in destroy-- since destroy() doesn't cover interpreter
# shutdown-- where globals start going missing
try:
self.destroy()
except Exception as ex:
log.debug(ex)
class MasterEvent(SaltEvent):
@ -595,23 +593,16 @@ class LocalClientEvent(MasterEvent):
'''
class RunnerEvent(MasterEvent):
class NamespacedEvent(object):
'''
Warning! Use the get_runner_event function or the code will not be
RAET compatible
This is used to send progress and return events from runners.
It extends MasterEvent to include information about how to
display events to the user as a runner progresses.
A wrapper for sending events within a specific base namespace
'''
def __init__(self, opts, jid):
super(RunnerEvent, self).__init__(opts['sock_dir'])
self.jid = jid
def __init__(self, event, base):
self.event = event
self.base = base
def fire_progress(self, data, outputter='pprint'):
progress_event = {'data': data,
'outputter': outputter}
self.fire_event(
progress_event, tagify([self.jid, 'progress'], 'runner'))
def fire_event(self, data, tag):
self.event.fire_event(data, tagify(tag, base=self.base))
class MinionEvent(SaltEvent):

View File

@ -259,22 +259,6 @@ class MasterEvent(RAETEvent):
super(MasterEvent, self).__init__('master', opts=opts, sock_dir=sock_dir, listen=listen)
class RunnerEvent(MasterEvent):
'''
This is used to send progress and return events from runners.
It extends MasterEvent to include information about how to
display events to the user as a runner progresses.
'''
def __init__(self, opts, jid, listen=True):
super(RunnerEvent, self).__init__(opts=opts, sock_dir=opts['sock_dir'], listen=listen)
self.jid = jid
def fire_progress(self, data, outputter='pprint'):
progress_event = {'data': data,
'outputter': outputter}
self.fire_event(progress_event, salt.utils.event.tagify([self.jid, 'progress'], 'runner'))
class PresenceEvent(MasterEvent):
def __init__(self, opts, sock_dir, listen=True, state=None):

View File

@ -130,6 +130,9 @@ class Reactor(multiprocessing.Process, salt.state.Compiler):
self.wrap = ReactWrap(self.opts)
for data in self.event.iter_events(full=True):
# skip all events fired by ourselves
if data['data'].get('user') == self.wrap.event_user:
continue
reactors = self.list_reactors(data['tag'])
if not reactors:
continue
@ -147,6 +150,7 @@ class ReactWrap(object):
'''
# class-wide cache of clients
client_cache = None
event_user = 'Reactor'
def __init__(self, opts):
self.opts = opts
@ -166,7 +170,13 @@ class ReactWrap(object):
l_fun = getattr(self, low['state'])
try:
f_call = salt.utils.format_call(l_fun, low)
l_fun(*f_call.get('args', ()), **f_call.get('kwargs', {}))
kwargs = f_call.get('kwargs', {})
# TODO: pick one...
kwargs['__user__'] = self.event_user
kwargs['user'] = self.event_user
l_fun(*f_call.get('args', ()), **kwargs)
except Exception:
log.error(
'Failed to execute {0}: {1}\n'.format(low['state'], l_fun),

View File

@ -4,19 +4,15 @@ Modules used to control the master itself
'''
from __future__ import absolute_import
#import python libs
import collections
import os
import time
import collections
# Import salt libs
from salt import syspaths
import salt.config
import salt.loader
import salt.payload
import salt.utils
from salt.client import mixins
from salt.utils.error import raise_error
from salt.utils.event import tagify
class WheelClient(mixins.SyncClientMixin, mixins.AsyncClientMixin, object):
@ -35,35 +31,18 @@ class WheelClient(mixins.SyncClientMixin, mixins.AsyncClientMixin, object):
tag_prefix = 'wheel'
def __init__(self, opts=None):
if not opts:
opts = salt.config.client_config(
os.environ.get(
'SALT_MASTER_CONFIG',
os.path.join(syspaths.CONFIG_DIR, 'master')
)
)
self.opts = opts
self.functions = salt.loader.wheels(opts)
def cmd(self, fun, **kwargs):
# TODO: remove/deprecate
def call_func(self, fun, **kwargs):
'''
Execute a wheel function
.. code-block:: python
>>> opts = salt.config.master_config('/etc/salt/master')
>>> wheel = salt.wheel.Wheel(opts)
>>> wheel.call_func('key.list_all')
{'local': ['master.pem', 'master.pub'],
'minions': ['jerry'],
'minions_pre': [],
'minions_rejected': []}
Backwards compatibility
'''
return self.low(fun, kwargs)
call_func = cmd # alias for backward-compat
# TODO: Inconsistent with runner client-- the runner client's master_call gives
# an async return, unlike this
def master_call(self, **kwargs):
'''
Execute a wheel function through the master network interface (eauth).
@ -71,7 +50,7 @@ class WheelClient(mixins.SyncClientMixin, mixins.AsyncClientMixin, object):
load = kwargs
load['cmd'] = 'wheel'
master_uri = 'tcp://' + salt.utils.ip_bracket(self.opts['interface']) + \
':' + str(self.opts['ret_port'])
':' + str(self.opts['ret_port'])
channel = salt.transport.Channel.factory(self.opts,
crypt='clear',
master_uri=master_uri)
@ -90,23 +69,25 @@ class WheelClient(mixins.SyncClientMixin, mixins.AsyncClientMixin, object):
.. code-block:: python
>>> wheel.cmd_sync({
'fun': 'key.finger',
'match': 'jerry',
'eauth': 'auto',
'username': 'saltdev',
'password': 'saltdev',
})
{'minions': {'jerry': '5d:f6:79:43:5e:d4:42:3f:57:b8:45:a8:7e:a4:6e:ca'}}
>>> wheel.cmd_sync({
'fun': 'key.finger',
'match': 'jerry',
'eauth': 'auto',
'username': 'saltdev',
'password': 'saltdev',
})
{'minions': {'jerry': '5d:f6:79:43:5e:d4:42:3f:57:b8:45:a8:7e:a4:6e:ca'}}
'''
return self.master_call(**low)
# TODO: Inconsistent with runner client-- that one uses the master_call function
# and runs within the master daemon. Need to pick one...
def cmd_async(self, low):
'''
Execute a wheel function asynchronously; eauth is respected
Execute a function asynchronously; eauth is respected
This function requires that :conf_master:`external_auth` is configured
and the user is authorized to execute runner functions: (``@wheel``).
and the user is authorized
.. code-block:: python
@ -122,4 +103,5 @@ class WheelClient(mixins.SyncClientMixin, mixins.AsyncClientMixin, object):
fun = low.pop('fun')
return self.async(fun, low)
Wheel = WheelClient # for backward-compat

View File

@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
import time
import shutil
import tempfile
import os
from contextlib import contextmanager
import integration
from salt.utils.process import clean_proc
from salt.utils import event
from salttesting.mock import patch
@contextmanager
def reactor_process(opts, reactor):
opts = dict(opts)
opts['reactor'] = reactor
proc = event.Reactor(opts)
proc.start()
try:
if os.environ.get('TRAVIS_PYTHON_VERSION', None) is not None:
# Travis is slow
time.sleep(10)
else:
time.sleep(2)
yield
finally:
clean_proc(proc)
@contextmanager
def _args_sideffect(*args, **kwargs):
return args, kwargs
class TestReactor(integration.ModuleCase):
def setUp(self):
self.opts = self.get_config('master', from_scratch=True)
self.tempdir = tempfile.mkdtemp(dir=integration.SYS_TMP_DIR)
self.sls_name = os.path.join(self.tempdir, 'test.sls')
with open(self.sls_name, 'w') as fh:
fh.write('''
update_fileserver:
runner.fileserver.update
''')
def tearDown(self):
if os.path.isdir(self.tempdir):
shutil.rmtree(self.tempdir)
def test_basic(self):
reactor_config = [
{'salt/tagA': ['/srv/reactor/A.sls']},
{'salt/tagB': ['/srv/reactor/B.sls']},
{'*': ['/srv/reactor/all.sls']},
]
wrap = event.ReactWrap(self.opts)
with patch('salt.utils.event.ReactWrap.local', _args_sideffect):
ret = wrap.run({'fun': 'test.ping',
'state': 'local',
'order': 1,
'name': 'foo_action',
'__id__': 'foo_action'})
raise Exception(ret)