From 14b11e4df46a65eb59244f26563df40ba394047d Mon Sep 17 00:00:00 2001 From: Andrey Pohilko Date: Mon, 22 Oct 2012 15:11:19 +0400 Subject: [PATCH] Fix fd leaking --- Tank/ConsoleWorker.py | 1 + Tank/MonCollector/agent/agent.py | 1 + Tank/MonCollector/collector.py | 4 +++- Tank/Plugins/Aggregator.py | 10 +++++++++- Tank/Plugins/ApacheBenchmark.py | 7 +++++-- Tank/Plugins/ConsoleOnline.py | 13 ++++++++----- Tank/Plugins/JMeter.py | 29 ++++++++++++++++++----------- Tank/Plugins/Monitoring.py | 12 +++++++++--- Tank/Plugins/Phantom.py | 25 ++++++++++++++++++------- Tank/Plugins/Stepper.py | 3 ++- tankcore.py | 14 +++++++++++--- 11 files changed, 85 insertions(+), 34 deletions(-) diff --git a/Tank/ConsoleWorker.py b/Tank/ConsoleWorker.py index 22b983a..2000c60 100644 --- a/Tank/ConsoleWorker.py +++ b/Tank/ConsoleWorker.py @@ -149,6 +149,7 @@ class ConsoleTank: self.log.debug("Creating corrected INI config for it: %s", corrected_file) os.write(file_handle, "[" + self.MIGRATE_SECTION + "]\n") os.write(file_handle, self.__convert_old_multiline_options(open(config, 'r').readlines())) + os.close(file_handle) return corrected_file def __add_adapted_config(self, configs, conf_file): diff --git a/Tank/MonCollector/agent/agent.py b/Tank/MonCollector/agent/agent.py index fa0717e..c7132a3 100755 --- a/Tank/MonCollector/agent/agent.py +++ b/Tank/MonCollector/agent/agent.py @@ -110,6 +110,7 @@ class CpuStat(object): # logger.debug("Result: %s" % result) # Numproc, numthreads + # FIXME: 1 remove this expensive operations!!! command = ['ps axf | wc -l', 'ps -eLf | wc -l'] for cmd in command: try: diff --git a/Tank/MonCollector/collector.py b/Tank/MonCollector/collector.py index 5972ee0..bc2d524 100644 --- a/Tank/MonCollector/collector.py +++ b/Tank/MonCollector/collector.py @@ -83,6 +83,7 @@ class AgentClient(object): self.ssh = None temp_config = tempfile.mkstemp('.cfg', 'agent_') + os.close(temp_config[0]) self.path = { # Destination path on remote host 'AGENT_REMOTE_FOLDER': '/var/tmp/lunapark_monitoring', @@ -185,7 +186,8 @@ class AgentClient(object): def uninstall(self): """ Remove agent's files from remote host""" - log_file = tempfile.mkstemp('.log', "agent_" + self.host + "_")[1] + fh, log_file = tempfile.mkstemp('.log', "agent_" + self.host + "_") + os.close(fh) cmd = [self.host + ':' + self.path['AGENT_REMOTE_FOLDER'] + "_agent.log", log_file] logging.debug("Copy agent log from %s: %s" , self.host, cmd) remove = self.ssh.get_scp_pipe(cmd) diff --git a/Tank/Plugins/Aggregator.py b/Tank/Plugins/Aggregator.py index 0542f63..0824248 100644 --- a/Tank/Plugins/Aggregator.py +++ b/Tank/Plugins/Aggregator.py @@ -44,11 +44,13 @@ class AggregatorPlugin(AbstractPlugin): self.log.warning("No one set reader for aggregator yet") def is_test_finished(self): + # read up to 5 samples in single pass self.__read_samples(5) return -1 def end_test(self, retcode): self.__read_samples(force=True) + self.reader.close_files() return retcode def add_result_listener(self, listener): @@ -149,7 +151,13 @@ class AbstractReader: def check_open_files(self): pass - + + def close_files(self): + ''' + Close opened handlers to avoid fd leak + ''' + pass + def get_next_sample(self, force): pass diff --git a/Tank/Plugins/ApacheBenchmark.py b/Tank/Plugins/ApacheBenchmark.py index 1cd59c4..ee7becf 100644 --- a/Tank/Plugins/ApacheBenchmark.py +++ b/Tank/Plugins/ApacheBenchmark.py @@ -4,7 +4,6 @@ from tankcore import AbstractPlugin import os import subprocess import tankcore -import tempfile # TODO: 3 add console screen widget with info and PB measured via stderr info parsing class ApacheBenchmarkPlugin(AbstractPlugin): @@ -29,7 +28,7 @@ class ApacheBenchmarkPlugin(AbstractPlugin): self.url = self.get_option("url", 'http://localhost/') self.requests = self.get_option("requests", '100') self.concurrency = self.get_option("concurrency", '1') - self.out_file = tempfile.mkstemp('.log', 'ab_', self.core.artifacts_base_dir)[1] + self.out_file = self.core.mkstemp('.log', 'ab_') self.core.add_artifact_file(self.out_file) def prepare_test(self): @@ -95,6 +94,10 @@ class ABReader(AbstractReader): if not self.results and os.path.exists(self.ab.out_file): self.log.debug("Opening ab out file: %s", self.ab.out_file) self.results = open(self.ab.out_file, 'r') + + def close_files(self): + if self.results: + self.results.close() def get_next_sample(self, force): if self.results: diff --git a/Tank/Plugins/ConsoleOnline.py b/Tank/Plugins/ConsoleOnline.py index 4317482..c2b5d98 100644 --- a/Tank/Plugins/ConsoleOnline.py +++ b/Tank/Plugins/ConsoleOnline.py @@ -21,11 +21,6 @@ class ConsoleOnlinePlugin(AbstractPlugin, AggregateResultListener): return __file__ def configure(self): - try: - aggregator = self.core.get_plugin_of_type(AggregatorPlugin) - aggregator.add_result_listener(self) - except KeyError: - self.log.debug("No aggregator for console") self.info_panel_width = self.get_option("info_panel_width", '33') self.short_only = int(self.get_option("short_only", '0')) if sys.stdout.isatty() and not int(self.get_option("disable_all_colors", '0')): @@ -36,6 +31,14 @@ class ConsoleOnlinePlugin(AbstractPlugin, AggregateResultListener): self.console_markup.__dict__[color] = '' self.screen = Screen(self.info_panel_width, self.console_markup) + try: + aggregator = self.core.get_plugin_of_type(AggregatorPlugin) + aggregator.add_result_listener(self) + except KeyError: + self.log.debug("No aggregator for console") + self.screen.block_rows=[] + self.screen.info_panel_percent=100 + def is_test_finished(self): try: console_view = self.screen.render_screen().encode('utf-8') diff --git a/Tank/Plugins/JMeter.py b/Tank/Plugins/JMeter.py index c2d1942..3fa9854 100644 --- a/Tank/Plugins/JMeter.py +++ b/Tank/Plugins/JMeter.py @@ -18,6 +18,7 @@ class JMeterPlugin(AbstractPlugin): self.jmeter_process = None self.args = None self.original_jmx = None + self.jtl_file = None @staticmethod def get_key(): @@ -26,15 +27,13 @@ class JMeterPlugin(AbstractPlugin): def configure(self): self.original_jmx = self.get_option("jmx") self.core.add_artifact_file(self.original_jmx, True) - handle, jtl = tempfile.mkstemp('.jtl', 'jmeter_', self.core.artifacts_base_dir) - os.close(handle) - self.jtl_file = jtl + self.jtl_file = self.core.mkstemp('.jtl', 'jmeter_') self.core.add_artifact_file(self.jtl_file) self.jmx = self.__add_writing_section(self.original_jmx, self.jtl_file) self.core.add_artifact_file(self.jmx) self.user_args = self.get_option("args", '') self.jmeter_path = self.get_option("jmeter_path", 'jmeter') - self.jmeter_log = tempfile.mkstemp('.log', 'jmeter_', self.core.artifacts_base_dir)[1] + self.jmeter_log = self.core.mkstemp('.log', 'jmeter_') self.core.add_artifact_file(self.jmeter_log, True) def prepare_test(self): @@ -62,10 +61,12 @@ class JMeterPlugin(AbstractPlugin): if aggregator: aggregator.add_result_listener(widget) + def start_test(self): self.log.info("Starting %s with arguments: %s", self.jmeter_path, self.args) self.jmeter_process = subprocess.Popen(self.args, executable=self.jmeter_path, preexec_fn=os.setsid) # stderr=subprocess.PIPE, stdout=subprocess.PIPE, + def is_test_finished(self): rc = self.jmeter_process.poll() if rc != None: @@ -73,8 +74,10 @@ class JMeterPlugin(AbstractPlugin): return rc else: return -1 + def end_test(self, retcode): + # FIXME: 1 jmeter hangs if self.jmeter_process: if self.jmeter_process.poll() == None: self.log.warn("Terminating jmeter process with PID %s", self.jmeter_process.pid) @@ -109,16 +112,15 @@ class JMeterPlugin(AbstractPlugin): tpl = open(os.path.dirname(__file__) + '/jmeter_writer.xml', 'r').read() try: - new_file = tempfile.mkstemp('.jmx', 'modified_', os.path.dirname(os.path.realpath(jmx)))[1] + file_handle, new_file = tempfile.mkstemp('.jmx', 'modified_', os.path.dirname(os.path.realpath(jmx))) except OSError, e: self.log.debug("Can't create new jmx near original: %s", e) - new_file = tempfile.mkstemp('.jmx', 'modified_', self.core.artifacts_base_dir)[1] + file_handle, new_file = tempfile.mkstemp('.jmx', 'modified_', self.core.artifacts_base_dir) self.log.debug("Modified JMX: %s", new_file) - file_handle = open(new_file, 'w') - file_handle.write(''.join(source_lines)) - file_handle.write(tpl % jtl) - file_handle.write(closing) - file_handle.close() + os.write(file_handle, ''.join(source_lines)) + os.write(file_handle, tpl % jtl) + os.write(file_handle, closing) + os.close(file_handle) return new_file @@ -146,9 +148,14 @@ class JMeterReader(AbstractReader): if not self.results and os.path.exists(self.jmeter.jtl_file): self.log.debug("Opening jmeter out file: %s", self.jmeter.jtl_file) self.results = open(self.jmeter.jtl_file, 'r') + + def close_files(self): + if self.results: + self.results.close() def get_next_sample(self, force): if force: + # FIXME: 1 not works while self.jmeter.jmeter_process and self.jmeter.jmeter_process.poll() == None: self.log.debug("Waiting for JMeter process to exit") self.jmeter.jmeter_process.terminate() diff --git a/Tank/Plugins/Monitoring.py b/Tank/Plugins/Monitoring.py index 3bac222..935c004 100644 --- a/Tank/Plugins/Monitoring.py +++ b/Tank/Plugins/Monitoring.py @@ -10,7 +10,6 @@ from tankcore import AbstractPlugin import base64 import copy import os -import tempfile import time import traceback @@ -28,6 +27,7 @@ class MonitoringPlugin(AbstractPlugin): self.config = None self.process = None self.monitoring = MonitoringCollector() + self.mon_saver=SaveMonToFile(self.data_file) self.die_on_fail = True self.data_file = None @@ -74,8 +74,8 @@ class MonitoringPlugin(AbstractPlugin): if self.default_target: self.monitoring.default_target = self.default_target - self.data_file = tempfile.mkstemp('.data', 'monitoring_', self.core.artifacts_base_dir)[1] - self.monitoring.add_listener(SaveMonToFile(self.data_file)) + self.data_file = self.core.mkstemp('.data', 'monitoring_') + self.monitoring.add_listener(self.mon_saver) self.core.add_artifact_file(self.data_file) try: @@ -120,6 +120,8 @@ class MonitoringPlugin(AbstractPlugin): self.monitoring.stop() for log in self.monitoring.artifact_files: self.core.add_artifact_file(log) + + self.mon_saver.close() return retcode @@ -134,6 +136,10 @@ class SaveMonToFile(MonitoringDataListener): def monitoring_data(self, data_string): self.store.write(data_string) self.store.flush() + + def close(self): + if self.store: + self.store.close() class MonitoringWidget(AbstractInfoWidget, MonitoringDataListener): diff --git a/Tank/Plugins/Phantom.py b/Tank/Plugins/Phantom.py index 0469647..0ed2d57 100644 --- a/Tank/Plugins/Phantom.py +++ b/Tank/Plugins/Phantom.py @@ -19,7 +19,6 @@ import string import subprocess import sys import tankcore -import tempfile import time import logging @@ -335,6 +334,13 @@ class PhantomReader(AbstractReader): self.log.debug("Opening stat file: %s", self.phantom.phantom.stat_log) self.stat = open(self.phantom.phantom.stat_log, 'r') + def close_files(self): + if self.stat: + self.stat.close() + + if self.phout: + self.phout.close() + def get_next_sample(self, force): if self.stat: self.__read_stat_data() @@ -598,17 +604,20 @@ class PhantomConfig: self.address = self.get_option('address', '127.0.0.1') self.port = self.get_option('port', '80') self.tank_type = self.get_option("tank_type", 'http') - self.answ_log = tempfile.mkstemp(".log", "answ_", self.owner.core.artifacts_base_dir)[1] self.answ_log_level = self.get_option("writelog", "none") if self.answ_log_level == '0': self.answ_log_level = 'none' elif self.answ_log_level == '1': self.answ_log_level = 'all' - self.phout_file = tempfile.mkstemp(".log", "phout_", self.owner.core.artifacts_base_dir)[1] + + if self.answ_log_level != 'none': + self.answ_log = self.owner.core.mkstemp(".log", "answ_") + + self.phout_file = self.owner.core.mkstemp(".log", "phout_") self.owner.core.add_artifact_file(self.phout_file) - self.stat_log = tempfile.mkstemp(".log", "phantom_stat_", self.owner.core.artifacts_base_dir)[1] - self.phantom_log = tempfile.mkstemp(".log", "phantom_", self.owner.core.artifacts_base_dir)[1] + self.stat_log = self.owner.core.mkstemp(".log", "phantom_stat_") + self.phantom_log = self.owner.core.mkstemp(".log", "phantom_") self.stpd = self.get_option(self.OPTION_STPD, '') self.threads = self.get_option("threads", int(multiprocessing.cpu_count() / 2) + 1) self.instances = int(self.get_option(self.OPTION_INSTANCES_LIMIT, '1000')) @@ -665,14 +674,16 @@ class PhantomConfig: kwargs['reply_limits'] = '' - handle, filename = tempfile.mkstemp(".conf", "phantom_", self.owner.core.artifacts_base_dir) + filename = self.owner.core.mkstemp(".conf", "phantom_") self.owner.core.add_artifact_file(filename) self.log.debug("Generating phantom config: %s", filename) template_str = open(os.path.dirname(__file__) + "/phantom.conf.tpl", 'r').read() tpl = string.Template(template_str) config = tpl.substitute(kwargs) - os.write(handle, config) + handle = open(filename, 'w') + handle.write(config) + handle.close() return filename diff --git a/Tank/Plugins/Stepper.py b/Tank/Plugins/Stepper.py index 30f9319..6d88e7f 100644 --- a/Tank/Plugins/Stepper.py +++ b/Tank/Plugins/Stepper.py @@ -16,7 +16,8 @@ def make_load_ammo(uris): ''' Create temp file for uri-format ammo ''' - filename = tempfile.mkstemp('.ammo', 'uri_')[1] + fd, filename = tempfile.mkstemp('.ammo', 'uri_') + os.close(fd) tmp_ammo = open(filename, 'w') for line in uris: tmp_ammo.write(line + '\n') diff --git a/tankcore.py b/tankcore.py index ddf0ddc..879681e 100644 --- a/tankcore.py +++ b/tankcore.py @@ -465,7 +465,8 @@ class TankCore: if not force and self.__there_is_locks(): raise RuntimeError("There is lock files") - self.lock_file = tempfile.mkstemp('.lock', 'lunapark_', self.LOCK_DIR)[1] + fh, self.lock_file = tempfile.mkstemp('.lock', 'lunapark_', self.LOCK_DIR) + os.close(fh) os.chmod(self.lock_file, 0644) self.config.file = self.lock_file @@ -500,8 +501,15 @@ class TankCore: retcode = True return retcode - - + def mkstemp(self, suffix, prefix): + ''' + Generate temp file name in artifacts base dir + and close temp file handle + ''' + fd, fname = tempfile.mkstemp(suffix, prefix, self.artifacts_base_dir) + os.close(fd) + return fname + class ConfigManager: ''' Option storage class