Merge pull request #1774 from getredash/patches

Scheduled queries improvements:
This commit is contained in:
Arik Fraimovich 2017-05-18 15:17:03 +03:00 committed by GitHub
commit a7df809c4d
4 changed files with 85 additions and 31 deletions

View File

@ -6,6 +6,7 @@ import hashlib
import itertools
import json
import logging
import time
from funcy import project
@ -16,7 +17,7 @@ from passlib.apps import custom_app_context as pwd_context
from redash import redis_connection, utils
from redash.destinations import (get_configuration_schema_for_destination_type,
get_destination)
from redash.metrics import database
from redash.metrics import database # noqa: F401
from redash.permissions import has_access, view_only
from redash.query_runner import (get_configuration_schema_for_query_runner_type,
get_query_runner)
@ -28,15 +29,39 @@ from sqlalchemy.event import listens_for
from sqlalchemy.ext.mutable import Mutable
from sqlalchemy.inspection import inspect
from sqlalchemy.orm import backref, joinedload, object_session, subqueryload
# noinspection PyUnresolvedReferences
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm.exc import NoResultFound # noqa: F401
from sqlalchemy.types import TypeDecorator
db = SQLAlchemy(session_options={
'expire_on_commit': False
})
Column = functools.partial(db.Column, nullable=False)
class ScheduledQueriesExecutions(object):
    """Track the last execution time of scheduled queries in a Redis hash.

    The hash at KEY_NAME maps query id -> POSIX timestamp of the most
    recent scheduled execution. A local snapshot is kept in
    ``self.executions`` and must be (re)loaded with :meth:`refresh`
    before calling :meth:`get`.
    """
    KEY_NAME = 'sq:executed_at'

    def __init__(self):
        # Local snapshot of the Redis hash; empty until refresh() runs.
        self.executions = {}

    def refresh(self):
        """Reload the full execution-time snapshot from Redis."""
        self.executions = redis_connection.hgetall(self.KEY_NAME)

    def update(self, query_id):
        """Record in Redis that ``query_id`` was executed just now."""
        redis_connection.hmset(self.KEY_NAME, {query_id: time.time()})

    def get(self, query_id):
        """Return the last execution time of ``query_id`` as a datetime.

        Reads the local snapshot (call refresh() first). Redis hash keys
        are strings, hence the str() conversion. Returns None when the
        query has no recorded execution.
        """
        ts = self.executions.get(str(query_id))
        if ts:
            ts = utils.dt_from_timestamp(ts)
        return ts


