BigQuery support

This commit is contained in:
Eran Sandler 2014-03-05 08:46:27 +02:00
parent 7c89ff5c1b
commit 5b42a4b36e
3 changed files with 101 additions and 1 deletions

View File

@ -169,7 +169,7 @@ class Manager(object):
def start_workers(self, workers_count, connection_type, connection_string):
if self.workers:
return self.workers
if connection_type == 'mysql':
from redash.data import query_runner_mysql
runner = query_runner_mysql.mysql(connection_string)
@ -181,6 +181,10 @@ class Manager(object):
else:
connection_params['auth'] = None
runner = query_runner_graphite.graphite(connection_params)
elif connection_type == 'bigquery':
from redash.data import query_runner_bigquery
connection_params = json.loads(connection_string)
runner = query_runner_bigquery.bigquery(connection_params)
else:
from redash.data import query_runner
runner = query_runner.redshift(connection_string)

View File

@ -0,0 +1,94 @@
import httplib2
import json
import logging
import sys
import apiclient.errors
from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import SignedJwtAssertionCredentials
from redash.utils import JSONEncoder
def bigquery(connection_string):
    """Build a query runner that executes queries against Google BigQuery.

    ``connection_string`` is, despite its name, an already-parsed mapping
    with these keys:

    * ``serviceAccount`` -- service account name used to sign the JWT.
    * ``privateKey`` -- path of the private key file; it is read from disk
      every time a query is run, not when the runner is created.
    * ``projectId`` -- project the query job is inserted into.

    Returns a ``query_runner(query)`` callable that returns a
    ``(json_data, error)`` tuple.  On success ``json_data`` is a JSON string
    holding ``columns`` and ``rows`` and ``error`` is ``None``.  On an API
    ``HttpError`` or a ``KeyboardInterrupt`` ``json_data`` is ``None`` and
    ``error`` describes the failure.  Any other exception propagates to the
    caller with its original traceback.
    """

    def load_key(filename):
        # Binary mode: the key is handed to the credentials object as raw
        # bytes.  ``with`` closes the file even if the read fails.
        with open(filename, "rb") as key_file:
            return key_file.read()

    def get_bigquery_service():
        scope = [
            "https://www.googleapis.com/auth/bigquery",
        ]

        credentials = SignedJwtAssertionCredentials(
            connection_string["serviceAccount"],
            load_key(connection_string["privateKey"]),
            scope=scope)
        http = credentials.authorize(httplib2.Http())

        return build("bigquery", "v2", http=http)

    def query_runner(query):
        bigquery_service = get_bigquery_service()

        jobs = bigquery_service.jobs()
        job_data = {
            "configuration": {
                "query": {
                    "query": query,
                }
            }
        }

        logging.debug("bigquery got query: %s", query)

        project_id = connection_string["projectId"]

        json_data = None
        error = None

        try:
            insert_response = jobs.insert(projectId=project_id, body=job_data).execute()
            job_id = insert_response['jobReference']['jobId']

            def fetch_results(start_index):
                return jobs.getQueryResults(projectId=project_id, jobId=job_id,
                                            startIndex=start_index).execute()

            # A reply for a job that is still running carries neither
            # "schema" nor "rows", so keep asking until the job reports
            # completion.  getQueryResults itself waits server-side up to
            # its timeout, so this is a long poll rather than a busy loop.
            # A reply without the flag is treated as complete.
            query_reply = fetch_results(0)
            while not query_reply.get("jobComplete", True):
                query_reply = fetch_results(0)

            field_names = [field["name"] for field in query_reply["schema"]["fields"]]

            # The API returns totalRows as a string; convert it so the
            # comparison below is numeric (in Python 2 an int always
            # compares smaller than a str, which made the check a no-op).
            total_rows = int(query_reply.get("totalRows", 0))

            rows = []
            current_row = 0
            # Stop on a missing or empty page so a short reply cannot loop forever.
            while query_reply.get("rows"):
                page = query_reply["rows"]
                for row in page:
                    # Cells arrive in schema order; pair each with its column name.
                    rows.append(dict(zip(field_names, (cell["v"] for cell in row["f"]))))

                current_row += len(page)
                if current_row >= total_rows:
                    # Everything has been read; skip the extra round trip.
                    break
                query_reply = fetch_results(current_row)

            columns = [{'name': name,
                        'friendly_name': name,
                        'type': None} for name in field_names]

            data = {
                "columns": columns,
                "rows": rows
            }
            json_data = json.dumps(data, cls=JSONEncoder)
        except apiclient.errors.HttpError as e:
            json_data = None
            # Second constructor argument of HttpError -- presumably the raw
            # response body returned by the API; verify against apiclient.
            error = e.args[1]
        except KeyboardInterrupt:
            error = "Query cancelled by user."
            json_data = None

        return json_data, error

    return query_runner

View File

@ -50,6 +50,8 @@ CONNECTION_ADAPTER = os.environ.get("REDASH_CONNECTION_ADAPTER", "pg")
# -- mysql: CONNECTION_STRING = "Server=;User=;Pwd=;Database="
# -- pg: CONNECTION_STRING = "user= password= host= port=5439 dbname="
# -- graphite: CONNECTION_STRING = {"url": "https://graphite.yourcompany.com", "auth": ["user", "password"], "verify": true}
# -- bigquery: CONNECTION_STRING = {"serviceAccount" : "43242343247-fjdfakljr3r2@developer.gserviceaccount.com", "privateKey" : "/somewhere/23fjkfjdsfj21312-privatekey.p12", "projectId" : "myproject-123" }
#    "privateKey" is the path of the service account's private key file; the BigQuery runner reads that file when a query runs.
#    To obtain BigQuery service-account credentials, follow the guidelines at https://developers.google.com/bigquery/authorization#service-accounts
# The value is read from the REDASH_CONNECTION_STRING environment variable; the fallback is the empty "pg"-style template shown above.
CONNECTION_STRING = os.environ.get("REDASH_CONNECTION_STRING", "user= password= host= port=5439 dbname=")
# Connection settings for re:dash's own database (where we store the queries, results, etc)