Add DB2 as a data source using ibm-db python package (#2848)

* Add DB2 as a data source using ibm-db python package

* fix some codeclimate issues

* fix long line and missing white space

* Manage case of bad import

* Add DB2 query_runner as default query runner

* Fixed minor PEP8 rules
This commit is contained in:
Nicolas Ferrandini 2018-10-15 16:13:39 +02:00 committed by Arik Fraimovich
parent a8a3ec66fd
commit e14c8b61a0
4 changed files with 155 additions and 1 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 19 KiB

152
redash/query_runner/db2.py Normal file
View File

@ -0,0 +1,152 @@
import os
import json
import logging

from redash.query_runner import *
from redash.utils import JSONEncoder

logger = logging.getLogger(__name__)

try:
    import select
    import ibm_db_dbi

    # Map ibm_db_dbi DBAPI type objects to redash column types.
    # The TYPE_* constants come from the redash.query_runner star import.
    # NOTE(review): DATE maps to TYPE_DATE but TIME maps to TYPE_DATETIME —
    # presumably because redash has no dedicated time-of-day type; confirm.
    types_map = {
        ibm_db_dbi.NUMBER: TYPE_INTEGER,
        ibm_db_dbi.BIGINT: TYPE_INTEGER,
        ibm_db_dbi.ROWID: TYPE_INTEGER,
        ibm_db_dbi.FLOAT: TYPE_FLOAT,
        ibm_db_dbi.DECIMAL: TYPE_FLOAT,
        ibm_db_dbi.DATE: TYPE_DATE,
        ibm_db_dbi.TIME: TYPE_DATETIME,
        ibm_db_dbi.DATETIME: TYPE_DATETIME,
        ibm_db_dbi.BINARY: TYPE_STRING,
        ibm_db_dbi.XML: TYPE_STRING,
        ibm_db_dbi.TEXT: TYPE_STRING,
        ibm_db_dbi.STRING: TYPE_STRING
    }

    enabled = True
except ImportError:
    # The ibm-db driver is an optional dependency: when it is missing the
    # module still imports cleanly and the runner reports itself disabled.
    enabled = False
class DB2(BaseSQLQueryRunner):
    """Query runner for IBM DB2, backed by the ibm-db (ibm_db_dbi) DBAPI driver."""

    # Cheapest possible round-trip query on DB2 (SYSDUMMY1 is a one-row table).
    noop_query = "SELECT 1 FROM SYSIBM.SYSDUMMY1"

    @classmethod
    def configuration_schema(cls):
        """Return the JSON-schema describing the data source configuration form."""
        return {
            "type": "object",
            "properties": {
                "user": {
                    "type": "string"
                },
                "password": {
                    "type": "string"
                },
                "host": {
                    "type": "string",
                    "default": "127.0.0.1"
                },
                "port": {
                    "type": "number",
                    "default": 50000
                },
                "dbname": {
                    "type": "string",
                    "title": "Database Name"
                }
            },
            "order": ['host', 'port', 'user', 'password', 'dbname'],
            "required": ["dbname"],
            "secret": ["password"]
        }

    @classmethod
    def type(cls):
        return "db2"

    @classmethod
    def enabled(cls):
        """Report whether the optional ibm_db driver is importable."""
        try:
            import ibm_db
        except ImportError:
            return False

        return True

    def _get_definitions(self, schema, query):
        """Run `query` and fold its TABLE_SCHEMA/TABLE_NAME/COLUMN_NAME rows
        into the `schema` dict ({table_name: {'name': ..., 'columns': [...]}}).

        Raises Exception when the query fails.
        """
        results, error = self.run_query(query, None)

        if error is not None:
            raise Exception("Failed getting schema.")

        results = json.loads(results)

        for row in results['rows']:
            # Qualify table names with their schema, except for 'public'.
            if row['TABLE_SCHEMA'] != u'public':
                table_name = '{}.{}'.format(row['TABLE_SCHEMA'], row['TABLE_NAME'])
            else:
                table_name = row['TABLE_NAME']

            if table_name not in schema:
                schema[table_name] = {'name': table_name, 'columns': []}

            schema[table_name]['columns'].append(row['COLUMN_NAME'])

    def _get_tables(self, schema):
        """Populate and return the schema browser data from SYSCAT metadata."""
        query = """
        SELECT rtrim(t.TABSCHEMA) as table_schema,
               t.TABNAME as table_name,
               c.COLNAME as column_name
        from syscat.tables t
        join syscat.columns c
          on t.TABSCHEMA = c.TABSCHEMA AND t.TABNAME = c.TABNAME
        WHERE t.type IN ('T') and t.TABSCHEMA not in ('SYSIBM')
        """

        self._get_definitions(schema, query)

        return schema.values()

    def _get_connection(self):
        """Open and return a new ibm_db_dbi connection from the configuration."""
        self.connection_string = "DATABASE={};HOSTNAME={};PORT={};PROTOCOL=TCPIP;UID={};PWD={};".format(
            self.configuration["dbname"], self.configuration["host"], self.configuration["port"], self.configuration["user"], self.configuration["password"])
        connection = ibm_db_dbi.connect(self.connection_string, "", "")

        return connection

    def run_query(self, query, user):
        """Execute `query` against DB2 and return a (json_data, error) pair.

        Exactly one of the two is non-None: `json_data` is a JSON string with
        'columns' and 'rows' on success, `error` is a human-readable message
        on failure.
        """
        connection = self._get_connection()
        cursor = connection.cursor()

        try:
            cursor.execute(query)

            if cursor.description is not None:
                columns = self.fetch_columns(
                    [(i[0], types_map.get(i[1], None)) for i in cursor.description])
                rows = [dict(zip((c['name'] for c in columns), row))
                        for row in cursor]

                data = {'columns': columns, 'rows': rows}
                error = None
                json_data = json.dumps(data, cls=JSONEncoder)
            else:
                error = 'Query completed but it returned no data.'
                json_data = None
        except (select.error, OSError) as e:
            error = "Query interrupted. Please retry."
            json_data = None
        except ibm_db_dbi.DatabaseError as e:
            # str(e) instead of the former `e.message`: Exception.message is
            # Python-2-only and deprecated since 2.6; str(e) yields the same
            # text and stays compatible with Python 3.
            error = str(e)
            json_data = None
        except (KeyboardInterrupt, InterruptException):
            connection.cancel()
            error = "Query cancelled by user."
            json_data = None
        finally:
            # Closing the connection also releases the cursor.
            connection.close()

        return json_data, error


register(DB2)

View File

@ -182,7 +182,8 @@ default_query_runners = [
'redash.query_runner.salesforce',
'redash.query_runner.query_results',
'redash.query_runner.prometheus',
'redash.query_runner.qubole'
'redash.query_runner.qubole',
'redash.query_runner.db2'
]
enabled_query_runners = array_from_string(os.environ.get("REDASH_ENABLED_QUERY_RUNNERS", ",".join(default_query_runners)))

View File

@ -22,6 +22,7 @@ simple_salesforce==0.72.2
PyAthena>=1.0.0
pymapd>=0.2.1
qds-sdk>=1.9.6
ibm-db>=2.0.9
# certifi is needed to support MongoDB and SSL:
certifi
# We don't install snowflake connector by default, as it's causing conflicts with