Merge pull request #9738 from abemusic/features/cloud-parallel-destroy

Adding parallel destroy capability to salt cloud
This commit is contained in:
Joseph Hall 2014-01-14 08:31:38 -08:00
commit ceb38ec6fa

View File

@ -583,26 +583,77 @@ class Cloud(object):
names = set(names)
matching = self.get_running_by_names(names, cached=cached)
vms_to_destroy = set()
parallel_data = []
for alias, drivers in matching.iteritems():
for driver, vms in drivers.iteritems():
for name in vms:
if name in names:
vms_to_destroy.add((alias, driver, name))
if self.opts['parallel']:
parallel_data.append({
'opts': self.opts,
'name': name,
'alias': alias,
'driver': driver,
})
# destroying in parallel
if self.opts['parallel'] and len(parallel_data) > 0:
# set the pool size based on configuration or default to
# the number of machines we're destroying
if 'pool_size' in self.opts:
pool_size = self.opts['pool_size']
else:
pool_size = len(parallel_data)
log.info('Destroying in parallel mode; '
'Cloud pool size: {0}'.format(pool_size))
# kick off the parallel destroy
output_multip = multiprocessing.Pool(pool_size).map(
func=destroy_multiprocessing,
iterable=parallel_data
)
# massage the multiprocessing output a bit
ret_multip = {}
for obj in output_multip:
ret_multip.update(obj)
# build up a datastructure similar to what the non-parallel
# destory uses
for obj in parallel_data:
alias = obj['alias']
driver = obj['driver']
name = obj['name']
if alias not in processed:
processed[alias] = {}
if driver not in processed[alias]:
processed[alias][driver] = {}
processed[alias][driver][name] = ret_multip[name]
names.remove(name)
# not destroying in parallel
else:
log.info('Destroying in non-parallel mode.')
for alias, driver, name in vms_to_destroy:
fun = '{0}.destroy'.format(driver)
with context.func_globals_inject(
self.clouds[fun],
__active_provider_name__=':'.join([alias,
driver])):
ret = self.clouds[fun](name)
if alias not in processed:
processed[alias] = {}
if driver not in processed[alias]:
processed[alias][driver] = {}
processed[alias][driver][name] = ret
names.remove(name)
# now the processed data structure contains the output from either
# the parallel or non-parallel destroy and we should finish up
# with removing minion keys if necessary
for alias, driver, name in vms_to_destroy:
fun = '{0}.destroy'.format(driver)
with context.func_globals_inject(
self.clouds[fun],
__active_provider_name__=':'.join([alias,
driver])):
ret = self.clouds[fun](name)
if alias not in processed:
processed[alias] = {}
if driver not in processed[alias]:
processed[alias][driver] = {}
processed[alias][driver][name] = ret
names.remove(name)
ret = processed[alias][driver][name]
if not ret:
continue
@ -1664,6 +1715,35 @@ def create_multiprocessing(parallel_data):
}
def destroy_multiprocessing(parallel_data):
'''
This function will be called from another process when running a map in
parallel mode. The result from the destroy is always a json object.
'''
parallel_data['opts']['output'] = 'json'
clouds = salt.loader.clouds(parallel_data['opts'])
try:
fun = clouds['{0}.destroy'.format(parallel_data['driver'])]
with context.func_globals_inject(
fun,
__active_provider_name__=':'.join([parallel_data['alias'],
parallel_data['driver']])):
output = fun(parallel_data['name'])
except SaltCloudException as exc:
log.error(
'Failed to destroy {0}. Error: {1}'.format(parallel_data['name'], exc),
# Show the traceback if the debug logging level is enabled
exc_info=log.isEnabledFor(logging.DEBUG)
)
return {parallel_data['name']: {'Error': str(exc)}}
return {
parallel_data['name']: salt.utils.cloud.simple_types_filter(output)
}
def run_parallel_map_providers_query(data):
'''
This function will be called from another process when building the