Query Runner: eccenca Corporate Memory (SPARQL) - query RDF / Linked Data Knowledge Graphs with redash (#5415)

* add Corporate Memory Runner based on cmempy 21.2.3

* fix code style

* apply some code cleanups

* use extendedEnum, boolean and extra_options for schema description

* use lower case sorting for data source types list

This correctly orders data source names that start with lowercase
characters (such as eccenca Corporate Memory); see the sketch after
this list.

* add missing dblogo
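
A minimal sketch of the ordering problem behind the sorting change (the
data source names are illustrative): Python's default string sort is
case-sensitive, so every uppercase name sorts before any lowercase one.

    names = ["Vertica", "eccenca Corporate Memory", "Amazon Athena"]

    sorted(names)
    # ['Amazon Athena', 'Vertica', 'eccenca Corporate Memory']

    sorted(names, key=lambda name: name.lower())
    # ['Amazon Athena', 'eccenca Corporate Memory', 'Vertica']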
Sebastian Tramp 2021-03-24 08:15:24 +01:00 committed by GitHub
parent fb90b501cb
commit 70681294a3
5 changed files with 272 additions and 1 deletion

Binary file not shown (new db logo image, 1.5 KiB).

redash/handlers/data_sources.py

@@ -29,7 +29,8 @@ class DataSourceTypeListResource(BaseResource):
     @require_admin
     def get(self):
         return [
-            q.to_dict() for q in sorted(query_runners.values(), key=lambda q: q.name())
+            q.to_dict()
+            for q in sorted(query_runners.values(), key=lambda q: q.name().lower())
         ]

redash/query_runner/corporate_memory.py (new file)

@@ -0,0 +1,268 @@
"""Provide the query runner for eccenca Corporate Memory.
seeAlso: https://documentation.eccenca.com/
seeAlso: https://eccenca.com/
"""
import logging
import json
from os import environ
from redash.query_runner import BaseQueryRunner
from redash.utils import json_dumps, json_loads
from . import register
try:
from cmem.cmempy.queries import SparqlQuery, QueryCatalog, QUERY_STRING
from cmem.cmempy.dp.proxy.graph import get_graphs_list
enabled = True
except ImportError:
enabled = False
logger = logging.getLogger(__name__)
class CorporateMemoryQueryRunner(BaseQueryRunner):
"""Use eccenca Corporate Memory as redash data source"""
# These environment keys are used by cmempy
KNOWN_CONFIG_KEYS = (
"CMEM_BASE_PROTOCOL",
"CMEM_BASE_DOMAIN",
"CMEM_BASE_URI",
"SSL_VERIFY",
"REQUESTS_CA_BUNDLE",
"DP_API_ENDPOINT",
"DI_API_ENDPOINT",
"OAUTH_TOKEN_URI",
"OAUTH_GRANT_TYPE",
"OAUTH_USER",
"OAUTH_PASSWORD",
"OAUTH_CLIENT_ID",
"OAUTH_CLIENT_SECRET",
)
# These variables hold secret data and should NOT be logged
KNOWN_SECRET_KEYS = ("OAUTH_PASSWORD", "OAUTH_CLIENT_SECRET")
# This allows for an easy connection test
noop_query = "SELECT ?noop WHERE {BIND('noop' as ?noop)}"
# We do not want to have comment in our sparql queries
# FEATURE?: Implement annotate_query in case the metadata is useful somewhere
should_annotate_query = False
def __init__(self, configuration):
"""init the class and configuration"""
super(CorporateMemoryQueryRunner, self).__init__(configuration)
"""
FEATURE?: activate SPARQL support in the redash query editor
Currently SPARQL syntax seems not to be available for react-ace
component. However, the ace editor itself supports sparql mode:
https://github.com/ajaxorg/ace/blob/master/lib/ace/mode/sparql.js
then we can hopefully do: self.syntax = "sparql"
FEATURE?: implement the retrieve Query catalog URIs in order to use them in queries
FEATURE?: implement a way to use queries from the query catalog
FEATURE?: allow a checkbox to NOT use owl:imports imported graphs
FEATURE?: allow to use a context graph per data source
"""
self.configuration = configuration
def _setup_environment(self):
"""provide environment for cmempy
cmempy environment variables need to match key in the properties
object of the configuration_schema
"""
for key in self.KNOWN_CONFIG_KEYS:
if key in environ:
environ.pop(key)
value = self.configuration.get(key, None)
if value is not None:
environ[key] = str(value)
if key in self.KNOWN_SECRET_KEYS:
logger.info("{} set by config".format(key))
else:
logger.info("{} set by config to {}".format(key, environ[key]))
@staticmethod
def _transform_sparql_results(results):
"""transforms a SPARQL query result to a redash query result
source structure: SPARQL 1.1 Query Results JSON Format
- seeAlso: https://www.w3.org/TR/sparql11-results-json/
target structure: redash result set
there is no good documentation available
so here an example result set as needed for redash:
data = {
"columns": [ {"name": "name", "type": "string", "friendly_name": "friendly name"}],
"rows": [
{"name": "value 1"},
{"name": "value 2"}
]}
FEATURE?: During the sparql_row loop, we could check the data types of the
values and, in case they are all the same, choose something better than
just string.
"""
logger.info("results are: {}".format(results))
# Not sure why we do not use the json package here but all other
# query runner do it the same way :-)
sparql_results = json_loads(results)
# transform all bindings to redash rows
rows = []
for sparql_row in sparql_results["results"]["bindings"]:
row = {}
for var in sparql_results["head"]["vars"]:
try:
row[var] = sparql_row[var]["value"]
except KeyError:
# not bound SPARQL variables are set as empty strings
row[var] = ""
rows.append(row)
# transform all vars to redash columns
columns = []
for var in sparql_results["head"]["vars"]:
columns.append({"name": var, "friendly_name": var, "type": "string"})
# Not sure why we do not use the json package here but all other
# query runner do it the same way :-)
return json_dumps({"columns": columns, "rows": rows})
@classmethod
def name(cls):
return "eccenca Corporate Memory (with SPARQL)"
@classmethod
def enabled(cls):
return enabled
@classmethod
def type(cls):
return "corporate_memory"
def run_query(self, query, user):
"""send a sparql query to corporate memory"""
query_text = query
logger.info("about to execute query (user='{}'): {}".format(user, query_text))
query = SparqlQuery(query_text)
query_type = query.get_query_type()
# type of None means, there is an error in the query
# so execution is at least tried on endpoint
if query_type not in ["SELECT", None]:
raise ValueError(
"Queries of type {} can not be processed by redash.".format(query_type)
)
self._setup_environment()
try:
data = self._transform_sparql_results(query.get_results())
except Exception as error:
logger.info("Error: {}".format(error))
try:
# try to load Problem Details for HTTP API JSON
details = json.loads(error.response.text)
error = ""
if "title" in details:
error += details["title"] + ": "
if "detail" in details:
error += details["detail"]
return None, error
except Exception:
pass
return None, error
error = None
return data, error
@classmethod
def configuration_schema(cls):
"""provide the configuration of the data source as json schema"""
return {
"type": "object",
"properties": {
"CMEM_BASE_URI": {"type": "string", "title": "Base URL"},
"OAUTH_GRANT_TYPE": {
"type": "string",
"title": "Grant Type",
"default": "client_credentials",
"extendedEnum": [
{"value": "client_credentials", "name": "client_credentials"},
{"value": "password", "name": "password"},
],
},
"OAUTH_CLIENT_ID": {
"type": "string",
"title": "Client ID (e.g. cmem-service-account)",
"default": "cmem-service-account",
},
"OAUTH_CLIENT_SECRET": {
"type": "string",
"title": "Client Secret - only needed for grant type 'client_credentials'",
},
"OAUTH_USER": {
"type": "string",
"title": "User account - only needed for grant type 'password'",
},
"OAUTH_PASSWORD": {
"type": "string",
"title": "User Password - only needed for grant type 'password'",
},
"SSL_VERIFY": {
"type": "boolean",
"title": "Verify SSL certificates for API requests",
"default": True,
},
"REQUESTS_CA_BUNDLE": {
"type": "string",
"title": "Path to the CA Bundle file (.pem)",
},
},
"required": ["CMEM_BASE_URI", "OAUTH_GRANT_TYPE", "OAUTH_CLIENT_ID"],
"secret": ["OAUTH_CLIENT_SECRET", "OAUTH_PASSWORD"],
"extra_options": [
"OAUTH_GRANT_TYPE",
"OAUTH_USER",
"OAUTH_PASSWORD",
"SSL_VERIFY",
"REQUESTS_CA_BUNDLE",
],
}
def get_schema(self, get_stats=False):
"""Get the schema structure (prefixes, graphs)."""
schema = dict()
schema["1"] = {
"name": "-> Common Prefixes <-",
"columns": self._get_common_prefixes_schema(),
}
schema["2"] = {"name": "-> Graphs <-", "columns": self._get_graphs_schema()}
# schema.update(self._get_query_schema())
logger.info(schema.values())
return schema.values()
def _get_graphs_schema(self):
"""Get a list of readable graph FROM clause strings."""
self._setup_environment()
graphs = []
for graph in get_graphs_list():
graphs.append("FROM <{}>".format(graph["iri"]))
return graphs
@staticmethod
def _get_common_prefixes_schema():
"""Get a list of SPARQL prefix declarations."""
common_prefixes = [
"PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>",
"PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>",
"PREFIX owl: <http://www.w3.org/2002/07/owl#>",
"PREFIX schema: <http://schema.org/>",
"PREFIX dct: <http://purl.org/dc/terms/>",
"PREFIX skos: <http://www.w3.org/2004/02/skos/core#>",
]
return common_prefixes
register(CorporateMemoryQueryRunner)
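
To illustrate the result transformation above, a minimal sketch that feeds
a hand-written SPARQL 1.1 JSON result into _transform_sparql_results (the
bindings are illustrative data, not from a live endpoint):

    sample = """{
        "head": {"vars": ["s", "label"]},
        "results": {"bindings": [
            {"s": {"type": "uri", "value": "http://example.org/1"},
             "label": {"type": "literal", "value": "Example One"}},
            {"s": {"type": "uri", "value": "http://example.org/2"}}
        ]}
    }"""

    data = CorporateMemoryQueryRunner._transform_sparql_results(sample)
    # data is a JSON string with two string-typed columns ("s", "label")
    # and two rows; the unbound ?label in the second binding becomes "".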
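
And a minimal sketch of how a configured data source reaches cmempy: all
values below are hypothetical placeholders, not part of the commit.
_setup_environment() exports each known configuration key as an
environment variable before a query is executed.

    configuration = {
        "CMEM_BASE_URI": "https://cmem.example.org",  # placeholder URL
        "OAUTH_GRANT_TYPE": "client_credentials",
        "OAUTH_CLIENT_ID": "cmem-service-account",
        "OAUTH_CLIENT_SECRET": "<secret>",  # placeholder secret
    }
    runner = CorporateMemoryQueryRunner(configuration)
    runner._setup_environment()
    # cmempy now reads CMEM_BASE_URI etc. from os.environ; secret keys are
    # exported too, but logged only as "<key> set by config" (no value).
    # A real connection test would then run the class's noop query:
    # data, error = runner.run_query(runner.noop_query, user=None)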

redash/settings/__init__.py

@@ -372,6 +372,7 @@ default_query_runners = [
     "redash.query_runner.exasol",
     "redash.query_runner.cloudwatch",
     "redash.query_runner.cloudwatch_insights",
+    "redash.query_runner.corporate_memory",
 ]
 
 enabled_query_runners = array_from_string(

requirements_all_ds.txt

@@ -36,3 +36,4 @@ pyexasol==0.12.0
 python-rapidjson==0.8.0
 pyodbc==4.0.28
 trino~=0.305
+cmem-cmempy==21.2.3