Merge pull request #37878 from kstreee/2016.11

Makes threads avoid blocking waiting while communicating using Zeromq.
This commit is contained in:
Mike Place 2016-12-05 12:50:46 -07:00 committed by GitHub
commit 78295516e7
3 changed files with 103 additions and 16 deletions

View File

@ -336,6 +336,59 @@ class LocalClient(object):
return self._check_pub_data(pub_data)
@tornado.gen.coroutine
def run_job_async(
self,
tgt,
fun,
arg=(),
expr_form='glob',
ret='',
timeout=None,
jid='',
kwarg=None,
listen=True,
io_loop=None,
**kwargs):
'''
Asynchronously send a command to connected minions
Prep the job directory and publish a command to any targeted minions.
:return: A dictionary of (validated) ``pub_data`` or an empty
dictionary on failure. The ``pub_data`` contains the job ID and a
list of all minions that are expected to return data.
.. code-block:: python
>>> local.run_job_async('*', 'test.sleep', [300])
{'jid': '20131219215650131543', 'minions': ['jerry']}
'''
arg = salt.utils.args.condition_input(arg, kwarg)
try:
pub_data = yield self.pub_async(
tgt,
fun,
arg,
expr_form,
ret,
jid=jid,
timeout=self._get_timeout(timeout),
io_loop=io_loop,
listen=listen,
**kwargs)
except SaltClientError:
# Re-raise error with specific message
raise SaltClientError(
'The salt master could not be contacted. Is master running?'
)
except Exception as general_exception:
# Convert to generic client error and pass along message
raise SaltClientError(general_exception)
raise tornado.gen.Return(self._check_pub_data(pub_data))
def cmd_async(
self,
tgt,

View File

@ -250,10 +250,10 @@ class SaltClientsMixIn(object):
local_client = salt.client.get_local_client(mopts=self.application.opts)
# TODO: refreshing clients using cachedict
SaltClientsMixIn.__saltclients = {
'local': local_client.run_job,
'local': local_client.run_job_async,
# not the actual client we'll use.. but its what we'll use to get args
'local_batch': local_client.cmd_batch,
'local_async': local_client.run_job,
'local_async': local_client.run_job_async,
'runner': salt.runner.RunnerClient(opts=self.application.opts).cmd_async,
'runner_async': None, # empty, since we use the same client as `runner`
}
@ -985,10 +985,10 @@ class SaltAPIHandler(BaseSaltAPIHandler, SaltClientsMixIn): # pylint: disable=W
'''
chunk_ret = {}
f_call = salt.utils.format_call(self.saltclients['local'], chunk)
f_call = self._format_call_run_job_async(chunk)
# fire a job off
try:
pub_data = self.saltclients['local'](*f_call.get('args', ()), **f_call.get('kwargs', {}))
pub_data = yield self.saltclients['local'](*f_call.get('args', ()), **f_call.get('kwargs', {}))
except EauthAuthenticationError:
raise tornado.gen.Return('Not authorized to run this job')
@ -1069,7 +1069,7 @@ class SaltAPIHandler(BaseSaltAPIHandler, SaltClientsMixIn): # pylint: disable=W
if minions_remaining is None:
minions_remaining = []
ping_pub_data = self.saltclients['local'](tgt,
ping_pub_data = yield self.saltclients['local'](tgt,
'saltutil.find_job',
[jid],
expr_form=tgt_type)
@ -1086,7 +1086,7 @@ class SaltAPIHandler(BaseSaltAPIHandler, SaltClientsMixIn): # pylint: disable=W
if not minion_running:
raise tornado.gen.Return(True)
else:
ping_pub_data = self.saltclients['local'](tgt,
ping_pub_data = yield self.saltclients['local'](tgt,
'saltutil.find_job',
[jid],
expr_form=tgt_type)
@ -1106,9 +1106,9 @@ class SaltAPIHandler(BaseSaltAPIHandler, SaltClientsMixIn): # pylint: disable=W
'''
Disbatch local client_async commands
'''
f_call = salt.utils.format_call(self.saltclients['local_async'], chunk)
f_call = self._format_call_run_job_async(chunk)
# fire a job off
pub_data = self.saltclients['local_async'](*f_call.get('args', ()), **f_call.get('kwargs', {}))
pub_data = yield self.saltclients['local_async'](*f_call.get('args', ()), **f_call.get('kwargs', {}))
raise tornado.gen.Return(pub_data)
@ -1135,6 +1135,12 @@ class SaltAPIHandler(BaseSaltAPIHandler, SaltClientsMixIn): # pylint: disable=W
pub_data = self.saltclients['runner'](chunk)
raise tornado.gen.Return(pub_data)
# salt.utils.format_call doesn't work for functions having the annotation tornado.gen.coroutine
def _format_call_run_job_async(self, chunk):
f_call = salt.utils.format_call(salt.client.LocalClient.run_job, chunk)
f_call.get('kwargs', {})['io_loop'] = tornado.ioloop.IOLoop.current()
return f_call
class MinionSaltAPIHandler(SaltAPIHandler): # pylint: disable=W0223
'''

View File

@ -113,7 +113,7 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
# Recreate the message client because it will fail to be deep
# copied. The reason is the same as the io_loop skip above.
setattr(result, key,
AsyncReqMessageClient(result.opts,
AsyncReqMessageClientPool(result.opts,
self.master_uri,
io_loop=result._io_loop))
continue
@ -151,7 +151,7 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
if self.crypt != 'clear':
# we don't need to worry about auth as a kwarg, since its a singleton
self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self._io_loop)
self.message_client = AsyncReqMessageClient(self.opts,
self.message_client = AsyncReqMessageClientPool(self.opts,
self.master_uri,
io_loop=self._io_loop,
)
@ -809,6 +809,34 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
context.term()
# TODO: unit tests!
class AsyncReqMessageClientPool(object):
def __init__(self, opts, addr, linger=0, io_loop=None, socket_pool=1):
self.opts = opts
self.addr = addr
self.linger = linger
self.io_loop = io_loop
self.socket_pool = socket_pool
self.message_clients = []
def destroy(self):
for message_client in self.message_clients:
message_client.destroy()
self.message_clients = []
def __del__(self):
self.destroy()
def send(self, message, timeout=None, tries=3, future=None, callback=None, raw=False):
if len(self.message_clients) < self.socket_pool:
message_client = AsyncReqMessageClient(self.opts, self.addr, self.linger, self.io_loop)
self.message_clients.append(message_client)
return message_client.send(message, timeout, tries, future, callback, raw)
else:
available_clients = sorted(self.message_clients, key=lambda x: len(x.send_queue))
return available_clients[0].send(message, timeout, tries, future, callback, raw)
# TODO: unit tests!
class AsyncReqMessageClient(object):
'''