Merge pull request #280 from EverythingMe/fix_stuck_jobs

Fix #261: cancelling jobs sends them to limbo
This commit is contained in:
Arik Fraimovich 2014-09-06 18:12:03 +03:00
commit 1274d36abc
2 changed files with 15 additions and 2 deletions

View File

@ -111,7 +111,6 @@ def status_api():
manager_status = redis_connection.hgetall('redash:status')
status['manager'] = manager_status
status['manager']['queue_size'] = redis_connection.llen('queries') + redis_connection.llen('scheduled_queries')
status['manager']['outdated_queries_count'] = models.Query.outdated_queries().count()
queues = {}

View File

@ -1,6 +1,7 @@
import time
import datetime
import logging
import itertools
import redis
from celery import Task
from celery.result import AsyncResult
@ -64,7 +65,12 @@ class QueryTask(object):
logging.info("[Manager][%s] Found existing job: %s", query_hash, job_id)
job = cls(job_id=job_id)
else:
if job.is_cancelled:
logging.info("[%s] job found cancelled already, removing lock", query_hash)
redis_connection.delete(QueryTask._job_lock_id(query_hash, data_source.id))
job = None
if not job:
pipe.multi()
if scheduled:
@ -113,6 +119,14 @@ class QueryTask(object):
'query_result_id': query_result_id,
}
@property
def is_cancelled(self):
return self._async_result.status == 'REVOKED'
@property
def celery_status(self):
return self._async_result.status
def cancel(self):
return self._async_result.revoke(terminate=True)