mirror of
https://github.com/valitydev/salt.git
synced 2024-11-08 17:33:54 +00:00
Remove threading from MultiMinion
This commit is contained in:
parent
6410b404b3
commit
bdc5d6d7ed
@ -445,17 +445,9 @@ class MultiMinion(MinionBase):
|
||||
|
||||
self.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
|
||||
|
||||
def _minions(self):
|
||||
def _spawn_minions(self):
|
||||
'''
|
||||
Return a dict of minion generators bound to the tune_in method
|
||||
|
||||
dict of master -> minion_mapping, the mapping contains:
|
||||
|
||||
opts: options used to create the minion
|
||||
last: last auth attempt time
|
||||
auth_wait: time to wait for next auth attempt
|
||||
minion: minion object
|
||||
generator: generator function (non-blocking tune_in)
|
||||
Spawn all the coroutines which will sign in to masters
|
||||
'''
|
||||
if not isinstance(self.opts['master'], list):
|
||||
log.error(
|
||||
@ -467,55 +459,31 @@ class MultiMinion(MinionBase):
|
||||
s_opts['master'] = master
|
||||
s_opts['multimaster'] = True
|
||||
s_opts['auth_timeout'] = self.MINION_CONNECT_TIMEOUT
|
||||
ret[master] = {'opts': s_opts,
|
||||
'last': time.time(),
|
||||
'auth_wait': s_opts['acceptance_wait_time']}
|
||||
self.io_loop.spawn_callback(self._connect_minion, s_opts)
|
||||
|
||||
@tornado.gen.coroutine
|
||||
def _connect_minion(self, opts):
|
||||
last = 0 # never have we signed in
|
||||
auth_wait = opts['acceptance_wait_time']
|
||||
while True:
|
||||
try:
|
||||
minion = Minion(s_opts,
|
||||
self.MINION_CONNECT_TIMEOUT,
|
||||
False,
|
||||
io_loop=self.io_loop,
|
||||
loaded_base_name='salt.loader.{0}'.format(master))
|
||||
ret[master]['minion'] = minion
|
||||
loaded_base_name='salt.loader.{0}'.format(master),
|
||||
)
|
||||
yield minion.connect_master(opts, self.MINION_CONNECT_TIMEOUT, False) # TODO: we pass these twice?
|
||||
minion.tune_in(start=False)
|
||||
break
|
||||
except SaltClientError as exc:
|
||||
log.error('Error while bringing up minion for multi-master. Is master at {0} responding?'.format(master))
|
||||
return ret
|
||||
|
||||
# TODO: make an async minion interface, then we can do this in the IOLoop
|
||||
def _async_sign_in_masters(self):
|
||||
'''
|
||||
Background thread to try and sign in to masters we were unable to get
|
||||
at the beginning
|
||||
'''
|
||||
remaining = True
|
||||
while remaining:
|
||||
remainging = False
|
||||
for master, minion in self.minions.iteritems():
|
||||
if 'minion' not in minion:
|
||||
if time.time() - minion['auth_wait'] > minion['last']:
|
||||
minion['last'] = time.time()
|
||||
if minion['auth_wait'] < self.max_wait:
|
||||
minion['auth_wait'] += self.auth_wait
|
||||
try:
|
||||
t_minion = Minion(minion['opts'], self.MINION_CONNECT_TIMEOUT, False, io_loop=self.io_loop)
|
||||
minion['minion'] = t_minion
|
||||
# since minion sign_in registers the event handler
|
||||
# we have to do that from the main thread
|
||||
self.io_loop.spawn_callback(self._tune_in_master, master)
|
||||
except SaltClientError as exc:
|
||||
remaining = True
|
||||
log.error('Error while bringing up minion for multi-master. Is master at {0} responding?'.format(master))
|
||||
else:
|
||||
remaining = True
|
||||
time.sleep(0.1)
|
||||
|
||||
def _tune_in_master(self, master):
|
||||
self.minions[master]['minion'].tune_in(start=False)
|
||||
|
||||
def handle_event(self, package):
|
||||
for master_name, minion_dict in self.minions.iteritems():
|
||||
minion_dict['minion'].handle_event(package)
|
||||
log.error('Error while bringing up minion for multi-master. Is master at {0} responding?'.format(opts['master']))
|
||||
last = time.time()
|
||||
if auth_wait < self.max_auth_wait:
|
||||
auth_wait += self.auth_wait
|
||||
yield tornado.gen.sleep(auth_wait) # TODO: log?
|
||||
except Exception as e:
|
||||
log.critical('Unexpected error while connecting to {0}'.format(opts['master']), exc_info=True)
|
||||
|
||||
# Multi Master Tune In
|
||||
def tune_in(self):
|
||||
@ -527,14 +495,9 @@ class MultiMinion(MinionBase):
|
||||
reconnect (don't know of an API to get the state here in salt)
|
||||
'''
|
||||
# Prepare the minion generators
|
||||
self.minions = self._minions()
|
||||
self.auth_wait = self.opts['acceptance_wait_time']
|
||||
self.max_wait = self.opts['acceptance_wait_time_max']
|
||||
|
||||
# create thread to sign-in to masters that weren't here
|
||||
t = threading.Thread(target=self._async_sign_in_masters)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
self.minions = self._spawn_minions()
|
||||
|
||||
self.io_loop.start()
|
||||
|
||||
@ -1540,7 +1503,8 @@ class Minion(MinionBase):
|
||||
|
||||
log.debug('Minion {0!r} trying to tune in'.format(self.opts['id']))
|
||||
|
||||
self.block_until_connected()
|
||||
if start:
|
||||
self.block_until_connected()
|
||||
|
||||
salt.utils.event.AsyncEventPublisher(self.opts, self.handle_event, io_loop=self.io_loop)
|
||||
self._fire_master_minion_start()
|
||||
|
Loading…
Reference in New Issue
Block a user