Merge pull request #1085 from getredash/feature/pause-api

Feature: API to pause a data source
This commit is contained in:
Arik Fraimovich 2016-05-31 09:08:03 +03:00
commit 857caab20e
13 changed files with 210 additions and 13 deletions

View File

@ -406,6 +406,8 @@
}, function(error) {
if (error.status === 403) {
queryResult.update(error.data);
} else if (error.status === 400 && 'job' in error.data) {
queryResult.update(error.data);
}
});

View File

@ -6,7 +6,7 @@ from redash.utils import json_dumps
from redash.handlers.base import org_scoped_rule
from redash.handlers.alerts import AlertResource, AlertListResource, AlertSubscriptionListResource, AlertSubscriptionResource
from redash.handlers.dashboards import DashboardListResource, RecentDashboardsResource, DashboardResource, DashboardShareResource
from redash.handlers.data_sources import DataSourceTypeListResource, DataSourceListResource, DataSourceSchemaResource, DataSourceResource
from redash.handlers.data_sources import DataSourceTypeListResource, DataSourceListResource, DataSourceSchemaResource, DataSourceResource, DataSourcePauseResource
from redash.handlers.events import EventResource
from redash.handlers.queries import QueryRefreshResource, QueryListResource, QueryRecentResource, QuerySearchResource, QueryResource
from redash.handlers.query_results import QueryResultListResource, QueryResultResource, JobResource
@ -49,6 +49,7 @@ api.add_org_resource(DashboardShareResource, '/api/dashboards/<dashboard_id>/sha
api.add_org_resource(DataSourceTypeListResource, '/api/data_sources/types', endpoint='data_source_types')
api.add_org_resource(DataSourceListResource, '/api/data_sources', endpoint='data_sources')
api.add_org_resource(DataSourceSchemaResource, '/api/data_sources/<data_source_id>/schema')
api.add_org_resource(DataSourcePauseResource, '/api/data_sources/<data_source_id>/pause')
api.add_org_resource(DataSourceResource, '/api/data_sources/<data_source_id>', endpoint='data_source')
api.add_org_resource(GroupListResource, '/api/groups', endpoint='groups')

View File

@ -106,3 +106,38 @@ class DataSourceSchemaResource(BaseResource):
return schema
class DataSourcePauseResource(BaseResource):
@require_admin
def post(self, data_source_id):
data_source = get_object_or_404(models.DataSource.get_by_id_and_org, data_source_id, self.current_org)
data = request.get_json(force=True, silent=True)
if data:
reason = data.get('reason')
else:
reason = request.args.get('reason')
data_source.pause(reason)
data_source.save()
self.record_event({
'action': 'pause',
'object_id': data_source.id,
'object_type': 'datasource'
})
return data_source.to_dict()
@require_admin
def delete(self, data_source_id):
data_source = get_object_or_404(models.DataSource.get_by_id_and_org, data_source_id, self.current_org)
data_source.resume()
data_source.save()
self.record_event({
'action': 'resume',
'object_id': data_source.id,
'object_type': 'datasource'
})
return data_source.to_dict()

View File

@ -16,12 +16,23 @@ from redash.utils import collect_query_parameters, collect_parameters_from_reque
from redash.tasks.queries import enqueue_query
def error_response(message):
return {'job': {'status': 4, 'error': message}}, 400
def run_query(data_source, parameter_values, query_text, query_id, max_age=0):
query_parameters = set(collect_query_parameters(query_text))
missing_params = set(query_parameters) - set(parameter_values.keys())
if missing_params:
return {'job': {'status': 4,
'error': 'Missing parameter value for: {}'.format(", ".join(missing_params))}}, 400
return error_response('Missing parameter value for: {}'.format(", ".join(missing_params)))
if data_source.paused:
if data_source.pause_reason:
message = '{} is paused ({}). Please try later.'.format(data_source.name, data_source.pause_reason)
else:
message = '{} is paused. Please try later.'.format(data_source.name)
return error_response(message)
if query_parameters:
query_text = pystache.render(query_text, parameter_values)

View File

@ -372,7 +372,9 @@ class DataSource(BelongsToOrgMixin, BaseModel):
'id': self.id,
'name': self.name,
'type': self.type,
'syntax': self.query_runner.syntax
'syntax': self.query_runner.syntax,
'paused': self.paused,
'pause_reason': self.pause_reason
}
if all:
@ -414,6 +416,23 @@ class DataSource(BelongsToOrgMixin, BaseModel):
return schema
def _pause_key(self):
return 'ds:{}:pause'.format(self.id)
@property
def paused(self):
return redis_connection.exists(self._pause_key())
@property
def pause_reason(self):
return redis_connection.get(self._pause_key())
def pause(self, reason=None):
redis_connection.set(self._pause_key(), reason)
def resume(self):
redis_connection.delete(self._pause_key())
def add_group(self, group, view_only=False):
dsg = DataSourceGroup.create(group=group, data_source=self, view_only=view_only)
setattr(self, 'data_source_groups', dsg)

