Fix fd leaking

This commit is contained in:
Andrey Pohilko 2012-10-22 15:11:19 +04:00
parent 312f72b1cf
commit 14b11e4df4
11 changed files with 85 additions and 34 deletions

View File

@ -149,6 +149,7 @@ class ConsoleTank:
self.log.debug("Creating corrected INI config for it: %s", corrected_file)
os.write(file_handle, "[" + self.MIGRATE_SECTION + "]\n")
os.write(file_handle, self.__convert_old_multiline_options(open(config, 'r').readlines()))
os.close(file_handle)
return corrected_file
def __add_adapted_config(self, configs, conf_file):

View File

@ -110,6 +110,7 @@ class CpuStat(object):
# logger.debug("Result: %s" % result)
# Numproc, numthreads
# FIXME: 1 remove this expensive operations!!!
command = ['ps axf | wc -l', 'ps -eLf | wc -l']
for cmd in command:
try:

View File

@ -83,6 +83,7 @@ class AgentClient(object):
self.ssh = None
temp_config = tempfile.mkstemp('.cfg', 'agent_')
os.close(temp_config[0])
self.path = {
# Destination path on remote host
'AGENT_REMOTE_FOLDER': '/var/tmp/lunapark_monitoring',
@ -185,7 +186,8 @@ class AgentClient(object):
def uninstall(self):
""" Remove agent's files from remote host"""
log_file = tempfile.mkstemp('.log', "agent_" + self.host + "_")[1]
fh, log_file = tempfile.mkstemp('.log', "agent_" + self.host + "_")
os.close(fh)
cmd = [self.host + ':' + self.path['AGENT_REMOTE_FOLDER'] + "_agent.log", log_file]
logging.debug("Copy agent log from %s: %s" , self.host, cmd)
remove = self.ssh.get_scp_pipe(cmd)

View File

@ -44,11 +44,13 @@ class AggregatorPlugin(AbstractPlugin):
self.log.warning("No one set reader for aggregator yet")
def is_test_finished(self):
# read up to 5 samples in single pass
self.__read_samples(5)
return -1
def end_test(self, retcode):
self.__read_samples(force=True)
self.reader.close_files()
return retcode
def add_result_listener(self, listener):
@ -150,6 +152,12 @@ class AbstractReader:
def check_open_files(self):
pass
def close_files(self):
'''
Close opened handlers to avoid fd leak
'''
pass
def get_next_sample(self, force):
pass

View File

@ -4,7 +4,6 @@ from tankcore import AbstractPlugin
import os
import subprocess
import tankcore
import tempfile
# TODO: 3 add console screen widget with info and PB measured via stderr info parsing
class ApacheBenchmarkPlugin(AbstractPlugin):
@ -29,7 +28,7 @@ class ApacheBenchmarkPlugin(AbstractPlugin):
self.url = self.get_option("url", 'http://localhost/')
self.requests = self.get_option("requests", '100')
self.concurrency = self.get_option("concurrency", '1')
self.out_file = tempfile.mkstemp('.log', 'ab_', self.core.artifacts_base_dir)[1]
self.out_file = self.core.mkstemp('.log', 'ab_')
self.core.add_artifact_file(self.out_file)
def prepare_test(self):
@ -96,6 +95,10 @@ class ABReader(AbstractReader):
self.log.debug("Opening ab out file: %s", self.ab.out_file)
self.results = open(self.ab.out_file, 'r')
def close_files(self):
if self.results:
self.results.close()
def get_next_sample(self, force):
if self.results:
read_lines = self.results.readlines()

View File

@ -21,11 +21,6 @@ class ConsoleOnlinePlugin(AbstractPlugin, AggregateResultListener):
return __file__
def configure(self):
try:
aggregator = self.core.get_plugin_of_type(AggregatorPlugin)
aggregator.add_result_listener(self)
except KeyError:
self.log.debug("No aggregator for console")
self.info_panel_width = self.get_option("info_panel_width", '33')
self.short_only = int(self.get_option("short_only", '0'))
if sys.stdout.isatty() and not int(self.get_option("disable_all_colors", '0')):
@ -36,6 +31,14 @@ class ConsoleOnlinePlugin(AbstractPlugin, AggregateResultListener):
self.console_markup.__dict__[color] = ''
self.screen = Screen(self.info_panel_width, self.console_markup)
try:
aggregator = self.core.get_plugin_of_type(AggregatorPlugin)
aggregator.add_result_listener(self)
except KeyError:
self.log.debug("No aggregator for console")
self.screen.block_rows=[]
self.screen.info_panel_percent=100
def is_test_finished(self):
try:
console_view = self.screen.render_screen().encode('utf-8')

View File

@ -18,6 +18,7 @@ class JMeterPlugin(AbstractPlugin):
self.jmeter_process = None
self.args = None
self.original_jmx = None
self.jtl_file = None
@staticmethod
def get_key():
@ -26,15 +27,13 @@ class JMeterPlugin(AbstractPlugin):
def configure(self):
self.original_jmx = self.get_option("jmx")
self.core.add_artifact_file(self.original_jmx, True)
handle, jtl = tempfile.mkstemp('.jtl', 'jmeter_', self.core.artifacts_base_dir)
os.close(handle)
self.jtl_file = jtl
self.jtl_file = self.core.mkstemp('.jtl', 'jmeter_')
self.core.add_artifact_file(self.jtl_file)
self.jmx = self.__add_writing_section(self.original_jmx, self.jtl_file)
self.core.add_artifact_file(self.jmx)
self.user_args = self.get_option("args", '')
self.jmeter_path = self.get_option("jmeter_path", 'jmeter')
self.jmeter_log = tempfile.mkstemp('.log', 'jmeter_', self.core.artifacts_base_dir)[1]
self.jmeter_log = self.core.mkstemp('.log', 'jmeter_')
self.core.add_artifact_file(self.jmeter_log, True)
def prepare_test(self):
@ -62,10 +61,12 @@ class JMeterPlugin(AbstractPlugin):
if aggregator:
aggregator.add_result_listener(widget)
def start_test(self):
self.log.info("Starting %s with arguments: %s", self.jmeter_path, self.args)
self.jmeter_process = subprocess.Popen(self.args, executable=self.jmeter_path, preexec_fn=os.setsid) # stderr=subprocess.PIPE, stdout=subprocess.PIPE,
def is_test_finished(self):
rc = self.jmeter_process.poll()
if rc != None:
@ -74,7 +75,9 @@ class JMeterPlugin(AbstractPlugin):
else:
return -1
def end_test(self, retcode):
# FIXME: 1 jmeter hangs
if self.jmeter_process:
if self.jmeter_process.poll() == None:
self.log.warn("Terminating jmeter process with PID %s", self.jmeter_process.pid)
@ -109,16 +112,15 @@ class JMeterPlugin(AbstractPlugin):
tpl = open(os.path.dirname(__file__) + '/jmeter_writer.xml', 'r').read()
try:
new_file = tempfile.mkstemp('.jmx', 'modified_', os.path.dirname(os.path.realpath(jmx)))[1]
file_handle, new_file = tempfile.mkstemp('.jmx', 'modified_', os.path.dirname(os.path.realpath(jmx)))
except OSError, e:
self.log.debug("Can't create new jmx near original: %s", e)
new_file = tempfile.mkstemp('.jmx', 'modified_', self.core.artifacts_base_dir)[1]
file_handle, new_file = tempfile.mkstemp('.jmx', 'modified_', self.core.artifacts_base_dir)
self.log.debug("Modified JMX: %s", new_file)
file_handle = open(new_file, 'w')
file_handle.write(''.join(source_lines))
file_handle.write(tpl % jtl)
file_handle.write(closing)
file_handle.close()
os.write(file_handle, ''.join(source_lines))
os.write(file_handle, tpl % jtl)
os.write(file_handle, closing)
os.close(file_handle)
return new_file
@ -147,8 +149,13 @@ class JMeterReader(AbstractReader):
self.log.debug("Opening jmeter out file: %s", self.jmeter.jtl_file)
self.results = open(self.jmeter.jtl_file, 'r')
def close_files(self):
if self.results:
self.results.close()
def get_next_sample(self, force):
if force:
# FIXME: 1 not works
while self.jmeter.jmeter_process and self.jmeter.jmeter_process.poll() == None:
self.log.debug("Waiting for JMeter process to exit")
self.jmeter.jmeter_process.terminate()

View File

@ -10,7 +10,6 @@ from tankcore import AbstractPlugin
import base64
import copy
import os
import tempfile
import time
import traceback
@ -28,6 +27,7 @@ class MonitoringPlugin(AbstractPlugin):
self.config = None
self.process = None
self.monitoring = MonitoringCollector()
self.mon_saver=SaveMonToFile(self.data_file)
self.die_on_fail = True
self.data_file = None
@ -74,8 +74,8 @@ class MonitoringPlugin(AbstractPlugin):
if self.default_target:
self.monitoring.default_target = self.default_target
self.data_file = tempfile.mkstemp('.data', 'monitoring_', self.core.artifacts_base_dir)[1]
self.monitoring.add_listener(SaveMonToFile(self.data_file))
self.data_file = self.core.mkstemp('.data', 'monitoring_')
self.monitoring.add_listener(self.mon_saver)
self.core.add_artifact_file(self.data_file)
try:
@ -120,6 +120,8 @@ class MonitoringPlugin(AbstractPlugin):
self.monitoring.stop()
for log in self.monitoring.artifact_files:
self.core.add_artifact_file(log)
self.mon_saver.close()
return retcode
@ -135,6 +137,10 @@ class SaveMonToFile(MonitoringDataListener):
self.store.write(data_string)
self.store.flush()
def close(self):
if self.store:
self.store.close()
class MonitoringWidget(AbstractInfoWidget, MonitoringDataListener):
'''

View File

@ -19,7 +19,6 @@ import string
import subprocess
import sys
import tankcore
import tempfile
import time
import logging
@ -335,6 +334,13 @@ class PhantomReader(AbstractReader):
self.log.debug("Opening stat file: %s", self.phantom.phantom.stat_log)
self.stat = open(self.phantom.phantom.stat_log, 'r')
def close_files(self):
if self.stat:
self.stat.close()
if self.phout:
self.phout.close()
def get_next_sample(self, force):
if self.stat:
self.__read_stat_data()
@ -598,17 +604,20 @@ class PhantomConfig:
self.address = self.get_option('address', '127.0.0.1')
self.port = self.get_option('port', '80')
self.tank_type = self.get_option("tank_type", 'http')
self.answ_log = tempfile.mkstemp(".log", "answ_", self.owner.core.artifacts_base_dir)[1]
self.answ_log_level = self.get_option("writelog", "none")
if self.answ_log_level == '0':
self.answ_log_level = 'none'
elif self.answ_log_level == '1':
self.answ_log_level = 'all'
self.phout_file = tempfile.mkstemp(".log", "phout_", self.owner.core.artifacts_base_dir)[1]
if self.answ_log_level != 'none':
self.answ_log = self.owner.core.mkstemp(".log", "answ_")
self.phout_file = self.owner.core.mkstemp(".log", "phout_")
self.owner.core.add_artifact_file(self.phout_file)
self.stat_log = tempfile.mkstemp(".log", "phantom_stat_", self.owner.core.artifacts_base_dir)[1]
self.phantom_log = tempfile.mkstemp(".log", "phantom_", self.owner.core.artifacts_base_dir)[1]
self.stat_log = self.owner.core.mkstemp(".log", "phantom_stat_")
self.phantom_log = self.owner.core.mkstemp(".log", "phantom_")
self.stpd = self.get_option(self.OPTION_STPD, '')
self.threads = self.get_option("threads", int(multiprocessing.cpu_count() / 2) + 1)
self.instances = int(self.get_option(self.OPTION_INSTANCES_LIMIT, '1000'))
@ -665,14 +674,16 @@ class PhantomConfig:
kwargs['reply_limits'] = ''
handle, filename = tempfile.mkstemp(".conf", "phantom_", self.owner.core.artifacts_base_dir)
filename = self.owner.core.mkstemp(".conf", "phantom_")
self.owner.core.add_artifact_file(filename)
self.log.debug("Generating phantom config: %s", filename)
template_str = open(os.path.dirname(__file__) + "/phantom.conf.tpl", 'r').read()
tpl = string.Template(template_str)
config = tpl.substitute(kwargs)
os.write(handle, config)
handle = open(filename, 'w')
handle.write(config)
handle.close()
return filename

View File

@ -16,7 +16,8 @@ def make_load_ammo(uris):
'''
Create temp file for uri-format ammo
'''
filename = tempfile.mkstemp('.ammo', 'uri_')[1]
fd, filename = tempfile.mkstemp('.ammo', 'uri_')
os.close(fd)
tmp_ammo = open(filename, 'w')
for line in uris:
tmp_ammo.write(line + '\n')

View File

@ -465,7 +465,8 @@ class TankCore:
if not force and self.__there_is_locks():
raise RuntimeError("There is lock files")
self.lock_file = tempfile.mkstemp('.lock', 'lunapark_', self.LOCK_DIR)[1]
fh, self.lock_file = tempfile.mkstemp('.lock', 'lunapark_', self.LOCK_DIR)
os.close(fh)
os.chmod(self.lock_file, 0644)
self.config.file = self.lock_file
@ -500,7 +501,14 @@ class TankCore:
retcode = True
return retcode
def mkstemp(self, suffix, prefix):
'''
Generate temp file name in artifacts base dir
and close temp file handle
'''
fd, fname = tempfile.mkstemp(suffix, prefix, self.artifacts_base_dir)
os.close(fd)
return fname
class ConfigManager:
'''