diff --git a/redash/controllers.py b/redash/controllers.py index c67301c3..4af25c11 100644 --- a/redash/controllers.py +++ b/redash/controllers.py @@ -111,7 +111,6 @@ def status_api(): manager_status = redis_connection.hgetall('redash:status') status['manager'] = manager_status - status['manager']['queue_size'] = redis_connection.llen('queries') + redis_connection.llen('scheduled_queries') status['manager']['outdated_queries_count'] = models.Query.outdated_queries().count() queues = {} diff --git a/redash/tasks.py b/redash/tasks.py index 5656a0db..5f5699c4 100644 --- a/redash/tasks.py +++ b/redash/tasks.py @@ -123,6 +123,10 @@ class QueryTask(object): def is_cancelled(self): return self._async_result.status == 'REVOKED' + @property + def celery_status(self): + return self._async_result.status + def cancel(self): return self._async_result.revoke(terminate=True) @@ -131,32 +135,6 @@ class QueryTask(object): return "query_hash_job:%s:%s" % (data_source_id, query_hash) -@celery.task(base=BaseTask) -def cleanup_tasks(): - # in case of cold restart of the workers, there might be jobs that still have their "lock" object, but aren't really - # going to run. this jobs removes them - - inspect = celery.control.inspect() - active_tasks = inspect.active() - reserved_tasks = inspect.reserved() - - all_tasks = set() - for task_list in itertools.chain(active_tasks.values(), reserved_tasks.values()): - for task in task_list: - all_tasks.add(task['id']) - - logger.info("Active jobs: %d", len(all_tasks)) - - # TODO: use set instead of keys command - lock_keys = redis_connection.keys("query_hash_job:*") - locks = redis_connection.mget(lock_keys) - - for i, lock in enumerate(locks): - if lock not in all_tasks: - logger.warning("Couldn't find active job for: %s", lock_keys[i]) - redis_connection.delete(lock_keys[i]) - - @celery.task(base=BaseTask) def refresh_queries(): # self.status['last_refresh_at'] = time.time() diff --git a/redash/worker.py b/redash/worker.py index c27a5fd0..89c53f92 100644 --- a/redash/worker.py +++ b/redash/worker.py @@ -13,10 +13,6 @@ celery.conf.update(CELERY_RESULT_BACKEND=settings.CELERY_BACKEND, 'task': 'redash.tasks.refresh_queries', 'schedule': timedelta(seconds=30) }, - 'cleanup_tasks': { - 'task': 'redahs.tasks.cleanup_tasks', - 'schedule': timedelta(minutes=5) - } }, CELERY_TIMEZONE='UTC')