refresh_queries shouldn't break because of a single query having a bad schedule object (#4163)

* move filtering of invalid schedules to the query

* simplify retrieved_at assignment and wrap in a try/except block to avoid one query blowing up the rest

* refactor refresh_queries to use simpler functions with a single responsibility and add try/except blocks to avoid one query blowing up the rest

* avoid blowing up when job locks point to expired Job objects. Enqueue them again instead

* there's no need to check for the existence of interval - all schedules have intervals

* disable faulty schedules

* reduce functional-programming (FP) style in refresh_queries in favor of plainer imperative code

* report refresh_queries errors to Sentry (if it is configured)

* avoid using exists+fetch and use exceptions instead
This commit is contained in:
Omer Lachish 2020-03-01 11:02:46 +02:00 committed by GitHub
parent b0f1cdd194
commit a9cb87d4b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 210 additions and 267 deletions

View File

@ -36,6 +36,7 @@ from redash.utils import (
json_loads, json_loads,
mustache_render, mustache_render,
base_url, base_url,
sentry,
) )
from redash.utils.configuration import ConfigurationContainer from redash.utils.configuration import ConfigurationContainer
from redash.models.parameterized_query import ParameterizedQuery from redash.models.parameterized_query import ParameterizedQuery
@ -630,34 +631,39 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
scheduled_queries_executions.refresh() scheduled_queries_executions.refresh()
for query in queries: for query in queries:
if query.schedule["interval"] is None: try:
continue if query.schedule.get("disabled"):
if query.schedule["until"] is not None:
schedule_until = pytz.utc.localize(
datetime.datetime.strptime(query.schedule["until"], "%Y-%m-%d")
)
if schedule_until <= now:
continue continue
if query.latest_query_data: if query.schedule["until"]:
retrieved_at = query.latest_query_data.retrieved_at schedule_until = pytz.utc.localize(
else: datetime.datetime.strptime(query.schedule["until"], "%Y-%m-%d")
retrieved_at = now )
retrieved_at = scheduled_queries_executions.get(query.id) or retrieved_at if schedule_until <= now:
continue
if should_schedule_next( retrieved_at = scheduled_queries_executions.get(query.id) or (
retrieved_at, query.latest_query_data and query.latest_query_data.retrieved_at
now, )
query.schedule["interval"],
query.schedule["time"], if should_schedule_next(
query.schedule["day_of_week"], retrieved_at or now,
query.schedule_failures, now,
): query.schedule["interval"],
key = "{}:{}".format(query.query_hash, query.data_source_id) query.schedule["time"],
outdated_queries[key] = query query.schedule["day_of_week"],
query.schedule_failures,
):
key = "{}:{}".format(query.query_hash, query.data_source_id)
outdated_queries[key] = query
except Exception as e:
query.schedule["disabled"] = True
db.session.commit()
message = "Could not determine if query %d is outdated due to %s. The schedule for this query has been disabled." % (query.id, repr(e))
logging.info(message)
sentry.capture_message(message)
return list(outdated_queries.values()) return list(outdated_queries.values())

View File

