Merge pull request #22928 from DSRCompany/flush_returners_queue

Issue #22814: Flush returners queue before salt master shutdown
This commit is contained in:
Mike Place 2015-04-22 09:32:19 -06:00
commit e3fb101f63

View File

@ -56,6 +56,7 @@ from __future__ import absolute_import
import os
import time
import errno
import signal
import hashlib
import logging
import datetime
@ -869,37 +870,56 @@ class EventReturn(multiprocessing.Process):
local_minion_opts = self.opts.copy()
local_minion_opts['file_client'] = 'local'
self.minion = salt.minion.MasterMinion(local_minion_opts)
self.event_queue = []
self.stop = False
def sig_stop(self, signum, frame):
self.stop = True # tell it to stop
def flush_events(self):
event_return = '{0}.event_return'.format(
self.opts['event_return']
)
if event_return in self.minion.returners:
try:
self.minion.returners[event_return](self.event_queue)
except Exception as exc:
log.error('Could not store events {0}. '
'Returner raised exception: {1}'.format(
self.event_queue, exc))
del self.event_queue[:]
else:
log.error(
'Could not store return for event(s) {0}. Returner '
'\'{1}\' not found.'
.format(self.event_queue, self.opts['event_return'])
)
def run(self):
'''
Spin up the multiprocess event returner
'''
# Properly exit if a SIGTERM is signalled
signal.signal(signal.SIGTERM, self.sig_stop)
salt.utils.appendproctitle(self.__class__.__name__)
self.event = get_event('master', opts=self.opts)
events = self.event.iter_events(full=True)
self.event.fire_event({}, 'salt/event_listen/start')
event_queue = []
for event in events:
if self._filter(event):
event_queue.append(event)
if len(event_queue) >= self.event_return_queue:
event_return = '{0}.event_return'.format(
self.opts['event_return']
)
if event_return in self.minion.returners:
try:
self.minion.returners[event_return](event_queue)
except Exception as exc:
log.error('Could not store event {0}. '
'Returner raised exception: {1}'.format(
event, exc))
event_queue = []
else:
log.error(
'Could not store return for event(s) {0}. Returner '
'\'{1}\' not found.'
.format(event_queue, self.opts['event_return'])
)
try:
for event in events:
if self._filter(event):
self.event_queue.append(event)
if len(self.event_queue) >= self.event_return_queue:
self.flush_events()
if self.stop:
break
except zmq.error.ZMQError as exc:
if exc.errno != errno.EINTR: # Outside interrupt is a normal shutdown case
raise
finally: # flush all we have at this moment
if self.event_queue:
self.flush_events()
def _filter(self, event):
'''