salt/tests/unit/transport/pub_test.py
2015-04-14 09:16:42 -06:00

41 lines
1.2 KiB
Python

# -*- coding: utf-8 -*-
# Import Python Libs
from __future__ import absolute_import
# Import Salt Libs
import salt.transport.client
class PubChannelMixin(object):
def test_basic(self):
self.pub = None
def handle_pub(ret):
self.pub = ret
self.stop()
self.pub_channel = salt.transport.client.AsyncPubChannel.factory(self.minion_opts, io_loop=self.io_loop)
connect_future = self.pub_channel.connect()
connect_future.add_done_callback(lambda f: self.stop())
self.wait()
connect_future.result()
self.pub_channel.on_recv(handle_pub)
load = {
'fun': 'f',
'arg': 'a',
'tgt': 't',
'jid': 'j',
'ret': 'r',
'tgt_type': 'glob',
}
self.server_channel.publish(load)
self.wait()
self.assertEqual(self.pub['load'], load)
self.pub_channel.on_recv(None)
self.server_channel.publish(load)
with self.assertRaises(self.failureException):
self.wait(timeout=0.5)
# close our pub_channel, to pass our FD checks
del self.pub_channel