2015-12-01 13:44:08 +00:00
|
|
|
import time
|
|
|
|
|
2015-07-08 17:59:07 +00:00
|
|
|
from flask import request
|
2014-03-02 13:41:38 +00:00
|
|
|
from mock import patch
|
2015-12-01 13:44:08 +00:00
|
|
|
|
2014-03-02 13:41:38 +00:00
|
|
|
from tests import BaseTestCase
|
2014-09-21 07:11:03 +00:00
|
|
|
from redash import models
|
2016-03-14 09:19:23 +00:00
|
|
|
from redash.authentication.google_oauth import create_and_login_user, verify_profile
|
2015-07-08 17:59:07 +00:00
|
|
|
from redash.authentication import api_key_load_user_from_request, hmac_load_user_from_request, sign
|
2015-03-10 15:51:17 +00:00
|
|
|
from redash.wsgi import app
|
|
|
|
|
|
|
|
|
|
|
|
class TestApiKeyAuthentication(BaseTestCase):
|
|
|
|
#
|
|
|
|
# This is a bad way to write these tests, but the way Flask works doesn't make it easy to write them properly...
|
|
|
|
#
|
|
|
|
def setUp(self):
|
|
|
|
super(TestApiKeyAuthentication, self).setUp()
|
2016-11-28 14:41:29 +00:00
|
|
|
self.api_key = '10'
|
2015-12-01 13:44:08 +00:00
|
|
|
self.query = self.factory.create_query(api_key=self.api_key)
|
2016-11-28 14:41:29 +00:00
|
|
|
models.db.session.flush()
|
2016-01-03 14:05:29 +00:00
|
|
|
self.query_url = '/{}/api/queries/{}'.format(self.factory.org.slug, self.query.id)
|
|
|
|
self.queries_url = '/{}/api/queries'.format(self.factory.org.slug)
|
2015-03-10 15:51:17 +00:00
|
|
|
|
|
|
|
def test_no_api_key(self):
|
|
|
|
with app.test_client() as c:
|
2016-01-03 14:05:29 +00:00
|
|
|
rv = c.get(self.query_url)
|
2015-07-08 17:59:07 +00:00
|
|
|
self.assertIsNone(api_key_load_user_from_request(request))
|
2015-03-10 15:51:17 +00:00
|
|
|
|
|
|
|
def test_wrong_api_key(self):
|
|
|
|
with app.test_client() as c:
|
2016-01-03 14:05:29 +00:00
|
|
|
rv = c.get(self.query_url, query_string={'api_key': 'whatever'})
|
2015-07-08 17:59:07 +00:00
|
|
|
self.assertIsNone(api_key_load_user_from_request(request))
|
2015-03-10 15:51:17 +00:00
|
|
|
|
|
|
|
def test_correct_api_key(self):
|
|
|
|
with app.test_client() as c:
|
2016-01-03 14:05:29 +00:00
|
|
|
rv = c.get(self.query_url, query_string={'api_key': self.api_key})
|
2015-07-08 17:59:07 +00:00
|
|
|
self.assertIsNotNone(api_key_load_user_from_request(request))
|
2015-03-10 15:51:17 +00:00
|
|
|
|
|
|
|
def test_no_query_id(self):
|
|
|
|
with app.test_client() as c:
|
2016-01-03 14:05:29 +00:00
|
|
|
rv = c.get(self.queries_url, query_string={'api_key': self.api_key})
|
2015-07-08 17:59:07 +00:00
|
|
|
self.assertIsNone(api_key_load_user_from_request(request))
|
2014-03-02 13:41:38 +00:00
|
|
|
|
2015-07-08 17:59:07 +00:00
|
|
|
def test_user_api_key(self):
|
2015-12-01 13:44:08 +00:00
|
|
|
user = self.factory.create_user(api_key="user_key")
|
2016-11-28 14:41:29 +00:00
|
|
|
models.db.session.flush()
|
2015-07-08 17:59:07 +00:00
|
|
|
with app.test_client() as c:
|
2016-01-03 14:05:29 +00:00
|
|
|
rv = c.get(self.queries_url, query_string={'api_key': user.api_key})
|
2015-07-08 17:59:07 +00:00
|
|
|
self.assertEqual(user.id, api_key_load_user_from_request(request).id)
|
|
|
|
|
2015-10-11 08:54:21 +00:00
|
|
|
def test_api_key_header(self):
|
|
|
|
with app.test_client() as c:
|
2016-01-03 14:05:29 +00:00
|
|
|
rv = c.get(self.query_url, headers={'Authorization': "Key {}".format(self.api_key)})
|
2015-10-11 08:54:21 +00:00
|
|
|
self.assertIsNotNone(api_key_load_user_from_request(request))
|
|
|
|
|
|
|
|
def test_api_key_header_with_wrong_key(self):
|
|
|
|
with app.test_client() as c:
|
2016-01-03 14:05:29 +00:00
|
|
|
rv = c.get(self.query_url, headers={'Authorization': "Key oops"})
|
2015-10-11 08:54:21 +00:00
|
|
|
self.assertIsNone(api_key_load_user_from_request(request))
|
|
|
|
|
2016-01-03 14:05:29 +00:00
|
|
|
def test_api_key_for_wrong_org(self):
|
|
|
|
other_user = self.factory.create_admin(org=self.factory.create_org())
|
|
|
|
|
|
|
|
with app.test_client() as c:
|
|
|
|
rv = c.get(self.query_url, headers={'Authorization': "Key {}".format(other_user.api_key)})
|
|
|
|
self.assertEqual(404, rv.status_code)
|
|
|
|
|
2015-10-11 08:54:21 +00:00
|
|
|
|
2015-07-08 17:59:07 +00:00
|
|
|
class TestHMACAuthentication(BaseTestCase):
|
|
|
|
#
|
|
|
|
# This is a bad way to write these tests, but the way Flask works doesn't make it easy to write them properly...
|
|
|
|
#
|
|
|
|
def setUp(self):
|
|
|
|
super(TestHMACAuthentication, self).setUp()
|
2016-11-28 14:41:29 +00:00
|
|
|
self.api_key = '10'
|
2015-12-01 13:44:08 +00:00
|
|
|
self.query = self.factory.create_query(api_key=self.api_key)
|
2016-11-28 14:41:29 +00:00
|
|
|
models.db.session.flush()
|
2016-02-25 08:07:47 +00:00
|
|
|
self.path = '/{}/api/queries/{}'.format(self.query.org.slug, self.query.id)
|
2015-07-08 17:59:07 +00:00
|
|
|
self.expires = time.time() + 1800
|
|
|
|
|
|
|
|
def signature(self, expires):
|
|
|
|
return sign(self.query.api_key, self.path, expires)
|
|
|
|
|
|
|
|
def test_no_signature(self):
|
|
|
|
with app.test_client() as c:
|
|
|
|
rv = c.get(self.path)
|
|
|
|
self.assertIsNone(hmac_load_user_from_request(request))
|
|
|
|
|
|
|
|
def test_wrong_signature(self):
|
|
|
|
with app.test_client() as c:
|
|
|
|
rv = c.get(self.path, query_string={'signature': 'whatever', 'expires': self.expires})
|
|
|
|
self.assertIsNone(hmac_load_user_from_request(request))
|
|
|
|
|
|
|
|
def test_correct_signature(self):
|
|
|
|
with app.test_client() as c:
|
2016-02-25 08:07:47 +00:00
|
|
|
rv = c.get(self.path, query_string={'signature': self.signature(self.expires), 'expires': self.expires})
|
2015-07-08 17:59:07 +00:00
|
|
|
self.assertIsNotNone(hmac_load_user_from_request(request))
|
|
|
|
|
|
|
|
def test_no_query_id(self):
|
|
|
|
with app.test_client() as c:
|
2016-10-26 08:47:40 +00:00
|
|
|
rv = c.get('/{}/api/queries'.format(self.query.org.slug), query_string={'api_key': self.api_key})
|
2015-07-08 17:59:07 +00:00
|
|
|
self.assertIsNone(hmac_load_user_from_request(request))
|
|
|
|
|
|
|
|
def test_user_api_key(self):
|
2015-12-01 13:44:08 +00:00
|
|
|
user = self.factory.create_user(api_key="user_key")
|
2015-07-08 17:59:07 +00:00
|
|
|
path = '/api/queries/'
|
2016-11-28 14:41:29 +00:00
|
|
|
models.db.session.flush()
|
2015-07-08 17:59:07 +00:00
|
|
|
with app.test_client() as c:
|
|
|
|
signature = sign(user.api_key, path, self.expires)
|
|
|
|
rv = c.get(path, query_string={'signature': signature, 'expires': self.expires, 'user_id': user.id})
|
2016-11-28 14:41:29 +00:00
|
|
|
self.assertEqual(user, hmac_load_user_from_request(request))
|
2014-03-02 13:41:38 +00:00
|
|
|
|
2015-12-01 13:44:08 +00:00
|
|
|
|
2014-03-02 13:41:38 +00:00
|
|
|
class TestCreateAndLoginUser(BaseTestCase):
|
|
|
|
def test_logins_valid_user(self):
|
2015-12-01 13:44:08 +00:00
|
|
|
user = self.factory.create_user(email='test@example.com')
|
2014-03-02 13:41:38 +00:00
|
|
|
|
2015-12-01 13:44:08 +00:00
|
|
|
with patch('redash.authentication.google_oauth.login_user') as login_user_mock:
|
|
|
|
create_and_login_user(self.factory.org, user.name, user.email)
|
2014-03-02 13:41:38 +00:00
|
|
|
login_user_mock.assert_called_once_with(user, remember=True)
|
|
|
|
|
|
|
|
def test_creates_vaild_new_user(self):
|
2014-09-21 07:11:03 +00:00
|
|
|
email = 'test@example.com'
|
|
|
|
name = 'Test User'
|
2014-03-02 13:41:38 +00:00
|
|
|
|
2015-12-01 13:44:08 +00:00
|
|
|
with patch('redash.authentication.google_oauth.login_user') as login_user_mock:
|
|
|
|
create_and_login_user(self.factory.org, name, email)
|
2014-03-03 09:53:49 +00:00
|
|
|
|
|
|
|
self.assertTrue(login_user_mock.called)
|
2016-11-28 14:41:29 +00:00
|
|
|
user = models.User.query.filter(models.User.email == email).one()
|
|
|
|
self.assertEqual(user.email, email)
|
2016-03-14 09:19:23 +00:00
|
|
|
|
|
|
|
class TestVerifyProfile(BaseTestCase):
|
|
|
|
def test_no_domain_allowed_for_org(self):
|
|
|
|
profile = dict(email='arik@example.com')
|
|
|
|
self.assertFalse(verify_profile(self.factory.org, profile))
|
|
|
|
|
|
|
|
def test_domain_not_in_org_domains_list(self):
|
|
|
|
profile = dict(email='arik@example.com')
|
|
|
|
self.factory.org.settings[models.Organization.SETTING_GOOGLE_APPS_DOMAINS] = ['example.org']
|
|
|
|
self.assertFalse(verify_profile(self.factory.org, profile))
|
|
|
|
|
|
|
|
def test_domain_in_org_domains_list(self):
|
|
|
|
profile = dict(email='arik@example.com')
|
|
|
|
self.factory.org.settings[models.Organization.SETTING_GOOGLE_APPS_DOMAINS] = ['example.com']
|
|
|
|
self.assertTrue(verify_profile(self.factory.org, profile))
|
|
|
|
|
|
|
|
self.factory.org.settings[models.Organization.SETTING_GOOGLE_APPS_DOMAINS] = ['example.org', 'example.com']
|
|
|
|
self.assertTrue(verify_profile(self.factory.org, profile))
|
|
|
|
|
|
|
|
def test_org_in_public_mode_accepts_any_domain(self):
|
|
|
|
profile = dict(email='arik@example.com')
|
|
|
|
self.factory.org.settings[models.Organization.SETTING_IS_PUBLIC] = True
|
|
|
|
self.factory.org.settings[models.Organization.SETTING_GOOGLE_APPS_DOMAINS] = []
|
|
|
|
self.assertTrue(verify_profile(self.factory.org, profile))
|
|
|
|
|
|
|
|
def test_user_not_in_domain_but_account_exists(self):
|
|
|
|
profile = dict(email='arik@example.com')
|
|
|
|
self.factory.create_user(email='arik@example.com')
|
|
|
|
self.factory.org.settings[models.Organization.SETTING_GOOGLE_APPS_DOMAINS] = ['example.org']
|
|
|
|
self.assertTrue(verify_profile(self.factory.org, profile))
|