From 8393ae6e6ca7fb8d892854042d3eaa35bb67900a Mon Sep 17 00:00:00 2001 From: Andrey Pohilko Date: Thu, 11 Oct 2012 16:52:22 +0400 Subject: [PATCH] sort configs in etc dir Split phantomplugin into 3 classes --- Tank/ConsoleWorker.py | 2 +- Tank/Plugins/Loadosophia.py | 5 +- Tank/Plugins/Monitoring.py | 3 +- Tank/Plugins/Phantom.py | 717 +++++++++++++++++++----------------- Tests/PhantomPluginTest.py | 2 +- Tests/config/phantom.conf | 3 +- 6 files changed, 384 insertions(+), 348 deletions(-) diff --git a/Tank/ConsoleWorker.py b/Tank/ConsoleWorker.py index d6831fe..e21b61b 100644 --- a/Tank/ConsoleWorker.py +++ b/Tank/ConsoleWorker.py @@ -198,7 +198,7 @@ class ConsoleTank: if not self.options.no_rc: try: - for filename in os.listdir(self.baseconfigs_location): + for filename in sorted(os.listdir(self.baseconfigs_location)): if fnmatch.fnmatch(filename, '*.ini'): configs += [os.path.realpath(self.baseconfigs_location + os.sep + filename)] except OSError: diff --git a/Tank/Plugins/Loadosophia.py b/Tank/Plugins/Loadosophia.py index 715524a..9058d77 100644 --- a/Tank/Plugins/Loadosophia.py +++ b/Tank/Plugins/Loadosophia.py @@ -48,7 +48,7 @@ class LoadosophiaPlugin(AbstractPlugin): # phantom try: phantom = self.core.get_plugin_of_type(PhantomPlugin) - main_file = phantom.phout_file + main_file = phantom.phantom.phout_file except KeyError: self.log.debug("Phantom not found") @@ -65,7 +65,7 @@ class LoadosophiaPlugin(AbstractPlugin): mon = self.core.get_plugin_of_type(MonitoringPlugin) mon_file = mon.data_file except KeyError: - self.log.debug("Phantom not found") + self.log.debug("Monitoring not found") self.loadosophia.send_results(self.project_key, main_file, [mon_file]) @@ -76,7 +76,6 @@ class LoadosophiaPlugin(AbstractPlugin): except KeyError: self.log.debug("Web online not found") - return retcode diff --git a/Tank/Plugins/Monitoring.py b/Tank/Plugins/Monitoring.py index 64f57e2..a0bae58 100644 --- a/Tank/Plugins/Monitoring.py +++ b/Tank/Plugins/Monitoring.py @@ -55,7 +55,8 @@ class MonitoringPlugin(AbstractPlugin): except KeyError, ex: self.log.debug("Phantom plugin not found: %s", ex) if phantom: - self.default_target = phantom.address + if phantom.phantom: + self.default_target = phantom.phantom.address if phantom.phout_import_mode: self.config = None # TODO: 2 resolve virtual to host address diff --git a/Tank/Plugins/Phantom.py b/Tank/Plugins/Phantom.py index b644b9d..3b09849 100644 --- a/Tank/Plugins/Phantom.py +++ b/Tank/Plugins/Phantom.py @@ -22,6 +22,7 @@ import sys import tankcore import tempfile import time +import logging # TODO: 3 chosen cases # TODO: 2 if instances_schedule enabled - pass to phantom the top count as instances limit @@ -30,253 +31,50 @@ class PhantomPlugin(AbstractPlugin, AggregateResultListener): ''' Plugin for running phantom tool ''' - OPTION_STEPS = 'steps' - OPTION_TEST_DURATION = 'test_duration' - OPTION_INSTANCES_LIMIT = 'instances' - OPTION_AMMO_COUNT = 'ammo_count' - OPTION_LOOP = 'loop' - OPTION_LOOP_COUNT = 'loop_count' - OPTION_AMMOFILE = "ammofile" - OPTION_SCHEDULE = 'rps_schedule' - OPTION_LOADSCHEME = 'loadscheme' - OPTION_PORT = "port" - OPTION_IP = 'address' - OPTION_STPD = 'stpd_file' - SECTION = 'phantom' def __init__(self, core): AbstractPlugin.__init__(self, core) - # FIXME: 3 obviously needs refactoring, at least extracting phantom-specific options into separate class self.process = None - self.timeout = 1000 - self.answ_log = None - self.phout_file = None - self.stat_log = None - self.phantom_log = None - self.config = None - self.instances = None - self.use_caching = None - self.http_ver = None - self.rps_schedule = [] self.phout_import_mode = 0 self.did_phout_import_try = False - self.steps = [] - self.phantom_start_time = None - self.ipv6 = None - self.ammo_file = None - self.instances_schedule = None - self.loop_limit = None - self.ammo_limit = None - self.uris = None - self.headers = None - self.autocases = None - self.cache_dir = None - self.force_stepping = None self.phantom_path = None - self.phantom_modules_path = None - self.ssl = None - self.address = None - self.port = None - self.tank_type = None - self.answ_log_level = None - self.stpd = None - self.threads = None - self.gatling = None - self.phantom_http_line = None - self.phantom_http_field_num = None - self.phantom_http_field = None - self.phantom_http_entity = None self.eta_file = None self.processed_ammo_count = 0 + self.config = None + self.stepper = None + self.phantom = None + @staticmethod def get_key(): return __file__ - def __check_address(self): - ''' - Analyse target address setting, resolve it to IP - ''' - try: - ipaddr.IPv6Address(self.address) - self.ipv6 = True - except AddressValueError: - self.log.debug("Not ipv6 address: %s", self.address) - self.ipv6 = False - address_port = self.address.split(":") - self.address = address_port[0] - if len(address_port) > 1: - self.port = address_port[1] - try: - ipaddr.IPv4Address(self.address) - except AddressValueError: - self.log.debug("Not ipv4 address: %s", self.address) - ip_addr = socket.gethostbyname(self.address) - reverse_name = socket.gethostbyaddr(ip_addr)[0] - self.log.debug("Address %s ip_addr: %s, reverse-resolve: %s", self.address, ip_addr, reverse_name) - if reverse_name.startswith(self.address): - self.address = ip_addr - else: - raise ValueError("Address %s reverse-resolved to %s, but must match" % (self.address, reverse_name)) - - - def __read_phantom_options(self): - ''' - Read phantom tool specific options - ''' - self.phantom_path = self.get_option("phantom_path", 'phantom') - self.config = self.get_option("config", '') - self.phantom_modules_path = self.get_option("phantom_modules_path", "/usr/lib/phantom") - self.ssl = int(self.get_option("ssl", '0')) - self.address = self.get_option(self.OPTION_IP, '127.0.0.1') - self.port = self.get_option(self.OPTION_PORT, '80') - self.tank_type = self.get_option("tank_type", 'http') - self.answ_log = tempfile.mkstemp(".log", "answ_", self.core.artifacts_base_dir)[1] - self.answ_log_level = self.get_option("writelog", "none") - if self.answ_log_level == '0': - self.answ_log_level = 'none' - elif self.answ_log_level == '1': - self.answ_log_level = 'all' - self.phout_file = self.get_option("phout_file", '') - if not self.phout_file: - self.phout_file = tempfile.mkstemp(".log", "phout_", self.core.artifacts_base_dir)[1] - self.core.add_artifact_file(self.phout_file) - else: - self.phout_import_mode = 1 - self.stat_log = tempfile.mkstemp(".log", "phantom_stat_", self.core.artifacts_base_dir)[1] - self.phantom_log = tempfile.mkstemp(".log", "phantom_", self.core.artifacts_base_dir)[1] - self.stpd = self.get_option(self.OPTION_STPD, '') - self.threads = self.get_option("threads", int(multiprocessing.cpu_count() / 2) + 1) - self.instances = int(self.get_option(self.OPTION_INSTANCES_LIMIT, '1000')) - self.gatling = ' '.join(self.get_option('gatling_ip', '').split("\n")) - self.phantom_http_line = self.get_option("phantom_http_line", "") - self.phantom_http_field_num = self.get_option("phantom_http_field_num", "") - self.phantom_http_field = self.get_option("phantom_http_field", "") - self.phantom_http_entity = self.get_option("phantom_http_entity", "") - - def configure(self): - # stepper part - self.ammo_file = self.get_option(self.OPTION_AMMOFILE, '') - self.instances_schedule = self.get_option("instances_schedule", '') - self.loop_limit = int(self.get_option(self.OPTION_LOOP, "-1")) - self.ammo_limit = int(self.get_option("ammo_limit", "-1")) # TODO: 3 stepper should implement ammo_limit - sched = self.get_option(self.OPTION_SCHEDULE, '') - sched = " ".join(sched.split("\n")) - sched = sched.split(')') - self.rps_schedule = [] - for step in sched: - if step.strip(): - self.rps_schedule.append(step.strip() + ')') - self.uris = self.get_option("uris", '').strip().split("\n") - while '' in self.uris: - self.uris.remove('') - self.headers = self.get_option("headers", '').strip().split("\n") - while '' in self.headers: - self.headers.remove('') - self.http_ver = self.get_option("header_http", '1.1') - self.autocases = self.get_option("autocases", '0') - self.use_caching = int(self.get_option("use_caching", '1')) - self.cache_dir = os.path.expanduser(self.get_option("cache_dir", self.core.artifacts_base_dir)) - self.force_stepping = int(self.get_option("force_stepping", '0')) - - # phantom part - self.__read_phantom_options() - + + def configure(self): # plugin part + self.config = self.get_option("config", '') self.eta_file = self.get_option("eta_file", '') - - self.core.add_artifact_file(self.answ_log) - self.core.add_artifact_file(self.stat_log) - self.core.add_artifact_file(self.phantom_log) - self.core.add_artifact_file(self.config) + self.phantom_path = self.get_option("phantom_path", 'phantom') + self.core.add_artifact_file(self.eta_file) + self.core.add_artifact_file(self.config) - self.__check_address() + if self.get_option(PhantomConfig.OPTION_PHOUT, ''): + self.phout_import_mode = 1 try: autostop = self.core.get_plugin_of_type(AutostopPlugin) autostop.add_criteria_class(UsedInstancesCriteria) except KeyError: self.log.debug("No autostop plugin found, not adding instances criteria") - - def __compose_config(self): - ''' - Generate phantom tool run config - ''' - if not self.stpd: - raise RuntimeError("Cannot proceed with no source file") - - kwargs = {} - kwargs['ssl_transport'] = "transport_t ssl_transport = transport_ssl_t { timeout = 1s } transport = ssl_transport" if self.ssl else "" - kwargs['method_stream'] = "method_stream_ipv6_t" if self.ipv6 else "method_stream_ipv4_t" - kwargs['proto'] = "http_proto" if self.tank_type == 'http' else "none_proto" - kwargs['threads'] = self.threads - kwargs['answ_log'] = self.answ_log - kwargs['answ_log_level'] = self.answ_log_level - kwargs['comment_answ'] = "# " if self.answ_log_level == 'none' else '' - kwargs['phout'] = self.phout_file - kwargs['stpd'] = self.stpd - if self.gatling: - kwargs['bind'] = 'bind={ ' + self.gatling + ' }' - else: - kwargs['bind'] = '' - kwargs['ip'] = self.address - kwargs['port'] = self.port - kwargs['timeout'] = self.timeout - kwargs['instances'] = self.instances - kwargs['stat_log'] = self.stat_log - kwargs['phantom_log'] = self.phantom_log - tune = '' - if self.phantom_http_entity: - tune += "entity = " + self.phantom_http_entity + "\n" - if self.phantom_http_field: - tune += "field = " + self.phantom_http_field + "\n" - if self.phantom_http_field_num: - tune += "field_num = " + self.phantom_http_field_num + "\n" - if self.phantom_http_line: - tune += "line = " + self.phantom_http_line + "\n" - if tune: - kwargs['reply_limits'] = 'reply_limits = {\n' + tune + "}" - else: - kwargs['reply_limits'] = '' - - - handle, filename = tempfile.mkstemp(".conf", "phantom_", self.core.artifacts_base_dir) - self.core.add_artifact_file(filename) - self.log.debug("Generating phantom config: %s", filename) - template_str = open(os.path.dirname(__file__) + "/phantom.conf.tpl", 'r').read() - tpl = string.Template(template_str) - config = tpl.substitute(kwargs) - - os.write(handle, config) - return filename - - - def __prepare_stepper(self): - ''' - Generate test data if necessary - ''' - self.stpd = self.__get_stpd_filename() - self.core.set_option(self.SECTION, self.OPTION_STPD, self.stpd) - if self.use_caching and not self.force_stepping and os.path.exists(self.stpd) and os.path.exists(self.stpd + ".conf"): - self.log.info("Using cached stpd-file: %s", self.stpd) - stepper = Stepper(self.stpd) # just to store cached data - self.__read_cached_options(self.stpd + ".conf", stepper) - else: - stepper = self.__make_stpd_file(self.stpd) - - self.steps = stepper.steps - - #self.core.set_option(AggregatorPlugin.SECTION, AggregatorPlugin.OPTION_CASES, stepper.cases) - self.core.set_option(self.SECTION, self.OPTION_STEPS, ' '.join([str(x) for x in stepper.steps])) - self.core.set_option(self.SECTION, self.OPTION_LOADSCHEME, stepper.loadscheme) - self.core.set_option(self.SECTION, self.OPTION_LOOP_COUNT, str(stepper.loop_count)) - self.core.set_option(self.SECTION, self.OPTION_AMMO_COUNT, str(stepper.ammo_count)) - self.__calculate_test_duration(stepper.steps) - - self.core.config.flush(self.stpd + ".conf") - + + if not self.config and not self.phout_import_mode: + self.phantom = PhantomConfig(self) + self.phantom.read_config() + self.stepper = StepperWrapper(self) + self.stepper.read_config() + def prepare_test(self): aggregator = None @@ -291,35 +89,36 @@ class PhantomPlugin(AbstractPlugin, AggregateResultListener): aggregator.add_result_listener(self) if not self.phout_import_mode: - self.__prepare_stepper() + self.stepper.prepare_stepper() + self.phantom.stpd = self.stepper.stpd if not self.config: - self.config = self.__compose_config() + self.config = self.phantom.compose_config() args = [self.phantom_path, 'check', self.config] retcode = tankcore.execute(args, catch_out=True) if retcode: raise RuntimeError("Subprocess returned %s",) - try: - console = self.core.get_plugin_of_type(ConsoleOnlinePlugin) - except Exception, ex: - self.log.debug("Console not found: %s", ex) - console = None - - if console: - widget = PhantomProgressBarWidget(self) - if self.eta_file: - widget.eta_file = self.eta_file - console.add_info_widget(widget) - aggregator = self.core.get_plugin_of_type(AggregatorPlugin) - aggregator.add_result_listener(widget) - - widget = PhantomInfoWidget(self) - console.add_info_widget(widget) - aggregator = self.core.get_plugin_of_type(AggregatorPlugin) - aggregator.add_result_listener(widget) - + try: + console = self.core.get_plugin_of_type(ConsoleOnlinePlugin) + except Exception, ex: + self.log.debug("Console not found: %s", ex) + console = None + + if console: + widget = PhantomProgressBarWidget(self) + if self.eta_file: + widget.eta_file = self.eta_file + console.add_info_widget(widget) + aggregator = self.core.get_plugin_of_type(AggregatorPlugin) + aggregator.add_result_listener(widget) + + widget = PhantomInfoWidget(self) + console.add_info_widget(widget) + aggregator = self.core.get_plugin_of_type(AggregatorPlugin) + aggregator.add_result_listener(widget) + def start_test(self): if not self.phout_import_mode: @@ -362,7 +161,7 @@ class PhantomPlugin(AbstractPlugin, AggregateResultListener): def post_process(self, retcode): if not retcode: - count = self.get_option(self.OPTION_AMMO_COUNT) + count = self.get_option(StepperWrapper.OPTION_AMMO_COUNT) if count != self.processed_ammo_count: self.log.warning("Planned ammo count %s differs from processed %s", count, self.processed_ammo_count) return retcode @@ -372,90 +171,6 @@ class PhantomPlugin(AbstractPlugin, AggregateResultListener): self.processed_ammo_count += second_aggregate_data.overall.RPS - def __get_stpd_filename(self): - ''' - Choose the name for stepped data file - ''' - if self.use_caching: - sep = "|" - hasher = hashlib.md5() - hashed_str = self.instances_schedule + sep + str(self.loop_limit) - hashed_str += sep + str(self.ammo_limit) + sep + ';'.join(self.rps_schedule) + sep + self.autocases - hashed_str += sep + ";".join(self.uris) + sep + ";".join(self.headers) - - if self.ammo_file: - if not os.path.exists(self.ammo_file): - raise RuntimeError("Ammo file not found: %s", self.ammo_file) - - hashed_str += sep + os.path.realpath(self.ammo_file) - stat = os.stat(self.ammo_file) - cnt = 0 - for stat_option in stat: - if cnt == 7: # skip access time - continue - cnt += 1 - hashed_str += ";" + str(stat_option) - else: - if not self.uris: - raise RuntimeError("Neither phantom.ammofile nor phantom.uris specified") - hashed_str += sep + ';'.join(self.uris) + sep + ';'.join(self.headers) - - self.log.debug("stpd-hash source: %s", hashed_str) - hasher.update(hashed_str) - - if not os.path.exists(self.cache_dir): - os.makedirs(self.cache_dir) - stpd = self.cache_dir + '/' + os.path.basename(self.ammo_file) + "_" + hasher.hexdigest() + ".stpd" - self.log.debug("Generated cache file name: %s", stpd) - else: - stpd = os.path.realpath("ammo.stpd") - - return stpd - - - def __calculate_test_duration(self, steps): - ''' - Get total test duration - ''' - duration = 0 - for rps, dur in tankcore.pairs(steps): - duration += dur - - self.core.set_option(self.SECTION, self.OPTION_TEST_DURATION, str(duration)) - - - def __read_cached_options(self, cached_config, stepper): - ''' - Merge stpd cached options to current config - ''' - self.log.debug("Reading cached stepper options: %s", cached_config) - external_stepper_conf = ConfigParser.ConfigParser() - external_stepper_conf.read(cached_config) - #stepper.cases = external_stepper_conf.get(AggregatorPlugin.SECTION, AggregatorPlugin.OPTION_CASES) - stepper.steps = [int(x) for x in external_stepper_conf.get(self.SECTION, self.OPTION_STEPS).split(' ')] - stepper.loadscheme = external_stepper_conf.get(self.SECTION, self.OPTION_LOADSCHEME) - stepper.loop_count = external_stepper_conf.get(self.SECTION, self.OPTION_LOOP_COUNT) - stepper.ammo_count = external_stepper_conf.get(self.SECTION, self.OPTION_AMMO_COUNT) - - - def __make_stpd_file(self, stpd): - ''' - stpd generation using Stepper class - ''' - self.log.info("Making stpd-file: %s", self.stpd) - stepper = Stepper(stpd) - stepper.autocases = int(self.autocases) - stepper.rps_schedule = self.rps_schedule - stepper.instances_schedule = self.instances_schedule - stepper.loop_limit = self.loop_limit - stepper.uris = self.uris - stepper.headers = self.headers - stepper.header_http = self.http_ver - stepper.ammofile = self.ammo_file - - stepper.generate_stpd() - return stepper - class PhantomProgressBarWidget(AbstractInfoWidget, AggregateResultListener): ''' @@ -468,8 +183,8 @@ class PhantomProgressBarWidget(AbstractInfoWidget, AggregateResultListener): AbstractInfoWidget.__init__(self) self.owner = sender self.ammo_progress = 0 - self.ammo_count = int(self.owner.core.get_option(self.owner.SECTION, self.owner.OPTION_AMMO_COUNT)) - self.test_duration = int(self.owner.core.get_option(self.owner.SECTION, self.owner.OPTION_TEST_DURATION)) + self.ammo_count = int(self.owner.core.get_option(PhantomPlugin.SECTION, StepperWrapper.OPTION_AMMO_COUNT)) + self.test_duration = int(self.owner.core.get_option(PhantomPlugin.SECTION, StepperWrapper.OPTION_TEST_DURATION)) self.eta_file = None def render(self, screen): @@ -531,10 +246,10 @@ class PhantomInfoWidget(AbstractInfoWidget, AggregateResultListener): self.instances = 0 self.planned = 0 self.RPS = 0 - self.instances_limit = int(self.owner.core.get_option(PhantomPlugin.SECTION, PhantomPlugin.OPTION_INSTANCES_LIMIT)) + self.instances_limit = int(self.owner.core.get_option(PhantomPlugin.SECTION, PhantomConfig.OPTION_INSTANCES_LIMIT)) self.selfload = 0 self.time_lag = 0 - self.ammo_count = int(self.owner.core.get_option(self.owner.SECTION, self.owner.OPTION_AMMO_COUNT)) + self.ammo_count = int(self.owner.core.get_option(PhantomPlugin.SECTION, StepperWrapper.OPTION_AMMO_COUNT)) self.planned_rps_duration = 0 def render(self, screen): @@ -608,16 +323,17 @@ class PhantomReader(AbstractReader): self.last_sample_time = 0 def check_open_files(self): - if not self.phout and os.path.exists(self.phantom.phout_file): - self.log.debug("Opening phout file: %s", self.phantom.phout_file) - self.phout = open(self.phantom.phout_file, 'r') + if not self.phout and self.phantom.phantom and os.path.exists(self.phantom.phantom.phout_file): + self.log.debug("Opening phout file: %s", self.phantom.phantom.phout_file) + self.phout = open(self.phantom.phantom.phout_file, 'r') # strange decision to place it here, but no better idea yet - for item in tankcore.pairs(self.phantom.steps): - self.steps.append([item[0], item[1]]) + if self.phantom.stepper: + for item in tankcore.pairs(self.phantom.stepper.steps): + self.steps.append([item[0], item[1]]) - if not self.stat and self.phantom.stat_log and os.path.exists(self.phantom.stat_log): - self.log.debug("Opening stat file: %s", self.phantom.stat_log) - self.stat = open(self.phantom.stat_log, 'r') + if not self.stat and self.phantom.phantom and self.phantom.phantom.stat_log and os.path.exists(self.phantom.phantom.stat_log): + self.log.debug("Opening stat file: %s", self.phantom.phantom.stat_log) + self.stat = open(self.phantom.phantom.stat_log, 'r') def get_next_sample(self, force): if self.stat: @@ -806,3 +522,324 @@ class UsedInstancesCriteria(AbstractCriteria): items = (self.get_level_str(), self.seconds_count, self.seconds_limit) return ("Instances >%s for %s/%ss" % items, float(self.seconds_count) / self.seconds_limit) +class PhantomConfig: + + OPTION_INSTANCES_LIMIT = 'instances' + OPTION_PHOUT = "phout_file" + OPTION_STPD = 'stpd_file' + + def __init__(self, owner): + self.owner = owner + self.log = logging.getLogger(__name__) + # phant + self.timeout = 1000 + self.answ_log = None + self.phout_file = None + self.stat_log = None + self.phantom_log = None + self.instances = None + self.phantom_start_time = None + self.ipv6 = None + self.phantom_modules_path = None + self.ssl = None + self.address = None + self.port = None + self.tank_type = None + self.answ_log_level = None + self.stpd = None + self.threads = None + self.gatling = None + self.phantom_http_line = None + self.phantom_http_field_num = None + self.phantom_http_field = None + self.phantom_http_entity = None + + + def get_option(self, option_ammofile, param2=None): + return self.owner.get_option(option_ammofile, param2) + + + def __check_address(self): + ''' + Analyse target address setting, resolve it to IP + ''' + try: + ipaddr.IPv6Address(self.address) + self.ipv6 = True + except AddressValueError: + self.log.debug("Not ipv6 address: %s", self.address) + self.ipv6 = False + address_port = self.address.split(":") + self.address = address_port[0] + if len(address_port) > 1: + self.port = address_port[1] + try: + ipaddr.IPv4Address(self.address) + except AddressValueError: + self.log.debug("Not ipv4 address: %s", self.address) + ip_addr = socket.gethostbyname(self.address) + reverse_name = socket.gethostbyaddr(ip_addr)[0] + self.log.debug("Address %s ip_addr: %s, reverse-resolve: %s", self.address, ip_addr, reverse_name) + if reverse_name.startswith(self.address): + self.address = ip_addr + else: + raise ValueError("Address %s reverse-resolved to %s, but must match" % (self.address, reverse_name)) + + + def read_config(self): + ''' + Read phantom tool specific options + ''' + self.phantom_modules_path = self.get_option("phantom_modules_path", "/usr/lib/phantom") + self.ssl = int(self.get_option("ssl", '0')) + self.address = self.get_option('address', '127.0.0.1') + self.port = self.get_option('port', '80') + self.tank_type = self.get_option("tank_type", 'http') + self.answ_log = tempfile.mkstemp(".log", "answ_", self.owner.core.artifacts_base_dir)[1] + self.answ_log_level = self.get_option("writelog", "none") + if self.answ_log_level == '0': + self.answ_log_level = 'none' + elif self.answ_log_level == '1': + self.answ_log_level = 'all' + self.phout_file = tempfile.mkstemp(".log", "phout_", self.owner.core.artifacts_base_dir)[1] + self.owner.core.add_artifact_file(self.phout_file) + + self.stat_log = tempfile.mkstemp(".log", "phantom_stat_", self.owner.core.artifacts_base_dir)[1] + self.phantom_log = tempfile.mkstemp(".log", "phantom_", self.owner.core.artifacts_base_dir)[1] + self.stpd = self.get_option(self.OPTION_STPD, '') + self.threads = self.get_option("threads", int(multiprocessing.cpu_count() / 2) + 1) + self.instances = int(self.get_option(self.OPTION_INSTANCES_LIMIT, '1000')) + self.gatling = ' '.join(self.get_option('gatling_ip', '').split("\n")) + self.phantom_http_line = self.get_option("phantom_http_line", "") + self.phantom_http_field_num = self.get_option("phantom_http_field_num", "") + self.phantom_http_field = self.get_option("phantom_http_field", "") + self.phantom_http_entity = self.get_option("phantom_http_entity", "") + + self.__check_address() + self.owner.core.add_artifact_file(self.answ_log) + self.owner.core.add_artifact_file(self.stat_log) + self.owner.core.add_artifact_file(self.phantom_log) + + def compose_config(self): + ''' + Generate phantom tool run config + ''' + if not self.stpd: + raise RuntimeError("Cannot proceed with no source file") + + kwargs = {} + kwargs['ssl_transport'] = "transport_t ssl_transport = transport_ssl_t { timeout = 1s } transport = ssl_transport" if self.ssl else "" + kwargs['method_stream'] = "method_stream_ipv6_t" if self.ipv6 else "method_stream_ipv4_t" + kwargs['proto'] = "http_proto" if self.tank_type == 'http' else "none_proto" + kwargs['threads'] = self.threads + kwargs['answ_log'] = self.answ_log + kwargs['answ_log_level'] = self.answ_log_level + kwargs['comment_answ'] = "# " if self.answ_log_level == 'none' else '' + kwargs['phout'] = self.phout_file + kwargs['stpd'] = self.stpd + if self.gatling: + kwargs['bind'] = 'bind={ ' + self.gatling + ' }' + else: + kwargs['bind'] = '' + kwargs['ip'] = self.address + kwargs['port'] = self.port + kwargs['timeout'] = self.timeout + kwargs['instances'] = self.instances + kwargs['stat_log'] = self.stat_log + kwargs['phantom_log'] = self.phantom_log + tune = '' + if self.phantom_http_entity: + tune += "entity = " + self.phantom_http_entity + "\n" + if self.phantom_http_field: + tune += "field = " + self.phantom_http_field + "\n" + if self.phantom_http_field_num: + tune += "field_num = " + self.phantom_http_field_num + "\n" + if self.phantom_http_line: + tune += "line = " + self.phantom_http_line + "\n" + if tune: + kwargs['reply_limits'] = 'reply_limits = {\n' + tune + "}" + else: + kwargs['reply_limits'] = '' + + + handle, filename = tempfile.mkstemp(".conf", "phantom_", self.owner.core.artifacts_base_dir) + self.owner.core.add_artifact_file(filename) + self.log.debug("Generating phantom config: %s", filename) + template_str = open(os.path.dirname(__file__) + "/phantom.conf.tpl", 'r').read() + tpl = string.Template(template_str) + config = tpl.substitute(kwargs) + + os.write(handle, config) + return filename + + + +class StepperWrapper: + OPTION_STPD = 'stpd_file' + OPTION_STEPS = 'steps' + OPTION_TEST_DURATION = 'test_duration' + OPTION_AMMO_COUNT = 'ammo_count' + OPTION_LOOP = 'loop' + OPTION_LOOP_COUNT = 'loop_count' + OPTION_AMMOFILE = "ammofile" + OPTION_SCHEDULE = 'rps_schedule' + OPTION_LOADSCHEME = 'loadscheme' + + + def __init__(self, owner): + self.owner = owner + self.log = logging.getLogger(__name__) + # stepper + self.rps_schedule = [] + self.use_caching = None + self.http_ver = None + self.steps = [] + self.ammo_file = None + self.instances_schedule = None + self.loop_limit = None + self.ammo_limit = None + self.uris = None + self.headers = None + self.autocases = None + self.cache_dir = None + self.force_stepping = None + self.stpd = None + + + def get_option(self, option_ammofile, param2=None): + return self.owner.get_option(option_ammofile, param2) + + + def read_config(self): + # stepper part + self.ammo_file = self.get_option(self.OPTION_AMMOFILE, '') + self.instances_schedule = self.get_option("instances_schedule", '') + self.loop_limit = int(self.get_option(self.OPTION_LOOP, "-1")) + self.ammo_limit = int(self.get_option("ammo_limit", "-1")) # TODO: 3 stepper should implement ammo_limit + sched = self.get_option(self.OPTION_SCHEDULE, '') + sched = " ".join(sched.split("\n")) + sched = sched.split(')') + self.rps_schedule = [] + for step in sched: + if step.strip(): + self.rps_schedule.append(step.strip() + ')') + self.uris = self.get_option("uris", '').strip().split("\n") + while '' in self.uris: + self.uris.remove('') + self.headers = self.get_option("headers", '').strip().split("\n") + while '' in self.headers: + self.headers.remove('') + self.http_ver = self.get_option("header_http", '1.1') + self.autocases = self.get_option("autocases", '0') + self.use_caching = int(self.get_option("use_caching", '1')) + self.cache_dir = os.path.expanduser(self.get_option("cache_dir", self.owner.core.artifacts_base_dir)) + self.force_stepping = int(self.get_option("force_stepping", '0')) + + def prepare_stepper(self): + ''' + Generate test data if necessary + ''' + self.stpd = self.__get_stpd_filename() + self.owner.core.set_option(self.owner.SECTION, self.OPTION_STPD, self.stpd) + if self.use_caching and not self.force_stepping and os.path.exists(self.stpd) and os.path.exists(self.stpd + ".conf"): + self.log.info("Using cached stpd-file: %s", self.stpd) + stepper = Stepper(self.stpd) # just to store cached data + self.__read_cached_options(self.stpd + ".conf", stepper) + else: + stepper = self.__make_stpd_file(self.stpd) + + self.steps = stepper.steps + + #self.core.set_option(AggregatorPlugin.SECTION, AggregatorPlugin.OPTION_CASES, stepper.cases) + self.owner.core.set_option(self.owner.SECTION, self.OPTION_STEPS, ' '.join([str(x) for x in stepper.steps])) + self.owner.core.set_option(self.owner.SECTION, self.OPTION_LOADSCHEME, stepper.loadscheme) + self.owner.core.set_option(self.owner.SECTION, self.OPTION_LOOP_COUNT, str(stepper.loop_count)) + self.owner.core.set_option(self.owner.SECTION, self.OPTION_AMMO_COUNT, str(stepper.ammo_count)) + self.__calculate_test_duration(stepper.steps) + + self.owner.core.config.flush(self.stpd + ".conf") + + def __get_stpd_filename(self): + ''' + Choose the name for stepped data file + ''' + if self.use_caching: + sep = "|" + hasher = hashlib.md5() + hashed_str = self.instances_schedule + sep + str(self.loop_limit) + hashed_str += sep + str(self.ammo_limit) + sep + ';'.join(self.rps_schedule) + sep + self.autocases + hashed_str += sep + ";".join(self.uris) + sep + ";".join(self.headers) + + if self.ammo_file: + if not os.path.exists(self.ammo_file): + raise RuntimeError("Ammo file not found: %s", self.ammo_file) + + hashed_str += sep + os.path.realpath(self.ammo_file) + stat = os.stat(self.ammo_file) + cnt = 0 + for stat_option in stat: + if cnt == 7: # skip access time + continue + cnt += 1 + hashed_str += ";" + str(stat_option) + else: + if not self.uris: + raise RuntimeError("Neither phantom.ammofile nor phantom.uris specified") + hashed_str += sep + ';'.join(self.uris) + sep + ';'.join(self.headers) + + self.log.debug("stpd-hash source: %s", hashed_str) + hasher.update(hashed_str) + + if not os.path.exists(self.cache_dir): + os.makedirs(self.cache_dir) + stpd = self.cache_dir + '/' + os.path.basename(self.ammo_file) + "_" + hasher.hexdigest() + ".stpd" + self.log.debug("Generated cache file name: %s", stpd) + else: + stpd = os.path.realpath("ammo.stpd") + + return stpd + + + def __calculate_test_duration(self, steps): + ''' + Get total test duration + ''' + duration = 0 + for rps, dur in tankcore.pairs(steps): + duration += dur + + self.owner.core.set_option(self.owner.SECTION, self.OPTION_TEST_DURATION, str(duration)) + + + def __read_cached_options(self, cached_config, stepper): + ''' + Merge stpd cached options to current config + ''' + self.log.debug("Reading cached stepper options: %s", cached_config) + external_stepper_conf = ConfigParser.ConfigParser() + external_stepper_conf.read(cached_config) + #stepper.cases = external_stepper_conf.get(AggregatorPlugin.SECTION, AggregatorPlugin.OPTION_CASES) + stepper.steps = [int(x) for x in external_stepper_conf.get(PhantomPlugin.SECTION, self.OPTION_STEPS).split(' ')] + stepper.loadscheme = external_stepper_conf.get(PhantomPlugin.SECTION, self.OPTION_LOADSCHEME) + stepper.loop_count = external_stepper_conf.get(PhantomPlugin.SECTION, self.OPTION_LOOP_COUNT) + stepper.ammo_count = external_stepper_conf.get(PhantomPlugin.SECTION, self.OPTION_AMMO_COUNT) + + + def __make_stpd_file(self, stpd): + ''' + stpd generation using Stepper class + ''' + self.log.info("Making stpd-file: %s", self.stpd) + stepper = Stepper(stpd) + stepper.autocases = int(self.autocases) + stepper.rps_schedule = self.rps_schedule + stepper.instances_schedule = self.instances_schedule + stepper.loop_limit = self.loop_limit + stepper.uris = self.uris + stepper.headers = self.headers + stepper.header_http = self.http_ver + stepper.ammofile = self.ammo_file + + stepper.generate_stpd() + return stepper + diff --git a/Tests/PhantomPluginTest.py b/Tests/PhantomPluginTest.py index 03959c9..59ad5eb 100644 --- a/Tests/PhantomPluginTest.py +++ b/Tests/PhantomPluginTest.py @@ -90,7 +90,7 @@ class PhantomPluginTestCase(TankTestCase): def test_multiload_parsing(self): self.foo.core.set_option('phantom', 'rps_schedule', 'const(1,1) line(1,100,60)\nstep(1,10,1,10)') self.foo.configure() - self.assertEquals(['const(1,1)', 'line(1,100,60)', 'step(1,10,1,10)'], self.foo.rps_schedule) + self.assertEquals(['const(1,1)', 'line(1,100,60)', 'step(1,10,1,10)'], self.foo.stepper.rps_schedule) def test_reader(self): self.foo.phout_file = 'data/phout_timeout_mix.txt' diff --git a/Tests/config/phantom.conf b/Tests/config/phantom.conf index 497e843..16b97de 100644 --- a/Tests/config/phantom.conf +++ b/Tests/config/phantom.conf @@ -4,8 +4,7 @@ plugin_console=Tank/Plugins/ConsoleOnline.py [phantom] phantom_path=phantom -config=data/phantom_ready.conf -stpd_file=data/ammo.stpd +#config=data/phantom_ready.conf ammofile=data/dummy.ammo instances_schedule=line(1,10,1m) loop=1