Merge pull request #39525 from zer0def/influx-query

Added arbitrary query execution and additional parameters to continuous query creation for InfluxDB
This commit is contained in:
Mike Place 2017-02-22 11:33:02 -07:00 committed by GitHub
commit 6d12a8cb06
2 changed files with 64 additions and 6 deletions

View File

@ -35,6 +35,8 @@ try:
except ImportError:
HAS_INFLUXDB = False
import collections
import json
import logging
log = logging.getLogger(__name__)
@ -593,7 +595,7 @@ def get_continuous_query(database, name, **client_args):
return {}
def create_continuous_query(database, name, query, **client_args):
def create_continuous_query(database, name, query, resample_time=None, coverage_period=None, **client_args):
'''
Create a continuous query.
@ -604,17 +606,34 @@ def create_continuous_query(database, name, query, **client_args):
name
Name of the continuous query to create.
query:
query
The continuous query string.
resample_time : None
Duration between continuous query resampling.
coverage_period : None
Duration specifying time period per sample.
CLI Example:
.. code-block:: bash
salt '*' influxdb.create_continuous_query mydb cq_month 'SELECT mean(*) INTO mydb.a_month.:MEASUREMENT FROM mydb.a_week./.*/ GROUP BY time(5m), *' '''
client = _client(**client_args)
full_query = 'CREATE CONTINUOUS QUERY {0} ON {1} BEGIN {2} END'
query = full_query.format(name, database, query)
full_query = 'CREATE CONTINUOUS QUERY {name} ON {database}'
if resample_time:
full_query += ' RESAMPLE EVERY {resample_time}'
if coverage_period:
full_query += ' FOR {coverage_period}'
full_query += ' BEGIN {query} END'
query = full_query.format(
name=name,
database=database,
query=query,
resample_time=resample_time,
coverage_period=coverage_period
)
client.query(query)
return True
@ -641,3 +660,36 @@ def drop_continuous_query(database, name, **client_args):
query = 'DROP CONTINUOUS QUERY {0} ON {1}'.format(name, database)
client.query(query)
return True
def _pull_query_results(resultset):
'''
Parses a ResultSet returned from InfluxDB into a dictionary of results,
grouped by series names and optional JSON-encoded grouping tags.
'''
_results = collections.defaultdict(lambda: {})
for _header, _values in resultset.items():
_header, _group_tags = _header
if _group_tags:
_results[_header][json.dumps(_group_tags)] = [_value for _value in _values]
else:
_results[_header] = [_value for _value in _values]
return dict(sorted(_results.items()))
def query(database, query, **client_args):
'''
Execute a query.
database
Name of the database to query on.
query
InfluxQL query string.
'''
client = _client(**client_args)
_result = client.query(query, database=database)
if isinstance(_result, collections.Sequence):
return [_pull_query_results(_query_result) for _query_result in _result if _query_result]
return [_pull_query_results(_result) if _result else {}]

View File

@ -18,7 +18,7 @@ def __virtual__():
return False
def present(name, database, query, **client_args):
def present(name, database, query, resample_time=None, coverage_period=None, **client_args):
'''
Ensure that given continuous query is present.
@ -30,6 +30,12 @@ def present(name, database, query, **client_args):
query
The query content
resample_time : None
Duration between continuous query resampling.
coverage_period : None
Duration specifying time period per sample.
'''
ret = {'name': name,
'changes': {},
@ -45,7 +51,7 @@ def present(name, database, query, **client_args):
.format(name)
return ret
if __salt__['influxdb.create_continuous_query'](
database, name, query
database, name, query, resample_time, coverage_period
):
ret['comment'] = 'continuous query {0} has been created'\
.format(name)