Refactor Job class to be easier to extend.

Moved the Redis logic out of it.
This commit is contained in:
Arik Fraimovich 2014-03-18 15:47:29 +02:00
parent caa198964c
commit 0b994de531
3 changed files with 202 additions and 68 deletions

View File

@ -83,7 +83,7 @@ class Manager(object):
logging.info("[Manager][%s] Found existing job: %s", query_hash, job_id)
job = worker.Job.load(self.redis_connection, job_id)
else:
job = worker.Job(self.redis_connection, query, priority)
job = worker.Job(self.redis_connection, query=query, priority=priority)
pipe.multi()
job.save(pipe)
logging.info("[Manager][%s] Created new job: %s", query_hash, job.id)

View File

@ -15,7 +15,71 @@ from statsd import StatsClient
from redash.utils import gen_query_hash
from redash import settings
class Job(object):
class RedisObject(object):
# The following should be overriden in the inheriting class:
fields = {}
conversions = {}
id_field = ''
name = ''
def __init__(self, redis_connection, **kwargs):
self.redis_connection = redis_connection
self.values = {}
if not self.fields:
raise ValueError("You must set the fields dictionary, before using RedisObject.")
if not self.name:
raise ValueError("You must set the name, before using RedisObject")
self.update(**kwargs)
def __getattr__(self, name):
if name in self.values:
return self.values[name]
else:
raise AttributeError
def update(self, **kwargs):
for field, default_value in self.fields.iteritems():
value = kwargs.get(field, self.values.get(field, default_value))
if callable(value):
value = value()
if value == 'None':
value = None
if field in self.conversions and value:
value = self.conversions[field](value)
self.values[field] = value
@classmethod
def _redis_key(cls, object_id):
return '{}:{}'.format(cls.name, object_id)
def save(self, pipe):
if not pipe:
pipe = self.redis_connection.pipeline()
pipe.sadd('{}_set'.format(self.name), self.id)
pipe.hmset(self._redis_key(self.id), self.values)
pipe.publish(self._redis_key(self.id), json.dumps(self.to_dict()))
pipe.execute()
@classmethod
def load(cls, redis_connection, object_id):
object_dict = redis_connection.hgetall(cls._redis_key(object_id))
obj = None
if object_dict:
obj = cls(redis_connection, **object_dict)
return obj
class Job(RedisObject):
HIGH_PRIORITY = 1
LOW_PRIORITY = 2
@ -24,37 +88,39 @@ class Job(object):
DONE = 3
FAILED = 4
def __init__(self, redis_connection, query, priority,
job_id=None,
wait_time=None, query_time=None,
updated_at=None, status=None, error=None, query_result_id=None,
process_id=0):
self.redis_connection = redis_connection
self.query = query
self.priority = priority
self.query_hash = gen_query_hash(self.query)
self.query_result_id = query_result_id
if process_id == 'None':
self.process_id = None
else:
self.process_id = int(process_id)
fields = {
'id': lambda: str(uuid.uuid1()),
'query': None,
'priority': None,
'query_hash': None,
'wait_time': 0,
'query_time': 0,
'error': None,
'updated_at': time.time,
'status': WAITING,
'process_id': None,
'query_result_id': None
}
if job_id is None:
self.id = str(uuid.uuid1())
self.new_job = True
self.wait_time = 0
self.query_time = 0
self.error = None
self.updated_at = time.time() # job_dict.get('updated_at', time.time())
self.status = self.WAITING # int(job_dict.get('status', self.WAITING))
else:
self.id = job_id
self.new_job = False
self.error = error
self.wait_time = wait_time
self.query_time = query_time
self.updated_at = updated_at
self.status = status
conversions = {
'query': lambda query: query.decode('utf-8'),
'priority': int,
'updated_at': float,
'status': int,
'wait_time': float,
'query_time': float,
'process_id': int,
'query_result_id': int
}
name = 'job'
def __init__(self, redis_connection, query, priority, **kwargs):
kwargs['query'] = query
kwargs['priority'] = priority
kwargs['query_hash'] = gen_query_hash(kwargs['query'])
self.new_job = 'id' not in kwargs
super(Job, self).__init__(redis_connection, **kwargs)
def to_dict(self):
return {
@ -70,10 +136,6 @@ class Job(object):
'process_id': self.process_id
}
@staticmethod
def _redis_key(job_id):
return 'job:%s' % job_id
def cancel(self):
# TODO: Race condition:
# it's possible that it will be picked up by worker while processing the cancel order
@ -95,16 +157,14 @@ class Job(object):
if self.is_finished():
pipe.delete('query_hash_job:%s' % self.query_hash)
pipe.sadd('jobs_set', self.id)
pipe.hmset(self._redis_key(self.id), self.to_dict())
pipe.publish(self._redis_key(self.id), json.dumps(self.to_dict()))
pipe.execute()
super(Job, self).save(pipe)
def processing(self, process_id):
self.status = self.PROCESSING
self.process_id = process_id
self.wait_time = time.time() - self.updated_at
self.updated_at = time.time()
self.update(status=self.PROCESSING,
process_id=process_id,
wait_time=time.time() - self.updated_at,
updated_at=time.time())
self.save()
def is_finished(self):
@ -112,37 +172,21 @@ class Job(object):
def done(self, query_result_id, error):
if error:
self.status = self.FAILED
new_status = self.FAILED
else:
self.status = self.DONE
new_status = self.DONE
self.update(status=new_status,
query_result_id=query_result_id,
error=error,
query_time=time.time() - self.updated_at,
updated_at=time.time())
self.query_result_id = query_result_id
self.error = error
self.query_time = time.time() - self.updated_at
self.updated_at = time.time()
self.save()
def __str__(self):
return "<Job:%s,priority:%d,status:%d>" % (self.id, self.priority, self.status)
@classmethod
def _load(cls, redis_connection, job_id):
return redis_connection.hgetall(cls._redis_key(job_id))
@classmethod
def load(cls, redis_connection, job_id):
job_dict = cls._load(redis_connection, job_id)
job = None
if job_dict:
job = Job(redis_connection, job_id=job_dict['id'], query=job_dict['query'].decode('utf-8'),
priority=int(job_dict['priority']), updated_at=float(job_dict['updated_at']),
status=int(job_dict['status']), wait_time=float(job_dict['wait_time']),
query_time=float(job_dict['query_time']), error=job_dict['error'],
query_result_id=job_dict['query_result_id'],
process_id=job_dict['process_id'])
return job
class Worker(threading.Thread):
def __init__(self, worker_id, manager, redis_connection_params, query_runner, sleep_time=0.1):

