Fix for local client timeout bug

GwiYeong 2018-02-06 15:50:48 +09:00
parent 43a45b42c3
commit 45d663f435
2 changed files with 106 additions and 56 deletions
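Both files below race several futures against each other through a helper called Any. For orientation, here is a minimal sketch of what such a helper looks like, modeled on saltnado's internal Any class (an assumption for illustration; salt's exact implementation may differ in detail):

from tornado.concurrent import Future

class Any(Future):
    '''
    A future that resolves as soon as the first of the wrapped futures does.
    Its result is the winning future itself, so callers can test identity
    (f is is_finished) and then read f.result().
    '''
    def __init__(self, futures):
        super(Any, self).__init__()
        for future in futures:
            future.add_done_callback(self.done_callback)

    def done_callback(self, future):
        # only the first completion counts; later ones are ignored
        if not self.done():
            self.set_result(future)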


@@ -317,9 +317,9 @@ class EventListener(object):
         '''
         if request not in self.request_map:
             return
-        for tag, future in self.request_map[request]:
+        for tag, matcher, future in self.request_map[request]:
             # timeout the future
-            self._timeout_future(tag, future)
+            self._timeout_future(tag, matcher, future)
             # remove the timeout
             if future in self.timeout_map:
                 tornado.ioloop.IOLoop.current().remove_timeout(self.timeout_map[future])
@@ -327,9 +327,22 @@ class EventListener(object):
             del self.request_map[request]
 
+    @staticmethod
+    def prefix_matcher(mtag, tag):
+        if mtag is None or tag is None:
+            raise TypeError('mtag or tag can not be None')
+        return mtag.startswith(tag)
+
+    @staticmethod
+    def exact_matcher(mtag, tag):
+        if mtag is None or tag is None:
+            raise TypeError('mtag or tag can not be None')
+        return mtag == tag
+
     def get_event(self,
                   request,
                   tag='',
+                  matcher=prefix_matcher.__func__,
                   callback=None,
                   timeout=None
                   ):
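The matchers are plain static predicates over the incoming event tag (mtag) and the subscribed tag: prefix_matcher preserves the old startswith behavior, while exact_matcher lets a caller wait for one specific tag. The .__func__ unwrap in the get_event default is needed because, at class-body evaluation time, prefix_matcher is still a staticmethod descriptor rather than a directly callable function. A quick standalone check of the semantics (tag values are illustrative):

def prefix_matcher(mtag, tag):
    if mtag is None or tag is None:
        raise TypeError('mtag or tag can not be None')
    return mtag.startswith(tag)

def exact_matcher(mtag, tag):
    if mtag is None or tag is None:
        raise TypeError('mtag or tag can not be None')
    return mtag == tag

# the prefix matcher fires for every minion's return under a job...
assert prefix_matcher('salt/job/123/ret/minion1', 'salt/job/123/ret')
# ...while the exact matcher fires only for the one tag it was given
assert not exact_matcher('salt/job/123/ret/minion1', 'salt/job/123/ret')
assert exact_matcher('salt/job/123/ret/minion1', 'salt/job/123/ret/minion1')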
@@ -349,43 +362,52 @@ class EventListener(object):
                 tornado.ioloop.IOLoop.current().add_callback(callback, future)
             future.add_done_callback(handle_future)
         # add this tag and future to the callbacks
-        self.tag_map[tag].append(future)
-        self.request_map[request].append((tag, future))
+        self.tag_map[(tag, matcher)].append(future)
+        self.request_map[request].append((tag, matcher, future))
 
         if timeout:
-            timeout_future = tornado.ioloop.IOLoop.current().call_later(timeout, self._timeout_future, tag, future)
+            timeout_future = tornado.ioloop.IOLoop.current().call_later(timeout, self._timeout_future, tag, matcher, future)
             self.timeout_map[future] = timeout_future
 
         return future
 
-    def _timeout_future(self, tag, future):
+    def _timeout_future(self, tag, matcher, future):
         '''
         Timeout a specific future
         '''
-        if tag not in self.tag_map:
+        if (tag, matcher) not in self.tag_map:
             return
         if not future.done():
             future.set_exception(TimeoutException())
-        self.tag_map[tag].remove(future)
-        if len(self.tag_map[tag]) == 0:
-            del self.tag_map[tag]
+        self.tag_map[(tag, matcher)].remove(future)
+        if len(self.tag_map[(tag, matcher)]) == 0:
+            del self.tag_map[(tag, matcher)]
 
     def _handle_event_socket_recv(self, raw):
         '''
         Callback for events on the event sub socket
         '''
         mtag, data = self.event.unpack(raw, self.event.serial)
+
         # see if we have any futures that need this info:
-        for tag_prefix, futures in six.iteritems(self.tag_map):
-            if mtag.startswith(tag_prefix):
-                for future in futures:
-                    if future.done():
-                        continue
-                    future.set_result({'data': data, 'tag': mtag})
-                    self.tag_map[tag_prefix].remove(future)
-                    if future in self.timeout_map:
-                        tornado.ioloop.IOLoop.current().remove_timeout(self.timeout_map[future])
-                        del self.timeout_map[future]
+        for (tag, matcher), futures in six.iteritems(self.tag_map):
+            try:
+                is_matched = matcher(mtag, tag)
+            except Exception as e:
+                logger.error('Failed to run a matcher.', exc_info=True)
+                is_matched = False
+
+            if not is_matched:
+                continue
+
+            for future in futures:
+                if future.done():
+                    continue
+                future.set_result({'data': data, 'tag': mtag})
+                self.tag_map[(tag, matcher)].remove(future)
+                if future in self.timeout_map:
+                    tornado.ioloop.IOLoop.current().remove_timeout(self.timeout_map[future])
+                    del self.timeout_map[future]
 
 
 class BaseSaltAPIHandler(tornado.web.RequestHandler, SaltClientsMixIn):  # pylint: disable=W0223
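With tag_map keyed by (tag, matcher) pairs, dispatch becomes one predicate call per subscription instead of a hard-coded startswith, and a misbehaving matcher is logged and skipped rather than crashing the receive callback. A stripped-down sketch of the same dispatch pattern (names are illustrative, not salt's API):

import logging

logger = logging.getLogger(__name__)

tag_map = {}  # (tag, matcher) -> list of pending futures

def dispatch(mtag, data):
    # run every subscription's matcher against the incoming tag
    for (tag, matcher), futures in list(tag_map.items()):
        try:
            is_matched = matcher(mtag, tag)
        except Exception:
            # a broken matcher must not take down the event loop
            logger.error('Failed to run a matcher.', exc_info=True)
            is_matched = False
        if not is_matched:
            continue
        for future in list(futures):
            if future.done():
                continue
            future.set_result({'data': data, 'tag': mtag})
            futures.remove(future)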
@@ -925,64 +947,83 @@ class SaltAPIHandler(BaseSaltAPIHandler, SaltClientsMixIn):  # pylint: disable=W
         if self.application.opts['order_masters']:
             syndic_min_wait = tornado.gen.sleep(self.application.opts['syndic_wait'])
-        job_not_running = self.job_not_running(pub_data['jid'],
-                                               chunk['tgt'],
-                                               f_call['kwargs']['tgt_type'],
-                                               minions_remaining=minions_remaining
-                                               )
+
+        # To ensure job_not_running and all_returns are terminated by each other, communicate using a future
+        is_finished = Future()
+        job_not_running_future = self.job_not_running(pub_data['jid'],
+                                                      chunk['tgt'],
+                                                      f_call['kwargs']['tgt_type'],
+                                                      is_finished,
+                                                      minions_remaining=list(minions_remaining),
+                                                      )
 
         # if we have a min_wait, do that
         if syndic_min_wait is not None:
             yield syndic_min_wait
-        # we are completed when either all minions return or the job isn't running anywhere
-        chunk_ret = yield self.all_returns(pub_data['jid'],
-                                           finish_futures=[job_not_running],
-                                           minions_remaining=minions_remaining,
-                                           )
-        raise tornado.gen.Return(chunk_ret)
+
+        all_return_future = self.all_returns(pub_data['jid'],
+                                             is_finished,
+                                             minions_remaining=list(minions_remaining),
+                                             )
+        yield job_not_running_future
+        raise tornado.gen.Return((yield all_return_future))
 
     @tornado.gen.coroutine
     def all_returns(self,
                     jid,
-                    finish_futures=None,
+                    is_finished,
                     minions_remaining=None,
                     ):
         '''
         Return a future which will complete once all returns are completed
-        (according to minions_remaining), or one of the passed in "finish_futures" completes
+        (according to minions_remaining), or the passed in "is_finished" future completes
         '''
-        if finish_futures is None:
-            finish_futures = []
         if minions_remaining is None:
             minions_remaining = []
 
-        ret_tag = tagify([jid, 'ret'], 'job')
         chunk_ret = {}
+
+        minion_events = {}
+        for minion in minions_remaining:
+            tag = tagify([jid, 'ret', minion], 'job')
+            minion_event = self.application.event_listener.get_event(self,
+                                                                     tag=tag,
+                                                                     matcher=EventListener.exact_matcher,
+                                                                     timeout=self.application.opts['timeout'])
+            minion_events[minion_event] = minion
+
         while True:
-            ret_event = self.application.event_listener.get_event(self,
-                                                                  tag=ret_tag,
-                                                                  )
-            f = yield Any([ret_event] + finish_futures)
-            if f in finish_futures:
-                raise tornado.gen.Return(chunk_ret)
-            event = f.result()
-            chunk_ret[event['data']['id']] = event['data']['return']
-            # its possible to get a return that wasn't in the minion_remaining list
+            f = yield Any(minion_events.keys() + [is_finished])
             try:
-                minions_remaining.remove(event['data']['id'])
+                # When finished entire routine, cleanup other futures and return result
+                if f is is_finished:
+                    for event in minion_events:
+                        if not event.done():
+                            event.set_result(None)
+                    raise tornado.gen.Return(chunk_ret)
+
+                f_result = f.result()
+                chunk_ret[f_result['data']['id']] = f_result['data']['return']
+            except TimeoutException:
+                pass
+
+            # clear finished event future
+            try:
+                minions_remaining.remove(minion_events[f])
+                del minion_events[f]
             except ValueError:
                 pass
+
             if len(minions_remaining) == 0:
+                if not is_finished.done():
+                    is_finished.set_result(True)
                 raise tornado.gen.Return(chunk_ret)
 
     @tornado.gen.coroutine
     def job_not_running(self,
                         jid,
                         tgt,
                         tgt_type,
+                        is_finished,
                         minions_remaining=None,
                         ):
         '''
         Return a future which will complete once jid (passed in) is no longer
         running on tgt
@@ -999,12 +1040,21 @@ class SaltAPIHandler(BaseSaltAPIHandler, SaltClientsMixIn):  # pylint: disable=W
         minion_running = False
         while True:
             try:
-                event = yield self.application.event_listener.get_event(self,
-                                                                        tag=ping_tag,
-                                                                        timeout=self.application.opts['gather_job_timeout'],
-                                                                        )
+                event = self.application.event_listener.get_event(self,
+                                                                  tag=ping_tag,
+                                                                  timeout=self.application.opts['gather_job_timeout'],
+                                                                  )
+                f = yield Any([event, is_finished])
+                # When finished entire routine, cleanup other futures and return result
+                if f is is_finished:
+                    if not event.done():
+                        event.set_result(None)
+                    raise tornado.gen.Return(True)
+                event = f.result()
             except TimeoutException:
                 if not minion_running:
+                    if not is_finished.done():
+                        is_finished.set_result(True)
                     raise tornado.gen.Return(True)
                 else:
                     ping_pub_data = yield self.saltclients['local'](tgt,
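The core of the timeout fix is the shared is_finished future: whichever of job_not_running and all_returns concludes first resolves it, and the other side observes that through Any and unwinds immediately instead of sitting in its own timeout, which is what previously left the local client hanging. A minimal sketch of the handshake using tornado's yield-based coroutines (coroutine names and sleep durations are illustrative stand-ins, not salt's code):

import tornado.gen
import tornado.ioloop
from tornado.concurrent import Future

@tornado.gen.coroutine
def job_poller(is_finished):
    # stands in for job_not_running: decides the job is gone, then
    # resolves the shared future so the collector stops waiting too
    yield tornado.gen.sleep(0.1)
    if not is_finished.done():
        is_finished.set_result(True)

@tornado.gen.coroutine
def return_collector(is_finished):
    # stands in for all_returns: would normally race minion events
    # against is_finished; here only the shared future can fire
    yield is_finished
    raise tornado.gen.Return({'minion1': None})

@tornado.gen.coroutine
def main():
    is_finished = Future()
    poller = job_poller(is_finished)          # runs concurrently
    results = yield return_collector(is_finished)
    yield poller                              # already done by now
    raise tornado.gen.Return(results)

print(tornado.ioloop.IOLoop.current().run_sync(main))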


@@ -90,7 +90,7 @@ class TestEventListener(AsyncTestCase):
                                           {'sock_dir': SOCK_DIR,
                                            'transport': 'zeromq'})
         self._finished = False  # fit to event_listener's behavior
-        event_future = event_listener.get_event(self, 'evt1', self.stop)  # get an event future
+        event_future = event_listener.get_event(self, 'evt1', callback=self.stop)  # get an event future
         me.fire_event({'data': 'foo2'}, 'evt2')  # fire an event we don't want
         me.fire_event({'data': 'foo1'}, 'evt1')  # fire an event we do want
         self.wait()  # wait for the future
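Since matcher now occupies the third positional slot of get_event, callers that used to pass a callback positionally (as this test did) must switch to the keyword form, which is exactly the one-line test change above. For completeness, a hedged sketch of a matcher-aware call in the same style (the tag value is illustrative, and it assumes EventListener is in scope as in the module under test):

# exact match: only salt/job/123/ret/minion1 can resolve this future;
# a sibling tag such as salt/job/123/ret/minion10 no longer matches
event_future = event_listener.get_event(self,
                                        tag='salt/job/123/ret/minion1',
                                        matcher=EventListener.exact_matcher,
                                        callback=self.stop)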