salt/tests/unit/transport/ipc_test.py
Mike Place fcd9197f86 IPC transport skeleton
Basic IPC server works!

Lint

Skeleton of client

IPC bind test

Make stand-alone

Adding factories for push and pull channels

Allowing opts passing for consistency

Tests now (mostly) work

Lint

Method documentation

General cleanup. Migrate to inheritance.

Log cleanup

Migrate framing to stand-alone module

Migrate ipc.py to new framer

Working except for serialization bug

Debugging

Debugging

It works!!

Remove ZeroMQ from TCP transport :]

General cleanup

Linting

General cleanup

Align socket name with what client expects

Remove unused buffer size flag

exception handling for stream close

Calls to parent class inits

Docs

Remove debugging

Remove unused function

Remove unnecessary pre/post fork on msgclient

Remove unnecessary timeout flag

Better stream/socket shutdown in server

Remove unused handler

Removing more unused

More function cleanup

Removing more unneeded cruft

Lint

Round out documentation

More docs

Misc hacks to fix up @cachedout's IPC

This was using a mix of blocking and non-blocking calls, which was making a bit of a mess. connect and write are both non-blocking calls on IOStreams, so we either need to handle all the callbacks or do them in the coroutine fashion (much easier to manage). This meant that in the tests your "write" wouldn't make it out since we didn't wait on the connect.

IMO we should refactor this IPC stuff to have proper async interfaces and wrap if absolutely necessary, but I think it's reasonable to ask that as part of this we make some more of the core coroutines :)

for #23236

Lint

Remove init of io_loop because we require start()

Various fixes

Remove unneeded functionality

Remove dup

Cleanup and remove unused functions

Moving toward coroutines

More lint

handle_connection changed to spawn

Singletons for ipcclient

Lint disable

Remove redundant check in close()

Remove duplicates in init

Improved exception handling

Test framework

Require sock path to be passed in

Better testing approach

Remove unnecessary __init__

Misc cleanup of unnecessary methods

Major rework of the IPC channels to make them work :)

Remove TODO, since the feature was implemented

Add more tests for IPC

Add support for reconnecting clients, as well as a return from the IPCServer

misc cleanup

Lint test case

Lint transport
2015-07-13 14:44:10 -06:00

147 lines
3.9 KiB
Python

# -*- coding: utf-8 -*-
'''
:codeauthor: :email:`Mike Place <mp@saltstack.com>`
'''
# Import python libs
from __future__ import absolute_import
import os
import logging
import tornado.gen
import tornado.ioloop
import tornado.testing
import salt.utils
import salt.config
import salt.exceptions
import salt.transport.ipc
import salt.transport.server
import salt.transport.client
from salt.ext.six.moves import range
# Import Salt Testing libs
import integration
from salttesting.mock import MagicMock
from salttesting.helpers import ensure_in_syspath
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# NOTE(review): presumably makes the parent directory importable for the
# tests below -- confirm against salttesting.helpers.ensure_in_syspath.
ensure_in_syspath('../')
class BaseIPCReqCase(tornado.testing.AsyncTestCase):
    '''
    Test the req server/client pair

    Starts an ``IPCMessageServer`` on a socket under ``integration.TMP`` for
    every test and records each payload it receives in ``self.payloads``.
    '''
    def setUp(self):
        '''
        Start the IPC server on the test case's IOLoop.
        '''
        super(BaseIPCReqCase, self).setUp()
        # Snapshot of the handlers registered on the loop before the server
        # is started; tearDown compares against it to detect leaked FDs.
        self._start_handlers = dict(self.io_loop._handlers)
        self.socket_path = os.path.join(integration.TMP, 'ipc_test.ipc')
        # Create the payload list before the server exists so the handler
        # can never observe a missing attribute.
        self.payloads = []
        self.server_channel = salt.transport.ipc.IPCMessageServer(
            self.socket_path,
            io_loop=self.io_loop,
            payload_handler=self._handle_payload,
        )
        self.server_channel.start()

    def tearDown(self):
        '''
        Close the server, remove the socket file and verify that no handler
        registered during the test is still attached to the IOLoop.

        :raises Exception: if the loop's handlers differ from the snapshot
                           taken in ``setUp``.
        '''
        super(BaseIPCReqCase, self).tearDown()
        self.server_channel.close()
        os.unlink(self.socket_path)
        # ``.items()`` instead of the Python 2 only ``.iteritems()`` so the
        # check works on both Python 2 and Python 3.
        failures = [
            (fd, handler)
            for fd, handler in list(self.io_loop._handlers.items())
            if self._start_handlers.get(fd) != handler
        ]
        if failures:
            raise Exception('FDs still attached to the IOLoop: {0}'.format(failures))

    @tornado.gen.coroutine
    def _handle_payload(self, payload, reply_func):
        '''
        Payload handler given to the server: record the payload, echo it back
        through ``reply_func`` and stop the test's wait() once a dict payload
        with a truthy ``stop`` key is seen.
        '''
        self.payloads.append(payload)
        yield reply_func(payload)
        if isinstance(payload, dict) and payload.get('stop'):
            self.stop()
