mirror of
https://github.com/valitydev/salt.git
synced 2024-11-07 08:58:59 +00:00
Add support for kapacitor 0.13 (#33352)
* add support for kapacitor 0.13 * update kapacitor tests
This commit is contained in:
parent
0a35106df9
commit
8abf303183
@ -22,6 +22,7 @@ import logging
|
||||
|
||||
import salt.utils
|
||||
import salt.utils.http
|
||||
from salt.utils.decorators import memoize
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -30,6 +31,17 @@ def __virtual__():
|
||||
return 'kapacitor' if salt.utils.which('kapacitor') else False
|
||||
|
||||
|
||||
@memoize
|
||||
def version():
|
||||
'''
|
||||
Get the kapacitor version.
|
||||
'''
|
||||
version = __salt__['pkg.version']('kapacitor')
|
||||
if not version:
|
||||
version = str(__salt__['config.option']('kapacitor.version', 'latest'))
|
||||
return version
|
||||
|
||||
|
||||
def get_task(name):
|
||||
'''
|
||||
Get a dict of data on a task.
|
||||
@ -45,7 +57,12 @@ def get_task(name):
|
||||
'''
|
||||
host = __salt__['config.option']('kapacitor.host', 'localhost')
|
||||
port = __salt__['config.option']('kapacitor.port', 9092)
|
||||
url = 'http://{0}:{1}/task?name={2}'.format(host, port, name)
|
||||
|
||||
if version() < '0.13':
|
||||
url = 'http://{0}:{1}/task?name={2}'.format(host, port, name)
|
||||
else:
|
||||
url = 'http://{0}:{1}/kapacitor/v1/tasks/{2}?skip-format=true'.format(host, port, name)
|
||||
|
||||
response = salt.utils.http.query(url, status=True)
|
||||
|
||||
if response['status'] == 404:
|
||||
@ -53,7 +70,20 @@ def get_task(name):
|
||||
|
||||
data = json.loads(response['body'])
|
||||
|
||||
return data
|
||||
if version() < '0.13':
|
||||
return {
|
||||
'script': data['TICKscript'],
|
||||
'type': data['Type'],
|
||||
'dbrps': data['DBRPs'],
|
||||
'enabled': data['Enabled'],
|
||||
}
|
||||
|
||||
return {
|
||||
'script': data['script'],
|
||||
'type': data['type'],
|
||||
'dbrps': data['dbrps'],
|
||||
'enabled': data['status'] == 'enabled',
|
||||
}
|
||||
|
||||
|
||||
def _run_cmd(cmd):
|
||||
@ -102,11 +132,16 @@ def define_task(name,
|
||||
|
||||
salt '*' kapacitor.define_task cpu salt://kapacitor/cpu.tick database=telegraf
|
||||
'''
|
||||
cmd = 'kapacitor define -name {0} -tick {1}'.format(name, tick_script)
|
||||
if version() < '0.13':
|
||||
cmd = 'kapacitor define -name {0}'.format(name)
|
||||
else:
|
||||
cmd = 'kapacitor define {0}'.format(name)
|
||||
|
||||
if tick_script.startswith('salt://'):
|
||||
tick_script = __salt__['cp.cache_file'](tick_script, __env__)
|
||||
|
||||
cmd += ' -tick {0}'.format(tick_script)
|
||||
|
||||
if task_type:
|
||||
cmd += ' -type {0}'.format(task_type)
|
||||
|
||||
|
@ -26,7 +26,7 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def __virtual__():
|
||||
return 'kapacitor' if 'kapacitor.get_task' in __salt__ else False
|
||||
return 'kapacitor' if 'kapacitor.version' in __salt__ else False
|
||||
|
||||
|
||||
def task_present(name,
|
||||
@ -62,7 +62,7 @@ def task_present(name,
|
||||
ret = {'name': name, 'changes': {}, 'result': True, 'comment': ''}
|
||||
|
||||
task = __salt__['kapacitor.get_task'](name)
|
||||
old_script = task['TICKscript'] if task else ''
|
||||
old_script = task['script'] if task else ''
|
||||
|
||||
if tick_script.startswith('salt://'):
|
||||
script_path = __salt__['cp.cache_file'](tick_script, __env__)
|
||||
@ -70,7 +70,7 @@ def task_present(name,
|
||||
script_path = tick_script
|
||||
|
||||
with salt.utils.fopen(script_path, 'r') as file:
|
||||
new_script = file.read()
|
||||
new_script = file.read().replace('\t', ' ')
|
||||
|
||||
if old_script == new_script:
|
||||
comments.append('Task script is already up-to-date')
|
||||
@ -96,7 +96,7 @@ def task_present(name,
|
||||
comments.append('Task script updated')
|
||||
|
||||
if enable:
|
||||
if task and task['Enabled']:
|
||||
if task and task['enabled']:
|
||||
comments.append('Task is already enabled')
|
||||
else:
|
||||
if __opts__['test']:
|
||||
@ -114,7 +114,7 @@ def task_present(name,
|
||||
comments.append('Task was enabled')
|
||||
ret['changes']['enabled'] = {'old': False, 'new': True}
|
||||
else:
|
||||
if task and not task['Enabled']:
|
||||
if task and not task['enabled']:
|
||||
comments.append('Task is already disabled')
|
||||
else:
|
||||
if __opts__['test']:
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
# Import Python libs
|
||||
from __future__ import absolute_import
|
||||
import json
|
||||
|
||||
# Import Salt libs
|
||||
from salt.modules import kapacitor
|
||||
@ -11,6 +12,7 @@ from salttesting import TestCase
|
||||
from salttesting.mock import Mock, patch
|
||||
|
||||
kapacitor.__salt__ = {
|
||||
'pkg.version': Mock(return_value='9999'),
|
||||
'config.option': Mock(side_effect=lambda key, default: default)
|
||||
}
|
||||
kapacitor.__env__ = 'test'
|
||||
@ -18,24 +20,30 @@ kapacitor.__env__ = 'test'
|
||||
|
||||
class KapacitorTestCase(TestCase):
|
||||
def test_get_task_success(self):
|
||||
query_ret = {'body': '{"foo":"bar"}', 'status': 200}
|
||||
http_body = json.dumps({
|
||||
'script': 'test',
|
||||
'type': 'stream',
|
||||
'dbrps': [{'db': 'db', 'rp': 'rp'}],
|
||||
'status': 'enabled',
|
||||
})
|
||||
query_ret = {'body': http_body, 'status': 200}
|
||||
with patch('salt.utils.http.query', return_value=query_ret) as http_mock:
|
||||
task = kapacitor.get_task('taskname')
|
||||
http_mock.assert_called_once_with('http://localhost:9092/task?name=taskname', status=True)
|
||||
assert {'foo': 'bar'} == task
|
||||
http_mock.assert_called_once_with('http://localhost:9092/kapacitor/v1/tasks/taskname?skip-format=true', status=True)
|
||||
self.assertEqual('test', task['script'])
|
||||
|
||||
def test_get_task_not_found(self):
|
||||
query_ret = {'body': '{"Error":"unknown task taskname"}', 'status': 404}
|
||||
with patch('salt.utils.http.query', return_value=query_ret) as http_mock:
|
||||
task = kapacitor.get_task('taskname')
|
||||
http_mock.assert_called_once_with('http://localhost:9092/task?name=taskname', status=True)
|
||||
assert task is None
|
||||
http_mock.assert_called_once_with('http://localhost:9092/kapacitor/v1/tasks/taskname?skip-format=true', status=True)
|
||||
self.assertEqual(None, task)
|
||||
|
||||
def test_define_task(self):
|
||||
cmd_mock = Mock(return_value={'retcode': 0})
|
||||
with patch.dict(kapacitor.__salt__, {'cmd.run_all': cmd_mock}):
|
||||
kapacitor.define_task('taskname', '/tmp/script.tick')
|
||||
cmd_mock.assert_called_once_with('kapacitor define -name taskname '
|
||||
cmd_mock.assert_called_once_with('kapacitor define taskname '
|
||||
'-tick /tmp/script.tick -type stream')
|
||||
|
||||
def test_enable_task(self):
|
||||
|
@ -26,6 +26,10 @@ def _present(name='testname',
|
||||
enable_result=True,
|
||||
disable_result=True,
|
||||
script='test'):
|
||||
'''
|
||||
Run a "kapacitor.present" state after setting up mocks, and return the
|
||||
state return value as well as the mocks to make assertions.
|
||||
'''
|
||||
get_mock = Mock(return_value=task)
|
||||
|
||||
if isinstance(define_result, bool):
|
||||
@ -49,6 +53,7 @@ def _present(name='testname',
|
||||
with patch('salt.utils.fopen', mock_open(read_data=script)) as open_mock:
|
||||
retval = kapacitor.task_present(name, tick_script, task_type=task_type,
|
||||
database=database, retention_policy=retention_policy, enable=enable)
|
||||
|
||||
return retval, get_mock, define_mock, enable_mock, disable_mock
|
||||
|
||||
|
||||
@ -64,7 +69,7 @@ class KapacitorTestCase(TestCase):
|
||||
self.assertEqual(True, ret['changes']['enabled']['new'])
|
||||
|
||||
def test_task_present_existing_task(self):
|
||||
old_task = {'TICKscript': 'old_task', 'Enabled': True}
|
||||
old_task = {'script': 'old_task', 'enabled': True}
|
||||
ret, get_mock, define_mock, enable_mock, _ = _present(task=old_task)
|
||||
get_mock.assert_called_once_with('testname')
|
||||
define_mock.assert_called_once_with('testname', '/tmp/script.tick',
|
||||
@ -74,7 +79,7 @@ class KapacitorTestCase(TestCase):
|
||||
self.assertNotIn('enabled', ret['changes'])
|
||||
|
||||
def test_task_present_existing_task_not_enabled(self):
|
||||
old_task = {'TICKscript': 'test', 'Enabled': False}
|
||||
old_task = {'script': 'test', 'enabled': False}
|
||||
ret, get_mock, define_mock, enable_mock, _ = _present(task=old_task)
|
||||
get_mock.assert_called_once_with('testname')
|
||||
self.assertEqual(False, define_mock.called)
|
||||
@ -84,7 +89,7 @@ class KapacitorTestCase(TestCase):
|
||||
self.assertEqual(True, ret['changes']['enabled']['new'])
|
||||
|
||||
def test_task_present_disable_existing_task(self):
|
||||
old_task = {'TICKscript': 'test', 'Enabled': True}
|
||||
old_task = {'script': 'test', 'enabled': True}
|
||||
ret, get_mock, define_mock, _, disable_mock = _present(task=old_task, enable=False)
|
||||
get_mock.assert_called_once_with('testname')
|
||||
self.assertEqual(False, define_mock.called)
|
||||
|
Loading…
Reference in New Issue
Block a user