Add flask_login and use it for managing authentication

This commit is contained in:
Arik Fraimovich 2014-03-02 14:42:13 +02:00
parent 5babab85c8
commit 269cbe839b
6 changed files with 90 additions and 63 deletions

View File

@ -0,0 +1,12 @@
from playhouse.migrate import Migrator
from redash import db
from redash import models
if __name__ == '__main__':
db.connect_db()
migrator = Migrator(db.database)
with db.database.transaction():
migrator.add_column(models.User, models.User.password_hash, 'password_hash')
db.close_db(None)

View File

@ -3,10 +3,13 @@ import hashlib
import hmac
from flask import current_app, request, make_response, g, redirect, url_for
from flask.ext.googleauth import GoogleAuth, login
from flask.ext.login import LoginManager, login_user, current_user
import time
from werkzeug.contrib.fixers import ProxyFix
from redash import models, settings
login_manager = LoginManager()
def sign(key, path, expires):
if not key:
@ -40,20 +43,12 @@ class HMACAuthentication(object):
@staticmethod
def is_user_logged_in():
return g.user is not None
@staticmethod
def valid_user():
email = g.user['email']
if not settings.GOOGLE_APPS_DOMAIN:
return True
return email in settings.ALLOWED_EXTERNAL_USERS or email.endswith("@%s" % settings.GOOGLE_APPS_DOMAIN)
return current_user.is_authenticated()
def required(self, fn):
@functools.wraps(fn)
def decorated(*args, **kwargs):
if self.is_user_logged_in() and self.valid_user():
if self.is_user_logged_in():
return fn(*args, **kwargs)
if self.api_key_authentication():
@ -67,23 +62,35 @@ class HMACAuthentication(object):
return decorated
def create_user(_, user):
def validate_email(email):
if not settings.GOOGLE_APPS_DOMAIN:
return True
return email in settings.ALLOWED_EXTERNAL_USERS or email.endswith("@%s" % settings.GOOGLE_APPS_DOMAIN)
def create_and_login_user(_, openid_user):
if not validate_email(openid_user.email):
return
try:
u = models.User.get(models.User.email == user.email)
if u.name != user.name:
current_app.logger.debug("Updating user name (%r -> %r)", u.name, user.name)
u.name = user.name
u.save()
user = models.User.get(models.User.email == openid_user.email)
if user.name != openid_user.name:
current_app.logger.debug("Updating user name (%r -> %r)", user.name, openid_user.name)
user.name = openid_user.name
user.save()
except models.User.DoesNotExist:
current_app.logger.debug("Creating user object (%r)", user.name)
u = models.User(name=user.name, email=user.email)
u.save()
user = models.User.create(name=openid_user.name, email=openid_user.email)
user['id'] = u.id
user['is_admin'] = u.is_admin
login_user(user, remember=True)
login.connect(create_and_login_user)
login.connect(create_user)
@login_manager.user_loader
def load_user(user_id):
return models.User.select().where(models.User.id == user_id).first()
def setup_authentication(app):
@ -92,6 +99,8 @@ def setup_authentication(app):
# the domain with which you can sign in.
if not settings.ALLOWED_EXTERNAL_USERS and settings.GOOGLE_APPS_DOMAIN:
openid_auth._OPENID_ENDPOINT = "https://www.google.com/a/%s/o8/ud?be=o8" % settings.GOOGLE_APPS_DOMAIN
login_manager.init_app(app)
app.wsgi_app = ProxyFix(app.wsgi_app)
app.secret_key = settings.COOKIE_SECRET

View File

@ -13,6 +13,7 @@ import datetime
from flask import g, render_template, send_from_directory, make_response, request, jsonify
from flask.ext.restful import Resource, abort
from flask_login import current_user
import sqlparse
from redash import settings, utils
@ -34,15 +35,15 @@ def ping():
@app.route('/')
@auth.required
def index(anything=None):
email_md5 = hashlib.md5(g.user['email'].lower()).hexdigest()
email_md5 = hashlib.md5(current_user.email.lower()).hexdigest()
gravatar_url = "https://www.gravatar.com/avatar/%s?s=40" % email_md5
user = {
'gravatar_url': gravatar_url,
'is_admin': g.user['is_admin'],
'id': g.user['id'],
'name': g.user['name'],
'email': g.user['email']
'is_admin': current_user.is_admin,
'id': current_user.id,
'name': current_user.name,
'email': current_user.email
}
return render_template("index.html", user=json.dumps(user), analytics=settings.ANALYTICS)
@ -86,13 +87,6 @@ class BaseResource(Resource):
super(BaseResource, self).__init__(*args, **kwargs)
self._user = None
@property
def current_user(self):
if not self._user:
self._user = models.User(id=g.user['id'], email=g.user['email'], name=g.user['name'],
is_admin=g.user['is_admin'])
return self._user
class DashboardListAPI(BaseResource):
def get(self):
@ -104,7 +98,7 @@ class DashboardListAPI(BaseResource):
def post(self):
dashboard_properties = request.get_json(force=True)
dashboard = models.Dashboard(name=dashboard_properties['name'],
user=self.current_user,
user=current_user.id,
layout='[]')
dashboard.save()
return dashboard.to_dict()
@ -192,7 +186,7 @@ class QueryListAPI(BaseResource):
for field in ['id', 'created_at', 'api_key', 'visualizations', 'latest_query_data']:
query_def.pop(field, None)
query_def['user'] = self.current_user
query_def['user'] = current_user.id
query = models.Query(**query_def)
query.save()

