diff --git a/redash/tasks.py b/redash/tasks.py
index d35533b8..5656a0db 100644
--- a/redash/tasks.py
+++ b/redash/tasks.py
@@ -1,6 +1,7 @@
 import time
 import datetime
 import logging
+import itertools
 import redis
 from celery import Task
 from celery.result import AsyncResult
@@ -130,6 +131,32 @@ class QueryTask(object):
         return "query_hash_job:%s:%s" % (data_source_id, query_hash)
 
 
+@celery.task(base=BaseTask)
+def cleanup_tasks():
+    # In case of a cold restart of the workers, there might be jobs that still have their "lock"
+    # object but aren't actually going to run. This job removes them.
+
+    inspect = celery.control.inspect()
+    active_tasks = inspect.active()
+    reserved_tasks = inspect.reserved()
+
+    all_tasks = set()
+    for task_list in itertools.chain(active_tasks.values(), reserved_tasks.values()):
+        for task in task_list:
+            all_tasks.add(task['id'])
+
+    logger.info("Active jobs: %d", len(all_tasks))
+
+    # TODO: use set instead of keys command
+    lock_keys = redis_connection.keys("query_hash_job:*")
+    locks = redis_connection.mget(lock_keys)
+
+    for i, lock in enumerate(locks):
+        if lock not in all_tasks:
+            logger.warning("Couldn't find active job for: %s", lock_keys[i])
+            redis_connection.delete(lock_keys[i])
+
+
 @celery.task(base=BaseTask)
 def refresh_queries():
     # self.status['last_refresh_at'] = time.time()
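
Note on the TODO above: KEYS blocks Redis while it walks the entire keyspace, which is why the diff flags it. The TODO seems to suggest tracking the lock keys in a Redis set; a different non-blocking option is redis-py's SCAN-based iterator. Below is a minimal sketch of that alternative, not part of the change: the connection setup and the helper name sweep_stale_locks are assumptions, and all_tasks is the same set of active/reserved task ids that cleanup_tasks builds.

    import logging

    import redis

    logger = logging.getLogger(__name__)

    # Assumption: configured like the redis_connection the diff imports.
    # decode_responses=True makes GET return str, comparable to the task-id
    # strings collected in all_tasks.
    redis_connection = redis.StrictRedis(decode_responses=True)


    def sweep_stale_locks(all_tasks):
        """Delete query_hash_job:* lock keys whose task id isn't in all_tasks."""
        # scan_iter walks the keyspace incrementally instead of issuing one
        # blocking KEYS call.
        for lock_key in redis_connection.scan_iter(match="query_hash_job:*"):
            task_id = redis_connection.get(lock_key)
            if task_id not in all_tasks:
                logger.warning("Couldn't find active job for: %s", lock_key)
                redis_connection.delete(lock_key)

As in the KEYS/MGET version, the read-then-delete pair is not atomic, so a lock written between the two calls can still be dropped; both variants accept that race because a falsely removed lock only allows a duplicate query execution.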