mirror of
https://github.com/valitydev/salt.git
synced 2024-11-08 01:18:58 +00:00
Merge pull request #19 from whiteinge/batchclient
Remove batch execution from NetapiClient and Saltnado
This commit is contained in:
commit
c80b20b957
@ -97,20 +97,6 @@ class NetapiClient(object):
|
||||
local = salt.client.get_local_client(mopts=self.opts)
|
||||
return local.cmd(*args, **kwargs)
|
||||
|
||||
def local_batch(self, *args, **kwargs):
|
||||
'''
|
||||
Run :ref:`execution modules <all-salt.modules>` against batches of minions
|
||||
|
||||
.. versionadded:: 0.8.4
|
||||
|
||||
Wraps :py:meth:`salt.client.LocalClient.cmd_batch`
|
||||
|
||||
:return: Returns the result from the exeuction module for each batch of
|
||||
returns
|
||||
'''
|
||||
local = salt.client.get_local_client(mopts=self.opts)
|
||||
return local.cmd_batch(*args, **kwargs)
|
||||
|
||||
def ssh(self, *args, **kwargs):
|
||||
'''
|
||||
Run salt-ssh commands synchronously
|
||||
|
@ -192,7 +192,6 @@ logger = logging.getLogger()
|
||||
# # all of these require coordinating minion stuff
|
||||
# - "local" (done)
|
||||
# - "local_async" (done)
|
||||
# - "local_batch" (done)
|
||||
|
||||
# # master side
|
||||
# - "runner" (done)
|
||||
@ -214,7 +213,6 @@ class SaltClientsMixIn(object):
|
||||
SaltClientsMixIn.__saltclients = {
|
||||
'local': local_client.run_job,
|
||||
# not the actual client we'll use.. but its what we'll use to get args
|
||||
'local_batch': local_client.cmd_batch,
|
||||
'local_async': local_client.run_job,
|
||||
'runner': salt.runner.RunnerClient(opts=self.application.opts).cmd_async,
|
||||
'runner_async': None, # empty, since we use the same client as `runner`
|
||||
@ -355,30 +353,6 @@ class EventListener(object):
|
||||
del self.timeout_map[future]
|
||||
|
||||
|
||||
# TODO: move to a utils function within salt-- the batching stuff is a bit tied together
|
||||
def get_batch_size(batch, num_minions):
|
||||
'''
|
||||
Return the batch size that you should have
|
||||
batch: string
|
||||
num_minions: int
|
||||
|
||||
'''
|
||||
# figure out how many we can keep in flight
|
||||
partition = lambda x: float(x) / 100.0 * num_minions
|
||||
try:
|
||||
if '%' in batch:
|
||||
res = partition(float(batch.strip('%')))
|
||||
if res < 1:
|
||||
return int(math.ceil(res))
|
||||
else:
|
||||
return int(res)
|
||||
else:
|
||||
return int(batch)
|
||||
except ValueError:
|
||||
print(('Invalid batch data sent: {0}\nData must be in the form'
|
||||
'of %10, 10% or 3').format(batch))
|
||||
|
||||
|
||||
class BaseSaltAPIHandler(tornado.web.RequestHandler, SaltClientsMixIn): # pylint: disable=W0223
|
||||
ct_out_map = (
|
||||
('application/json', json.dumps),
|
||||
@ -836,57 +810,6 @@ class SaltAPIHandler(BaseSaltAPIHandler, SaltClientsMixIn): # pylint: disable=W
|
||||
self.write(self.serialize({'return': ret}))
|
||||
self.finish()
|
||||
|
||||
@tornado.gen.coroutine
|
||||
def _disbatch_local_batch(self, chunk):
|
||||
'''
|
||||
Disbatch local client batched commands
|
||||
'''
|
||||
f_call = salt.utils.format_call(self.saltclients['local_batch'], chunk)
|
||||
|
||||
# ping all the minions (to see who we have to talk to)
|
||||
# Don't catch any exception, since we won't know what to do, we'll
|
||||
# let the upper level deal with this one
|
||||
ping_ret = yield self._disbatch_local({'tgt': chunk['tgt'],
|
||||
'fun': 'test.ping',
|
||||
'expr_form': f_call['kwargs']['expr_form']})
|
||||
|
||||
chunk_ret = {}
|
||||
|
||||
if not isinstance(ping_ret, dict):
|
||||
raise tornado.gen.Return(chunk_ret)
|
||||
minions = list(ping_ret.keys())
|
||||
|
||||
maxflight = get_batch_size(f_call['kwargs']['batch'], len(minions))
|
||||
inflight_futures = []
|
||||
|
||||
# override the expr_form
|
||||
f_call['kwargs']['expr_form'] = 'list'
|
||||
# do this batch
|
||||
while len(minions) > 0 or len(inflight_futures) > 0:
|
||||
# if you have more to go, lets disbatch jobs
|
||||
while len(inflight_futures) < maxflight and len(minions) > 0:
|
||||
minion_id = minions.pop(0)
|
||||
batch_chunk = dict(chunk)
|
||||
batch_chunk['tgt'] = [minion_id]
|
||||
batch_chunk['expr_form'] = 'list'
|
||||
future = self._disbatch_local(batch_chunk)
|
||||
inflight_futures.append(future)
|
||||
|
||||
# if we have nothing to wait for, don't wait
|
||||
if len(inflight_futures) == 0:
|
||||
continue
|
||||
|
||||
# wait until someone is done
|
||||
finished_future = yield Any(inflight_futures)
|
||||
try:
|
||||
b_ret = finished_future.result()
|
||||
except TimeoutException:
|
||||
break
|
||||
chunk_ret.update(b_ret)
|
||||
inflight_futures.remove(finished_future)
|
||||
|
||||
raise tornado.gen.Return(chunk_ret)
|
||||
|
||||
@tornado.gen.coroutine
|
||||
def _disbatch_local(self, chunk):
|
||||
'''
|
||||
|
Loading…
Reference in New Issue
Block a user