class IPCMessageClient(BaseIPCReqCase):
    '''
    Test all of the clear msg stuff

    Every test sends one or more messages through an IPC client channel and
    finishes with a ``{'stop': True}`` payload, which makes the server-side
    handler in the base class release ``self.wait()``.
    '''
    def _get_channel(self):
        '''
        Build a client for ``self.socket_path`` and block (via stop/wait)
        until its connect callback has fired.
        '''
        channel = salt.transport.ipc.IPCMessageClient(
            socket_path=self.socket_path,
            io_loop=self.io_loop,
        )
        channel.connect(callback=self.stop)
        self.wait()
        return channel

    def setUp(self):
        '''
        Start the server (base class) and connect the default client channel.
        '''
        super(IPCMessageClient, self).setUp()
        self.channel = self._get_channel()

    def tearDown(self):
        '''
        Close the client channel, then run the base class teardown.

        The channel is closed first so the base class FD check does not see
        the client's handler still attached to the IOLoop.  The base
        ``tearDown`` (not ``setUp``) is always invoked, even if closing the
        channel raises, so the server and socket file are cleaned up.
        '''
        try:
            self.channel.close()
        finally:
            super(IPCMessageClient, self).tearDown()

    def test_basic_send(self):
        '''
        A single dict message arrives unchanged.
        '''
        msg = {'foo': 'bar', 'stop': True}
        self.channel.send(msg)
        self.wait()
        self.assertEqual(self.payloads[0], msg)

    def test_many_send(self):
        '''
        1000 messages sent back to back arrive complete and in order.
        '''
        self.server_channel.stream_handler = MagicMock()
        msgs = ['test_many_send_{0}'.format(i) for i in range(1000)]
        for msg in msgs:
            self.channel.send(msg)
        self.channel.send({'stop': True})
        self.wait()
        # The final payload is the stop marker, everything before it is data
        self.assertEqual(self.payloads[:-1], msgs)

    def test_very_big_message(self):
        '''
        A message carrying a very long string arrives unchanged.
        '''
        long_str = ''.join(str(num) for num in range(10**5))
        msg = {'long_str': long_str, 'stop': True}
        self.channel.send(msg)
        self.wait()
        self.assertEqual(msg, self.payloads[0])

    def test_multistream_sends(self):
        '''
        Messages sent through two channel handles are both received.
        '''
        local_channel = self._get_channel()

        for chan in (self.channel, local_channel):
            chan.send('foo')

        self.channel.send({'stop': True})
        self.wait()
        self.assertEqual(self.payloads[:-1], ['foo', 'foo'])

    def test_multistream_errors(self):
        '''
        ``None`` payloads sent through two channel handles do not prevent the
        messages that follow from being received.
        '''
        local_channel = self._get_channel()

        for chan in (self.channel, local_channel):
            chan.send(None)

        for chan in (self.channel, local_channel):
            chan.send('foo')

        self.channel.send({'stop': True})
        self.wait()
        self.assertEqual(self.payloads[:-1], [None, None, 'foo', 'foo'])
if __name__ == '__main__':
    # Stand-alone entry point: run only this module's client test case
    # through the integration test runner, without the test daemons.
    integration.run_tests(IPCMessageClient, needs_daemon=False)