@ -5,6 +5,7 @@ import redis
from rq import get_current_job from rq import get_current_job
from rq.job import JobStatus from rq.job import JobStatus
from rq.timeouts import JobTimeoutException from rq.timeouts import JobTimeoutException
from rq.exceptions import NoSuchJobError
from redash import models, redis_connection, settings from redash import models, redis_connection, settings
from redash.query_runner import InterruptException from redash.query_runner import InterruptException
@ -43,16 +44,22 @@ def enqueue_query(
job_id = pipe.get(_job_lock_id(query_hash, data_source.id)) job_id = pipe.get(_job_lock_id(query_hash, data_source.id))
if job_id: if job_id:
logger.info("[%s] Found existing job: %s", query_hash, job_id) logger.info("[%s] Found existing job: %s", query_hash, job_id)
job_complete = None
job = Job.fetch(job_id) try:
job = Job.fetch(job_id)
job_exists = True
status = job.get_status()
job_complete = status in [JobStatus.FINISHED, JobStatus.FAILED]
status = job.get_status() if job_complete:
if status in [JobStatus.FINISHED, JobStatus.FAILED]: message = "job found is complete (%s)" % status
logger.info( except NoSuchJobError:
"[%s] job found is ready (%s), removing lock", message = "job found has expired"
query_hash, job_exists = False
status,
) if job_complete or not job_exists:
logger.info("[%s] %s, removing lock", query_hash, message)
redis_connection.delete(_job_lock_id(query_hash, data_source.id)) redis_connection.delete(_job_lock_id(query_hash, data_source.id))
job = None job = None

View File

@ -8,7 +8,7 @@ from redash.models.parameterized_query import (
QueryDetachedFromDataSourceError, QueryDetachedFromDataSourceError,
) )
from redash.tasks.failure_report import track_failure from redash.tasks.failure_report import track_failure
from redash.utils import json_dumps from redash.utils import json_dumps, sentry
from redash.worker import job, get_job_logger from redash.worker import job, get_job_logger
from .execution import enqueue_query from .execution import enqueue_query
@ -27,85 +27,79 @@ def empty_schedules():
logger.info("Deleted %d schedules.", len(queries)) logger.info("Deleted %d schedules.", len(queries))
def _should_refresh_query(query):
if settings.FEATURE_DISABLE_REFRESH_QUERIES:
logger.info("Disabled refresh queries.")
return False
elif query.org.is_disabled:
logger.debug("Skipping refresh of %s because org is disabled.", query.id)
return False
elif query.data_source is None:
logger.debug("Skipping refresh of %s because the datasource is none.", query.id)
return False
elif query.data_source.paused:
logger.debug(
"Skipping refresh of %s because datasource - %s is paused (%s).",
query.id,
query.data_source.name,
query.data_source.pause_reason,
)
return False
else:
return True
def _apply_default_parameters(query):
parameters = {p["name"]: p.get("value") for p in query.parameters}
if any(parameters):
try:
return query.parameterized.apply(parameters).query
except InvalidParameterError as e:
error = u"Skipping refresh of {} because of invalid parameters: {}".format(
query.id, str(e)
)
track_failure(query, error)
raise
except QueryDetachedFromDataSourceError as e:
error = (
"Skipping refresh of {} because a related dropdown "
"query ({}) is unattached to any datasource."
).format(query.id, e.query_id)
track_failure(query, error)
raise
else:
return query.query_text
def refresh_queries(): def refresh_queries():
logger.info("Refreshing queries...") logger.info("Refreshing queries...")
enqueued = []
for query in models.Query.outdated_queries():
if not _should_refresh_query(query):
continue
outdated_queries_count = 0 try:
query_ids = [] enqueue_query(
_apply_default_parameters(query),
query.data_source,
query.user_id,
scheduled_query=query,
metadata={"Query ID": query.id, "Username": "Scheduled"},
)
enqueued.append(query)
except Exception as e:
message = "Could not enqueue query %d due to %s" % (query.id, repr(e))
logging.info(message)
sentry.capture_message(message)
with statsd_client.timer("manager.outdated_queries_lookup"): status = {
for query in models.Query.outdated_queries(): "outdated_queries_count": len(enqueued),
if settings.FEATURE_DISABLE_REFRESH_QUERIES: "last_refresh_at": time.time(),
logger.info("Disabled refresh queries.") "query_ids": json_dumps([q.id for q in enqueued]),
elif query.org.is_disabled: }
logger.debug(
"Skipping refresh of %s because org is disabled.", query.id
)
elif query.data_source is None:
logger.debug(
"Skipping refresh of %s because the datasource is none.", query.id
)
elif query.data_source.paused:
logger.debug(
"Skipping refresh of %s because datasource - %s is paused (%s).",
query.id,
query.data_source.name,
query.data_source.pause_reason,
)
else:
query_text = query.query_text
parameters = {p["name"]: p.get("value") for p in query.parameters} redis_connection.hmset("redash:status", status)
if any(parameters): logger.info("Done refreshing queries: %s" % status)
try:
query_text = query.parameterized.apply(parameters).query
except InvalidParameterError as e:
error = u"Skipping refresh of {} because of invalid parameters: {}".format(
query.id, str(e)
)
track_failure(query, error)
continue
except QueryDetachedFromDataSourceError as e:
error = (
"Skipping refresh of {} because a related dropdown "
"query ({}) is unattached to any datasource."
).format(query.id, e.query_id)
track_failure(query, error)
continue
enqueue_query(
query_text,
query.data_source,
query.user_id,
scheduled_query=query,
metadata={"Query ID": query.id, "Username": "Scheduled"},
)
query_ids.append(query.id)
outdated_queries_count += 1
statsd_client.gauge("manager.outdated_queries", outdated_queries_count)
logger.info(
"Done refreshing queries. Found %d outdated queries: %s"
% (outdated_queries_count, query_ids)
)
status = redis_connection.hgetall("redash:status")
now = time.time()
redis_connection.hmset(
"redash:status",
{
"outdated_queries_count": outdated_queries_count,
"last_refresh_at": now,
"query_ids": json_dumps(query_ids),
},
)
statsd_client.gauge(
"manager.seconds_since_refresh", now - float(status.get("last_refresh_at", now))
)
def cleanup_query_results(): def cleanup_query_results():

