salt/tests/integration/modules/test_publish.py

156 lines
4.2 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
# Import Python libs
2014-11-22 11:05:56 +00:00
from __future__ import absolute_import, print_function
# Import Salt Testing libs
from tests.support.case import ModuleCase
2017-04-02 16:09:47 +00:00
from tests.support.mixins import SaltReturnAssertsMixin
2012-03-29 00:37:05 +00:00
class PublishModuleTest(ModuleCase, SaltReturnAssertsMixin):
2012-03-29 00:37:05 +00:00
'''
Validate the publish module
'''
def test_publish(self):
'''
publish.publish
'''
2017-05-12 17:32:04 +00:00
ret = self.run_function(
'publish.publish',
['minion', 'test.ping'],
f_timeout=50
)
self.assertEqual(ret, {'minion': True})
ret = self.run_function(
'publish.publish',
2017-02-26 02:11:06 +00:00
['minion', 'test.kwarg'],
2017-05-12 17:32:04 +00:00
f_arg='cheese=spam',
f_timeout=50
)
ret = ret['minion']
check_true = (
'cheese',
'__pub_arg',
'__pub_fun',
'__pub_id',
'__pub_jid',
'__pub_ret',
'__pub_tgt',
'__pub_tgt_type',
)
for name in check_true:
if name not in ret:
2014-11-22 11:05:56 +00:00
print(name)
self.assertTrue(name in ret)
self.assertEqual(ret['cheese'], 'spam')
2017-02-26 02:11:06 +00:00
self.assertEqual(ret['__pub_arg'], [{'cheese': 'spam'}])
self.assertEqual(ret['__pub_id'], 'minion')
self.assertEqual(ret['__pub_fun'], 'test.kwarg')
2012-03-29 00:37:05 +00:00
2015-02-24 22:35:21 +00:00
def test_publish_yaml_args(self):
'''
test publish.publish yaml args formatting
'''
2017-05-12 17:32:04 +00:00
ret = self.run_function(
'publish.publish',
['minion', 'test.ping'],
f_timeout=50
)
2015-02-24 22:35:21 +00:00
self.assertEqual(ret, {'minion': True})
test_args_list = ['saltines, si', 'crackers, nein', 'cheese, indeed']
test_args = '["{args[0]}", "{args[1]}", "{args[2]}"]'.format(args=test_args_list)
ret = self.run_function(
'publish.publish',
2017-05-12 17:32:04 +00:00
['minion', 'test.arg', test_args],
f_timeout=50
2015-02-24 22:35:21 +00:00
)
ret = ret['minion']
check_true = (
'__pub_arg',
'__pub_fun',
'__pub_id',
'__pub_jid',
'__pub_ret',
'__pub_tgt',
'__pub_tgt_type',
)
for name in check_true:
if name not in ret['kwargs']:
print(name)
2015-02-24 22:35:21 +00:00
self.assertTrue(name in ret['kwargs'])
self.assertEqual(ret['args'], test_args_list)
self.assertEqual(ret['kwargs']['__pub_id'], 'minion')
self.assertEqual(ret['kwargs']['__pub_fun'], 'test.arg')
2012-03-29 00:37:05 +00:00
def test_full_data(self):
'''
publish.full_data
'''
ret = self.run_function(
'publish.full_data',
2017-05-12 17:32:04 +00:00
['minion', 'test.fib', 20],
f_timeout=50
)
2013-04-27 18:59:34 +00:00
self.assertTrue(ret)
2015-04-11 18:48:48 +00:00
self.assertEqual(ret['minion']['ret'][0], 6765)
def test_kwarg(self):
'''
Verify that the pub data is making it to the minion functions
'''
ret = self.run_function(
'publish.full_data',
2017-02-26 02:11:06 +00:00
['minion', 'test.kwarg'],
2017-05-12 17:32:04 +00:00
f_arg='cheese=spam',
f_timeout=50
)
ret = ret['minion']['ret']
check_true = (
'cheese',
'__pub_arg',
'__pub_fun',
'__pub_id',
'__pub_jid',
'__pub_ret',
'__pub_tgt',
'__pub_tgt_type',
)
for name in check_true:
if name not in ret:
2014-11-22 11:05:56 +00:00
print(name)
self.assertTrue(name in ret)
2012-05-29 16:40:20 +00:00
self.assertEqual(ret['cheese'], 'spam')
2017-02-26 02:11:06 +00:00
self.assertEqual(ret['__pub_arg'], [{'cheese': 'spam'}])
2012-05-29 16:40:20 +00:00
self.assertEqual(ret['__pub_id'], 'minion')
self.assertEqual(ret['__pub_fun'], 'test.kwarg')
ret = self.run_function(
'publish.full_data',
2017-02-26 02:11:06 +00:00
['minion', 'test.kwarg'],
2017-05-12 17:32:04 +00:00
cheese='spam',
f_timeout=50
)
self.assertIn(
2014-07-01 21:13:52 +00:00
'The following keyword arguments are not valid', ret
)
2012-03-29 00:47:10 +00:00
def test_reject_minion(self):
'''
Test bad authentication
'''
ret = self.run_function(
'publish.publish',
2017-05-12 17:32:04 +00:00
['minion', 'cmd.run', ['echo foo']],
f_timeout=50
)
2012-03-29 00:47:10 +00:00
self.assertEqual(ret, {})