Extract code into a method _get_total_bytes_processed.

This commit is contained in:
Ryota Arai 2015-12-16 20:13:04 +09:00 committed by Ryota Arai
parent 81386bcf37
commit 51949230d6

View File

@ -124,6 +124,14 @@ class BigQuery(BaseQueryRunner):
def _get_project_id(self):
return self.configuration["projectId"]
def _get_total_bytes_processed(self, jobs, query):
job_data = {
"query": query,
"dryRun": True,
}
response = jobs.query(projectId=self._get_project_id(), body=job_data).execute()
return int(response["totalBytesProcessed"])
def run_query(self, query):
logger.debug("BigQuery got query: %s", query)
@ -131,17 +139,6 @@ class BigQuery(BaseQueryRunner):
jobs = bigquery_service.jobs()
project_id = self._get_project_id()
if "maximumTotalMBytesProcessed" in self.configuration:
maximumMB = self.configuration["maximumTotalMBytesProcessed"]
job_data = {
"query": query,
"dryRun": True,
}
response = jobs.query(projectId=project_id, body=job_data).execute()
processedMB = int(response["totalBytesProcessed"]) / 1000.0 / 1000.0
if maximumMB < processedMB:
return None, "Too large data will be processed (%d MBytes; maximum: %d MBytes)" % (processedMB, maximumMB)
job_data = {
"configuration": {
"query": {
@ -151,6 +148,12 @@ class BigQuery(BaseQueryRunner):
}
try:
if "maximumTotalMBytesProcessed" in self.configuration:
maximumMB = self.configuration["maximumTotalMBytesProcessed"]
processedMB = self._get_total_bytes_processed(jobs, query) / 1000.0 / 1000.0
if maximumMB < processedMB:
return None, "Too large data will be processed (%f MBytes; maximum: %d MBytes)" % (processedMB, maximumMB)
insert_response = jobs.insert(projectId=project_id, body=job_data).execute()
current_row = 0
query_reply = _get_query_results(jobs, project_id=project_id,