Bring back the ability to set allowed external users & publicly open re:dash.

This commit is contained in:
Arik Fraimovich 2014-02-13 20:04:28 +02:00
parent ea65204eaa
commit 27031c96b5
3 changed files with 82 additions and 27 deletions

View File

@ -1,11 +1,10 @@
import functools
import hashlib
import hmac
from flask import request, make_response
from flask.ext.googleauth import GoogleFederated
from flask import current_app, request, make_response, g, redirect, url_for
from flask.ext.googleauth import GoogleAuth
import time
from werkzeug.contrib.fixers import ProxyFix
import werkzeug.wrappers
from redash import models, settings
@ -23,36 +22,53 @@ class HMACAuthentication(object):
def __init__(self, auth):
self.auth = auth
def required(self, fn):
wrapped_fn = self.auth.required(fn)
@staticmethod
def api_key_authentication():
signature = request.args.get('signature')
expires = float(request.args.get('expires') or 0)
query_id = request.view_args.get('query_id', None)
# TODO: 3600 should be a setting
if signature and query_id and time.time() < expires <= time.time() + 3600:
query = models.Query.get(models.Query.id == query_id)
calculated_signature = sign(query.api_key, request.path, expires)
if query.api_key and signature == calculated_signature:
return True
return False
@staticmethod
def is_user_logged_in():
return g.user is not None
@staticmethod
def valid_user():
email = g.user['email']
if not settings.GOOGLE_APPS_DOMAIN:
return True
return email in settings.ALLOWED_EXTERNAL_USERS or email.endswith("@%s" % settings.GOOGLE_APPS_DOMAIN)
def required(self, fn):
@functools.wraps(fn)
def decorated(*args, **kwargs):
signature = request.args.get('signature')
expires = float(request.args.get('expires') or 0)
query_id = request.view_args.get('query_id', None)
if self.is_user_logged_in() and self.valid_user():
return fn(*args, **kwargs)
# TODO: 3600 should be a setting
if signature and query_id and time.time() < expires <= time.time() + 3600:
query = models.Query.get(models.Query.id == query_id)
calculated_signature = sign(query.api_key, request.path, expires)
if self.api_key_authentication():
return fn(*args, **kwargs)
if query.api_key and signature == calculated_signature:
return fn(*args, **kwargs)
# Work around for flask-restful testing only for flask.wrappers.Resource instead of
# werkzeug.wrappers.Response
resp = wrapped_fn(*args, **kwargs)
if isinstance(resp, werkzeug.wrappers.Response):
resp = make_response(resp)
return resp
blueprint = current_app.extensions['googleauth'].blueprint
# The make_response call is a work around for flask-restful testing only for
# flask.wrappers.Resource instead of werkzeug.wrappers.Response
return make_response(redirect(url_for("%s.login" % blueprint.name, next=request.url)))
return decorated
def setup_authentication(app):
openid_auth = GoogleFederated(settings.GOOGLE_APPS_DOMAIN, app)
openid_auth = GoogleAuth(app)
app.wsgi_app = ProxyFix(app.wsgi_app)
app.secret_key = settings.COOKIE_SECRET

View File

@ -42,6 +42,7 @@ DATABASE_CONFIG = parse_db_url(os.environ.get("REDASH_DATABASE_URL", "postgresql
GOOGLE_APPS_DOMAIN = os.environ.get("REDASH_GOOGLE_APPS_DOMAIN", "")
# Email addresses of admin users (comma separated)
ADMINS = os.environ.get("REDASH_ADMINS", '').split(',')
ALLOWED_EXTERNAL_USERS = os.environ.get("REDASH_ALLOWED_EXTERNAL_USERS", '').split(',')
STATIC_ASSETS_PATH = fix_assets_path(os.environ.get("REDASH_STATIC_ASSETS_PATH", "../rd_ui/dist/"))
WORKERS_COUNT = int(os.environ.get("REDASH_WORKERS_COUNT", "2"))
COOKIE_SECRET = os.environ.get("REDASH_COOKIE_SECRET", "c292a0a3aa32397cdb050e233733900f")

View File

@ -1,14 +1,17 @@
from contextlib import contextmanager
import json
import time
from unittest import TestCase
from tests import BaseTestCase
from tests.factories import dashboard_factory, widget_factory, visualization_factory, query_factory, \
query_result_factory
from redash import app, models
from redash import app, models, settings
from redash.utils import json_dumps
from redash.authentication import sign
settings.GOOGLE_APPS_DOMAIN = "example.com"
@contextmanager
def authenticated_user(c, user='test@example.com', name='John Test'):
with c.session_transaction() as sess:
@ -45,7 +48,42 @@ class AuthenticationTestMixin():
self.assertEquals(200, rv.status_code)
class PingTest(BaseTestCase):
class TestAuthentication(TestCase):
def test_redirects_for_nonsigned_in_user(self):
with app.test_client() as c:
rv = c.get("/")
self.assertEquals(302, rv.status_code)
def test_returns_content_when_authenticated_with_correct_domain(self):
settings.GOOGLE_APPS_DOMAIN = "example.com"
with app.test_client() as c, authenticated_user(c, user="test@example.com"):
rv = c.get("/")
self.assertEquals(200, rv.status_code)
def test_redirects_when_authenticated_with_wrong_domain(self):
settings.GOOGLE_APPS_DOMAIN = "example.com"
with app.test_client() as c, authenticated_user(c, user="test@not-example.com"):
rv = c.get("/")
self.assertEquals(302, rv.status_code)
def test_returns_content_when_user_in_allowed_list(self):
settings.GOOGLE_APPS_DOMAIN = "example.com"
settings.ALLOWED_EXTERNAL_USERS = ["test@not-example.com"]
with app.test_client() as c, authenticated_user(c, user="test@not-example.com"):
rv = c.get("/")
self.assertEquals(200, rv.status_code)
def test_returns_content_when_google_apps_domain_empty(self):
settings.GOOGLE_APPS_DOMAIN = ""
settings.ALLOWED_EXTERNAL_USERS = []
with app.test_client() as c, authenticated_user(c, user="test@whatever.com"):
rv = c.get("/")
self.assertEquals(200, rv.status_code)
class PingTest(TestCase):
def test_ping(self):
with app.test_client() as c:
rv = c.get('/ping')
@ -83,7 +121,7 @@ class DashboardAPITest(BaseTestCase, AuthenticationTestMixin):
self.assertEquals(rv.status_code, 404)
def test_create_new_dashboard(self):
user_email = 'test@everything.me'
user_email = 'test@example.com'
with app.test_client() as c, authenticated_user(c, user=user_email):
dashboard_name = 'Test Dashboard'
rv = json_request(c.post, '/api/dashboards', data={'name': dashboard_name})
@ -182,7 +220,7 @@ class QueryAPITest(BaseTestCase, AuthenticationTestMixin):
self.assertEquals(rv.json['name'], 'Testing')
def test_create_query(self):
user = 'test@everything.me'
user = 'test@example.com'
query_data = {
'name': 'Testing',
'description': 'Description',