Fix mon collector after refactoring

Implement stop from web
This commit is contained in:
Andrey Pohilko 2012-09-17 13:24:27 +04:00
parent 6ded08e936
commit eae9fb9d87
8 changed files with 120 additions and 95 deletions

View File

@ -5,7 +5,6 @@ from subprocess import PIPE, Popen
import base64
import logging
import os.path
import pwd
import re
import select
import signal
@ -13,8 +12,6 @@ import sys
import tempfile
import time
# FIXME: don't put agent logs in current dir, place somewhere else
# FIXME: synchronize times between agent and collector better
class Config(object):
def __init__(self, config):
@ -29,12 +26,39 @@ class Config(object):
log_level = log_level_raw
return log_level
class SSHWrapper:
'''
separate SSH calls to be able to unit test the collector
'''
def __init__(self):
self.log=logging.getLogger(__name__)
self.ssh_opts = ['-q', '-o', 'StrictHostKeyChecking=no', '-o', 'PasswordAuthentication=no', '-o', 'NumberOfPasswordPrompts=0', '-o', 'ConnectTimeout=5']
self.host = None
self.port = None
def set_host_port(self, host, port):
self.host = host
self.port = port
self.scp_opts = self.ssh_opts + ['-P', self.port]
self.ssh_opts = self.ssh_opts + ['-p', self.port]
def get_ssh_pipe(self, cmd):
args = ['ssh'] + self.ssh_opts + [self.host] + cmd
self.log.debug('Executing: %s', args)
return Popen(args, stdout=PIPE, stderr=PIPE, stdin=PIPE, bufsize=0, preexec_fn=os.setsid)
def get_scp_pipe(self, cmd):
args = ['scp'] + self.scp_opts + cmd
self.log.debug('Executing: %s', args)
return Popen(args, stdout=PIPE, stderr=PIPE, stdin=PIPE, bufsize=0, preexec_fn=os.setsid)
class AgentClient(object):
def __init__(self, **kwargs):
self.run = []
self.port = 22
for key, value in kwargs.iteritems():
setattr(self, key, value)
self.ssh = None
temp_config = tempfile.mkstemp('.cfg', 'agent_')
self.path = {
@ -49,19 +73,13 @@ class AgentClient(object):
'TEMP_CONFIG': temp_config[1]
}
self.ssh_opts = ['-q', '-o', 'StrictHostKeyChecking=no', '-o', 'PasswordAuthentication=no', '-o', 'NumberOfPasswordPrompts=0', '-o', 'ConnectTimeout=5']
user_id = pwd.getpwuid(os.getuid())[0]
if (user_id == 'lunapark'):
self.ssh_opts = ['-i', '/home/lunapark/.ssh/id_dsa'] + self.ssh_opts
self.scp_opts = self.ssh_opts + ['-P', self.port]
self.ssh_opts = self.ssh_opts + ['-p', self.port]
def start(self):
logging.debug('Start monitoring: %s' % self.host)
if not self.run:
raise ValueError("Empty run string")
self.run += ['-t', str(int(time.time()))]
logging.debug(self.run)
pipe = Popen(self.run, stdin=PIPE, stdout=PIPE, stderr=PIPE)
pipe = self.ssh.get_ssh_pipe(self.run)
logging.debug("Started: %s", pipe)
return pipe
@ -84,93 +102,91 @@ class AgentClient(object):
""" Create folder and copy agent and metrics scripts to remote host """
logging.info("Installing monitoring agent at %s...", self.host)
self.create_agent_config(loglevel)
self.ssh.set_host_port(self.host, self.port)
# getting remote temp dir
cmd = ['ssh'] + self.ssh_opts + [self.host, self.python + ' -c "import tempfile; print tempfile.mkdtemp();"']
cmd = [self.python + ' -c "import tempfile; print tempfile.mkdtemp();"']
logging.debug("Get remote temp dir: %s", cmd)
pipe = Popen(cmd, stdout=PIPE, stderr=PIPE, bufsize=0)
pipe = self.ssh.get_ssh_pipe(cmd)
err = pipe.stderr.read().strip()
if err:
logging.error("[%s] ssh error: '%s'" % (self.host, err))
return 1
raise RuntimeError("[%s] ssh error: '%s'" % (self.host, err))
pipe.wait()
logging.debug("Return code [%s]: %s" % (self.host, pipe.returncode))
if pipe.returncode:
return 1
raise RuntimeError("Return code [%s]: %s" % (self.host, pipe.returncode))
# TODO: make less scp/ssh commands, install agent faster
remote_dir = pipe.stdout.read().strip()
if (remote_dir):
self.path['AGENT_REMOTE_FOLDER'] = remote_dir
logging.debug("Remote dir at %s:%s", self.host, self.path['AGENT_REMOTE_FOLDER']);
# Copy agent
cmd = ['scp'] + self.scp_opts + [self.path['AGENT_LOCAL_FOLDER'] + 'agent.py', self.host + ':' + self.path['AGENT_REMOTE_FOLDER'] + '/agent.py']
cmd = [self.path['AGENT_LOCAL_FOLDER'] + 'agent.py', self.host + ':' + self.path['AGENT_REMOTE_FOLDER'] + '/agent.py']
logging.debug("Copy agent to %s: %s" % (self.host, cmd))
pipe = Popen(cmd, stdout=PIPE, bufsize=0)
pipe = self.ssh.get_scp_pipe(cmd)
pipe.wait()
logging.debug("AgentClient copy exitcode: %s", pipe.returncode)
if pipe.returncode != 0:
logging.error("AgentClient copy exitcode: %s", pipe.returncode)
return pipe.returncode
raise RuntimeError("AgentClient copy exitcode: %s" % pipe.returncode)
# Copy config
cmd = ['scp'] + self.scp_opts + [self.path['TEMP_CONFIG'], self.host + ':' + self.path['AGENT_REMOTE_FOLDER'] + '/agent.cfg']
cmd = [self.path['TEMP_CONFIG'], self.host + ':' + self.path['AGENT_REMOTE_FOLDER'] + '/agent.cfg']
logging.debug("[%s] Copy config: %s" % (cmd, self.host))
pipe = Popen(cmd, stdout=PIPE, bufsize=0)
pipe = self.ssh.get_scp_pipe(cmd)
pipe.wait()
logging.debug("AgentClient copy config exitcode: %s", pipe.returncode)
if pipe.returncode != 0:
logging.error("AgentClient copy config exitcode: %s", pipe.returncode)
return pipe.returncode
raise RuntimeError("AgentClient copy config exitcode: %s" % pipe.returncode)
# Copy metric
cmd = ['scp'] + self.scp_opts + ['-r', self.path['METRIC_LOCAL_FOLDER'], self.host + ':' + self.path['AGENT_REMOTE_FOLDER'] + '/']
cmd = ['-r', self.path['METRIC_LOCAL_FOLDER'], self.host + ':' + self.path['AGENT_REMOTE_FOLDER'] + '/']
logging.debug("[%s] Copy metric: %s" % (cmd, self.host))
pipe = Popen(cmd, stdout=PIPE, bufsize=0)
pipe = self.ssh.get_scp_pipe(cmd)
pipe.wait()
logging.debug("Metrics copy exitcode: %s", pipe.returncode)
if pipe.returncode != 0:
logging.error("Metrics copy exitcode: %s", pipe.returncode)
return pipe.returncode
raise RuntimeError("Metrics copy exitcode: %s" % pipe.returncode)
if os.getenv("DEBUG"):
debug = "DEBUG=1"
else:
debug = ""
self.run = ['ssh'] + self.ssh_opts + [self.host, '/usr/bin/env', debug, self.python, self.path['AGENT_REMOTE_FOLDER'] + '/agent.py', '-c', self.path['AGENT_REMOTE_FOLDER'] + '/agent.cfg']
return 0
self.run = ['/usr/bin/env', debug, self.python, self.path['AGENT_REMOTE_FOLDER'] + '/agent.py', '-c', self.path['AGENT_REMOTE_FOLDER'] + '/agent.cfg']
def uninstall(self):
""" Remove agent's files from remote host"""
cmd = ['scp'] + self.scp_opts + [self.host + ':' + self.path['AGENT_REMOTE_FOLDER'] + "_agent.log", "monitoring_agent_" + self.host + ".log"]
# TODO: copy angent to temp dir and add it as artifact
cmd = [self.host + ':' + self.path['AGENT_REMOTE_FOLDER'] + "_agent.log", "monitoring_agent_" + self.host + ".log"]
logging.debug("Copy agent log from %s: %s" % (self.host, cmd))
remove = Popen(cmd, stdout=PIPE, bufsize=0)
remove = self.ssh.get_scp_pipe(cmd)
remove.wait()
logging.info("Removing agent from: %s..." % self.host)
if os.path.isfile(self.path['TEMP_CONFIG']):
os.remove(self.path['TEMP_CONFIG'])
cmd = ['ssh'] + self.ssh_opts + [self.host, 'rm', '-r', self.path['AGENT_REMOTE_FOLDER']]
cmd = ['rm', '-r', self.path['AGENT_REMOTE_FOLDER']]
logging.debug("Uninstall agent from %s: %s" % (self.host, cmd))
remove = Popen(cmd, stdout=PIPE, bufsize=0)
remove = self.ssh.get_ssh_pipe(cmd)
remove.wait()
class MonitoringCollector:
def __init__(self, config):
def __init__(self):
self.log = logging.getLogger(__name__)
self.config = config
self.config = None
self.default_target = None
self.agents = []
self.agent_pipes = []
self.filter_conf = {}
self.listeners = []
self.ssh_wrapper_class = SSHWrapper
def add_listener(self, obj):
self.listeners.append(obj)
@ -195,22 +211,15 @@ class MonitoringCollector:
for adr in agent_config:
logging.debug('Creating agent: %s' % adr)
agent = AgentClient(**adr)
agent.ssh = self.ssh_wrapper_class()
self.agents.append(agent)
# Mass agents install
logging.debug("Agents: %s" % self.agents)
agents_cnt_before = len(self.agents)
self.group_op('install', self.agents, conf.loglevel())
if len(self.agents) != agents_cnt_before:
logging.warn("Some targets can't be monitored")
else:
logging.info("All agents installed OK")
# Nothing installed
if not self.agents:
raise RuntimeError("No agents was installed. Stop monitoring.")
for agent in self.agents:
logging.debug('Install monitoring agent. Host: %s' % agent.host)
agent.install(conf.loglevel())
def start(self):
self.inputs, self.outputs, self.excepts = [], [], []
@ -261,8 +270,9 @@ class MonitoringCollector:
def stop(self):
logging.debug("Initiating normal finish")
for pipe in self.agent_pipes:
logging.debug("Killing %s with %s", pipe.pid, signal.SIGINT)
os.kill(pipe.pid, signal.SIGINT)
if pipe.pid:
logging.debug("Killing %s with %s", pipe.pid, signal.SIGINT)
os.kill(pipe.pid, signal.SIGINT)
self.group_op('uninstall', self.agents, '')
def getconfig(self, filename, target_hint):
@ -387,15 +397,6 @@ class MonitoringCollector:
if os.path.isfile(agent.path['TEMP_CONFIG']):
logging.warning("Seems uninstall failed to remove %s", agent.path['TEMP_CONFIG'])
os.remove(agent.path['TEMP_CONFIG'])
if command == 'install':
for agent in agents:
logging.debug('Install monitoring agent. Host: %s' % agent.host)
if agent.install(loglevel):
logging.debug("[%s] Cannot install. Remove: %s" % (agent.host, agent))
logging.debug("Remove: %s" % agent.path['TEMP_CONFIG'])
if os.path.isfile(agent.path['TEMP_CONFIG']):
os.remove(agent.path['TEMP_CONFIG'])
agents.remove(agent)
def filtering(self, mask, filter_list):
host = filter_list[0]

