Merge pull request #1584 from washort/scheduled-query-backoff

Scheduled query backoff
This commit is contained in:
Arik Fraimovich 2017-02-28 13:19:34 +02:00 committed by GitHub
commit ce8ffae152
6 changed files with 286 additions and 125 deletions

View File

@ -0,0 +1,25 @@
"""add Query.schedule_failures
Revision ID: d1eae8b9893e
Revises: 65fc9ede4746
Create Date: 2017-02-03 01:45:02.954923
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'd1eae8b9893e'
down_revision = '65fc9ede4746'
branch_labels = None
depends_on = None
def upgrade():
    """Add the non-nullable ``schedule_failures`` counter to ``queries``."""
    # The server-side default of '0' gives existing rows a value, which the
    # NOT NULL constraint requires.
    failures_column = sa.Column(
        'schedule_failures',
        sa.Integer(),
        nullable=False,
        server_default='0',
    )
    op.add_column('queries', failures_column)
def downgrade():
    """Drop the ``schedule_failures`` column from ``queries``."""
    table_name, column_name = 'queries', 'schedule_failures'
    op.drop_column(table_name, column_name)

View File

@ -645,7 +645,7 @@ class QueryResult(db.Model, BelongsToOrgMixin):
return self.data_source.groups
def should_schedule_next(previous_iteration, now, schedule):
def should_schedule_next(previous_iteration, now, schedule, failures):
if schedule.isdigit():
ttl = int(schedule)
next_iteration = previous_iteration + datetime.timedelta(seconds=ttl)
@ -662,7 +662,8 @@ def should_schedule_next(previous_iteration, now, schedule):
previous_iteration = normalized_previous_iteration - datetime.timedelta(days=1)
next_iteration = (previous_iteration + datetime.timedelta(days=1)).replace(hour=hour, minute=minute)
if failures:
next_iteration += datetime.timedelta(minutes=2**failures)
return now > next_iteration
@ -688,6 +689,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
is_archived = Column(db.Boolean, default=False, index=True)
is_draft = Column(db.Boolean, default=True, index=True)
schedule = Column(db.String(10), nullable=True)
schedule_failures = Column(db.Integer, default=0)
visualizations = db.relationship("Visualization", cascade="all, delete-orphan")
options = Column(MutableDict.as_mutable(PseudoJSON), default={})
@ -789,12 +791,14 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
queries = (db.session.query(Query)
.join(QueryResult)
.join(DataSource)
.filter(Query.schedule != None))
.filter(Query.schedule != None)
.order_by(Query.id))
now = utils.utcnow()
outdated_queries = {}
for query in queries:
if should_schedule_next(query.latest_query_data.retrieved_at, now, query.schedule):
if should_schedule_next(query.latest_query_data.retrieved_at, now,
query.schedule, query.schedule_failures):
key = "{}:{}".format(query.query_hash, query.data_source.id)
outdated_queries[key] = query
@ -892,6 +896,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
@listens_for(Query.query_text, 'set')
def gen_query_hash(target, val, oldval, initiator):
    """React to a new value being assigned to ``Query.query_text``.

    Stores the hash of the new text on the query and resets its
    ``schedule_failures`` counter to zero, so failures recorded for the
    previous text do not carry over to the new one.
    """
    new_hash = utils.gen_query_hash(val)
    target.query_hash = new_hash
    target.schedule_failures = 0
@listens_for(Query.user_id, 'set')

View File

@ -199,7 +199,8 @@ class QueryTask(object):
return self._async_result.revoke(terminate=True, signal='SIGINT')
def enqueue_query(query, data_source, user_id, scheduled=False, metadata={}):
def enqueue_query(query, data_source, user_id, scheduled_query=None,
metadata={}):
query_hash = gen_query_hash(query)
logging.info("Inserting job for %s with metadata=%s", query_hash, metadata)
try_count = 0
@ -225,14 +226,21 @@ def enqueue_query(query, data_source, user_id, scheduled=False, metadata={}):
if not job:
pipe.multi()
if scheduled:
if scheduled_query:
queue_name = data_source.scheduled_queue_name
scheduled_query_id = scheduled_query.id
else:
queue_name = data_source.queue_name
scheduled_query_id = None
result = execute_query.apply_async(args=(query, data_source.id, metadata, user_id), queue=queue_name)
result = execute_query.apply_async(args=(
query, data_source.id, metadata, user_id,
scheduled_query_id),
queue=queue_name)
job = QueryTask(async_result=result)
tracker = QueryTaskTracker.create(result.id, 'created', query_hash, data_source.id, scheduled, metadata)
tracker = QueryTaskTracker.create(
result.id, 'created', query_hash, data_source.id,
scheduled_query is not None, metadata)
tracker.save(connection=pipe)
logging.info("[%s] Created new job: %s", query_hash, job.id)
@ -264,7 +272,7 @@ def refresh_queries():
logging.info("Skipping refresh of %s because datasource - %s is paused (%s).", query.id, query.data_source.name, query.data_source.pause_reason)
else:
enqueue_query(query.query_text, query.data_source, query.user_id,
scheduled=True,
scheduled_query=query,
metadata={'Query ID': query.id, 'Username': 'Scheduled'})
query_ids.append(query.id)
@ -380,7 +388,8 @@ class QueryExecutionError(Exception):
# We could have created this as a celery.Task derived class and had it act as the task itself, but this might result in
# weird issues as the task class is created once per process, so we decided to use a plain object instead.
class QueryExecutor(object):
def __init__(self, task, query, data_source_id, user_id, metadata):
def __init__(self, task, query, data_source_id, user_id, metadata,
scheduled_query):
self.task = task
self.query = query
self.data_source_id = data_source_id
@ -391,6 +400,7 @@ class QueryExecutor(object):
else:
self.user = None
self.query_hash = gen_query_hash(self.query)
self.scheduled_query = scheduled_query
# Load existing tracker or create a new one if the job was created before code update:
self.tracker = QueryTaskTracker.get_by_task_id(task.request.id) or QueryTaskTracker.create(task.request.id,
'created',
@ -425,7 +435,14 @@ class QueryExecutor(object):
if error:
self.tracker.update(state='failed')
result = QueryExecutionError(error)
if self.scheduled_query:
self.scheduled_query.schedule_failures += 1
models.db.session.add(self.scheduled_query)
else:
if (self.scheduled_query and
self.scheduled_query.schedule_failures > 0):
self.scheduled_query.schedule_failures = 0
models.db.session.add(self.scheduled_query)
query_result, updated_query_ids = models.QueryResult.store_result(
self.data_source.org, self.data_source,
self.query_hash, self.query, data,
@ -452,10 +469,14 @@ class QueryExecutor(object):
return annotated_query
def _log_progress(self, state):
logger.info(u"task=execute_query state=%s query_hash=%s type=%s ds_id=%d task_id=%s queue=%s query_id=%s username=%s",
state,
self.query_hash, self.data_source.type, self.data_source.id, self.task.request.id, self.task.request.delivery_info['routing_key'],
self.metadata.get('Query ID', 'unknown'), self.metadata.get('Username', 'unknown'))
logger.info(
u"task=execute_query state=%s query_hash=%s type=%s ds_id=%d "
"task_id=%s queue=%s query_id=%s username=%s",
state, self.query_hash, self.data_source.type, self.data_source.id,
self.task.request.id,
self.task.request.delivery_info['routing_key'],
self.metadata.get('Query ID', 'unknown'),
self.metadata.get('Username', 'unknown'))
self.tracker.update(state=state)
def _load_data_source(self):
@ -466,5 +487,11 @@ class QueryExecutor(object):
# user_id and scheduled_query_id are trailing keyword arguments with defaults for backward compatibility -- to support
# executing jobs that were submitted before the upgrade to this version.
@celery.task(name="redash.tasks.execute_query", bind=True, track_started=True)
def execute_query(self, query, data_source_id, metadata, user_id=None):
return QueryExecutor(self, query, data_source_id, user_id, metadata).run()
def execute_query(self, query, data_source_id, metadata, user_id=None,
                  scheduled_query_id=None):
    """Run ``query`` through a ``QueryExecutor`` and return its result.

    When ``scheduled_query_id`` is given, the matching ``Query`` row is
    looked up and handed to the executor; otherwise the executor receives
    ``None`` for the scheduled query.
    """
    scheduled_query = None
    if scheduled_query_id is not None:
        scheduled_query = models.Query.query.get(scheduled_query_id)
    executor = QueryExecutor(self, query, data_source_id, user_id, metadata,
                             scheduled_query)
    return executor.run()

View File

@ -1,11 +1,14 @@
from tests import BaseTestCase
from redash import redis_connection
from redash.tasks.queries import QueryTaskTracker, enqueue_query, execute_query
from unittest import TestCase
from mock import MagicMock
from collections import namedtuple
import uuid
import mock
from tests import BaseTestCase
from redash import redis_connection, models
from redash.query_runner.pg import PostgreSQL
from redash.tasks.queries import QueryTaskTracker, enqueue_query, execute_query
class TestPrune(TestCase):
def setUp(self):
@ -45,7 +48,7 @@ def gen_hash(*args, **kwargs):
class TestEnqueueTask(BaseTestCase):
def test_multiple_enqueue_of_same_query(self):
query = self.factory.create_query()
execute_query.apply_async = MagicMock(side_effect=gen_hash)
execute_query.apply_async = mock.MagicMock(side_effect=gen_hash)
enqueue_query(query.query_text, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id})
enqueue_query(query.query_text, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id})
@ -58,7 +61,7 @@ class TestEnqueueTask(BaseTestCase):
def test_multiple_enqueue_of_different_query(self):
query = self.factory.create_query()
execute_query.apply_async = MagicMock(side_effect=gen_hash)
execute_query.apply_async = mock.MagicMock(side_effect=gen_hash)
enqueue_query(query.query_text, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id})
enqueue_query(query.query_text + '2', query.data_source, True, {'Username': 'Arik', 'Query ID': query.id})
@ -68,3 +71,79 @@ class TestEnqueueTask(BaseTestCase):
self.assertEqual(3, redis_connection.zcard(QueryTaskTracker.WAITING_LIST))
self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.IN_PROGRESS_LIST))
self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.DONE_LIST))
class QueryExecutorTests(BaseTestCase):
    """Tests for ``execute_query`` with ``PostgreSQL.run_query`` mocked out.

    The mocked runner returns a ``(data, error)`` pair: a falsy ``error``
    is a successful run, a truthy one a failed run.
    """

    def test_success(self):
        """
        ``execute_query`` invokes the query runner and stores a query result.
        """
        cm = mock.patch("celery.app.task.Context.delivery_info",
                        {'routing_key': 'test'})
        with cm, mock.patch.object(PostgreSQL, "run_query") as qr:
            qr.return_value = ([1, 2], None)
            result_id = execute_query("SELECT 1, 2",
                                      self.factory.data_source.id, {})
            self.assertEqual(1, qr.call_count)
            result = models.QueryResult.query.get(result_id)
            self.assertEqual(result.data, '{1,2}')

    def test_success_scheduled(self):
        """
        Scheduled queries remember their latest results.
        """
        cm = mock.patch("celery.app.task.Context.delivery_info",
                        {'routing_key': 'test'})
        # The schedule is passed as a string, like in the other scheduling
        # tests; the column is a string column.
        q = self.factory.create_query(query_text="SELECT 1, 2",
                                      schedule="300")
        with cm, mock.patch.object(PostgreSQL, "run_query") as qr:
            qr.return_value = ([1, 2], None)
            result_id = execute_query(
                "SELECT 1, 2",
                self.factory.data_source.id, {},
                scheduled_query_id=q.id)
        models.db.session.refresh(q)
        self.assertEqual(q.schedule_failures, 0)
        result = models.QueryResult.query.get(result_id)
        self.assertEqual(q.latest_query_data, result)

    def test_failure_scheduled(self):
        """
        Scheduled queries that fail have their failure recorded.
        """
        cm = mock.patch("celery.app.task.Context.delivery_info",
                        {'routing_key': 'test'})
        q = self.factory.create_query(query_text="SELECT 1, 2",
                                      schedule="300")
        with cm, mock.patch.object(PostgreSQL, "run_query") as qr:
            # Assigning to ``qr.exception`` would only create an unused
            # attribute on the mock. Report the failure explicitly through
            # the error element of the runner's result instead.
            qr.return_value = (None, "broken")
            execute_query("SELECT 1, 2",
                          self.factory.data_source.id, {},
                          scheduled_query_id=q.id)
            # Reload before asserting so the value checked is the persisted
            # one, as in the other tests.
            models.db.session.refresh(q)
            self.assertEqual(q.schedule_failures, 1)
            execute_query("SELECT 1, 2",
                          self.factory.data_source.id, {},
                          scheduled_query_id=q.id)
            models.db.session.refresh(q)
            self.assertEqual(q.schedule_failures, 2)

    def test_success_after_failure(self):
        """
        Query execution success resets the failure counter.
        """
        cm = mock.patch("celery.app.task.Context.delivery_info",
                        {'routing_key': 'test'})
        q = self.factory.create_query(query_text="SELECT 1, 2",
                                      schedule="300")
        with cm, mock.patch.object(PostgreSQL, "run_query") as qr:
            # First run fails: the runner reports an error message.
            qr.return_value = (None, "broken")
            execute_query("SELECT 1, 2",
                          self.factory.data_source.id, {},
                          scheduled_query_id=q.id)
            models.db.session.refresh(q)
            self.assertEqual(q.schedule_failures, 1)

        with cm, mock.patch.object(PostgreSQL, "run_query") as qr:
            # Second run succeeds, which must clear the counter again.
            qr.return_value = ([1, 2], None)
            execute_query("SELECT 1, 2",
                          self.factory.data_source.id, {},
                          scheduled_query_id=q.id)
            models.db.session.refresh(q)
            self.assertEqual(q.schedule_failures, 0)

View File

@ -1,109 +1,47 @@
import datetime
from mock import patch, call, ANY
from tests import BaseTestCase
from redash.utils import utcnow
from redash.tasks import refresh_queries
from redash.models import db
from redash.models import Query
# TODO: this test should be split into two:
# 1. tests for Query.outdated_queries method
# 2. test for the refresh_query task
class TestRefreshQueries(BaseTestCase):
class TestRefreshQuery(BaseTestCase):
def test_enqueues_outdated_queries(self):
query = self.factory.create_query(schedule="60")
retrieved_at = utcnow() - datetime.timedelta(minutes=10)
query_result = self.factory.create_query_result(retrieved_at=retrieved_at, query_text=query.query_text,
query_hash=query.query_hash)
query.latest_query_data = query_result
db.session.add(query)
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
"""
refresh_queries() launches an execution task for each query returned
from Query.outdated_queries().
"""
query1 = self.factory.create_query()
query2 = self.factory.create_query(
query_text="select 42;",
data_source=self.factory.create_data_source())
oq = staticmethod(lambda: [query1, query2])
with patch('redash.tasks.queries.enqueue_query') as add_job_mock, \
patch.object(Query, 'outdated_queries', oq):
refresh_queries()
add_job_mock.assert_called_with(query.query_text, query.data_source, query.user_id, scheduled=True, metadata=ANY)
self.assertEqual(add_job_mock.call_count, 2)
add_job_mock.assert_has_calls([
call(query1.query_text, query1.data_source, query1.user_id,
scheduled_query=query1, metadata=ANY),
call(query2.query_text, query2.data_source, query2.user_id,
scheduled_query=query2, metadata=ANY)], any_order=True)
def test_doesnt_enqueue_outdated_queries_for_paused_data_source(self):
query = self.factory.create_query(schedule="60")
retrieved_at = utcnow() - datetime.timedelta(minutes=10)
query_result = self.factory.create_query_result(retrieved_at=retrieved_at, query_text=query.query_text,
query_hash=query.query_hash)
query.latest_query_data = query_result
db.session.add(query)
db.session.commit()
"""
refresh_queries() does not launch execution tasks for queries whose
data source is paused.
"""
query = self.factory.create_query()
oq = staticmethod(lambda: [query])
query.data_source.pause()
with patch.object(Query, 'outdated_queries', oq):
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
add_job_mock.assert_not_called()
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
add_job_mock.assert_not_called()
query.data_source.resume()
query.data_source.resume()
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
add_job_mock.assert_called_with(query.query_text, query.data_source, query.user_id, scheduled=True, metadata=ANY)
def test_skips_fresh_queries(self):
query = self.factory.create_query(schedule="1200")
retrieved_at = utcnow() - datetime.timedelta(minutes=10)
query_result = self.factory.create_query_result(retrieved_at=retrieved_at, query_text=query.query_text,
query_hash=query.query_hash)
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
self.assertFalse(add_job_mock.called)
def test_skips_queries_with_no_ttl(self):
query = self.factory.create_query(schedule=None)
retrieved_at = utcnow() - datetime.timedelta(minutes=10)
query_result = self.factory.create_query_result(retrieved_at=retrieved_at, query_text=query.query_text,
query_hash=query.query_hash)
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
self.assertFalse(add_job_mock.called)
def test_enqueues_query_only_once(self):
query = self.factory.create_query(schedule="60")
query2 = self.factory.create_query(schedule="60", query_text=query.query_text, query_hash=query.query_hash)
retrieved_at = utcnow() - datetime.timedelta(minutes=10)
query_result = self.factory.create_query_result(retrieved_at=retrieved_at, query_text=query.query_text,
query_hash=query.query_hash)
query.latest_query_data = query_result
query2.latest_query_data = query_result
db.session.add_all([query, query2])
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
add_job_mock.assert_called_once_with(query.query_text, query.data_source, query.user_id, scheduled=True, metadata=ANY)#{'Query ID': query.id, 'Username': 'Scheduled'})
def test_enqueues_query_with_correct_data_source(self):
query = self.factory.create_query(schedule="60", data_source=self.factory.create_data_source())
query2 = self.factory.create_query(schedule="60", query_text=query.query_text, query_hash=query.query_hash)
retrieved_at = utcnow() - datetime.timedelta(minutes=10)
query_result = self.factory.create_query_result(retrieved_at=retrieved_at, query_text=query.query_text,
query_hash=query.query_hash)
query.latest_query_data = query_result
query2.latest_query_data = query_result
db.session.add_all([query, query2])
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
add_job_mock.assert_has_calls([call(query2.query_text, query2.data_source, query2.user_id, scheduled=True, metadata=ANY),
call(query.query_text, query.data_source, query.user_id, scheduled=True, metadata=ANY)],
any_order=True)
self.assertEquals(2, add_job_mock.call_count)
def test_enqueues_only_for_relevant_data_source(self):
query = self.factory.create_query(schedule="60")
query2 = self.factory.create_query(schedule="3600", query_text=query.query_text, query_hash=query.query_hash)
retrieved_at = utcnow() - datetime.timedelta(minutes=10)
query_result = self.factory.create_query_result(retrieved_at=retrieved_at, query_text=query.query_text,
query_hash=query.query_hash)
query.latest_query_data = query_result
query2.latest_query_data = query_result
db.session.add_all([query, query2])
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
add_job_mock.assert_called_once_with(query.query_text, query.data_source, query.user_id, scheduled=True, metadata=ANY)
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
add_job_mock.assert_called_with(
query.query_text, query.data_source, query.user_id,
scheduled_query=query, metadata=ANY)

View File

@ -30,31 +30,45 @@ class ShouldScheduleNextTest(TestCase):
def test_interval_schedule_that_needs_reschedule(self):
now = utcnow()
two_hours_ago = now - datetime.timedelta(hours=2)
self.assertTrue(models.should_schedule_next(two_hours_ago, now, "3600"))
self.assertTrue(models.should_schedule_next(two_hours_ago, now, "3600",
0))
def test_interval_schedule_that_doesnt_need_reschedule(self):
now = utcnow()
half_an_hour_ago = now - datetime.timedelta(minutes=30)
self.assertFalse(models.should_schedule_next(half_an_hour_ago, now, "3600"))
self.assertFalse(models.should_schedule_next(half_an_hour_ago, now,
"3600", 0))
def test_exact_time_that_needs_reschedule(self):
now = utcnow()
yesterday = now - datetime.timedelta(days=1)
scheduled_datetime = now - datetime.timedelta(hours=3)
scheduled_time = "{:02d}:00".format(scheduled_datetime.hour)
self.assertTrue(models.should_schedule_next(yesterday, now, scheduled_time))
self.assertTrue(models.should_schedule_next(yesterday, now,
scheduled_time, 0))
def test_exact_time_that_doesnt_need_reschedule(self):
now = date_parse("2015-10-16 20:10")
yesterday = date_parse("2015-10-15 23:07")
schedule = "23:00"
self.assertFalse(models.should_schedule_next(yesterday, now, schedule))
self.assertFalse(models.should_schedule_next(yesterday, now, schedule,
0))
def test_exact_time_with_day_change(self):
now = utcnow().replace(hour=0, minute=1)
previous = (now - datetime.timedelta(days=2)).replace(hour=23, minute=59)
previous = (now - datetime.timedelta(days=2)).replace(hour=23,
minute=59)
schedule = "23:59".format(now.hour + 3)
self.assertTrue(models.should_schedule_next(previous, now, schedule))
self.assertTrue(models.should_schedule_next(previous, now, schedule,
0))
def test_backoff(self):
    """
    Recorded failures push the next run of an interval schedule further out.
    """
    now = utcnow()
    last_run = now - datetime.timedelta(hours=2)

    # One hour interval plus 2**5 = 32 minutes of backoff is already over.
    due_after_five = models.should_schedule_next(last_run, now, "3600", 5)
    self.assertTrue(due_after_five)

    # 2**10 = 1024 minutes of backoff reaches well past ``now``.
    due_after_ten = models.should_schedule_next(last_run, now, "3600", 10)
    self.assertFalse(due_after_ten)
class QueryOutdatedQueriesTest(BaseTestCase):
@ -92,6 +106,79 @@ class QueryOutdatedQueriesTest(BaseTestCase):
queries = models.Query.outdated_queries()
self.assertIn(query, queries)
def test_enqueues_query_only_once(self):
    """
    Query.outdated_queries() reports a single query when several queries
    on one data source share the same text.
    """
    first = self.factory.create_query(schedule="60")
    duplicate = self.factory.create_query(
        schedule="60",
        query_text=first.query_text,
        query_hash=first.query_hash)
    ten_minutes_ago = utcnow() - datetime.timedelta(minutes=10)
    stale_result = self.factory.create_query_result(
        retrieved_at=ten_minutes_ago,
        query_text=first.query_text,
        query_hash=first.query_hash)
    # Both queries point at the same, already outdated result.
    for scheduled in (first, duplicate):
        scheduled.latest_query_data = stale_result

    outdated = list(models.Query.outdated_queries())
    self.assertEqual(outdated, [duplicate])
def test_enqueues_query_with_correct_data_source(self):
    """
    Queries from different data sources will be reported by
    Query.outdated_queries() even if they have the same query text.
    """
    query = self.factory.create_query(
        schedule="60", data_source=self.factory.create_data_source())
    query2 = self.factory.create_query(
        schedule="60", query_text=query.query_text,
        query_hash=query.query_hash)
    retrieved_at = utcnow() - datetime.timedelta(minutes=10)
    query_result = self.factory.create_query_result(
        retrieved_at=retrieved_at, query_text=query.query_text,
        query_hash=query.query_hash)
    query.latest_query_data = query_result
    query2.latest_query_data = query_result

    outdated = list(models.Query.outdated_queries())
    # outdated_queries() gathers its matches in a dict keyed by query hash
    # and data source id, so the order of the returned queries is not
    # something to depend on. Check membership and count instead.
    self.assertEqual(len(outdated), 2)
    self.assertIn(query, outdated)
    self.assertIn(query2, outdated)
def test_enqueues_only_for_relevant_data_source(self):
    """
    Among queries sharing the same text, Query.outdated_queries() reports
    only those whose own schedule makes them due for a refresh.
    """
    due_query = self.factory.create_query(schedule="60")
    fresh_query = self.factory.create_query(
        schedule="3600",
        query_text=due_query.query_text,
        query_hash=due_query.query_hash)
    ten_minutes_ago = utcnow() - datetime.timedelta(minutes=10)
    shared_result = self.factory.create_query_result(
        retrieved_at=ten_minutes_ago,
        query_text=due_query.query_text,
        query_hash=due_query.query_hash)
    # A ten minute old result is stale for the 60 second schedule but not
    # for the 3600 second one.
    for scheduled in (due_query, fresh_query):
        scheduled.latest_query_data = shared_result

    outdated = list(models.Query.outdated_queries())
    self.assertEqual(outdated, [due_query])
def test_failure_extends_schedule(self):
    """
    Failures recorded on a query delay its next scheduled execution by an
    exponentially growing backoff.
    """
    query = self.factory.create_query(schedule="60", schedule_failures=4)
    sixteen_minutes_ago = utcnow() - datetime.timedelta(minutes=16)
    latest_result = self.factory.create_query_result(
        retrieved_at=sixteen_minutes_ago,
        query_text=query.query_text,
        query_hash=query.query_hash)
    query.latest_query_data = latest_result

    # 60 seconds of interval plus 2**4 = 16 minutes of backoff: a result
    # that is 16 minutes old is not due yet.
    not_due_yet = list(models.Query.outdated_queries())
    self.assertEqual(not_due_yet, [])

    # Once the result is 17 minutes old the query is reported.
    latest_result.retrieved_at = utcnow() - datetime.timedelta(minutes=17)
    due_now = list(models.Query.outdated_queries())
    self.assertEqual(due_now, [query])
class QueryArchiveTest(BaseTestCase):
def setUp(self):