View File

@ -264,9 +264,13 @@ def refresh_queries():
with statsd_client.timer('manager.outdated_queries_lookup'):
for query in models.Query.outdated_queries():
if query.data_source.paused:
logging.info("Skipping refresh of %s because datasource - %s is paused (%s).", query.id, query.data_source.name, query.data_source.pause_reason)
else:
enqueue_query(query.query, query.data_source,
scheduled=True,
metadata={'Query ID': query.id, 'Username': 'Scheduled'})
query_ids.append(query.id)
outdated_queries_count += 1
@ -344,11 +348,14 @@ def refresh_schemas():
Refreshes the data sources schemas.
"""
for ds in models.DataSource.select():
logger.info("Refreshing schema for: {}".format(ds.name))
if ds.paused:
logger.info(u"Skipping refresh schema of %s because it is paused (%s).", ds.name, ds.pause_reason)
else:
logger.info(u"Refreshing schema for: {}".format(ds.name))
try:
ds.get_schema(refresh=True)
except Exception:
logger.exception("Failed refreshing schema for the data source: %s", ds.name)
logger.exception(u"Failed refreshing schema for the data source: %s", ds.name)
def signal_handler(*args):

View File

@ -63,6 +63,9 @@ class ConfigurationContainer(object):
def get(self, *args, **kwargs):
return self._config.get(*args, **kwargs)
def __setitem__(self, key, value):
self._config[key] = value
def __getitem__(self, item):
if item in self._config:
return self._config[item]

View File

@ -52,7 +52,8 @@ org_factory = ModelFactory(redash.models.Organization,
data_source_factory = ModelFactory(redash.models.DataSource,
name=Sequence('Test {}'),
type='pg',
options=ConfigurationContainer.from_json('{"dbname": "test"}'),
# If we don't use lambda here it will reuse the same options between tests:
options=lambda: ConfigurationContainer.from_json('{"dbname": "test"}'),
org=1)
dashboard_factory = ModelFactory(redash.models.Dashboard,

View File

@ -96,3 +96,39 @@ class TestDataSourceListAPIPost(BaseTestCase):
data={'name': 'DS 1', 'type': 'pg', 'options': {"dbname": "redash"}}, user=admin)
self.assertEqual(rv.status_code, 200)
class TestDataSourcePausePost(BaseTestCase):
def test_pauses_data_source(self):
admin = self.factory.create_admin()
rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin)
self.assertEqual(rv.status_code, 200)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).paused, True)
def test_pause_sets_reason(self):
admin = self.factory.create_admin()
rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin, data={'reason': 'testing'})
self.assertEqual(rv.status_code, 200)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).paused, True)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).pause_reason, 'testing')
rv = self.make_request('post', '/api/data_sources/{}/pause?reason=test'.format(self.factory.data_source.id), user=admin)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).pause_reason, 'test')
def test_requires_admin(self):
rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id))
self.assertEqual(rv.status_code, 403)
class TestDataSourcePauseDelete(BaseTestCase):
def test_resumes_data_source(self):
admin = self.factory.create_admin()
self.factory.data_source.pause()
self.factory.data_source.save()
rv = self.make_request('delete', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin)
self.assertEqual(rv.status_code, 200)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).paused, False)
def test_requires_admin(self):
rv = self.make_request('delete', '/api/data_sources/{}/pause'.format(self.factory.data_source.id))
self.assertEqual(rv.status_code, 403)

View File

@ -73,6 +73,18 @@ class TestQueryResultListAPI(BaseTestCase):
self.assertEquals(rv.status_code, 200)
self.assertIn('job', rv.json)
def test_execute_on_paused_data_source(self):
self.factory.data_source.pause()
rv = self.make_request('post', '/api/query_results',
data={'data_source_id': self.factory.data_source.id,
'query': 'SELECT 1',
'max_age': 0})
self.assertEquals(rv.status_code, 400)
self.assertNotIn('query_result', rv.json)
self.assertIn('job', rv.json)
class TestQueryResultAPI(BaseTestCase):
def test_has_no_access_to_data_source(self):

View File

@ -7,3 +7,29 @@ class TestDataSourceCreate(BaseTestCase):
def test_adds_data_source_to_default_group(self):
data_source = DataSource.create_with_group(org=self.factory.org, name='test', options=ConfigurationContainer.from_json('{"dbname": "test"}'), type='pg')
self.assertIn(self.factory.org.default_group.id, data_source.groups)
class TestDataSourceIsPaused(BaseTestCase):
def test_returns_false_by_default(self):
self.assertFalse(self.factory.data_source.paused)
def test_persists_selection(self):
self.factory.data_source.pause()
self.assertTrue(self.factory.data_source.paused)
self.factory.data_source.resume()
self.assertFalse(self.factory.data_source.paused)
def test_allows_setting_reason(self):
reason = "Some good reason."
self.factory.data_source.pause(reason)
self.assertTrue(self.factory.data_source.paused)
self.assertEqual(self.factory.data_source.pause_reason, reason)
def test_resume_clears_reason(self):
self.factory.data_source.pause("Reason")
self.factory.data_source.resume()
self.assertEqual(self.factory.data_source.pause_reason, None)
def test_reason_is_none_by_default(self):
self.assertEqual(self.factory.data_source.pause_reason, None)

View File

@ -21,6 +21,26 @@ class TestRefreshQueries(BaseTestCase):
refresh_queries()
add_job_mock.assert_called_with(query.query, query.data_source, scheduled=True, metadata=ANY)
def test_doesnt_enqueue_outdated_queries_for_paused_data_source(self):
query = self.factory.create_query(schedule="60")
retrieved_at = utcnow() - datetime.timedelta(minutes=10)
query_result = self.factory.create_query_result(retrieved_at=retrieved_at, query=query.query,
query_hash=query.query_hash)
query.latest_query_data = query_result
query.save()
query.data_source.pause()
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
add_job_mock.assert_not_called()
query.data_source.resume()
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
add_job_mock.assert_called_with(query.query, query.data_source, scheduled=True, metadata=ANY)
def test_skips_fresh_queries(self):
query = self.factory.create_query(schedule="1200")
retrieved_at = utcnow() - datetime.timedelta(minutes=10)

View File

@ -0,0 +1,24 @@
import datetime
from mock import patch, call, ANY
from tests import BaseTestCase
from redash.tasks import refresh_schemas
class TestRefreshSchemas(BaseTestCase):
def test_calls_refresh_of_all_data_sources(self):
with patch('redash.models.DataSource.get_schema') as get_schema:
refresh_schemas()
get_schema.assert_called_with(refresh=True)
def test_skips_paused_data_sources(self):
self.factory.data_source.pause()
with patch('redash.models.DataSource.get_schema') as get_schema:
refresh_schemas()
get_schema.assert_not_called()
self.factory.data_source.resume()
with patch('redash.models.DataSource.get_schema') as get_schema:
refresh_schemas()
get_schema.assert_called_with(refresh=True)