Switch to Redis for pause state storage

This commit is contained in:
Arik Fraimovich 2016-05-30 22:44:09 +03:00
parent 9538ee7c31
commit 59f8af2c44
10 changed files with 33 additions and 23 deletions

View File

@ -406,6 +406,8 @@
}, function(error) {
if (error.status === 403) {
queryResult.update(error.data);
} else if (error.status === 400 && 'job' in error.data) {
queryResult.update(error.data);
}
});

View File

@ -115,7 +115,8 @@ class DataSourcePauseResource(BaseResource):
if data:
reason = data.get('reason')
else:
reason = None
reason = request.args.get('reason')
data_source.pause(reason)
data_source.save()

View File

@ -26,7 +26,7 @@ def run_query(data_source, parameter_values, query_text, query_id, max_age=0):
if missing_params:
return error_response('Missing parameter value for: {}'.format(", ".join(missing_params)))
if data_source.is_paused:
if data_source.paused:
if data_source.pause_reason:
message = '{} is paused ({}). Please try later.'.format(data_source.name, data_source.pause_reason)
else:

View File

@ -373,7 +373,8 @@ class DataSource(BelongsToOrgMixin, BaseModel):
'name': self.name,
'type': self.type,
'syntax': self.query_runner.syntax,
'is_paused': self.is_paused
'paused': self.paused,
'pause_reason': self.pause_reason
}
if all:
@ -415,21 +416,22 @@ class DataSource(BelongsToOrgMixin, BaseModel):
return schema
def _pause_key(self):
return 'ds:{}:pause'.format(self.id)
@property
def is_paused(self):
return self.options.get('is_paused', False)
def paused(self):
return redis_connection.exists(self._pause_key())
@property
def pause_reason(self):
return self.options.get('pause_reason', None)
return redis_connection.get(self._pause_key())
def pause(self, reason=None):
self.options['is_paused'] = True
self.options['pause_reason'] = reason
redis_connection.set(self._pause_key(), reason)
def resume(self):
self.options['is_paused'] = False
self.options['pause_reason'] = None
redis_connection.delete(self._pause_key())
def add_group(self, group, view_only=False):
dsg = DataSourceGroup.create(group=group, data_source=self, view_only=view_only)

View File

@ -264,7 +264,7 @@ def refresh_queries():
with statsd_client.timer('manager.outdated_queries_lookup'):
for query in models.Query.outdated_queries():
if query.data_source.is_paused:
if query.data_source.paused:
logging.info("Skipping refresh of %s because datasource - %s is paused (%s).", query.id, query.data_source.name, query.data_source.pause_reason)
else:
enqueue_query(query.query, query.data_source,
@ -348,7 +348,7 @@ def refresh_schemas():
Refreshes the data sources schemas.
"""
for ds in models.DataSource.select():
if ds.is_paused:
if ds.paused:
logger.info(u"Skipping refresh schema of %s because it is paused (%s).", ds.name, ds.pause_reason)
else:
logger.info(u"Refreshing schema for: {}".format(ds.name))

View File

@ -103,15 +103,18 @@ class TestDataSourcePausePost(BaseTestCase):
admin = self.factory.create_admin()
rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin)
self.assertEqual(rv.status_code, 200)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).is_paused, True)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).paused, True)
def test_pause_sets_reason(self):
admin = self.factory.create_admin()
rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin, data={'reason': 'testing'})
self.assertEqual(rv.status_code, 200)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).is_paused, True)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).paused, True)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).pause_reason, 'testing')
rv = self.make_request('post', '/api/data_sources/{}/pause?reason=test'.format(self.factory.data_source.id), user=admin)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).pause_reason, 'test')
def test_requires_admin(self):
rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id))
self.assertEqual(rv.status_code, 403)
@ -124,7 +127,7 @@ class TestDataSourcePauseDelete(BaseTestCase):
self.factory.data_source.save()
rv = self.make_request('delete', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin)
self.assertEqual(rv.status_code, 200)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).is_paused, False)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).paused, False)
def test_requires_admin(self):
rv = self.make_request('delete', '/api/data_sources/{}/pause'.format(self.factory.data_source.id))

View File

@ -75,7 +75,6 @@ class TestQueryResultListAPI(BaseTestCase):
def test_execute_on_paused_data_source(self):
self.factory.data_source.pause()
self.factory.data_source.save()
rv = self.make_request('post', '/api/query_results',
data={'data_source_id': self.factory.data_source.id,

View File

@ -11,19 +11,19 @@ class TestDataSourceCreate(BaseTestCase):
class TestDataSourceIsPaused(BaseTestCase):
def test_returns_false_by_default(self):
self.assertFalse(self.factory.data_source.is_paused)
self.assertFalse(self.factory.data_source.paused)
def test_persists_selection(self):
self.factory.data_source.pause()
self.assertTrue(self.factory.data_source.is_paused)
self.assertTrue(self.factory.data_source.paused)
self.factory.data_source.resume()
self.assertFalse(self.factory.data_source.is_paused)
self.assertFalse(self.factory.data_source.paused)
def test_allows_setting_reason(self):
reason = "Some good reason."
self.factory.data_source.pause(reason)
self.assertTrue(self.factory.data_source.is_paused)
self.assertTrue(self.factory.data_source.paused)
self.assertEqual(self.factory.data_source.pause_reason, reason)
def test_resume_clears_reason(self):

View File

@ -30,14 +30,12 @@ class TestRefreshQueries(BaseTestCase):
query.save()
query.data_source.pause()
query.data_source.save()
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
add_job_mock.assert_not_called()
query.data_source.resume()
query.data_source.save()
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()

View File

@ -12,8 +12,13 @@ class TestRefreshSchemas(BaseTestCase):
def test_skips_paused_data_sources(self):
self.factory.data_source.pause()
self.factory.data_source.save()
with patch('redash.models.DataSource.get_schema') as get_schema:
refresh_schemas()
get_schema.assert_not_called()
self.factory.data_source.resume()
with patch('redash.models.DataSource.get_schema') as get_schema:
refresh_schemas()
get_schema.assert_called_with(refresh=True)