Event return filtering

This commit is contained in:
Mike Place 2014-12-01 13:05:21 -07:00
parent 8877ba7d78
commit b0a3d2b87b
3 changed files with 40 additions and 2 deletions

View File

@ -132,6 +132,16 @@
# By default, events are not queued.
#event_return_queue: 0
# Only events returns matching tags in a whitelist
# event_return_whitelist:
# - salt/master/a_tag
# - salt/master/another_tag
# Store all event returns _except_ the tags in a blacklist
# event_return_blacklist:
# - salt/master/not_this_tag
# - salt/master/or_this_one
# Passing very large events can cause the minion to consume large amounts of
# memory. This value tunes the maximum size of a message allowed onto the
# master event bus. The value is expressed in bytes.

View File

@ -139,6 +139,8 @@ VALID_OPTS = {
'recon_randomize': float,
'event_return': str,
'event_return_queue': int,
'event_return_whitelist': list,
'event_return_blacklist': list,
'win_repo_cachefile': str,
'pidfile': str,
'range_server': str,
@ -547,6 +549,8 @@ DEFAULT_MASTER_OPTS = {
'reactor_worker_hwm': 10000,
'event_return': '',
'event_return_queue': 0,
'event_return_whitelist': [],
'event_return_blacklist': [],
'serial': 'msgpack',
'state_verbose': True,
'state_output': 'full',

View File

@ -600,6 +600,11 @@ class EventReturn(multiprocessing.Process):
and forwards events to the specified returner.
'''
def __init__(self, opts):
'''
Initialize the EventReturn system
Return an EventReturn instance
'''
multiprocessing.Process.__init__(self)
self.opts = opts
@ -609,6 +614,9 @@ class EventReturn(multiprocessing.Process):
self.minion = salt.minion.MasterMinion(local_minion_opts)
def run(self):
'''
Spin up the multiprocess event returner
'''
salt.utils.appendproctitle(self.__class__.__name__)
self.event = get_event('master', opts=self.opts)
events = self.event.iter_events(full=True)
@ -616,15 +624,31 @@ class EventReturn(multiprocessing.Process):
event_queue = []
try:
for event in events:
event_queue.append(event)
if self._filter(event):
event_queue.append(event)
if len(event_queue) >= self.event_return_queue:
log.trace('Storing events')
self.minion.returners['{0}.event_return'.format(self.opts['event_return'])](event_queue)
event_queue = []
except KeyError:
log.error('Could not store return for events {0}. Returner {1} '
'not found.'.format(events, self.opts.get('event_return', None)))
def _filter(self, event):
'''
Take an event and run it through configured filters.
Returns True if event should be stored, else False
'''
tag = event['tag']
if tag in self.opts['event_return_whitelist']:
if tag not in self.opts['event_return_blacklist']:
return True
else:
return False # Event was whitelisted and blacklisted
elif tag in self.opts['event_return_blacklist']:
return False
return True
class Reactor(multiprocessing.Process, salt.state.Compiler):
'''