Merge pull request #967 from lloydw/master

Add: extend ElasticSearch query_runner to support aggregations
This commit is contained in:
Arik Fraimovich 2016-07-05 21:40:33 +03:00 committed by GitHub
commit 906365f011

View File

@ -105,8 +105,6 @@ class BaseElasticSearch(BaseQueryRunner):
r = requests.get(url, auth=self.auth)
mappings_data = r.json()
logger.debug(mappings_data)
for index_name in mappings_data:
index_mappings = mappings_data[index_name]
for m in index_mappings.get("mappings", {}):
@ -133,6 +131,49 @@ class BaseElasticSearch(BaseQueryRunner):
"type" : mappings.get(column_name, "string")})
result_columns_index[friendly_name] = result_columns[-1]
def get_row(rows, row):
if row is None:
row = {}
rows.append(row)
return row
def collect_value(mappings, row, key, value, type):
if result_fields and key not in result_fields_index:
return
mappings[key] = type
add_column_if_needed(mappings, key, key, result_columns, result_columns_index)
row[key] = value
def collect_aggregations(mappings, rows, parent_key, data, row, result_columns, result_columns_index):
if isinstance(data, dict):
for key, value in data.iteritems():
val = collect_aggregations(mappings, rows, parent_key if key == 'buckets' else key, value, row, result_columns, result_columns_index)
if val:
row = get_row(rows, row)
collect_value(mappings, row, key, val, 'long')
for data_key in ['value', 'doc_count']:
if data_key not in data:
continue
if 'key' in data and len(data.keys()) == 2:
key_is_string = 'key_as_string' in data
collect_value(mappings, row, data['key'] if not key_is_string else data['key_as_string'], data[data_key], 'long' if not key_is_string else 'string')
else:
return data[data_key]
elif isinstance(data, list):
for value in data:
result_row = get_row(rows, row)
collect_aggregations(mappings, rows, parent_key, value, result_row, result_columns, result_columns_index)
if 'key' in value:
collect_value(mappings, result_row, parent_key, value['key'], 'string')
return None
result_columns_index = {c["name"] : c for c in result_columns}
result_fields_index = {}
@ -140,27 +181,49 @@ class BaseElasticSearch(BaseQueryRunner):
for r in result_fields:
result_fields_index[r] = None
for h in raw_result["hits"]["hits"]:
row = {}
if 'error' in raw_result:
for field, column in ELASTICSEARCH_BUILTIN_FIELDS_MAPPING.iteritems():
if field in h:
add_column_if_needed(mappings, field, column, result_columns, result_columns_index)
row[column] = h[field]
error = raw_result['error']
if len(error) > 10240:
error = error[:10240] + '... continues'
column_name = "_source" if "_source" in h else "fields"
for column in h[column_name]:
if result_fields and column not in result_fields_index:
continue
raise Exception(error)
add_column_if_needed(mappings, column, column, result_columns, result_columns_index)
elif 'aggregations' in raw_result:
value = h[column_name][column]
row[column] = value[0] if isinstance(value, list) and len(value) == 1 else value
if result_fields:
for field in result_fields:
add_column_if_needed(mappings, field, field, result_columns, result_columns_index)
for key, data in raw_result["aggregations"].iteritems():
collect_aggregations(mappings, result_rows, key, data, None, result_columns, result_columns_index)
logger.debug("result_rows", str(result_rows))
logger.debug("result_columns", str(result_columns))
elif 'hits' in raw_result and 'hits' in raw_result['hits']:
if result_fields:
for field in result_fields:
add_column_if_needed(mappings, field, field, result_columns, result_columns_index)
for h in raw_result["hits"]["hits"]:
row = {}
column_name = "_source" if "_source" in h else "fields"
for column in h[column_name]:
if result_fields and column not in result_fields_index:
continue
add_column_if_needed(mappings, column, column, result_columns, result_columns_index)
value = h[column_name][column]
row[column] = value[0] if isinstance(value, list) and len(value) == 1 else value
if row and len(row) > 0:
result_rows.append(row)
else:
raise Exception("Redash failed to parse the results it got from ElasticSearch.")
class Kibana(BaseElasticSearch):
@ -273,6 +336,7 @@ class ElasticSearch(BaseElasticSearch):
query_dict = json.loads(query)
index_name = query_dict.pop("index", "")
result_fields = query_dict.pop("result_fields", None)
if not self.server_url:
error = "Missing configuration key 'server'"
@ -283,8 +347,6 @@ class ElasticSearch(BaseElasticSearch):
mappings = self._get_mappings(mapping_url)
logger.debug(json.dumps(mappings, indent=4))
params = {"source": json.dumps(query_dict)}
logger.debug("Using URL: %s", url)
logger.debug("Using params : %s", params)
@ -293,7 +355,7 @@ class ElasticSearch(BaseElasticSearch):
result_columns = []
result_rows = []
self._parse_results(mappings, None, r.json(), result_columns, result_rows)
self._parse_results(mappings, result_fields, r.json(), result_columns, result_rows)
json_data = json.dumps({
"columns" : result_columns,
@ -310,3 +372,4 @@ class ElasticSearch(BaseElasticSearch):
register(Kibana)
register(ElasticSearch)