mirror of
https://github.com/valitydev/redash.git
synced 2024-11-07 09:28:51 +00:00
Merge remote-tracking branch 'upstream/master'
This commit is contained in:
commit
f79362c7a3
@ -2,6 +2,7 @@ import httplib2
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
|
||||
try:
|
||||
import apiclient.errors
|
||||
@ -14,6 +15,7 @@ except ImportError:
|
||||
|
||||
from redash.utils import JSONEncoder
|
||||
|
||||
|
||||
def bigquery(connection_string):
|
||||
def load_key(filename):
|
||||
f = file(filename, "rb")
|
||||
@ -27,12 +29,22 @@ def bigquery(connection_string):
|
||||
"https://www.googleapis.com/auth/bigquery",
|
||||
]
|
||||
|
||||
credentials = SignedJwtAssertionCredentials(connection_string["serviceAccount"], load_key(connection_string["privateKey"]), scope=scope)
|
||||
credentials = SignedJwtAssertionCredentials(connection_string["serviceAccount"],
|
||||
load_key(connection_string["privateKey"]), scope=scope)
|
||||
http = httplib2.Http()
|
||||
http = credentials.authorize(http)
|
||||
|
||||
return build("bigquery", "v2", http=http)
|
||||
|
||||
def get_query_results(jobs, project_id, job_id, start_index):
|
||||
query_reply = jobs.getQueryResults(projectId=project_id, jobId=job_id, startIndex=start_index).execute()
|
||||
logging.debug('query_reply %s', query_reply)
|
||||
if not query_reply['jobComplete']:
|
||||
time.sleep(10)
|
||||
return get_query_results(jobs, project_id, job_id, start_index)
|
||||
|
||||
return query_reply
|
||||
|
||||
def query_runner(query):
|
||||
bigquery_service = get_bigquery_service()
|
||||
|
||||
@ -52,14 +64,15 @@ def bigquery(connection_string):
|
||||
try:
|
||||
insert_response = jobs.insert(projectId=project_id, body=job_data).execute()
|
||||
current_row = 0
|
||||
query_reply = jobs.getQueryResults(projectId=project_id, jobId=insert_response['jobReference']['jobId'], startIndex=current_row).execute()
|
||||
query_reply = get_query_results(jobs, project_id=project_id,
|
||||
job_id=insert_response['jobReference']['jobId'], start_index=current_row)
|
||||
|
||||
rows = []
|
||||
field_names = []
|
||||
for f in query_reply["schema"]["fields"]:
|
||||
field_names.append(f["name"])
|
||||
|
||||
while(("rows" in query_reply) and current_row < query_reply['totalRows']):
|
||||
while ("rows" in query_reply) and current_row < query_reply['totalRows']:
|
||||
for row in query_reply["rows"]:
|
||||
row_data = {}
|
||||
column_index = 0
|
||||
@ -70,26 +83,27 @@ def bigquery(connection_string):
|
||||
rows.append(row_data)
|
||||
|
||||
current_row += len(query_reply['rows'])
|
||||
query_reply = jobs.getQueryResults(projectId=project_id, jobId=query_reply['jobReference']['jobId'], startIndex=current_row).execute()
|
||||
query_reply = jobs.getQueryResults(projectId=project_id, jobId=query_reply['jobReference']['jobId'],
|
||||
startIndex=current_row).execute()
|
||||
|
||||
columns = [{'name': name,
|
||||
'friendly_name': name,
|
||||
'type': None} for name in field_names]
|
||||
|
||||
data = {
|
||||
"columns" : columns,
|
||||
"rows" : rows
|
||||
"columns": columns,
|
||||
"rows": rows
|
||||
}
|
||||
error = None
|
||||
|
||||
json_data = json.dumps(data, cls=JSONEncoder)
|
||||
except apiclient.errors.HttpError, e:
|
||||
json_data = None
|
||||
error = e.args[1]
|
||||
error = e.content
|
||||
except KeyboardInterrupt:
|
||||
error = "Query cancelled by user."
|
||||
json_data = None
|
||||
except Exception as e:
|
||||
except Exception:
|
||||
raise sys.exc_info()[1], None, sys.exc_info()[2]
|
||||
|
||||
return json_data, error
|
||||
|
Loading…
Reference in New Issue
Block a user