mirror of
https://github.com/valitydev/redash.git
synced 2024-11-07 17:38:54 +00:00
Merge pull request #422 from EverythingMe/feature/288_bq_instance_auth
Feature: BigQueryGCE query runner that uses instance auth (fixes #288)
This commit is contained in:
commit
0e631a5121
@ -5,6 +5,8 @@ import logging
|
||||
import sys
|
||||
import time
|
||||
|
||||
import requests
|
||||
|
||||
from redash.query_runner import *
|
||||
from redash.utils import JSONEncoder
|
||||
|
||||
@ -15,6 +17,7 @@ try:
|
||||
from apiclient.discovery import build
|
||||
from apiclient.errors import HttpError
|
||||
from oauth2client.client import SignedJwtAssertionCredentials
|
||||
from oauth2client import gce
|
||||
|
||||
enabled = True
|
||||
except ImportError:
|
||||
@ -66,18 +69,6 @@ def _load_key(filename):
|
||||
f.close()
|
||||
|
||||
|
||||
def _get_bigquery_service(service_account, private_key):
|
||||
scope = [
|
||||
"https://www.googleapis.com/auth/bigquery",
|
||||
]
|
||||
|
||||
credentials = SignedJwtAssertionCredentials(service_account, private_key, scope=scope)
|
||||
http = httplib2.Http()
|
||||
http = credentials.authorize(http)
|
||||
|
||||
return build("bigquery", "v2", http=http)
|
||||
|
||||
|
||||
def _get_query_results(jobs, project_id, job_id, start_index):
|
||||
query_reply = jobs.getQueryResults(projectId=project_id, jobId=job_id, startIndex=start_index).execute()
|
||||
logging.debug('query_reply %s', query_reply)
|
||||
@ -117,11 +108,23 @@ class BigQuery(BaseQueryRunner):
|
||||
def __init__(self, configuration_json):
|
||||
super(BigQuery, self).__init__(configuration_json)
|
||||
|
||||
self.private_key = _load_key(self.configuration["privateKey"])
|
||||
def _get_bigquery_service(self):
|
||||
scope = [
|
||||
"https://www.googleapis.com/auth/bigquery",
|
||||
]
|
||||
|
||||
private_key = _load_key(self.configuration["privateKey"])
|
||||
credentials = SignedJwtAssertionCredentials(self.configuration['serviceAccount'], private_key, scope=scope)
|
||||
http = httplib2.Http()
|
||||
http = credentials.authorize(http)
|
||||
|
||||
return build("bigquery", "v2", http=http)
|
||||
|
||||
def _get_project_id(self):
|
||||
return self.configuration["projectId"]
|
||||
|
||||
def run_query(self, query):
|
||||
bigquery_service = _get_bigquery_service(self.configuration["serviceAccount"],
|
||||
self.private_key)
|
||||
bigquery_service = self._get_bigquery_service()
|
||||
|
||||
jobs = bigquery_service.jobs()
|
||||
job_data = {
|
||||
@ -134,13 +137,13 @@ class BigQuery(BaseQueryRunner):
|
||||
|
||||
logger.debug("BigQuery got query: %s", query)
|
||||
|
||||
project_id = self.configuration["projectId"]
|
||||
project_id = self._get_project_id()
|
||||
|
||||
try:
|
||||
insert_response = jobs.insert(projectId=project_id, body=job_data).execute()
|
||||
current_row = 0
|
||||
query_reply = _get_query_results(jobs, project_id=project_id,
|
||||
job_id=insert_response['jobReference']['jobId'], start_index=current_row)
|
||||
job_id=insert_response['jobReference']['jobId'], start_index=current_row)
|
||||
|
||||
logger.debug("bigquery replied: %s", query_reply)
|
||||
|
||||
@ -176,4 +179,26 @@ class BigQuery(BaseQueryRunner):
|
||||
|
||||
return json_data, error
|
||||
|
||||
register(BigQuery)
|
||||
|
||||
class BigQueryGCE(BigQuery):
|
||||
@classmethod
|
||||
def type(cls):
|
||||
return "bigquery_gce"
|
||||
|
||||
@classmethod
|
||||
def configuration_schema(cls):
|
||||
return {}
|
||||
|
||||
def _get_project_id(self):
|
||||
return requests.get('http://metadata/computeMetadata/v1/project/project-id', headers={'Metadata-Flavor': 'Google'}).content
|
||||
|
||||
def _get_bigquery_service(self):
|
||||
credentials = gce.AppAssertionCredentials(scope='https://www.googleapis.com/auth/bigquery')
|
||||
http = httplib2.Http()
|
||||
http = credentials.authorize(http)
|
||||
|
||||
return build("bigquery", "v2", http=http)
|
||||
|
||||
|
||||
register(BigQuery)
|
||||
register(BigQueryGCE)
|
Loading…
Reference in New Issue
Block a user