90
tests/test_job.py Normal file
View File

@ -0,0 +1,90 @@
import time
from unittest import TestCase
from mock import patch
from redash.data.worker import Job
from redash import redis_connection
from redash.utils import gen_query_hash
class TestJob(TestCase):
def setUp(self):
self.priority = 1
self.query = "SELECT 1"
self.query_hash = gen_query_hash(self.query)
def test_job_creation(self):
now = time.time()
with patch('time.time', return_value=now):
job = Job(redis_connection, query=self.query, priority=self.priority)
self.assertIsNotNone(job.id)
self.assertTrue(job.new_job)
self.assertEquals(0, job.wait_time)
self.assertEquals(0, job.query_time)
self.assertEquals(None, job.process_id)
self.assertEquals(Job.WAITING, job.status)
self.assertEquals(self.priority, job.priority)
self.assertEquals(self.query, job.query)
self.assertEquals(self.query_hash, job.query_hash)
self.assertIsNone(job.error)
self.assertIsNone(job.query_result_id)
def test_job_loading(self):
job = Job(redis_connection, query=self.query, priority=self.priority)
job.save()
loaded_job = Job.load(redis_connection, job.id)
self.assertFalse(loaded_job.new_job)
self.assertEquals(loaded_job.id, job.id)
self.assertEquals(loaded_job.wait_time, job.wait_time)
self.assertEquals(loaded_job.query_time, job.query_time)
self.assertEquals(loaded_job.process_id, job.process_id)
self.assertEquals(loaded_job.status, job.status)
self.assertEquals(loaded_job.priority, job.priority)
self.assertEquals(loaded_job.query_hash, job.query_hash)
self.assertEquals(loaded_job.query, job.query)
self.assertEquals(loaded_job.error, job.error)
self.assertEquals(loaded_job.query_result_id, job.query_result_id)
def test_update(self):
job = Job(redis_connection, query=self.query, priority=self.priority)
job.update(process_id=1)
self.assertEquals(1, job.process_id)
self.assertEquals(self.query, job.query)
self.assertEquals(self.priority, job.priority)
def test_processing(self):
job = Job(redis_connection, query=self.query, priority=self.priority)
updated_at = job.updated_at
now = time.time()+10
with patch('time.time', return_value=now):
job.processing(10)
job = Job.load(redis_connection, job.id)
self.assertEquals(10, job.process_id)
self.assertEquals(Job.PROCESSING, job.status)
self.assertEquals(now, job.updated_at)
self.assertEquals(now - updated_at, job.wait_time)
def test_done(self):
job = Job(redis_connection, query=self.query, priority=self.priority)
updated_at = job.updated_at
now = time.time()+10
with patch('time.time', return_value=now):
job.done(1, None)
job = Job.load(redis_connection, job.id)
self.assertEquals(Job.DONE, job.status)
self.assertEquals(1, job.query_result_id)
self.assertEquals(now, job.updated_at)
self.assertEquals(now - updated_at, job.query_time)
self.assertIsNone(job.error)
def test_done_failed(self):
pass