Add transport failover

This commit is contained in:
Joseph Hall 2016-08-16 11:17:31 -06:00
parent e744b795d1
commit 4d83a3f22f
6 changed files with 55 additions and 21 deletions

View File

@ -75,7 +75,7 @@ class Caller(object):
ttype = opts['pillar']['master']['transport']
# switch on available ttypes
if ttype in ('zeromq', 'tcp'):
if ttype in ('zeromq', 'tcp', 'detect'):
return ZeroMQCaller(opts, **kwargs)
elif ttype == 'raet':
return RAETCaller(opts, **kwargs)

View File

@ -478,6 +478,9 @@ class AsyncAuth(object):
error = exc
break
if creds == 'retry':
if self.opts.get('detect_mode') is True:
error = SaltClientError('Detect mode is on')
break
if self.opts.get('caller'):
print('Minion failed to authenticate with the master, '
'has the minion key been accepted?')
@ -491,6 +494,8 @@ class AsyncAuth(object):
continue
break
if not isinstance(creds, dict) or 'aes' not in creds:
if self.opts.get('detect_mode') is True:
self._authenticate_future.set_result(False)
try:
del AsyncAuth.creds_map[self.__key(self.opts)]
except KeyError:
@ -553,6 +558,9 @@ class AsyncAuth(object):
if safe:
log.warning('SaltReqTimeoutError: {0}'.format(e))
raise tornado.gen.Return('retry')
if self.opts.get('detect_mode') is True:
raise tornado.gen.Return('retry')
else:
raise SaltClientError('Attempt to authenticate with the salt master failed with timeout error')
if 'load' in payload:
if 'ret' in payload['load']:

View File

@ -574,6 +574,21 @@ class MinionBase(object):
opts.update(prep_ip_port(opts))
opts.update(resolve_dns(opts))
try:
if self.opts['transport'] == 'detect':
self.opts['detect_mode'] = True
#for trans in ('zeromq', 'tcp'):
for trans in ('tcp', 'zeromq'):
log.debug('00000000000000')
self.opts['transport'] = trans
log.debug('11111111111111')
pub_channel = salt.transport.client.AsyncPubChannel.factory(self.opts, **factory_kwargs)
log.debug('22222222222222')
yield pub_channel.connect()
log.debug('33333333333333')
if not pub_channel.auth.authenticated:
log.debug('44444444444444')
continue
else:
pub_channel = salt.transport.client.AsyncPubChannel.factory(self.opts, **factory_kwargs)
yield pub_channel.connect()
self.tok = pub_channel.auth.gen_token('salt')

View File

@ -7,10 +7,14 @@ This includes client side transport, for the ReqServer and the Publisher
# Import Python Libs
from __future__ import absolute_import
import time
import logging
# Import Salt Libs
from salt.utils.async import SyncWrapper
log = logging.getLogger(__name__)
class ReqChannel(object):
'''
@ -103,20 +107,22 @@ class AsyncReqChannel(AsyncChannel):
if ttype == 'zeromq':
import salt.transport.zeromq
return salt.transport.zeromq.AsyncZeroMQReqChannel(opts, **kwargs)
elif ttype == 'raet':
import salt.transport.raet
return salt.transport.raet.AsyncRAETReqChannel(opts, **kwargs)
elif ttype == 'tcp':
if not cls._resolver_configured:
# TODO: add opt to specify number of resolver threads
AsyncChannel._config_resolver()
import salt.transport.tcp
return salt.transport.tcp.AsyncTCPReqChannel(opts, **kwargs)
elif ttype == 'raet':
import salt.transport.raet
return salt.transport.raet.AsyncRAETReqChannel(opts, **kwargs)
elif ttype == 'local':
import salt.transport.local
return salt.transport.local.AsyncLocalChannel(opts, **kwargs)
else:
raise Exception('Channels are only defined for ZeroMQ and raet')
raise Exception(
'Channels are only defined for tcp, zeromq, raet, and local'
)
# return NewKindOfChannel(opts, **kwargs)
def send(self, load, tries=3, timeout=60, raw=False):
@ -147,32 +153,30 @@ class AsyncPubChannel(AsyncChannel):
ttype = opts['transport']
elif 'transport' in opts.get('pillar', {}).get('master', {}):
ttype = opts['pillar']['master']['transport']
if ttype == 'detect':
opts['__last_transport'] = 'tcp'
opts['transport'] = 'tcp'
ttype = opts['transport']
elif '__last_transport' in opts:
opts['transport'] = 'zeromq' if opts['transport'] == 'tcp' else 'tcp'
ttype = opts['transport']
# switch on available ttypes
if ttype == 'detect':
opts['detect_mode'] = True
log.info('Transport is set to detect; using {0}'.format(ttype))
if ttype == 'zeromq':
import salt.transport.zeromq
return salt.transport.zeromq.AsyncZeroMQPubChannel(opts, **kwargs)
elif ttype == 'raet': # TODO:
import salt.transport.raet
return salt.transport.raet.AsyncRAETPubChannel(opts, **kwargs)
elif ttype == 'tcp':
if not cls._resolver_configured:
# TODO: add opt to specify number of resolver threads
AsyncChannel._config_resolver()
import salt.transport.tcp
return salt.transport.tcp.AsyncTCPPubChannel(opts, **kwargs)
elif ttype == 'raet': # TODO:
import salt.transport.raet
return salt.transport.raet.AsyncRAETPubChannel(opts, **kwargs)
elif ttype == 'local': # TODO:
import salt.transport.local
return salt.transport.local.AsyncLocalPubChannel(opts, **kwargs)
else:
raise Exception('Channels are only defined for ZeroMQ and raet')
raise Exception(
'Channels are only defined for tcp, zeromq, raet, and local'
)
# return NewKindOfChannel(opts, **kwargs)
def connect(self):

View File

@ -981,6 +981,9 @@ class SaltMessageClient(object):
# Add this future to the mapping
self.send_future_map[message_id] = future
if self.opts.get('detect_mode') is True:
timeout = 1
if timeout is not None:
send_timeout = self.io_loop.call_later(timeout, self.timeout_message, message_id)
self.send_timeout_map[message_id] = send_timeout

View File

@ -380,6 +380,7 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
def connect(self):
if not self.auth.authenticated:
yield self.auth.authenticate()
if self.opts.get('detect_mode', False) is False:
self.publish_port = self.auth.creds['publish_port']
self._socket.connect(self.master_pub)
@ -994,6 +995,9 @@ class AsyncReqMessageClient(object):
# Add this future to the mapping
self.send_future_map[message] = future
if self.opts.get('detect_mode') is True:
timeout = 1
if timeout is not None:
send_timeout = self.io_loop.call_later(timeout, self.timeout_message, message)
self.send_timeout_map[message] = send_timeout