View File

@ -1,4 +1,5 @@
import sentry_sdk import sentry_sdk
from funcy import iffy
from sentry_sdk.integrations.flask import FlaskIntegration from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sentry_sdk.integrations.redis import RedisIntegration from sentry_sdk.integrations.redis import RedisIntegration
@ -33,3 +34,6 @@ def init():
RqIntegration(), RqIntegration(),
], ],
) )
capture_message = iffy(lambda _: settings.SENTRY_DSN, sentry_sdk.capture_message)

View File

@ -4,6 +4,7 @@ import uuid
from mock import patch, Mock from mock import patch, Mock
from rq import Connection from rq import Connection
from rq.exceptions import NoSuchJobError
from tests import BaseTestCase from tests import BaseTestCase
from redash import redis_connection, rq_redis_connection, models from redash import redis_connection, rq_redis_connection, models
@ -67,6 +68,33 @@ class TestEnqueueTask(BaseTestCase):
self.assertEqual(1, enqueue.call_count) self.assertEqual(1, enqueue.call_count)
def test_multiple_enqueue_of_expired_job(self, enqueue, fetch_job):
query = self.factory.create_query()
with Connection(rq_redis_connection):
enqueue_query(
query.query_text,
query.data_source,
query.user_id,
False,
query,
{"Username": "Arik", "Query ID": query.id},
)
# "expire" the previous job
fetch_job.side_effect = NoSuchJobError
enqueue_query(
query.query_text,
query.data_source,
query.user_id,
False,
query,
{"Username": "Arik", "Query ID": query.id},
)
self.assertEqual(2, enqueue.call_count)
@patch("redash.settings.dynamic_settings.query_time_limit", return_value=60) @patch("redash.settings.dynamic_settings.query_time_limit", return_value=60)
def test_limits_query_time(self, _, enqueue, __): def test_limits_query_time(self, _, enqueue, __):
query = self.factory.create_query() query = self.factory.create_query()

View File

