Add more logging to execute_query

This commit is contained in:
Arik Fraimovich 2016-01-24 11:32:44 +02:00
parent 197bbde788
commit a1eec8490a

View File

@ -252,23 +252,30 @@ class QueryExecutionError(Exception):
pass
# TODO: convert this into a class, to simplify and avoid code duplication for logging
# class ExecuteQueryTask(BaseTask):
# def run(self, ...):
# # logic
@celery.task(bind=True, base=BaseTask, track_started=True, throws=(QueryExecutionError,))
def execute_query(self, query, data_source_id, metadata):
signal.signal(signal.SIGINT, signal_handler)
start_time = time.time()
logger.info("Loading data source (%d)...", data_source_id)
logger.info("task=execute_query state=load_ds ds_id=%d", data_source_id)
# TODO: we should probably cache data sources in Redis
data_source = models.DataSource.get_by_id(data_source_id)
self.update_state(state='STARTED', meta={'start_time': start_time, 'custom_message': ''})
logger.info("Executing query:\n%s", query)
logger.debug("Executing query:\n%s", query)
query_hash = gen_query_hash(query)
query_runner = get_query_runner(data_source.type, data_source.options)
logger.info("task=execute_query state=before query_hash=%s type=%s ds_id=%d task_id=%s queue=%s query_id=%s username=%s",
query_hash, data_source.type, data_source.id, self.request.id, self.request.delivery_info['routing_key'],
metadata.get('Query ID', 'unknown'), metadata.get('Username', 'unknown'))
if query_runner.annotate_query():
metadata['Task ID'] = self.request.id
metadata['Query Hash'] = query_hash
@ -285,6 +292,10 @@ def execute_query(self, query, data_source_id, metadata):
with statsd_client.timer('query_runner.{}.{}.run_time'.format(data_source.type, data_source.name)):
data, error = query_runner.run_query(annotated_query)
logger.info("task=execute_query state=after query_hash=%s type=%s ds_id=%d task_id=%s queue=%s query_id=%s username=%s",
query_hash, data_source.type, data_source.id, self.request.id, self.request.delivery_info['routing_key'],
metadata.get('Query ID', 'unknown'), metadata.get('Username', 'unknown'))
run_time = time.time() - start_time
logger.info("Query finished... data length=%s, error=%s", data and len(data), error)
@ -295,8 +306,14 @@ def execute_query(self, query, data_source_id, metadata):
if not error:
query_result, updated_query_ids = models.QueryResult.store_result(data_source.org_id, data_source.id, query_hash, query, data, run_time, utils.utcnow())
logger.info("task=execute_query state=after_store query_hash=%s type=%s ds_id=%d task_id=%s queue=%s query_id=%s username=%s",
query_hash, data_source.type, data_source.id, self.request.id, self.request.delivery_info['routing_key'],
metadata.get('Query ID', 'unknown'), metadata.get('Username', 'unknown'))
for query_id in updated_query_ids:
check_alerts_for_query.delay(query_id)
logger.info("task=execute_query state=after_alerts query_hash=%s type=%s ds_id=%d task_id=%s queue=%s query_id=%s username=%s",
query_hash, data_source.type, data_source.id, self.request.id, self.request.delivery_info['routing_key'],
metadata.get('Query ID', 'unknown'), metadata.get('Username', 'unknown'))
else:
raise QueryExecutionError(error)