Refactored RunnerClient to use sync/async mixin classes

This commit is contained in:
Seth House 2014-07-02 18:40:16 -06:00
parent aa40d6af4d
commit fa168828dc

View File

@ -6,10 +6,7 @@ Execute salt convenience routines
# Import python libs
from __future__ import print_function
import collections
import datetime
import logging
import multiprocessing
import time
# Import salt libs
import salt.exceptions
@ -18,15 +15,15 @@ import salt.minion
import salt.utils
import salt.utils.args
import salt.utils.event
from salt.client import mixins
from salt.output import display_output
from salt.utils.doc import strip_rst as _strip_rst
from salt.utils.error import raise_error
from salt.utils.event import tagify
log = logging.getLogger(__name__)
class RunnerClient(object):
class RunnerClient(mixins.SyncClientMixin, mixins.AsyncClientMixin, object):
'''
The interface used by the :command:`salt-run` CLI tool on the Salt Master
@ -41,65 +38,13 @@ class RunnerClient(object):
eauth user must be authorized to execute runner modules: (``@runner``).
Only the :py:meth:`master_call` below supports eauth.
'''
client = 'runner'
tag_prefix = 'run'
def __init__(self, opts):
self.opts = opts
self.functions = salt.loader.runner(opts)
def _proc_runner(self, fun, low, user, tag, jid):
'''
Run this method in a multiprocess target to execute the runner in a
multiprocess and fire the return data on the event bus
'''
salt.utils.daemonize()
event = salt.utils.event.get_event(
'master',
self.opts['sock_dir'],
self.opts['transport'],
listen=False)
data = {'fun': 'runner.{0}'.format(fun),
'jid': jid,
'user': user,
}
event.fire_event(data, tagify('new', base=tag))
try:
data['return'] = self.low(fun, low)
data['success'] = True
except Exception as exc:
data['return'] = 'Exception occurred in runner {0}: {1}: {2}'.format(
fun,
exc.__class__.__name__,
exc,
)
data['success'] = False
data['user'] = user
event.fire_event(data, tagify('ret', base=tag))
# this is a workaround because process reaping is defeating 0MQ linger
time.sleep(2.0) # delay so 0MQ event gets out before runner process reaped
def _verify_fun(self, fun):
'''
Check that the function passed really exists
'''
if fun not in self.functions:
err = 'Function {0!r} is unavailable'.format(fun)
raise salt.exceptions.CommandExecutionError(err)
def get_docs(self, arg=None):
'''
Return a dictionary of functions and the inline documentation for each
'''
if arg:
target_mod = arg + '.' if not arg.endswith('.') else arg
docs = [(fun, self.functions[fun].__doc__)
for fun in sorted(self.functions)
if fun == arg or fun.startswith(target_mod)]
else:
docs = [(fun, self.functions[fun].__doc__)
for fun in sorted(self.functions)]
docs = dict(docs)
return _strip_rst(docs)
def cmd(self, fun, arg, pub_data=None, kwarg=None):
'''
Execute a runner function
@ -175,36 +120,6 @@ class RunnerClient(object):
)
return self.functions[fun](*args, **kwargs)
def low(self, fun, low):
'''
Pass in the runner function name and the low data structure
.. code-block:: python
runner.low({'fun': 'jobs.lookup_jid', 'jid': '20131219215921857715'})
'''
self._verify_fun(fun)
l_fun = self.functions[fun]
f_call = salt.utils.format_call(l_fun, low)
ret = l_fun(*f_call.get('args', ()), **f_call.get('kwargs', {}))
return ret
def async(self, fun, low, user='UNKNOWN'):
'''
Execute the runner in a multiprocess and return the event tag to use
to watch for the return
'''
jid = '{0:%Y%m%d%H%M%S%f}'.format(datetime.datetime.now())
tag = tagify(jid, prefix='run')
#low['tag'] = tag
#low['jid'] = jid
proc = multiprocessing.Process(
target=self._proc_runner,
args=(fun, low, user, tag, jid))
proc.start()
return {'tag': tag, 'jid': jid}
def master_call(self, **kwargs):
'''
Execute a runner function through the master network interface (eauth).