Reset failure counter on adhoc success (#5394)

* reset failure counter when query completes successfully via adhoc

* Use "query_id" in metadata, but still allow "Query ID" for transition/legacy support
This commit is contained in:
Omer Lachish 2021-03-12 22:02:29 +02:00 committed by GitHub
parent 10bce2d1ac
commit 9fdf1f341d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 79 additions and 31 deletions

View File

@ -95,7 +95,7 @@ def enqueue_query(
"data_source_id": data_source.id,
"org_id": data_source.org_id,
"scheduled": scheduled_query_id is not None,
"query_id": metadata.get("Query ID"),
"query_id": metadata.get("query_id", metadata.get("Query ID")),
"user_id": user_id,
},
}
@ -151,22 +151,28 @@ def _resolve_user(user_id, is_api_key, query_id):
class QueryExecutor(object):
def __init__(
self, query, data_source_id, user_id, is_api_key, metadata, scheduled_query
self, query, data_source_id, user_id, is_api_key, metadata, is_scheduled_query
):
self.job = get_current_job()
self.query = query
self.data_source_id = data_source_id
self.metadata = metadata
self.data_source = self._load_data_source()
self.query_id = metadata.get("query_id")
self.user = _resolve_user(user_id, is_api_key, metadata.get("Query ID"))
self.query_model = (
models.Query.query.get(self.query_id)
if self.query_id and self.query_id != "adhoc"
else None
)
# Close DB connection to prevent holding a connection for a long time while the query is executing.
models.db.session.close()
self.query_hash = gen_query_hash(self.query)
self.scheduled_query = scheduled_query
# Load existing tracker or create a new one if the job was created before code update:
if scheduled_query:
models.scheduled_queries_executions.update(scheduled_query.id)
self.is_scheduled_query = is_scheduled_query
if self.is_scheduled_query:
# Load existing tracker or create a new one if the job was created before code update:
models.scheduled_queries_executions.update(self.query_model.id)
def run(self):
signal.signal(signal.SIGINT, signal_handler)
@ -203,20 +209,16 @@ class QueryExecutor(object):
if error is not None and data is None:
result = QueryExecutionError(error)
if self.scheduled_query is not None:
self.scheduled_query = models.db.session.merge(
self.scheduled_query, load=False
)
track_failure(self.scheduled_query, error)
if self.is_scheduled_query:
self.query_model = models.db.session.merge(self.query_model, load=False)
track_failure(self.query_model, error)
raise result
else:
if self.scheduled_query and self.scheduled_query.schedule_failures > 0:
self.scheduled_query = models.db.session.merge(
self.scheduled_query, load=False
)
self.scheduled_query.schedule_failures = 0
self.scheduled_query.skip_updated_at = True
models.db.session.add(self.scheduled_query)
if self.query_model and self.query_model.schedule_failures > 0:
self.query_model = models.db.session.merge(self.query_model, load=False)
self.query_model.schedule_failures = 0
self.query_model.skip_updated_at = True
models.db.session.add(self.query_model)
query_result = models.QueryResult.store_result(
self.data_source.org_id,
@ -243,7 +245,7 @@ class QueryExecutor(object):
def _annotate_query(self, query_runner):
self.metadata["Job ID"] = self.job.id
self.metadata["Query Hash"] = self.query_hash
self.metadata["Scheduled"] = self.scheduled_query is not None
self.metadata["Scheduled"] = self.is_scheduled_query
return query_runner.annotate_query(self.query, self.metadata)
@ -276,14 +278,14 @@ def execute_query(
scheduled_query_id=None,
is_api_key=False,
):
if scheduled_query_id is not None:
scheduled_query = models.Query.query.get(scheduled_query_id)
else:
scheduled_query = None
try:
return QueryExecutor(
query, data_source_id, user_id, is_api_key, metadata, scheduled_query
query,
data_source_id,
user_id,
is_api_key,
metadata,
scheduled_query_id is not None,
).run()
except QueryExecutionError as e:
models.db.session.rollback()

View File

@ -75,7 +75,7 @@ class HardLimitingWorker(HerokuWorker):
"""
grace_period = 15
queue_class = CancellableQueue
queue_class = RedashQueue
job_class = CancellableJob
def stop_executing_job(self, job):

View File

@ -201,7 +201,10 @@ class QueryExecutorTests(BaseTestCase):
with patch.object(PostgreSQL, "run_query") as qr:
qr.return_value = ([1, 2], None)
result_id = execute_query(
"SELECT 1, 2", self.factory.data_source.id, {}, scheduled_query_id=q.id
"SELECT 1, 2",
self.factory.data_source.id,
{"query_id": q.id},
scheduled_query_id=q.id,
)
q = models.Query.get_by_id(q.id)
self.assertEqual(q.schedule_failures, 0)
@ -219,14 +222,20 @@ class QueryExecutorTests(BaseTestCase):
qr.side_effect = ValueError("broken")
result = execute_query(
"SELECT 1, 2", self.factory.data_source.id, {}, scheduled_query_id=q.id
"SELECT 1, 2",
self.factory.data_source.id,
{"query_id": q.id},
scheduled_query_id=q.id,
)
self.assertTrue(isinstance(result, QueryExecutionError))
q = models.Query.get_by_id(q.id)
self.assertEqual(q.schedule_failures, 1)
result = execute_query(
"SELECT 1, 2", self.factory.data_source.id, {}, scheduled_query_id=q.id
"SELECT 1, 2",
self.factory.data_source.id,
{"query_id": q.id},
scheduled_query_id=q.id,
)
self.assertTrue(isinstance(result, QueryExecutionError))
q = models.Query.get_by_id(q.id)
@ -242,7 +251,10 @@ class QueryExecutorTests(BaseTestCase):
with patch.object(PostgreSQL, "run_query") as qr:
qr.side_effect = ValueError("broken")
result = execute_query(
"SELECT 1, 2", self.factory.data_source.id, {}, scheduled_query_id=q.id
"SELECT 1, 2",
self.factory.data_source.id,
{"query_id": q.id},
scheduled_query_id=q.id,
)
self.assertTrue(isinstance(result, QueryExecutionError))
q = models.Query.get_by_id(q.id)
@ -251,7 +263,41 @@ class QueryExecutorTests(BaseTestCase):
with patch.object(PostgreSQL, "run_query") as qr:
qr.return_value = ([1, 2], None)
execute_query(
"SELECT 1, 2", self.factory.data_source.id, {}, scheduled_query_id=q.id
"SELECT 1, 2",
self.factory.data_source.id,
{"query_id": q.id},
scheduled_query_id=q.id,
)
q = models.Query.get_by_id(q.id)
self.assertEqual(q.schedule_failures, 0)
def test_adhoc_success_after_scheduled_failure(self, _):
"""
Query execution success resets the failure counter, even if it runs as an adhoc query.
"""
q = self.factory.create_query(
query_text="SELECT 1, 2", schedule={"interval": 300}
)
with patch.object(PostgreSQL, "run_query") as qr:
qr.side_effect = ValueError("broken")
result = execute_query(
"SELECT 1, 2",
self.factory.data_source.id,
{"query_id": q.id},
scheduled_query_id=q.id,
user_id=self.factory.user.id,
)
self.assertTrue(isinstance(result, QueryExecutionError))
q = models.Query.get_by_id(q.id)
self.assertEqual(q.schedule_failures, 1)
with patch.object(PostgreSQL, "run_query") as qr:
qr.return_value = ([1, 2], None)
execute_query(
"SELECT 1, 2",
self.factory.data_source.id,
{"query_id": q.id},
user_id=self.factory.user.id,
)
q = models.Query.get_by_id(q.id)
self.assertEqual(q.schedule_failures, 0)