Add multiple source for telegraf agent

Telegraf agent can read additional files in telegraf-style json format
passed via the <Source>/path</Source> option.
Files that do not exist when the agent starts are ignored.
This commit is contained in:
Mikhail Dyomin 2017-01-12 16:12:03 +03:00
parent bcee4fac90
commit 2d5e513264
3 changed files with 97 additions and 57 deletions

View File

@ -1189,6 +1189,7 @@ Example:
<Disk devices='["vda1","sda1","sda2","sda3"]'></Disk>
<Netstat />
<Custom diff="1" measure="call" label="test">curl -s -H 'Host: host.tld' 'http://localhost:6100/stat' | python -c 'import sys, json; j = json.load(sys.stdin); print "\n".join(`c["values"]["accept"]` for c in j["charts"] if c["name"] == "localqueue_wait_time")'</Custom>
<Source>/path/to/file</Source>
</Host>
<Host address="localhost" telegraf="/usr/bin/telegraf">
@ -1265,7 +1266,8 @@ List of metrics group names and particular metrics in them:
* expect - default: None. Optional expected string in answer
* Custom
* diff - default: 0
* measure - default: call - metric value is a command or script execution output. Example: `<Custom measure="call" diff="1" label="Base size">du -hs
* measure - default: call - metric value is a command or script execution output. Example: `<Custom measure="call" diff="1" label="Base size">du -s /var/lib/mysql/ | awk '{print $1}'</Custom>`
* Source - additional source file in telegraf json format; can be used to add custom metrics that need complex processing and do not fit into standard custom metrics (like log parsing with aggregation)
Console on-line screen

View File

@ -46,16 +46,25 @@ class DataReader(object):
"""generator reads from source line-by-line"""
def __init__(self, filename, pipe=False):
self.pipe = pipe
if not self.pipe:
self.monout = open(filename, 'r')
else:
self.monout = filename
self.buffer = ""
self.closed = False
self.broken = False
self.pipe = pipe
if not self.pipe:
try:
self.monout = open(filename, 'r')
except Exception as ex:
logger.error("Can't open source file %s: %s", filename, ex)
self.broken = True
else:
self.monout = filename
def __iter__(self):
while not self.closed:
if self.broken:
data = ''
else:
data = self.monout.readline()
if data:
parts = data.rsplit('\n', 1)
@ -66,7 +75,7 @@ class DataReader(object):
else:
self.buffer += parts[0]
else:
time.sleep(1)
yield None
if not self.pipe:
self.monout.close()
@ -77,18 +86,15 @@ class DataReader(object):
class Consolidator(object):
"""generator consolidates data from source, cache it by timestamp"""
def __init__(self, source):
self.source = source
def __init__(self, sources):
self.sources = sources
self.results = {}
def __iter__(self):
for chunk in self.source:
if chunk:
def append_chunk(self, source, chunk):
try:
data = json.loads(chunk)
except ValueError:
logger.error(
'unable to decode chunk %s', chunk, exc_info=True)
logger.error('unable to decode chunk %s', chunk, exc_info=True)
else:
try:
ts = data['timestamp']
@ -99,8 +105,7 @@ class Consolidator(object):
metric_name=data['name'],
disk_id=data['tags']['name'])
elif data['name'] == 'net':
data[
'name'] = "{metric_name}-{interface}".format(
data['name'] = "{metric_name}-{interface}".format(
metric_name=data['name'],
interface=data['tags']['interface'])
elif data['name'] == 'cpu':
@ -113,19 +118,33 @@ class Consolidator(object):
self.results[ts][key] = value
except KeyError:
logger.error(
'Malformed json from source: %s',
'Malformed json from source %s: %s',
source,
chunk,
exc_info=True)
except:
logger.error(
'Something nasty happend in consolidator work',
exc_info=True)
if len(self.results) > 5:
ready_to_go_index = min(self.results)
def __iter__(self):
while True:
for s in self.sources:
chunk_limit = 10
chunks_done = 0
chunk = s.next()
while chunk and chunks_done < chunk_limit:
self.append_chunk(s, chunk)
chunk = s.next()
if len(self.results) > 2:
logger.debug(
'Now in buffer: %s', [i for i in self.results.keys()])
dump_seconds = sorted([i for i in self.results.keys()])[:-2]
for ready_second in dump_seconds:
yield json.dumps({
ready_to_go_index:
self.results.pop(ready_to_go_index, None)
ready_second: self.results.pop(ready_second, None)
})
time.sleep(0.5)
class Drain(threading.Thread):
@ -161,6 +180,7 @@ class AgentWorker(threading.Thread):
self.startups = []
self.startup_processes = []
self.shutdowns = []
self.custom_sources = []
self.daemon = True # Thread auto-shutdown
self.finished = False
self.drain = None
@ -199,6 +219,11 @@ class AgentWorker(threading.Thread):
for option in config.options('shutdown'):
if option.startswith('cmd'):
self.shutdowns.append(config.get('shutdown', option))
if config.has_section('source'):
for option in config.options('source'):
if option.startswith('file'):
self.custom_sources.append(config.get('source', option))
logger.info(
'Successfully loaded startup config.\n'
'Startups: %s\n'
@ -222,6 +247,7 @@ class AgentWorker(threading.Thread):
logger.info('Started with pid %d', self.collector.pid)
telegraf_output = self.working_dir + '/monitoring.rawdata'
sources = [telegraf_output] + self.custom_sources
for _ in range(10):
self.collector.poll()
@ -236,7 +262,7 @@ class AgentWorker(threading.Thread):
time.sleep(1)
self.drain = Drain(
Consolidator(DataReader(telegraf_output)), self.results)
Consolidator([iter(DataReader(f)) for f in sources]), self.results)
self.drain.start()
self.drain_stdout = Drain(

View File

@ -105,6 +105,7 @@ class ConfigManager(object):
custom = []
startups = []
shutdowns = []
sources = []
# agent defaults
host_config = {}
for metric in host:
@ -134,11 +135,13 @@ class ConfigManager(object):
startups.append(metric.text)
elif (str(metric.tag)).lower() == 'shutdown':
shutdowns.append(metric.text)
elif (str(metric.tag)).lower() == 'source':
sources.append(metric.text)
if len(host_config) == 0:
logging.info('Empty host config, using defaults')
for section in defaults_enabled:
host_config[section] = defaults[section]
return {
result = {
'host_config': host_config,
'port': int(host.get('port', 22)),
'python': host.get('python', '/usr/bin/env python2'),
@ -149,8 +152,11 @@ class ConfigManager(object):
'custom': custom,
'host': hostname,
'startup': startups,
'shutdown': shutdowns
'shutdown': shutdowns,
'source': sources
}
logger.info("Result config %s", result)
return result
class AgentConfig(object):
@ -161,6 +167,7 @@ class AgentConfig(object):
self.custom = config['custom']
self.startups = config['startup']
self.shutdowns = config['shutdown']
self.sources = config['source']
self.interval = config['interval']
self.comment = config['comment']
self.host_config = config['host_config']
@ -193,6 +200,11 @@ class AgentConfig(object):
config.set('shutdown', "cmd%s" % idx, cmd)
for idx, cmd in enumerate(self.shutdowns)
]
config.add_section('source')
[
config.set('source', "file%s" % idx, path)
for idx, path in enumerate(self.sources)
]
with open(cfg_path, 'w') as fds:
config.write(fds)