plugins.Monitoring.collector pythonization

This commit is contained in:
Andrew Grigorev 2015-02-13 00:10:22 +03:00
parent e80da71693
commit dc902a82a2

View File

@ -1,7 +1,8 @@
"""Target monitoring via SSH"""
import ConfigParser
from collections import defaultdict
from xml import etree
from xml.etree import ElementTree as etree
from subprocess import PIPE, Popen
import base64
import logging
@ -19,27 +20,28 @@ import getpass
import yandextank.core as tankcore
logger = logging.getLogger(__name__)
class Config(object):
""" Config reader helper """
"""Config reader helper"""
def __init__(self, config):
self.tree = etree.parse(config)
def loglevel(self):
"""Get log level from config file. Possible values: info, debug"""
log_level = 'info'
log_level_raw = self.tree.xpath('/Monitoring')[0].get('loglevel')
log_level_raw = self.tree.find('./Monitoring').get('loglevel')
if log_level_raw in ('info', 'debug'):
log_level = log_level_raw
return log_level
class SSHWrapper:
""" separate SSH calls to be able to unit test the collector """
"""Separate SSH calls to be able to unit test the collector"""
def __init__(self, timeout):
self.log = logging.getLogger(__name__)
self.ssh_opts = ['-q', '-o', 'StrictHostKeyChecking=no', '-o', 'PasswordAuthentication=no', '-o',
'NumberOfPasswordPrompts=0', '-o', 'ConnectTimeout=' + str(timeout)]
self.scp_opts = []
@ -47,7 +49,7 @@ class SSHWrapper:
self.port = None
def set_host_port(self, host, port, username):
""" Set host and port to use """
"""Set host and port to use"""
self.host = host
self.port = port
self.username = username
@ -55,20 +57,20 @@ class SSHWrapper:
self.ssh_opts = self.ssh_opts + ['-C', '-p', self.port] + ['-l', self.username]
def get_ssh_pipe(self, cmd):
""" Get open ssh pipe """
"""Get open ssh pipe"""
args = ['ssh'] + self.ssh_opts + [self.host] + cmd
self.log.debug('Executing: %s', args)
logger.debug('Executing: %s', args)
return Popen(args, stdout=PIPE, stderr=PIPE, stdin=PIPE, bufsize=0, preexec_fn=os.setsid, close_fds=True)
def get_scp_pipe(self, cmd):
""" Get open scp pipe """
"""Get open scp pipe"""
args = ['scp'] + self.scp_opts + cmd
self.log.debug('Executing: %s', args)
logger.debug('Executing: %s', args)
return Popen(args, stdout=PIPE, stderr=PIPE, stdin=PIPE, bufsize=0, preexec_fn=os.setsid, close_fds=True)
class AgentClient(object):
""" Agent client connection """
"""Agent client connection"""
def __init__(self):
self.run = []
@ -99,7 +101,7 @@ class AgentClient(object):
self.username = getpass.getuser()
def start(self):
""" Start remote agent """
"""Start remote agent"""
logging.debug('Start monitoring: %s', self.host)
if not self.run:
raise ValueError("Empty run string")
@ -109,9 +111,8 @@ class AgentClient(object):
logging.debug("Started: %s", pipe)
return pipe
def create_agent_config(self, loglevel):
""" Creating config """
"""Creating config"""
try:
float(self.interval)
except:
@ -147,7 +148,7 @@ class AgentClient(object):
return self.path['TEMP_CONFIG']
def install(self, loglevel):
""" Create folder and copy agent and metrics scripts to remote host """
"""Create folder and copy agent and metrics scripts to remote host"""
logging.info("Installing monitoring agent at %s@%s...", self.username, self.host)
agent_config = self.create_agent_config(loglevel)
@ -184,7 +185,14 @@ class AgentClient(object):
raise RuntimeError("AgentClient copy exitcode: %s" % pipe.returncode)
# Copy config
cmd = [self.path['TEMP_CONFIG'], self.username+'@'+'['+self.host+']' + ':' + self.path['AGENT_REMOTE_FOLDER'] + '/agent.cfg']
cmd = [
self.path['TEMP_CONFIG'],
'{user}@[{host}]:{dirname}/agent.cfg'.format(
user=self.username,
host=self.host,
dirname=self.path['AGENT_REMOTE_FOLDER']
)
]
logging.debug("[%s] Copy config: %s", cmd, self.host)
pipe = self.ssh.get_scp_pipe(cmd)
@ -202,7 +210,7 @@ class AgentClient(object):
return agent_config
def uninstall(self):
""" Remove agent's files from remote host"""
"""Remove agent's files from remote host"""
fhandle, log_file = tempfile.mkstemp('.log', "agent_" + self.host + "_")
os.close(fhandle)
cmd = [self.host + ':' + self.path['AGENT_REMOTE_FOLDER'] + "_agent.log", log_file]
@ -218,10 +226,10 @@ class AgentClient(object):
class MonitoringCollector:
""" Class to aggregate data from several collectors """
"""Aggregate data from several collectors"""
def __init__(self):
self.log = logging.getLogger(__name__)
self.config = None
self.default_target = None
self.agents = []
@ -236,25 +244,24 @@ class MonitoringCollector:
self.filter_mask = defaultdict(str)
self.ssh_timeout = 5
def add_listener(self, obj):
""" Add data line listener """
self.listeners.append(obj)
def prepare(self):
""" Prepare for monitoring - install agents etc"""
"""Prepare for monitoring - install agents etc"""
# Parse config
agent_config = []
if self.config:
[agent_config, self.filter_conf] = self.getconfig(self.config, self.default_target)
self.log.debug("filter_conf: %s", self.filter_conf)
logger.debug("filter_conf: %s", self.filter_conf)
# Filtering
for host in self.filter_conf:
self.filter_mask[host] = []
self.log.debug("Filter mask: %s", self.filter_mask)
logger.debug("Filter mask: %s", self.filter_mask)
# Creating agent for hosts
logging.debug('Creating agents')
@ -282,7 +289,7 @@ class MonitoringCollector:
self.artifact_files.append(agent.install(conf.loglevel()))
def start(self):
""" Start N parallel agents """
"""Start N parallel agents"""
for agent in self.agents:
pipe = agent.start()
self.agent_pipes.append(pipe)
@ -299,9 +306,8 @@ class MonitoringCollector:
logging.debug("Pipes: %s", self.agent_pipes)
def poll(self):
""" Poll agents for data """
"""Poll agents for data"""
readable, writable, exceptional = select.select(self.outputs, self.inputs, self.excepts, 0)
logging.debug("Streams: %s %s %s", readable, writable, exceptional)
@ -327,7 +333,7 @@ class MonitoringCollector:
try:
lines = to_read.read().split("\n")
except IOError:
self.log.debug("No data available")
logger.debug("No data available")
lines = []
for data in lines:
@ -337,20 +343,19 @@ class MonitoringCollector:
if not self.first_data_received and self.send_data:
self.first_data_received = True
self.log.info("Monitoring received first data")
logger.info("Monitoring received first data")
else:
self.send_collected_data()
return len(self.outputs)
def stop(self):
""" Shutdown agents """
"""Shutdown agents"""
logging.debug("Initiating normal finish")
for pipe in self.agent_pipes:
try:
pipe.stdin.write("stop\n")
except IOError, exc:
except IOError as exc:
logging.warn("Problems stopping agent: %s", traceback.format_exc(exc))
time.sleep(1)
@ -375,17 +380,25 @@ class MonitoringCollector:
for agent in self.agents:
self.artifact_files.append(agent.uninstall())
def send_collected_data(self):
""" sends pending data set to listeners """
"""sends pending data set to listeners"""
for listener in self.listeners:
listener.monitoring_data(self.send_data)
self.send_data = ''
# FIXME: a piese of shit and shame to a programmer
def get_host_config(self, default, default_metric, filter_obj, host, names, target_hint):
def get_host_config(self, filter_obj, host, names, target_hint):
default = {
'System': 'csw,int',
'CPU': 'user,system,iowait',
'Memory': 'free,cached,used',
'Disk': 'read,write',
'Net': 'recv,send',
}
default_metric = ['CPU', 'Memory', 'Disk', 'Net']
hostname = host.get('address').lower()
username = host.get('username')
if hostname == '[target]':
if not target_hint:
raise ValueError("Can't use [target] keyword with no target parameter specified")
@ -440,76 +453,49 @@ class MonitoringCollector:
agent_name = self.get_agent_name(metric, elm)
if agent_name:
names[agent_name] = 1
metric = ','.join(names.keys())
tmp = {}
if metric:
tmp.update({'metric': metric})
else:
tmp.update({'metric': 'cpu-stat'})
if host.get('interval'):
tmp.update({'interval': host.get('interval')})
else:
tmp.update({'interval': 1})
return {
'metric': metric or 'cpu-stat',
'interval': host.get('interval', 1),
'priority': host.get('priority', 0),
'port': host.get('port', '22'),
'python': host.get('python', '/usr/bin/env python2'),
'username': host.get('username', getpass.getuser()),
'custom': custom,
'host': hostname,
'startups': startups,
'shutdowns': shutdowns,
if host.get('priority'):
tmp.update({'priority': host.get('priority')})
else:
tmp.update({'priority': 0})
if host.get('port'):
tmp.update({'port': host.get('port')})
else:
tmp.update({'port': '22'})
if host.get('python'):
tmp.update({'python': host.get('python')})
else:
tmp.update({'python': '/usr/bin/env python2'})
if host.get('username'):
tmp.update({'username': host.get('username')})
else:
tmp.update({'username': getpass.getuser()})
tmp.update({'custom': custom})
tmp.update({'host': hostname})
tmp.update({'startups': startups})
tmp.update({'shutdowns': shutdowns})
filter_obj[hostname] = stats
return tmp
def getconfig(self, filename, target_hint):
""" Prepare config data"""
default = {
'System': 'csw,int',
'CPU': 'user,system,iowait',
'Memory': 'free,cached,used',
'Disk': 'read,write',
'Net': 'recv,send',
# XXX: should be separate?
'stats': {hostname: stats},
}
default_metric = ['CPU', 'Memory', 'Disk', 'Net']
def getconfig(self, filename, target_hint):
"""Prepare config data"""
try:
tree = etree.parse(filename)
except IOError, exc:
except IOError as exc:
logging.error("Error loading config: %s", exc)
raise RuntimeError("Can't read monitoring config %s" % filename)
hosts = tree.xpath('/Monitoring/Host')
hosts = tree.findall('./Monitoring/Host')
names = defaultdict()
config = []
filter_obj = defaultdict(str)
for host in hosts:
host_config = self.get_host_config(default, default_metric, filter_obj, host, names, target_hint)
host_config = self.get_host_config(host, names, target_hint)
# XXX: why stats should be separated?
filter_obj.update(host_config.pop('stats'))
config.append(host_config)
return [config, filter_obj]
def filtering(self, mask, filter_list):
""" Filtering helper """
"""Filtering helper"""
host = filter_list[0]
initial = [0, 1]
res = []
@ -519,22 +505,22 @@ class MonitoringCollector:
try:
res.append(filter_list[key])
except IndexError:
self.log.warn("Problems filtering data: %s with %s", mask, len(filter_list))
logger.warn("Problems filtering data: %s with %s", mask, len(filter_list))
return None
return ';'.join(res)
def filter_unused_data(self, filter_conf, filter_mask, data):
""" Filter unselected metrics from data """
self.log.debug("Filtering data: %s", data)
"""Filter unselected metrics from data"""
logger.debug("Filtering data: %s", data)
out = ''
# Filtering data
keys = data.rstrip().split(';')
if re.match('^start;', data): # make filter_conf mask
host = keys[1]
for i in xrange(3, len(keys)):
for i in range(3, len(keys)):
if keys[i] in filter_conf[host]:
filter_mask[host].append(i - 1)
self.log.debug("Filter mask: %s", filter_mask)
logger.debug("Filter mask: %s", filter_mask)
out = 'start;'
out += self.filtering(filter_mask, keys[1:]).rstrip(';') + '\n'
elif re.match('^\[debug\]', data): # log debug output
@ -592,18 +578,18 @@ class MonitoringCollector:
class MonitoringDataListener:
""" Parent class for data listeners """
"""Parent class for data listeners"""
def __init__(self):
pass
def monitoring_data(self, data_string):
""" Notification about new monitoring data lines """
"""Notification about new monitoring data lines"""
raise NotImplementedError()
class StdOutPrintMon(MonitoringDataListener):
""" Simple listener, writing data to stdout """
"""Simple listener, writing data to stdout"""
def __init__(self):
MonitoringDataListener.__init__(self)
@ -613,15 +599,14 @@ class StdOutPrintMon(MonitoringDataListener):
class MonitoringDataDecoder:
""" The class that serves converting monitoring data lines to dict """
"""The class that serves converting monitoring data lines to dict"""
NA = 'n/a'
def __init__(self):
self.metrics = {}
self.log = logging.getLogger()
def decode_line(self, line):
""" convert mon line to dict """
"""convert mon line to dict"""
is_initial = False
data_dict = {}
data = line.strip().split(';')
@ -653,7 +638,7 @@ class MonitoringDataDecoder:
for metric in self.metrics[host]:
data_dict[metric] = data.pop(0)
self.log.debug("Decoded data %s: %s", host, data_dict)
logger.debug("Decoded data %s: %s", host, data_dict)
return host, data_dict, is_initial, timestamp