2014-03-13 09:36:16 +00:00
|
|
|
""" Core module to calculate aggregate data """
|
2012-09-24 11:01:45 +00:00
|
|
|
import copy
|
2012-09-12 13:18:58 +00:00
|
|
|
import datetime
|
|
|
|
import logging
|
2012-09-24 10:33:20 +00:00
|
|
|
import math
|
2012-09-24 11:01:45 +00:00
|
|
|
import time
|
2012-09-12 13:18:58 +00:00
|
|
|
|
2015-02-02 17:24:32 +00:00
|
|
|
from yandextank.core import AbstractPlugin
|
|
|
|
import yandextank.core as tankcore
|
2014-03-13 09:36:16 +00:00
|
|
|
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-09-12 13:18:58 +00:00
|
|
|
class AggregateResultListener:
|
2014-03-13 09:36:16 +00:00
|
|
|
""" listener to be notified about aggregate data """
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2014-03-13 09:36:16 +00:00
|
|
|
def __init__(self):
|
|
|
|
pass
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-09-12 13:18:58 +00:00
|
|
|
def aggregate_second(self, second_aggregate_data):
|
2014-03-13 09:36:16 +00:00
|
|
|
""" notification about new aggregate data """
|
2012-10-23 17:24:58 +00:00
|
|
|
raise NotImplementedError("Abstract method needs to be overridden")
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-09-12 13:18:58 +00:00
|
|
|
|
|
|
|
class AggregatorPlugin(AbstractPlugin):
|
2014-03-13 09:36:16 +00:00
|
|
|
""" Plugin that manages aggregation """
|
|
|
|
default_time_periods = "1ms 2 3 4 5 6 7 8 9 10 20 30 40 50 60 70 80 90 100 " \
|
|
|
|
"150 200 250 300 350 400 450 500 600 650 700 750 800 850 900 950 1s " \
|
|
|
|
"1500 2s 2500 3s 3500 4s 4500 5s 5500 6s 6500 7s 7500 8s 8500 9s 9500 10s 11s"
|
2012-09-12 13:18:58 +00:00
|
|
|
|
|
|
|
SECTION = 'aggregator'
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-09-12 13:18:58 +00:00
|
|
|
@staticmethod
|
|
|
|
def get_key():
|
|
|
|
return __file__
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-09-12 13:18:58 +00:00
|
|
|
def __init__(self, core):
|
2012-09-13 10:19:33 +00:00
|
|
|
AbstractPlugin.__init__(self, core)
|
2012-09-12 13:18:58 +00:00
|
|
|
self.process = None
|
|
|
|
self.second_data_listeners = []
|
|
|
|
self.preproc_out_offset = 0
|
|
|
|
self.buffer = []
|
|
|
|
self.second_data_draft = []
|
|
|
|
self.preproc_out_filename = None
|
2012-09-19 07:17:56 +00:00
|
|
|
self.cumulative_data = SecondAggregateDataTotalItem()
|
2012-09-20 06:55:45 +00:00
|
|
|
self.reader = None
|
2013-08-21 12:29:23 +00:00
|
|
|
self.time_periods = [tankcore.expand_to_milliseconds(x)
|
|
|
|
for x in self.default_time_periods.split(' ')]
|
2012-09-24 11:01:45 +00:00
|
|
|
self.last_sample_time = 0
|
2013-03-15 10:37:37 +00:00
|
|
|
self.precise_cumulative = 1
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2013-03-22 13:55:13 +00:00
|
|
|
def get_available_options(self):
|
|
|
|
return ["time_periods", "precise_cumulative"]
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-09-12 13:18:58 +00:00
|
|
|
def configure(self):
|
2013-08-21 12:29:23 +00:00
|
|
|
periods = self.get_option(
|
|
|
|
"time_periods", self.default_time_periods).split(" ")
|
|
|
|
self.time_periods = [
|
|
|
|
tankcore.expand_to_milliseconds(x) for x in periods]
|
|
|
|
self.core.set_option(
|
|
|
|
self.SECTION, "time_periods", " ".join([str(x) for x in periods]))
|
|
|
|
self.precise_cumulative = int(
|
|
|
|
self.get_option("precise_cumulative", '1'))
|
2012-09-12 13:18:58 +00:00
|
|
|
|
|
|
|
def start_test(self):
|
2012-09-20 06:55:45 +00:00
|
|
|
if not self.reader:
|
2013-03-14 12:17:56 +00:00
|
|
|
self.log.warning("No one had set reader for aggregate data yet...")
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-10-18 10:46:50 +00:00
|
|
|
def is_test_finished(self):
|
2013-02-21 12:57:46 +00:00
|
|
|
# read up to 2 samples in single pass
|
2013-08-21 12:29:23 +00:00
|
|
|
self.__read_samples(2)
|
2012-10-18 10:46:50 +00:00
|
|
|
return -1
|
|
|
|
|
|
|
|
def end_test(self, retcode):
|
|
|
|
self.__read_samples(force=True)
|
2012-10-22 11:41:59 +00:00
|
|
|
if self.reader:
|
|
|
|
self.reader.close_files()
|
2013-08-21 12:29:23 +00:00
|
|
|
return retcode
|
|
|
|
|
2012-10-18 10:46:50 +00:00
|
|
|
def add_result_listener(self, listener):
|
2014-03-13 09:36:16 +00:00
|
|
|
""" add object to data listeners """
|
2013-03-14 12:17:56 +00:00
|
|
|
self.second_data_listeners.append(listener)
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-10-18 10:46:50 +00:00
|
|
|
def __notify_listeners(self, data):
|
2014-03-13 09:36:16 +00:00
|
|
|
""" notify all listeners about aggregate data """
|
2013-08-21 12:29:23 +00:00
|
|
|
self.log.debug(
|
|
|
|
"Notifying listeners about second: %s , %s/%s req/responses",
|
|
|
|
data.time, data.overall.planned_requests, data.overall.RPS)
|
2012-10-18 10:46:50 +00:00
|
|
|
for listener in self.second_data_listeners:
|
|
|
|
listener.aggregate_second(data)
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-10-18 10:46:50 +00:00
|
|
|
def get_timeout(self):
|
2014-03-13 09:36:16 +00:00
|
|
|
""" get timeout based on time_periods last val """
|
2012-10-18 10:46:50 +00:00
|
|
|
return self.time_periods[-1:][0]
|
2012-09-24 11:01:45 +00:00
|
|
|
|
|
|
|
def __generate_zero_samples(self, data):
|
2014-03-13 09:36:16 +00:00
|
|
|
""" fill timeline gaps with zero samples """
|
2012-09-24 11:01:45 +00:00
|
|
|
if not data:
|
2013-08-21 12:29:23 +00:00
|
|
|
return
|
2012-09-24 11:01:45 +00:00
|
|
|
while self.last_sample_time and int(time.mktime(data.time.timetuple())) - self.last_sample_time > 1:
|
|
|
|
self.last_sample_time += 1
|
2012-11-20 10:59:07 +00:00
|
|
|
self.log.warning("Adding zero sample: %s", self.last_sample_time)
|
2013-08-21 12:29:23 +00:00
|
|
|
zero = self.reader.get_zero_sample(
|
|
|
|
datetime.datetime.fromtimestamp(self.last_sample_time))
|
2012-09-24 11:01:45 +00:00
|
|
|
self.__notify_listeners(zero)
|
|
|
|
self.last_sample_time = int(time.mktime(data.time.timetuple()))
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-09-23 13:21:13 +00:00
|
|
|
def __read_samples(self, limit=0, force=False):
|
2014-03-13 09:36:16 +00:00
|
|
|
""" call reader object to read next sample set """
|
2012-09-20 06:55:45 +00:00
|
|
|
if self.reader:
|
|
|
|
self.reader.check_open_files()
|
2012-09-20 10:16:27 +00:00
|
|
|
data = self.reader.get_next_sample(force)
|
2012-09-20 06:55:45 +00:00
|
|
|
count = 0
|
2012-11-20 10:59:07 +00:00
|
|
|
while data:
|
2012-11-20 10:21:18 +00:00
|
|
|
self.last_sample_time = int(time.mktime(data.time.timetuple()))
|
2012-09-24 11:01:45 +00:00
|
|
|
self.__generate_zero_samples(data)
|
2012-09-23 13:21:13 +00:00
|
|
|
self.__notify_listeners(data)
|
2012-11-20 10:59:07 +00:00
|
|
|
if limit < 1 or count < limit:
|
|
|
|
data = self.reader.get_next_sample(force)
|
|
|
|
else:
|
|
|
|
data = None
|
2012-09-24 09:37:52 +00:00
|
|
|
count += 1
|
2012-09-12 13:18:58 +00:00
|
|
|
|
2013-08-21 12:29:23 +00:00
|
|
|
|
|
|
|
# ===============================================================
|
2012-09-12 13:18:58 +00:00
|
|
|
class SecondAggregateData:
|
2014-03-13 09:36:16 +00:00
|
|
|
""" class holds aggregate data for the second """
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2014-09-11 12:28:22 +00:00
|
|
|
def __init__(self, cumulative_item=None):
|
2012-09-12 13:18:58 +00:00
|
|
|
self.cases = {}
|
|
|
|
self.time = None
|
2012-09-20 10:16:27 +00:00
|
|
|
self.overall = SecondAggregateDataItem()
|
2014-09-11 12:28:22 +00:00
|
|
|
self.cumulative = cumulative_item
|
2012-09-12 13:18:58 +00:00
|
|
|
|
2012-11-20 10:21:18 +00:00
|
|
|
def __repr__(self):
|
|
|
|
return "SecondAggregateData[%s][%s]" % (self.time, time.mktime(self.time.timetuple()))
|
|
|
|
|
2014-09-11 12:28:22 +00:00
|
|
|
def __getstate__(self):
|
|
|
|
return {
|
|
|
|
'cases': self.cases,
|
|
|
|
'overall': self.overall,
|
|
|
|
'cumulative': self.cumulative,
|
|
|
|
}
|
|
|
|
|
2013-01-30 13:30:59 +00:00
|
|
|
|
2012-09-12 13:18:58 +00:00
|
|
|
class SecondAggregateDataItem:
|
2014-03-13 09:36:16 +00:00
|
|
|
""" overall and case items has this type """
|
2013-11-19 12:48:01 +00:00
|
|
|
QUANTILES = [0.25, 0.50, 0.75, 0.80, 0.85, 0.90, 0.95, 0.98, 0.99, 1.00]
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-09-20 10:16:27 +00:00
|
|
|
def __init__(self):
|
2012-09-12 13:18:58 +00:00
|
|
|
self.case = None
|
2012-09-20 10:46:55 +00:00
|
|
|
self.planned_requests = 0
|
2012-09-20 10:16:27 +00:00
|
|
|
self.active_threads = 0
|
2012-09-20 10:46:55 +00:00
|
|
|
self.selfload = 0
|
|
|
|
self.RPS = 0
|
2012-09-12 13:18:58 +00:00
|
|
|
self.http_codes = {}
|
|
|
|
self.net_codes = {}
|
|
|
|
self.times_dist = []
|
|
|
|
self.quantiles = {}
|
2012-09-24 11:01:45 +00:00
|
|
|
self.dispersion = 0
|
2012-09-20 10:46:55 +00:00
|
|
|
self.input = 0
|
|
|
|
self.output = 0
|
2012-09-20 10:16:27 +00:00
|
|
|
self.avg_connect_time = 0
|
|
|
|
self.avg_send_time = 0
|
|
|
|
self.avg_latency = 0
|
|
|
|
self.avg_receive_time = 0
|
|
|
|
self.avg_response_time = 0
|
2012-09-12 13:18:58 +00:00
|
|
|
|
2014-09-11 12:28:22 +00:00
|
|
|
def __getstate__(self):
|
|
|
|
return {
|
2014-10-14 14:12:07 +00:00
|
|
|
# "case": self.case,
|
2014-09-11 12:28:22 +00:00
|
|
|
"planned_requests": self.planned_requests,
|
|
|
|
"active_threads": self.active_threads,
|
2014-10-15 13:10:22 +00:00
|
|
|
# "selfload": self.selfload,
|
2014-09-11 12:28:22 +00:00
|
|
|
"RPS": self.RPS,
|
|
|
|
"http_codes": self.http_codes,
|
|
|
|
"net_codes": self.net_codes,
|
2014-09-19 16:23:05 +00:00
|
|
|
# "times_dist": self.times_dist,
|
2014-09-11 12:28:22 +00:00
|
|
|
"quantiles": self.quantiles,
|
2014-10-15 13:10:22 +00:00
|
|
|
# "dispersion": self.dispersion,
|
|
|
|
# "input": self.input,
|
|
|
|
# "output": self.output,
|
|
|
|
# "avg_connect_time": self.avg_connect_time,
|
|
|
|
# "avg_send_time": self.avg_send_time,
|
|
|
|
# "avg_latency": self.avg_latency,
|
|
|
|
# "avg_receive_time": self.avg_receive_time,
|
|
|
|
# "avg_response_time": self.avg_response_time,
|
2014-09-11 12:28:22 +00:00
|
|
|
}
|
2013-01-30 13:30:59 +00:00
|
|
|
|
2012-09-19 07:17:56 +00:00
|
|
|
class SecondAggregateDataTotalItem:
|
2014-03-13 09:36:16 +00:00
|
|
|
""" total cumulative data item """
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-09-19 07:17:56 +00:00
|
|
|
def __init__(self):
|
|
|
|
self.avg_connect_time = 0
|
|
|
|
self.avg_send_time = 0
|
|
|
|
self.avg_latency = 0
|
|
|
|
self.avg_receive_time = 0
|
|
|
|
self.avg_response_time = 0
|
|
|
|
self.total_count = 0
|
|
|
|
self.times_dist = {}
|
2013-03-14 12:17:56 +00:00
|
|
|
self.quantiles = {}
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2014-09-11 12:28:22 +00:00
|
|
|
def __getstate__(self):
|
|
|
|
return {
|
2014-10-15 13:10:22 +00:00
|
|
|
# "avg_connect_time": self.avg_connect_time,
|
|
|
|
# "avg_send_time": self.avg_send_time,
|
|
|
|
# "avg_latency": self.avg_latency,
|
|
|
|
# "avg_receive_time": self.avg_receive_time,
|
|
|
|
# "avg_response_time": self.avg_response_time,
|
2014-09-11 12:28:22 +00:00
|
|
|
"total_count": self.total_count,
|
2014-10-15 13:10:22 +00:00
|
|
|
#"times_dist": self.times_dist,
|
2014-09-11 12:28:22 +00:00
|
|
|
"quantiles": self.quantiles,
|
|
|
|
}
|
|
|
|
|
2012-09-19 07:17:56 +00:00
|
|
|
def add_data(self, overall_item):
|
2014-03-13 09:36:16 +00:00
|
|
|
""" add data to total """
|
2012-09-19 07:17:56 +00:00
|
|
|
for time_item in overall_item.times_dist:
|
|
|
|
self.total_count += time_item['count']
|
2013-03-14 12:17:56 +00:00
|
|
|
timing = int(time_item['from'])
|
|
|
|
if timing in self.times_dist.keys():
|
|
|
|
self.times_dist[timing]['count'] += time_item['count']
|
2012-09-19 07:17:56 +00:00
|
|
|
else:
|
2013-03-14 12:17:56 +00:00
|
|
|
self.times_dist[timing] = time_item
|
2013-03-15 10:37:37 +00:00
|
|
|
|
|
|
|
def add_raw_data(self, times_dist):
|
2014-03-13 09:36:16 +00:00
|
|
|
""" add data to total """
|
2013-03-15 10:37:37 +00:00
|
|
|
for time_item in times_dist:
|
|
|
|
self.total_count += 1
|
|
|
|
timing = int(time_item)
|
2013-08-21 12:29:23 +00:00
|
|
|
dist_item = self.times_dist.get(
|
|
|
|
timing, {'from': timing, 'to': timing, 'count': 0})
|
2013-03-15 10:37:37 +00:00
|
|
|
dist_item['count'] += 1
|
|
|
|
self.times_dist[timing] = dist_item
|
|
|
|
logging.debug("Total times len: %s", len(self.times_dist))
|
2012-09-19 07:17:56 +00:00
|
|
|
|
2013-03-14 12:17:56 +00:00
|
|
|
def calculate_total_quantiles(self):
|
2014-03-13 09:36:16 +00:00
|
|
|
""" calculate total quantiles based on times dist """
|
2013-03-14 12:17:56 +00:00
|
|
|
self.quantiles = {}
|
2013-08-21 12:29:23 +00:00
|
|
|
quantiles = reversed(copy.copy(SecondAggregateDataItem.QUANTILES))
|
2013-03-14 12:17:56 +00:00
|
|
|
timings = sorted(self.times_dist.keys(), reverse=True)
|
|
|
|
level = 1.0
|
2014-03-13 09:36:16 +00:00
|
|
|
timing = 0
|
2013-03-14 12:17:56 +00:00
|
|
|
for quan in quantiles:
|
|
|
|
while level >= quan:
|
|
|
|
timing = timings.pop(0)
|
2013-08-21 12:29:23 +00:00
|
|
|
level -= float(
|
|
|
|
self.times_dist[timing]['count']) / self.total_count
|
2013-03-14 12:17:56 +00:00
|
|
|
self.quantiles[quan * 100] = self.times_dist[timing]['to']
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2013-03-14 12:17:56 +00:00
|
|
|
logging.debug("Total quantiles: %s", self.quantiles)
|
|
|
|
return self.quantiles
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-09-19 07:17:56 +00:00
|
|
|
|
2012-09-20 06:55:45 +00:00
|
|
|
# ===============================================================
|
2012-11-20 10:59:07 +00:00
|
|
|
|
2012-09-20 06:55:45 +00:00
|
|
|
class AbstractReader:
|
2014-03-13 09:36:16 +00:00
|
|
|
"""
|
2012-09-20 06:55:45 +00:00
|
|
|
Parent class for all source reading adapters
|
2014-03-13 09:36:16 +00:00
|
|
|
"""
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-09-20 06:55:45 +00:00
|
|
|
def __init__(self, owner):
|
2012-09-20 12:32:00 +00:00
|
|
|
self.aggregator = owner
|
2012-09-20 06:55:45 +00:00
|
|
|
self.log = logging.getLogger(__name__)
|
2012-09-20 16:21:42 +00:00
|
|
|
self.cumulative = SecondAggregateDataTotalItem()
|
2012-09-25 11:22:09 +00:00
|
|
|
self.data_queue = []
|
|
|
|
self.data_buffer = {}
|
2012-09-20 06:55:45 +00:00
|
|
|
|
|
|
|
def check_open_files(self):
|
2014-03-13 09:36:16 +00:00
|
|
|
""" open files if necessary """
|
2012-09-20 06:55:45 +00:00
|
|
|
pass
|
2012-10-22 11:11:19 +00:00
|
|
|
|
|
|
|
def close_files(self):
|
2014-03-13 09:36:16 +00:00
|
|
|
""" Close opened handlers to avoid fd leak """
|
2012-10-22 11:11:19 +00:00
|
|
|
pass
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-09-20 10:16:27 +00:00
|
|
|
def get_next_sample(self, force):
|
2014-03-13 09:36:16 +00:00
|
|
|
""" read next sample from file """
|
2012-09-20 06:55:45 +00:00
|
|
|
pass
|
2012-09-20 10:16:27 +00:00
|
|
|
|
2012-09-20 10:46:55 +00:00
|
|
|
def parse_second(self, next_time, data):
|
2014-03-13 09:36:16 +00:00
|
|
|
""" parse buffered data to aggregate item """
|
2012-09-20 10:46:55 +00:00
|
|
|
self.log.debug("Parsing second: %s", next_time)
|
2013-08-21 12:29:23 +00:00
|
|
|
result = self.get_zero_sample(
|
|
|
|
datetime.datetime.fromtimestamp(next_time))
|
2013-03-15 10:37:37 +00:00
|
|
|
time_before = time.time()
|
2012-09-20 10:46:55 +00:00
|
|
|
for item in data:
|
2012-09-25 11:22:09 +00:00
|
|
|
self.__append_sample(result.overall, item)
|
2012-09-20 10:46:55 +00:00
|
|
|
marker = item[0]
|
|
|
|
if marker:
|
|
|
|
if not marker in result.cases.keys():
|
|
|
|
result.cases[marker] = SecondAggregateDataItem()
|
2012-09-25 11:22:09 +00:00
|
|
|
self.__append_sample(result.cases[marker], item)
|
2012-09-20 10:46:55 +00:00
|
|
|
|
2013-03-15 10:37:37 +00:00
|
|
|
# at this phase we have raw response times in result.overall.times_dist
|
|
|
|
if self.aggregator.precise_cumulative:
|
|
|
|
self.cumulative.add_raw_data(result.overall.times_dist)
|
|
|
|
|
2013-08-21 12:29:23 +00:00
|
|
|
self.log.debug(
|
|
|
|
"Calculate aggregates for %s requests", result.overall.RPS)
|
2012-09-25 11:22:09 +00:00
|
|
|
self.__calculate_aggregates(result.overall)
|
2012-09-20 10:46:55 +00:00
|
|
|
for case in result.cases.values():
|
2012-09-25 11:22:09 +00:00
|
|
|
self.__calculate_aggregates(case)
|
2012-09-20 12:32:00 +00:00
|
|
|
|
2013-03-15 10:37:37 +00:00
|
|
|
if not self.aggregator.precise_cumulative:
|
|
|
|
self.cumulative.add_data(result.overall)
|
2013-03-14 12:17:56 +00:00
|
|
|
self.cumulative.calculate_total_quantiles()
|
2012-09-20 12:32:00 +00:00
|
|
|
|
2013-03-15 10:37:37 +00:00
|
|
|
spent = time.time() - time_before
|
|
|
|
if spent:
|
2013-08-21 12:29:23 +00:00
|
|
|
self.log.debug(
|
|
|
|
"Aggregating speed: %s lines/sec", int(len(data) / spent))
|
2012-09-20 10:46:55 +00:00
|
|
|
return result
|
|
|
|
|
2012-09-25 11:22:09 +00:00
|
|
|
def __calculate_aggregates(self, item):
|
2014-03-13 09:36:16 +00:00
|
|
|
""" calculate aggregates on raw data """
|
2012-09-20 10:46:55 +00:00
|
|
|
if item.RPS:
|
2013-08-30 12:44:57 +00:00
|
|
|
item.selfload = 100 * item.selfload / item.RPS
|
2013-08-21 12:29:23 +00:00
|
|
|
item.avg_connect_time /= item.RPS
|
|
|
|
item.avg_send_time /= item.RPS
|
2012-09-20 10:46:55 +00:00
|
|
|
item.avg_latency /= item.RPS
|
|
|
|
item.avg_receive_time /= item.RPS
|
|
|
|
item.avg_response_time /= item.RPS
|
2012-09-20 11:52:59 +00:00
|
|
|
|
|
|
|
item.times_dist.sort()
|
|
|
|
count = 0.0
|
|
|
|
quantiles = copy.copy(SecondAggregateDataItem.QUANTILES)
|
2012-09-20 12:32:00 +00:00
|
|
|
times = copy.copy(self.aggregator.time_periods)
|
2012-09-20 13:14:22 +00:00
|
|
|
time_from = 0
|
2012-09-21 10:15:06 +00:00
|
|
|
time_to = times.pop(0)
|
2012-09-20 13:14:22 +00:00
|
|
|
times_dist_draft = []
|
2013-08-21 12:29:23 +00:00
|
|
|
times_dist_item = {'from': time_from, 'to': time_to, 'count': 0}
|
2012-09-24 10:33:20 +00:00
|
|
|
deviation = 0.0
|
2012-10-23 17:24:58 +00:00
|
|
|
timing = 0
|
2012-09-20 11:52:59 +00:00
|
|
|
for timing in item.times_dist:
|
|
|
|
count += 1
|
|
|
|
if quantiles and (count / item.RPS) >= quantiles[0]:
|
|
|
|
level = quantiles.pop(0)
|
|
|
|
item.quantiles[level * 100] = timing
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-09-20 13:14:22 +00:00
|
|
|
while times and timing > time_to:
|
|
|
|
time_from = time_to
|
|
|
|
time_to = times.pop(0)
|
2012-09-20 15:45:27 +00:00
|
|
|
if times_dist_item['count']:
|
2012-09-20 13:14:22 +00:00
|
|
|
times_dist_draft.append(times_dist_item)
|
2013-08-21 12:29:23 +00:00
|
|
|
times_dist_item = {
|
|
|
|
'from': time_from, 'to': time_to, 'count': 0}
|
|
|
|
|
2012-09-20 15:45:27 +00:00
|
|
|
times_dist_item['count'] += 1
|
2012-09-24 10:33:20 +00:00
|
|
|
deviation += math.pow(item.avg_response_time - timing, 2)
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-09-21 10:15:06 +00:00
|
|
|
while quantiles:
|
|
|
|
level = quantiles.pop(0)
|
|
|
|
item.quantiles[level * 100] = timing
|
2013-08-21 12:29:23 +00:00
|
|
|
|
|
|
|
if times_dist_item['count']:
|
2012-09-20 13:14:22 +00:00
|
|
|
times_dist_draft.append(times_dist_item)
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-09-24 10:33:20 +00:00
|
|
|
item.dispersion = deviation / item.RPS
|
2013-08-21 12:29:23 +00:00
|
|
|
item.times_dist = times_dist_draft
|
2012-09-20 11:52:59 +00:00
|
|
|
|
2012-09-25 11:22:09 +00:00
|
|
|
def __append_sample(self, result, item):
|
2014-03-13 09:36:16 +00:00
|
|
|
""" add single sample to aggregator buffer """
|
2013-03-26 16:09:31 +00:00
|
|
|
for check in item:
|
2012-10-03 11:38:25 +00:00
|
|
|
if check < 0:
|
2012-10-03 12:44:09 +00:00
|
|
|
self.log.error("Problem item: %s", item)
|
2012-10-03 11:38:25 +00:00
|
|
|
raise ValueError("One of the sample items has negative value")
|
2013-03-26 16:09:31 +00:00
|
|
|
|
2013-08-21 12:29:23 +00:00
|
|
|
(marker, threads, overall_rt, http_code, net_code, sent_bytes,
|
|
|
|
received_bytes, connect, send, latency, receive, accuracy) = item
|
2012-09-20 10:16:27 +00:00
|
|
|
result.case = marker
|
|
|
|
result.active_threads = threads
|
2012-09-21 13:40:29 +00:00
|
|
|
result.planned_requests = 0
|
2012-09-20 10:16:27 +00:00
|
|
|
result.RPS += 1
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-09-20 11:52:59 +00:00
|
|
|
if http_code and http_code != '0':
|
|
|
|
if not http_code in result.http_codes.keys():
|
|
|
|
result.http_codes[http_code] = 0
|
|
|
|
result.http_codes[http_code] += 1
|
|
|
|
if not net_code in result.net_codes.keys():
|
2012-09-21 10:15:06 +00:00
|
|
|
result.net_codes[net_code] = 0
|
2012-09-20 11:52:59 +00:00
|
|
|
result.net_codes[net_code] += 1
|
2013-08-21 12:29:23 +00:00
|
|
|
|
2012-09-20 10:16:27 +00:00
|
|
|
result.input += received_bytes
|
|
|
|
result.output += sent_bytes
|
2013-08-21 12:29:23 +00:00
|
|
|
|
|
|
|
result.avg_connect_time += connect
|
|
|
|
result.avg_send_time += send
|
2012-09-20 10:16:27 +00:00
|
|
|
result.avg_latency += latency
|
|
|
|
result.avg_receive_time += receive
|
|
|
|
result.avg_response_time += overall_rt
|
|
|
|
result.selfload += accuracy
|
2012-09-20 10:46:55 +00:00
|
|
|
|
2013-08-21 12:29:23 +00:00
|
|
|
result.times_dist.append(overall_rt)
|
2012-10-17 08:25:44 +00:00
|
|
|
|
2012-09-24 11:01:45 +00:00
|
|
|
def get_zero_sample(self, date_time):
|
2014-03-13 09:36:16 +00:00
|
|
|
""" instantiate new aggregate data item """
|
2012-09-24 11:01:45 +00:00
|
|
|
res = SecondAggregateData(self.cumulative)
|
|
|
|
res.time = date_time
|
|
|
|
return res
|
2012-09-25 11:22:09 +00:00
|
|
|
|
|
|
|
def pop_second(self):
|
2014-03-13 09:36:16 +00:00
|
|
|
""" pop from out queue new aggregate data item """
|
2013-09-11 15:11:58 +00:00
|
|
|
self.data_queue.sort()
|
2012-09-25 11:22:09 +00:00
|
|
|
next_time = self.data_queue.pop(0)
|
|
|
|
data = self.data_buffer[next_time]
|
|
|
|
del self.data_buffer[next_time]
|
|
|
|
res = self.parse_second(next_time, data)
|
|
|
|
return res
|