import copy
import datetime
import logging
import math
import time

import tankcore
from tankcore import AbstractPlugin


class AggregateResultListener:
    '''
    Interface for objects that receive per-second aggregate data
    '''

    def aggregate_second(self, second_aggregate_data):
        raise TypeError("Abstract method needs to be overridden")


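# A minimal listener sketch (illustrative only, not part of the original
# module): any object implementing aggregate_second() can be registered via
# AggregatorPlugin.add_result_listener() and receives one SecondAggregateData
# per second of the test.
class ExampleLoggingListener(AggregateResultListener):
    def aggregate_second(self, second_aggregate_data):
        logging.getLogger(__name__).info(
            "Second %s: %s responses", second_aggregate_data.time,
            second_aggregate_data.overall.RPS)

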
class AggregatorPlugin(AbstractPlugin):
    '''
    Plugin that aggregates raw samples into per-second statistics
    and notifies registered result listeners
    '''

    default_time_periods = "1ms 2 3 4 5 6 7 8 9 10 20 30 40 50 60 70 80 90 100 150 200 250 300 350 400 450 500 600 650 700 750 800 850 900 950 1s 1500 2s 2500 3s 3500 4s 4500 5s 5500 6s 6500 7s 7500 8s 8500 9s 9500 10s 11s"

    SECTION = 'aggregator'

    @staticmethod
    def get_key():
        return __file__

    def __init__(self, core):
        AbstractPlugin.__init__(self, core)
        self.process = None
        self.second_data_listeners = []
        self.preproc_out_offset = 0
        self.buffer = []
        self.second_data_draft = []
        self.preproc_out_filename = None
        self.cumulative_data = SecondAggregateDataTotalItem()
        self.reader = None
        self.time_periods = [tankcore.expand_to_milliseconds(x) for x in self.default_time_periods.split(' ')]
        self.last_sample_time = 0

    def configure(self):
        periods = self.get_option("time_periods", self.default_time_periods).split(" ")
        self.time_periods = [tankcore.expand_to_milliseconds(x) for x in periods]
        self.core.set_option(self.SECTION, "time_periods", " ".join([str(x) for x in periods]))

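    # The period tokens mix units: bare numbers are taken as milliseconds and
    # suffixed ones ("1s", "1ms") are normalized by
    # tankcore.expand_to_milliseconds(). Assuming that semantics, the default
    # list expands to [1, 2, 3, ..., 1000, 1500, 2000, ..., 11000].
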
    def start_test(self):
        if not self.reader:
            self.log.warning("No reader has been set for the aggregator yet")

    def is_test_finished(self):
        # read up to 5 samples in a single pass
        self.__read_samples(5)
        # a negative return value tells the core the test is not finished yet
        return -1

    def end_test(self, retcode):
        self.__read_samples(force=True)
        if self.reader:
            self.reader.close_files()
        return retcode

    def add_result_listener(self, listener):
        self.second_data_listeners.append(listener)

    def __notify_listeners(self, data):
        self.log.debug("Notifying listeners about second %s: %s/%s planned requests/responses",
                       data.time, data.overall.planned_requests, data.overall.RPS)
        for listener in self.second_data_listeners:
            listener.aggregate_second(data)

    def get_timeout(self):
        return self.time_periods[-1]

    def __generate_zero_samples(self, data):
        if not data:
            return
        # fill the gap between the last seen second and the current one with
        # zero samples, so listeners always get a continuous timeline
        while self.last_sample_time and int(time.mktime(data.time.timetuple())) - self.last_sample_time > 1:
            self.last_sample_time += 1
            self.log.debug("Adding zero sample: %s", self.last_sample_time)
            zero = self.reader.get_zero_sample(datetime.datetime.fromtimestamp(self.last_sample_time))
            self.__notify_listeners(zero)
        self.last_sample_time = int(time.mktime(data.time.timetuple()))

    def __read_samples(self, limit=0, force=False):
        if self.reader:
            self.reader.check_open_files()
            data = self.reader.get_next_sample(force)
            count = 0
            while data and (limit < 1 or count < limit):
                self.__generate_zero_samples(data)
                self.__notify_listeners(data)
                data = self.reader.get_next_sample(force)
                count += 1


class SecondAggregateData:
    def __init__(self, cumulative_item=None):
        self.cases = {}
        self.time = None
        # @type self.overall: SecondAggregateDataItem
        self.overall = SecondAggregateDataItem()
        self.cumulative = cumulative_item


class SecondAggregateDataItem:
    QUANTILES = [0.25, 0.50, 0.75, 0.80, 0.90, 0.95, 0.98, 0.99, 1.00]

    def __init__(self):
        self.log = logging.getLogger(__name__)
        self.case = None
        self.planned_requests = 0
        self.active_threads = 0
        self.selfload = 0
        self.RPS = 0
        self.http_codes = {}
        self.net_codes = {}
        self.times_dist = []
        self.quantiles = {}
        self.dispersion = 0
        self.input = 0
        self.output = 0
        self.avg_connect_time = 0
        self.avg_send_time = 0
        self.avg_latency = 0
        self.avg_receive_time = 0
        self.avg_response_time = 0


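# After AbstractReader.__calculate_aggregates() runs (see below), a
# SecondAggregateDataItem's quantiles dict maps percentile levels to response
# times in milliseconds, e.g. {25.0: 120, 50.0: 180, 99.0: 950} (values made
# up for illustration), and times_dist is replaced by histogram buckets of
# the form {'from': 100, 'to': 150, 'count': 17}.

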
class SecondAggregateDataTotalItem:
    def __init__(self):
        self.avg_connect_time = 0
        self.avg_send_time = 0
        self.avg_latency = 0
        self.avg_receive_time = 0
        self.avg_response_time = 0
        self.total_count = 0
        self.times_dist = {}

    def add_data(self, overall_item):
        # merge a second's bucketed response time distribution into the
        # cumulative distribution, keyed by each bucket's lower bound
        for time_item in overall_item.times_dist:
            self.total_count += time_item['count']
            if time_item['from'] in self.times_dist:
                self.times_dist[time_item['from']]['count'] += time_item['count']
            else:
                self.times_dist[time_item['from']] = time_item


# ===============================================================

class AbstractReader:
    '''
    Parent class for all source reading adapters
    '''

    def __init__(self, owner):
        self.aggregator = owner
        self.log = logging.getLogger(__name__)
        self.cumulative = SecondAggregateDataTotalItem()
        self.data_queue = []   # ordered timestamps of seconds awaiting aggregation
        self.data_buffer = {}  # timestamp -> list of raw sample tuples for that second

    def check_open_files(self):
        '''
        Hook for subclasses: check and (re)open source files if needed
        '''
        pass

    def close_files(self):
        '''
        Close open file handles to avoid leaking descriptors
        '''
        pass

    def get_next_sample(self, force):
        '''
        Hook for subclasses: return the next SecondAggregateData, or None
        '''
        pass

    def parse_second(self, next_time, data):
        self.log.debug("Parsing second: %s", next_time)
        result = self.get_zero_sample(datetime.datetime.fromtimestamp(next_time))
        for item in data:
            self.__append_sample(result.overall, item)
            marker = item[0]
            if marker:
                if marker not in result.cases:
                    result.cases[marker] = SecondAggregateDataItem()
                self.__append_sample(result.cases[marker], item)

        self.log.debug("Calculating aggregates for %s requests", result.overall.RPS)
        self.__calculate_aggregates(result.overall)
        for case in result.cases.values():
            self.__calculate_aggregates(case)

        self.cumulative.add_data(result.overall)

        return result

    def __calculate_aggregates(self, item):
        # TODO: 2 make total quantiles more precise
        if item.RPS:
            if item.avg_response_time:
                item.selfload = 100 * item.selfload / item.RPS
            # the avg_* fields were accumulated as sums in __append_sample;
            # divide by the number of responses to turn them into averages
            item.avg_connect_time /= item.RPS
            item.avg_send_time /= item.RPS
            item.avg_latency /= item.RPS
            item.avg_receive_time /= item.RPS
            item.avg_response_time /= item.RPS

            # walk the sorted timings once, extracting quantiles and filling
            # the response time histogram buckets along the way
            item.times_dist.sort()
            count = 0.0
            quantiles = copy.copy(SecondAggregateDataItem.QUANTILES)
            times = copy.copy(self.aggregator.time_periods)
            time_from = 0
            time_to = times.pop(0)
            times_dist_draft = []
            times_dist_item = {'from': time_from, 'to': time_to, 'count': 0}
            deviation = 0.0
            for timing in item.times_dist:
                count += 1
                if quantiles and (count / item.RPS) >= quantiles[0]:
                    level = quantiles.pop(0)
                    item.quantiles[level * 100] = timing

                while times and timing > time_to:
                    time_from = time_to
                    time_to = times.pop(0)
                    if times_dist_item['count']:
                        times_dist_draft.append(times_dist_item)
                    times_dist_item = {'from': time_from, 'to': time_to, 'count': 0}

                times_dist_item['count'] += 1
                deviation += math.pow(item.avg_response_time - timing, 2)

            # any quantile levels left unassigned get the largest timing seen
            while quantiles:
                level = quantiles.pop(0)
                item.quantiles[level * 100] = timing

            if times_dist_item['count']:
                times_dist_draft.append(times_dist_item)

            # population variance of response times around the average
            item.dispersion = deviation / item.RPS
            item.times_dist = times_dist_draft

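    # Each raw sample is a 12-field tuple; the field names below come from the
    # unpacking in __append_sample():
    #   (marker, threads, overall_rt, http_code, net_code, sent_bytes,
    #    received_bytes, connect, send, latency, receive, accuracy)
    # overall_rt and the connect/send/latency/receive timings are assumed to
    # be in the same millisecond units as time_periods, since overall_rt
    # feeds times_dist directly.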
    def __append_sample(self, result, item):
        (marker, threads, overall_rt, http_code, net_code, sent_bytes, received_bytes, connect, send, latency, receive, accuracy) = item
        for check in [threads, overall_rt, sent_bytes, received_bytes, connect, send, latency, receive, accuracy]:
            if check < 0:
                self.log.error("Problem item: %s", item)
                raise ValueError("One of the sample fields has a negative value")
        result.case = marker
        result.active_threads = threads
        result.planned_requests = 0
        result.RPS += 1

        if http_code and http_code != '0':
            if http_code not in result.http_codes:
                result.http_codes[http_code] = 0
            result.http_codes[http_code] += 1
        if net_code not in result.net_codes:
            result.net_codes[net_code] = 0
        result.net_codes[net_code] += 1

        result.input += received_bytes
        result.output += sent_bytes

        # accumulate sums here; __calculate_aggregates divides them by RPS
        result.avg_connect_time += connect
        result.avg_send_time += send
        result.avg_latency += latency
        result.avg_receive_time += receive
        result.avg_response_time += overall_rt
        result.selfload += accuracy

        result.times_dist.append(overall_rt)

    def get_zero_sample(self, date_time):
        res = SecondAggregateData(self.cumulative)
        res.time = date_time
        return res

    def pop_second(self):
        next_time = self.data_queue.pop(0)
        data = self.data_buffer[next_time]
        del self.data_buffer[next_time]
        res = self.parse_second(next_time, data)
        return res
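

# Illustrative wiring sketch (an assumption, not part of the original module):
# in real use the tank core instantiates the plugin and assigns a concrete
# AbstractReader subclass. This only shows the call order of the public API.
def _example_wiring(core):
    class PrintingListener(AggregateResultListener):
        def aggregate_second(self, second_aggregate_data):
            logging.getLogger(__name__).info(
                "%s: %s responses", second_aggregate_data.time,
                second_aggregate_data.overall.RPS)

    aggregator = AggregatorPlugin(core)
    aggregator.configure()
    aggregator.reader = AbstractReader(aggregator)  # real use: a concrete subclass
    aggregator.add_result_listener(PrintingListener())
    aggregator.start_test()
    aggregator.is_test_finished()  # the core polls this; -1 means keep going
    aggregator.end_test(0)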