redash/tests/test_authentication.py

427 lines
16 KiB
Python
Raw Permalink Normal View History

import importlib
import os
import time
from flask import request
from mock import patch
from redash import models, settings
from redash.authentication import (
api_key_load_user_from_request,
get_login_url,
hmac_load_user_from_request,
sign,
)
from redash.authentication.google_oauth import create_and_login_user, verify_profile
from redash.utils import utcnow
from sqlalchemy.orm.exc import NoResultFound
from tests import BaseTestCase
class TestApiKeyAuthentication(BaseTestCase):
#
# This is a bad way to write these tests, but the way Flask works doesn't make it easy to write them properly...
#
def setUp(self):
super(TestApiKeyAuthentication, self).setUp()
self.api_key = "10"
self.query = self.factory.create_query(api_key=self.api_key)
2016-11-28 14:41:29 +00:00
models.db.session.flush()
self.query_url = "/{}/api/queries/{}".format(
self.factory.org.slug, self.query.id
)
self.queries_url = "/{}/api/queries".format(self.factory.org.slug)
def test_no_api_key(self):
2016-11-30 12:18:00 +00:00
with self.app.test_client() as c:
rv = c.get(self.query_url)
self.assertIsNone(api_key_load_user_from_request(request))
def test_wrong_api_key(self):
2016-11-30 12:18:00 +00:00
with self.app.test_client() as c:
rv = c.get(self.query_url, query_string={"api_key": "whatever"})
2016-11-30 12:18:00 +00:00
self.assertIsNone(api_key_load_user_from_request(request))
def test_correct_api_key(self):
2016-11-30 12:18:00 +00:00
with self.app.test_client() as c:
rv = c.get(self.query_url, query_string={"api_key": self.api_key})
2016-11-30 12:18:00 +00:00
self.assertIsNotNone(api_key_load_user_from_request(request))
def test_no_query_id(self):
2016-11-30 12:18:00 +00:00
with self.app.test_client() as c:
rv = c.get(self.queries_url, query_string={"api_key": self.api_key})
2016-11-30 12:18:00 +00:00
self.assertIsNone(api_key_load_user_from_request(request))
2014-03-02 13:41:38 +00:00
2015-07-08 17:59:07 +00:00
def test_user_api_key(self):
user = self.factory.create_user(api_key="user_key")
2016-11-28 14:41:29 +00:00
models.db.session.flush()
2016-11-30 12:18:00 +00:00
with self.app.test_client() as c:
rv = c.get(self.queries_url, query_string={"api_key": user.api_key})
2016-11-30 12:18:00 +00:00
self.assertEqual(user.id, api_key_load_user_from_request(request).id)
2015-07-08 17:59:07 +00:00
def test_disabled_user_api_key(self):
user = self.factory.create_user(api_key="user_key")
user.disable()
models.db.session.flush()
with self.app.test_client() as c:
rv = c.get(self.queries_url, query_string={"api_key": user.api_key})
self.assertEqual(None, api_key_load_user_from_request(request))
def test_api_key_header(self):
2016-11-30 12:18:00 +00:00
with self.app.test_client() as c:
rv = c.get(
self.query_url, headers={"Authorization": "Key {}".format(self.api_key)}
)
2016-11-30 12:18:00 +00:00
self.assertIsNotNone(api_key_load_user_from_request(request))
def test_api_key_header_with_wrong_key(self):
2016-11-30 12:18:00 +00:00
with self.app.test_client() as c:
rv = c.get(self.query_url, headers={"Authorization": "Key oops"})
2016-11-30 12:18:00 +00:00
self.assertIsNone(api_key_load_user_from_request(request))
def test_api_key_for_wrong_org(self):
other_user = self.factory.create_admin(org=self.factory.create_org())
2016-11-30 12:18:00 +00:00
with self.app.test_client() as c:
rv = c.get(
self.query_url,
headers={"Authorization": "Key {}".format(other_user.api_key)},
)
2016-11-30 12:18:00 +00:00
self.assertEqual(404, rv.status_code)
2015-07-08 17:59:07 +00:00
class TestHMACAuthentication(BaseTestCase):
#
# This is a bad way to write these tests, but the way Flask works doesn't make it easy to write them properly...
#
def setUp(self):
super(TestHMACAuthentication, self).setUp()
self.api_key = "10"
self.query = self.factory.create_query(api_key=self.api_key)
2016-11-28 14:41:29 +00:00
models.db.session.flush()
self.path = "/{}/api/queries/{}".format(self.query.org.slug, self.query.id)
2015-07-08 17:59:07 +00:00
self.expires = time.time() + 1800
def signature(self, expires):
return sign(self.query.api_key, self.path, expires)
def test_no_signature(self):
2016-11-30 12:18:00 +00:00
with self.app.test_client() as c:
rv = c.get(self.path)
self.assertIsNone(hmac_load_user_from_request(request))
2015-07-08 17:59:07 +00:00
def test_wrong_signature(self):
2016-11-30 12:18:00 +00:00
with self.app.test_client() as c:
rv = c.get(
self.path,
query_string={"signature": "whatever", "expires": self.expires},
)
2016-11-30 12:18:00 +00:00
self.assertIsNone(hmac_load_user_from_request(request))
2015-07-08 17:59:07 +00:00
def test_correct_signature(self):
2016-11-30 12:18:00 +00:00
with self.app.test_client() as c:
rv = c.get(
self.path,
query_string={
"signature": self.signature(self.expires),
"expires": self.expires,
},
)
2016-11-30 12:18:00 +00:00
self.assertIsNotNone(hmac_load_user_from_request(request))
2015-07-08 17:59:07 +00:00
def test_no_query_id(self):
2016-11-30 12:18:00 +00:00
with self.app.test_client() as c:
rv = c.get(
"/{}/api/queries".format(self.query.org.slug),
query_string={"api_key": self.api_key},
)
2016-11-30 12:18:00 +00:00
self.assertIsNone(hmac_load_user_from_request(request))
2015-07-08 17:59:07 +00:00
def test_user_api_key(self):
user = self.factory.create_user(api_key="user_key")
path = "/api/queries/"
2016-11-28 14:41:29 +00:00
models.db.session.flush()
signature = sign(user.api_key, path, self.expires)
2016-11-30 12:18:00 +00:00
with self.app.test_client() as c:
rv = c.get(
path,
query_string={
"signature": signature,
"expires": self.expires,
"user_id": user.id,
},
)
2016-11-30 12:18:00 +00:00
self.assertEqual(user.id, hmac_load_user_from_request(request).id)
2014-03-02 13:41:38 +00:00
class TestSessionAuthentication(BaseTestCase):
def test_prefers_api_key_over_session_user_id(self):
user = self.factory.create_user()
query = self.factory.create_query(user=user)
other_org = self.factory.create_org()
other_user = self.factory.create_user(org=other_org)
models.db.session.flush()
rv = self.make_request(
"get",
"/api/queries/{}?api_key={}".format(query.id, query.api_key),
user=other_user,
)
self.assertEqual(rv.status_code, 200)
2014-03-02 13:41:38 +00:00
class TestCreateAndLoginUser(BaseTestCase):
def test_logins_valid_user(self):
user = self.factory.create_user(email="test@example.com")
2014-03-02 13:41:38 +00:00
with patch("redash.authentication.login_user") as login_user_mock:
create_and_login_user(self.factory.org, user.name, user.email)
2014-03-02 13:41:38 +00:00
login_user_mock.assert_called_once_with(user, remember=True)
def test_creates_vaild_new_user(self):
email = "test@example.com"
name = "Test User"
2014-03-02 13:41:38 +00:00
with patch("redash.authentication.login_user") as login_user_mock:
create_and_login_user(self.factory.org, name, email)
self.assertTrue(login_user_mock.called)
2016-11-28 14:41:29 +00:00
user = models.User.query.filter(models.User.email == email).one()
self.assertEqual(user.email, email)
def test_updates_user_name(self):
user = self.factory.create_user(email="test@example.com")
with patch("redash.authentication.login_user") as login_user_mock:
create_and_login_user(self.factory.org, "New Name", user.email)
login_user_mock.assert_called_once_with(user, remember=True)
2016-11-30 12:18:00 +00:00
class TestVerifyProfile(BaseTestCase):
def test_no_domain_allowed_for_org(self):
profile = dict(email="arik@example.com")
self.assertFalse(verify_profile(self.factory.org, profile))
def test_domain_not_in_org_domains_list(self):
profile = dict(email="arik@example.com")
self.factory.org.settings[models.Organization.SETTING_GOOGLE_APPS_DOMAINS] = [
"example.org"
]
self.assertFalse(verify_profile(self.factory.org, profile))
def test_domain_in_org_domains_list(self):
profile = dict(email="arik@example.com")
self.factory.org.settings[models.Organization.SETTING_GOOGLE_APPS_DOMAINS] = [
"example.com"
]
self.assertTrue(verify_profile(self.factory.org, profile))
self.factory.org.settings[models.Organization.SETTING_GOOGLE_APPS_DOMAINS] = [
"example.org",
"example.com",
]
self.assertTrue(verify_profile(self.factory.org, profile))
def test_org_in_public_mode_accepts_any_domain(self):
profile = dict(email="arik@example.com")
self.factory.org.settings[models.Organization.SETTING_IS_PUBLIC] = True
self.factory.org.settings[models.Organization.SETTING_GOOGLE_APPS_DOMAINS] = []
self.assertTrue(verify_profile(self.factory.org, profile))
def test_user_not_in_domain_but_account_exists(self):
profile = dict(email="arik@example.com")
self.factory.create_user(email="arik@example.com")
self.factory.org.settings[models.Organization.SETTING_GOOGLE_APPS_DOMAINS] = [
"example.org"
]
self.assertTrue(verify_profile(self.factory.org, profile))
class TestGetLoginUrl(BaseTestCase):
def test_when_multi_org_enabled_and_org_exists(self):
with self.app.test_request_context("/{}/".format(self.factory.org.slug)):
self.assertEqual(
get_login_url(next=None), "/{}/login".format(self.factory.org.slug)
)
def test_when_multi_org_enabled_and_org_doesnt_exist(self):
with self.app.test_request_context(
"/{}_notexists/".format(self.factory.org.slug)
):
self.assertEqual(get_login_url(next=None), "/")
class TestRedirectToUrlAfterLoggingIn(BaseTestCase):
def setUp(self):
super(TestRedirectToUrlAfterLoggingIn, self).setUp()
self.user = self.factory.user
self.password = "test1234"
def test_no_next_param(self):
response = self.post_request(
"/login",
data={"email": self.user.email, "password": self.password},
org=self.factory.org,
)
self.assertEqual(
response.location, "http://localhost/{}/".format(self.user.org.slug)
)
def test_simple_path_in_next_param(self):
response = self.post_request(
"/login?next=queries",
data={"email": self.user.email, "password": self.password},
org=self.factory.org,
)
self.assertEqual(response.location, "http://localhost/default/queries")
def test_starts_scheme_url_in_next_param(self):
response = self.post_request(
"/login?next=https://redash.io",
data={"email": self.user.email, "password": self.password},
org=self.factory.org,
)
self.assertEqual(response.location, "http://localhost/default/")
def test_without_scheme_url_in_next_param(self):
response = self.post_request(
"/login?next=//redash.io",
data={"email": self.user.email, "password": self.password},
org=self.factory.org,
)
self.assertEqual(response.location, "http://localhost/default/")
def test_without_scheme_with_path_url_in_next_param(self):
response = self.post_request(
"/login?next=//localhost/queries",
data={"email": self.user.email, "password": self.password},
org=self.factory.org,
)
self.assertEqual(response.location, "http://localhost/queries")
class TestRemoteUserAuth(BaseTestCase):
DEFAULT_SETTING_OVERRIDES = {"REDASH_REMOTE_USER_LOGIN_ENABLED": "true"}
def setUp(self):
# Apply default setting overrides to every test
2018-02-02 14:50:43 +00:00
self.override_settings(None)
super(TestRemoteUserAuth, self).setUp()
2018-02-02 14:50:43 +00:00
def override_settings(self, overrides):
"""Override settings for testing purposes.
This helper method can be used to override specific environmental
variables to enable / disable Re:Dash features for the duration
of the test.
Note that these overrides only affect code that checks the value of
the setting at runtime. It doesn't affect code that only checks the
value during program initialization.
:param dict overrides: a dict of environmental variables to override
when the settings are reloaded
"""
variables = self.DEFAULT_SETTING_OVERRIDES.copy()
variables.update(overrides or {})
with patch.dict(os.environ, variables):
importlib.reload(settings)
# Queue a cleanup routine that reloads the settings without overrides
# once the test ends
self.addCleanup(lambda: importlib.reload(settings))
def assert_correct_user_attributes(
self,
user,
email="test@example.com",
name="test@example.com",
groups=None,
org=None,
):
"""Helper to assert that the user attributes are correct."""
groups = groups or []
if self.factory.org.default_group.id not in groups:
groups.append(self.factory.org.default_group.id)
self.assertIsNotNone(user)
self.assertEqual(user.email, email)
self.assertEqual(user.name, name)
self.assertEqual(user.org, org or self.factory.org)
Migrate the application to Python 3 (#4251) * Make core app compatible with Python 3 No backward compatibility with Python 2.7 is kept. This commit mostly contains changes made with 2to3 and manual tweaking when necessary. * Use Python 3.7 as base docker image Since it is not possible to change redash/base:debian to Python 3 without breaking future relases, its Dockerfile is temporarly copied here. * Upgrade some requirements to newest versions Some of the older versions were not compatible with Python 3. * Migrate tests to Python 3 * Build frontend on Python 3 * Make the HMAC sign function compatible with Python 3 In Python 3, HMAC only works with bytes so the strings and the float used in the sign function need to be encoded. Hopefully this is still backward compatible with already generated signatures. * Use assertCountEqual instead of assertItemsEqual The latter is not available in Python 3. See https://bugs.python.org/issue17866 * Remove redundant encoding header for Python 3 modules * Remove redundant string encoding in CLI * Rename list() functions in CLI These functions shadow the builtin list function which is problematic since 2to3 adds a fair amount of calls to the builtin list when it finds dict.keys() and dict.values(). Only the Python function is renamed, from the perspective of the CLI nothing changes. * Replace usage of Exception.message in CLI `message` is not available anymore, instead use the string representation of the exception. * Adapt test handlers to Python 3 * Fix test that relied on dict ordering * Make sure test results are always uploaded (#4215) * Support encoding memoryview to JSON psycopg2 returns `buffer` objects in Python 2.7 and `memoryview` in Python 3. See #3156 * Fix test relying on object address ordering * Decode bytes returned from Redis * Stop using e.message for most exceptions Exception.message is not available in Python 3 anymore, except for some exceptions defined by third-party libraries. * Fix writing XLSX files in Python 3 The buffer for the file should be made of bytes and the actual content written to it strings. Note: I do not know why the diff is so large as it's only a two lines change. Probably a white space or file encoding issue. * Fix test by comparing strings to strings * Fix another exception message unavailable in Python 3 * Fix export to CSV in Python 3 The UnicodeWriter is not used anymore. In Python 3, the interface provided by the CSV module only deals with strings, in and out. The encoding of the output is left to the user, in our case it is given to Flask via `make_response`. * (Python 3) Use Redis' decode_responses=True option (#4232) * Fix test_outdated_queries_works_scheduled_queries_tracker (use utcnow) * Make sure Redis connection uses decoded_responses option * Remove unused imports. * Use Redis' decode_responses option * Remove cases of explicit Redis decoding * Rename helper function and make sure it doesn't apply twice. * Don't add decode_responses to Celery Redis connection URL * Fix displaying error while connecting to SQLite The exception message is always a string in Python 3, so no need to try to decode things. * Fix another missing exception message * Handle JSON encoding for datasources returning bytes SimpleJSON assumes the bytes it receives contain text data, so it tries to UTF-8 encode them. It is sometimes not true, for instance the SQLite datasource returns bytes for BLOB types, which typically do not contain text but truly binary data. This commit disables SimpleJSON auto encoding of bytes to str and instead uses the same method as for memoryviews: generating a hex representation of the data. * Fix Python 3 compatibility with RQ * Revert some changes 2to3 tends to do (#4261) - Revert some changes 2to3 tends to do when it errs on the side of caution regarding dict view objects. - Also fixed some naming issues with one character variables in list comprehensions. - Fix Flask warning. * Upgrade dependencies * Remove useless `iter` added by 2to3 * Fix get_next_path tests (#4280) * Removed setting SERVER_NAME in tests setup to avoid a warning. * Change get_next_path to not return empty string in case of a domain only value. * Fix redirect tests: Since version 0.15 of Werkzeug it uses full path for fixing the location header instead of the root path. * Remove explicit dependency for Werkzeug * Switched pytz and certifi to unbinded versions. * Switch to new library for getting country from IP `python-geoip-geolite2` is not compatible with Python 3, instead use `maxminddb-geolite2` which is very similar as it includes the geolite2 database in the package . * Python 3 RQ modifications (#4281) * show current worker job (alongside with minor cosmetic column tweaks) * avoid loading entire job data for queued jobs * track general RQ queues (default, periodic and schemas) * get all active RQ queues * call get_celery_queues in another place * merge dicts the Python 3 way * extend the result_ttl of refresh_queries to 600 seconds to allow it to continue running periodically even after longer executions * Remove legacy Python flake8 tests
2019-10-24 09:42:13 +00:00
self.assertCountEqual(user.group_ids, groups)
def get_test_user(self, email="test@example.com", org=None):
"""Helper to fetch an user from the database."""
# Expire all cached objects to ensure these values are read directly
# from the database.
models.db.session.expire_all()
return models.User.get_by_email_and_org(email, org or self.factory.org)
def test_remote_login_disabled(self):
self.override_settings({"REDASH_REMOTE_USER_LOGIN_ENABLED": "false"})
self.get_request(
"/remote_user/login",
org=self.factory.org,
headers={"X-Forwarded-Remote-User": "test@example.com"},
)
with self.assertRaises(NoResultFound):
2018-02-02 14:50:43 +00:00
self.get_test_user()
def test_remote_login_default_header(self):
self.get_request(
"/remote_user/login",
org=self.factory.org,
headers={"X-Forwarded-Remote-User": "test@example.com"},
)
2018-02-02 14:50:43 +00:00
self.assert_correct_user_attributes(self.get_test_user())
def test_remote_login_custom_header(self):
self.override_settings({"REDASH_REMOTE_USER_HEADER": "X-Custom-User"})
self.get_request(
"/remote_user/login",
org=self.factory.org,
headers={"X-Custom-User": "test@example.com"},
)
2018-02-02 14:50:43 +00:00
self.assert_correct_user_attributes(self.get_test_user())
class TestUserForgotPassword(BaseTestCase):
def test_user_should_receive_password_reset_link(self):
user = self.factory.create_user()
with patch(
"redash.handlers.authentication.send_password_reset_email"
) as send_password_reset_email_mock:
response = self.post_request(
"/forgot", org=user.org, data={"email": user.email}
)
self.assertEqual(response.status_code, 200)
send_password_reset_email_mock.assert_called_with(user)
def test_disabled_user_should_not_receive_password_reset_link(self):
user = self.factory.create_user()
user.disable()
self.db.session.add(user)
self.db.session.commit()
with patch(
"redash.handlers.authentication.send_password_reset_email"
) as send_password_reset_email_mock, patch(
"redash.handlers.authentication.send_user_disabled_email"
) as send_user_disabled_email_mock:
response = self.post_request(
"/forgot", org=user.org, data={"email": user.email}
)
self.assertEqual(response.status_code, 200)
send_password_reset_email_mock.assert_not_called()
send_user_disabled_email_mock.assert_called_with(user)