mirror of
https://github.com/valitydev/redash.git
synced 2024-11-07 17:38:54 +00:00
Remove cleanup_tasks, as it's not stable
This commit is contained in:
parent
065d2bc2c6
commit
f6bd562dd2
@ -111,7 +111,6 @@ def status_api():
|
||||
|
||||
manager_status = redis_connection.hgetall('redash:status')
|
||||
status['manager'] = manager_status
|
||||
status['manager']['queue_size'] = redis_connection.llen('queries') + redis_connection.llen('scheduled_queries')
|
||||
status['manager']['outdated_queries_count'] = models.Query.outdated_queries().count()
|
||||
|
||||
queues = {}
|
||||
|
@ -123,6 +123,10 @@ class QueryTask(object):
|
||||
def is_cancelled(self):
|
||||
return self._async_result.status == 'REVOKED'
|
||||
|
||||
@property
|
||||
def celery_status(self):
|
||||
return self._async_result.status
|
||||
|
||||
def cancel(self):
|
||||
return self._async_result.revoke(terminate=True)
|
||||
|
||||
@ -131,32 +135,6 @@ class QueryTask(object):
|
||||
return "query_hash_job:%s:%s" % (data_source_id, query_hash)
|
||||
|
||||
|
||||
@celery.task(base=BaseTask)
|
||||
def cleanup_tasks():
|
||||
# in case of cold restart of the workers, there might be jobs that still have their "lock" object, but aren't really
|
||||
# going to run. this jobs removes them
|
||||
|
||||
inspect = celery.control.inspect()
|
||||
active_tasks = inspect.active()
|
||||
reserved_tasks = inspect.reserved()
|
||||
|
||||
all_tasks = set()
|
||||
for task_list in itertools.chain(active_tasks.values(), reserved_tasks.values()):
|
||||
for task in task_list:
|
||||
all_tasks.add(task['id'])
|
||||
|
||||
logger.info("Active jobs: %d", len(all_tasks))
|
||||
|
||||
# TODO: use set instead of keys command
|
||||
lock_keys = redis_connection.keys("query_hash_job:*")
|
||||
locks = redis_connection.mget(lock_keys)
|
||||
|
||||
for i, lock in enumerate(locks):
|
||||
if lock not in all_tasks:
|
||||
logger.warning("Couldn't find active job for: %s", lock_keys[i])
|
||||
redis_connection.delete(lock_keys[i])
|
||||
|
||||
|
||||
@celery.task(base=BaseTask)
|
||||
def refresh_queries():
|
||||
# self.status['last_refresh_at'] = time.time()
|
||||
|
@ -13,10 +13,6 @@ celery.conf.update(CELERY_RESULT_BACKEND=settings.CELERY_BACKEND,
|
||||
'task': 'redash.tasks.refresh_queries',
|
||||
'schedule': timedelta(seconds=30)
|
||||
},
|
||||
'cleanup_tasks': {
|
||||
'task': 'redahs.tasks.cleanup_tasks',
|
||||
'schedule': timedelta(minutes=5)
|
||||
}
|
||||
},
|
||||
CELERY_TIMEZONE='UTC')
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user