sort configs in etc dir

Split phantomplugin into 3 classes
This commit is contained in:
Andrey Pohilko 2012-10-11 16:52:22 +04:00
parent 607b6db452
commit 8393ae6e6c
6 changed files with 384 additions and 348 deletions

View File

@ -198,7 +198,7 @@ class ConsoleTank:
if not self.options.no_rc:
try:
for filename in os.listdir(self.baseconfigs_location):
for filename in sorted(os.listdir(self.baseconfigs_location)):
if fnmatch.fnmatch(filename, '*.ini'):
configs += [os.path.realpath(self.baseconfigs_location + os.sep + filename)]
except OSError:

View File

@ -48,7 +48,7 @@ class LoadosophiaPlugin(AbstractPlugin):
# phantom
try:
phantom = self.core.get_plugin_of_type(PhantomPlugin)
main_file = phantom.phout_file
main_file = phantom.phantom.phout_file
except KeyError:
self.log.debug("Phantom not found")
@ -65,7 +65,7 @@ class LoadosophiaPlugin(AbstractPlugin):
mon = self.core.get_plugin_of_type(MonitoringPlugin)
mon_file = mon.data_file
except KeyError:
self.log.debug("Phantom not found")
self.log.debug("Monitoring not found")
self.loadosophia.send_results(self.project_key, main_file, [mon_file])
@ -76,7 +76,6 @@ class LoadosophiaPlugin(AbstractPlugin):
except KeyError:
self.log.debug("Web online not found")
return retcode

View File

@ -55,7 +55,8 @@ class MonitoringPlugin(AbstractPlugin):
except KeyError, ex:
self.log.debug("Phantom plugin not found: %s", ex)
if phantom:
self.default_target = phantom.address
if phantom.phantom:
self.default_target = phantom.phantom.address
if phantom.phout_import_mode:
self.config = None
# TODO: 2 resolve virtual to host address

View File

