Merge pull request #22948 from jacksontj/develop

Req channel singletons
This commit is contained in:
Thomas S Hatch 2015-04-30 08:24:22 -07:00
commit 1ae9dcb71c
3 changed files with 91 additions and 10 deletions

View File

@ -13,6 +13,7 @@ import msgpack
import socket
import sys
import os
import weakref
import urlparse # TODO: remove
@ -100,13 +101,54 @@ def socket_frame_recv(s, recv_size=4096):
return buf
# TODO: make singleton
# TODO: move serial down into message library
class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
'''
Encapsulate sending routines to tcp.
Note: this class returns a singleton
'''
# This class is only a singleton per minion/master pair
# mapping of io_loop -> {key -> channel}
instance_map = weakref.WeakKeyDictionary()
def __new__(cls, opts, **kwargs):
'''
Only create one instance of channel per __key()
'''
# do we have any mapping for this io_loop
io_loop = kwargs.get('io_loop') or tornado.ioloop.IOLoop.current()
if io_loop not in cls.instance_map:
cls.instance_map[io_loop] = weakref.WeakValueDictionary()
loop_instance_map = cls.instance_map[io_loop]
key = cls.__key(opts, **kwargs)
if key not in loop_instance_map:
log.debug('Initializing new AsyncTCPReqChannel for {0}'.format(key))
# we need to make a local variable for this, as we are going to store
# it in a WeakValueDictionary-- which will remove the item if no one
# references it-- this forces a reference while we return to the caller
new_obj = object.__new__(cls)
new_obj.__singleton_init__(opts, **kwargs)
loop_instance_map[key] = new_obj
else:
log.debug('Re-using AsyncTCPReqChannel for {0}'.format(key))
return loop_instance_map[key]
@classmethod
def __key(cls, opts, **kwargs):
return (opts['pki_dir'], # where the keys are stored
opts['id'], # minion ID
opts['master_uri'], # master ID
kwargs.get('crypt', 'aes'), # TODO: use the same channel for crypt
)
# has to remain empty for singletons, since __init__ will *always* be called
def __init__(self, opts, **kwargs):
pass
# an init for the singleton instance to call
def __singleton_init__(self, opts, **kwargs):
self.opts = dict(opts)
self.serial = salt.payload.Serial(self.opts)

View File

@ -9,6 +9,7 @@ import logging
import os
import errno
import hashlib
import weakref
from random import randint
# Import Salt Libs
@ -46,8 +47,47 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
ZMQ Channels default to 'crypt=aes'
'''
# This class is only a singleton per minion/master pair
# mapping of io_loop -> {key -> channel}
instance_map = weakref.WeakKeyDictionary()
def __new__(cls, opts, **kwargs):
'''
Only create one instance of channel per __key()
'''
# do we have any mapping for this io_loop
io_loop = kwargs.get('io_loop') or tornado.ioloop.IOLoop.current()
if io_loop not in cls.instance_map:
cls.instance_map[io_loop] = weakref.WeakValueDictionary()
loop_instance_map = cls.instance_map[io_loop]
key = cls.__key(opts, **kwargs)
if key not in loop_instance_map:
log.debug('Initializing new AsyncZeroMQReqChannel for {0}'.format(key))
# we need to make a local variable for this, as we are going to store
# it in a WeakValueDictionary-- which will remove the item if no one
# references it-- this forces a reference while we return to the caller
new_obj = object.__new__(cls)
new_obj.__singleton_init__(opts, **kwargs)
loop_instance_map[key] = new_obj
else:
log.debug('Re-using AsyncZeroMQReqChannel for {0}'.format(key))
return loop_instance_map[key]
@classmethod
def __key(cls, opts, **kwargs):
return (opts['pki_dir'], # where the keys are stored
opts['id'], # minion ID
kwargs.get('master_uri', opts.get('master_uri')), # master ID
kwargs.get('crypt', 'aes'), # TODO: use the same channel for crypt
)
# has to remain empty for singletons, since __init__ will *always* be called
def __init__(self, opts, **kwargs):
pass
# an init for the singleton instance to call
def __singleton_init__(self, opts, **kwargs):
self.opts = dict(opts)
self.ttype = 'zeromq'

View File

@ -13,6 +13,7 @@ if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
import contextlib
import weakref
@contextlib.contextmanager
@ -42,11 +43,17 @@ class SyncWrapper(object):
# the sync wrapper will automatically wait on the future
ret = sync.async_method()
'''
loop_map = weakref.WeakKeyDictionary() # keep a mapping of parent io_loop -> sync_loop
def __init__(self, method, args=tuple(), kwargs=None):
if kwargs is None:
kwargs = {}
self.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
parent_io_loop = tornado.ioloop.IOLoop.current()
if parent_io_loop not in SyncWrapper.loop_map:
SyncWrapper.loop_map[parent_io_loop] = zmq.eventloop.ioloop.ZMQIOLoop()
self.io_loop = SyncWrapper.loop_map[parent_io_loop]
kwargs['io_loop'] = self.io_loop
with current_ioloop(self.io_loop):
@ -75,11 +82,3 @@ class SyncWrapper(object):
self.io_loop.add_future(future, lambda future: self.io_loop.stop())
self.io_loop.start()
return future.result()
def __del__(self):
'''
On deletion of the async wrapper, make sure to clean up the async stuff
'''
if hasattr(self, 'async'):
del self.async
self.io_loop.close()