@ -159,16 +159,26 @@ class ShouldScheduleNextTest(TestCase):
class QueryOutdatedQueriesTest(BaseTestCase): class QueryOutdatedQueriesTest(BaseTestCase):
def schedule(self, **kwargs):
schedule = {"interval": None, "time": None, "until": None, "day_of_week": None}
schedule.update(**kwargs)
return schedule
def create_scheduled_query(self, **kwargs):
return self.factory.create_query(schedule=self.schedule(**kwargs))
def fake_previous_execution(self, query, **kwargs):
retrieved_at = utcnow() - datetime.timedelta(**kwargs)
query_result = self.factory.create_query_result(
retrieved_at=retrieved_at,
query_text=query.query_text,
query_hash=query.query_hash,
)
query.latest_query_data = query_result
# TODO: this test can be refactored to use mock version of should_schedule_next to simplify it. # TODO: this test can be refactored to use mock version of should_schedule_next to simplify it.
def test_outdated_queries_skips_unscheduled_queries(self): def test_outdated_queries_skips_unscheduled_queries(self):
query = self.factory.create_query( query = self.create_scheduled_query()
schedule={
"interval": None,
"time": None,
"until": None,
"day_of_week": None,
}
)
query_with_none = self.factory.create_query(schedule=None) query_with_none = self.factory.create_query(schedule=None)
queries = models.Query.outdated_queries() queries = models.Query.outdated_queries()
@ -177,71 +187,33 @@ class QueryOutdatedQueriesTest(BaseTestCase):
self.assertNotIn(query_with_none, queries) self.assertNotIn(query_with_none, queries)
def test_outdated_queries_works_with_ttl_based_schedule(self): def test_outdated_queries_works_with_ttl_based_schedule(self):
two_hours_ago = utcnow() - datetime.timedelta(hours=2) query = self.create_scheduled_query(interval="3600")
query = self.factory.create_query( self.fake_previous_execution(query, hours=2)
schedule={
"interval": "3600",
"time": None,
"until": None,
"day_of_week": None,
}
)
query_result = self.factory.create_query_result(
query=query.query_text, retrieved_at=two_hours_ago
)
query.latest_query_data = query_result
queries = models.Query.outdated_queries() queries = models.Query.outdated_queries()
self.assertIn(query, queries) self.assertIn(query, queries)
def test_outdated_queries_works_scheduled_queries_tracker(self): def test_outdated_queries_works_scheduled_queries_tracker(self):
two_hours_ago = utcnow() - datetime.timedelta(hours=2) query = self.create_scheduled_query(interval="3600")
query = self.factory.create_query( self.fake_previous_execution(query, hours=2)
schedule={
"interval": "3600",
"time": None,
"until": None,
"day_of_week": None,
}
)
query_result = self.factory.create_query_result(
query=query, retrieved_at=two_hours_ago
)
query.latest_query_data = query_result
models.scheduled_queries_executions.update(query.id) models.scheduled_queries_executions.update(query.id)
queries = models.Query.outdated_queries() queries = models.Query.outdated_queries()
self.assertNotIn(query, queries) self.assertNotIn(query, queries)
def test_skips_fresh_queries(self): def test_skips_fresh_queries(self):
half_an_hour_ago = utcnow() - datetime.timedelta(minutes=30) query = self.create_scheduled_query(interval="3600")
query = self.factory.create_query( self.fake_previous_execution(query, minutes=30)
schedule={
"interval": "3600",
"time": None,
"until": None,
"day_of_week": None,
}
)
query_result = self.factory.create_query_result(
query=query.query_text, retrieved_at=half_an_hour_ago
)
query.latest_query_data = query_result
queries = models.Query.outdated_queries() queries = models.Query.outdated_queries()
self.assertNotIn(query, queries) self.assertNotIn(query, queries)
def test_outdated_queries_works_with_specific_time_schedule(self): def test_outdated_queries_works_with_specific_time_schedule(self):
half_an_hour_ago = utcnow() - datetime.timedelta(minutes=30) half_an_hour_ago = utcnow() - datetime.timedelta(minutes=30)
query = self.factory.create_query( query = self.create_scheduled_query(interval="86400", time=half_an_hour_ago.strftime("%H:%M"))
schedule={
"interval": "86400",
"time": half_an_hour_ago.strftime("%H:%M"),
"until": None,
"day_of_week": None,
}
)
query_result = self.factory.create_query_result( query_result = self.factory.create_query_result(
query=query.query_text, query=query.query_text,
retrieved_at=half_an_hour_ago - datetime.timedelta(days=1), retrieved_at=half_an_hour_ago - datetime.timedelta(days=1),
@ -256,32 +228,14 @@ class QueryOutdatedQueriesTest(BaseTestCase):
Only one query per data source with the same text will be reported by Only one query per data source with the same text will be reported by
Query.outdated_queries(). Query.outdated_queries().
""" """
query = self.factory.create_query( query = self.create_scheduled_query(interval="60")
schedule={
"interval": "60",
"until": None,
"time": None,
"day_of_week": None,
}
)
query2 = self.factory.create_query( query2 = self.factory.create_query(
schedule={ schedule=self.schedule(interval="60"),
"interval": "60",
"until": None,
"time": None,
"day_of_week": None,
},
query_text=query.query_text, query_text=query.query_text,
query_hash=query.query_hash, query_hash=query.query_hash,
) )
retrieved_at = utcnow() - datetime.timedelta(minutes=10) self.fake_previous_execution(query, minutes=10)
query_result = self.factory.create_query_result( self.fake_previous_execution(query2, minutes=10)
retrieved_at=retrieved_at,
query_text=query.query_text,
query_hash=query.query_hash,
)
query.latest_query_data = query_result
query2.latest_query_data = query_result
self.assertEqual(list(models.Query.outdated_queries()), [query2]) self.assertEqual(list(models.Query.outdated_queries()), [query2])
@ -291,32 +245,16 @@ class QueryOutdatedQueriesTest(BaseTestCase):
Query.outdated_queries() even if they have the same query text. Query.outdated_queries() even if they have the same query text.
""" """
query = self.factory.create_query( query = self.factory.create_query(
schedule={ schedule=self.schedule(interval="60"),
"interval": "60",
"until": None,
"time": None,
"day_of_week": None,
},
data_source=self.factory.create_data_source(), data_source=self.factory.create_data_source(),
) )
query2 = self.factory.create_query( query2 = self.factory.create_query(
schedule={ schedule=self.schedule(interval="60"),
"interval": "60",
"until": None,
"time": None,
"day_of_week": None,
},
query_text=query.query_text, query_text=query.query_text,
query_hash=query.query_hash, query_hash=query.query_hash,
) )
retrieved_at = utcnow() - datetime.timedelta(minutes=10) self.fake_previous_execution(query, minutes=10)
query_result = self.factory.create_query_result( self.fake_previous_execution(query2, minutes=10)
retrieved_at=retrieved_at,
query_text=query.query_text,
query_hash=query.query_hash,
)
query.latest_query_data = query_result
query2.latest_query_data = query_result
outdated_queries = models.Query.outdated_queries() outdated_queries = models.Query.outdated_queries()
self.assertEqual(len(outdated_queries), 2) self.assertEqual(len(outdated_queries), 2)
@ -328,32 +266,14 @@ class QueryOutdatedQueriesTest(BaseTestCase):
If multiple queries with the same text exist, only ones that are If multiple queries with the same text exist, only ones that are
scheduled to be refreshed are reported by Query.outdated_queries(). scheduled to be refreshed are reported by Query.outdated_queries().
""" """
query = self.factory.create_query( query = self.create_scheduled_query(interval="60")
schedule={
"interval": "60",
"until": None,
"time": None,
"day_of_week": None,
}
)
query2 = self.factory.create_query( query2 = self.factory.create_query(
schedule={ schedule=self.schedule(interval="3600"),
"interval": "3600",
"until": None,
"time": None,
"day_of_week": None,
},
query_text=query.query_text, query_text=query.query_text,
query_hash=query.query_hash, query_hash=query.query_hash,
) )
retrieved_at = utcnow() - datetime.timedelta(minutes=10) self.fake_previous_execution(query, minutes=10)
query_result = self.factory.create_query_result( self.fake_previous_execution(query2, minutes=10)
retrieved_at=retrieved_at,
query_text=query.query_text,
query_hash=query.query_hash,
)
query.latest_query_data = query_result
query2.latest_query_data = query_result
self.assertEqual(list(models.Query.outdated_queries()), [query]) self.assertEqual(list(models.Query.outdated_queries()), [query])
@ -363,25 +283,14 @@ class QueryOutdatedQueriesTest(BaseTestCase):
for scheduling future execution. for scheduling future execution.
""" """
query = self.factory.create_query( query = self.factory.create_query(
schedule={ schedule=self.schedule(interval="60"),
"interval": "60",
"until": None,
"time": None,
"day_of_week": None,
},
schedule_failures=4, schedule_failures=4,
) )
retrieved_at = utcnow() - datetime.timedelta(minutes=16) self.fake_previous_execution(query, minutes=16)
query_result = self.factory.create_query_result(
retrieved_at=retrieved_at,
query_text=query.query_text,
query_hash=query.query_hash,
)
query.latest_query_data = query_result
self.assertEqual(list(models.Query.outdated_queries()), []) self.assertEqual(list(models.Query.outdated_queries()), [])
query_result.retrieved_at = utcnow() - datetime.timedelta(minutes=17) self.fake_previous_execution(query, minutes=17)
self.assertEqual(list(models.Query.outdated_queries()), [query]) self.assertEqual(list(models.Query.outdated_queries()), [query])
def test_schedule_until_after(self): def test_schedule_until_after(self):
@ -390,21 +299,11 @@ class QueryOutdatedQueriesTest(BaseTestCase):
Query.outdated_queries() after the given time is past. Query.outdated_queries() after the given time is past.
""" """
one_day_ago = (utcnow() - datetime.timedelta(days=1)).strftime("%Y-%m-%d") one_day_ago = (utcnow() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
two_hours_ago = utcnow() - datetime.timedelta(hours=2) query = self.create_scheduled_query(interval="3600", until=one_day_ago)
query = self.factory.create_query( self.fake_previous_execution(query, hours=2)
schedule={
"interval": "3600",
"until": one_day_ago,
"time": None,
"day_of_week": None,
}
)
query_result = self.factory.create_query_result(
query=query.query_text, retrieved_at=two_hours_ago
)
query.latest_query_data = query_result
queries = models.Query.outdated_queries() queries = models.Query.outdated_queries()
self.assertNotIn(query, queries) self.assertNotIn(query, queries)
def test_schedule_until_before(self): def test_schedule_until_before(self):
@ -413,23 +312,28 @@ class QueryOutdatedQueriesTest(BaseTestCase):
Query.outdated_queries() before the given time is past. Query.outdated_queries() before the given time is past.
""" """
one_day_from_now = (utcnow() + datetime.timedelta(days=1)).strftime("%Y-%m-%d") one_day_from_now = (utcnow() + datetime.timedelta(days=1)).strftime("%Y-%m-%d")
two_hours_ago = utcnow() - datetime.timedelta(hours=2) query = self.create_scheduled_query(interval="3600", until=one_day_from_now)
query = self.factory.create_query( self.fake_previous_execution(query, hours=2)
schedule={
"interval": "3600",
"until": one_day_from_now,
"time": None,
"day_of_week": None,
}
)
query_result = self.factory.create_query_result(
query=query.query_text, retrieved_at=two_hours_ago
)
query.latest_query_data = query_result
queries = models.Query.outdated_queries() queries = models.Query.outdated_queries()
self.assertIn(query, queries) self.assertIn(query, queries)
def test_skips_and_disables_faulty_queries(self):
faulty_query = self.create_scheduled_query(until="pigs fly")
valid_query = self.create_scheduled_query(interval="60")
self.fake_previous_execution(valid_query, minutes=10)
queries = models.Query.outdated_queries()
self.assertEqual(list(models.Query.outdated_queries()), [valid_query])
self.assertTrue(faulty_query.schedule.get("disabled"))
def test_skips_disabled_schedules(self):
query = self.create_scheduled_query(disabled=True)
queries = models.Query.outdated_queries()
self.assertNotIn(query, queries)
class QueryArchiveTest(BaseTestCase): class QueryArchiveTest(BaseTestCase):
def test_archive_query_sets_flag(self): def test_archive_query_sets_flag(self):