mirror of
https://github.com/valitydev/redash.git
synced 2024-11-06 17:15:17 +00:00
dfc873fb8b
* Add additional statsd metrics for worker/scheduler
82 lines
2.4 KiB
Python
82 lines
2.4 KiB
Python
from unittest import TestCase
|
|
from mock import patch, ANY
|
|
|
|
from redash.tasks.schedule import rq_scheduler, schedule_periodic_jobs
|
|
|
|
|
|
class TestSchedule(TestCase):
    """Exercise ``schedule_periodic_jobs`` against ``rq_scheduler``.

    Each test passes job definitions (dicts with ``"func"`` and
    ``"interval"`` keys) to ``schedule_periodic_jobs`` and then inspects
    what ``rq_scheduler.get_jobs()`` reports.
    """

    def setUp(self):
        """Cancel and delete every job the scheduler currently reports."""
        for stale_job in rq_scheduler.get_jobs():
            rq_scheduler.cancel(stale_job)
            stale_job.delete()

    def test_schedules_a_new_job(self):
        # A single definition should yield exactly one scheduled job that
        # carries the function's name and the requested interval.
        def foo():
            pass

        schedule_periodic_jobs([{"func": foo, "interval": 60}])

        scheduled = list(rq_scheduler.get_jobs())

        self.assertEqual(len(scheduled), 1)
        first = scheduled[0]
        self.assertTrue(first.func_name.endswith("foo"))
        self.assertEqual(first.meta["interval"], 60)

    def test_doesnt_reschedule_an_existing_job(self):
        # Submitting the same definition a second time must not reach
        # the scheduler's ``schedule`` method.
        def foo():
            pass

        schedule_periodic_jobs([{"func": foo, "interval": 60}])

        with patch("redash.tasks.rq_scheduler.schedule") as mocked_schedule:
            schedule_periodic_jobs([{"func": foo, "interval": 60}])
            mocked_schedule.assert_not_called()

    def test_reschedules_a_modified_job(self):
        # Changing the interval of an already scheduled function should
        # leave one job, now with the new interval.
        def foo():
            pass

        for interval in (60, 120):
            schedule_periodic_jobs([{"func": foo, "interval": interval}])

        scheduled = list(rq_scheduler.get_jobs())

        self.assertEqual(len(scheduled), 1)
        first = scheduled[0]
        self.assertTrue(first.func_name.endswith("foo"))
        self.assertEqual(first.meta["interval"], 120)

    def test_removes_jobs_that_are_no_longer_defined(self):
        # A function dropped from the definitions on a later call should
        # no longer appear among the scheduled jobs.
        def foo():
            pass

        def bar():
            pass

        schedule_periodic_jobs(
            [
                {"func": foo, "interval": 60},
                {"func": bar, "interval": 90},
            ]
        )
        schedule_periodic_jobs([{"func": foo, "interval": 60}])

        scheduled = list(rq_scheduler.get_jobs())

        self.assertEqual(len(scheduled), 1)
        first = scheduled[0]
        self.assertTrue(first.func_name.endswith("foo"))
        self.assertEqual(first.meta["interval"], 60)
|
|
|
|
|
|
class TestSchedulerMetrics(TestCase):
    """Check the statsd counter reported when the scheduler enqueues jobs."""

    def setUp(self):
        """Cancel and delete every job the scheduler currently reports."""
        for stale_job in rq_scheduler.get_jobs():
            rq_scheduler.cancel(stale_job)
            stale_job.delete()

    def test_scheduler_enqueue_job_metric(self):
        # With one periodic job scheduled, ``enqueue_jobs`` is expected to
        # call ``StatsClient.incr`` exactly once, with the periodic
        # job-created counter name.
        def foo():
            pass

        schedule_periodic_jobs([{"func": foo, "interval": 60}])

        with patch("statsd.StatsClient.incr") as mocked_incr:
            rq_scheduler.enqueue_jobs()
            mocked_incr.assert_called_once_with("rq.jobs.created.periodic")
|
|
|