Merge pull request #210 from yosit/master-yosit

Support monitoring a long-running job.
commit 2c34ecde35
Author: Arik Fraimovich
Date:   2014-05-13 09:07:11 +03:00
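At a glance: instead of treating the first jobs.getQueryResults() reply as final, the query runner now re-polls every 10 seconds until the reply reports jobComplete, so queries that outlive a single HTTP round trip still produce results. A condensed sketch of the flow after this change (structure and names are illustrative rather than the exact code; the real implementation is in the diff below):

    import time

    def run_query(jobs, project_id, query):
        # Submit the query as an asynchronous BigQuery job.
        job = jobs.insert(projectId=project_id,
                          body={'configuration': {'query': {'query': query}}}).execute()
        job_id = job['jobReference']['jobId']

        # New behavior: keep polling until BigQuery marks the job complete.
        reply = jobs.getQueryResults(projectId=project_id, jobId=job_id,
                                     startIndex=0).execute()
        while not reply['jobComplete']:
            time.sleep(10)
            reply = jobs.getQueryResults(projectId=project_id, jobId=job_id,
                                         startIndex=0).execute()
        return reply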


@@ -2,6 +2,7 @@ import httplib2
 import json
 import logging
 import sys
+import time
 
 try:
     import apiclient.errors
@@ -14,6 +15,7 @@ except ImportError:
 from redash.utils import JSONEncoder
 
+
 def bigquery(connection_string):
     def load_key(filename):
         f = file(filename, "rb")
@@ -27,12 +29,22 @@ def bigquery(connection_string):
             "https://www.googleapis.com/auth/bigquery",
         ]
 
-        credentials = SignedJwtAssertionCredentials(connection_string["serviceAccount"], load_key(connection_string["privateKey"]), scope=scope)
+        credentials = SignedJwtAssertionCredentials(connection_string["serviceAccount"],
+                                                    load_key(connection_string["privateKey"]), scope=scope)
         http = httplib2.Http()
         http = credentials.authorize(http)
 
         return build("bigquery", "v2", http=http)
 
+    def get_query_results(jobs, project_id, job_id, start_index):
+        query_reply = jobs.getQueryResults(projectId=project_id, jobId=job_id, startIndex=start_index).execute()
+        logging.debug('query_reply %s', query_reply)
+        if not query_reply['jobComplete']:
+            time.sleep(10)
+            return get_query_results(jobs, project_id, job_id, start_index)
+
+        return query_reply
+
     def query_runner(query):
         bigquery_service = get_bigquery_service()
 
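A note on the shape of the new get_query_results() helper: it recurses once per 10-second poll, so a job that stays pending accumulates one Python stack frame per poll (CPython's default recursion limit is 1000 frames), and a job that never completes would be polled forever. An equivalent iterative form with a deadline, sketched with a hypothetical timeout_seconds parameter that is not part of this commit:

    import time

    def get_query_results_bounded(jobs, project_id, job_id, start_index,
                                  timeout_seconds=600, poll_seconds=10):
        # Illustrative variant: same polling behavior, loop-based and bounded.
        deadline = time.time() + timeout_seconds
        while True:
            reply = jobs.getQueryResults(projectId=project_id, jobId=job_id,
                                         startIndex=start_index).execute()
            if reply['jobComplete']:
                return reply
            if time.time() >= deadline:
                raise RuntimeError("BigQuery job %s still running after %d seconds"
                                   % (job_id, timeout_seconds))
            time.sleep(poll_seconds)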
@@ -52,14 +64,15 @@ def bigquery(connection_string):
         try:
             insert_response = jobs.insert(projectId=project_id, body=job_data).execute()
             current_row = 0
-            query_reply = jobs.getQueryResults(projectId=project_id, jobId=insert_response['jobReference']['jobId'], startIndex=current_row).execute()
+            query_reply = get_query_results(jobs, project_id=project_id,
+                                            job_id=insert_response['jobReference']['jobId'], start_index=current_row)
 
             rows = []
             field_names = []
             for f in query_reply["schema"]["fields"]:
                 field_names.append(f["name"])
 
-            while(("rows" in query_reply) and current_row < query_reply['totalRows']):
+            while ("rows" in query_reply) and current_row < query_reply['totalRows']:
                 for row in query_reply["rows"]:
                     row_data = {}
                     column_index = 0
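For context on the loop above: each getQueryResults reply carries a single page of rows, current_row advances by the length of that page, and startIndex requests the next page until totalRows rows have been read (or a reply arrives without a 'rows' key). The same walk condensed into a standalone sketch, with illustrative names:

    def fetch_all_rows(jobs, project_id, job_id):
        # Page through a completed job's results via startIndex/totalRows.
        rows = []
        current_row = 0
        reply = jobs.getQueryResults(projectId=project_id, jobId=job_id,
                                     startIndex=current_row).execute()
        while 'rows' in reply and current_row < int(reply['totalRows']):
            rows.extend(reply['rows'])
            current_row += len(reply['rows'])
            reply = jobs.getQueryResults(projectId=project_id, jobId=job_id,
                                         startIndex=current_row).execute()
        return rows

The int() cast is a precaution in the sketch: the BigQuery v2 API serializes totalRows as a string.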
@@ -70,7 +83,8 @@ def bigquery(connection_string):
                     rows.append(row_data)
 
                 current_row += len(query_reply['rows'])
-                query_reply = jobs.getQueryResults(projectId=project_id, jobId=query_reply['jobReference']['jobId'], startIndex=current_row).execute()
+                query_reply = jobs.getQueryResults(projectId=project_id, jobId=query_reply['jobReference']['jobId'],
+                                                   startIndex=current_row).execute()
 
             columns = [{'name': name,
                         'friendly_name': name,
@@ -85,11 +99,11 @@ def bigquery(connection_string):
             json_data = json.dumps(data, cls=JSONEncoder)
         except apiclient.errors.HttpError, e:
             json_data = None
-            error = e.args[1]
+            error = e.content
         except KeyboardInterrupt:
             error = "Query cancelled by user."
             json_data = None
-        except Exception as e:
+        except Exception:
             raise sys.exc_info()[1], None, sys.exc_info()[2]
 
         return json_data, error
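On the final hunk: HttpError.content in google-api-python-client is the raw response body returned by the API, so the surfaced error now carries BigQuery's full JSON error payload rather than a tuple element from e.args. The bare except Exception with the three-argument raise is Python 2 syntax that re-raises the active exception with its original traceback (a bare raise would do the same). To surface only the human-readable message, the standard Google API error envelope could be unwrapped along these lines (a sketch, not part of the commit):

    import json

    def friendly_bigquery_error(http_error):
        # The envelope looks like {'error': {'errors': [{'message': ...}]}};
        # fall back to the raw body if it does not parse as expected.
        try:
            body = json.loads(http_error.content)
            return body['error']['errors'][0]['message']
        except (ValueError, KeyError, IndexError, TypeError):
            return http_error.content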