yandex-tank/Tank/Plugins/Monitoring.py
2013-01-27 21:40:16 +04:00

368 lines
13 KiB
Python

'''Module to provide target monitoring'''
from Tank.MonCollector.collector import MonitoringCollector, \
MonitoringDataListener, MonitoringDataDecoder
from Tank.Plugins.ConsoleOnline import ConsoleOnlinePlugin, AbstractInfoWidget
from Tank.Plugins.Phantom import PhantomPlugin
from tankcore import AbstractPlugin
import os
import time
import traceback
from Tank.Plugins.Autostop import AutostopPlugin, AbstractCriteria
import tankcore
import fnmatch
import datetime
class MonitoringPlugin(AbstractPlugin):
'''
Tank plugin
'''
SECTION = 'monitoring'
def __init__(self, core):
AbstractPlugin.__init__(self, core)
self.jobno = None
self.default_target = None
self.config = None
self.process = None
self.monitoring = MonitoringCollector()
self.die_on_fail = True
self.data_file = None
self.mon_saver = None
self.address_resolver = None
@staticmethod
def get_key():
return __file__
def configure(self):
self.config = self.get_option("config", 'auto').strip()
self.default_target = self.get_option("default_target", 'localhost')
self.monitoring.ssh_timeout = tankcore.expand_to_seconds(self.get_option('ssh_timeout', "5s"))
if self.config == 'none' or self.config == 'auto':
self.die_on_fail = False
else:
if self.config and self.config[0] == '<':
xmlfile = self.core.mkstemp(".xml", "monitoring_")
self.core.add_artifact_file(xmlfile)
xml = open(xmlfile, 'w')
xml.write(self.config)
xml.close()
self.config = xmlfile
if not os.path.exists(self.config):
raise OSError("Monitoring config file not found: %s" % self.config)
if self.config == 'none':
self.monitoring = None
if self.config == 'auto':
self.config = os.path.dirname(__file__) + '/monitoring_default_config.xml'
try:
autostop = self.core.get_plugin_of_type(AutostopPlugin)
autostop.add_criteria_class(MetricHigherCriteria)
autostop.add_criteria_class(MetricLowerCriteria)
except KeyError:
self.log.debug("No autostop plugin found, not adding instances criteria")
def prepare_test(self):
phantom = None
try:
phantom = self.core.get_plugin_of_type(PhantomPlugin)
if phantom.phout_import_mode:
self.config = None
info=phantom.get_info()
if info:
self.default_target = info.address
self.log.debug("Changed monitoring target to %s", self.default_target)
except KeyError, ex:
self.log.debug("Phantom plugin not found: %s", ex)
if self.address_resolver:
try:
self.default_target = self.address_resolver.resolve_virtual(self.default_target)
except Exception, exc:
self.log.error("Failed to get target info: %s", exc)
if not self.config or self.config == 'none':
self.log.info("Monitoring has been disabled")
else:
self.log.info("Starting monitoring with config: %s", self.config)
self.core.add_artifact_file(self.config, True)
self.monitoring.config = self.config
if self.default_target:
self.monitoring.default_target = self.default_target
self.data_file = self.core.mkstemp('.data', 'monitoring_')
self.mon_saver = SaveMonToFile(self.data_file)
self.monitoring.add_listener(self.mon_saver)
self.core.add_artifact_file(self.data_file)
try:
console = self.core.get_plugin_of_type(ConsoleOnlinePlugin)
except Exception, ex:
self.log.debug("Console not found: %s", ex)
console = None
if console:
widget = MonitoringWidget(self)
console.add_info_widget(widget)
self.monitoring.add_listener(widget)
try:
self.monitoring.prepare()
self.monitoring.start()
count = 0
while not self.monitoring.first_data_received and count < 15 * 5:
time.sleep(0.2)
self.monitoring.poll()
count += 1
except Exception, exc:
self.log.debug("Problems starting monitoring: %s", traceback.format_exc(exc))
if self.die_on_fail:
raise exc
else:
self.log.warning("Failed to start monitoring: %s", exc)
self.monitoring = None
def is_test_finished(self):
if self.monitoring and not self.monitoring.poll():
if self.die_on_fail:
raise RuntimeError("Monitoring died unexpectedly")
else:
self.log.warn("Monitoring died unexpectedly")
self.monitoring = None
return -1
def end_test(self, retcode):
self.log.info("Finishing monitoring")
if self.monitoring:
self.monitoring.stop()
for log in self.monitoring.artifact_files:
self.core.add_artifact_file(log)
if self.mon_saver:
self.mon_saver.close()
return retcode
class SaveMonToFile(MonitoringDataListener):
'''
Default listener - saves data to file
'''
def __init__(self, out_file):
if out_file:
self.store = open(out_file, 'w')
def monitoring_data(self, data_string):
self.store.write(data_string)
self.store.flush()
def close(self):
''' close open files '''
if self.store:
self.store.close()
class MonitoringWidget(AbstractInfoWidget, MonitoringDataListener, MonitoringDataDecoder):
'''
Screen widget
'''
def __init__(self, owner):
AbstractInfoWidget.__init__(self)
MonitoringDataDecoder.__init__(self)
self.owner = owner
self.data = {}
self.sign = {}
self.time = {}
self.max_metric_len = 0
def get_index(self):
return 50
def __handle_data_item(self, host, data):
''' store metric in data tree and calc offset signs '''
for metric, value in data.iteritems():
if value == '' or value == self.NA:
value = self.NA
self.sign[host][metric] = -1
self.data[host][metric] = value
else:
if self.data[host][metric] == self.NA:
self.sign[host][metric] = 1
elif float(value) > float(self.data[host][metric]):
self.sign[host][metric] = 1
elif float(value) < float(self.data[host][metric]):
self.sign[host][metric] = -1
else:
self.sign[host][metric] = 0
self.data[host][metric] = "%.2f" % float(value)
def monitoring_data(self, data_string):
self.log.debug("Mon widget data: %s", data_string)
for line in data_string.split("\n"):
if not line.strip():
continue
host, data, initial, timestamp = self.decode_line(line)
self.time[host] = timestamp
if initial:
self.sign[host] = {}
self.data[host] = {}
for metric in data.keys():
self.sign[host][metric] = 0
self.data[host][metric] = self.NA
else:
self.__handle_data_item(host, data)
def render(self, screen):
if not self.owner.monitoring:
return "Monitoring is " + screen.markup.RED + "offline" + screen.markup.RESET
else:
res = "Monitoring is " + screen.markup.GREEN + "online" + screen.markup.RESET + ":\n"
for hostname, metrics in self.data.items():
tm=datetime.datetime.fromtimestamp(float(self.time[hostname])).strftime('%H:%M:%S')
res += (" " + screen.markup.CYAN + "%s" + screen.markup.RESET + " at %s:\n") % (hostname, tm)
for metric, value in sorted(metrics.iteritems()):
if self.sign[hostname][metric] > 0:
value = screen.markup.YELLOW + value + screen.markup.RESET
elif self.sign[hostname][metric] < 0:
value = screen.markup.CYAN + value + screen.markup.RESET
res += " %s%s: %s\n" % (' ' * (self.max_metric_len - len(metric)), metric.replace('_', ' '), value)
return res.strip()
class AbstractMetricCriteria(AbstractCriteria, MonitoringDataListener, MonitoringDataDecoder):
''' Parent class for metric criteria '''
def __init__(self, autostop, param_str):
AbstractCriteria.__init__(self)
MonitoringDataDecoder.__init__(self)
try:
self.mon = autostop.core.get_plugin_of_type(MonitoringPlugin)
if self.mon.monitoring:
self.mon.monitoring.add_listener(self)
except KeyError:
self.log.warning("No monitoring module, mon autostop disabled")
self.triggered = False
self.autostop = autostop
self.host = param_str.split(',')[0].strip()
self.metric = param_str.split(',')[1].strip()
self.value_limit = float(param_str.split(',')[2])
self.seconds_limit = tankcore.expand_to_seconds(param_str.split(',')[3])
self.last_second = None
self.seconds_count = 0
def monitoring_data(self, data_string):
if self.triggered:
return
for line in data_string.split("\n"):
if not line.strip():
continue
host, data, initial, timestamp = self.decode_line(line)
if initial or not fnmatch.fnmatch(host, self.host):
return
if self.metric not in data.keys() or not data[self.metric] or data[self.metric] == self.NA:
data[self.metric] = 0
self.log.debug("Compare %s %s/%s=%s to %s", self.get_type_string(), host, self.metric, data[self.metric], self.value_limit)
if self.comparison_fn(float(data[self.metric]), self.value_limit):
if not self.seconds_count:
self.cause_second = self.last_second
self.log.debug(self.explain())
self.seconds_count += 1
if self.seconds_count >= self.seconds_limit:
self.triggered = True
return
else:
self.seconds_count = 0
def notify(self, aggregate_second):
if self.seconds_count:
self.autostop.add_counting(self)
self.last_second = aggregate_second
return self.triggered
def comparison_fn(self, arg1, arg2):
''' comparison function '''
raise NotImplementedError()
class MetricHigherCriteria(AbstractMetricCriteria):
''' trigger if metric is higher than limit '''
def __init__(self, autostop, param_str):
AbstractMetricCriteria.__init__(self, autostop, param_str)
def get_rc(self):
return 31
@staticmethod
def get_type_string():
return 'metric_higher'
def explain(self):
items = (self.host, self.metric, self.value_limit, self.seconds_count)
return "%s/%s metric value is higher than %s for %s seconds" % items
def widget_explain(self):
items = (self.host, self.metric, self.value_limit, self.seconds_count, self.seconds_limit)
return ("%s/%s > %s for %s/%ss" % items, float(self.seconds_count) / self.seconds_limit)
def comparison_fn(self, arg1, arg2):
return arg1 > arg2
class MetricLowerCriteria(AbstractMetricCriteria):
''' trigger if metric is lower than limit '''
def __init__(self, autostop, param_str):
AbstractMetricCriteria.__init__(self, autostop, param_str)
def get_rc(self):
return 32
@staticmethod
def get_type_string():
return 'metric_lower'
def explain(self):
items = (self.host, self.metric, self.value_limit, self.seconds_count)
return "%s/%s metric value is lower than %s for %s seconds" % items
def widget_explain(self):
items = (self.host, self.metric, self.value_limit, self.seconds_count, self.seconds_limit)
return ("%s/%s < %s for %s/%ss" % items, float(self.seconds_count) / self.seconds_limit)
def comparison_fn(self, arg1, arg2):
return arg1 < arg2
class AbstractResolver:
def resolve_virtual(self, virt_address):
raise NotImplementedError()