salt/tests/integration/netapi/rest_tornado/test_app.py
Colton Myers 76e468bc20 Merge remote-tracking branch 'upstream/2015.5' into merge-forward-2015.8
Conflicts:
    doc/topics/cloud/vmware.rst
    salt/cloud/clouds/digital_ocean_v2.py
    salt/cloud/clouds/vmware.py
    salt/daemons/flo/core.py
    salt/modules/hipchat.py
    salt/modules/lxc.py
    salt/modules/rabbitmq.py
    salt/pillar/s3.py
    salt/states/dockerio.py
    salt/states/file.py
    salt/utils/process.py
    salt/utils/s3.py
2015-07-20 13:55:14 -06:00

586 lines
24 KiB
Python

# -*- coding: utf-8 -*-
# Import Python Libs
from __future__ import absolute_import
from __future__ import print_function
import json
import time
from distutils.version import StrictVersion # pylint: disable=import-error,no-name-in-module
# Import Salt Libs
from salt.netapi.rest_tornado import saltnado
from unit.netapi.rest_tornado.test_handlers import SaltnadoTestCase
# Import Salt Testing Libs
from salttesting import skipIf
from salttesting.helpers import ensure_in_syspath
ensure_in_syspath('../../../')
import salt.ext.six as six
try:
import zmq
from zmq.eventloop.ioloop import ZMQIOLoop
HAS_ZMQ_IOLOOP = True
except ImportError:
HAS_ZMQ_IOLOOP = False
# The version comparison is guarded by HAS_ZMQ_IOLOOP: decorator arguments are
# evaluated when the class statement runs, so touching ``zmq.__version__`` after
# a failed import would raise NameError instead of skipping the tests.
@skipIf(HAS_ZMQ_IOLOOP is False, 'PyZMQ version must be >= 14.0.1 to run these tests.')
@skipIf(HAS_ZMQ_IOLOOP and StrictVersion(zmq.__version__) < StrictVersion('14.0.1'),
        'PyZMQ must be >= 14.0.1 to run these tests.')