scheduled_queries_executions = ScheduledQueriesExecutions()
# AccessPermission and Change use a 'generic foreign key' approach to refer to
# either queries or dashboards.
# TODO replace this with association tables.
@ -329,7 +354,7 @@ class User(TimestampMixin, db.Model, BelongsToOrgMixin, UserMixin, PermissionsCh
name = Column(db.String(320))
email = Column(db.String(320))
password_hash = Column(db.String(128), nullable=True)
#XXX replace with association table
# XXX replace with association table
group_ids = Column('groups', MutableList.as_mutable(postgresql.ARRAY(db.Integer)), nullable=True)
api_key = Column(db.String(40),
default=lambda: generate_token(40),
@ -554,7 +579,7 @@ class DataSource(BelongsToOrgMixin, db.Model):
def get_by_name(cls, name):
return cls.query.filter(cls.name == name).one()
#XXX examine call sites to see if a regular SQLA collection would work better
# XXX examine call sites to see if a regular SQLA collection would work better
@property
def groups(self):
groups = db.session.query(DataSourceGroup).filter(
@ -563,7 +588,7 @@ class DataSource(BelongsToOrgMixin, db.Model):
class DataSourceGroup(db.Model):
#XXX drop id, use datasource/group as PK
# XXX drop id, use datasource/group as PK
id = Column(db.Integer, primary_key=True)
data_source_id = Column(db.Integer, db.ForeignKey("data_sources.id"))
data_source = db.relationship(DataSource, back_populates="data_source_groups")
@ -821,7 +846,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
joinedload(Query.latest_query_data).load_only('runtime', 'retrieved_at'))
.join(DataSourceGroup, Query.data_source_id == DataSourceGroup.data_source_id)
.filter(Query.is_archived == False)
.filter(DataSourceGroup.group_id.in_(group_ids))\
.filter(DataSourceGroup.group_id.in_(group_ids))
.order_by(Query.created_at.desc()))
if not drafts:
@ -842,12 +867,16 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
now = utils.utcnow()
outdated_queries = {}
scheduled_queries_executions.refresh()
for query in queries:
if query.latest_query_data:
retrieved_at = query.latest_query_data.retrieved_at
else:
retrieved_at = now
retrieved_at = scheduled_queries_executions.get(query.id) or retrieved_at
if should_schedule_next(retrieved_at, now, query.schedule, query.schedule_failures):
key = "{}:{}".format(query.query_hash, query.data_source_id)
outdated_queries[key] = query
@ -966,11 +995,11 @@ class AccessPermission(GFKBase, db.Model):
@classmethod
def grant(cls, obj, access_type, grantee, grantor):
grant = cls.query.filter(cls.object_type==obj.__tablename__,
cls.object_id==obj.id,
cls.access_type==access_type,
cls.grantee==grantee,
cls.grantor==grantor).one_or_none()
grant = cls.query.filter(cls.object_type == obj.__tablename__,
cls.object_id == obj.id,
cls.access_type == access_type,
cls.grantee == grantee,
cls.grantor == grantor).one_or_none()
if not grant:
grant = cls(object_type=obj.__tablename__,
@ -1084,12 +1113,12 @@ class Alert(TimestampMixin, db.Model):
return db.session.query(Alert)\
.options(joinedload(Alert.user), joinedload(Alert.query_rel))\
.join(Query)\
.join(DataSourceGroup, DataSourceGroup.data_source_id==Query.data_source_id)\
.join(DataSourceGroup, DataSourceGroup.data_source_id == Query.data_source_id)\
.filter(DataSourceGroup.group_id.in_(group_ids))
@classmethod
def get_by_id_and_org(cls, id, org):
return db.session.query(Alert).join(Query).filter(Alert.id==id, Query.org==org).one()
return db.session.query(Alert).join(Query).filter(Alert.id == id, Query.org == org).one()
def to_dict(self, full=True):
d = {
@ -1271,7 +1300,7 @@ class Dashboard(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model
@classmethod
def get_by_slug_and_org(cls, slug, org):
return cls.query.filter(cls.slug == slug, cls.org==org).one()
return cls.query.filter(cls.slug == slug, cls.org == org).one()
def __unicode__(self):
return u"%s=%s" % (self.id, self.name)
@ -1360,7 +1389,7 @@ class Widget(TimestampMixin, db.Model):
@classmethod
def get_by_id_and_org(cls, widget_id, org):
return db.session.query(cls).join(Dashboard).filter(cls.id == widget_id, Dashboard.org== org).one()
return db.session.query(cls).join(Dashboard).filter(cls.id == widget_id, Dashboard.org == org).one()
class Event(db.Model):
@ -1415,7 +1444,7 @@ class ApiKey(TimestampMixin, GFKBase, db.Model):
org = db.relationship(Organization)
api_key = Column(db.String(255), index=True, default=lambda: generate_token(40))
active = Column(db.Boolean, default=True)
#'object' provided by GFKBase
# 'object' provided by GFKBase
created_by_id = Column(db.Integer, db.ForeignKey("users.id"), nullable=True)
created_by = db.relationship(User)
@ -1424,11 +1453,11 @@ class ApiKey(TimestampMixin, GFKBase, db.Model):
@classmethod
def get_by_api_key(cls, api_key):
return cls.query.filter(cls.api_key==api_key, cls.active==True).one()
return cls.query.filter(cls.api_key == api_key, cls.active == True).one()
@classmethod
def get_by_object(cls, object):
return cls.query.filter(cls.object_type==object.__class__.__tablename__, cls.object_id==object.id, cls.active==True).first()
return cls.query.filter(cls.object_type == object.__class__.__tablename__, cls.object_id == object.id, cls.active == True).first()
@classmethod
def create_for_object(cls, object, user):
@ -1475,7 +1504,7 @@ class NotificationDestination(BelongsToOrgMixin, db.Model):
@classmethod
def all(cls, org):
notification_destinations = cls.query.filter(cls.org==org).order_by(cls.id.asc())
notification_destinations = cls.query.filter(cls.org == org).order_by(cls.id.asc())
return notification_destinations
@ -1567,6 +1596,6 @@ def init_db():
default_group = Group(name='default', permissions=Group.DEFAULT_PERMISSIONS, org=default_org, type=Group.BUILTIN_GROUP)
db.session.add_all([default_org, admin_group, default_group])
#XXX remove after fixing User.group_ids
# XXX remove after fixing User.group_ids
db.session.commit()
return default_org, admin_group, default_group

View File

@ -3,6 +3,7 @@ import logging
import signal
import time
import pystache
import redis
from celery.exceptions import SoftTimeLimitExceeded
@ -271,7 +272,15 @@ def refresh_queries():
elif query.data_source.paused:
logging.info("Skipping refresh of %s because datasource - %s is paused (%s).", query.id, query.data_source.name, query.data_source.pause_reason)
else:
enqueue_query(query.query_text, query.data_source, query.user_id,
# if query.options and 'parameters' in query.options and len(query.options['parameters']) > 0:
if query.options and len(query.options.get('parameters', [])) > 0:
query_params = {p['name']: p['value']
for p in query.options['parameters']}
query_text = pystache.render(query.query_text, query_params)
else:
query_text = query.query_text
enqueue_query(query_text, query.data_source, query.user_id,
scheduled_query=query,
metadata={'Query ID': query.id, 'Username': 'Scheduled'})
@ -410,6 +419,8 @@ class QueryExecutor(object):
self.query_hash,
self.data_source_id,
False, metadata)
if self.tracker.scheduled:
models.scheduled_queries_executions.update(self.tracker.query_id)
def run(self):
signal.signal(signal.SIGINT, signal_handler)

View File

@ -28,6 +28,15 @@ def utcnow():
return datetime.datetime.now(pytz.utc)
def dt_from_timestamp(timestamp, tz_aware=True):
    """Convert a POSIX timestamp (number or numeric string) to a datetime.

    The result is interpreted as UTC. When ``tz_aware`` is true the
    returned datetime carries ``pytz.utc`` as tzinfo; otherwise it is
    naive (but still expressed in UTC).
    """
    dt = datetime.datetime.utcfromtimestamp(float(timestamp))
    if not tz_aware:
        return dt
    return dt.replace(tzinfo=pytz.utc)
def slugify(s):
return re.sub('[^a-z0-9_\-]+', '-', s.lower())
@ -156,5 +165,3 @@ def base_url(org):
return "https://{}/{}".format(settings.HOST, org.slug)
return settings.HOST

View File

@ -2,9 +2,11 @@
import datetime
import json
from unittest import TestCase
import mock
from dateutil.parser import parse as date_parse
from tests import BaseTestCase
from redash import models
from redash.models import db
from redash.utils import gen_query_hash, utcnow
@ -79,12 +81,6 @@ class QueryOutdatedQueriesTest(BaseTestCase):
self.assertNotIn(query, queries)
def test_outdated_queries_skips_unscheduled_queries(self):
query = self.factory.create_query(schedule='60')
queries = models.Query.outdated_queries()
self.assertNotIn(query, queries)
def test_outdated_queries_works_with_ttl_based_schedule(self):
two_hours_ago = utcnow() - datetime.timedelta(hours=2)
query = self.factory.create_query(schedule="3600")
@ -94,6 +90,17 @@ class QueryOutdatedQueriesTest(BaseTestCase):
queries = models.Query.outdated_queries()
self.assertIn(query, queries)
# A query whose result is stale (2h old vs a 1h schedule) must NOT be
# re-scheduled when the executions tracker says it just ran: the tracker
# timestamp overrides latest_query_data.retrieved_at in outdated_queries().
# NOTE(review): uses naive datetime.datetime.now(), unlike the aware
# utcnow() used by sibling tests — presumably intentional here since only
# the tracker timestamp decides the outcome; confirm.
def test_outdated_queries_works_scheduled_queries_tracker(self):
two_hours_ago = datetime.datetime.now() - datetime.timedelta(hours=2)
query = self.factory.create_query(schedule="3600")
query_result = self.factory.create_query_result(query=query, retrieved_at=two_hours_ago)
query.latest_query_data = query_result
# Mark the query as executed "now" in the Redis-backed tracker.
models.scheduled_queries_executions.update(query.id)
queries = models.Query.outdated_queries()
self.assertNotIn(query, queries)
def test_skips_fresh_queries(self):
half_an_hour_ago = utcnow() - datetime.timedelta(minutes=30)
query = self.factory.create_query(schedule="3600")
@ -450,7 +457,7 @@ class TestQueryResultStoreResult(BaseTestCase):
query_result, _ = models.QueryResult.store_result(
self.data_source.org, self.data_source, self.query_hash,
self.query, self.data, self.runtime, self.utcnow)
self.assertEqual(query1.latest_query_data, query_result)
self.assertEqual(query2.latest_query_data, query_result)
self.assertNotEqual(query3.latest_query_data, query_result)