View File

@ -6,6 +6,8 @@ from Tank.Plugins.Phantom import PhantomPlugin
import logging
import re
# TODO: add quantile criteria
# TODO: find solution for melfa's case
class AutostopPlugin(AbstractPlugin, AggregateResultListener):
SECTION = 'autostop'
@ -382,3 +384,4 @@ class AutostopWidget(AbstractInfoWidget):
return "Autostop:\n " + ("\n ".join(res))
else:
return ''

View File

@ -22,6 +22,7 @@ class DataUploaderPlugin(AbstractPlugin, AggregateResultListener, MonitoringData
API Client class for Yandex KSHM web service
'''
SECTION = 'meta'
RC_STOP_FROM_WEB = 8
def __init__(self, core):
AbstractPlugin.__init__(self, core)
@ -30,6 +31,7 @@ class DataUploaderPlugin(AbstractPlugin, AggregateResultListener, MonitoringData
self.logs_basedir = None
self.operator = pwd.getpwuid(os.geteuid())[0]
self.task_name = ''
self.rc = -1
@staticmethod
def get_key():
@ -69,7 +71,7 @@ class DataUploaderPlugin(AbstractPlugin, AggregateResultListener, MonitoringData
while cwd:
self.log.debug("Checking if dir is named like JIRA issue: %s", cwd)
if issue.match(os.path.basename(cwd)):
res=re.search(issue, os.path.basename(cwd))
res = re.search(issue, os.path.basename(cwd))
self.task = res.group(0)
return
@ -144,6 +146,9 @@ class DataUploaderPlugin(AbstractPlugin, AggregateResultListener, MonitoringData
self.version_tested, self.is_regression, self.regression_component,
tank_type, " ".join(sys.argv), 0)
def is_test_finished(self):
return self.rc
def end_test(self, retcode):
return retcode
@ -175,7 +180,9 @@ class DataUploaderPlugin(AbstractPlugin, AggregateResultListener, MonitoringData
"""
@second_aggregate_data: SecondAggregateData
"""
self.api_client.push_test_data(self.jobno, second_aggregate_data)
if not self.api_client.push_test_data(self.jobno, second_aggregate_data):
self.log.warn("The test was stopped from Web interface")
self.rc = self.RC_STOP_FROM_WEB
def get_sla_by_task(self):
return self.api_client.get_sla_by_task(self.regression_component)
@ -322,13 +329,16 @@ class KSHMAPIClient():
case_data = self.second_data_to_push_item(case)
self.post(uri, case_data)
overall = self.second_data_to_push_item(data.overall)
res = [{'success': 0}]
try:
self.post(uri, overall)
res = self.post(uri, overall)
except Exception, e:
self.log.warn("Failed to push second data to API, retry in 30 sec: %s", e)
time.sleep(30)
self.post(uri, overall)
res = self.post(uri, overall)
return int(res[0]['success'])
def get_sla_by_task(self, component):
if not component or component == '0':

View File

@ -5,7 +5,6 @@ import tempfile
from MonCollector.collector import MonitoringCollector, MonitoringDataListener
from Tank.Plugins.ConsoleOnline import ConsoleOnlinePlugin, AbstractInfoWidget
import copy
import math
import base64
# TODO: wait for first monitoring data
@ -18,7 +17,7 @@ class MonitoringPlugin(AbstractPlugin):
self.default_target = None
self.config = None
self.process = None
self.monitoring = None
self.monitoring = MonitoringCollector()
@staticmethod
def get_key():
@ -47,7 +46,7 @@ class MonitoringPlugin(AbstractPlugin):
self.log.info("Monitoring has been disabled")
else:
self.log.info("Starting monitoring with config: %s", self.config)
self.monitoring = MonitoringCollector(self.config)
self.monitoring.config = self.config
if self.default_target:
self.monitoring.default_target = self.default_target
@ -149,9 +148,9 @@ class MonitoringWidget(AbstractInfoWidget, MonitoringDataListener):
if self.data[host][metric] == self.NA:
self.sign[host][metric] = 1
else:
if float(value)>float(self.data[host][metric]):
if float(value) > float(self.data[host][metric]):
self.sign[host][metric] = 1
elif float(value)<float(self.data[host][metric]):
elif float(value) < float(self.data[host][metric]):
self.sign[host][metric] = -1
else:
self.sign[host][metric] = 0
@ -164,7 +163,7 @@ class MonitoringWidget(AbstractInfoWidget, MonitoringDataListener):
else:
res = "Monitoring is " + screen.markup.WHITE + "online" + screen.markup.RESET + ":\n"
for hostname, metrics in self.data.items():
res += (" "+screen.markup.CYAN+"%s"+screen.markup.RESET+":\n") % hostname
res += (" " + screen.markup.CYAN + "%s" + screen.markup.RESET + ":\n") % hostname
for metric, value in sorted(metrics.iteritems()):
if self.sign[hostname][metric] > 0:
value = screen.markup.GREEN + value + screen.markup.RESET

View File

@ -1,7 +1,8 @@
from Tests.TankTests import TankTestCase
import tempfile
import time
from MonCollector.collector import MonitoringCollector, MonitoringDataListener
from MonCollector.collector import MonitoringCollector, MonitoringDataListener,\
SSHWrapper
from Tank.Plugins.Monitoring import MonitoringPlugin, MonitoringWidget
from Tank.Core import TankCore
import logging
@ -12,30 +13,14 @@ class MonitoringCollectorTestCase(TankTestCase):
data = None
def test_collector(self):
mon = MonitoringCollector("config/mon1.conf")
mon = MonitoringCollector()
mon.config="config/mon1.conf"
mon.ssh_wrapper_class=SSHEmulator
listener = TestMonListener()
mon.add_listener(listener)
mon.prepare()
mon.start()
mon.poll()
self.assertEquals([], listener.data)
listener.data = []
time.sleep(1)
mon.poll()
self.assertNotEquals([], listener.data)
self.assertTrue(listener.data[0].startswith('start;'))
listener.data = []
time.sleep(2)
mon.poll()
self.assertNotEquals([], listener.data)
self.assertFalse(listener.data[0].startswith('start;'))
listener.data = []
time.sleep(3)
mon.poll()
self.assertNotEquals([], listener.data)
self.assertFalse(listener.data[0].startswith('start;'))
listener.data = []
mon.stop()
def test_plugin_disabled(self):
core = TankCore()
@ -59,6 +44,7 @@ class MonitoringCollectorTestCase(TankTestCase):
core.plugins_prepare_test()
mon = MonitoringPlugin(core)
mon.configure()
mon.monitoring.ssh_wrapper_class=SSHEmulator
mon.prepare_test()
mon.start_test()
self.assertEquals(-1, mon.is_test_finished())
@ -75,6 +61,7 @@ class MonitoringCollectorTestCase(TankTestCase):
core.plugins_configure()
core.plugins_prepare_test()
mon = MonitoringPlugin(core)
mon.monitoring.ssh_wrapper_class=SSHEmulator
core.set_option(mon.SECTION, 'config', "config/mon1.conf")
mon.configure()
mon.prepare_test()
@ -112,3 +99,29 @@ class TestMonListener(MonitoringDataListener):
def monitoring_data(self, data_string):
logging.debug("MON DATA: %s", data_string)
self.data.append(data_string)
class SSHEmulator(SSHWrapper):
def __init__(self):
SSHWrapper.__init__(self)
self.scp_pipe = PipeEmul('data/ssh_out.txt', 'data/ssh_err.txt')
self.ssh_pipe = PipeEmul('data/ssh_out.txt', 'data/ssh_err.txt')
def get_scp_pipe(self, cmd):
return self.scp_pipe
def get_ssh_pipe(self, cmd):
return self.ssh_pipe
class PipeEmul:
def __init__(self, out, err):
self.stderr=open(err)
self.stdout=open(out)
self.returncode=0
self.pid=0
def wait(self):
pass

View File

@ -33,12 +33,9 @@ class Custom_TestCase(unittest.TestCase):
self.assertNotEquals(["0.0"], x)
time.sleep(1)
y = self.foo.check()
print y
assert x < y;
time.sleep(0.5)
z = self.foo.check()
print z
assert z > y;
def test_custom_fail(self):
custom_config = {'tail': [], 'call': ['cXVlcnkgY291bnQ=:cXVlcnlfY2xhc3NpZnlfY2xpZW50IGZzdGF0cyB8IGdyZXAgY2xhc3MtY21kIHwgY3V0IC1mIDM=:1']}

0
Tests/data/ssh_err.txt Normal file
View File

2
Tests/data/ssh_out.txt Normal file
View File

@ -0,0 +1,2 @@
start;127.0.0.1;1347631472;Memory_total;Memory_used;Memory_free;Memory_shared;Memory_buff;Memory_cached;Net_recv;Net_send;Disk_read;Disk_write;System_csw;System_int;CPU_user;CPU_nice;CPU_system;CPU_idle;CPU_iowait;CPU_irq;CPU_softirq;System_numproc;System_numthreads
127.0.0.1;1347631473;1507.65625;576.9609375;8055;1518;0;143360;34.9775784753;16.1434977578;0.0