redash/tests/handlers/test_users.py
Nicolas Le Manchet 246eca1121 Migrate the application to Python 3 (#4251)
* Make core app compatible with Python 3

No backward compatibility with Python 2.7 is kept.
This commit mostly contains changes made with 2to3 and manual
tweaking when necessary.

* Use Python 3.7 as base docker image

Since it is not possible to change redash/base:debian to Python 3
without breaking future releases, its Dockerfile is temporarily
copied here.

* Upgrade some requirements to newest versions

Some of the older versions were not compatible with Python 3.

* Migrate tests to Python 3

* Build frontend on Python 3

* Make the HMAC sign function compatible with Python 3

In Python 3, HMAC only works with bytes so the strings and the
float used in the sign function need to be encoded.
Hopefully this is still backward compatible with already generated
signatures.
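
A minimal sketch of the bytes requirement described above. The `sign` helper, its arguments, and the SHA-1 digest are assumptions made for illustration and may not match the actual Redash function:

```python
import hashlib
import hmac


def sign(key, path, expires):
    """Hypothetical helper: sign a path and an expiry timestamp with HMAC."""
    # hmac.new() rejects str keys in Python 3, so text is encoded to bytes first.
    h = hmac.new(key.encode(), msg=path.encode(), digestmod=hashlib.sha1)
    # The float timestamp is converted to text, then to bytes, before hashing.
    h.update(str(expires).encode())
    return h.hexdigest()


signature = sign("secret", "/api/queries/1", 1571910133.0)
```

Whether the result matches signatures generated under Python 2 depends on the float being rendered to the same text in both versions, which this sketch does not verify.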

* Use assertCountEqual instead of assertItemsEqual

The latter is not available in Python 3.
See https://bugs.python.org/issue17866
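
For illustration, a small self-contained test using the Python 3 name. The test case and its data are made up for this sketch:

```python
import unittest


class OrderInsensitiveComparison(unittest.TestCase):
    def test_same_elements_in_any_order(self):
        # Passes: both sequences hold the same elements with the same counts.
        self.assertCountEqual(
            ["admin", "default", "default"],
            ["default", "admin", "default"],
        )


suite = unittest.defaultTestLoader.loadTestsFromTestCase(OrderInsensitiveComparison)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```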

* Remove redundant encoding header for Python 3 modules

* Remove redundant string encoding in CLI

* Rename list() functions in CLI

These functions shadow the builtin list function which is
problematic since 2to3 adds a fair amount of calls to the builtin
list when it finds dict.keys() and dict.values().

Only the Python function is renamed, from the perspective of the
CLI nothing changes.

* Replace usage of Exception.message in CLI

`message` is not available anymore, instead use the string
representation of the exception.
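
A short sketch of the replacement pattern. The `describe_error` helper and the error text are hypothetical:

```python
def describe_error(exc):
    """Return a printable message for any exception."""
    # Built-in exceptions have no `.message` attribute in Python 3;
    # str(exc) returns the message text instead.
    return str(exc)


try:
    raise ValueError("Group not found")
except ValueError as e:
    error_text = describe_error(e)
```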

* Adapt test handlers to Python 3

* Fix test that relied on dict ordering

* Make sure test results are always uploaded (#4215)

* Support encoding memoryview to JSON

psycopg2 returns `buffer` objects in Python 2.7 and `memoryview`
in Python 3. See #3156

* Fix test relying on object address ordering

* Decode bytes returned from Redis

* Stop using e.message for most exceptions

Exception.message is not available in Python 3 anymore, except
for some exceptions defined by third-party libraries.

* Fix writing XLSX files in Python 3

The buffer for the file should be made of bytes and the actual
content written to it strings.

Note: I do not know why the diff is so large as it's only a two-line
change. Probably a whitespace or file encoding issue.

* Fix test by comparing strings to strings

* Fix another exception message unavailable in Python 3

* Fix export to CSV in Python 3

The UnicodeWriter is not used anymore. In Python 3, the interface
provided by the CSV module only deals with strings, in and out.
The encoding of the output is left to the user, in our case
it is given to Flask via `make_response`.
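
A rough sketch of the string-only CSV interface, using only the standard library. The `rows_to_csv` helper and the sample row are assumptions, and the final `encode` call stands in for whatever the web framework does when building the response:

```python
import csv
import io


def rows_to_csv(columns, rows):
    """Serialize a list of dict rows to CSV text."""
    # In Python 3 the csv module reads and writes str, so a text buffer suffices.
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=columns, extrasaction="ignore")
    writer.writeheader()
    for row in rows:
        writer.writerow(row)
    return buffer.getvalue()


csv_text = rows_to_csv(["id", "name"], [{"id": 1, "name": "Zo\u00eb"}])
# The encoding is chosen only when the text leaves the application.
csv_bytes = csv_text.encode("utf-8")
```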

* (Python 3) Use Redis' decode_responses=True option (#4232)

* Fix test_outdated_queries_works_scheduled_queries_tracker (use utcnow)

* Make sure Redis connection uses decode_responses option

* Remove unused imports.

* Use Redis' decode_responses option

* Remove cases of explicit Redis decoding

* Rename helper function and make sure it doesn't apply twice.

* Don't add decode_responses to Celery Redis connection URL

* Fix displaying error while connecting to SQLite

The exception message is always a string in Python 3, so no
need to try to decode things.

* Fix another missing exception message

* Handle JSON encoding for datasources returning bytes

SimpleJSON assumes the bytes it receives contain text data, so it
tries to decode them as UTF-8. That assumption does not always hold:
for instance, the SQLite datasource returns bytes for BLOB types, which
typically contain binary data rather than text.

This commit disables SimpleJSON auto encoding of bytes to str and
instead uses the same method as for memoryviews: generating a
hex representation of the data.
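
A minimal sketch of the hex approach. It uses the standard library `json` module rather than SimpleJSON, and the `HexBytesEncoder` class is a hypothetical name, so this shows the idea rather than the actual Redash encoder:

```python
import binascii
import json


class HexBytesEncoder(json.JSONEncoder):
    """Hypothetical encoder: emit binary values as hex strings."""

    def default(self, o):
        # memoryview objects are converted to bytes first.
        if isinstance(o, memoryview):
            o = o.tobytes()
        # bytes are rendered as hexadecimal text instead of being decoded.
        if isinstance(o, bytes):
            return binascii.hexlify(o).decode("ascii")
        return super().default(o)


encoded = json.dumps(
    {"blob": b"\x00\xff", "view": memoryview(b"\x01")},
    cls=HexBytesEncoder,
)
```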

* Fix Python 3 compatibility with RQ

* Revert some changes 2to3 tends to do (#4261)

- Revert some changes 2to3 tends to do when it errs on the side of caution regarding dict view objects.

- Also fixed some naming issues with one character variables in list comprehensions.

- Fix Flask warning.

* Upgrade dependencies

* Remove useless `iter` added by 2to3

* Fix get_next_path tests (#4280)

* Removed setting SERVER_NAME in tests setup to avoid a warning.

* Change get_next_path to not return empty string in case of a domain only value.

* Fix redirect tests:

Since version 0.15, Werkzeug uses the full path instead of the root path when fixing the Location header.

* Remove explicit dependency for Werkzeug

* Switched pytz and certifi to unpinned versions.

* Switch to new library for getting country from IP

`python-geoip-geolite2` is not compatible with Python 3. Instead,
use `maxminddb-geolite2`, which is very similar as it includes
the geolite2 database in the package.

* Python 3 RQ modifications (#4281)

* show current worker job (along with minor cosmetic column tweaks)

* avoid loading entire job data for queued jobs

* track general RQ queues (default, periodic and schemas)

* get all active RQ queues

* call get_celery_queues in another place

* merge dicts the Python 3 way

* extend the result_ttl of refresh_queries to 600 seconds to allow it to continue running periodically even after longer executions

* Remove legacy Python flake8 tests
2019-10-24 12:42:13 +03:00

496 lines
21 KiB
Python

from redash import models, settings
from tests import BaseTestCase
from mock import patch


class TestUserListResourcePost(BaseTestCase):
    def test_returns_403_for_non_admin(self):
        rv = self.make_request('post', "/api/users")
        self.assertEqual(rv.status_code, 403)

    def test_returns_400_when_missing_fields(self):
        admin = self.factory.create_admin()

        rv = self.make_request('post', "/api/users", user=admin)
        self.assertEqual(rv.status_code, 400)

        rv = self.make_request('post', '/api/users', data={'name': 'User'}, user=admin)
        self.assertEqual(rv.status_code, 400)

        rv = self.make_request('post', '/api/users', data={'name': 'User', 'email': 'bademailaddress'}, user=admin)
        self.assertEqual(rv.status_code, 400)

    def test_returns_400_when_using_temporary_email(self):
        admin = self.factory.create_admin()

        test_user = {'name': 'User', 'email': 'user@mailinator.com', 'password': 'test'}
        rv = self.make_request('post', '/api/users', data=test_user, user=admin)
        self.assertEqual(rv.status_code, 400)

        test_user['email'] = 'arik@qq.com'
        rv = self.make_request('post', '/api/users', data=test_user, user=admin)
        self.assertEqual(rv.status_code, 400)

    def test_creates_user(self):
        admin = self.factory.create_admin()

        test_user = {'name': 'User', 'email': 'user@example.com', 'password': 'test'}
        rv = self.make_request('post', '/api/users', data=test_user, user=admin)

        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv.json['name'], test_user['name'])
        self.assertEqual(rv.json['email'], test_user['email'])

    @patch('redash.settings.email_server_is_configured', return_value=False)
    def test_shows_invite_link_when_email_is_not_configured(self, _):
        admin = self.factory.create_admin()

        test_user = {'name': 'User', 'email': 'user@example.com'}
        rv = self.make_request('post', '/api/users', data=test_user, user=admin)

        self.assertEqual(rv.status_code, 200)
        self.assertIn('invite_link', rv.json)

    @patch('redash.settings.email_server_is_configured', return_value=True)
    def test_does_not_show_invite_link_when_email_is_configured(self, _):
        admin = self.factory.create_admin()

        test_user = {'name': 'User', 'email': 'user@example.com'}
        rv = self.make_request('post', '/api/users', data=test_user, user=admin)

        self.assertEqual(rv.status_code, 200)
        self.assertNotIn('invite_link', rv.json)

    def test_creates_user_case_insensitive_email(self):
        admin = self.factory.create_admin()

        test_user = {'name': 'User', 'email': 'User@Example.com', 'password': 'test'}
        rv = self.make_request('post', '/api/users', data=test_user, user=admin)

        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv.json['name'], test_user['name'])
        self.assertEqual(rv.json['email'], 'user@example.com')

    def test_returns_400_when_email_taken(self):
        admin = self.factory.create_admin()

        test_user = {'name': 'User', 'email': admin.email, 'password': 'test'}
        rv = self.make_request('post', '/api/users', data=test_user, user=admin)

        self.assertEqual(rv.status_code, 400)

    def test_returns_400_when_email_taken_case_insensitive(self):
        admin = self.factory.create_admin()

        test_user1 = {'name': 'User', 'email': 'user@example.com', 'password': 'test'}
        rv = self.make_request('post', '/api/users', data=test_user1, user=admin)

        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv.json['email'], 'user@example.com')

        test_user2 = {'name': 'User', 'email': 'user@Example.com', 'password': 'test'}
        rv = self.make_request('post', '/api/users', data=test_user2, user=admin)

        self.assertEqual(rv.status_code, 400)


class TestUserListGet(BaseTestCase):
    def create_filters_fixtures(self):
        class PlainObject(object):
            pass

        result = PlainObject()
        now = models.db.func.now()

        result.enabled_active1 = self.factory.create_user(disabled_at=None, is_invitation_pending=None).id
        result.enabled_active2 = self.factory.create_user(disabled_at=None, is_invitation_pending=False).id
        result.enabled_pending = self.factory.create_user(disabled_at=None, is_invitation_pending=True).id
        result.disabled_active1 = self.factory.create_user(disabled_at=now, is_invitation_pending=None).id
        result.disabled_active2 = self.factory.create_user(disabled_at=now, is_invitation_pending=False).id
        result.disabled_pending = self.factory.create_user(disabled_at=now, is_invitation_pending=True).id

        return result

    def make_request_and_return_ids(self, *args, **kwargs):
        rv = self.make_request(*args, **kwargs)
        return [user['id'] for user in rv.json['results']]

    def assertUsersListMatches(self, actual_ids, expected_ids, unexpected_ids):
        actual_ids = set(actual_ids)
        expected_ids = set(expected_ids)
        unexpected_ids = set(unexpected_ids)
        # every expected id must be present, and no unexpected id may appear
        self.assertSetEqual(actual_ids.intersection(expected_ids), expected_ids)
        self.assertSetEqual(actual_ids.intersection(unexpected_ids), set())

    def test_returns_users_for_given_org_only(self):
        user1 = self.factory.user
        user2 = self.factory.create_user()
        org = self.factory.create_org()
        user3 = self.factory.create_user(org=org)

        user_ids = self.make_request_and_return_ids('get', '/api/users')
        self.assertUsersListMatches(user_ids, [user1.id, user2.id], [user3.id])

    def test_gets_all_enabled(self):
        users = self.create_filters_fixtures()
        user_ids = self.make_request_and_return_ids('get', '/api/users')
        self.assertUsersListMatches(
            user_ids,
            [users.enabled_active1, users.enabled_active2, users.enabled_pending],
            [users.disabled_active1, users.disabled_active2, users.disabled_pending]
        )

    def test_gets_all_disabled(self):
        users = self.create_filters_fixtures()
        user_ids = self.make_request_and_return_ids('get', '/api/users?disabled=true')
        self.assertUsersListMatches(
            user_ids,
            [users.disabled_active1, users.disabled_active2, users.disabled_pending],
            [users.enabled_active1, users.enabled_active2, users.enabled_pending]
        )

    def test_gets_all_enabled_and_active(self):
        users = self.create_filters_fixtures()
        user_ids = self.make_request_and_return_ids('get', '/api/users?pending=false')
        self.assertUsersListMatches(
            user_ids,
            [users.enabled_active1, users.enabled_active2],
            [users.enabled_pending, users.disabled_active1, users.disabled_active2, users.disabled_pending]
        )

    def test_gets_all_enabled_and_pending(self):
        users = self.create_filters_fixtures()
        user_ids = self.make_request_and_return_ids('get', '/api/users?pending=true')
        self.assertUsersListMatches(
            user_ids,
            [users.enabled_pending],
            [users.enabled_active1, users.enabled_active2, users.disabled_active1, users.disabled_active2, users.disabled_pending]
        )

    def test_gets_all_disabled_and_active(self):
        users = self.create_filters_fixtures()
        user_ids = self.make_request_and_return_ids('get', '/api/users?disabled=true&pending=false')
        self.assertUsersListMatches(
            user_ids,
            [users.disabled_active1, users.disabled_active2],
            [users.disabled_pending, users.enabled_active1, users.enabled_active2, users.enabled_pending]
        )

    def test_gets_all_disabled_and_pending(self):
        users = self.create_filters_fixtures()
        user_ids = self.make_request_and_return_ids('get', '/api/users?disabled=true&pending=true')
        self.assertUsersListMatches(
            user_ids,
            [users.disabled_pending],
            [users.disabled_active1, users.disabled_active2, users.enabled_active1, users.enabled_active2, users.enabled_pending]
        )


class TestUserResourceGet(BaseTestCase):
    def test_returns_api_key_for_your_own_user(self):
        rv = self.make_request('get', "/api/users/{}".format(self.factory.user.id))
        self.assertIn('api_key', rv.json)

    def test_returns_api_key_for_other_user_when_admin(self):
        other_user = self.factory.user
        admin = self.factory.create_admin()

        rv = self.make_request('get', "/api/users/{}".format(other_user.id), user=admin)
        self.assertIn('api_key', rv.json)

    def test_doesnt_return_api_key_for_other_user(self):
        other_user = self.factory.create_user()

        rv = self.make_request('get', "/api/users/{}".format(other_user.id))
        self.assertNotIn('api_key', rv.json)

    def test_doesnt_return_user_from_different_org(self):
        org = self.factory.create_org()
        other_user = self.factory.create_user(org=org)

        rv = self.make_request('get', "/api/users/{}".format(other_user.id))
        self.assertEqual(rv.status_code, 404)


class TestUserResourcePost(BaseTestCase):
    def test_returns_403_for_non_admin_changing_not_his_own(self):
        other_user = self.factory.create_user()

        rv = self.make_request('post', "/api/users/{}".format(other_user.id), data={"name": "New Name"})
        self.assertEqual(rv.status_code, 403)

    def test_returns_200_for_non_admin_changing_his_own(self):
        rv = self.make_request('post', "/api/users/{}".format(self.factory.user.id), data={"name": "New Name"})
        self.assertEqual(rv.status_code, 200)

    @patch('redash.settings.email_server_is_configured', return_value=True)
    def test_marks_email_as_not_verified_when_changed(self, _):
        user = self.factory.user
        user.is_email_verified = True
        rv = self.make_request('post', "/api/users/{}".format(user.id), data={"email": "donald@trump.biz"})
        self.assertFalse(user.is_email_verified)

    @patch('redash.settings.email_server_is_configured', return_value=False)
    def test_doesnt_mark_email_as_not_verified_when_changed_and_email_server_is_not_configured(self, _):
        user = self.factory.user
        user.is_email_verified = True
        rv = self.make_request('post', "/api/users/{}".format(user.id), data={"email": "donald@trump.biz"})
        self.assertTrue(user.is_email_verified)

    def test_returns_200_for_admin_changing_other_user(self):
        admin = self.factory.create_admin()

        rv = self.make_request('post', "/api/users/{}".format(self.factory.user.id), data={"name": "New Name"}, user=admin)
        self.assertEqual(rv.status_code, 200)

    def test_fails_password_change_without_old_password(self):
        rv = self.make_request('post', "/api/users/{}".format(self.factory.user.id), data={"password": "new password"})
        self.assertEqual(rv.status_code, 403)

    def test_fails_password_change_with_incorrect_old_password(self):
        rv = self.make_request('post', "/api/users/{}".format(self.factory.user.id), data={"password": "new password", "old_password": "wrong"})
        self.assertEqual(rv.status_code, 403)

    def test_changes_password(self):
        new_password = "new password"
        old_password = "old password"

        self.factory.user.hash_password(old_password)
        models.db.session.add(self.factory.user)

        rv = self.make_request('post', "/api/users/{}".format(self.factory.user.id), data={"password": new_password, "old_password": old_password})
        self.assertEqual(rv.status_code, 200)

        user = models.User.query.get(self.factory.user.id)
        self.assertTrue(user.verify_password(new_password))

    def test_returns_400_when_using_temporary_email(self):
        admin = self.factory.create_admin()

        test_user = {'email': 'user@mailinator.com'}
        rv = self.make_request('post', '/api/users/{}'.format(self.factory.user.id), data=test_user, user=admin)
        self.assertEqual(rv.status_code, 400)

        # target the same user resource; posting to '/api/users' would only
        # exercise the create-user validation instead of the update path
        test_user['email'] = 'arik@qq.com'
        rv = self.make_request('post', '/api/users/{}'.format(self.factory.user.id), data=test_user, user=admin)
        self.assertEqual(rv.status_code, 400)

    def test_changing_email_ends_any_other_sessions_of_current_user(self):
        with self.client as c:
            # visit profile page
            self.make_request('get', "/api/users/{}".format(self.factory.user.id))
            with c.session_transaction() as sess:
                previous = sess['user_id']

            # change e-mail address - this will result in a new `user_id` value inside the session
            self.make_request('post', "/api/users/{}".format(self.factory.user.id), data={"email": "john@doe.com"})

            # force the old `user_id`, simulating that the user is logged in from another browser
            with c.session_transaction() as sess:
                sess['user_id'] = previous
            rv = self.get_request("/api/users/{}".format(self.factory.user.id))

            self.assertEqual(rv.status_code, 404)

    def test_changing_email_does_not_end_current_session(self):
        self.make_request('get', "/api/users/{}".format(self.factory.user.id))

        with self.client as c:
            with c.session_transaction() as sess:
                previous = sess['user_id']

        self.make_request('post', "/api/users/{}".format(self.factory.user.id), data={"email": "john@doe.com"})

        with self.client as c:
            with c.session_transaction() as sess:
                current = sess['user_id']

        # make sure the session's `user_id` has changed to reflect the new identity, thus not logging the user out
        self.assertNotEqual(previous, current)

    def test_admin_can_change_user_groups(self):
        admin_user = self.factory.create_admin()
        other_user = self.factory.create_user(group_ids=[1])

        rv = self.make_request('post', "/api/users/{}".format(other_user.id), data={"group_ids": [1, 2]}, user=admin_user)

        self.assertEqual(rv.status_code, 200)
        self.assertEqual(models.User.query.get(other_user.id).group_ids, [1, 2])

    def test_admin_can_delete_user(self):
        admin_user = self.factory.create_admin()
        other_user = self.factory.create_user(is_invitation_pending=True)

        rv = self.make_request('delete', "/api/users/{}".format(other_user.id), user=admin_user)

        self.assertEqual(rv.status_code, 200)
        self.assertEqual(models.User.query.get(other_user.id), None)


class TestUserDisable(BaseTestCase):
    def test_non_admin_cannot_disable_user(self):
        other_user = self.factory.create_user()
        self.assertFalse(other_user.is_disabled)

        rv = self.make_request('post', "/api/users/{}/disable".format(other_user.id), user=other_user)
        self.assertEqual(rv.status_code, 403)

        # user should stay enabled
        other_user = models.User.query.get(other_user.id)
        self.assertFalse(other_user.is_disabled)

    def test_admin_can_disable_user(self):
        admin_user = self.factory.create_admin()
        other_user = self.factory.create_user()
        self.assertFalse(other_user.is_disabled)

        rv = self.make_request('post', "/api/users/{}/disable".format(other_user.id), user=admin_user)
        self.assertEqual(rv.status_code, 200)

        # user should become disabled
        other_user = models.User.query.get(other_user.id)
        self.assertTrue(other_user.is_disabled)

    def test_admin_can_disable_another_admin(self):
        admin_user1 = self.factory.create_admin()
        admin_user2 = self.factory.create_admin()
        self.assertFalse(admin_user2.is_disabled)

        rv = self.make_request('post', "/api/users/{}/disable".format(admin_user2.id), user=admin_user1)
        self.assertEqual(rv.status_code, 200)

        # user should become disabled
        admin_user2 = models.User.query.get(admin_user2.id)
        self.assertTrue(admin_user2.is_disabled)

    def test_admin_cannot_disable_self(self):
        admin_user = self.factory.create_admin()
        self.assertFalse(admin_user.is_disabled)

        rv = self.make_request('post', "/api/users/{}/disable".format(admin_user.id), user=admin_user)
        self.assertEqual(rv.status_code, 403)

        # user should stay enabled
        admin_user = models.User.query.get(admin_user.id)
        self.assertFalse(admin_user.is_disabled)

    def test_admin_can_enable_user(self):
        admin_user = self.factory.create_admin()
        other_user = self.factory.create_user(disabled_at='2018-03-08 00:00')
        self.assertTrue(other_user.is_disabled)

        rv = self.make_request('delete', "/api/users/{}/disable".format(other_user.id), user=admin_user)
        self.assertEqual(rv.status_code, 200)

        # user should become enabled
        other_user = models.User.query.get(other_user.id)
        self.assertFalse(other_user.is_disabled)

    def test_admin_can_enable_another_admin(self):
        admin_user1 = self.factory.create_admin()
        admin_user2 = self.factory.create_admin(disabled_at='2018-03-08 00:00')
        self.assertTrue(admin_user2.is_disabled)

        rv = self.make_request('delete', "/api/users/{}/disable".format(admin_user2.id), user=admin_user1)
        self.assertEqual(rv.status_code, 200)

        # user should become enabled
        admin_user2 = models.User.query.get(admin_user2.id)
        self.assertFalse(admin_user2.is_disabled)

    def test_disabled_user_cannot_login(self):
        user = self.factory.create_user(disabled_at='2018-03-08 00:00')
        user.hash_password('password')

        self.db.session.add(user)
        self.db.session.commit()

        with patch('redash.handlers.authentication.login_user') as login_user_mock:
            rv = self.post_request('/login', data={'email': user.email, 'password': 'password'}, org=self.factory.org)
            # login handler should not be called
            login_user_mock.assert_not_called()
            # check if error is raised
            self.assertEqual(rv.status_code, 200)
            self.assertIn('Wrong email or password', rv.data.decode())

    def test_disabled_user_should_not_access_api(self):
        # Note: some API does not require user, so check the one which requires

        # 1. create user; the user should have access to API
        user = self.factory.create_user()
        rv = self.make_request('get', '/api/dashboards', user=user)
        self.assertEqual(rv.status_code, 200)

        # 2. disable user; now API access should be forbidden
        user.disable()
        self.db.session.add(user)
        self.db.session.commit()

        rv = self.make_request('get', '/api/dashboards', user=user)
        self.assertNotEqual(rv.status_code, 200)

    def test_disabled_user_should_not_receive_restore_password_email(self):
        admin_user = self.factory.create_admin()

        # user should receive email
        user = self.factory.create_user()
        with patch('redash.handlers.users.send_password_reset_email') as send_password_reset_email_mock:
            send_password_reset_email_mock.return_value = 'reset_token'
            rv = self.make_request('post', '/api/users/{}/reset_password'.format(user.id), user=admin_user)
            self.assertEqual(rv.status_code, 200)
            send_password_reset_email_mock.assert_called_with(user)

        # disable user; now should not receive email
        user.disable()
        self.db.session.add(user)
        self.db.session.commit()

        with patch('redash.handlers.users.send_password_reset_email') as send_password_reset_email_mock:
            send_password_reset_email_mock.return_value = 'reset_token'
            rv = self.make_request('post', '/api/users/{}/reset_password'.format(user.id), user=admin_user)
            self.assertEqual(rv.status_code, 404)
            send_password_reset_email_mock.assert_not_called()


class TestUserRegenerateApiKey(BaseTestCase):
    def test_admin_can_regenerate_other_user_api_key(self):
        admin_user = self.factory.create_admin()
        other_user = self.factory.create_user()
        orig_api_key = other_user.api_key

        rv = self.make_request('post', "/api/users/{}/regenerate_api_key".format(other_user.id), user=admin_user)
        self.assertEqual(rv.status_code, 200)

        # the key should have changed
        other_user = models.User.query.get(other_user.id)
        self.assertNotEqual(orig_api_key, other_user.api_key)

    def test_non_admin_cannot_regenerate_other_user_api_key(self):
        user1 = self.factory.create_user()
        user2 = self.factory.create_user()
        orig_user2_api_key = user2.api_key

        rv = self.make_request('post', "/api/users/{}/regenerate_api_key".format(user2.id), user=user1)
        self.assertEqual(rv.status_code, 403)

        # the key should stay the same
        user = models.User.query.get(user2.id)
        self.assertEqual(orig_user2_api_key, user.api_key)

    def test_admin_can_regenerate_api_key_myself(self):
        admin_user = self.factory.create_admin()
        orig_api_key = admin_user.api_key

        rv = self.make_request('post', "/api/users/{}/regenerate_api_key".format(admin_user.id), user=admin_user)
        self.assertEqual(rv.status_code, 200)

        user = models.User.query.get(admin_user.id)
        self.assertNotEqual(orig_api_key, user.api_key)

    def test_user_can_regenerate_api_key_myself(self):
        user = self.factory.create_user()
        orig_api_key = user.api_key

        rv = self.make_request('post', "/api/users/{}/regenerate_api_key".format(user.id), user=user)
        self.assertEqual(rv.status_code, 200)

        user = models.User.query.get(user.id)
        self.assertNotEqual(orig_api_key, user.api_key)