From ecd66d1b3a9978f09e33b965f19ab93662f9d950 Mon Sep 17 00:00:00 2001 From: Abe Music Date: Fri, 10 Jan 2014 16:23:33 -0600 Subject: [PATCH] Adding parallel destroy capability to salt cloud --- salt/cloud/__init__.py | 106 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 93 insertions(+), 13 deletions(-) diff --git a/salt/cloud/__init__.py b/salt/cloud/__init__.py index 2384efe3e1..fe09946036 100644 --- a/salt/cloud/__init__.py +++ b/salt/cloud/__init__.py @@ -583,26 +583,77 @@ class Cloud(object): names = set(names) matching = self.get_running_by_names(names, cached=cached) vms_to_destroy = set() + parallel_data = [] for alias, drivers in matching.iteritems(): for driver, vms in drivers.iteritems(): for name in vms: if name in names: vms_to_destroy.add((alias, driver, name)) + if self.opts['parallel']: + parallel_data.append({ + 'opts': self.opts, + 'name': name, + 'alias': alias, + 'driver': driver, + }) + # destroying in parallel + if self.opts['parallel'] and len(parallel_data) > 0: + # set the pool size based on configuration or default to + # the number of machines we're destroying + if 'pool_size' in self.opts: + pool_size = self.opts['pool_size'] + else: + pool_size = len(parallel_data) + log.info('Destroying in parallel mode; ' + 'Cloud pool size: {0}'.format(pool_size)) + + # kick off the parallel destroy + output_multip = multiprocessing.Pool(pool_size).map( + func=destroy_multiprocessing, + iterable=parallel_data + ) + + # massage the multiprocessing output a bit + ret_multip = {} + for obj in output_multip: + ret_multip.update(obj) + + # build up a datastructure similar to what the non-parallel + # destory uses + for obj in parallel_data: + alias = obj['alias'] + driver = obj['driver'] + name = obj['name'] + if alias not in processed: + processed[alias] = {} + if driver not in processed[alias]: + processed[alias][driver] = {} + processed[alias][driver][name] = ret_multip[name] + names.remove(name) + + # not destroying in parallel + else: + log.info('Destroying in non-parallel mode.') + for alias, driver, name in vms_to_destroy: + fun = '{0}.destroy'.format(driver) + with context.func_globals_inject( + self.clouds[fun], + __active_provider_name__=':'.join([alias, + driver])): + ret = self.clouds[fun](name) + if alias not in processed: + processed[alias] = {} + if driver not in processed[alias]: + processed[alias][driver] = {} + processed[alias][driver][name] = ret + names.remove(name) + + # now the processed data structure contains the output from either + # the parallel or non-parallel destroy and we should finish up + # with removing minion keys if necessary for alias, driver, name in vms_to_destroy: - fun = '{0}.destroy'.format(driver) - with context.func_globals_inject( - self.clouds[fun], - __active_provider_name__=':'.join([alias, - driver])): - ret = self.clouds[fun](name) - if alias not in processed: - processed[alias] = {} - if driver not in processed[alias]: - processed[alias][driver] = {} - processed[alias][driver][name] = ret - names.remove(name) - + ret = processed[alias][driver][name] if not ret: continue @@ -1664,6 +1715,35 @@ def create_multiprocessing(parallel_data): } +def destroy_multiprocessing(parallel_data): + ''' + This function will be called from another process when running a map in + parallel mode. The result from the destroy is always a json object. + ''' + parallel_data['opts']['output'] = 'json' + clouds = salt.loader.clouds(parallel_data['opts']) + + try: + fun = clouds['{0}.destroy'.format(parallel_data['driver'])] + with context.func_globals_inject( + fun, + __active_provider_name__=':'.join([parallel_data['alias'], + parallel_data['driver']])): + output = fun(parallel_data['name']) + + except SaltCloudException as exc: + log.error( + 'Failed to destroy {0}. Error: {1}'.format(parallel_data['name'], exc), + # Show the traceback if the debug logging level is enabled + exc_info=log.isEnabledFor(logging.DEBUG) + ) + return {parallel_data['name']: {'Error': str(exc)}} + + return { + parallel_data['name']: salt.utils.cloud.simple_types_filter(output) + } + + def run_parallel_map_providers_query(data): ''' This function will be called from another process when building the