Add cleanup task to remove locks of dead jobs

Arik Fraimovich 2014-09-06 14:18:15 +03:00
parent 7dc1176628
commit 653ed1c57a


@@ -1,6 +1,7 @@
 import time
 import datetime
 import logging
+import itertools
 import redis
 from celery import Task
 from celery.result import AsyncResult
@@ -130,6 +131,32 @@ class QueryTask(object):
         return "query_hash_job:%s:%s" % (data_source_id, query_hash)


+@celery.task(base=BaseTask)
+def cleanup_tasks():
+    # In case of a cold restart of the workers, there might be jobs that still have their "lock"
+    # object but aren't really going to run. This job removes them.
+    inspect = celery.control.inspect()
+    active_tasks = inspect.active()
+    reserved_tasks = inspect.reserved()
+
+    all_tasks = set()
+    for task_list in itertools.chain(active_tasks.values(), reserved_tasks.values()):
+        for task in task_list:
+            all_tasks.add(task['id'])
+
+    logger.info("Active jobs: %d", len(all_tasks))
+
+    # TODO: use a set instead of the KEYS command (see the sketches after the diff)
+    lock_keys = redis_connection.keys("query_hash_job:*")
+    locks = redis_connection.mget(lock_keys)
+
+    for i, lock in enumerate(locks):
+        if lock not in all_tasks:
+            logger.warning("Couldn't find active job for: %s", lock_keys[i])
+            redis_connection.delete(lock_keys[i])
+
+
 @celery.task(base=BaseTask)
 def refresh_queries():
     # self.status['last_refresh_at'] = time.time()
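
Two edge cases are worth noting in the added task: Celery's inspect().active() and inspect().reserved() return None when no worker answers the broadcast, and redis-py of this era raises on MGET with an empty key list. A more defensive variant of the body, keeping the commit's structure otherwise, might look like this (a sketch, not what the commit ships):

import itertools
import logging

logger = logging.getLogger(__name__)

# `celery`, `redis_connection`, and `BaseTask` come from the surrounding module.
@celery.task(base=BaseTask)
def cleanup_tasks():
    inspect = celery.control.inspect()
    # active()/reserved() return None when no worker replies to the broadcast.
    active_tasks = inspect.active() or {}
    reserved_tasks = inspect.reserved() or {}

    all_tasks = set()
    for task_list in itertools.chain(active_tasks.values(), reserved_tasks.values()):
        for task in task_list:
            all_tasks.add(task['id'])

    logger.info("Active jobs: %d", len(all_tasks))

    lock_keys = redis_connection.keys("query_hash_job:*")
    if not lock_keys:
        # MGET with no keys is an error in redis-py of this era.
        return

    for lock_key, lock in zip(lock_keys, redis_connection.mget(lock_keys)):
        if lock not in all_tasks:
            logger.warning("Couldn't find active job for: %s", lock_key)
            redis_connection.delete(lock_key)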
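The TODO points at KEYS being a blocking, full-keyspace scan; maintaining a Redis set of lock keys would let cleanup enumerate locks with SMEMBERS instead. A minimal sketch of that idea, assuming a hypothetical tracking set named query_hash_jobs and hypothetical add_lock/remove_lock helpers called wherever a lock is set and cleared:

import redis

redis_connection = redis.StrictRedis()  # stands in for the module's connection

LOCKS_SET = "query_hash_jobs"  # hypothetical set indexing all lock keys

def add_lock(lock_key, task_id):
    # Called when a job acquires its lock: store the lock and index it,
    # so cleanup can enumerate locks with SMEMBERS instead of KEYS.
    redis_connection.set(lock_key, task_id)
    redis_connection.sadd(LOCKS_SET, lock_key)

def remove_lock(lock_key):
    # Called when a job finishes or is reaped by cleanup_tasks.
    redis_connection.delete(lock_key)
    redis_connection.srem(LOCKS_SET, lock_key)

def tracked_lock_keys():
    # Reads one set instead of scanning the whole keyspace.
    return redis_connection.smembers(LOCKS_SET)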
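The commit itself doesn't show how cleanup_tasks gets invoked; a periodic task like this is typically driven by celerybeat. A hypothetical Celery 3.x beat entry, with the interval and the redash.tasks module path both assumptions:

from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'cleanup_tasks': {
        'task': 'redash.tasks.cleanup_tasks',  # assumed module path
        'schedule': timedelta(minutes=5),      # assumed interval
    },
}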