mirror of
https://github.com/valitydev/salt.git
synced 2024-11-07 17:09:03 +00:00
Merge pull request #35120 from kstreee/fix-missing-first-stream-data
The '_handle_event_socket_recv' function in Salt Api is missing first data of stream.
This commit is contained in:
commit
dd91006ed7
@ -272,13 +272,14 @@ class IPCClient(object):
|
||||
else:
|
||||
future = tornado.concurrent.Future()
|
||||
self._connecting_future = future
|
||||
self.io_loop.add_callback(self._connect, timeout=timeout)
|
||||
self._connect(timeout=timeout)
|
||||
|
||||
if callback is not None:
|
||||
def handle_future(future):
|
||||
response = future.result()
|
||||
self.io_loop.add_callback(callback, response)
|
||||
future.add_done_callback(handle_future)
|
||||
|
||||
return future
|
||||
|
||||
@tornado.gen.coroutine
|
||||
@ -665,14 +666,6 @@ class IPCMessageSubscriber(IPCClient):
|
||||
|
||||
@tornado.gen.coroutine
|
||||
def _read_async(self, callback):
|
||||
while not self.connected():
|
||||
try:
|
||||
yield self.connect()
|
||||
except tornado.iostream.StreamClosedError:
|
||||
log.trace('Subscriber closed stream on IPC {0} before connect'.format(self.socket_path))
|
||||
except Exception as exc:
|
||||
log.error('Exception occurred while Subscriber connecting: {0}'.format(exc))
|
||||
|
||||
while not self.stream.closed():
|
||||
try:
|
||||
self._read_stream_future = self.stream.read_bytes(4096, partial=True)
|
||||
@ -694,6 +687,14 @@ class IPCMessageSubscriber(IPCClient):
|
||||
|
||||
:param callback: A callback with the received data
|
||||
'''
|
||||
while not self.connected():
|
||||
try:
|
||||
self.connect()
|
||||
except tornado.iostream.StreamClosedError:
|
||||
log.trace('Subscriber closed stream on IPC {0} before connect'.format(self.socket_path))
|
||||
except Exception as exc:
|
||||
log.error('Exception occurred while Subscriber connecting: {0}'.format(exc))
|
||||
|
||||
self.io_loop.spawn_callback(self._read_async, callback)
|
||||
|
||||
def close(self):
|
||||
|
@ -103,7 +103,8 @@ class TestEventListener(AsyncTestCase):
|
||||
event_listener = saltnado.EventListener({}, # we don't use mod_opts, don't save?
|
||||
{'sock_dir': SOCK_DIR,
|
||||
'transport': 'zeromq'})
|
||||
event_future = event_listener.get_event(1, 'evt1', self.stop) # get an event future
|
||||
self._finished = False # fit to event_listener's behavior
|
||||
event_future = event_listener.get_event(self, 'evt1', self.stop) # get an event future
|
||||
me.fire_event({'data': 'foo2'}, 'evt2') # fire an event we don't want
|
||||
me.fire_event({'data': 'foo1'}, 'evt1') # fire an event we do want
|
||||
self.wait() # wait for the future
|
||||
@ -113,6 +114,27 @@ class TestEventListener(AsyncTestCase):
|
||||
self.assertEqual(event_future.result()['tag'], 'evt1')
|
||||
self.assertEqual(event_future.result()['data']['data'], 'foo1')
|
||||
|
||||
def test_set_event_handler(self):
|
||||
'''
|
||||
Test subscribing events using set_event_handler
|
||||
'''
|
||||
with eventpublisher_process():
|
||||
me = event.MasterEvent(SOCK_DIR)
|
||||
event_listener = saltnado.EventListener({}, # we don't use mod_opts, don't save?
|
||||
{'sock_dir': SOCK_DIR,
|
||||
'transport': 'zeromq'})
|
||||
self._finished = False # fit to event_listener's behavior
|
||||
event_future = event_listener.get_event(self,
|
||||
tag='evt',
|
||||
callback=self.stop,
|
||||
timeout=1,
|
||||
) # get an event future
|
||||
me.fire_event({'data': 'foo'}, 'evt') # fire an event we do want
|
||||
self.wait()
|
||||
|
||||
# check that we subscribed the event we wanted
|
||||
self.assertEqual(len(event_listener.timeout_map), 0)
|
||||
|
||||
def test_timeout(self):
|
||||
'''
|
||||
Make sure timeouts work correctly
|
||||
@ -121,7 +143,8 @@ class TestEventListener(AsyncTestCase):
|
||||
event_listener = saltnado.EventListener({}, # we don't use mod_opts, don't save?
|
||||
{'sock_dir': SOCK_DIR,
|
||||
'transport': 'zeromq'})
|
||||
event_future = event_listener.get_event(1,
|
||||
self._finished = False # fit to event_listener's behavior
|
||||
event_future = event_listener.get_event(self,
|
||||
tag='evt1',
|
||||
callback=self.stop,
|
||||
timeout=1,
|
||||
|
Loading…
Reference in New Issue
Block a user