diff --git a/manage.py b/manage.py
index ee9ea923..0d2a6fb0 100755
--- a/manage.py
+++ b/manage.py
@@ -3,6 +3,8 @@
 CLI to manage redash.
 """
 import atfork
+import signal
+
 atfork.monkeypatch_os_fork_functions()
 import atfork.stdlib_fixer
 atfork.stdlib_fixer.fix_logging_module()
@@ -28,26 +30,31 @@ def version():
 
 def runworkers():
     """Starts the re:dash query executors/workers."""
-    try:
-        old_workers = data_manager.redis_connection.smembers('workers')
-        data_manager.redis_connection.delete('workers')
-
-        logging.info("Cleaning old workers: %s", old_workers)
-
-        data_manager.start_workers(settings.WORKERS_COUNT)
-        logging.info("Workers started.")
-
-        while True:
-            try:
-                data_manager.refresh_queries()
-                data_manager.report_status()
-            except Exception as e:
-                logging.error("Something went wrong with refreshing queries...")
-                logging.exception(e)
-            time.sleep(60)
-    except KeyboardInterrupt:
-        logging.warning("Exiting; waiting for threads")
+    def stop_handler(signum, frame):
+        logging.warning("Exiting; waiting for workers")
         data_manager.stop_workers()
+        exit()
+
+    signal.signal(signal.SIGTERM, stop_handler)
+    signal.signal(signal.SIGINT, stop_handler)
+
+    old_workers = data_manager.redis_connection.smembers('workers')
+    data_manager.redis_connection.delete('workers')
+
+    logging.info("Cleaning old workers: %s", old_workers)
+
+    data_manager.start_workers(settings.WORKERS_COUNT)
+    logging.info("Workers started.")
+
+    while True:
+        try:
+            data_manager.refresh_queries()
+            data_manager.report_status()
+        except Exception as e:
+            logging.error("Something went wrong with refreshing queries...")
+            logging.exception(e)
+        time.sleep(60)
+
 
 @manager.shell
 def make_shell_context():
diff --git a/redash/data/manager.py b/redash/data/manager.py
index 3b63a9cf..225ed199 100755
--- a/redash/data/manager.py
+++ b/redash/data/manager.py
@@ -142,6 +142,7 @@ class Manager(object):
         redis_connection_params = self.redis_connection.connection_pool.connection_kwargs
         self.workers = [worker.Worker(worker_id, self, redis_connection_params)
                         for worker_id in xrange(workers_count)]
+
         for w in self.workers:
             w.start()
 
@@ -149,7 +150,9 @@
 
     def stop_workers(self):
         for w in self.workers:
-            w.continue_working = False
+            w.terminate()
+
+        for w in self.workers:
             w.join()
 
     def _save_status(self):
diff --git a/redash/data/worker.py b/redash/data/worker.py
index 583b6bf0..430c635b 100644
--- a/redash/data/worker.py
+++ b/redash/data/worker.py
@@ -3,8 +3,8 @@ Worker implementation to execute incoming queries.
""" import json import logging +import multiprocessing import os -import threading import uuid import datetime import time @@ -209,31 +209,30 @@ class Job(RedisObject): return "" % (self.id, self.priority, self.status) -class Worker(threading.Thread): +class Worker(multiprocessing.Process): def __init__(self, worker_id, manager, redis_connection_params, sleep_time=0.1): self.manager = manager self.statsd_client = StatsClient(host=settings.STATSD_HOST, port=settings.STATSD_PORT, prefix=settings.STATSD_PREFIX) + self.redis_connection_params = {k: v for k, v in redis_connection_params.iteritems() if k in ('host', 'db', 'password', 'port')} + self.worker_id = None self.continue_working = True self.sleep_time = sleep_time self.child_pid = None - self.worker_id = worker_id + self.current_job_id = None self.status = { - 'id': self.worker_id, 'jobs_count': 0, 'cancelled_jobs_count': 0, 'done_jobs_count': 0, 'updated_at': time.time(), 'started_at': time.time() } - self._save_status() - self.manager.redis_connection.sadd('workers', self._key) - super(Worker, self).__init__(name="Worker-%s" % self.worker_id) + super(Worker, self).__init__(name="Worker") def set_title(self, title=None): base_title = "redash worker:%s" % self.worker_id @@ -245,7 +244,28 @@ class Worker(threading.Thread): setproctitle.setproctitle(full_title) def run(self): + self.worker_id = os.getpid() + self.status['id'] = self.worker_id + self.name = "Worker:%d" % self.worker_id + self.manager.redis_connection.sadd('workers', self._key) + self._save_status() + self.set_title() + logging.info("[%s] started.", self.name) + + signal.signal(signal.SIGINT, self._stop) + signal.signal(signal.SIGTERM, self._stop) + + self._wait_for_jobs() + + def _stop(self, signum, frame): + self.continue_working = False + if self.current_job_id: + job = Job.load(self.manager.redis_connection, self.current_job_id) + if job: + job.cancel() + + def _wait_for_jobs(self): while self.continue_working: job_id = self.manager.queue.pop() if job_id: @@ -270,6 +290,7 @@ class Worker(threading.Thread): self.manager.redis_connection.hmset(self._key, self.status) def _fork_and_process(self, job_id): + self.current_job_id = job_id self.child_pid = os.fork() if self.child_pid == 0: self.set_title("processing %s" % job_id) @@ -291,6 +312,9 @@ class Worker(threading.Thread): logging.info("[%s] Finished Processing %s (pid: %d status: %d)", self.name, job_id, self.child_pid, status) + self.child_pid = None + self.current_job_id = None + def _process(self, job_id): redis_connection = redis.StrictRedis(**self.redis_connection_params) job = Job.load(redis_connection, job_id)