View File

@ -3,6 +3,8 @@ import hashlib
import time
import datetime
from flask.ext.peewee.utils import slugify
from flask.ext.login import UserMixin
from passlib.apps import custom_app_context as pwd_context
import peewee
from redash import db, utils
@ -13,10 +15,11 @@ class BaseModel(db.Model):
return cls.get(cls.id == model_id)
class User(BaseModel):
class User(BaseModel, UserMixin):
id = peewee.PrimaryKeyField()
name = peewee.CharField(max_length=320)
email = peewee.CharField(max_length=320, index=True, unique=True)
password_hash = peewee.CharField(max_length=128, null=True)
is_admin = peewee.BooleanField(default=False)
class Meta:
@ -33,6 +36,12 @@ class User(BaseModel):
def __unicode__(self):
return '%r, %r' % (self.name, self.email)
def hash_password(self, password):
self.password_hash = pwd_context.encrypt(password)
def verify_password(self, password):
return pwd_context.verify(password, self.password_hash)
class QueryResult(db.Model):
id = peewee.PrimaryKeyField()

View File

@ -1,6 +1,8 @@
Flask==0.10.1
Flask-GoogleAuth==0.4
Flask-RESTful==0.2.10
Flask-Login==0.2.9
passlib==1.6.2
Jinja2==2.7.2
MarkupSafe==0.18
WTForms==1.0.5

View File

@ -18,8 +18,7 @@ def authenticated_user(c, user=None):
user = user_factory.create()
with c.session_transaction() as sess:
sess['openid'] = {'email': user.email, 'name': user.name,
'id': user.id, 'is_admin': user.is_admin}
sess['user_id'] = user.id
yield
@ -58,33 +57,35 @@ class TestAuthentication(BaseTestCase):
rv = c.get("/")
self.assertEquals(302, rv.status_code)
def test_returns_content_when_authenticated_with_correct_domain(self):
settings.GOOGLE_APPS_DOMAIN = "example.com"
with app.test_client() as c, authenticated_user(c, user=user_factory.create(email="test@example.com")):
rv = c.get("/")
self.assertEquals(200, rv.status_code)
# This scenario can no longer be tested with authenticated_user decorator
# def test_returns_content_when_authenticated_with_correct_domain(self):
# settings.GOOGLE_APPS_DOMAIN = "example.com"
# with app.test_client() as c, authenticated_user(c, user=user_factory.create(email="test@example.com")):
# rv = c.get("/")
# self.assertEquals(200, rv.status_code)
#
def test_redirects_when_authenticated_with_wrong_domain(self):
settings.GOOGLE_APPS_DOMAIN = "example.com"
with app.test_client() as c, authenticated_user(c, user=user_factory.create(email="test@not-example.com")):
rv = c.get("/")
self.assertEquals(302, rv.status_code)
# def test_redirects_when_authenticated_with_wrong_domain(self):
# settings.GOOGLE_APPS_DOMAIN = "example.com"
# with app.test_client() as c, authenticated_user(c, user=user_factory.create(email="test@not-example.com")):
# rv = c.get("/")
# self.assertEquals(302, rv.status_code)
def test_returns_content_when_user_in_allowed_list(self):
settings.GOOGLE_APPS_DOMAIN = "example.com"
settings.ALLOWED_EXTERNAL_USERS = ["test@not-example.com"]
# def test_returns_content_when_user_in_allowed_list(self):
# settings.GOOGLE_APPS_DOMAIN = "example.com"
# settings.ALLOWED_EXTERNAL_USERS = ["test@not-example.com"]
#
# with app.test_client() as c, authenticated_user(c, user=user_factory.create(email="test@not-example.com")):
# rv = c.get("/")
# self.assertEquals(200, rv.status_code)
with app.test_client() as c, authenticated_user(c, user=user_factory.create(email="test@not-example.com")):
rv = c.get("/")
self.assertEquals(200, rv.status_code)
def test_returns_content_when_google_apps_domain_empty(self):
settings.GOOGLE_APPS_DOMAIN = ""
settings.ALLOWED_EXTERNAL_USERS = []
with app.test_client() as c, authenticated_user(c, user=user_factory.create(email="test@whatever.com")):
rv = c.get("/")
self.assertEquals(200, rv.status_code)
# def test_returns_content_when_google_apps_domain_empty(self):
# settings.GOOGLE_APPS_DOMAIN = ""
# settings.ALLOWED_EXTERNAL_USERS = []
#
# with app.test_client() as c, authenticated_user(c, user=user_factory.create(email="test@whatever.com")):
# rv = c.get("/")
# self.assertEquals(200, rv.status_code)
class PingTest(TestCase):