class TestSaltAPIHandler(SaltnadoTestCase):
    '''
    Integration tests for ``saltnado.SaltAPIHandler`` mounted at ``/``.
    '''

    def get_app(self):
        '''
        Build the tornado application under test, with an event listener
        attached, and return it.
        '''
        urls = [('/', saltnado.SaltAPIHandler)]
        application = self.build_tornado_app(urls)
        application.event_listener = saltnado.EventListener({}, self.opts)
        return application

    def _post_lowstate(self, low, **fetch_kwargs):
        '''
        POST ``low`` as JSON to ``/`` using the test auth token and return the
        decoded JSON response body.

        Any extra keyword arguments (e.g. ``connect_timeout``) are handed
        straight to ``self.fetch``.
        '''
        response = self.fetch('/',
                              method='POST',
                              body=json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              **fetch_kwargs)
        return json.loads(response.body)

    def _assert_async_pub(self, ret):
        '''
        Assert that ``ret`` (one entry of an async ``return`` list) contains a
        jid and lists exactly the ``minion`` and ``sub_minion`` minions.
        '''
        self.assertIn('jid', ret)
        # the order of the minions in the response is not relied upon
        self.assertEqual(sorted(ret['minions']), ['minion', 'sub_minion'])

    def test_root(self):
        '''
        Test the root path which returns the list of clients we support
        '''
        response = self.fetch('/',
                              connect_timeout=30,
                              request_timeout=30,
                              )
        self.assertEqual(response.code, 200)
        response_obj = json.loads(response.body)
        self.assertEqual(response_obj['clients'],
                         ['runner',
                          'runner_async',
                          'local_async',
                          'local',
                          'local_batch']
                         )
        self.assertEqual(response_obj['return'], 'Welcome')

    def test_post_no_auth(self):
        '''
        Test post with no auth token, should be redirected (302) to /login
        '''
        low = [{'client': 'local',
                'tgt': '*',
                'fun': 'test.ping',
                }]
        # deliberately no auth token header; do not follow the redirect so
        # that the 302 itself can be inspected
        response = self.fetch('/',
                              method='POST',
                              body=json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json']},
                              follow_redirects=False,
                              connect_timeout=30,
                              request_timeout=30,
                              )
        self.assertEqual(response.code, 302)
        self.assertEqual(response.headers['Location'], '/login')

    # Local client tests
    @skipIf(True, 'to be reenabled when #23623 is merged')
    def test_simple_local_post(self):
        '''
        Test a basic API of /
        '''
        low = [{'client': 'local',
                'tgt': '*',
                'fun': 'test.ping',
                }]
        response_obj = self._post_lowstate(low,
                                           connect_timeout=30,
                                           request_timeout=30)
        self.assertEqual(response_obj['return'], [{'minion': True, 'sub_minion': True}])

    def test_simple_local_post_no_tgt(self):
        '''
        POST job with invalid tgt
        '''
        low = [{'client': 'local',
                'tgt': 'minion_we_dont_have',
                'fun': 'test.ping',
                }]
        response_obj = self._post_lowstate(low,
                                           connect_timeout=30,
                                           request_timeout=30)
        self.assertEqual(response_obj['return'],
                         ["No minions matched the target. No command was sent, no jid was assigned."])

    # local_batch tests
    @skipIf(True, 'to be reenabled when #23623 is merged')
    def test_simple_local_batch_post(self):
        '''
        Basic post against local_batch
        '''
        low = [{'client': 'local_batch',
                'tgt': '*',
                'fun': 'test.ping',
                }]
        response_obj = self._post_lowstate(low,
                                           connect_timeout=30,
                                           request_timeout=30)
        self.assertEqual(response_obj['return'], [{'minion': True, 'sub_minion': True}])

    @skipIf(True, 'to be reenabled when #23623 is merged')
    def test_full_local_batch_post(self):
        '''
        Test full parallelism of local_batch
        '''
        low = [{'client': 'local_batch',
                'tgt': '*',
                'fun': 'test.ping',
                'batch': '100%',
                }]
        response_obj = self._post_lowstate(low,
                                           connect_timeout=30,
                                           request_timeout=30)
        self.assertEqual(response_obj['return'], [{'minion': True, 'sub_minion': True}])

    def test_simple_local_batch_post_no_tgt(self):
        '''
        Local_batch testing with no tgt
        '''
        low = [{'client': 'local_batch',
                'tgt': 'minion_we_dont_have',
                'fun': 'test.ping',
                }]
        response_obj = self._post_lowstate(low,
                                           connect_timeout=30,
                                           request_timeout=30)
        self.assertEqual(response_obj['return'], [{}])

    # local_async tests
    def test_simple_local_async_post(self):
        '''
        A single local_async chunk returns one entry with a jid and the
        targeted minions
        '''
        low = [{'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                }]
        ret = self._post_lowstate(low)['return']
        # TODO: verify pub function? Maybe look at how we test the publisher
        # check the length first so a short response fails as an assertion
        # rather than as an IndexError
        self.assertEqual(len(ret), 1)
        self._assert_async_pub(ret[0])

    def test_multi_local_async_post(self):
        '''
        Two local_async chunks in one request return two entries
        '''
        low = [{'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                },
               {'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                }]
        ret = self._post_lowstate(low)['return']
        self.assertEqual(len(ret), 2)
        self._assert_async_pub(ret[0])
        self._assert_async_pub(ret[1])

    def test_multi_local_async_post_multitoken(self):
        '''
        Chunks may carry their own token; a chunk with a bad token fails to
        authenticate without affecting the other chunks
        '''
        low = [{'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                },
               {'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                'token': self.token['token'],  # valid token supplied in the chunk itself
                },
               {'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                'token': 'BAD_TOKEN',  # send a bad token
                },
               ]
        ret = self._post_lowstate(low)['return']
        self.assertEqual(len(ret), 3)  # make sure we got 3 responses
        # the first 2 are regular returns
        self._assert_async_pub(ret[0])
        self._assert_async_pub(ret[1])
        self.assertIn('Failed to authenticate', ret[2])  # bad auth

    def test_simple_local_async_post_no_tgt(self):
        '''
        local_async against a target that matches nothing returns an empty
        entry
        '''
        low = [{'client': 'local_async',
                'tgt': 'minion_we_dont_have',
                'fun': 'test.ping',
                }]
        response_obj = self._post_lowstate(low)
        self.assertEqual(response_obj['return'], [{}])

    # runner tests
    def test_simple_local_runner_post(self):
        '''
        The manage.up runner returns a single entry naming both minions
        '''
        low = [{'client': 'runner',
                'fun': 'manage.up',
                }]
        response_obj = self._post_lowstate(low,
                                           connect_timeout=30,
                                           request_timeout=30)
        self.assertEqual(len(response_obj['return']), 1)
        self.assertEqual(set(response_obj['return'][0]), set(['minion', 'sub_minion']))

    # runner_async tests
    def test_simple_local_runner_async_post(self):
        '''
        runner_async returns a single entry containing a jid and a tag
        '''
        low = [{'client': 'runner_async',
                'fun': 'manage.up',
                }]
        response_obj = self._post_lowstate(low,
                                           connect_timeout=10,
                                           request_timeout=10)
        self.assertIn('return', response_obj)
        self.assertEqual(1, len(response_obj['return']))
        self.assertIn('jid', response_obj['return'][0])
        self.assertIn('tag', response_obj['return'][0])
@skipIf(HAS_ZMQ_IOLOOP is False, 'PyZMQ version must be >= 14.0.1 to run these tests.')
class TestMinionSaltAPIHandler(SaltnadoTestCase):
    '''
    Integration tests for ``saltnado.MinionSaltAPIHandler`` mounted at
    ``/minions`` and ``/minions/<mid>``.
    '''

    def get_app(self):
        '''
        Build and return the tornado application under test, with an event
        listener attached.
        '''
        routes = [
            (r"/minions/(.*)", saltnado.MinionSaltAPIHandler),
            (r"/minions", saltnado.MinionSaltAPIHandler),
        ]
        app = self.build_tornado_app(routes)
        app.event_listener = saltnado.EventListener({}, self.opts)
        return app

    def test_get_no_mid(self):
        '''
        GET /minions without a minion id: one return entry holding two
        minions, each keyed by the ``id`` found in its grains
        '''
        auth_headers = {saltnado.AUTH_TOKEN_HEADER: self.token['token']}
        response = self.fetch('/minions',
                              method='GET',
                              headers=auth_headers,
                              follow_redirects=False)
        returns = json.loads(response.body)['return']
        self.assertEqual(len(returns), 1)
        # one per minion
        self.assertEqual(len(returns[0]), 2)
        # check a single grain
        for mid, minion_grains in six.iteritems(returns[0]):
            self.assertEqual(mid, minion_grains['id'])

    @skipIf(True, 'to be reenabled when #23623 is merged')
    def test_get(self):
        '''
        GET /minions/minion: one return entry holding only that minion
        '''
        auth_headers = {saltnado.AUTH_TOKEN_HEADER: self.token['token']}
        response = self.fetch('/minions/minion',
                              method='GET',
                              headers=auth_headers,
                              follow_redirects=False)
        returns = json.loads(response.body)['return']
        self.assertEqual(len(returns), 1)
        self.assertEqual(len(returns[0]), 1)
        # check a single grain
        self.assertEqual(returns[0]['minion']['id'], 'minion')

    def test_post(self):
        '''
        POST to /minions without naming a client: the response carries a jid
        and the targeted minions
        '''
        chunks = [{'tgt': '*', 'fun': 'test.ping'}]
        request_headers = {
            'Content-Type': self.content_type_map['json'],
            saltnado.AUTH_TOKEN_HEADER: self.token['token'],
        }
        response = self.fetch('/minions',
                              method='POST',
                              body=json.dumps(chunks),
                              headers=request_headers)
        returns = json.loads(response.body)['return']
        # normalise ordering before comparing
        returns[0]['minions'] = sorted(returns[0]['minions'])
        # TODO: verify pub function? Maybe look at how we test the publisher
        self.assertEqual(len(returns), 1)
        self.assertIn('jid', returns[0])
        self.assertEqual(returns[0]['minions'], sorted(['minion', 'sub_minion']))

    def test_post_with_client(self):
        '''
        POST to /minions explicitly naming the ``local_async`` client: same
        expectations as posting without a client
        '''
        chunks = [{'client': 'local_async', 'tgt': '*', 'fun': 'test.ping'}]
        request_headers = {
            'Content-Type': self.content_type_map['json'],
            saltnado.AUTH_TOKEN_HEADER: self.token['token'],
        }
        response = self.fetch('/minions',
                              method='POST',
                              body=json.dumps(chunks),
                              headers=request_headers)
        returns = json.loads(response.body)['return']
        # normalise ordering before comparing
        returns[0]['minions'] = sorted(returns[0]['minions'])
        # TODO: verify pub function? Maybe look at how we test the publisher
        self.assertEqual(len(returns), 1)
        self.assertIn('jid', returns[0])
        self.assertEqual(returns[0]['minions'], sorted(['minion', 'sub_minion']))

    def test_post_with_incorrect_client(self):
        '''
        The /minions endpoint is async only, so if you try something else
        make sure you get an error
        '''
        chunks = [{'client': 'local_batch', 'tgt': '*', 'fun': 'test.ping'}]
        request_headers = {
            'Content-Type': self.content_type_map['json'],
            saltnado.AUTH_TOKEN_HEADER: self.token['token'],
        }
        response = self.fetch('/minions',
                              method='POST',
                              body=json.dumps(chunks),
                              headers=request_headers)
        self.assertEqual(response.code, 400)
@skipIf(HAS_ZMQ_IOLOOP is False, 'PyZMQ version must be >= 14.0.1 to run these tests.')
class TestJobsSaltAPIHandler(SaltnadoTestCase):
    '''
    Integration tests for ``saltnado.JobsSaltAPIHandler`` mounted at
    ``/jobs`` and ``/jobs/<jid>``.
    '''

    def get_app(self):
        '''
        Build and return the tornado application under test, with an event
        listener attached.
        '''
        routes = [
            (r"/jobs/(.*)", saltnado.JobsSaltAPIHandler),
            (r"/jobs", saltnado.JobsSaltAPIHandler),
        ]
        app = self.build_tornado_app(routes)
        app.event_listener = saltnado.EventListener({}, self.opts)
        return app

    @skipIf(True, 'to be reenabled when #23623 is merged')
    def test_get(self):
        '''
        GET /jobs lists jobs keyed by jid; GET /jobs/<jid> returns the same
        fields for one job plus its ``Result``
        '''
        listing_fields = ('Function', 'Target', 'Target-type',
                          'User', 'StartTime', 'Arguments')
        detail_fields = listing_fields + ('Result',)

        # test with no JID
        self.http_client.fetch(self.get_url('/jobs'),
                               self.stop,
                               method='GET',
                               headers={saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                               follow_redirects=False)
        response = self.wait(timeout=30)
        jobs = json.loads(response.body)['return'][0]
        try:
            for _jid, job in six.iteritems(jobs):
                for field in listing_fields:
                    self.assertIn(field, job)
        except AttributeError:
            # dump the whole decoded body to help diagnose the failure
            print(json.loads(response.body))
            raise

        # test with a specific JID passed in
        jid = next(six.iterkeys(jobs))
        self.http_client.fetch(self.get_url('/jobs/{0}'.format(jid)),
                               self.stop,
                               method='GET',
                               headers={saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                               follow_redirects=False)
        response = self.wait(timeout=30)
        job_detail = json.loads(response.body)['return'][0]
        for field in detail_fields:
            self.assertIn(field, job_detail)
# TODO: run all the same tests from the root handler, but for now since they are
# the same code, we'll just sanity check
@skipIf(HAS_ZMQ_IOLOOP is False, 'PyZMQ version must be >= 14.0.1 to run these tests.')
class TestRunSaltAPIHandler(SaltnadoTestCase):
    '''
    Sanity check for ``saltnado.RunSaltAPIHandler`` mounted at ``/run``.
    '''

    def get_app(self):
        '''
        Build and return the tornado application under test, with an event
        listener attached.
        '''
        routes = [
            ("/run", saltnado.RunSaltAPIHandler),
        ]
        app = self.build_tornado_app(routes)
        app.event_listener = saltnado.EventListener({}, self.opts)
        return app

    @skipIf(True, 'to be reenabled when #23623 is merged')
    def test_get(self):
        '''
        POST a ``local`` test.ping to /run and expect both minions to answer
        True (despite its name, this test issues a POST)
        '''
        chunks = [{'client': 'local', 'tgt': '*', 'fun': 'test.ping'}]
        request_headers = {
            'Content-Type': self.content_type_map['json'],
            saltnado.AUTH_TOKEN_HEADER: self.token['token'],
        }
        response = self.fetch('/run',
                              method='POST',
                              body=json.dumps(chunks),
                              headers=request_headers)
        decoded = json.loads(response.body)
        self.assertEqual(decoded['return'], [{'minion': True, 'sub_minion': True}])
@skipIf(HAS_ZMQ_IOLOOP is False, 'PyZMQ version must be >= 14.0.1 to run these tests.')
class TestEventsSaltAPIHandler(SaltnadoTestCase):
    '''
    Integration test for ``saltnado.EventsSaltAPIHandler`` mounted at
    ``/events``.
    '''

    def get_app(self):
        '''
        Build and return the tornado application under test.

        The application is also kept on ``self`` so that ``on_event`` can
        reach its event listener, and the counter of events still to be
        fired is reset to zero.
        '''
        routes = [
            (r"/events", saltnado.EventsSaltAPIHandler),
        ]
        app = self.build_tornado_app(routes)
        app.event_listener = saltnado.EventListener({}, self.opts)
        # store a reference, for magic later!
        self.application = app
        self.events_to_fire = 0
        return app

    def test_get(self):
        '''
        Stream /events; every received chunk is handed to ``on_event``, which
        fires up to five events and validates what comes back
        '''
        self.events_to_fire = 5
        self.fetch('/events',
                   headers={saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                   streaming_callback=self.on_event)

    def _stop(self):
        '''
        Zero-argument wrapper around ``self.stop`` used as a timeout callback.
        '''
        self.stop()

    def on_event(self, event):
        '''
        Streaming callback: fire another test event while any remain,
        otherwise schedule the test to stop, then validate the chunk.
        '''
        if self.events_to_fire > 0:
            payload = {
                'foo': 'bar',
                'baz': 'qux',
            }
            self.application.event_listener.event.fire_event(payload, 'salt/netapi/test')
            self.events_to_fire -= 1
        else:
            # once we've fired all the events, lets call it a day:
            # wait so that we can ensure that the next future is ready to go
            # to make sure we don't explode if the next one is ready
            ZMQIOLoop.current().add_timeout(time.time() + 0.5, self._stop)

        # NOTE(review): the validation below is assumed to run for every
        # chunk, not only in the else branch above -- confirm against upstream
        chunk = event.strip()
        # if we got a retry, just continue
        if chunk == 'retry: 400':
            return
        tag, data = chunk.splitlines()
        self.assertTrue(tag.startswith('tag: '))
        self.assertTrue(data.startswith('data: '))
@skipIf(HAS_ZMQ_IOLOOP is False, 'PyZMQ version must be >= 14.0.1 to run these tests.')
class TestWebhookSaltAPIHandler(SaltnadoTestCase):
    '''
    Integration test for ``saltnado.WebhookSaltAPIHandler`` mounted at
    ``/hook``.
    '''

    def get_app(self):
        '''
        Build and return the tornado application under test; a reference is
        kept on ``self`` so the test can reach its event listener.
        '''
        routes = [
            (r"/hook(/.*)?", saltnado.WebhookSaltAPIHandler),
        ]
        app = self.build_tornado_app(routes)
        self.application = app
        app.event_listener = saltnado.EventListener({}, self.opts)
        return app

    def test_post(self):
        '''
        POST form data to /hook and expect a successful response
        '''
        def verify_event(future):
            '''
            Verify that the event fired on the master matches what we sent
            '''
            fired = future.result()
            self.assertEqual(fired['tag'], 'salt/netapi/hook')
            self.assertIn('headers', fired['data'])
            self.assertEqual(fired['data']['post'], {'foo': 'bar'})

        # get an event future
        self._finished = False  # TODO: remove after some cleanup of the event listener
        listener = self.application.event_listener
        # the return value is bound to a local, as in the original, even though
        # nothing reads it afterwards
        pending_event = listener.get_event(self,
                                           tag='salt/netapi/hook',
                                           callback=verify_event)

        # fire the event
        response = self.fetch('/hook',
                              method='POST',
                              body='foo=bar',
                              headers={saltnado.AUTH_TOKEN_HEADER: self.token['token']})
        decoded = json.loads(response.body)
        self.assertTrue(decoded['success'])
if __name__ == '__main__':
    # Script entry point: run every handler test case in this module.
    from integration import run_tests  # pylint: disable=import-error

    handler_test_cases = (
        TestEventsSaltAPIHandler,
        TestJobsSaltAPIHandler,
        TestMinionSaltAPIHandler,
        TestRunSaltAPIHandler,
        TestSaltAPIHandler,
        TestWebhookSaltAPIHandler,
    )
    run_tests(*handler_test_cases, needs_daemon=True)