mirror of
https://github.com/valitydev/salt.git
synced 2024-11-08 01:18:58 +00:00
commit
fa72900d86
@ -2,6 +2,11 @@
|
||||
'''
|
||||
Encapsulate the different transports available to Salt. Currently this is only ZeroMQ.
|
||||
'''
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
|
||||
from collections import defaultdict
|
||||
|
||||
# Import Salt Libs
|
||||
import salt.payload
|
||||
@ -13,8 +18,15 @@ except ImportError:
|
||||
# Don't die on missing transport libs since only one transport is required
|
||||
pass
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Channel(object):
|
||||
# store a cache of the channels so a single pid can re-use a connection.
|
||||
# This needs to be per thread AND per process since we don't control
|
||||
# concurrency across threads or processes, and we can use both
|
||||
channel_cache = defaultdict(dict)
|
||||
|
||||
@staticmethod
|
||||
def factory(opts, **kwargs):
|
||||
# Default to ZeroMQ for now
|
||||
@ -25,9 +37,18 @@ class Channel(object):
|
||||
elif 'transport' in opts.get('pillar', {}).get('master', {}):
|
||||
ttype = opts['pillar']['master']['transport']
|
||||
|
||||
# everyone needs to store per thread and per process
|
||||
process_key = (threading.current_thread().name, os.getpid())
|
||||
|
||||
if ttype == 'zeromq':
|
||||
return ZeroMQChannel(opts, **kwargs)
|
||||
if ttype == 'raet':
|
||||
channel_key = ZeroMQChannel.cachekey(opts, **kwargs)
|
||||
key = (process_key, channel_key)
|
||||
log.debug('Retrieving channel with key {0}'.format(key))
|
||||
if key not in Channel.channel_cache[ttype]:
|
||||
log.debug('Creating new zeromq channel in {0}'.format(process_key))
|
||||
Channel.channel_cache[ttype][key] = ZeroMQChannel(opts, **kwargs)
|
||||
return Channel.channel_cache[ttype][key]
|
||||
elif ttype == 'raet':
|
||||
return RAETChannel(opts, **kwargs)
|
||||
else:
|
||||
raise Exception('Channels are only defined for ZeroMQ')
|
||||
@ -80,6 +101,17 @@ class ZeroMQChannel(Channel):
|
||||
|
||||
ZMQ Channels default to 'crypt=aes'
|
||||
'''
|
||||
@staticmethod
|
||||
def cachekey(opts, **kwargs):
|
||||
'''
|
||||
Return a tuple which uniquely defines this channel (for caching)
|
||||
'''
|
||||
return (opts.get('id', opts['pki_dir']), # for self.auth
|
||||
kwargs['crypt'] if 'crypt' in kwargs else 'aes', # for self.crypt
|
||||
kwargs.get('master_uri', opts['master_uri']), # for self.sreq,
|
||||
opts.get('serial', 'msgpack'), # for self.serial
|
||||
)
|
||||
|
||||
def __init__(self, opts, **kwargs):
|
||||
self.opts = opts
|
||||
self.ttype = 'zeromq'
|
||||
|
Loading…
Reference in New Issue
Block a user