Improved system status.

This commit is contained in:
Arik Fraimovich 2013-12-06 15:14:39 +02:00
parent cd03948164
commit 25c6bc2252
7 changed files with 89 additions and 5 deletions

View File

@ -14,6 +14,11 @@ import data
def start_workers(data_manager):
try:
old_workers = data_manager.redis_connection.smembers('workers')
data_manager.redis_connection.delete('workers')
logging.info("Cleaning old workers: %s", old_workers)
data_manager.start_workers(settings.WORKERS_COUNT, settings.CONNECTION_STRING)
logging.info("Workers started.")
@ -32,8 +37,7 @@ def start_workers(data_manager):
if __name__ == '__main__':
channel = logging.StreamHandler()
logging.getLogger().addHandler(channel)
# TODO: take logging level from configuration
logging.getLogger().setLevel("DEBUG")
logging.getLogger().setLevel(settings.LOG_LEVEL)
parser = argparse.ArgumentParser()
parser.add_argument("command")

View File

@ -9,6 +9,7 @@ import psycopg2
import psycopg2.pool
import qr
import redis
import time
import query_runner
import worker
from utils import gen_query_hash
@ -32,6 +33,12 @@ class Manager(object):
db_connection_string)
self.queue = qr.PriorityQueue("jobs", **self.redis_connection.connection_pool.connection_kwargs)
self.max_retries = 5
self.status = {
'last_refresh_at': 0,
'started_at': time.time()
}
self._save_status()
# TODO: Use our Django Models
def get_query_result_by_id(self, query_result_id):
@ -95,6 +102,8 @@ class Manager(object):
return job
def refresh_queries(self):
logging.info("Refreshing queries...")
sql = """SELECT queries.query, queries.ttl, retrieved_at
FROM (SELECT query, min(ttl) as ttl FROM queries WHERE ttl > 0 GROUP by query) queries
JOIN (SELECT query, max(retrieved_at) as retrieved_at
@ -103,10 +112,15 @@ class Manager(object):
WHERE queries.ttl > 0
AND query_results.retrieved_at + ttl * interval '1 second' < now() at time zone 'utc';"""
self.status['last_refresh_at'] = time.time()
self._save_status()
queries = self.run_query(sql)
for query, ttl, retrieved_at in queries:
self.add_job(query, worker.Job.LOW_PRIORITY)
logging.info("Done refreshing queries... %d" % len(queries))
def store_query_result(self, query, data, run_time, retrieved_at):
query_result_id = None
query_hash = gen_query_hash(query)
@ -169,3 +183,6 @@ class Manager(object):
connection.commit()
finally:
self.db_connection_pool.putconn(connection)
def _save_status(self):
self.redis_connection.hmset('manager:status', self.status)

View File

@ -9,7 +9,6 @@ query language (for example: HiveQL).
import json
import psycopg2
import sys
import signal
import select
from .utils import JSONEncoder

View File

@ -148,14 +148,25 @@ class Worker(threading.Thread):
self.query_runner = query_runner
self.sleep_time = sleep_time
self.child_pid = None
self.worker_id = uuid.uuid1()
self.status = {
'jobs_count': 0,
'cancelled_jobs_count': 0,
'done_jobs_count': 0,
'updated_at': time.time(),
'started_at': time.time()
}
self._save_status()
self.manager.redis_connection.sadd('workers', self._key)
super(Worker, self).__init__(name="Worker-%s" % uuid.uuid1())
super(Worker, self).__init__(name="Worker-%s" % self.worker_id)
def run(self):
logging.info("[%s] started.", self.name)
while self.continue_working:
job_id = self.manager.queue.pop()
if job_id:
self._update_status('jobs_count')
logging.info("[%s] Processing %s", self.name, job_id)
self._fork_and_process(job_id)
if self.child_pid == 0:
@ -163,6 +174,18 @@ class Worker(threading.Thread):
else:
time.sleep(self.sleep_time)
def _update_status(self, counter):
self.status['updated_at'] = time.time()
self.status[counter] += 1
self._save_status()
@property
def _key(self):
return 'worker:%s' % self.worker_id
def _save_status(self):
self.manager.redis_connection.hmset(self._key, self.status)
def _fork_and_process(self, job_id):
self.child_pid = os.fork()
if self.child_pid == 0:
@ -171,8 +194,10 @@ class Worker(threading.Thread):
logging.info("[%s] Waiting for pid: %d", self.name, self.child_pid)
_, status = os.waitpid(self.child_pid, 0)
if status > 0:
self._update_status('done_jobs_count')
job = Job.load(self.manager, job_id)
if not job.is_finished():
self._update_status('cancelled_jobs_count')
logging.info("[%s] process interrupted and job %s hasn't finished; registering interruption in job",
self.name, job_id)
job.done(None, "Interrupted/Cancelled while running.")

View File

@ -30,6 +30,7 @@ import tornado.web
import tornado.auth
import tornado.options
import settings
import time
from data import utils
import data
@ -105,12 +106,30 @@ class StatusHandler(BaseHandler):
status = {}
info = self.redis_connection.info()
status['redis_used_memory'] = info['used_memory_human']
status['queries_in_queue'] = self.redis_connection.zcard('jobs')
status['queries_count'] = data.models.Query.objects.count()
status['query_results_count'] = data.models.QueryResult.objects.count()
status['dashboards_count'] = data.models.Dashboard.objects.count()
status['widgets_count'] = data.models.Widget.objects.count()
workers = [self.redis_connection.hgetall(w)
for w in self.redis_connection.smembers('workers')]
status['workers'] = []
for worker in workers:
w = {}
w['uptime'] = "%.0f seconds" % (time.time() - float(worker['started_at']))
w['last_updated_at'] = datetime.datetime.fromtimestamp(float(worker['updated_at'])).strftime('%Y-%m-%d %H:%M:%S')
w['jobs_received'] = worker['jobs_count']
w['jobs_done'] = worker['done_jobs_count']
status['workers'].append(w)
manager_status = self.redis_connection.hgetall('manager:status')
status['manager'] = {}
status['manager']['uptime'] = "%.0f seconds" % (time.time() - float(manager_status['started_at']))
status['manager']['last_refresh_at'] = datetime.datetime.fromtimestamp(float(manager_status['last_refresh_at'])).strftime('%Y-%m-%d %H:%M:%S')
status['manager']['queue_size'] = self.redis_connection.zcard('jobs')
self.write_json(status)

View File

@ -5,6 +5,10 @@
var refresh = function () {
$scope.refresh_time = moment().add('minutes', 1);
$http.get('/status.json').success(function (data) {
$scope.workers = data.workers;
delete data.workers;
$scope.manager = data.manager;
delete data.manager;
$scope.status = data;
});

View File

@ -10,6 +10,22 @@
{{name | toHuman}}
</li>
</ul>
<ul class="list-group col-lg-4">
<li class="list-group-item active">Manager</li>
<li class="list-group-item" ng-repeat="(name, value) in manager">
<span class="badge">{{value}}</span>
{{name | toHuman}}
</li>
</ul>
<ul class="list-group col-lg-4">
<div ng-repeat="worker in workers">
<li class="list-group-item active">Worker {{$index+1}}</li>
<li class="list-group-item" ng-repeat="(name, value) in worker">
<span class="badge">{{value}}</span>
{{name | toHuman}}
</li>
</div>
</ul>
</div>
<div class="panel-footer">Next refresh: <span am-time-ago="refresh_time"></span></div>
</div>