yandex-tank/Tank/stepper/main.py
Alexey Lavrenuke (load testing) 6b75794855 exit when StopIteration raised
2013-09-19 15:52:46 +04:00

279 lines
10 KiB
Python

'''
Module contains top-level generators.
'''
from itertools import izip
import format as fmt
from config import ComponentFactory
import info
import os
import json
import hashlib
import logging
import re
class AmmoFactory(object):
    '''
    Iterable that pairs load plan timestamps with missiles.

    Both the load plan and the ammo generator are obtained from the
    configured ComponentFactory given to the constructor.
    '''

    def __init__(self, factory):
        '''
        Store the factory and fetch the load plan, the ammo generator and
        the marker function from it (in that order).
        '''
        self.factory = factory
        self.load_plan = factory.get_load_plan()
        self.ammo_generator = factory.get_ammo_generator()
        # accept-everything filter; not consulted by __iter__
        self.filter = lambda missile: True
        self.marker = factory.get_marker()

    def __iter__(self):
        '''
        Yield (timestamp, marker, missile) tuples.

        Timestamps come from the load plan, (missile, marker) pairs come
        from the ammo generator. Iteration stops as soon as either of the
        two sources is exhausted. When the ammo generator supplies a falsy
        marker, the marker is computed from the missile with self.marker.
        '''
        schedule = izip(self.load_plan, self.ammo_generator)
        for timestamp, (missile, marker) in schedule:
            if not marker:
                marker = self.marker(missile)
            yield (timestamp, marker, missile)
class Stepper(object):
    '''
    Produces stepped data from keyword options and writes it to a file.
    '''

    def __init__(self, **kwargs):
        '''
        Reset the shared stepper status, then build the ammo pipeline:
        ComponentFactory(**kwargs) -> AmmoFactory -> fmt.Stpd.
        '''
        info.status = info.StepperStatus()
        component_factory = ComponentFactory(**kwargs)
        self.af = AmmoFactory(component_factory)
        self.ammo = fmt.Stpd(self.af)

    def write(self, f):
        '''
        Write every item of self.ammo to the file-like object f.

        After each written item the ammo counter in info.status is
        incremented; if that call raises StopIteration, writing stops.
        '''
        for chunk in self.ammo:
            f.write(chunk)
            try:
                info.status.inc_ammo_count()
            except StopIteration:
                # the status object signalled that no more ammo is wanted
                return
