Change Master scheduler to use new ClientFuncsDict instaed of runner funcs directly

With the change to forcing all runner calls through low() we forgot about the scheduler which calls functions directly. This adds an API to get that same interface.

Also means we are missing test coverage on the scheduler, since it was non-stop backtracing on the master.
This commit is contained in:
Thomas Jackson 2015-01-30 10:36:09 -08:00
parent a4b95be0b0
commit b6741d89b8
3 changed files with 75 additions and 4 deletions

View File

@ -37,12 +37,71 @@ CLIENT_INTERNAL_KEYWORDS = frozenset([
])
class ClientFuncsDict(collections.MutableMapping):
'''
Class to make a read-only dict for accessing runner funcs "directly"
'''
def __init__(self, client):
self.client = client
def __setitem__(self, key, val):
raise NotImplementedError()
def __delitem__(self, key):
raise NotImplementedError()
def __getitem__(self, key):
'''
Return a function that you can call with regular func params, but
will do all the _proc_function magic
'''
if key not in self.client.functions:
raise KeyError
def wrapper(*args, **kwargs):
low = {'fun': key,
'args': args,
'kwargs': kwargs,
}
pub_data = {}
# pull out pub_data if you have it
for k, v in kwargs.items():
if k.startswith('__pub_'):
pub_data[k] = kwargs.pop(k)
async_pub = self.client._gen_async_pub(pub_data.get('jid'))
user = salt.utils.get_specific_user()
return self.client._proc_function(key,
low,
user,
async_pub['tag'], # TODO: fix
async_pub['jid'], # TODO: fix
False, # Don't daemonize
)
return wrapper
def __len__(self):
return len(self.client.functions)
def __iter__(self):
return iter(self.client.functions)
class SyncClientMixin(object):
'''
A mixin for *Client interfaces to abstract common function execution
'''
functions = ()
def functions_dict(self):
'''
Return a dict that will mimic the "functions" dict used all over salt.
It creates a wrapper around the function allowing **kwargs, and if pub_data
is passed in as kwargs, will re-use the JID passed in
'''
return ClientFuncsDict(self)
def _verify_fun(self, fun):
'''
Check that the function passed really exists
@ -337,8 +396,9 @@ class AsyncClientMixin(object):
'''
return self.master_call(**low)
def _gen_async_pub(self):
jid = salt.utils.jid.gen_jid()
def _gen_async_pub(self, jid=None):
if jid is None:
jid = salt.utils.jid.gen_jid()
tag = tagify(jid, prefix=self.tag_prefix)
return {'tag': tag, 'jid': jid}

View File

@ -167,12 +167,15 @@ class Maintenance(multiprocessing.Process):
# Init fileserver manager
self.fileserver = salt.fileserver.Fileserver(self.opts)
# Load Runners
self.runners = salt.loader.runner(self.opts)
ropts = dict(self.opts)
ropts['quiet'] = True
runner_client = salt.runner.RunnerClient(ropts)
# Load Returners
self.returners = salt.loader.returners(self.opts, {})
# Init Scheduler
self.schedule = salt.utils.schedule.Schedule(self.opts,
self.runners,
runner_client.functions_dict(),
returners=self.returners)
self.ckminions = salt.utils.minions.CkMinions(self.opts)
# Make Event bus for firing

View File

@ -220,6 +220,7 @@ import yaml
# Import Salt libs
import salt.utils
import salt.utils.process
import salt.utils.args
from salt.utils.odict import OrderedDict
from salt.utils.process import os_is_running
import salt.payload
@ -493,6 +494,13 @@ class Schedule(object):
kwargs = {}
if 'kwargs' in data:
kwargs = data['kwargs']
# if the func support **kwargs, lets pack in the pub data we have
# TODO: pack the *same* pub data as a minion?
argspec = salt.utils.args.get_function_argspec(self.functions[func])
if argspec.keywords:
# this function accepts **kwargs, pack in the publish data
for key, val in ret.iteritems():
kwargs['__pub_{0}'.format(key)] = val
try:
ret['return'] = self.functions[func](*args, **kwargs)