Refactoring monitoring

This commit is contained in:
Andrey Pohilko 2012-09-13 21:45:47 +04:00
parent 12d788017d
commit aa5ab1ea20
3 changed files with 162 additions and 142 deletions

View File

@ -32,7 +32,7 @@ class Config(object):
class AgentClient(object):
def __init__(self, **kwargs):
self.run = []
setattr(self, 'port', '22')
self.port = 22
for key, value in kwargs.iteritems():
setattr(self, key, value)
@ -42,8 +42,8 @@ class AgentClient(object):
'AGENT_REMOTE_FOLDER': '/var/tmp/lunapark_monitoring',
# Source path on tank
'AGENT_LOCAL_FOLDER': '/usr/lib/yandex/load-monitoring/agent/',
'METRIC_LOCAL_FOLDER': '/usr/lib/yandex/load-monitoring/agent/metric',
'AGENT_LOCAL_FOLDER': os.path.dirname(__file__) + '/agent/',
'METRIC_LOCAL_FOLDER': os.path.dirname(__file__) + '/agent/metric',
# Temp config path
'TEMP_CONFIG': temp_config[1]
@ -162,115 +162,7 @@ class AgentClient(object):
remove = Popen(cmd, stdout=PIPE, bufsize=0)
remove.wait()
def group_op(command, agents, loglevel):
""" Group install and uninstall for list of agents"""
logging.debug("Group operation %s on agents: %s", command, agents);
if command == 'uninstall':
for agent in agents:
logging.debug('Uninstall monitoring agent. Host: %s' % agent.host)
agent.uninstall()
logging.info("Remove: %s" % agent.path['TEMP_CONFIG'])
if os.path.isfile(agent.path['TEMP_CONFIG']):
logging.warning("Seems uninstall failed to remove %s", agent.path['TEMP_CONFIG'])
os.remove(agent.path['TEMP_CONFIG'])
if command == 'install':
for agent in agents:
logging.debug('Install monitoring agent. Host: %s' % agent.host)
if agent.install(loglevel):
logging.debug("[%s] Cannot install. Remove: %s" % (agent.host, agent))
logging.info("Remove: %s" % agent.path['TEMP_CONFIG'])
if os.path.isfile(agent.path['TEMP_CONFIG']):
os.remove(agent.path['TEMP_CONFIG'])
agents.remove(agent)
def filtering(mask, list):
host = list[0]
initial = [0, 1]
out = ''
# print "mask: %s" % mask
# print "list: %s " % list
res = []
if mask[host]:
keys = initial + mask[host]
for key in keys:
res.append(list[key])
# print "key: %s, value: -%s-" % (key, list[key])
out += list[key] + ';'
# print "res: %s" % res
# print res
# print join(res, ";")
return join(res, ";")
def filter_unused_data(filter_conf, filter_mask, data):
out = ''
# Filtering data
keys = data.rstrip().split(';')
if re.match('^start;', data): # make filter_conf mask
host = keys[1]
for i in xrange(3, len(keys)):
if keys[i] in filter_conf[host]:
filter_mask[host].append(i - 1)
out = 'start;'
out += filtering(filter_mask, keys[1:]).rstrip(';') + '\n'
elif re.match('^\[debug\]', data): # log debug output
logging.debug('agent debug: %s' % data.rstrip())
else:
filtered = filtering(filter_mask, keys)
if filtered:
out = filtered + '\n' # filtering values
# filtered = filtering(filter_mask, keys).rstrip(';')
return out
def get_agent_name(metric, param):
depend = {
'CPU': {
'idle': 'cpu-stat',
'user': 'cpu-stat',
'system': 'cpu-stat',
'iowait': 'cpu-stat',
'nice': 'cpu-stat'
},
'System': {
'la1': 'cpu-la',
'la5': 'cpu-la',
'la15': 'cpu-la',
'csw': 'cpu-stat',
'int': 'cpu-stat',
'numproc': 'cpu-stat',
'numthreads': 'cpu-stat',
},
'Memory': {
'free': 'mem',
'used': 'mem',
'cached': 'mem',
'buff': 'mem',
},
'Disk': {
'read': 'disk',
'write': 'disk',
},
'Net': {
'recv': 'net',
'send': 'net',
'tx': 'net-tx-rx',
'rx': 'net-tx-rx',
'retransmit': 'net-retrans',
'estab': 'net-tcp',
'closewait': 'net-tcp',
'timewait': 'net-tcp',
}
}
if depend[metric][param]:
return depend[metric][param]
else:
return ''
class MonitoringSender:
jobno = None
# TODO: move it to data uploader
def send_data(self, send_data):
''' Handle HTTP data send'''
@ -336,68 +228,71 @@ class MonitoringSender:
class MonitoringCollector:
def __init__(self, config, out_file):
self.reported_ok = 0
self.log=logging.getLogger(__name__)
self.config = config
self.out_file = out_file
self.default_target = None
self.agents = []
self.agent_pipes = []
self.filter_conf = {}
def main(self):
def prepare(self):
# Defining local storage
self.store = sys.stdout
if self.out_file:
self.store = open(self.out_file, 'w')
# Agents list
agents = []
# Parse config
agent_config = []
filter_conf = {}
if self.config:
[agent_config, filter_conf] = self.getconfig(self.config, self.default_target)
[agent_config, self.filter_conf] = self.getconfig(self.config, self.default_target)
self.log.debug("filter_conf: %s", self.filter_conf)
conf = Config(self.config)
logging.info('Logging level: %s' % conf.loglevel())
# Filtering
filter_mask = defaultdict(str)
for host in filter_conf:
filter_mask[host] = []
self.filter_mask = defaultdict(str)
for host in self.filter_conf:
self.filter_mask[host] = []
self.log.debug("Filter mask: %s", self.filter_mask)
# Creating agent for hosts
logging.debug('Creating agents')
for adr in agent_config:
logging.debug('Creating agent: %s' % adr)
agent = AgentClient(**adr)
agents.append(agent)
self.agents.append(agent)
# Mass agents install
logging.debug("Agents: %s" % agents)
agents_cnt_before = len(agents)
group_op('install', agents, conf.loglevel())
logging.debug("Agents: %s" % self.agents)
agents_cnt_before = len(self.agents)
self.group_op('install', self.agents, conf.loglevel())
if len(agents) != agents_cnt_before:
if len(self.agents) != agents_cnt_before:
logging.warn("Some targets can't be monitored")
else:
logging.info("All agents installed OK")
# Nothing installed
if not agents:
if not self.agents:
raise RuntimeError("No agents was installed. Stop monitoring.")
def start(self):
self.inputs, self.outputs, self.excepts = [], [], []
agent_pipes = []
# Start N parallel agents
for a in agents:
for a in self.agents:
pipe = a.start()
agent_pipes.append(pipe)
self.agent_pipes.append(pipe)
self.outputs.append(pipe.stdout)
self.excepts.append(pipe.stderr)
logging.debug("Pipes: %s", agent_pipes)
logging.debug("Pipes: %s", self.agent_pipes)
def poll(self):
send_data=''
send_data = ''
readable, writable, exceptional = select.select(self.outputs, self.inputs, self.excepts, 0)
logging.debug("Streams: %s %s %s", readable, writable, exceptional)
@ -424,14 +319,13 @@ class MonitoringCollector:
continue
logging.debug("Got data: %s", data.strip())
send_data += filter_unused_data(self.filter_conf, self.filter_mask, data)
self.store.write(self.send_data)
send_data += self.filter_unused_data(self.filter_conf, self.filter_mask, data)
# TODO: make store one more data listener
self.store.write(send_data)
self.store.flush()
#TODO: notify liseners
#TODO: notify liseners
return len(self.outputs)
def stop(self):
@ -439,7 +333,7 @@ class MonitoringCollector:
for pipe in self.agent_pipes:
logging.debug("Killing %s with %s", pipe.pid, signal.SIGINT)
os.kill(pipe.pid, signal.SIGINT)
group_op('uninstall', self.agents, '')
self.group_op('uninstall', self.agents, '')
def getconfig(self, filename, target_hint):
default = {
@ -485,7 +379,7 @@ class MonitoringCollector:
continue;
stat = "%s_%s" % (metric.tag, el)
stats.append(stat)
agent_name = get_agent_name(metric.tag, el)
agent_name = self.get_agent_name(metric.tag, el)
if agent_name:
names[agent_name] = 1
# custom metric ('call' and 'tail' methods)
@ -510,7 +404,7 @@ class MonitoringCollector:
for el in m:
stat = "%s_%s" % (metric, el)
stats.append(stat)
agent_name = get_agent_name(metric, el)
agent_name = self.get_agent_name(metric, el)
if agent_name:
names[agent_name] = 1
@ -551,3 +445,111 @@ class MonitoringCollector:
return [config, filter]
def group_op(self, command, agents, loglevel):
""" Group install and uninstall for list of agents"""
logging.debug("Group operation %s on agents: %s", command, agents);
if command == 'uninstall':
for agent in agents:
logging.debug('Uninstall monitoring agent. Host: %s' % agent.host)
agent.uninstall()
logging.info("Remove: %s" % agent.path['TEMP_CONFIG'])
if os.path.isfile(agent.path['TEMP_CONFIG']):
logging.warning("Seems uninstall failed to remove %s", agent.path['TEMP_CONFIG'])
os.remove(agent.path['TEMP_CONFIG'])
if command == 'install':
for agent in agents:
logging.debug('Install monitoring agent. Host: %s' % agent.host)
if agent.install(loglevel):
logging.debug("[%s] Cannot install. Remove: %s" % (agent.host, agent))
logging.info("Remove: %s" % agent.path['TEMP_CONFIG'])
if os.path.isfile(agent.path['TEMP_CONFIG']):
os.remove(agent.path['TEMP_CONFIG'])
agents.remove(agent)
def filtering(self, mask, filter_list):
host = filter_list[0]
initial = [0, 1]
out = ''
# print "mask: %s" % mask
# print "filter_list: %s " % filter_list
res = []
if mask[host]:
keys = initial + mask[host]
for key in keys:
res.append(filter_list[key])
# print "key: %s, value: -%s-" % (key, filter_list[key])
out += filter_list[key] + ';'
# print "res: %s" % res
# print res
# print join(res, ";")
return join(res, ";")
def filter_unused_data(self, filter_conf, filter_mask, data):
out = ''
# Filtering data
keys = data.rstrip().split(';')
if re.match('^start;', data): # make filter_conf mask
host = keys[1]
for i in xrange(3, len(keys)):
if keys[i] in filter_conf[host]:
filter_mask[host].append(i - 1)
out = 'start;'
out += self.filtering(filter_mask, keys[1:]).rstrip(';') + '\n'
elif re.match('^\[debug\]', data): # log debug output
logging.debug('agent debug: %s' % data.rstrip())
else:
filtered = self.filtering(filter_mask, keys)
if filtered:
out = filtered + '\n' # filtering values
# filtered = filtering(filter_mask, keys).rstrip(';')
return out
def get_agent_name(self, metric, param):
depend = {
'CPU': {
'idle': 'cpu-stat',
'user': 'cpu-stat',
'system': 'cpu-stat',
'iowait': 'cpu-stat',
'nice': 'cpu-stat'
},
'System': {
'la1': 'cpu-la',
'la5': 'cpu-la',
'la15': 'cpu-la',
'csw': 'cpu-stat',
'int': 'cpu-stat',
'numproc': 'cpu-stat',
'numthreads': 'cpu-stat',
},
'Memory': {
'free': 'mem',
'used': 'mem',
'cached': 'mem',
'buff': 'mem',
},
'Disk': {
'read': 'disk',
'write': 'disk',
},
'Net': {
'recv': 'net',
'send': 'net',
'tx': 'net-tx-rx',
'rx': 'net-tx-rx',
'retransmit': 'net-retrans',
'estab': 'net-tcp',
'closewait': 'net-tcp',
'timewait': 'net-tcp',
}
}
if depend[metric][param]:
return depend[metric][param]
else:
return ''

View File

@ -0,0 +1,18 @@
from Monitoring.collector import MonitoringCollector
from Tests.TankTests import TankTestCase
import tempfile
import time
class MonitoringCollectorTestCase(TankTestCase):
data = None
def test_regular(self):
mon = MonitoringCollector("config/mon1.conf", tempfile.mkdtemp()[1])
mon.prepare()
mon.start()
mon.poll()
time.sleep(1)
mon.poll()
time.sleep(1)
mon.poll()
mon.stop()