# yandex-tank/Tank/Plugins/Aggregator.py

import copy
import datetime
import logging
import math
import time

from Tank import Utils
from Tank.Core import AbstractPlugin


class AggregateResultListener:
    """Interface for consumers of per-second aggregate data."""

    def aggregate_second(self, second_aggregate_data):
        """Called once for every aggregated second of test data."""
        raise TypeError("Abstract method needs to be overridden")
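

# A minimal sketch of a concrete listener, for illustration only: this class
# is not part of the original module. It reads fields defined on
# SecondAggregateDataItem below; times are assumed to be in milliseconds, as
# produced by Utils.expand_to_milliseconds.
class LoggingResultListener(AggregateResultListener):
    def aggregate_second(self, second_aggregate_data):
        overall = second_aggregate_data.overall
        logging.getLogger(__name__).info(
            "%s: %s resps, avg RT %s", second_aggregate_data.time,
            overall.RPS, overall.avg_response_time)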


class AggregatorPlugin(AbstractPlugin):
    """Plugin that aggregates raw samples into per-second statistics."""

    default_time_periods = "1ms 2 3 4 5 6 7 8 9 10 20 30 40 50 60 70 80 90 100 150 200 250 300 350 400 450 500 600 650 700 750 800 850 900 950 1s 1500 2s 2500 3s 3500 4s 4500 5s 5500 6s 6500 7s 7500 8s 8500 9s 9500 10s 11s"

    SECTION = 'aggregator'

    @staticmethod
    def get_key():
        return __file__

    def __init__(self, core):
        AbstractPlugin.__init__(self, core)
        self.process = None
        self.second_data_listeners = []
        self.preproc_out_offset = 0
        self.buffer = []
        self.second_data_draft = []
        self.preproc_out_filename = None
        self.cumulative_data = SecondAggregateDataTotalItem()
        self.reader = None
        self.time_periods = [Utils.expand_to_milliseconds(x) for x in self.default_time_periods.split(' ')]
        self.last_sample_time = 0

    def configure(self):
        periods = self.get_option("time_periods", self.default_time_periods).split(" ")
        self.time_periods = [Utils.expand_to_milliseconds(x) for x in periods]
        self.core.set_option(self.SECTION, "time_periods", " ".join([str(x) for x in periods]))

    def start_test(self):
        if not self.reader:
            self.log.warning("No reader set for the aggregator yet")

    def __generate_zero_samples(self, data):
        """Emit zero-valued samples for seconds with no recorded responses."""
        if not data:
            return
        while self.last_sample_time and int(time.mktime(data.time.timetuple())) - self.last_sample_time > 1:
            self.last_sample_time += 1
            self.log.debug("Adding zero sample: %s", self.last_sample_time)
            zero = self.reader.get_zero_sample(datetime.datetime.fromtimestamp(self.last_sample_time))
            self.__notify_listeners(zero)
        self.last_sample_time = int(time.mktime(data.time.timetuple()))

    def __read_samples(self, limit=0, force=False):
        if self.reader:
            self.reader.check_open_files()
            data = self.reader.get_next_sample(force)
            count = 0
            while data and (limit < 1 or count < limit):
                self.__generate_zero_samples(data)
                self.__notify_listeners(data)
                data = self.reader.get_next_sample(force)
                count += 1

    def is_test_finished(self):
        # read a few samples per poll; negative return means "not finished"
        self.__read_samples(5)
        return -1

    def end_test(self, retcode):
        # drain the reader, forcing out any partially accumulated data
        self.__read_samples(force=True)
        return retcode

    def add_result_listener(self, listener):
        self.second_data_listeners += [listener]

    def __notify_listeners(self, data):
        self.log.debug("Notifying listeners about second: %s, %s responses", data.time, data.overall.RPS)
        for listener in self.second_data_listeners:
            listener.aggregate_second(data)

    def get_timeout(self):
        return self.time_periods[-1]
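
    # Typical wiring, for illustration (the pattern other Tank plugins use;
    # get_plugin_of_type is assumed to be available on the core):
    #
    #     aggregator = self.core.get_plugin_of_type(AggregatorPlugin)
    #     aggregator.add_result_listener(self)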


class SecondAggregateData:
    """Aggregate data for one second: overall stats plus per-case breakdown."""

    def __init__(self, cumulative_item=None):
        self.cases = {}
        self.time = None
        # @type self.overall: SecondAggregateDataItem
        self.overall = SecondAggregateDataItem()
        self.cumulative = cumulative_item


class SecondAggregateDataItem:
    """Statistics for a single second, either overall or for one case."""

    QUANTILES = [0.25, 0.50, 0.75, 0.80, 0.90, 0.95, 0.98, 0.99, 1.00]

    def __init__(self):
        self.log = logging.getLogger(__name__)
        self.case = None
        self.planned_requests = 0
        self.active_threads = 0
        self.selfload = 0
        self.RPS = 0
        self.http_codes = {}
        self.net_codes = {}
        self.times_dist = []
        self.quantiles = {}
        self.dispersion = 0
        self.input = 0
        self.output = 0
        self.avg_connect_time = 0
        self.avg_send_time = 0
        self.avg_latency = 0
        self.avg_receive_time = 0
        self.avg_response_time = 0


class SecondAggregateDataTotalItem:
    """Cumulative totals accumulated over the whole test."""

    def __init__(self):
        self.avg_connect_time = 0
        self.avg_send_time = 0
        self.avg_latency = 0
        self.avg_receive_time = 0
        self.avg_response_time = 0
        self.total_count = 0
        self.times_dist = {}

    def add_data(self, overall_item):
        # merge per-second distribution buckets into the cumulative
        # histogram, keyed by each bucket's lower bound
        for time_item in overall_item.times_dist:
            self.total_count += time_item['count']
            if time_item['from'] in self.times_dist:
                self.times_dist[time_item['from']]['count'] += time_item['count']
            else:
                self.times_dist[time_item['from']] = time_item


# ===============================================================


class AbstractReader:
    '''
    Parent class for all source reading adapters
    '''

    def __init__(self, owner):
        self.aggregator = owner
        self.log = logging.getLogger(__name__)
        self.cumulative = SecondAggregateDataTotalItem()
        self.data_queue = []
        self.data_buffer = {}

    def check_open_files(self):
        pass

    def get_next_sample(self, force):
        pass

    def parse_second(self, next_time, data):
        self.log.debug("Parsing second: %s", next_time)
        result = self.get_zero_sample(datetime.datetime.fromtimestamp(next_time))
        for item in data:
            self.__append_sample(result.overall, item)
            marker = item[0]
            if marker:
                if marker not in result.cases:
                    result.cases[marker] = SecondAggregateDataItem()
                self.__append_sample(result.cases[marker], item)
        self.__calculate_aggregates(result.overall)
        for case in result.cases.values():
            self.__calculate_aggregates(case)
        self.cumulative.add_data(result.overall)
        return result

    def __calculate_aggregates(self, item):
        if item.RPS:
            if item.avg_response_time:
                item.selfload = 100 * item.selfload / item.RPS
            # the avg_* fields were accumulated as sums; divide to get means
            item.avg_connect_time /= item.RPS
            item.avg_send_time /= item.RPS
            item.avg_latency /= item.RPS
            item.avg_receive_time /= item.RPS
            item.avg_response_time /= item.RPS

            # walk the sorted timings once, picking quantiles and filling
            # histogram buckets bounded by the configured time periods
            item.times_dist.sort()
            count = 0.0
            quantiles = copy.copy(SecondAggregateDataItem.QUANTILES)
            times = copy.copy(self.aggregator.time_periods)
            time_from = 0
            time_to = times.pop(0)
            times_dist_draft = []
            times_dist_item = {'from': time_from, 'to': time_to, 'count': 0}
            deviation = 0.0
            for timing in item.times_dist:
                count += 1
                if quantiles and (count / item.RPS) >= quantiles[0]:
                    level = quantiles.pop(0)
                    item.quantiles[level * 100] = timing
                while times and timing > time_to:
                    time_from = time_to
                    time_to = times.pop(0)
                    if times_dist_item['count']:
                        times_dist_draft.append(times_dist_item)
                    times_dist_item = {'from': time_from, 'to': time_to, 'count': 0}
                times_dist_item['count'] += 1
                deviation += math.pow(item.avg_response_time - timing, 2)
            # any quantile levels left unassigned get the maximal timing
            while quantiles:
                level = quantiles.pop(0)
                item.quantiles[level * 100] = timing
            if times_dist_item['count']:
                times_dist_draft.append(times_dist_item)
            item.dispersion = deviation / item.RPS
            item.times_dist = times_dist_draft
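
    # Worked example of the quantile pass above: with RPS == 4 and sorted
    # timings [10, 20, 30, 40], count/RPS crosses 0.25 at the 1st timing
    # (quantile 25 -> 10), 0.50 at the 2nd (50 -> 20), 0.75 at the 3rd
    # (75 -> 30) and 0.80 at the 4th (80 -> 40); the remaining levels
    # (90..100) are then filled with the last timing by the trailing loop.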

    def __append_sample(self, result, item):
        # raw samples arrive as fixed-order tuples produced by the reader
        (marker, threads, overall_rt, http_code, net_code, sent_bytes,
         received_bytes, connect, send, latency, receive, accuracy) = item
        result.case = marker
        result.active_threads = threads
        result.planned_requests = 0
        result.RPS += 1
        if http_code and http_code != '0':
            if http_code not in result.http_codes:
                result.http_codes[http_code] = 0
            result.http_codes[http_code] += 1
        if net_code not in result.net_codes:
            result.net_codes[net_code] = 0
        result.net_codes[net_code] += 1
        result.input += received_bytes
        result.output += sent_bytes
        # avg_* fields hold running sums here; they are turned into means
        # later in __calculate_aggregates()
        result.avg_connect_time += connect
        result.avg_send_time += send
        result.avg_latency += latency
        result.avg_receive_time += receive
        result.avg_response_time += overall_rt
        result.selfload += accuracy
        result.times_dist.append(overall_rt)

    def get_zero_sample(self, date_time):
        res = SecondAggregateData(self.cumulative)
        res.time = date_time
        return res

    def pop_second(self):
        # FIXME: 2 add empty samples for non-responsive seconds
        next_time = self.data_queue.pop(0)
        data = self.data_buffer[next_time]
        del self.data_buffer[next_time]
        res = self.parse_second(next_time, data)
        return res
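

# A minimal toy reader, for illustration only (not part of the original
# module): it feeds one synthetic sample per call, in the 12-field tuple
# format that __append_sample() expects. All field values here are invented.
class SyntheticReader(AbstractReader):
    def __init__(self, owner, seconds=5):
        AbstractReader.__init__(self, owner)
        self.seconds_left = seconds
        self.current_time = int(time.time())

    def get_next_sample(self, force):
        if self.seconds_left <= 0:
            return None
        self.seconds_left -= 1
        self.current_time += 1
        # (marker, threads, overall_rt, http_code, net_code, sent_bytes,
        #  received_bytes, connect, send, latency, receive, accuracy)
        sample = ('', 1, 100, '200', 0, 100, 500, 5, 5, 80, 10, 0)
        return self.parse_second(self.current_time, [sample])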