class StepperWrapper(object):
    # TODO: review and rewrite this class
    '''
    Wrapper for cached stepper functionality.

    Reads stepper options from the core config, chooses a name for the
    stepped data (stpd) file and either reuses an existing stpd-file
    together with its "<stpd>_si.json" stepper info, or generates both
    with the Stepper class.
    '''
    OPTION_STPD = 'stpd_file'
    OPTION_STEPS = 'steps'
    OPTION_TEST_DURATION = 'test_duration'
    OPTION_AMMO_COUNT = 'ammo_count'
    OPTION_LOOP = 'loop'
    OPTION_LOOP_COUNT = 'loop_count'
    OPTION_AMMOFILE = "ammofile"
    OPTION_SCHEDULE = 'rps_schedule'
    OPTION_LOADSCHEME = 'loadscheme'
    OPTION_INSTANCES_LIMIT = 'instances'

    def __init__(self, core, section):
        '''
        core is the object options are read from (and written to),
        section is the config section name used for every option lookup.
        '''
        self.log = logging.getLogger(__name__)
        self.core = core
        self.section = section

        self.cache_dir = '.'

        # per-shoot params
        self.instances = 1000
        self.rps_schedule = []
        self.http_ver = '1.0'
        self.ammo_file = None
        self.instances_schedule = ''
        self.loop_limit = -1
        self.ammo_limit = -1
        self.uris = []
        self.headers = []
        self.autocases = 0
        self.use_caching = True
        self.force_stepping = None
        self.ammo_type = 'phantom'

        # out params
        self.stpd = None
        self.steps = []
        self.ammo_count = 1
        self.duration = 0
        self.loop_count = 0
        self.loadscheme = ""
        self.file_cache = 8192

    def get_option(self, option_ammofile, param2=None):
        '''
        get_option wrapper: read the option named option_ammofile from
        this wrapper's section (param2 is the default passed to the core),
        log the value at debug level and return it.
        '''
        result = self.core.get_option(self.section, option_ammofile, param2)
        self.log.debug(
            "Option %s.%s = %s", self.section, option_ammofile, result)
        return result

    @staticmethod
    def get_available_options():
        '''
        Return the list of option names handled by read_config.
        '''
        opts = [StepperWrapper.OPTION_AMMOFILE, StepperWrapper.OPTION_LOOP,
                StepperWrapper.OPTION_SCHEDULE, StepperWrapper.OPTION_STPD]
        opts += ["instances_schedule", "uris",
                 "headers", "header_http", "autocases", "ammo_type"]
        opts += ["use_caching", "cache_dir", "force_stepping", "file_cache"]
        # read_config also reads these two, so they belong in the list
        opts += [StepperWrapper.OPTION_INSTANCES_LIMIT, "ammo_limit"]
        return opts

    def read_config(self):
        ''' stepper part of reading options '''
        self.log.info("Configuring StepperWrapper...")
        self.ammo_file = self.get_option(self.OPTION_AMMOFILE, '')
        self.ammo_type = self.get_option('ammo_type', self.ammo_type)
        if self.ammo_file:
            self.ammo_file = os.path.expanduser(self.ammo_file)
        self.loop_limit = int(self.get_option(self.OPTION_LOOP, "-1"))
        self.ammo_limit = int(self.get_option("ammo_limit", "-1"))
        # TODO: 3 stepper should implement ammo_limit

        def make_steps(schedule):
            '''
            Split a schedule string into a list of "name(args)" steps:
            newlines count as spaces and every ')' terminates a step.
            '''
            steps = []
            for step in " ".join(schedule.split("\n")).split(')'):
                if step.strip():
                    steps.append(step.strip() + ')')
            return steps

        self.rps_schedule = make_steps(
            self.get_option(self.OPTION_SCHEDULE, ''))
        self.instances_schedule = make_steps(
            self.get_option("instances_schedule", ''))
        self.instances = int(
            self.get_option(self.OPTION_INSTANCES_LIMIT, '1000'))

        # one uri per line, empty lines are dropped
        self.uris = self.get_option("uris", '').strip().split("\n")
        while '' in self.uris:
            self.uris.remove('')

        # headers are written as "[Name: value]" groups; keep the insides
        rx = re.compile(r'\[(.*?)\]')
        self.headers = rx.findall(self.get_option("headers", ''))
        self.http_ver = self.get_option("header_http", self.http_ver)
        self.autocases = self.get_option("autocases", '0')

        self.use_caching = int(self.get_option("use_caching", '1'))
        self.file_cache = int(self.get_option('file_cache', '8192'))
        cache_dir = self.get_option(
            "cache_dir", self.core.artifacts_base_dir)
        self.cache_dir = os.path.expanduser(cache_dir)
        self.force_stepping = int(self.get_option("force_stepping", '0'))
        self.stpd = self.get_option(self.OPTION_STPD, "")

    def prepare_stepper(self):
        '''
        Generate test data if necessary.

        When no stpd file was configured, a name is chosen and stored back
        into the core options. An existing stpd-file with its stepper info
        json is reused when caching is enabled and stepping is not forced;
        otherwise the stpd-file is (re)generated and its stepper info is
        saved. In both cases the out params of this object are filled from
        the stepper info.
        '''
        if not self.stpd:
            self.stpd = self.__get_stpd_filename()
            self.core.set_option(self.section, self.OPTION_STPD, self.stpd)

        si_filename = self.__si_filename()
        cache_usable = (
            self.use_caching and not self.force_stepping
            and os.path.exists(self.stpd) and os.path.exists(si_filename))

        if cache_usable:
            self.log.info("Using cached stpd-file: %s", self.stpd)
            stepper_info = self.__read_cached_options()
            info.status.publish('instances', stepper_info.instances)
            info.status.publish('loadscheme', stepper_info.loadscheme)
            info.status.publish('loop_count', stepper_info.loop_count)
            info.status.publish('steps', stepper_info.steps)
            info.status.publish('duration', stepper_info.duration)
            info.status.ammo_count = stepper_info.ammo_count
        else:
            # drop stale stepper info before a forced regeneration
            if self.force_stepping and os.path.exists(si_filename):
                os.remove(si_filename)
            self.__make_stpd_file()
            stepper_info = info.status.get_info()
            self.__write_cached_options(stepper_info)

        self.ammo_count = stepper_info.ammo_count
        self.duration = stepper_info.duration
        self.loop_count = stepper_info.loop_count
        self.loadscheme = stepper_info.loadscheme
        self.steps = stepper_info.steps
        self.instances = stepper_info.instances

    def __si_filename(self):
        '''Return name for stepper_info json file'''
        return "%s_si.json" % self.stpd

    def __get_stpd_filename(self):
        '''
        Choose the name for stepped data file.

        With caching enabled the name is
        "<cache_dir>/<ammo file basename>_<md5>.stpd", where the md5 is
        taken over the stepping parameters and, for an ammo file, its real
        path and stat fields (access time excluded). The cache directory
        is created if missing. Without caching the name is the real path
        of "ammo.stpd".

        Raises RuntimeError when the ammo file does not exist or when
        neither an ammo file nor uris are configured.
        '''
        if self.use_caching:
            sep = "|"
            hasher = hashlib.md5()
            hashed_str = "cache version 3" + sep + \
                ';'.join(self.instances_schedule) + sep + str(self.loop_limit)
            hashed_str += sep + str(self.ammo_limit) + sep + ';'.join(
                self.rps_schedule) + sep + str(self.autocases)
            hashed_str += sep + \
                ";".join(self.uris) + sep + ";".join(
                    self.headers) + sep + self.http_ver

            if self.ammo_file:
                if not os.path.exists(self.ammo_file):
                    raise RuntimeError(
                        "Ammo file not found: %s" % self.ammo_file)

                hashed_str += sep + os.path.realpath(self.ammo_file)
                for index, stat_option in enumerate(os.stat(self.ammo_file)):
                    if index == 7:  # skip access time (and only it)
                        continue
                    hashed_str += ";" + str(stat_option)
                hashed_str += ";" + str(os.path.getmtime(self.ammo_file))
            else:
                if not self.uris:
                    raise RuntimeError(
                        "Neither ammofile nor uris specified")
                hashed_str += sep + \
                    ';'.join(self.uris) + sep + ';'.join(self.headers)
            self.log.debug("stpd-hash source: %s", hashed_str)
            # md5 needs bytes; a text (unicode) string is encoded first,
            # a byte string is hashed as is
            if not isinstance(hashed_str, bytes):
                hashed_str = hashed_str.encode('utf-8')
            hasher.update(hashed_str)
            if not os.path.exists(self.cache_dir):
                os.makedirs(self.cache_dir)
            # ammo_file may be None/empty in uris-only mode
            stpd = self.cache_dir + '/' + \
                os.path.basename(self.ammo_file or '') + \
                "_" + hasher.hexdigest() + ".stpd"
        else:
            stpd = os.path.realpath("ammo.stpd")
        self.log.debug("Generated cache file name: %s", stpd)
        return stpd

    def __read_cached_options(self):
        '''
        Read stepper info from json and return it as info.StepperInfo
        '''
        self.log.debug("Reading cached stepper info: %s", self.__si_filename())
        with open(self.__si_filename(), 'r') as si_file:
            si = info.StepperInfo(**json.load(si_file))
        return si

    def __write_cached_options(self, si):
        '''
        Write stepper info to json (si must provide _asdict())
        '''
        # TODO: format json
        self.log.debug("Saving stepper info: %s", self.__si_filename())
        with open(self.__si_filename(), 'w') as si_file:
            json.dump(si._asdict(), si_file, indent=4)

    def __make_stpd_file(self):
        ''' stpd generation using Stepper class '''
        self.log.info("Making stpd-file: %s", self.stpd)
        stepper = Stepper(
            rps_schedule=self.rps_schedule,
            http_ver=self.http_ver,
            ammo_file=self.ammo_file,
            instances_schedule=self.instances_schedule,
            instances=self.instances,
            loop_limit=self.loop_limit,
            ammo_limit=self.ammo_limit,
            uris=self.uris,
            headers=[header.strip('[]') for header in self.headers],
            autocases=self.autocases,
            ammo_type=self.ammo_type,
        )
        # third argument of open() is the buffer size
        with open(self.stpd, 'w', self.file_cache) as stpd_file:
            stepper.write(stpd_file)