From a9cb87d4b3b5efb96920db5bd225688966f9b837 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sun, 1 Mar 2020 11:02:46 +0200 Subject: [PATCH] refresh_queries shouldn't break because of a single query having a bad schedule object (#4163) * move filtering of invalid schedules to the query * simplify retrieved_at assignment and wrap in a try/except block to avoid one query blowing up the rest * refactor refresh_queries to use simpler functions with a single responsibility and add try/except blocks to avoid one query blowing up the rest * avoid blowing up when job locks point to expired Job objects. Enqueue them again instead * there's no need to check for the existence of interval - all schedules have intervals * disable faulty schedules * reduce FP style in refresh_queries * report refresh_queries errors to Sentry (if it is configured) * avoid using exists+fetch and use exceptions instead --- redash/models/__init__.py | 54 ++++--- redash/tasks/queries/execution.py | 23 ++- redash/tasks/queries/maintenance.py | 144 +++++++++--------- redash/utils/sentry.py | 4 + tests/tasks/test_queries.py | 28 ++++ tests/test_models.py | 224 ++++++++-------------------- 6 files changed, 210 insertions(+), 267 deletions(-) diff --git a/redash/models/__init__.py b/redash/models/__init__.py index 78d11cd6..45e6821a 100644 --- a/redash/models/__init__.py +++ b/redash/models/__init__.py @@ -36,6 +36,7 @@ from redash.utils import ( json_loads, mustache_render, base_url, + sentry, ) from redash.utils.configuration import ConfigurationContainer from redash.models.parameterized_query import ParameterizedQuery @@ -630,34 +631,39 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model): scheduled_queries_executions.refresh() for query in queries: - if query.schedule["interval"] is None: - continue - - if query.schedule["until"] is not None: - schedule_until = pytz.utc.localize( - datetime.datetime.strptime(query.schedule["until"], "%Y-%m-%d") - ) - - if schedule_until <= now: + try: + if query.schedule.get("disabled"): continue - if query.latest_query_data: - retrieved_at = query.latest_query_data.retrieved_at - else: - retrieved_at = now + if query.schedule["until"]: + schedule_until = pytz.utc.localize( + datetime.datetime.strptime(query.schedule["until"], "%Y-%m-%d") + ) - retrieved_at = scheduled_queries_executions.get(query.id) or retrieved_at + if schedule_until <= now: + continue - if should_schedule_next( - retrieved_at, - now, - query.schedule["interval"], - query.schedule["time"], - query.schedule["day_of_week"], - query.schedule_failures, - ): - key = "{}:{}".format(query.query_hash, query.data_source_id) - outdated_queries[key] = query + retrieved_at = scheduled_queries_executions.get(query.id) or ( + query.latest_query_data and query.latest_query_data.retrieved_at + ) + + if should_schedule_next( + retrieved_at or now, + now, + query.schedule["interval"], + query.schedule["time"], + query.schedule["day_of_week"], + query.schedule_failures, + ): + key = "{}:{}".format(query.query_hash, query.data_source_id) + outdated_queries[key] = query + except Exception as e: + query.schedule["disabled"] = True + db.session.commit() + + message = "Could not determine if query %d is outdated due to %s. The schedule for this query has been disabled." 
% (query.id, repr(e)) + logging.info(message) + sentry.capture_message(message) return list(outdated_queries.values()) diff --git a/redash/tasks/queries/execution.py b/redash/tasks/queries/execution.py index 5389d16a..2c70f973 100644 --- a/redash/tasks/queries/execution.py +++ b/redash/tasks/queries/execution.py @@ -5,6 +5,7 @@ import redis from rq import get_current_job from rq.job import JobStatus from rq.timeouts import JobTimeoutException +from rq.exceptions import NoSuchJobError from redash import models, redis_connection, settings from redash.query_runner import InterruptException @@ -43,16 +44,22 @@ def enqueue_query( job_id = pipe.get(_job_lock_id(query_hash, data_source.id)) if job_id: logger.info("[%s] Found existing job: %s", query_hash, job_id) + job_complete = None - job = Job.fetch(job_id) + try: + job = Job.fetch(job_id) + job_exists = True + status = job.get_status() + job_complete = status in [JobStatus.FINISHED, JobStatus.FAILED] - status = job.get_status() - if status in [JobStatus.FINISHED, JobStatus.FAILED]: - logger.info( - "[%s] job found is ready (%s), removing lock", - query_hash, - status, - ) + if job_complete: + message = "job found is complete (%s)" % status + except NoSuchJobError: + message = "job found has expired" + job_exists = False + + if job_complete or not job_exists: + logger.info("[%s] %s, removing lock", query_hash, message) redis_connection.delete(_job_lock_id(query_hash, data_source.id)) job = None diff --git a/redash/tasks/queries/maintenance.py b/redash/tasks/queries/maintenance.py index f7956f21..e71d42c0 100644 --- a/redash/tasks/queries/maintenance.py +++ b/redash/tasks/queries/maintenance.py @@ -8,7 +8,7 @@ from redash.models.parameterized_query import ( QueryDetachedFromDataSourceError, ) from redash.tasks.failure_report import track_failure -from redash.utils import json_dumps +from redash.utils import json_dumps, sentry from redash.worker import job, get_job_logger from .execution import enqueue_query @@ -27,85 +27,79 @@ def empty_schedules(): logger.info("Deleted %d schedules.", len(queries)) +def _should_refresh_query(query): + if settings.FEATURE_DISABLE_REFRESH_QUERIES: + logger.info("Disabled refresh queries.") + return False + elif query.org.is_disabled: + logger.debug("Skipping refresh of %s because org is disabled.", query.id) + return False + elif query.data_source is None: + logger.debug("Skipping refresh of %s because the datasource is none.", query.id) + return False + elif query.data_source.paused: + logger.debug( + "Skipping refresh of %s because datasource - %s is paused (%s).", + query.id, + query.data_source.name, + query.data_source.pause_reason, + ) + return False + else: + return True + + +def _apply_default_parameters(query): + parameters = {p["name"]: p.get("value") for p in query.parameters} + if any(parameters): + try: + return query.parameterized.apply(parameters).query + except InvalidParameterError as e: + error = u"Skipping refresh of {} because of invalid parameters: {}".format( + query.id, str(e) + ) + track_failure(query, error) + raise + except QueryDetachedFromDataSourceError as e: + error = ( + "Skipping refresh of {} because a related dropdown " + "query ({}) is unattached to any datasource." 
+ ).format(query.id, e.query_id) + track_failure(query, error) + raise + else: + return query.query_text + + def refresh_queries(): logger.info("Refreshing queries...") + enqueued = [] + for query in models.Query.outdated_queries(): + if not _should_refresh_query(query): + continue - outdated_queries_count = 0 - query_ids = [] + try: + enqueue_query( + _apply_default_parameters(query), + query.data_source, + query.user_id, + scheduled_query=query, + metadata={"Query ID": query.id, "Username": "Scheduled"}, + ) + enqueued.append(query) + except Exception as e: + message = "Could not enqueue query %d due to %s" % (query.id, repr(e)) + logging.info(message) + sentry.capture_message(message) - with statsd_client.timer("manager.outdated_queries_lookup"): - for query in models.Query.outdated_queries(): - if settings.FEATURE_DISABLE_REFRESH_QUERIES: - logger.info("Disabled refresh queries.") - elif query.org.is_disabled: - logger.debug( - "Skipping refresh of %s because org is disabled.", query.id - ) - elif query.data_source is None: - logger.debug( - "Skipping refresh of %s because the datasource is none.", query.id - ) - elif query.data_source.paused: - logger.debug( - "Skipping refresh of %s because datasource - %s is paused (%s).", - query.id, - query.data_source.name, - query.data_source.pause_reason, - ) - else: - query_text = query.query_text + status = { + "outdated_queries_count": len(enqueued), + "last_refresh_at": time.time(), + "query_ids": json_dumps([q.id for q in enqueued]), + } - parameters = {p["name"]: p.get("value") for p in query.parameters} - if any(parameters): - try: - query_text = query.parameterized.apply(parameters).query - except InvalidParameterError as e: - error = u"Skipping refresh of {} because of invalid parameters: {}".format( - query.id, str(e) - ) - track_failure(query, error) - continue - except QueryDetachedFromDataSourceError as e: - error = ( - "Skipping refresh of {} because a related dropdown " - "query ({}) is unattached to any datasource." - ).format(query.id, e.query_id) - track_failure(query, error) - continue - - enqueue_query( - query_text, - query.data_source, - query.user_id, - scheduled_query=query, - metadata={"Query ID": query.id, "Username": "Scheduled"}, - ) - - query_ids.append(query.id) - outdated_queries_count += 1 - - statsd_client.gauge("manager.outdated_queries", outdated_queries_count) - - logger.info( - "Done refreshing queries. 
Found %d outdated queries: %s" - % (outdated_queries_count, query_ids) - ) - - status = redis_connection.hgetall("redash:status") - now = time.time() - - redis_connection.hmset( - "redash:status", - { - "outdated_queries_count": outdated_queries_count, - "last_refresh_at": now, - "query_ids": json_dumps(query_ids), - }, - ) - - statsd_client.gauge( - "manager.seconds_since_refresh", now - float(status.get("last_refresh_at", now)) - ) + redis_connection.hmset("redash:status", status) + logger.info("Done refreshing queries: %s" % status) def cleanup_query_results(): diff --git a/redash/utils/sentry.py b/redash/utils/sentry.py index bcef3237..e560b7f9 100644 --- a/redash/utils/sentry.py +++ b/redash/utils/sentry.py @@ -1,4 +1,5 @@ import sentry_sdk +from funcy import iffy from sentry_sdk.integrations.flask import FlaskIntegration from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration from sentry_sdk.integrations.redis import RedisIntegration @@ -33,3 +34,6 @@ def init(): RqIntegration(), ], ) + + +capture_message = iffy(lambda _: settings.SENTRY_DSN, sentry_sdk.capture_message) \ No newline at end of file diff --git a/tests/tasks/test_queries.py b/tests/tasks/test_queries.py index 24153bb9..063e7c27 100644 --- a/tests/tasks/test_queries.py +++ b/tests/tasks/test_queries.py @@ -4,6 +4,7 @@ import uuid from mock import patch, Mock from rq import Connection +from rq.exceptions import NoSuchJobError from tests import BaseTestCase from redash import redis_connection, rq_redis_connection, models @@ -67,6 +68,33 @@ class TestEnqueueTask(BaseTestCase): self.assertEqual(1, enqueue.call_count) + def test_multiple_enqueue_of_expired_job(self, enqueue, fetch_job): + query = self.factory.create_query() + + with Connection(rq_redis_connection): + enqueue_query( + query.query_text, + query.data_source, + query.user_id, + False, + query, + {"Username": "Arik", "Query ID": query.id}, + ) + + # "expire" the previous job + fetch_job.side_effect = NoSuchJobError + + enqueue_query( + query.query_text, + query.data_source, + query.user_id, + False, + query, + {"Username": "Arik", "Query ID": query.id}, + ) + + self.assertEqual(2, enqueue.call_count) + @patch("redash.settings.dynamic_settings.query_time_limit", return_value=60) def test_limits_query_time(self, _, enqueue, __): query = self.factory.create_query() diff --git a/tests/test_models.py b/tests/test_models.py index 8eae2e5c..600ff8fe 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -159,16 +159,26 @@ class ShouldScheduleNextTest(TestCase): class QueryOutdatedQueriesTest(BaseTestCase): + def schedule(self, **kwargs): + schedule = {"interval": None, "time": None, "until": None, "day_of_week": None} + schedule.update(**kwargs) + return schedule + + def create_scheduled_query(self, **kwargs): + return self.factory.create_query(schedule=self.schedule(**kwargs)) + + def fake_previous_execution(self, query, **kwargs): + retrieved_at = utcnow() - datetime.timedelta(**kwargs) + query_result = self.factory.create_query_result( + retrieved_at=retrieved_at, + query_text=query.query_text, + query_hash=query.query_hash, + ) + query.latest_query_data = query_result + # TODO: this test can be refactored to use mock version of should_schedule_next to simplify it. 
def test_outdated_queries_skips_unscheduled_queries(self): - query = self.factory.create_query( - schedule={ - "interval": None, - "time": None, - "until": None, - "day_of_week": None, - } - ) + query = self.create_scheduled_query() query_with_none = self.factory.create_query(schedule=None) queries = models.Query.outdated_queries() @@ -177,71 +187,33 @@ class QueryOutdatedQueriesTest(BaseTestCase): self.assertNotIn(query_with_none, queries) def test_outdated_queries_works_with_ttl_based_schedule(self): - two_hours_ago = utcnow() - datetime.timedelta(hours=2) - query = self.factory.create_query( - schedule={ - "interval": "3600", - "time": None, - "until": None, - "day_of_week": None, - } - ) - query_result = self.factory.create_query_result( - query=query.query_text, retrieved_at=two_hours_ago - ) - query.latest_query_data = query_result + query = self.create_scheduled_query(interval="3600") + self.fake_previous_execution(query, hours=2) queries = models.Query.outdated_queries() + self.assertIn(query, queries) def test_outdated_queries_works_scheduled_queries_tracker(self): - two_hours_ago = utcnow() - datetime.timedelta(hours=2) - query = self.factory.create_query( - schedule={ - "interval": "3600", - "time": None, - "until": None, - "day_of_week": None, - } - ) - query_result = self.factory.create_query_result( - query=query, retrieved_at=two_hours_ago - ) - query.latest_query_data = query_result - + query = self.create_scheduled_query(interval="3600") + self.fake_previous_execution(query, hours=2) models.scheduled_queries_executions.update(query.id) queries = models.Query.outdated_queries() + self.assertNotIn(query, queries) def test_skips_fresh_queries(self): - half_an_hour_ago = utcnow() - datetime.timedelta(minutes=30) - query = self.factory.create_query( - schedule={ - "interval": "3600", - "time": None, - "until": None, - "day_of_week": None, - } - ) - query_result = self.factory.create_query_result( - query=query.query_text, retrieved_at=half_an_hour_ago - ) - query.latest_query_data = query_result + query = self.create_scheduled_query(interval="3600") + self.fake_previous_execution(query, minutes=30) queries = models.Query.outdated_queries() + self.assertNotIn(query, queries) def test_outdated_queries_works_with_specific_time_schedule(self): half_an_hour_ago = utcnow() - datetime.timedelta(minutes=30) - query = self.factory.create_query( - schedule={ - "interval": "86400", - "time": half_an_hour_ago.strftime("%H:%M"), - "until": None, - "day_of_week": None, - } - ) + query = self.create_scheduled_query(interval="86400", time=half_an_hour_ago.strftime("%H:%M")) query_result = self.factory.create_query_result( query=query.query_text, retrieved_at=half_an_hour_ago - datetime.timedelta(days=1), @@ -256,32 +228,14 @@ class QueryOutdatedQueriesTest(BaseTestCase): Only one query per data source with the same text will be reported by Query.outdated_queries(). 
""" - query = self.factory.create_query( - schedule={ - "interval": "60", - "until": None, - "time": None, - "day_of_week": None, - } - ) + query = self.create_scheduled_query(interval="60") query2 = self.factory.create_query( - schedule={ - "interval": "60", - "until": None, - "time": None, - "day_of_week": None, - }, + schedule=self.schedule(interval="60"), query_text=query.query_text, query_hash=query.query_hash, ) - retrieved_at = utcnow() - datetime.timedelta(minutes=10) - query_result = self.factory.create_query_result( - retrieved_at=retrieved_at, - query_text=query.query_text, - query_hash=query.query_hash, - ) - query.latest_query_data = query_result - query2.latest_query_data = query_result + self.fake_previous_execution(query, minutes=10) + self.fake_previous_execution(query2, minutes=10) self.assertEqual(list(models.Query.outdated_queries()), [query2]) @@ -291,32 +245,16 @@ class QueryOutdatedQueriesTest(BaseTestCase): Query.outdated_queries() even if they have the same query text. """ query = self.factory.create_query( - schedule={ - "interval": "60", - "until": None, - "time": None, - "day_of_week": None, - }, + schedule=self.schedule(interval="60"), data_source=self.factory.create_data_source(), ) query2 = self.factory.create_query( - schedule={ - "interval": "60", - "until": None, - "time": None, - "day_of_week": None, - }, + schedule=self.schedule(interval="60"), query_text=query.query_text, query_hash=query.query_hash, ) - retrieved_at = utcnow() - datetime.timedelta(minutes=10) - query_result = self.factory.create_query_result( - retrieved_at=retrieved_at, - query_text=query.query_text, - query_hash=query.query_hash, - ) - query.latest_query_data = query_result - query2.latest_query_data = query_result + self.fake_previous_execution(query, minutes=10) + self.fake_previous_execution(query2, minutes=10) outdated_queries = models.Query.outdated_queries() self.assertEqual(len(outdated_queries), 2) @@ -328,32 +266,14 @@ class QueryOutdatedQueriesTest(BaseTestCase): If multiple queries with the same text exist, only ones that are scheduled to be refreshed are reported by Query.outdated_queries(). """ - query = self.factory.create_query( - schedule={ - "interval": "60", - "until": None, - "time": None, - "day_of_week": None, - } - ) + query = self.create_scheduled_query(interval="60") query2 = self.factory.create_query( - schedule={ - "interval": "3600", - "until": None, - "time": None, - "day_of_week": None, - }, + schedule=self.schedule(interval="3600"), query_text=query.query_text, query_hash=query.query_hash, ) - retrieved_at = utcnow() - datetime.timedelta(minutes=10) - query_result = self.factory.create_query_result( - retrieved_at=retrieved_at, - query_text=query.query_text, - query_hash=query.query_hash, - ) - query.latest_query_data = query_result - query2.latest_query_data = query_result + self.fake_previous_execution(query, minutes=10) + self.fake_previous_execution(query2, minutes=10) self.assertEqual(list(models.Query.outdated_queries()), [query]) @@ -363,25 +283,14 @@ class QueryOutdatedQueriesTest(BaseTestCase): for scheduling future execution. 
""" query = self.factory.create_query( - schedule={ - "interval": "60", - "until": None, - "time": None, - "day_of_week": None, - }, + schedule=self.schedule(interval="60"), schedule_failures=4, ) - retrieved_at = utcnow() - datetime.timedelta(minutes=16) - query_result = self.factory.create_query_result( - retrieved_at=retrieved_at, - query_text=query.query_text, - query_hash=query.query_hash, - ) - query.latest_query_data = query_result + self.fake_previous_execution(query, minutes=16) self.assertEqual(list(models.Query.outdated_queries()), []) - query_result.retrieved_at = utcnow() - datetime.timedelta(minutes=17) + self.fake_previous_execution(query, minutes=17) self.assertEqual(list(models.Query.outdated_queries()), [query]) def test_schedule_until_after(self): @@ -390,21 +299,11 @@ class QueryOutdatedQueriesTest(BaseTestCase): Query.outdated_queries() after the given time is past. """ one_day_ago = (utcnow() - datetime.timedelta(days=1)).strftime("%Y-%m-%d") - two_hours_ago = utcnow() - datetime.timedelta(hours=2) - query = self.factory.create_query( - schedule={ - "interval": "3600", - "until": one_day_ago, - "time": None, - "day_of_week": None, - } - ) - query_result = self.factory.create_query_result( - query=query.query_text, retrieved_at=two_hours_ago - ) - query.latest_query_data = query_result + query = self.create_scheduled_query(interval="3600", until=one_day_ago) + self.fake_previous_execution(query, hours=2) queries = models.Query.outdated_queries() + self.assertNotIn(query, queries) def test_schedule_until_before(self): @@ -413,23 +312,28 @@ class QueryOutdatedQueriesTest(BaseTestCase): Query.outdated_queries() before the given time is past. """ one_day_from_now = (utcnow() + datetime.timedelta(days=1)).strftime("%Y-%m-%d") - two_hours_ago = utcnow() - datetime.timedelta(hours=2) - query = self.factory.create_query( - schedule={ - "interval": "3600", - "until": one_day_from_now, - "time": None, - "day_of_week": None, - } - ) - query_result = self.factory.create_query_result( - query=query.query_text, retrieved_at=two_hours_ago - ) - query.latest_query_data = query_result + query = self.create_scheduled_query(interval="3600", until=one_day_from_now) + self.fake_previous_execution(query, hours=2) queries = models.Query.outdated_queries() + self.assertIn(query, queries) + def test_skips_and_disables_faulty_queries(self): + faulty_query = self.create_scheduled_query(until="pigs fly") + valid_query = self.create_scheduled_query(interval="60") + self.fake_previous_execution(valid_query, minutes=10) + + queries = models.Query.outdated_queries() + + self.assertEqual(list(models.Query.outdated_queries()), [valid_query]) + self.assertTrue(faulty_query.schedule.get("disabled")) + + def test_skips_disabled_schedules(self): + query = self.create_scheduled_query(disabled=True) + queries = models.Query.outdated_queries() + self.assertNotIn(query, queries) + class QueryArchiveTest(BaseTestCase): def test_archive_query_sets_flag(self):