salt.cloud: Better multiprocessing handling

Mathieu Le Marec - Pasquet 2014-02-28 10:37:08 +00:00
parent fb3c89dd34
commit 08aa9a8566


@@ -8,10 +8,12 @@ correct cloud modules
 from __future__ import print_function
 import copy
 import os
+import traceback
 import glob
+import time
 import signal
 import logging
 from pprint import pprint
 import multiprocessing
 from itertools import groupby
@@ -47,6 +49,125 @@ except ImportError:
     MAKO_AVAILABLE = False
+
+
+def communicator(func):
+    '''Warning: this is a picklable decorator!'''
+    def _call(queue, args, kw):
+        '''called with (queue, args, kwargs) as positional arguments'''
+        kw['queue'] = queue
+        ret = None
+        try:
+            ret = func(*args, **kw)
+            queue.put('END')
+        except KeyboardInterrupt as ex:
+            trace = traceback.format_exc()
+            queue.put('KEYBOARDINT')
+            queue.put('Keyboard interrupt')
+            queue.put('{0}\n{1}\n'.format(ex, trace))
+        except Exception as ex:
+            trace = traceback.format_exc()
+            queue.put('ERROR')
+            queue.put('Exception')
+            queue.put('{0}\n{1}\n'.format(ex, trace))
+        return ret
+    return _call
+
+
+def enter_mainloop(target,
+                   mapped_args=None,
+                   args=None,
+                   kwargs=None,
+                   pool=None,
+                   pool_size=None,
+                   callback=None,
+                   queue=None):
+    '''
+    Manage a multiprocessing pool
+
+    - If the queue does not output anything, the pool runs indefinitely.
+    - If the queue returns KEYBOARDINT or ERROR, the pool is killed
+      outright (terminate & join) and a SaltCloudSystemExit exception is
+      raised to notify callers of the abnormal termination.
+    - If the queue returns END, or callback is defined and returns True,
+      the pool is simply joined and the data returned.
+
+    target
+        the function you want to execute via multiprocessing
+    pool
+        pool object; can be None if you want a default pool, but you'll
+        then have to define pool_size instead
+    pool_size
+        pool size if you did not provide a pool yourself
+    callback
+        a function taking a string argument which returns True to signal
+        that 'target' is finished and we need to join the pool
+    queue
+        a custom multiprocessing queue in case you want to do extra
+        stuff and need it later in your program
+    args
+        positional arguments to call the function with,
+        if you don't want to use pool.map
+    mapped_args
+        a list of one or more argument combinations to call the function
+        with; e.g. (foo, [[1], [2]]) will call::
+
+            foo([1])
+            foo([2])
+
+    kwargs
+        keyword arguments to give to the function
+
+    Attention: the function must have the following signature::
+
+        target(queue, *args, **kw)
+
+    You may use the 'communicator' decorator to generate such a function
+    (see the end of this file).
+    '''
+    if not kwargs:
+        kwargs = {}
+    if not pool_size:
+        pool_size = 1
+    if not pool:
+        pool = multiprocessing.Pool(pool_size)
+    if not queue:
+        manager = multiprocessing.Manager()
+        queue = manager.Queue()
+    if mapped_args is not None and not mapped_args:
+        msg = (
+            'We are called to asynchronously execute {0}'
+            ' but have nothing to execute;'
+            ' bailing out'.format(target))
+        log.error(msg)
+        raise SaltCloudSystemExit('Exception caught\n{0}'.format(msg))
+    elif mapped_args is not None:
+        iterable = [[queue, [arg], kwargs] for arg in mapped_args]
+        ret = pool.map(func=target, iterable=iterable)
+    else:
+        ret = pool.apply(target, [queue, args, kwargs])
+    while True:
+        test = queue.get()
+        if test in ['ERROR', 'KEYBOARDINT']:
+            type_ = queue.get()
+            trace = queue.get()
+            msg = 'Caught {0}, terminating workers\n'.format(type_)
+            msg += 'TRACE: {0}\n'.format(trace)
+            log.error(msg)
+            pool.terminate()
+            pool.join()
+            raise SaltCloudSystemExit('Exception caught\n{0}'.format(msg))
+        elif test in ['END'] or (callback and callback(test)):
+            pool.close()
+            pool.join()
+            break
+        else:
+            time.sleep(0.125)
+    return ret
+
+
 class CloudClient(object):
     '''
     The client class to wrap cloud interactions
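Taken together, communicator() and enter_mainloop() split the work: the decorated target reports END, ERROR or KEYBOARDINT through the shared queue, and the main loop supervises the pool accordingly. A minimal standalone sketch of the calling pattern, assuming a hypothetical worker query_node; the module-level _query_node shim mirrors the wrappers added at the end of this patch, keeping the pool.map target picklable and unpacking the [queue, [arg], kwargs] triple that enter_mainloop builds:

    def query_node(name, queue=None):
        # 'queue' is injected by communicator(); a worker may also use
        # it to push progress messages of its own
        return 'queried {0}'.format(name)


    def _query_node(*args, **kw):
        # pool.map() hands each worker one [queue, [arg], kwargs] list
        return communicator(query_node)(*args[0], **kw)


    if __name__ == '__main__':
        # runs query_node('a') and query_node('b') across two workers
        results = enter_mainloop(_query_node,
                                 mapped_args=['a', 'b'],
                                 pool_size=2)
        print(results)  # ['queried a', 'queried b']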
@@ -357,24 +478,11 @@ class Cloud(object):
         output = {}
         data_count = len(multiprocessing_data)
-        pool = multiprocessing.Pool(
-            data_count < 10 and data_count or 10,
-            init_pool_worker
-        )
-        try:
-            parallel_pmap = pool.map(
-                func=run_parallel_map_providers_query,
-                iterable=multiprocessing_data
-            )
-        except KeyboardInterrupt:
-            print('Caught KeyboardInterrupt, terminating workers')
-            pool.terminate()
-            pool.join()
-            raise SaltCloudSystemExit('Keyboard Interrupt caught')
-        else:
-            pool.close()
-            pool.join()
+        pool = multiprocessing.Pool(data_count < 10 and data_count or 10,
+                                    init_pool_worker)
+        parallel_pmap = enter_mainloop(_run_parallel_map_providers_query,
+                                       multiprocessing_data,
+                                       pool=pool)
         for alias, driver, details in parallel_pmap:
             if not details:
                 # There's no providers details?! Skip it!
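The pool-size expression above, data_count < 10 and data_count or 10, is the pre-ternary and/or idiom for capping the pool at ten workers; for any data_count >= 1 it behaves exactly like min(data_count, 10), as this standalone check illustrates:

    for data_count in (1, 9, 10, 25):
        assert (data_count < 10 and data_count or 10) == min(data_count, 10)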
@@ -661,10 +769,8 @@ class Cloud(object):
                  'Cloud pool size: {0}'.format(pool_size))

         # kick off the parallel destroy
-        output_multip = multiprocessing.Pool(pool_size).map(
-            func=destroy_multiprocessing,
-            iterable=parallel_data
-        )
+        output_multip = enter_mainloop(
+            _destroy_multiprocessing, parallel_data, pool_size=pool_size)

         # massage the multiprocessing output a bit
         ret_multip = {}
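With enter_mainloop() in place, a worker failure no longer escapes through pool.map: communicator() catches the exception and queues the ERROR, type and traceback messages, which the main loop turns into a SaltCloudSystemExit after terminating the pool. A standalone sketch of that failure path, assuming a hypothetical worker named flaky:

    def flaky(item, queue=None):
        raise ValueError('boom on {0}'.format(item))


    def _flaky(*args, **kw):
        # module-level shim: picklable and matching the queue protocol
        return communicator(flaky)(*args[0], **kw)


    if __name__ == '__main__':
        # the worker's ValueError surfaces here as SaltCloudSystemExit,
        # with the formatted traceback embedded in the message
        enter_mainloop(_flaky, mapped_args=['node1'], pool_size=1)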
@@ -1712,10 +1818,8 @@ class Map(Cloud):
             else:
                 pool_size = len(parallel_data)
             log.info('Cloud pool size: {0}'.format(pool_size))
-            output_multip = multiprocessing.Pool(pool_size).map(
-                func=create_multiprocessing,
-                iterable=parallel_data
-            )
+            output_multip = enter_mainloop(
+                _create_multiprocessing, parallel_data, pool_size=pool_size)
             # We have deployed in parallel, now do start action in
             # correct order based on dependencies.
             if self.opts['start_action']:
@@ -1758,7 +1862,7 @@ def init_pool_worker():
     signal.signal(signal.SIGINT, signal.SIG_IGN)


-def create_multiprocessing(parallel_data):
+def create_multiprocessing(parallel_data, queue=None):
     '''
     This function will be called from another process when running a map in
     parallel mode. The result from the create is always a json object.
@@ -1788,7 +1892,7 @@ def create_multiprocessing(parallel_data):
     }


-def destroy_multiprocessing(parallel_data):
+def destroy_multiprocessing(parallel_data, queue=None):
     '''
     This function will be called from another process when running a map in
     parallel mode. The result from the destroy is always a json object.
@@ -1817,7 +1921,7 @@ def destroy_multiprocessing(parallel_data):
     }


-def run_parallel_map_providers_query(data):
+def run_parallel_map_providers_query(data, queue=None):
     '''
     This function will be called from another process when building the
     providers map.
@@ -1852,3 +1956,18 @@ def run_parallel_map_providers_query(data):
         )
         # Failed to communicate with the provider, don't list any nodes
         return (data['alias'], data['driver'], ())
+
+
+# for pickle and multiprocessing, we can't use decorators directly
+def _run_parallel_map_providers_query(*args, **kw):
+    return communicator(run_parallel_map_providers_query)(*args[0], **kw)
+
+
+def _destroy_multiprocessing(*args, **kw):
+    return communicator(destroy_multiprocessing)(*args[0], **kw)
+
+
+def _create_multiprocessing(*args, **kw):
+    return communicator(create_multiprocessing)(*args[0], **kw)
+
+#
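These underscore-prefixed shims exist because multiprocessing pickles its map target by module-level name: the closure returned by communicator() has no such name and cannot be pickled, while a plain wrapper that applies the decorator at call time can. A standalone sketch of the difference, assuming the functions above are importable:

    import pickle

    try:
        # the nested _call closure has no module-level name to pickle
        pickle.dumps(communicator(run_parallel_map_providers_query))
    except (pickle.PicklingError, AttributeError) as exc:
        print('decorated closure is not picklable: {0}'.format(exc))

    # the shim is resolvable by name at import time, so it pickles fine
    pickle.dumps(_run_parallel_map_providers_query)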