mirror of
https://github.com/valitydev/redash.git
synced 2024-11-06 09:05:17 +00:00
Use JSON Schema for data source configuration
This commit is contained in:
parent
20af276772
commit
8a171ba39a
@ -42,6 +42,9 @@ def update(data_source):
|
||||
elif data_source.type == 'script':
|
||||
data_source.options = json.dumps({"path": data_source.options})
|
||||
|
||||
elif data_source.type == 'mongo':
|
||||
data_source.type = 'mongodb'
|
||||
|
||||
else:
|
||||
print "[%s] No need to convert type of: %s" % (data_source.name, data_source.type)
|
||||
|
||||
|
@ -23,6 +23,7 @@ from redash.wsgi import app, auth, api
|
||||
from redash.tasks import QueryTask, record_event
|
||||
from redash.cache import headers as cache_headers
|
||||
from redash.permissions import require_permission
|
||||
from redash.query_runner import query_runners, validate_configuration
|
||||
|
||||
|
||||
@app.route('/ping', methods=['GET'])
|
||||
@ -174,11 +175,35 @@ class MetricsAPI(BaseResource):
|
||||
api.add_resource(MetricsAPI, '/api/metrics/v1/send', endpoint='metrics')
|
||||
|
||||
|
||||
class DataSourceTypeListAPI(BaseResource):
|
||||
@require_permission("admin")
|
||||
def get(self):
|
||||
return [q.to_dict() for q in query_runners.values()]
|
||||
|
||||
api.add_resource(DataSourceTypeListAPI, '/api/data_sources/types', endpoint='data_source_types')
|
||||
|
||||
|
||||
class DataSourceListAPI(BaseResource):
|
||||
def get(self):
|
||||
data_sources = [ds.to_dict() for ds in models.DataSource.all()]
|
||||
return data_sources
|
||||
|
||||
@require_permission("admin")
|
||||
def post(self):
|
||||
req = request.get_json(True)
|
||||
required_fields = ('options', 'name', 'type')
|
||||
for f in required_fields:
|
||||
if f not in req:
|
||||
abort(400)
|
||||
|
||||
if not validate_configuration(req['type'], req['options']):
|
||||
abort(400)
|
||||
|
||||
datasource = models.DataSource.create(name=req['name'], type=req['type'], options=req['options'])
|
||||
|
||||
return datasource.to_dict()
|
||||
|
||||
|
||||
api.add_resource(DataSourceListAPI, '/api/data_sources', endpoint='data_sources')
|
||||
|
||||
|
||||
|
@ -1,12 +1,13 @@
|
||||
import logging
|
||||
import json
|
||||
|
||||
import jsonschema
|
||||
from jsonschema import ValidationError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
__all__ = [
|
||||
'ConfigurationError',
|
||||
'Configuration',
|
||||
'ConfigurationField',
|
||||
'ValidationError',
|
||||
'BaseQueryRunner',
|
||||
'TYPE_DATETIME',
|
||||
'TYPE_BOOLEAN',
|
||||
@ -28,67 +29,21 @@ TYPE_DATETIME = 'datetime'
|
||||
TYPE_DATE = 'date'
|
||||
|
||||
|
||||
class ConfigurationError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
def _friendly_name(key):
|
||||
return " ".join(key.capitalize().split("_"))
|
||||
|
||||
|
||||
class ConfigurationField(object):
|
||||
def __init__(self, key, name=None, required=False, field_type="string"):
|
||||
if name is None:
|
||||
name = _friendly_name(key)
|
||||
|
||||
self.key = key
|
||||
self.name = name
|
||||
self.required = required
|
||||
self.field_type = field_type
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
"key": self.key,
|
||||
"name": self.name,
|
||||
"mandatory": self.required,
|
||||
"field_type": self.field_type
|
||||
}
|
||||
|
||||
|
||||
class Configuration(object):
|
||||
def __init__(self, fields):
|
||||
self.fields = {field.key: field for field in fields}
|
||||
|
||||
def parse(self, configuration):
|
||||
parsed = {}
|
||||
|
||||
for key, field in self.fields.iteritems():
|
||||
if field.required and key not in configuration:
|
||||
raise ConfigurationError("Missing mandatory field: {}".format(field.name))
|
||||
|
||||
if key in configuration:
|
||||
parsed[key] = configuration[key]
|
||||
|
||||
return parsed
|
||||
|
||||
def get_input_definition(self):
|
||||
return [field.to_dict() for field in self.fields]
|
||||
|
||||
|
||||
class BaseQueryRunner(object):
|
||||
def __init__(self, configuration_json):
|
||||
try:
|
||||
configuration_spec = self.configuration_spec()
|
||||
if not isinstance(configuration_spec, Configuration):
|
||||
configuration_spec = Configuration([ConfigurationField(k) for k in configuration_spec])
|
||||
|
||||
self.configuration = configuration_spec.parse(json.loads(configuration_json))
|
||||
except ValueError:
|
||||
raise ConfigurationError("Invalid configuration syntax")
|
||||
def __init__(self, configuration):
|
||||
jsonschema.validate(configuration, self.configuration_schema())
|
||||
|
||||
@classmethod
|
||||
def name(cls):
|
||||
raise NotImplementedError()
|
||||
return cls.__name__
|
||||
|
||||
@classmethod
|
||||
def type(cls):
|
||||
return cls.__name__.lower()
|
||||
|
||||
@classmethod
|
||||
def enabled(cls):
|
||||
@ -99,23 +54,31 @@ class BaseQueryRunner(object):
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def configuration_spec(cls):
|
||||
return Configuration([])
|
||||
def configuration_schema(cls):
|
||||
return {}
|
||||
|
||||
def run_query(self, query):
|
||||
raise NotImplementedError()
|
||||
|
||||
@classmethod
|
||||
def to_dict(cls):
|
||||
return {
|
||||
'name': cls.name(),
|
||||
'type': cls.type(),
|
||||
'configuration_schema': cls.configuration_schema()
|
||||
}
|
||||
|
||||
|
||||
query_runners = {}
|
||||
|
||||
|
||||
def register(query_runner_type, query_runner_class):
|
||||
def register(query_runner_class):
|
||||
global query_runners
|
||||
if query_runner_class.enabled():
|
||||
logger.info("Registering %s query runner.", query_runner_type)
|
||||
query_runners[query_runner_type] = query_runner_class
|
||||
logger.info("Registering %s (%s) query runner.", query_runner_class.name(), query_runner_class.type())
|
||||
query_runners[query_runner_class.type()] = query_runner_class
|
||||
else:
|
||||
logger.warning("%s query runner not enabled; not registering", query_runner_type)
|
||||
logger.warning("%s query runner not enabled; not registering", query_runner_class.name())
|
||||
|
||||
|
||||
def get_query_runner(query_runner_type, configuration_json):
|
||||
@ -124,7 +87,21 @@ def get_query_runner(query_runner_type, configuration_json):
|
||||
if query_runner_class is None:
|
||||
return None
|
||||
|
||||
return query_runner_class(configuration_json)
|
||||
return query_runner_class(json.loads(configuration_json))
|
||||
|
||||
|
||||
def validate_configuration(query_runner_type, configuration_json):
|
||||
global query_runners
|
||||
query_runner_class = query_runners.get(query_runner_type, None)
|
||||
if query_runner_class is None:
|
||||
return False
|
||||
|
||||
try:
|
||||
jsonschema.validate(json.loads(configuration_json), query_runner_class.configuration_schema())
|
||||
except ValidationError:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def import_query_runners(query_runner_imports):
|
||||
|
@ -96,8 +96,25 @@ class BigQuery(BaseQueryRunner):
|
||||
return _import()
|
||||
|
||||
@classmethod
|
||||
def configuration_spec(cls):
|
||||
return "serviceAccount", "privateKey", "projectId"
|
||||
def configuration_schema(cls):
|
||||
return {
|
||||
'type': 'object',
|
||||
'properties': {
|
||||
'serviceAccount': {
|
||||
'type': 'string',
|
||||
'title': 'Service Account'
|
||||
},
|
||||
'projectId': {
|
||||
'type': 'string',
|
||||
'title': 'Project ID'
|
||||
},
|
||||
'privateKey': {
|
||||
'type': 'string',
|
||||
'title': 'Private Key Path'
|
||||
}
|
||||
},
|
||||
'required': ['serviceAccount', 'projectId', 'privateKey']
|
||||
}
|
||||
|
||||
def __init__(self, configuration_json):
|
||||
super(BigQuery, self).__init__(configuration_json)
|
||||
@ -162,4 +179,4 @@ class BigQuery(BaseQueryRunner):
|
||||
|
||||
return json_data, error
|
||||
|
||||
register("bigquery", BigQuery)
|
||||
register(BigQuery)
|
@ -26,8 +26,26 @@ def _transform_result(response):
|
||||
|
||||
class Graphite(BaseQueryRunner):
|
||||
@classmethod
|
||||
def configuration_spec(cls):
|
||||
return "url", "username", "password", "verify"
|
||||
def configuration_schema(cls):
|
||||
return {
|
||||
'type': 'object',
|
||||
'properties': {
|
||||
'url': {
|
||||
'type': 'string'
|
||||
},
|
||||
'username': {
|
||||
'type': 'string'
|
||||
},
|
||||
'password': {
|
||||
'type': 'string'
|
||||
},
|
||||
'verify': {
|
||||
'type': 'boolean',
|
||||
'title': 'Verify SSL certificate'
|
||||
}
|
||||
},
|
||||
'required': ['url']
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def annotate_query(cls):
|
||||
@ -36,9 +54,6 @@ class Graphite(BaseQueryRunner):
|
||||
def __init__(self, configuration_json):
|
||||
super(Graphite, self).__init__(configuration_json)
|
||||
|
||||
if "url" not in self.configuration:
|
||||
raise ConfigurationError("Missing url")
|
||||
|
||||
if "username" in self.configuration and self.configuration["username"]:
|
||||
self.auth = (self.configuration["username"], self.configuration["password"])
|
||||
else:
|
||||
@ -65,4 +80,4 @@ class Graphite(BaseQueryRunner):
|
||||
|
||||
return data, error
|
||||
|
||||
register("graphite", Graphite)
|
||||
register(Graphite)
|
@ -55,8 +55,25 @@ def _convert_date(q, field_name):
|
||||
|
||||
class MongoDB(BaseQueryRunner):
|
||||
@classmethod
|
||||
def configuration_spec(cls):
|
||||
return "connectionString", "dbName", "replicaSetName"
|
||||
def configuration_schema(cls):
|
||||
return {
|
||||
'type': 'object',
|
||||
'properties': {
|
||||
'connectionString': {
|
||||
'type': 'string',
|
||||
'title': 'Connection String'
|
||||
},
|
||||
'dbName': {
|
||||
'type': 'string',
|
||||
'title': "Database Name"
|
||||
},
|
||||
'replicaSetName': {
|
||||
'type': 'string',
|
||||
'title': 'Replica Set Name'
|
||||
},
|
||||
'required': ['connectionString']
|
||||
}
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def enabled(cls):
|
||||
@ -70,20 +87,10 @@ class MongoDB(BaseQueryRunner):
|
||||
_import()
|
||||
super(MongoDB, self).__init__(configuration_json)
|
||||
|
||||
if "dbName" not in self.configuration or not connection_string["dbName"]:
|
||||
raise ConfigurationError("dbName is missing from connection string")
|
||||
|
||||
self.db_name = self.configuration["dbName"]
|
||||
|
||||
if "connectionString" not in self.configuration or not self.configuration["connectionString"]:
|
||||
raise ConfigurationError("connectionString is missing from connection string")
|
||||
|
||||
self.is_replica_set = True if "replicaSetName" in self.configuration and self.configuration["replicaSetName"] else False
|
||||
|
||||
if self.is_replica_set and not self.configuration["replicaSetName"]:
|
||||
raise ConfigurationError("replicaSetName is set in the connection string JSON but is empty")
|
||||
|
||||
|
||||
def run_query(self, query):
|
||||
if self.is_replica_set:
|
||||
db_connection = pymongo.MongoReplicaSetClient(self.configuration["connectionString"], replicaSet=self.configuration["replicaSetName"])
|
||||
@ -172,4 +179,4 @@ class MongoDB(BaseQueryRunner):
|
||||
|
||||
return json_data, error
|
||||
|
||||
register("mongo", MongoDB)
|
||||
register(MongoDB)
|
@ -10,8 +10,27 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
class Mysql(BaseQueryRunner):
|
||||
@classmethod
|
||||
def configuration_spec(cls):
|
||||
return "host", "user", "passwd" "db"
|
||||
def configuration_schema(cls):
|
||||
return {
|
||||
'type': 'object',
|
||||
'properties': {
|
||||
'host': {
|
||||
'type': 'string'
|
||||
},
|
||||
'user': {
|
||||
'type': 'string'
|
||||
},
|
||||
'passwd': {
|
||||
'type': 'string',
|
||||
'title': 'Password'
|
||||
},
|
||||
'db': {
|
||||
'type': 'string',
|
||||
'title': 'Database name'
|
||||
}
|
||||
},
|
||||
'required': ['db']
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def enabled(cls):
|
||||
@ -25,9 +44,6 @@ class Mysql(BaseQueryRunner):
|
||||
def __init__(self, configuration_json):
|
||||
super(Mysql, self).__init__(configuration_json)
|
||||
|
||||
if 'db' not in configuration_json.keys():
|
||||
raise ConfigurationError("Missing database name")
|
||||
|
||||
self.configuration.update({
|
||||
"charset": "utf8",
|
||||
"use_unicode": True
|
||||
@ -79,4 +95,4 @@ class Mysql(BaseQueryRunner):
|
||||
|
||||
return json_data, error
|
||||
|
||||
register("mysql", Mysql)
|
||||
register(Mysql)
|
@ -46,12 +46,33 @@ def _wait(conn):
|
||||
|
||||
class PostgreSQL(BaseQueryRunner):
|
||||
@classmethod
|
||||
def configuration_spec(cls):
|
||||
return Configuration([ConfigurationField('user'),
|
||||
ConfigurationField('password'),
|
||||
ConfigurationField('host'),
|
||||
ConfigurationField('port'),
|
||||
ConfigurationField('dbname', name='Database Name', required=True)])
|
||||
def configuration_schema(cls):
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"user": {
|
||||
"type": "string"
|
||||
},
|
||||
"password": {
|
||||
"type": "string"
|
||||
},
|
||||
"host": {
|
||||
"type": "string"
|
||||
},
|
||||
"port": {
|
||||
"type": "number"
|
||||
},
|
||||
"dbname": {
|
||||
"type": "string",
|
||||
"title": "Database Name"
|
||||
}
|
||||
},
|
||||
"required": ["dbname"]
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def type(cls):
|
||||
return "pg"
|
||||
|
||||
def __init__(self, configuration_json):
|
||||
super(PostgreSQL, self).__init__(configuration_json)
|
||||
@ -118,4 +139,4 @@ class PostgreSQL(BaseQueryRunner):
|
||||
|
||||
return json_data, error
|
||||
|
||||
register("pg", PostgreSQL)
|
||||
register(PostgreSQL)
|
@ -11,8 +11,17 @@ class Script(BaseQueryRunner):
|
||||
return "check_output" in subprocess.__dict__
|
||||
|
||||
@classmethod
|
||||
def configuration_spec(cls):
|
||||
return ("path",)
|
||||
def configuration_schema(cls):
|
||||
return {
|
||||
'type': 'object',
|
||||
'properties': {
|
||||
'path': {
|
||||
'type': 'string',
|
||||
'title': 'Scripts path'
|
||||
}
|
||||
},
|
||||
'required': ['path']
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def annotate_query(cls):
|
||||
@ -23,7 +32,7 @@ class Script(BaseQueryRunner):
|
||||
|
||||
# Poor man's protection against running scripts from output the scripts directory
|
||||
if self.configuration["path"].find("../") > -1:
|
||||
raise ConfigurationError("Scripts can only be run from the configured scripts directory")
|
||||
raise ValidationError("Scripts can only be run from the configured scripts directory")
|
||||
|
||||
def run_query(self, query):
|
||||
try:
|
||||
@ -53,4 +62,4 @@ class Script(BaseQueryRunner):
|
||||
|
||||
return json_data, error
|
||||
|
||||
register("script", Script)
|
||||
register(Script)
|
@ -6,8 +6,16 @@ from redash.query_runner import *
|
||||
|
||||
class Url(BaseQueryRunner):
|
||||
@classmethod
|
||||
def configuration_spec(cls):
|
||||
return ("url",)
|
||||
def configuration_schema(cls):
|
||||
return {
|
||||
'type': 'object',
|
||||
'properties': {
|
||||
'url': {
|
||||
'type': 'string',
|
||||
'title': 'URL base path'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def annotate_query(cls):
|
||||
@ -47,4 +55,4 @@ class Url(BaseQueryRunner):
|
||||
|
||||
return json_data, error
|
||||
|
||||
register("url", Url)
|
||||
register(Url)
|
@ -23,3 +23,4 @@ honcho==0.5.0
|
||||
statsd==2.1.2
|
||||
gunicorn==18.0
|
||||
celery==3.1.11
|
||||
jsonschema==2.4.0
|
||||
|
@ -472,4 +472,45 @@ class TestLogout(BaseTestCase):
|
||||
self.assertTrue(current_user.is_authenticated())
|
||||
rv = c.get('/logout')
|
||||
self.assertEquals(rv.status_code, 302)
|
||||
self.assertFalse(current_user.is_authenticated())
|
||||
self.assertFalse(current_user.is_authenticated())
|
||||
|
||||
|
||||
class DataSourceTypesTest(BaseTestCase):
|
||||
def test_returns_data_for_admin(self):
|
||||
admin = user_factory.create(groups=['admin', 'default'])
|
||||
with app.test_client() as c, authenticated_user(c, user=admin):
|
||||
rv = c.get("/api/data_sources/types")
|
||||
self.assertEqual(rv.status_code, 200)
|
||||
|
||||
def test_returns_403_for_non_admin(self):
|
||||
with app.test_client() as c, authenticated_user(c):
|
||||
rv = c.get("/api/data_sources/types")
|
||||
self.assertEqual(rv.status_code, 403)
|
||||
|
||||
|
||||
class DataSourceTest(BaseTestCase):
|
||||
def test_returns_400_when_missing_fields(self):
|
||||
admin = user_factory.create(groups=['admin', 'default'])
|
||||
with app.test_client() as c, authenticated_user(c, user=admin):
|
||||
rv = c.post("/api/data_sources")
|
||||
self.assertEqual(rv.status_code, 400)
|
||||
|
||||
rv = json_request(c.post, '/api/data_sources', data={'name': 'DS 1'})
|
||||
|
||||
self.assertEqual(rv.status_code, 400)
|
||||
|
||||
def test_returns_400_when_configuration_invalid(self):
|
||||
admin = user_factory.create(groups=['admin', 'default'])
|
||||
with app.test_client() as c, authenticated_user(c, user=admin):
|
||||
rv = json_request(c.post, '/api/data_sources',
|
||||
data={'name': 'DS 1', 'type': 'pg', 'options': '{}'})
|
||||
|
||||
self.assertEqual(rv.status_code, 400)
|
||||
|
||||
def test_creates_data_source(self):
|
||||
admin = user_factory.create(groups=['admin', 'default'])
|
||||
with app.test_client() as c, authenticated_user(c, user=admin):
|
||||
rv = json_request(c.post, '/api/data_sources',
|
||||
data={'name': 'DS 1', 'type': 'pg', 'options': '{"dbname": "redash"}'})
|
||||
|
||||
self.assertEqual(rv.status_code, 200)
|
@ -1,12 +0,0 @@
|
||||
import unittest
|
||||
from redash.query_runner import Configuration, ConfigurationField, ConfigurationError
|
||||
|
||||
|
||||
class TestConfigurationParsing(unittest.TestCase):
|
||||
def test_parse_raises_error_when_missing_mandatory_fields(self):
|
||||
configuration = Configuration([ConfigurationField("dbname", required=True)])
|
||||
self.assertRaises(ConfigurationError, configuration.parse, {})
|
||||
|
||||
def test_parse_returns_value_when_correct(self):
|
||||
configuration = Configuration([ConfigurationField("dbname", required=True)])
|
||||
self.assertDictEqual(configuration.parse({"dbname":"test"}), {"dbname":"test"})
|
Loading…
Reference in New Issue
Block a user