# yandex-tank/Tank/Plugins/Aggregator.py
""" Core module to calculate aggregate data """
import copy
import datetime
import logging
import math
import time

from tankcore import AbstractPlugin
import tankcore


class AggregateResultListener:
    """ listener to be notified about aggregate data """

    def __init__(self):
        pass

    def aggregate_second(self, second_aggregate_data):
        """ notification about new aggregate data """
        raise NotImplementedError("Abstract method needs to be overridden")
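
# A minimal sketch of a custom listener (illustrative, not part of the
# original module): any object implementing aggregate_second() can be
# registered via AggregatorPlugin.add_result_listener().
#
#     class LoggingListener(AggregateResultListener):
#         """ log per-second overall response count """
#         def aggregate_second(self, second_aggregate_data):
#             logging.info("second %s: %s responses",
#                          second_aggregate_data.time,
#                          second_aggregate_data.overall.RPS)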
2012-09-12 13:18:58 +00:00
class AggregatorPlugin(AbstractPlugin):
2014-03-13 09:36:16 +00:00
""" Plugin that manages aggregation """
default_time_periods = "1ms 2 3 4 5 6 7 8 9 10 20 30 40 50 60 70 80 90 100 " \
"150 200 250 300 350 400 450 500 600 650 700 750 800 850 900 950 1s " \
"1500 2s 2500 3s 3500 4s 4500 5s 5500 6s 6500 7s 7500 8s 8500 9s 9500 10s 11s"
2012-09-12 13:18:58 +00:00
SECTION = 'aggregator'
2013-08-21 12:29:23 +00:00
2012-09-12 13:18:58 +00:00
@staticmethod
def get_key():
return __file__
2013-08-21 12:29:23 +00:00
2012-09-12 13:18:58 +00:00

    def __init__(self, core):
        AbstractPlugin.__init__(self, core)
        self.process = None
        self.second_data_listeners = []
        self.preproc_out_offset = 0
        self.buffer = []
        self.second_data_draft = []
        self.preproc_out_filename = None
        self.cumulative_data = SecondAggregateDataTotalItem()
        self.reader = None
        self.time_periods = [tankcore.expand_to_milliseconds(x)
                             for x in self.default_time_periods.split(' ')]
        self.last_sample_time = 0
        self.precise_cumulative = 1

    def get_available_options(self):
        return ["time_periods", "precise_cumulative"]

    def configure(self):
        periods = self.get_option(
            "time_periods", self.default_time_periods).split(" ")
        self.time_periods = [
            tankcore.expand_to_milliseconds(x) for x in periods]
        self.core.set_option(
            self.SECTION, "time_periods", " ".join([str(x) for x in periods]))
        self.precise_cumulative = int(
            self.get_option("precise_cumulative", '1'))
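
    # Configuration sketch (option names confirmed by get_available_options
    # above; the exact ini layout is assumed):
    #
    #     [aggregator]
    #     time_periods = 10 50 100 500 1s 5s 11s
    #     precise_cumulative = 1
    #
    # Judging by the default_time_periods string, bare numbers are treated
    # as milliseconds and suffixed values like "1s" are expanded by
    # tankcore.expand_to_milliseconds (assumed behavior).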

    def start_test(self):
        if not self.reader:
            self.log.warning("No reader has been set for aggregate data yet...")

    def is_test_finished(self):
        # read up to 2 samples in a single pass
        self.__read_samples(2)
        return -1

    def end_test(self, retcode):
        self.__read_samples(force=True)
        if self.reader:
            self.reader.close_files()
        return retcode

    def add_result_listener(self, listener):
        """ add object to data listeners """
        self.second_data_listeners.append(listener)

    def __notify_listeners(self, data):
        """ notify all listeners about aggregate data """
        self.log.debug(
            "Notifying listeners about second: %s , %s/%s req/responses",
            data.time, data.overall.planned_requests, data.overall.RPS)
        for listener in self.second_data_listeners:
            listener.aggregate_second(data)

    def get_timeout(self):
        """ get timeout based on the last value in time_periods """
        return self.time_periods[-1]

    def __generate_zero_samples(self, data):
        """ fill timeline gaps with zero samples """
        if not data:
            return
        # emit one zero sample per missing second between the last seen
        # sample and the current one
        while self.last_sample_time and int(time.mktime(data.time.timetuple())) - self.last_sample_time > 1:
            self.last_sample_time += 1
            self.log.warning("Adding zero sample: %s", self.last_sample_time)
            zero = self.reader.get_zero_sample(
                datetime.datetime.fromtimestamp(self.last_sample_time))
            self.__notify_listeners(zero)
        self.last_sample_time = int(time.mktime(data.time.timetuple()))
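
    # For example (hypothetical timestamps): if the last sample was for
    # t=100 and the next one arrives for t=104, zero samples are emitted
    # for t=101, 102 and 103 before the real sample is delivered.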

    def __read_samples(self, limit=0, force=False):
        """ call reader object to read next sample set """
        if self.reader:
            self.reader.check_open_files()
            data = self.reader.get_next_sample(force)
            count = 0
            while data:
                self.last_sample_time = int(time.mktime(data.time.timetuple()))
                self.__generate_zero_samples(data)
                self.__notify_listeners(data)
                if limit < 1 or count < limit:
                    data = self.reader.get_next_sample(force)
                else:
                    data = None
                count += 1


# ===============================================================
class SecondAggregateData:
    """ class that holds aggregate data for one second """

    def __init__(self, cumulative_item=None):
        self.cases = {}
        self.time = None
        self.overall = SecondAggregateDataItem()
        self.cumulative = cumulative_item

    def __repr__(self):
        return "SecondAggregateData[%s][%s]" % (self.time, time.mktime(self.time.timetuple()))

    def __getstate__(self):
        return {
            'cases': self.cases,
            'overall': self.overall,
            'cumulative': self.cumulative,
        }


class SecondAggregateDataItem:
    """ both overall and per-case items have this type """
    QUANTILES = [0.25, 0.50, 0.75, 0.80, 0.85, 0.90, 0.95, 0.98, 0.99, 1.00]

    def __init__(self):
        self.case = None
        self.planned_requests = 0
        self.active_threads = 0
        self.selfload = 0
        self.RPS = 0
        self.http_codes = {}
        self.net_codes = {}
        self.times_dist = []
        self.quantiles = {}
        self.dispersion = 0
        self.input = 0
        self.output = 0
        self.avg_connect_time = 0
        self.avg_send_time = 0
        self.avg_latency = 0
        self.avg_receive_time = 0
        self.avg_response_time = 0

    def __getstate__(self):
        return {
            "case": self.case,
            "planned_requests": self.planned_requests,
            "active_threads": self.active_threads,
            "selfload": self.selfload,
            "RPS": self.RPS,
            "http_codes": self.http_codes,
            "net_codes": self.net_codes,
            "times_dist": self.times_dist,
            "quantiles": self.quantiles,
            "dispersion": self.dispersion,
            "input": self.input,
            "output": self.output,
            "avg_connect_time": self.avg_connect_time,
            "avg_send_time": self.avg_send_time,
            "avg_latency": self.avg_latency,
            "avg_receive_time": self.avg_receive_time,
            "avg_response_time": self.avg_response_time,
        }


class SecondAggregateDataTotalItem:
    """ total cumulative data item """

    def __init__(self):
        self.avg_connect_time = 0
        self.avg_send_time = 0
        self.avg_latency = 0
        self.avg_receive_time = 0
        self.avg_response_time = 0
        self.total_count = 0
        self.times_dist = {}
        self.quantiles = {}

    def __getstate__(self):
        return {
            "avg_connect_time": self.avg_connect_time,
            "avg_send_time": self.avg_send_time,
            "avg_latency": self.avg_latency,
            "avg_receive_time": self.avg_receive_time,
            "avg_response_time": self.avg_response_time,
            "total_count": self.total_count,
            "times_dist": self.times_dist,
            "quantiles": self.quantiles,
        }

    def add_data(self, overall_item):
        """ add bucketed overall data to the totals """
        for time_item in overall_item.times_dist:
            self.total_count += time_item['count']
            timing = int(time_item['from'])
            if timing in self.times_dist:
                self.times_dist[timing]['count'] += time_item['count']
            else:
                self.times_dist[timing] = time_item

    def add_raw_data(self, times_dist):
        """ add raw response times to the totals """
        for time_item in times_dist:
            self.total_count += 1
            timing = int(time_item)
            dist_item = self.times_dist.get(
                timing, {'from': timing, 'to': timing, 'count': 0})
            dist_item['count'] += 1
            self.times_dist[timing] = dist_item
        logging.debug("Total times len: %s", len(self.times_dist))

    def calculate_total_quantiles(self):
        """ calculate total quantiles based on times_dist """
        self.quantiles = {}
        # walk the timings from the highest down, peeling off probability
        # mass until we drop below each quantile level
        quantiles = reversed(copy.copy(SecondAggregateDataItem.QUANTILES))
        timings = sorted(self.times_dist.keys(), reverse=True)
        level = 1.0
        timing = 0
        for quan in quantiles:
            while level >= quan:
                timing = timings.pop(0)
                level -= float(
                    self.times_dist[timing]['count']) / self.total_count
            self.quantiles[quan * 100] = self.times_dist[timing]['to']

        logging.debug("Total quantiles: %s", self.quantiles)
        return self.quantiles
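
    # Worked example for the walk above (made-up numbers): with
    # times_dist = {10: {'to': 10, 'count': 90}, 50: {'to': 50, 'count': 10}}
    # and total_count = 100, the first pop takes timing=50 and drops level
    # from 1.0 to 0.9, so quantile levels 95..100 resolve to 50 ms and the
    # remaining levels resolve to 10 ms.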


# ===============================================================

class AbstractReader:
    """
    Parent class for all source reading adapters
    """

    def __init__(self, owner):
        self.aggregator = owner
        self.log = logging.getLogger(__name__)
        self.cumulative = SecondAggregateDataTotalItem()
        self.data_queue = []
        self.data_buffer = {}

    def check_open_files(self):
        """ open files if necessary """
        pass

    def close_files(self):
        """ close opened handles to avoid fd leak """
        pass

    def get_next_sample(self, force):
        """ read next sample from the source """
        pass

    def parse_second(self, next_time, data):
        """ parse buffered data into an aggregate item """
        self.log.debug("Parsing second: %s", next_time)
        result = self.get_zero_sample(
            datetime.datetime.fromtimestamp(next_time))
        time_before = time.time()
        for item in data:
            self.__append_sample(result.overall, item)
            marker = item[0]
            if marker:
                if marker not in result.cases:
                    result.cases[marker] = SecondAggregateDataItem()
                self.__append_sample(result.cases[marker], item)

        # at this phase we have raw response times in result.overall.times_dist
        if self.aggregator.precise_cumulative:
            self.cumulative.add_raw_data(result.overall.times_dist)

        self.log.debug(
            "Calculate aggregates for %s requests", result.overall.RPS)
        self.__calculate_aggregates(result.overall)
        for case in result.cases.values():
            self.__calculate_aggregates(case)

        # in non-precise mode, accumulate the already-bucketed distribution
        if not self.aggregator.precise_cumulative:
            self.cumulative.add_data(result.overall)
        self.cumulative.calculate_total_quantiles()

        spent = time.time() - time_before
        if spent:
            self.log.debug(
                "Aggregating speed: %s lines/sec", int(len(data) / spent))
        return result

    def __calculate_aggregates(self, item):
        """ calculate aggregates on raw data """
        if item.RPS:
            item.selfload = 100 * item.selfload / item.RPS
            item.avg_connect_time /= item.RPS
            item.avg_send_time /= item.RPS
            item.avg_latency /= item.RPS
            item.avg_receive_time /= item.RPS
            item.avg_response_time /= item.RPS

            # single pass over sorted response times: pick quantiles,
            # bucket timings into time_periods, accumulate squared deviation
            item.times_dist.sort()
            count = 0.0
            quantiles = copy.copy(SecondAggregateDataItem.QUANTILES)
            times = copy.copy(self.aggregator.time_periods)
            time_from = 0
            time_to = times.pop(0)
            times_dist_draft = []
            times_dist_item = {'from': time_from, 'to': time_to, 'count': 0}
            deviation = 0.0
            timing = 0
            for timing in item.times_dist:
                count += 1
                if quantiles and (count / item.RPS) >= quantiles[0]:
                    level = quantiles.pop(0)
                    item.quantiles[level * 100] = timing

                while times and timing > time_to:
                    time_from = time_to
                    time_to = times.pop(0)
                    if times_dist_item['count']:
                        times_dist_draft.append(times_dist_item)
                    times_dist_item = {
                        'from': time_from, 'to': time_to, 'count': 0}

                times_dist_item['count'] += 1
                deviation += math.pow(item.avg_response_time - timing, 2)

            # any quantile levels left unassigned get the largest timing
            while quantiles:
                level = quantiles.pop(0)
                item.quantiles[level * 100] = timing

            if times_dist_item['count']:
                times_dist_draft.append(times_dist_item)

            item.dispersion = deviation / item.RPS
            item.times_dist = times_dist_draft
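
    # Note that item.dispersion ends up as the population variance of the
    # response times: e.g. for timings [100, 300] (made-up values) the mean
    # is 200, deviation = 100**2 + 100**2 = 20000, and
    # dispersion = 20000 / 2 = 10000, i.e. a standard deviation of 100 ms.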

    def __append_sample(self, result, item):
        """ add a single sample to the aggregate item """
        for check in item:
            if check < 0:
                self.log.error("Problem item: %s", item)
                raise ValueError("One of the sample items has a negative value")

        (marker, threads, overall_rt, http_code, net_code, sent_bytes,
         received_bytes, connect, send, latency, receive, accuracy) = item
        result.case = marker
        result.active_threads = threads
        result.planned_requests = 0
        result.RPS += 1

        if http_code and http_code != '0':
            if http_code not in result.http_codes:
                result.http_codes[http_code] = 0
            result.http_codes[http_code] += 1
        if net_code not in result.net_codes:
            result.net_codes[net_code] = 0
        result.net_codes[net_code] += 1

        result.input += received_bytes
        result.output += sent_bytes

        # plain sums for now; __calculate_aggregates() divides by RPS later
        result.avg_connect_time += connect
        result.avg_send_time += send
        result.avg_latency += latency
        result.avg_receive_time += receive
        result.avg_response_time += overall_rt
        result.selfload += accuracy

        result.times_dist.append(overall_rt)
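
    # The expected sample layout, as unpacked in __append_sample above
    # (the values here are made up; times assumed to be in milliseconds,
    # matching time_periods):
    #
    #     ('case1',  # marker
    #      10,       # active threads
    #      250,      # overall response time
    #      '200',    # HTTP code
    #      '0',      # net code
    #      512,      # sent bytes
    #      2048,     # received bytes
    #      20,       # connect time
    #      5,        # send time
    #      180,      # latency
    #      45,       # receive time
    #      0)        # accuracy/selfload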

    def get_zero_sample(self, date_time):
        """ instantiate a new (empty) aggregate data item """
        res = SecondAggregateData(self.cumulative)
        res.time = date_time
        return res

    def pop_second(self):
        """ pop the oldest buffered second from the queue and parse it """
        self.data_queue.sort()
        next_time = self.data_queue.pop(0)
        data = self.data_buffer[next_time]
        del self.data_buffer[next_time]
        res = self.parse_second(next_time, data)
        return res
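

# A rough sketch of how a concrete reader is expected to use this base
# class (assumed flow, inferred from the attributes above, not confirmed
# by the source): get_next_sample() parses its input, groups raw samples
# into self.data_buffer keyed by unix second, records each new second in
# self.data_queue, and drains complete seconds via pop_second():
#
#     def get_next_sample(self, force):
#         for ts, sample in self.read_lines():      # hypothetical helper
#             if ts not in self.data_buffer:
#                 self.data_queue.append(ts)
#                 self.data_buffer[ts] = []
#             self.data_buffer[ts].append(sample)
#         if self.data_queue and (force or len(self.data_queue) > 1):
#             return self.pop_second()
#         return None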