diff --git a/salt/client/__init__.py b/salt/client/__init__.py index 23ad4c64d2..2ef6eadce8 100644 --- a/salt/client/__init__.py +++ b/salt/client/__init__.py @@ -139,7 +139,7 @@ class LocalClient(object): self.opts['sock_dir'], self.opts['transport'], opts=self.opts, - listen=not self.opts.get('__worker', False)) + listen=False) self.utils = salt.loader.utils(self.opts) self.functions = salt.loader.minion_mods(self.opts, utils=self.utils) self.returners = salt.loader.returners(self.opts, self.functions) @@ -978,7 +978,7 @@ class LocalClient(object): self.opts['sock_dir'], self.opts['transport'], opts=self.opts, - listen=not self.opts.get('__worker', False)) + listen=False) # start listening for new events, before firing off the pings event.connect_pub() # since this is a new ping, no one has responded yet diff --git a/salt/client/api.py b/salt/client/api.py index 137141dd8f..2dde18d5dd 100644 --- a/salt/client/api.py +++ b/salt/client/api.py @@ -46,7 +46,7 @@ class APIClient(object): Provide a uniform method of accessing the various client interfaces in Salt in the form of low-data data structures. For example: ''' - def __init__(self, opts=None): + def __init__(self, opts=None, listen=True): if not opts: opts = salt.config.client_config( os.environ.get( @@ -63,7 +63,8 @@ class APIClient(object): 'master', self.opts['sock_dir'], self.opts['transport'], - opts=self.opts) + opts=self.opts, + listen=listen) def run(self, cmd): ''' diff --git a/salt/client/mixins.py b/salt/client/mixins.py index b2706eeb2f..558e051c4e 100644 --- a/salt/client/mixins.py +++ b/salt/client/mixins.py @@ -146,7 +146,7 @@ class SyncClientMixin(object): 'eauth': 'pam', }) ''' - event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir']) + event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'], listen=True) job = self.master_call(**low) ret_tag = salt.utils.event.tagify('ret', base=job['tag']) diff --git a/salt/engines/logstash.py b/salt/engines/logstash.py index 3d198710ad..a5b2dd8fe0 100644 --- a/salt/engines/logstash.py +++ b/salt/engines/logstash.py @@ -52,13 +52,15 @@ def start(host, port=5959, tag='salt/engine/logstash'): if __opts__.get('id').endswith('_master'): event_bus = salt.utils.event.get_master_event( __opts__, - __opts__['sock_dir']) + __opts__['sock_dir'], + listen=True) else: event_bus = salt.utils.event.get_event( 'minion', transport=__opts__['transport'], opts=__opts__, - sock_dir=__opts__['sock_dir']) + sock_dir=__opts__['sock_dir'], + listen=True) log.debug('Logstash engine started') while True: diff --git a/salt/engines/sqs_events.py b/salt/engines/sqs_events.py index 0f2fd99097..c0d254d7d6 100644 --- a/salt/engines/sqs_events.py +++ b/salt/engines/sqs_events.py @@ -113,7 +113,8 @@ def start(queue, profile=None, tag='salt/engine/sqs'): if __opts__.get('__role') == 'master': fire_master = salt.utils.event.get_master_event( __opts__, - __opts__['sock_dir']).fire_event + __opts__['sock_dir'], + listen=False).fire_event else: fire_master = None diff --git a/salt/engines/test.py b/salt/engines/test.py index 53a81aa806..161a319357 100644 --- a/salt/engines/test.py +++ b/salt/engines/test.py @@ -21,13 +21,15 @@ def start(): if __opts__['__role'] == 'master': event_bus = salt.utils.event.get_master_event( __opts__, - __opts__['sock_dir']) + __opts__['sock_dir'], + listen=True) else: event_bus = salt.utils.event.get_event( 'minion', transport=__opts__['transport'], opts=__opts__, - sock_dir=__opts__['sock_dir']) + sock_dir=__opts__['sock_dir'], + listen=True) log.debug('test engine started') while True: diff --git a/salt/loader.py b/salt/loader.py index 583a6d3782..f821721bb4 100644 --- a/salt/loader.py +++ b/salt/loader.py @@ -204,7 +204,7 @@ def minion_mods( ret.pack['__salt__'] = ret if notify: - evt = salt.utils.event.get_event('minion', opts=opts) + evt = salt.utils.event.get_event('minion', opts=opts, listen=False) evt.fire_event({'complete': True}, tag='/salt/minion/minion_mod_complete') return ret diff --git a/salt/master.py b/salt/master.py index 0af8dd1058..0b11efa47d 100644 --- a/salt/master.py +++ b/salt/master.py @@ -165,7 +165,7 @@ class Maintenance(multiprocessing.Process): returners=self.returners) self.ckminions = salt.utils.minions.CkMinions(self.opts) # Make Event bus for firing - self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir']) + self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'], listen=False) # Init any values needed by the git ext pillar self.git_pillar = salt.daemons.masterapi.init_git_pillar(self.opts) # Set up search object @@ -782,7 +782,7 @@ class AESFuncs(object): :returns: Instance for handling AES operations ''' self.opts = opts - self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir']) + self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'], listen=False) self.serial = salt.payload.Serial(opts) self.ckminions = salt.utils.minions.CkMinions(opts) # Make a client @@ -1416,7 +1416,7 @@ class ClearFuncs(object): self.opts = opts self.key = key # Create the event manager - self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir']) + self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'], listen=False) # Make a client self.local = salt.client.get_local_client(self.opts['conf_file']) # Make an minion checker object diff --git a/salt/modules/event.py b/salt/modules/event.py index b003c236bc..ff6c6b7bf8 100644 --- a/salt/modules/event.py +++ b/salt/modules/event.py @@ -68,7 +68,7 @@ def fire_master(data, tag, preload=None): # Usually, we can send the event via the minion, which is faster # because it is already authenticated try: - return salt.utils.event.MinionEvent(__opts__).fire_event( + return salt.utils.event.MinionEvent(__opts__, listen=False).fire_event( {'data': data, 'tag': tag, 'events': None, 'pretag': None}, 'fire_master') except Exception: return False diff --git a/salt/modules/mine.py b/salt/modules/mine.py index 96cc7d0d14..0ac2f4dcb0 100644 --- a/salt/modules/mine.py +++ b/salt/modules/mine.py @@ -58,7 +58,7 @@ def _mine_function_available(func): def _mine_send(load, opts): - eventer = salt.utils.event.MinionEvent(opts) + eventer = salt.utils.event.MinionEvent(opts, listen=False) event_ret = eventer.fire_event(load, '_minion_mine') # We need to pause here to allow for the decoupled nature of # events time to allow the mine to propagate diff --git a/salt/modules/saltutil.py b/salt/modules/saltutil.py index c38f0568fb..dbce7acbc9 100644 --- a/salt/modules/saltutil.py +++ b/salt/modules/saltutil.py @@ -547,7 +547,7 @@ def refresh_modules(async=True): # If we're going to block, first setup a listener ret = __salt__['event.fire']({}, 'module_refresh') else: - eventer = salt.utils.event.get_event('minion', opts=__opts__) + eventer = salt.utils.event.get_event('minion', opts=__opts__, listen=True) ret = __salt__['event.fire']({'notify': True}, 'module_refresh') # Wait for the finish event to fire log.trace('refresh_modules waiting for module refresh to complete') diff --git a/salt/netapi/rest_cherrypy/app.py b/salt/netapi/rest_cherrypy/app.py index 00003c3f84..5952a6af57 100644 --- a/salt/netapi/rest_cherrypy/app.py +++ b/salt/netapi/rest_cherrypy/app.py @@ -1781,7 +1781,8 @@ class Events(object): 'master', sock_dir=self.opts['sock_dir'], transport=self.opts['transport'], - opts=self.opts) + opts=self.opts, + listen=True) stream = event.iter_events(full=True) yield u'retry: {0}\n'.format(400) @@ -1954,7 +1955,8 @@ class WebsocketEndpoint(object): 'master', sock_dir=self.opts['sock_dir'], transport=self.opts['transport'], - opts=self.opts) + opts=self.opts, + listen=True) stream = event.iter_events(full=True) SaltInfo = event_processor.SaltInfo(handler) while True: diff --git a/salt/netapi/rest_tornado/saltnado.py b/salt/netapi/rest_tornado/saltnado.py index d1c3ff6229..53ea88ab56 100644 --- a/salt/netapi/rest_tornado/saltnado.py +++ b/salt/netapi/rest_tornado/saltnado.py @@ -260,6 +260,7 @@ class EventListener(object): opts['sock_dir'], opts['transport'], opts=opts, + listen=True, ) self.event.subscribe() # start listening for events immediately @@ -1608,7 +1609,8 @@ class WebhookSaltAPIHandler(SaltAPIHandler): # pylint: disable=W0223 'master', self.application.opts['sock_dir'], self.application.opts['transport'], - opts=self.application.opts) + opts=self.application.opts, + listen=False) ret = self.event.fire_event({ 'post': self.raw_data, diff --git a/salt/runners/reactor.py b/salt/runners/reactor.py index 9ceb074f71..ba21c39579 100644 --- a/salt/runners/reactor.py +++ b/salt/runners/reactor.py @@ -34,7 +34,8 @@ def list_(saltenv='base', test=None): 'master', __opts__['sock_dir'], __opts__['transport'], - opts=__opts__) + opts=__opts__, + listen=True) __jid_event__.fire_event({}, 'salt/reactors/manage/list') @@ -60,7 +61,8 @@ def add(event, reactors, saltenv='base', test=None): 'master', __opts__['sock_dir'], __opts__['transport'], - opts=__opts__) + opts=__opts__, + listen=True) __jid_event__.fire_event({'event': event, 'reactors': reactors}, @@ -84,7 +86,8 @@ def delete(event, saltenv='base', test=None): 'master', __opts__['sock_dir'], __opts__['transport'], - opts=__opts__) + opts=__opts__, + listen=True) __jid_event__.fire_event({'event': event}, 'salt/reactors/manage/delete') diff --git a/salt/runners/state.py b/salt/runners/state.py index 754242d926..94824466a4 100644 --- a/salt/runners/state.py +++ b/salt/runners/state.py @@ -264,7 +264,8 @@ def event(tagmatch='*', count=-1, quiet=False, sock_dir=None, pretty=False): 'master', sock_dir or __opts__['sock_dir'], __opts__['transport'], - opts=__opts__) + opts=__opts__, + listen=True) while True: ret = sevent.get_event(full=True) diff --git a/salt/states/saltmod.py b/salt/states/saltmod.py index 65aef8dc7e..f4556c8c52 100644 --- a/salt/states/saltmod.py +++ b/salt/states/saltmod.py @@ -492,7 +492,8 @@ def wait_for_event( 'master', __opts__['sock_dir'], __opts__['transport'], - opts=__opts__) + opts=__opts__, + listen=True) del_counter = 0 starttime = time.time() diff --git a/salt/transport/mixins/auth.py b/salt/transport/mixins/auth.py index 1d7d2a14f8..4b028c58c5 100644 --- a/salt/transport/mixins/auth.py +++ b/salt/transport/mixins/auth.py @@ -71,7 +71,7 @@ class AESReqServerMixin(object): # other things needed for _auth # Create the event manager - self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir']) + self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'], listen=False) self.auto_key = salt.daemons.masterapi.AutoKey(self.opts) # only create a con_cache-client if the con_cache is active diff --git a/salt/utils/cloud.py b/salt/utils/cloud.py index ecb0e084bc..b281afd3f6 100644 --- a/salt/utils/cloud.py +++ b/salt/utils/cloud.py @@ -2044,7 +2044,7 @@ def check_auth(name, sock_dir=None, queue=None, timeout=300): This function is called from a multiprocess instance, to wait for a minion to become available to receive salt commands ''' - event = salt.utils.event.SaltEvent('master', sock_dir) + event = salt.utils.event.SaltEvent('master', sock_dir, listen=True) starttime = time.mktime(time.localtime()) newtimeout = timeout log.debug( diff --git a/salt/utils/error.py b/salt/utils/error.py index 87812830e1..23f4b47fb6 100644 --- a/salt/utils/error.py +++ b/salt/utils/error.py @@ -50,5 +50,5 @@ def fire_exception(exc, opts, job=None, node='minion'): ''' if job is None: job = {} - event = salt.utils.event.SaltEvent(node, opts=opts) + event = salt.utils.event.SaltEvent(node, opts=opts, listen=False) event.fire_event(pack_exception(exc), '_salt_error') diff --git a/salt/utils/event.py b/salt/utils/event.py index 8f5b914ebf..53f90f00e4 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -122,8 +122,8 @@ def get_event(node, sock_dir=None, transport='zeromq', opts=None, listen=True): # TODO: AIO core is separate from transport if transport in ('zeromq', 'tcp'): if node == 'master': - return MasterEvent(sock_dir, opts) - return SaltEvent(node, sock_dir, opts) + return MasterEvent(sock_dir, opts, listen=listen) + return SaltEvent(node, sock_dir, opts, listen=listen) elif transport == 'raet': import salt.utils.raetevent return salt.utils.raetevent.RAETEvent(node, @@ -138,7 +138,7 @@ def get_master_event(opts, sock_dir, listen=True): ''' # TODO: AIO core is separate from transport if opts['transport'] in ('zeromq', 'tcp'): - return MasterEvent(sock_dir, opts) + return MasterEvent(sock_dir, opts, listen=listen) elif opts['transport'] == 'raet': import salt.utils.raetevent return salt.utils.raetevent.MasterEvent( @@ -172,7 +172,7 @@ class SaltEvent(object): RAET compatible The base class used to manage salt events ''' - def __init__(self, node, sock_dir=None, opts=None): + def __init__(self, node, sock_dir=None, opts=None, listen=True): self.serial = salt.payload.Serial({'serial': 'msgpack'}) self.context = zmq.Context() self.poller = zmq.Poller() @@ -186,7 +186,8 @@ class SaltEvent(object): if salt.utils.is_windows() and not hasattr(opts, 'ipc_mode'): opts['ipc_mode'] = 'tcp' self.puburi, self.pulluri = self.__load_uri(sock_dir, node) - self.subscribe() + if listen: + self.subscribe() self.pending_events = [] self.__load_cache_regex() @@ -606,8 +607,8 @@ class MasterEvent(SaltEvent): RAET compatible Create a master event management object ''' - def __init__(self, sock_dir, opts=None): - super(MasterEvent, self).__init__('master', sock_dir, opts) + def __init__(self, sock_dir, opts=None, listen=True): + super(MasterEvent, self).__init__('master', sock_dir, opts, listen=listen) class LocalClientEvent(MasterEvent): @@ -640,9 +641,9 @@ class MinionEvent(SaltEvent): RAET compatible Create a master event management object ''' - def __init__(self, opts): + def __init__(self, opts, listen=True): super(MinionEvent, self).__init__( - 'minion', sock_dir=opts.get('sock_dir', None), opts=opts) + 'minion', sock_dir=opts.get('sock_dir', None), opts=opts, listen=listen) class AsyncEventPublisher(object): @@ -904,7 +905,7 @@ class EventReturn(multiprocessing.Process): signal.signal(signal.SIGTERM, self.sig_stop) salt.utils.appendproctitle(self.__class__.__name__) - self.event = get_event('master', opts=self.opts) + self.event = get_event('master', opts=self.opts, listen=True) events = self.event.iter_events(full=True) self.event.fire_event({}, 'salt/event_listen/start') try: @@ -947,7 +948,6 @@ class StateFire(object): ''' def __init__(self, opts, auth=None): self.opts = opts - self.event = SaltEvent(opts, 'minion') if not auth: self.auth = salt.crypt.SAuth(self.opts) else: diff --git a/salt/utils/raetevent.py b/salt/utils/raetevent.py index d13683dfbb..3dcea6895a 100644 --- a/salt/utils/raetevent.py +++ b/salt/utils/raetevent.py @@ -40,16 +40,15 @@ class RAETEvent(object): ''' self.node = node # application kind see kinds.APPL_KIND_NAMES self.sock_dir = sock_dir - self.listen = listen if opts is None: opts = {} self.opts = opts self.stack = None self.ryn = 'manor' # remote yard name self.connected = False - self.__prep_stack() + self.__prep_stack(listen) - def __prep_stack(self): + def __prep_stack(self, listen): ''' Prepare the stack objects ''' @@ -59,7 +58,8 @@ class RAETEvent(object): else: self.stack = transport.jobber_stack = self._setup_stack(ryn=self.ryn) log.debug("RAETEvent Using Jobber Stack at = {0}\n".format(self.stack.ha)) - self.connect_pub() + if listen: + self.subscribe() def _setup_stack(self, ryn='manor'): kind = self.opts.get('__role', '') # opts optional for master @@ -112,7 +112,8 @@ class RAETEvent(object): ''' Included for compat with zeromq events, not required ''' - return + if not self.connected: + self.connect_pub() def unsubscribe(self, tag=None): ''' @@ -124,16 +125,15 @@ class RAETEvent(object): ''' Establish the publish connection ''' - if not self.connected and self.listen: - try: - route = {'dst': (None, self.ryn, 'event_req'), - 'src': (None, self.stack.local.name, None)} - msg = {'route': route} - self.stack.transmit(msg, self.stack.nameRemotes[self.ryn].uid) - self.stack.serviceAll() - self.connected = True - except Exception: - pass + try: + route = {'dst': (None, self.ryn, 'event_req'), + 'src': (None, self.stack.local.name, None)} + msg = {'route': route} + self.stack.transmit(msg, self.stack.nameRemotes[self.ryn].uid) + self.stack.serviceAll() + self.connected = True + except Exception: + pass def connect_pull(self, timeout=1000): ''' @@ -156,7 +156,8 @@ class RAETEvent(object): IF wait is 0 then block forever. ''' - self.connect_pub() + if not self.connected: + self.connect_pub() start = time.time() while True: self.stack.serviceAll() @@ -180,7 +181,8 @@ class RAETEvent(object): ''' Get the raw event msg without blocking or any other niceties ''' - self.connect_pub() + if not self.connected: + self.connect_pub() self.stack.serviceAll() if self.stack.rxMsgs: msg, sender = self.stack.rxMsgs.popleft() @@ -204,7 +206,6 @@ class RAETEvent(object): Send a single event into the publisher with paylod dict "data" and event identifier "tag" ''' - self.connect_pub() # Timeout is retained for compat with zeromq events if not str(tag): # no empty tags allowed raise ValueError('Empty tag.') @@ -272,18 +273,17 @@ class PresenceEvent(MasterEvent): ''' Establish the publish connection ''' - if not self.connected and self.listen: - try: - route = {'dst': (None, self.ryn, 'presence_req'), - 'src': (None, self.stack.local.name, None)} - msg = {'route': route} - if self.state: - msg['data'] = {'state': self.state} - self.stack.transmit(msg, self.stack.nameRemotes[self.ryn].uid) - self.stack.serviceAll() - self.connected = True - except Exception: - pass + try: + route = {'dst': (None, self.ryn, 'presence_req'), + 'src': (None, self.stack.local.name, None)} + msg = {'route': route} + if self.state: + msg['data'] = {'state': self.state} + self.stack.transmit(msg, self.stack.nameRemotes[self.ryn].uid) + self.stack.serviceAll() + self.connected = True + except Exception: + pass class StatsEvent(MasterEvent): @@ -297,13 +297,12 @@ class StatsEvent(MasterEvent): ''' Establish the publish connection ''' - if not self.connected and self.listen: - try: - route = {'dst': (self.estate, None, 'stats_req'), - 'src': (None, self.stack.local.name, None)} - msg = {'route': route, 'tag': self.tag} - self.stack.transmit(msg, self.stack.nameRemotes[self.ryn].uid) - self.stack.serviceAll() - self.connected = True - except Exception: - pass + try: + route = {'dst': (self.estate, None, 'stats_req'), + 'src': (None, self.stack.local.name, None)} + msg = {'route': route, 'tag': self.tag} + self.stack.transmit(msg, self.stack.nameRemotes[self.ryn].uid) + self.stack.serviceAll() + self.connected = True + except Exception: + pass diff --git a/salt/utils/schedule.py b/salt/utils/schedule.py index 53cc0385b0..c4957d0daa 100644 --- a/salt/utils/schedule.py +++ b/salt/utils/schedule.py @@ -361,7 +361,7 @@ class Schedule(object): schedule = self.opts['pillar']['schedule'] # Fire the complete event back along with updated list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts) + evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) evt.fire_event({'complete': True, 'schedule': schedule}, tag='/salt/minion/minion_schedule_delete_complete') @@ -397,7 +397,7 @@ class Schedule(object): self.opts['schedule'].update(data) # Fire the complete event back along with updated list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts) + evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) evt.fire_event({'complete': True, 'schedule': self.opts['schedule']}, tag='/salt/minion/minion_schedule_add_complete') @@ -416,7 +416,7 @@ class Schedule(object): schedule = self.opts['schedule'] # Fire the complete event back along with updated list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts) + evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) evt.fire_event({'complete': True, 'schedule': schedule}, tag='/salt/minion/minion_schedule_enabled_job_complete') @@ -437,7 +437,7 @@ class Schedule(object): schedule = self.opts['schedule'] # Fire the complete event back along with updated list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts) + evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) evt.fire_event({'complete': True, 'schedule': schedule}, tag='/salt/minion/minion_schedule_disabled_job_complete') @@ -507,7 +507,7 @@ class Schedule(object): self.opts['schedule']['enabled'] = True # Fire the complete event back along with updated list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts) + evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) evt.fire_event({'complete': True, 'schedule': self.opts['schedule']}, tag='/salt/minion/minion_schedule_enabled_complete') @@ -518,7 +518,7 @@ class Schedule(object): self.opts['schedule']['enabled'] = False # Fire the complete event back along with updated list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts) + evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) evt.fire_event({'complete': True, 'schedule': self.opts['schedule']}, tag='/salt/minion/minion_schedule_disabled_complete') @@ -554,7 +554,7 @@ class Schedule(object): schedule.update(self.opts['pillar']['schedule']) # Fire the complete event back along with the list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts) + evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) evt.fire_event({'complete': True, 'schedule': schedule}, tag='/salt/minion/minion_schedule_list_complete') @@ -565,7 +565,7 @@ class Schedule(object): self.persist() # Fire the complete event back along with the list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts) + evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) evt.fire_event({'complete': True}, tag='/salt/minion/minion_schedule_saved') diff --git a/tests/eventlisten.py b/tests/eventlisten.py index c6718e7900..d9f9a26727 100644 --- a/tests/eventlisten.py +++ b/tests/eventlisten.py @@ -107,7 +107,8 @@ def listen(opts): opts['node'], sock_dir=opts['sock_dir'], transport=opts['transport'], - opts=opts + opts=opts, + listen=True ) check_access_and_print_warning(opts['sock_dir']) print(event.puburi) diff --git a/tests/integration/modules/event.py b/tests/integration/modules/event.py index 1dab8c0a1a..82183d0047 100644 --- a/tests/integration/modules/event.py +++ b/tests/integration/modules/event.py @@ -29,7 +29,7 @@ class EventModuleTest(integration.ModuleCase): events = Queue() def get_event(events): - me = event.MasterEvent(self.master_opts['sock_dir']) + me = event.MasterEvent(self.master_opts['sock_dir'], listen=True) events.put_nowait( me.get_event(wait=10, tag='salttest', full=False) ) @@ -62,7 +62,7 @@ class EventModuleTest(integration.ModuleCase): events = Queue() def get_event(events): - me = event.MinionEvent(self.minion_opts) + me = event.MinionEvent(self.minion_opts, listen=True) events.put_nowait( me.get_event(wait=10, tag='salttest', full=False) ) @@ -91,7 +91,7 @@ class EventModuleTest(integration.ModuleCase): events = Queue() def get_event(events): - me = event.MinionEvent(self.sub_minion_opts) + me = event.MinionEvent(self.sub_minion_opts, listen=True) events.put_nowait( me.get_event(wait=10, tag='salttest', full=False) ) diff --git a/tests/unit/utils/event_test.py b/tests/unit/utils/event_test.py index 4584493803..d0967c6335 100644 --- a/tests/unit/utils/event_test.py +++ b/tests/unit/utils/event_test.py @@ -65,7 +65,7 @@ class EventSender(Process): self.wait = wait def run(self): - me = event.MasterEvent(SOCK_DIR) + me = event.MasterEvent(SOCK_DIR, listen=False) time.sleep(self.wait) me.fire_event(self.data, self.tag) # Wait a few seconds before tearing down the zmq context @@ -99,7 +99,7 @@ class TestSaltEvent(TestCase): self.assertEqual(data[key], evt[key], msg) def test_master_event(self): - me = event.MasterEvent(SOCK_DIR) + me = event.MasterEvent(SOCK_DIR, listen=False) self.assertEqual( me.puburi, 'ipc://{0}'.format( os.path.join(SOCK_DIR, 'master_event_pub.ipc') @@ -115,7 +115,7 @@ class TestSaltEvent(TestCase): def test_minion_event(self): opts = dict(id='foo', sock_dir=SOCK_DIR) id_hash = hashlib.md5(opts['id']).hexdigest()[:10] - me = event.MinionEvent(opts) + me = event.MinionEvent(opts, listen=False) self.assertEqual( me.puburi, 'ipc://{0}'.format( @@ -135,12 +135,12 @@ class TestSaltEvent(TestCase): def test_minion_event_tcp_ipc_mode(self): opts = dict(id='foo', ipc_mode='tcp') - me = event.MinionEvent(opts) + me = event.MinionEvent(opts, listen=False) self.assertEqual(me.puburi, 'tcp://127.0.0.1:4510') self.assertEqual(me.pulluri, 'tcp://127.0.0.1:4511') def test_minion_event_no_id(self): - me = event.MinionEvent(dict(sock_dir=SOCK_DIR)) + me = event.MinionEvent(dict(sock_dir=SOCK_DIR), listen=False) id_hash = hashlib.md5('').hexdigest()[:10] self.assertEqual( me.puburi, @@ -162,7 +162,7 @@ class TestSaltEvent(TestCase): def test_event_subscription(self): '''Test a single event is received''' with eventpublisher_process(): - me = event.MasterEvent(SOCK_DIR) + me = event.MasterEvent(SOCK_DIR, listen=True) me.subscribe() me.fire_event({'data': 'foo1'}, 'evt1') evt1 = me.get_event(tag='evt1') @@ -171,7 +171,7 @@ class TestSaltEvent(TestCase): def test_event_timeout(self): '''Test no event is received if the timeout is reached''' with eventpublisher_process(): - me = event.MasterEvent(SOCK_DIR) + me = event.MasterEvent(SOCK_DIR, listen=True) me.subscribe() me.fire_event({'data': 'foo1'}, 'evt1') evt1 = me.get_event(tag='evt1') @@ -182,7 +182,7 @@ class TestSaltEvent(TestCase): def test_event_no_timeout(self): '''Test no wait timeout, we should block forever, until we get one ''' with eventpublisher_process(): - me = event.MasterEvent(SOCK_DIR) + me = event.MasterEvent(SOCK_DIR, listen=True) me.subscribe() me.fire_event({'data': 'foo1'}, 'evt1') me.fire_event({'data': 'foo2'}, 'evt2') @@ -192,7 +192,7 @@ class TestSaltEvent(TestCase): def test_event_subscription_matching(self): '''Test a subscription startswith matching''' with eventpublisher_process(): - me = event.MasterEvent(SOCK_DIR) + me = event.MasterEvent(SOCK_DIR, listen=True) me.subscribe() me.fire_event({'data': 'foo1'}, 'evt1') evt1 = me.get_event(tag='evt1') @@ -201,7 +201,7 @@ class TestSaltEvent(TestCase): def test_event_subscription_matching_all(self): '''Test a subscription matching''' with eventpublisher_process(): - me = event.MasterEvent(SOCK_DIR) + me = event.MasterEvent(SOCK_DIR, listen=True) me.subscribe() me.fire_event({'data': 'foo1'}, 'evt1') evt1 = me.get_event(tag='') @@ -210,7 +210,7 @@ class TestSaltEvent(TestCase): def test_event_not_subscribed(self): '''Test get event ignores non-subscribed events''' with eventpublisher_process(): - me = event.MasterEvent(SOCK_DIR) + me = event.MasterEvent(SOCK_DIR, listen=True) me.subscribe() with eventsender_process({'data': 'foo1'}, 'evt1', 5): me.fire_event({'data': 'foo1'}, 'evt2') @@ -220,7 +220,7 @@ class TestSaltEvent(TestCase): def test_event_multiple_subscriptions(self): '''Test multiple subscriptions do not interfere''' with eventpublisher_process(): - me = event.MasterEvent(SOCK_DIR) + me = event.MasterEvent(SOCK_DIR, listen=True) me.subscribe() with eventsender_process({'data': 'foo1'}, 'evt1', 5): me.fire_event({'data': 'foo1'}, 'evt2') @@ -230,9 +230,9 @@ class TestSaltEvent(TestCase): def test_event_multiple_clients(self): '''Test event is received by multiple clients''' with eventpublisher_process(): - me1 = event.MasterEvent(SOCK_DIR) + me1 = event.MasterEvent(SOCK_DIR, listen=True) me1.subscribe() - me2 = event.MasterEvent(SOCK_DIR) + me2 = event.MasterEvent(SOCK_DIR, listen=True) me2.subscribe() me1.fire_event({'data': 'foo1'}, 'evt1') evt1 = me1.get_event(tag='evt1') @@ -245,7 +245,7 @@ class TestSaltEvent(TestCase): def test_event_nested_subs(self): '''Test nested event subscriptions do not drop events, issue #8580''' with eventpublisher_process(): - me = event.MasterEvent(SOCK_DIR) + me = event.MasterEvent(SOCK_DIR, listen=True) me.subscribe() me.fire_event({'data': 'foo1'}, 'evt1') me.fire_event({'data': 'foo2'}, 'evt2') @@ -272,7 +272,7 @@ class TestSaltEvent(TestCase): '''Test nested event subscriptions do not drop events, get event for all tags''' # Show why not to call get_event(tag='') with eventpublisher_process(): - me = event.MasterEvent(SOCK_DIR) + me = event.MasterEvent(SOCK_DIR, listen=True) me.subscribe() me.fire_event({'data': 'foo1'}, 'evt1') me.fire_event({'data': 'foo2'}, 'evt2') @@ -284,7 +284,7 @@ class TestSaltEvent(TestCase): def test_event_many(self): '''Test a large number of events, one at a time''' with eventpublisher_process(): - me = event.MasterEvent(SOCK_DIR) + me = event.MasterEvent(SOCK_DIR, listen=True) me.subscribe() for i in range(500): me.fire_event({'data': '{0}'.format(i)}, 'testevents') @@ -294,7 +294,7 @@ class TestSaltEvent(TestCase): def test_event_many_backlog(self): '''Test a large number of events, send all then recv all''' with eventpublisher_process(): - me = event.MasterEvent(SOCK_DIR) + me = event.MasterEvent(SOCK_DIR, listen=True) me.subscribe() # Must not exceed zmq HWM for i in range(500): @@ -308,7 +308,7 @@ class TestSaltEvent(TestCase): def test_send_master_event(self): '''Tests that sending an event through fire_master generates expected event''' with eventpublisher_process(): - me = event.MasterEvent(SOCK_DIR) + me = event.MasterEvent(SOCK_DIR, listen=True) me.subscribe() data = {'data': 'foo1'} me.fire_master(data, 'test_master') @@ -335,7 +335,7 @@ class TestAsyncEventPublisher(AsyncTestCase): def test_event_subscription(self): '''Test a single event is received''' - me = event.MinionEvent({'sock_dir': SOCK_DIR}) + me = event.MinionEvent({'sock_dir': SOCK_DIR}, listen=True) me.fire_event({'data': 'foo1'}, 'evt1') self.wait() evt1 = me.get_event(tag='evt1')