Merge pull request #30488 from skizunov/develop2

Do not use '__master_alive' schedule on TCP transport
This commit is contained in:
Mike Place 2016-01-21 08:43:10 -07:00
commit 491610b998
2 changed files with 130 additions and 42 deletions

View File

@ -867,7 +867,8 @@ class Minion(MinionBase):
self.schedule.delete_job('__mine_interval', persist=True)
# add master_alive job if enabled
if self.opts['master_alive_interval'] > 0:
if (self.opts['transport'] != 'tcp' and
self.opts['master_alive_interval'] > 0):
self.schedule.add_job({
'__master_alive':
{
@ -879,6 +880,8 @@ class Minion(MinionBase):
'connected': True}
}
}, persist=True)
else:
self.schedule.delete_job('__master_alive', persist=True)
self.grains_cache = self.opts['grains']
@ -1672,7 +1675,6 @@ class Minion(MinionBase):
log.debug('Forwarding master event tag={tag}'.format(tag=data['tag']))
self._fire_master(data['data'], data['tag'], data['events'], data['pretag'])
elif package.startswith('__master_disconnected'):
tag, data = salt.utils.event.MinionEvent.unpack(tag, data)
# if the master disconnect event is for a different master, raise an exception
if data['master'] != self.opts['master']:
raise Exception()
@ -1680,16 +1682,17 @@ class Minion(MinionBase):
# we are not connected anymore
self.connected = False
# modify the scheduled job to fire only on reconnect
schedule = {
'function': 'status.master',
'seconds': self.opts['master_alive_interval'],
'jid_include': True,
'maxrunning': 2,
'kwargs': {'master': self.opts['master'],
'connected': False}
}
self.schedule.modify_job(name='__master_alive',
schedule=schedule)
if self.opts['transport'] != 'tcp':
schedule = {
'function': 'status.master',
'seconds': self.opts['master_alive_interval'],
'jid_include': True,
'maxrunning': 2,
'kwargs': {'master': self.opts['master'],
'connected': False}
}
self.schedule.modify_job(name='__master_alive',
schedule=schedule)
log.info('Connection to master {0} lost'.format(self.opts['master']))
@ -1712,16 +1715,17 @@ class Minion(MinionBase):
log.info('Minion is ready to receive requests!')
# update scheduled job to run with the new master addr
schedule = {
'function': 'status.master',
'seconds': self.opts['master_alive_interval'],
'jid_include': True,
'maxrunning': 2,
'kwargs': {'master': self.opts['master'],
'connected': True}
}
self.schedule.modify_job(name='__master_alive',
schedule=schedule)
if self.opts['transport'] != 'tcp':
schedule = {
'function': 'status.master',
'seconds': self.opts['master_alive_interval'],
'jid_include': True,
'maxrunning': 2,
'kwargs': {'master': self.opts['master'],
'connected': True}
}
self.schedule.modify_job(name='__master_alive',
schedule=schedule)
elif package.startswith('__master_connected'):
# handle this event only once. otherwise it will pollute the log
@ -1730,17 +1734,18 @@ class Minion(MinionBase):
self.connected = True
# modify the __master_alive job to only fire,
# if the connection is lost again
schedule = {
'function': 'status.master',
'seconds': self.opts['master_alive_interval'],
'jid_include': True,
'maxrunning': 2,
'kwargs': {'master': self.opts['master'],
'connected': True}
}
if self.opts['transport'] != 'tcp':
schedule = {
'function': 'status.master',
'seconds': self.opts['master_alive_interval'],
'jid_include': True,
'maxrunning': 2,
'kwargs': {'master': self.opts['master'],
'connected': True}
}
self.schedule.modify_job(name='__master_alive',
schedule=schedule)
self.schedule.modify_job(name='__master_alive',
schedule=schedule)
elif package.startswith('_salt_error'):
log.debug('Forwarding salt error event tag={tag}'.format(tag=tag))
self._fire_master(data, tag)
@ -2748,7 +2753,8 @@ class ProxyMinion(Minion):
self.schedule.delete_job('__mine_interval', persist=True)
# add master_alive job if enabled
if self.opts['master_alive_interval'] > 0:
if (self.opts['transport'] != 'tcp' and
self.opts['master_alive_interval'] > 0):
self.schedule.add_job({
'__master_alive':
{
@ -2760,5 +2766,7 @@ class ProxyMinion(Minion):
'connected': True}
}
}, persist=True)
else:
self.schedule.delete_job('__master_alive', persist=True)
self.grains_cache = self.opts['grains']

