From ae9b62c27f0c15e270391f776301280e46b03462 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Wed, 5 Mar 2014 18:00:56 -0800 Subject: [PATCH] Reuse Channels with support for multiple ttypes Submitting merge request to get jenkins run. Last time this was merged there were some problems which seem to be related to this but i'm unable to reproduce them locally-- so hopefully we can get something from Jenkins :) --- salt/transport/__init__.py | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/salt/transport/__init__.py b/salt/transport/__init__.py index 3c3f85a16f..8bb0028db0 100644 --- a/salt/transport/__init__.py +++ b/salt/transport/__init__.py @@ -2,6 +2,11 @@ ''' Encapsulate the different transports available to Salt. Currently this is only ZeroMQ. ''' +import logging +import os +import threading + +from collections import defaultdict # Import Salt Libs import salt.payload @@ -13,8 +18,15 @@ except ImportError: # Don't die on missing transport libs since only one transport is required pass +log = logging.getLogger(__name__) + class Channel(object): + # store a cache of the channels so a single pid can re-use a connection. + # This needs to be per thread AND per process since we don't control + # concurrency across threads or processes, and we can use both + channel_cache = defaultdict(dict) + @staticmethod def factory(opts, **kwargs): # Default to ZeroMQ for now @@ -25,9 +37,18 @@ class Channel(object): elif 'transport' in opts.get('pillar', {}).get('master', {}): ttype = opts['pillar']['master']['transport'] + # everyone needs to store per thread and per process + process_key = (threading.current_thread().name, os.getpid()) + if ttype == 'zeromq': - return ZeroMQChannel(opts, **kwargs) - if ttype == 'raet': + channel_key = ZeroMQChannel.cachekey(opts, **kwargs) + key = (process_key, channel_key) + log.debug('Retrieving channel with key {0}'.format(key)) + if key not in Channel.channel_cache[ttype]: + log.debug('Creating new zeromq channel in {0}'.format(process_key)) + Channel.channel_cache[ttype][key] = ZeroMQChannel(opts, **kwargs) + return Channel.channel_cache[ttype][key] + elif ttype == 'raet': return RAETChannel(opts, **kwargs) else: raise Exception('Channels are only defined for ZeroMQ') @@ -80,6 +101,17 @@ class ZeroMQChannel(Channel): ZMQ Channels default to 'crypt=aes' ''' + @staticmethod + def cachekey(opts, **kwargs): + ''' + Return a tuple which uniquely defines this channel (for caching) + ''' + return (opts.get('id', opts['pki_dir']), # for self.auth + kwargs['crypt'] if 'crypt' in kwargs else 'aes', # for self.crypt + kwargs.get('master_uri', opts['master_uri']), # for self.sreq, + opts.get('serial', 'msgpack'), # for self.serial + ) + def __init__(self, opts, **kwargs): self.opts = opts self.ttype = 'zeromq'