Add basic tests for asyncEventpublisher

This commit is contained in:
Thomas Jackson 2015-04-07 20:30:04 -07:00
parent 0f4126a13e
commit f5e2650227

View File

@ -12,6 +12,7 @@ from __future__ import absolute_import
import os
import hashlib
import time
from tornado.testing import AsyncTestCase
import zmq
from contextlib import contextmanager
from multiprocessing import Process
@ -311,6 +312,33 @@ class TestSaltEvent(TestCase):
evt = me.get_event(tag='fire_master')
self.assertGotEvent(evt, {'data': data, 'tag': 'test_master', 'events': None, 'pretag': None})
class TestAsyncEventPublisher(AsyncTestCase):
def get_new_ioloop(self):
return zmq.eventloop.ioloop.ZMQIOLoop()
def setUp(self):
super(TestAsyncEventPublisher, self).setUp()
self.publisher = event.AsyncEventPublisher(
{'sock_dir': SOCK_DIR},
self._handle_publish,
self.io_loop,
)
def _handle_publish(self, raw):
self.tag, self.data = event.SaltEvent.unpack(raw)
self.stop()
def test_event_subscription(self):
'''Test a single event is received'''
me = event.MinionEvent({'sock_dir': SOCK_DIR})
me.fire_event({'data': 'foo1'}, 'evt1')
self.wait()
evt1 = me.get_event(tag='evt1')
self.assertEqual(self.tag, 'evt1')
self.data.pop('_stamp') # drop the stamp
self.assertEqual(self.data, {'data': 'foo1'})
if __name__ == '__main__':
from integration import run_tests
run_tests(TestSaltEvent, needs_daemon=False)