View File

@ -15,7 +15,8 @@ import sys
import os
import weakref
import urlparse # TODO: remove
import time
import traceback
# Import Salt Libs
import salt.crypt
@ -258,6 +259,12 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran
self.io_loop = kwargs.get('io_loop') or tornado.ioloop.IOLoop.current()
self.connected = False
self._closing = False
self._reconnected = False
self.event = salt.utils.event.get_event(
'minion',
opts=self.opts,
listen=False
)
def close(self):
if self._closing:
@ -269,6 +276,65 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran
def __del__(self):
self.close()
def connect_callback(self, result):
if self._closing:
return
self.connected = True
self.event.fire_event(
{'master': self.opts['master']},
'__master_connected'
)
if self._reconnected:
# On reconnects, fire a master event to notify that the minion is
# available.
if self.opts.get('__role') == 'syndic':
data = 'Syndic {0} started at {1}'.format(
self.opts['id'],
time.asctime()
)
tag = salt.utils.event.tagify(
[self.opts['id'], 'start'],
'syndic'
)
else:
data = 'Minion {0} started at {1}'.format(
self.opts['id'],
time.asctime()
)
tag = salt.utils.event.tagify(
[self.opts['id'], 'start'],
'minion'
)
tok = self.auth.gen_token('salt')
load = {'id': self.opts['id'],
'cmd': '_minion_event',
'pretag': None,
'tok': tok,
'data': data,
'tag': tag}
req_channel = salt.utils.async.SyncWrapper(
AsyncTCPReqChannel, (self.opts,)
)
try:
req_channel.send(load, timeout=60)
except salt.exceptions.SaltReqTimeoutError:
log.info('fire_master failed: master could not be contacted. Request timed out.')
except Exception:
log.info('fire_master failed: {0}'.format(
traceback.format_exc())
)
else:
self._reconnected = True
def disconnect_callback(self):
if self._closing:
return
self.connected = False
self.event.fire_event(
{'master': self.opts['master']},
'__master_disconnected'
)
@tornado.gen.coroutine
def connect(self):
try:
@ -279,7 +345,9 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran
self.opts,
self.opts['master_ip'],
int(self.auth.creds['publish_port']),
io_loop=self.io_loop)
io_loop=self.io_loop,
connect_callback=self.connect_callback,
disconnect_callback=self.disconnect_callback)
yield self.message_client.connect() # wait for the client to be connected
self.connected = True
# TODO: better exception handling...
@ -486,9 +554,12 @@ class SaltMessageClient(object):
'''
Low-level message sending client
'''
def __init__(self, opts, host, port, io_loop=None, resolver=None):
def __init__(self, opts, host, port, io_loop=None, resolver=None,
connect_callback=None, disconnect_callback=None):
self.host = host
self.port = port
self.connect_callback = connect_callback
self.disconnect_callback = disconnect_callback
self.io_loop = io_loop or tornado.ioloop.IOLoop.current()
@ -529,7 +600,7 @@ class SaltMessageClient(object):
def __del__(self):
self.close()
def connect(self, callback=None):
def connect(self):
'''
Ask for this client to reconnect to the origin
'''
@ -540,11 +611,12 @@ class SaltMessageClient(object):
self._connecting_future = future
self.io_loop.add_callback(self._connect)
if callback is not None:
def handle_future(future):
response = future.result()
self.io_loop.add_callback(callback, response)
future.add_done_callback(handle_future)
# Add the callback only when a new future is created
if self.connect_callback is not None:
def handle_future(future):
response = future.result()
self.io_loop.add_callback(self.connect_callback, response)
future.add_done_callback(handle_future)
return future
@ -596,6 +668,8 @@ class SaltMessageClient(object):
self.send_future_map = {}
if self._closing:
return
if self.disconnect_callback:
self.disconnect_callback()
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()
@ -607,9 +681,12 @@ class SaltMessageClient(object):
self.send_future_map = {}
if self._closing:
return
if self.disconnect_callback:
self.disconnect_callback()
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()
yield self._connecting_future
@tornado.gen.coroutine
def _stream_send(self):
@ -626,9 +703,12 @@ class SaltMessageClient(object):
self.remove_message_timeout(message_id)
if self._closing:
return
if self.disconnect_callback:
self.disconnect_callback()
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()
yield self._connecting_future
def _message_id(self):
wrap = False