aligned the file. added sleep to job status retry and fixed the error message when HttpError is raised

This commit is contained in:
Yosi Taguri 2014-05-13 09:03:30 +03:00 committed by Arik Fraimovich
parent 75bc9bb318
commit e499e8099d

View File

@ -2,6 +2,7 @@ import httplib2
import json
import logging
import sys
import time
try:
import apiclient.errors
@ -14,6 +15,7 @@ except ImportError:
from redash.utils import JSONEncoder
def bigquery(connection_string):
def load_key(filename):
f = file(filename, "rb")
@ -27,7 +29,8 @@ def bigquery(connection_string):
"https://www.googleapis.com/auth/bigquery",
]
credentials = SignedJwtAssertionCredentials(connection_string["serviceAccount"], load_key(connection_string["privateKey"]), scope=scope)
credentials = SignedJwtAssertionCredentials(connection_string["serviceAccount"],
load_key(connection_string["privateKey"]), scope=scope)
http = httplib2.Http()
http = credentials.authorize(http)
@ -36,7 +39,8 @@ def bigquery(connection_string):
def get_query_results(jobs, project_id, job_id, start_index):
query_reply = jobs.getQueryResults(projectId=project_id, jobId=job_id, startIndex=start_index).execute()
logging.debug('query_reply %s', query_reply)
if query_reply['jobComplete'] != True:
if not query_reply['jobComplete']:
time.sleep(10)
return get_query_results(jobs, project_id, job_id, start_index)
return query_reply
@ -60,14 +64,15 @@ def bigquery(connection_string):
try:
insert_response = jobs.insert(projectId=project_id, body=job_data).execute()
current_row = 0
query_reply = get_query_results(jobs, project_id=project_id, job_id=insert_response['jobReference']['jobId'], start_index=current_row)
query_reply = get_query_results(jobs, project_id=project_id,
job_id=insert_response['jobReference']['jobId'], start_index=current_row)
rows = []
field_names = []
for f in query_reply["schema"]["fields"]:
field_names.append(f["name"])
while(("rows" in query_reply) and current_row < query_reply['totalRows']):
while ("rows" in query_reply) and current_row < query_reply['totalRows']:
for row in query_reply["rows"]:
row_data = {}
column_index = 0
@ -78,26 +83,27 @@ def bigquery(connection_string):
rows.append(row_data)
current_row += len(query_reply['rows'])
query_reply = jobs.getQueryResults(projectId=project_id, jobId=query_reply['jobReference']['jobId'], startIndex=current_row).execute()
query_reply = jobs.getQueryResults(projectId=project_id, jobId=query_reply['jobReference']['jobId'],
startIndex=current_row).execute()
columns = [{'name': name,
'friendly_name': name,
'type': None} for name in field_names]
data = {
"columns" : columns,
"rows" : rows
"columns": columns,
"rows": rows
}
error = None
json_data = json.dumps(data, cls=JSONEncoder)
except apiclient.errors.HttpError, e:
json_data = None
error = e.args[1]
error = e.content
except KeyboardInterrupt:
error = "Query cancelled by user."
json_data = None
except Exception as e:
except Exception:
raise sys.exc_info()[1], None, sys.exc_info()[2]
return json_data, error