diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 2d8d261e97..2580aeb7ac 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -272,13 +272,14 @@ class IPCClient(object): else: future = tornado.concurrent.Future() self._connecting_future = future - self.io_loop.add_callback(self._connect, timeout=timeout) + self._connect(timeout=timeout) if callback is not None: def handle_future(future): response = future.result() self.io_loop.add_callback(callback, response) future.add_done_callback(handle_future) + return future @tornado.gen.coroutine @@ -665,14 +666,6 @@ class IPCMessageSubscriber(IPCClient): @tornado.gen.coroutine def _read_async(self, callback): - while not self.connected(): - try: - yield self.connect() - except tornado.iostream.StreamClosedError: - log.trace('Subscriber closed stream on IPC {0} before connect'.format(self.socket_path)) - except Exception as exc: - log.error('Exception occurred while Subscriber connecting: {0}'.format(exc)) - while not self.stream.closed(): try: self._read_stream_future = self.stream.read_bytes(4096, partial=True) @@ -694,6 +687,14 @@ class IPCMessageSubscriber(IPCClient): :param callback: A callback with the received data ''' + while not self.connected(): + try: + self.connect() + except tornado.iostream.StreamClosedError: + log.trace('Subscriber closed stream on IPC {0} before connect'.format(self.socket_path)) + except Exception as exc: + log.error('Exception occurred while Subscriber connecting: {0}'.format(exc)) + self.io_loop.spawn_callback(self._read_async, callback) def close(self): diff --git a/tests/unit/netapi/rest_tornado/test_utils.py b/tests/unit/netapi/rest_tornado/test_utils.py index 1dd18d5c5b..c22c288daf 100644 --- a/tests/unit/netapi/rest_tornado/test_utils.py +++ b/tests/unit/netapi/rest_tornado/test_utils.py @@ -103,7 +103,8 @@ class TestEventListener(AsyncTestCase): event_listener = saltnado.EventListener({}, # we don't use mod_opts, don't save? {'sock_dir': SOCK_DIR, 'transport': 'zeromq'}) - event_future = event_listener.get_event(1, 'evt1', self.stop) # get an event future + self._finished = False # fit to event_listener's behavior + event_future = event_listener.get_event(self, 'evt1', self.stop) # get an event future me.fire_event({'data': 'foo2'}, 'evt2') # fire an event we don't want me.fire_event({'data': 'foo1'}, 'evt1') # fire an event we do want self.wait() # wait for the future @@ -113,6 +114,27 @@ class TestEventListener(AsyncTestCase): self.assertEqual(event_future.result()['tag'], 'evt1') self.assertEqual(event_future.result()['data']['data'], 'foo1') + def test_set_event_handler(self): + ''' + Test subscribing events using set_event_handler + ''' + with eventpublisher_process(): + me = event.MasterEvent(SOCK_DIR) + event_listener = saltnado.EventListener({}, # we don't use mod_opts, don't save? + {'sock_dir': SOCK_DIR, + 'transport': 'zeromq'}) + self._finished = False # fit to event_listener's behavior + event_future = event_listener.get_event(self, + tag='evt', + callback=self.stop, + timeout=1, + ) # get an event future + me.fire_event({'data': 'foo'}, 'evt') # fire an event we do want + self.wait() + + # check that we subscribed the event we wanted + self.assertEqual(len(event_listener.timeout_map), 0) + def test_timeout(self): ''' Make sure timeouts work correctly @@ -121,7 +143,8 @@ class TestEventListener(AsyncTestCase): event_listener = saltnado.EventListener({}, # we don't use mod_opts, don't save? {'sock_dir': SOCK_DIR, 'transport': 'zeromq'}) - event_future = event_listener.get_event(1, + self._finished = False # fit to event_listener's behavior + event_future = event_listener.get_event(self, tag='evt1', callback=self.stop, timeout=1,