@ -22,6 +22,7 @@ import sys
import tankcore
import tempfile
import time
import logging
# TODO: 3 chosen cases
# TODO: 2 if instances_schedule enabled - pass to phantom the top count as instances limit
@ -30,253 +31,50 @@ class PhantomPlugin(AbstractPlugin, AggregateResultListener):
'''
Plugin for running phantom tool
'''
OPTION_STEPS = 'steps'
OPTION_TEST_DURATION = 'test_duration'
OPTION_INSTANCES_LIMIT = 'instances'
OPTION_AMMO_COUNT = 'ammo_count'
OPTION_LOOP = 'loop'
OPTION_LOOP_COUNT = 'loop_count'
OPTION_AMMOFILE = "ammofile"
OPTION_SCHEDULE = 'rps_schedule'
OPTION_LOADSCHEME = 'loadscheme'
OPTION_PORT = "port"
OPTION_IP = 'address'
OPTION_STPD = 'stpd_file'
SECTION = 'phantom'
def __init__(self, core):
AbstractPlugin.__init__(self, core)
# FIXME: 3 obviously needs refactoring, at least extracting phantom-specific options into separate class
self.process = None
self.timeout = 1000
self.answ_log = None
self.phout_file = None
self.stat_log = None
self.phantom_log = None
self.config = None
self.instances = None
self.use_caching = None
self.http_ver = None
self.rps_schedule = []
self.phout_import_mode = 0
self.did_phout_import_try = False
self.steps = []
self.phantom_start_time = None
self.ipv6 = None
self.ammo_file = None
self.instances_schedule = None
self.loop_limit = None
self.ammo_limit = None
self.uris = None
self.headers = None
self.autocases = None
self.cache_dir = None
self.force_stepping = None
self.phantom_path = None
self.phantom_modules_path = None
self.ssl = None
self.address = None
self.port = None
self.tank_type = None
self.answ_log_level = None
self.stpd = None
self.threads = None
self.gatling = None
self.phantom_http_line = None
self.phantom_http_field_num = None
self.phantom_http_field = None
self.phantom_http_entity = None
self.eta_file = None
self.processed_ammo_count = 0
self.config = None
self.stepper = None
self.phantom = None
@staticmethod
def get_key():
return __file__
def __check_address(self):
'''
Analyse target address setting, resolve it to IP
'''
try:
ipaddr.IPv6Address(self.address)
self.ipv6 = True
except AddressValueError:
self.log.debug("Not ipv6 address: %s", self.address)
self.ipv6 = False
address_port = self.address.split(":")
self.address = address_port[0]
if len(address_port) > 1:
self.port = address_port[1]
try:
ipaddr.IPv4Address(self.address)
except AddressValueError:
self.log.debug("Not ipv4 address: %s", self.address)
ip_addr = socket.gethostbyname(self.address)
reverse_name = socket.gethostbyaddr(ip_addr)[0]
self.log.debug("Address %s ip_addr: %s, reverse-resolve: %s", self.address, ip_addr, reverse_name)
if reverse_name.startswith(self.address):
self.address = ip_addr
else:
raise ValueError("Address %s reverse-resolved to %s, but must match" % (self.address, reverse_name))
def __read_phantom_options(self):
'''
Read phantom tool specific options
'''
self.phantom_path = self.get_option("phantom_path", 'phantom')
self.config = self.get_option("config", '')
self.phantom_modules_path = self.get_option("phantom_modules_path", "/usr/lib/phantom")
self.ssl = int(self.get_option("ssl", '0'))
self.address = self.get_option(self.OPTION_IP, '127.0.0.1')
self.port = self.get_option(self.OPTION_PORT, '80')
self.tank_type = self.get_option("tank_type", 'http')
self.answ_log = tempfile.mkstemp(".log", "answ_", self.core.artifacts_base_dir)[1]
self.answ_log_level = self.get_option("writelog", "none")
if self.answ_log_level == '0':
self.answ_log_level = 'none'
elif self.answ_log_level == '1':
self.answ_log_level = 'all'
self.phout_file = self.get_option("phout_file", '')
if not self.phout_file:
self.phout_file = tempfile.mkstemp(".log", "phout_", self.core.artifacts_base_dir)[1]
self.core.add_artifact_file(self.phout_file)
else:
self.phout_import_mode = 1
self.stat_log = tempfile.mkstemp(".log", "phantom_stat_", self.core.artifacts_base_dir)[1]
self.phantom_log = tempfile.mkstemp(".log", "phantom_", self.core.artifacts_base_dir)[1]
self.stpd = self.get_option(self.OPTION_STPD, '')
self.threads = self.get_option("threads", int(multiprocessing.cpu_count() / 2) + 1)
self.instances = int(self.get_option(self.OPTION_INSTANCES_LIMIT, '1000'))
self.gatling = ' '.join(self.get_option('gatling_ip', '').split("\n"))
self.phantom_http_line = self.get_option("phantom_http_line", "")
self.phantom_http_field_num = self.get_option("phantom_http_field_num", "")
self.phantom_http_field = self.get_option("phantom_http_field", "")
self.phantom_http_entity = self.get_option("phantom_http_entity", "")
def configure(self):
# stepper part
self.ammo_file = self.get_option(self.OPTION_AMMOFILE, '')
self.instances_schedule = self.get_option("instances_schedule", '')
self.loop_limit = int(self.get_option(self.OPTION_LOOP, "-1"))
self.ammo_limit = int(self.get_option("ammo_limit", "-1")) # TODO: 3 stepper should implement ammo_limit
sched = self.get_option(self.OPTION_SCHEDULE, '')
sched = " ".join(sched.split("\n"))
sched = sched.split(')')
self.rps_schedule = []
for step in sched:
if step.strip():
self.rps_schedule.append(step.strip() + ')')
self.uris = self.get_option("uris", '').strip().split("\n")
while '' in self.uris:
self.uris.remove('')
self.headers = self.get_option("headers", '').strip().split("\n")
while '' in self.headers:
self.headers.remove('')
self.http_ver = self.get_option("header_http", '1.1')
self.autocases = self.get_option("autocases", '0')
self.use_caching = int(self.get_option("use_caching", '1'))
self.cache_dir = os.path.expanduser(self.get_option("cache_dir", self.core.artifacts_base_dir))
self.force_stepping = int(self.get_option("force_stepping", '0'))
# phantom part
self.__read_phantom_options()
def configure(self):
# plugin part
self.config = self.get_option("config", '')
self.eta_file = self.get_option("eta_file", '')
self.core.add_artifact_file(self.answ_log)
self.core.add_artifact_file(self.stat_log)
self.core.add_artifact_file(self.phantom_log)
self.core.add_artifact_file(self.config)
self.phantom_path = self.get_option("phantom_path", 'phantom')
self.core.add_artifact_file(self.eta_file)
self.core.add_artifact_file(self.config)
self.__check_address()
if self.get_option(PhantomConfig.OPTION_PHOUT, ''):
self.phout_import_mode = 1
try:
autostop = self.core.get_plugin_of_type(AutostopPlugin)
autostop.add_criteria_class(UsedInstancesCriteria)
except KeyError:
self.log.debug("No autostop plugin found, not adding instances criteria")
def __compose_config(self):
'''
Generate phantom tool run config
'''
if not self.stpd:
raise RuntimeError("Cannot proceed with no source file")
kwargs = {}
kwargs['ssl_transport'] = "transport_t ssl_transport = transport_ssl_t { timeout = 1s } transport = ssl_transport" if self.ssl else ""
kwargs['method_stream'] = "method_stream_ipv6_t" if self.ipv6 else "method_stream_ipv4_t"
kwargs['proto'] = "http_proto" if self.tank_type == 'http' else "none_proto"
kwargs['threads'] = self.threads
kwargs['answ_log'] = self.answ_log
kwargs['answ_log_level'] = self.answ_log_level
kwargs['comment_answ'] = "# " if self.answ_log_level == 'none' else ''
kwargs['phout'] = self.phout_file
kwargs['stpd'] = self.stpd
if self.gatling:
kwargs['bind'] = 'bind={ ' + self.gatling + ' }'
else:
kwargs['bind'] = ''
kwargs['ip'] = self.address
kwargs['port'] = self.port
kwargs['timeout'] = self.timeout
kwargs['instances'] = self.instances
kwargs['stat_log'] = self.stat_log
kwargs['phantom_log'] = self.phantom_log
tune = ''
if self.phantom_http_entity:
tune += "entity = " + self.phantom_http_entity + "\n"
if self.phantom_http_field:
tune += "field = " + self.phantom_http_field + "\n"
if self.phantom_http_field_num:
tune += "field_num = " + self.phantom_http_field_num + "\n"
if self.phantom_http_line:
tune += "line = " + self.phantom_http_line + "\n"
if tune:
kwargs['reply_limits'] = 'reply_limits = {\n' + tune + "}"
else:
kwargs['reply_limits'] = ''
handle, filename = tempfile.mkstemp(".conf", "phantom_", self.core.artifacts_base_dir)
self.core.add_artifact_file(filename)
self.log.debug("Generating phantom config: %s", filename)
template_str = open(os.path.dirname(__file__) + "/phantom.conf.tpl", 'r').read()
tpl = string.Template(template_str)
config = tpl.substitute(kwargs)
os.write(handle, config)
return filename
def __prepare_stepper(self):
'''
Generate test data if necessary
'''
self.stpd = self.__get_stpd_filename()
self.core.set_option(self.SECTION, self.OPTION_STPD, self.stpd)
if self.use_caching and not self.force_stepping and os.path.exists(self.stpd) and os.path.exists(self.stpd + ".conf"):
self.log.info("Using cached stpd-file: %s", self.stpd)
stepper = Stepper(self.stpd) # just to store cached data
self.__read_cached_options(self.stpd + ".conf", stepper)
else:
stepper = self.__make_stpd_file(self.stpd)
self.steps = stepper.steps
#self.core.set_option(AggregatorPlugin.SECTION, AggregatorPlugin.OPTION_CASES, stepper.cases)
self.core.set_option(self.SECTION, self.OPTION_STEPS, ' '.join([str(x) for x in stepper.steps]))
self.core.set_option(self.SECTION, self.OPTION_LOADSCHEME, stepper.loadscheme)
self.core.set_option(self.SECTION, self.OPTION_LOOP_COUNT, str(stepper.loop_count))
self.core.set_option(self.SECTION, self.OPTION_AMMO_COUNT, str(stepper.ammo_count))
self.__calculate_test_duration(stepper.steps)
self.core.config.flush(self.stpd + ".conf")
if not self.config and not self.phout_import_mode:
self.phantom = PhantomConfig(self)
self.phantom.read_config()
self.stepper = StepperWrapper(self)
self.stepper.read_config()
def prepare_test(self):
aggregator = None
@ -291,35 +89,36 @@ class PhantomPlugin(AbstractPlugin, AggregateResultListener):
aggregator.add_result_listener(self)
if not self.phout_import_mode:
self.__prepare_stepper()
self.stepper.prepare_stepper()
self.phantom.stpd = self.stepper.stpd
if not self.config:
self.config = self.__compose_config()
self.config = self.phantom.compose_config()
args = [self.phantom_path, 'check', self.config]
retcode = tankcore.execute(args, catch_out=True)
if retcode:
raise RuntimeError("Subprocess returned %s",)
try:
console = self.core.get_plugin_of_type(ConsoleOnlinePlugin)
except Exception, ex:
self.log.debug("Console not found: %s", ex)
console = None
if console:
widget = PhantomProgressBarWidget(self)
if self.eta_file:
widget.eta_file = self.eta_file
console.add_info_widget(widget)
aggregator = self.core.get_plugin_of_type(AggregatorPlugin)
aggregator.add_result_listener(widget)
widget = PhantomInfoWidget(self)
console.add_info_widget(widget)
aggregator = self.core.get_plugin_of_type(AggregatorPlugin)
aggregator.add_result_listener(widget)
try:
console = self.core.get_plugin_of_type(ConsoleOnlinePlugin)
except Exception, ex:
self.log.debug("Console not found: %s", ex)
console = None
if console:
widget = PhantomProgressBarWidget(self)
if self.eta_file:
widget.eta_file = self.eta_file
console.add_info_widget(widget)
aggregator = self.core.get_plugin_of_type(AggregatorPlugin)
aggregator.add_result_listener(widget)
widget = PhantomInfoWidget(self)
console.add_info_widget(widget)
aggregator = self.core.get_plugin_of_type(AggregatorPlugin)
aggregator.add_result_listener(widget)
def start_test(self):
if not self.phout_import_mode:
@ -362,7 +161,7 @@ class PhantomPlugin(AbstractPlugin, AggregateResultListener):
def post_process(self, retcode):
if not retcode:
count = self.get_option(self.OPTION_AMMO_COUNT)
count = self.get_option(StepperWrapper.OPTION_AMMO_COUNT)
if count != self.processed_ammo_count:
self.log.warning("Planned ammo count %s differs from processed %s", count, self.processed_ammo_count)
return retcode
@ -372,90 +171,6 @@ class PhantomPlugin(AbstractPlugin, AggregateResultListener):
self.processed_ammo_count += second_aggregate_data.overall.RPS
def __get_stpd_filename(self):
'''
Choose the name for stepped data file
'''
if self.use_caching:
sep = "|"
hasher = hashlib.md5()
hashed_str = self.instances_schedule + sep + str(self.loop_limit)
hashed_str += sep + str(self.ammo_limit) + sep + ';'.join(self.rps_schedule) + sep + self.autocases
hashed_str += sep + ";".join(self.uris) + sep + ";".join(self.headers)
if self.ammo_file:
if not os.path.exists(self.ammo_file):
raise RuntimeError("Ammo file not found: %s", self.ammo_file)
hashed_str += sep + os.path.realpath(self.ammo_file)
stat = os.stat(self.ammo_file)
cnt = 0
for stat_option in stat:
if cnt == 7: # skip access time
continue
cnt += 1
hashed_str += ";" + str(stat_option)
else:
if not self.uris:
raise RuntimeError("Neither phantom.ammofile nor phantom.uris specified")
hashed_str += sep + ';'.join(self.uris) + sep + ';'.join(self.headers)
self.log.debug("stpd-hash source: %s", hashed_str)
hasher.update(hashed_str)
if not os.path.exists(self.cache_dir):
os.makedirs(self.cache_dir)
stpd = self.cache_dir + '/' + os.path.basename(self.ammo_file) + "_" + hasher.hexdigest() + ".stpd"
self.log.debug("Generated cache file name: %s", stpd)
else:
stpd = os.path.realpath("ammo.stpd")
return stpd
def __calculate_test_duration(self, steps):
'''
Get total test duration
'''
duration = 0
for rps, dur in tankcore.pairs(steps):
duration += dur
self.core.set_option(self.SECTION, self.OPTION_TEST_DURATION, str(duration))
def __read_cached_options(self, cached_config, stepper):
'''
Merge stpd cached options to current config
'''
self.log.debug("Reading cached stepper options: %s", cached_config)
external_stepper_conf = ConfigParser.ConfigParser()
external_stepper_conf.read(cached_config)
#stepper.cases = external_stepper_conf.get(AggregatorPlugin.SECTION, AggregatorPlugin.OPTION_CASES)
stepper.steps = [int(x) for x in external_stepper_conf.get(self.SECTION, self.OPTION_STEPS).split(' ')]
stepper.loadscheme = external_stepper_conf.get(self.SECTION, self.OPTION_LOADSCHEME)
stepper.loop_count = external_stepper_conf.get(self.SECTION, self.OPTION_LOOP_COUNT)
stepper.ammo_count = external_stepper_conf.get(self.SECTION, self.OPTION_AMMO_COUNT)
def __make_stpd_file(self, stpd):
'''
stpd generation using Stepper class
'''
self.log.info("Making stpd-file: %s", self.stpd)
stepper = Stepper(stpd)
stepper.autocases = int(self.autocases)
stepper.rps_schedule = self.rps_schedule
stepper.instances_schedule = self.instances_schedule
stepper.loop_limit = self.loop_limit
stepper.uris = self.uris
stepper.headers = self.headers
stepper.header_http = self.http_ver
stepper.ammofile = self.ammo_file
stepper.generate_stpd()
return stepper
class PhantomProgressBarWidget(AbstractInfoWidget, AggregateResultListener):
'''
@ -468,8 +183,8 @@ class PhantomProgressBarWidget(AbstractInfoWidget, AggregateResultListener):
AbstractInfoWidget.__init__(self)
self.owner = sender
self.ammo_progress = 0
self.ammo_count = int(self.owner.core.get_option(self.owner.SECTION, self.owner.OPTION_AMMO_COUNT))
self.test_duration = int(self.owner.core.get_option(self.owner.SECTION, self.owner.OPTION_TEST_DURATION))
self.ammo_count = int(self.owner.core.get_option(PhantomPlugin.SECTION, StepperWrapper.OPTION_AMMO_COUNT))
self.test_duration = int(self.owner.core.get_option(PhantomPlugin.SECTION, StepperWrapper.OPTION_TEST_DURATION))
self.eta_file = None
def render(self, screen):
@ -531,10 +246,10 @@ class PhantomInfoWidget(AbstractInfoWidget, AggregateResultListener):
self.instances = 0
self.planned = 0
self.RPS = 0
self.instances_limit = int(self.owner.core.get_option(PhantomPlugin.SECTION, PhantomPlugin.OPTION_INSTANCES_LIMIT))
self.instances_limit = int(self.owner.core.get_option(PhantomPlugin.SECTION, PhantomConfig.OPTION_INSTANCES_LIMIT))
self.selfload = 0
self.time_lag = 0
self.ammo_count = int(self.owner.core.get_option(self.owner.SECTION, self.owner.OPTION_AMMO_COUNT))
self.ammo_count = int(self.owner.core.get_option(PhantomPlugin.SECTION, StepperWrapper.OPTION_AMMO_COUNT))
self.planned_rps_duration = 0
def render(self, screen):
@ -608,16 +323,17 @@ class PhantomReader(AbstractReader):
self.last_sample_time = 0
def check_open_files(self):
if not self.phout and os.path.exists(self.phantom.phout_file):
self.log.debug("Opening phout file: %s", self.phantom.phout_file)
self.phout = open(self.phantom.phout_file, 'r')
if not self.phout and self.phantom.phantom and os.path.exists(self.phantom.phantom.phout_file):
self.log.debug("Opening phout file: %s", self.phantom.phantom.phout_file)
self.phout = open(self.phantom.phantom.phout_file, 'r')
# strange decision to place it here, but no better idea yet
for item in tankcore.pairs(self.phantom.steps):
self.steps.append([item[0], item[1]])
if self.phantom.stepper:
for item in tankcore.pairs(self.phantom.stepper.steps):
self.steps.append([item[0], item[1]])
if not self.stat and self.phantom.stat_log and os.path.exists(self.phantom.stat_log):
self.log.debug("Opening stat file: %s", self.phantom.stat_log)
self.stat = open(self.phantom.stat_log, 'r')
if not self.stat and self.phantom.phantom and self.phantom.phantom.stat_log and os.path.exists(self.phantom.phantom.stat_log):
self.log.debug("Opening stat file: %s", self.phantom.phantom.stat_log)
self.stat = open(self.phantom.phantom.stat_log, 'r')
def get_next_sample(self, force):
if self.stat:
@ -806,3 +522,324 @@ class UsedInstancesCriteria(AbstractCriteria):
items = (self.get_level_str(), self.seconds_count, self.seconds_limit)
return ("Instances >%s for %s/%ss" % items, float(self.seconds_count) / self.seconds_limit)
class PhantomConfig:
OPTION_INSTANCES_LIMIT = 'instances'
OPTION_PHOUT = "phout_file"
OPTION_STPD = 'stpd_file'
def __init__(self, owner):
self.owner = owner
self.log = logging.getLogger(__name__)
# phant
self.timeout = 1000
self.answ_log = None
self.phout_file = None
self.stat_log = None
self.phantom_log = None
self.instances = None
self.phantom_start_time = None
self.ipv6 = None
self.phantom_modules_path = None
self.ssl = None
self.address = None
self.port = None
self.tank_type = None
self.answ_log_level = None
self.stpd = None
self.threads = None
self.gatling = None
self.phantom_http_line = None
self.phantom_http_field_num = None
self.phantom_http_field = None
self.phantom_http_entity = None
def get_option(self, option_ammofile, param2=None):
return self.owner.get_option(option_ammofile, param2)
def __check_address(self):
'''
Analyse target address setting, resolve it to IP
'''
try:
ipaddr.IPv6Address(self.address)
self.ipv6 = True
except AddressValueError:
self.log.debug("Not ipv6 address: %s", self.address)
self.ipv6 = False
address_port = self.address.split(":")
self.address = address_port[0]
if len(address_port) > 1:
self.port = address_port[1]
try:
ipaddr.IPv4Address(self.address)
except AddressValueError:
self.log.debug("Not ipv4 address: %s", self.address)
ip_addr = socket.gethostbyname(self.address)
reverse_name = socket.gethostbyaddr(ip_addr)[0]
self.log.debug("Address %s ip_addr: %s, reverse-resolve: %s", self.address, ip_addr, reverse_name)
if reverse_name.startswith(self.address):
self.address = ip_addr
else:
raise ValueError("Address %s reverse-resolved to %s, but must match" % (self.address, reverse_name))
def read_config(self):
'''
Read phantom tool specific options
'''
self.phantom_modules_path = self.get_option("phantom_modules_path", "/usr/lib/phantom")
self.ssl = int(self.get_option("ssl", '0'))
self.address = self.get_option('address', '127.0.0.1')
self.port = self.get_option('port', '80')
self.tank_type = self.get_option("tank_type", 'http')
self.answ_log = tempfile.mkstemp(".log", "answ_", self.owner.core.artifacts_base_dir)[1]
self.answ_log_level = self.get_option("writelog", "none")
if self.answ_log_level == '0':
self.answ_log_level = 'none'
elif self.answ_log_level == '1':
self.answ_log_level = 'all'
self.phout_file = tempfile.mkstemp(".log", "phout_", self.owner.core.artifacts_base_dir)[1]
self.owner.core.add_artifact_file(self.phout_file)
self.stat_log = tempfile.mkstemp(".log", "phantom_stat_", self.owner.core.artifacts_base_dir)[1]
self.phantom_log = tempfile.mkstemp(".log", "phantom_", self.owner.core.artifacts_base_dir)[1]
self.stpd = self.get_option(self.OPTION_STPD, '')
self.threads = self.get_option("threads", int(multiprocessing.cpu_count() / 2) + 1)
self.instances = int(self.get_option(self.OPTION_INSTANCES_LIMIT, '1000'))
self.gatling = ' '.join(self.get_option('gatling_ip', '').split("\n"))
self.phantom_http_line = self.get_option("phantom_http_line", "")
self.phantom_http_field_num = self.get_option("phantom_http_field_num", "")
self.phantom_http_field = self.get_option("phantom_http_field", "")
self.phantom_http_entity = self.get_option("phantom_http_entity", "")
self.__check_address()
self.owner.core.add_artifact_file(self.answ_log)
self.owner.core.add_artifact_file(self.stat_log)
self.owner.core.add_artifact_file(self.phantom_log)
def compose_config(self):
'''
Generate phantom tool run config
'''
if not self.stpd:
raise RuntimeError("Cannot proceed with no source file")
kwargs = {}
kwargs['ssl_transport'] = "transport_t ssl_transport = transport_ssl_t { timeout = 1s } transport = ssl_transport" if self.ssl else ""
kwargs['method_stream'] = "method_stream_ipv6_t" if self.ipv6 else "method_stream_ipv4_t"
kwargs['proto'] = "http_proto" if self.tank_type == 'http' else "none_proto"
kwargs['threads'] = self.threads
kwargs['answ_log'] = self.answ_log
kwargs['answ_log_level'] = self.answ_log_level
kwargs['comment_answ'] = "# " if self.answ_log_level == 'none' else ''
kwargs['phout'] = self.phout_file
kwargs['stpd'] = self.stpd
if self.gatling:
kwargs['bind'] = 'bind={ ' + self.gatling + ' }'
else:
kwargs['bind'] = ''
kwargs['ip'] = self.address
kwargs['port'] = self.port
kwargs['timeout'] = self.timeout
kwargs['instances'] = self.instances
kwargs['stat_log'] = self.stat_log
kwargs['phantom_log'] = self.phantom_log
tune = ''
if self.phantom_http_entity:
tune += "entity = " + self.phantom_http_entity + "\n"
if self.phantom_http_field:
tune += "field = " + self.phantom_http_field + "\n"
if self.phantom_http_field_num:
tune += "field_num = " + self.phantom_http_field_num + "\n"
if self.phantom_http_line:
tune += "line = " + self.phantom_http_line + "\n"
if tune:
kwargs['reply_limits'] = 'reply_limits = {\n' + tune + "}"
else:
kwargs['reply_limits'] = ''
handle, filename = tempfile.mkstemp(".conf", "phantom_", self.owner.core.artifacts_base_dir)
self.owner.core.add_artifact_file(filename)
self.log.debug("Generating phantom config: %s", filename)
template_str = open(os.path.dirname(__file__) + "/phantom.conf.tpl", 'r').read()
tpl = string.Template(template_str)
config = tpl.substitute(kwargs)
os.write(handle, config)
return filename
class StepperWrapper:
OPTION_STPD = 'stpd_file'
OPTION_STEPS = 'steps'
OPTION_TEST_DURATION = 'test_duration'
OPTION_AMMO_COUNT = 'ammo_count'
OPTION_LOOP = 'loop'
OPTION_LOOP_COUNT = 'loop_count'
OPTION_AMMOFILE = "ammofile"
OPTION_SCHEDULE = 'rps_schedule'
OPTION_LOADSCHEME = 'loadscheme'
def __init__(self, owner):
self.owner = owner
self.log = logging.getLogger(__name__)
# stepper
self.rps_schedule = []
self.use_caching = None
self.http_ver = None
self.steps = []
self.ammo_file = None
self.instances_schedule = None
self.loop_limit = None
self.ammo_limit = None
self.uris = None
self.headers = None
self.autocases = None
self.cache_dir = None
self.force_stepping = None
self.stpd = None
def get_option(self, option_ammofile, param2=None):
return self.owner.get_option(option_ammofile, param2)
def read_config(self):
# stepper part
self.ammo_file = self.get_option(self.OPTION_AMMOFILE, '')
self.instances_schedule = self.get_option("instances_schedule", '')
self.loop_limit = int(self.get_option(self.OPTION_LOOP, "-1"))
self.ammo_limit = int(self.get_option("ammo_limit", "-1")) # TODO: 3 stepper should implement ammo_limit
sched = self.get_option(self.OPTION_SCHEDULE, '')
sched = " ".join(sched.split("\n"))
sched = sched.split(')')
self.rps_schedule = []
for step in sched:
if step.strip():
self.rps_schedule.append(step.strip() + ')')
self.uris = self.get_option("uris", '').strip().split("\n")
while '' in self.uris:
self.uris.remove('')
self.headers = self.get_option("headers", '').strip().split("\n")
while '' in self.headers:
self.headers.remove('')
self.http_ver = self.get_option("header_http", '1.1')
self.autocases = self.get_option("autocases", '0')
self.use_caching = int(self.get_option("use_caching", '1'))
self.cache_dir = os.path.expanduser(self.get_option("cache_dir", self.owner.core.artifacts_base_dir))
self.force_stepping = int(self.get_option("force_stepping", '0'))
def prepare_stepper(self):
'''
Generate test data if necessary
'''
self.stpd = self.__get_stpd_filename()
self.owner.core.set_option(self.owner.SECTION, self.OPTION_STPD, self.stpd)
if self.use_caching and not self.force_stepping and os.path.exists(self.stpd) and os.path.exists(self.stpd + ".conf"):
self.log.info("Using cached stpd-file: %s", self.stpd)
stepper = Stepper(self.stpd) # just to store cached data
self.__read_cached_options(self.stpd + ".conf", stepper)
else:
stepper = self.__make_stpd_file(self.stpd)
self.steps = stepper.steps
#self.core.set_option(AggregatorPlugin.SECTION, AggregatorPlugin.OPTION_CASES, stepper.cases)
self.owner.core.set_option(self.owner.SECTION, self.OPTION_STEPS, ' '.join([str(x) for x in stepper.steps]))
self.owner.core.set_option(self.owner.SECTION, self.OPTION_LOADSCHEME, stepper.loadscheme)
self.owner.core.set_option(self.owner.SECTION, self.OPTION_LOOP_COUNT, str(stepper.loop_count))
self.owner.core.set_option(self.owner.SECTION, self.OPTION_AMMO_COUNT, str(stepper.ammo_count))
self.__calculate_test_duration(stepper.steps)
self.owner.core.config.flush(self.stpd + ".conf")
def __get_stpd_filename(self):
'''
Choose the name for stepped data file
'''
if self.use_caching:
sep = "|"
hasher = hashlib.md5()
hashed_str = self.instances_schedule + sep + str(self.loop_limit)
hashed_str += sep + str(self.ammo_limit) + sep + ';'.join(self.rps_schedule) + sep + self.autocases
hashed_str += sep + ";".join(self.uris) + sep + ";".join(self.headers)
if self.ammo_file:
if not os.path.exists(self.ammo_file):
raise RuntimeError("Ammo file not found: %s", self.ammo_file)
hashed_str += sep + os.path.realpath(self.ammo_file)
stat = os.stat(self.ammo_file)
cnt = 0
for stat_option in stat:
if cnt == 7: # skip access time
continue
cnt += 1
hashed_str += ";" + str(stat_option)
else:
if not self.uris:
raise RuntimeError("Neither phantom.ammofile nor phantom.uris specified")
hashed_str += sep + ';'.join(self.uris) + sep + ';'.join(self.headers)
self.log.debug("stpd-hash source: %s", hashed_str)
hasher.update(hashed_str)
if not os.path.exists(self.cache_dir):
os.makedirs(self.cache_dir)
stpd = self.cache_dir + '/' + os.path.basename(self.ammo_file) + "_" + hasher.hexdigest() + ".stpd"
self.log.debug("Generated cache file name: %s", stpd)
else:
stpd = os.path.realpath("ammo.stpd")
return stpd
def __calculate_test_duration(self, steps):
'''
Get total test duration
'''
duration = 0
for rps, dur in tankcore.pairs(steps):
duration += dur
self.owner.core.set_option(self.owner.SECTION, self.OPTION_TEST_DURATION, str(duration))
def __read_cached_options(self, cached_config, stepper):
'''
Merge stpd cached options to current config
'''
self.log.debug("Reading cached stepper options: %s", cached_config)
external_stepper_conf = ConfigParser.ConfigParser()
external_stepper_conf.read(cached_config)
#stepper.cases = external_stepper_conf.get(AggregatorPlugin.SECTION, AggregatorPlugin.OPTION_CASES)
stepper.steps = [int(x) for x in external_stepper_conf.get(PhantomPlugin.SECTION, self.OPTION_STEPS).split(' ')]
stepper.loadscheme = external_stepper_conf.get(PhantomPlugin.SECTION, self.OPTION_LOADSCHEME)
stepper.loop_count = external_stepper_conf.get(PhantomPlugin.SECTION, self.OPTION_LOOP_COUNT)
stepper.ammo_count = external_stepper_conf.get(PhantomPlugin.SECTION, self.OPTION_AMMO_COUNT)
def __make_stpd_file(self, stpd):
'''
stpd generation using Stepper class
'''
self.log.info("Making stpd-file: %s", self.stpd)
stepper = Stepper(stpd)
stepper.autocases = int(self.autocases)
stepper.rps_schedule = self.rps_schedule
stepper.instances_schedule = self.instances_schedule
stepper.loop_limit = self.loop_limit
stepper.uris = self.uris
stepper.headers = self.headers
stepper.header_http = self.http_ver
stepper.ammofile = self.ammo_file
stepper.generate_stpd()
return stepper

View File

@ -90,7 +90,7 @@ class PhantomPluginTestCase(TankTestCase):
def test_multiload_parsing(self):
self.foo.core.set_option('phantom', 'rps_schedule', 'const(1,1) line(1,100,60)\nstep(1,10,1,10)')
self.foo.configure()
self.assertEquals(['const(1,1)', 'line(1,100,60)', 'step(1,10,1,10)'], self.foo.rps_schedule)
self.assertEquals(['const(1,1)', 'line(1,100,60)', 'step(1,10,1,10)'], self.foo.stepper.rps_schedule)
def test_reader(self):
self.foo.phout_file = 'data/phout_timeout_mix.txt'

View File

@ -4,8 +4,7 @@ plugin_console=Tank/Plugins/ConsoleOnline.py
[phantom]
phantom_path=phantom
config=data/phantom_ready.conf
stpd_file=data/ammo.stpd
#config=data/phantom_ready.conf
ammofile=data/dummy.ammo
instances_schedule=line(1,10,1m)
loop=1