configurable section name for plugins, uploader backend detection, tests

This commit is contained in:
Arseniy Fomchenko 2017-03-21 17:42:47 +03:00
parent 39111d6d4f
commit 436c0386da
30 changed files with 1975 additions and 45 deletions

View File

@ -13,13 +13,15 @@ class AbstractPlugin(object):
should point to __file__ magic constant """
raise TypeError("Abstract method needs to be overridden")
def __init__(self, core):
def __init__(self, core, config_section):
"""
@type core: TankCore
"""
self.log = logging.getLogger(__name__)
self.core = core
if config_section:
self.SECTION = config_section
def configure(self):
""" A stage to read config values and instantiate objects """

View File

@ -76,6 +76,14 @@ class Job(object):
self._phantom_info = info
def parse_plugin(s):
try:
plugin, config_section = s.split()
except ValueError:
plugin, config_section = s, None
return plugin, config_section
class TankCore(object):
"""
JMeter + dstat inspired :)
@ -153,7 +161,8 @@ class TankCore(object):
self.taskset_affinity = self.get_option(self.SECTION, 'affinity', '')
options = self.config.get_options(self.SECTION, self.PLUGIN_PREFIX)
for (plugin_name, plugin_path) in options:
for (plugin_name, plugin) in options:
plugin_path, config_section = parse_plugin(plugin)
if not plugin_path:
logger.debug("Seems the plugin '%s' was disabled", plugin_name)
continue
@ -197,7 +206,7 @@ class TankCore(object):
logger.warning("Patched plugin path: %s", plugin_path)
plugin = il.import_module(plugin_path)
try:
instance = getattr(plugin, 'Plugin')(self)
instance = getattr(plugin, 'Plugin')(self, config_section)
except:
logger.warning(
"Deprecated plugin classname: %s. Should be 'Plugin'",

View File

@ -130,6 +130,15 @@ class DataPoller(object):
time.sleep(self.poll_period)
def to_utc(ts):
# dst = daylight saving time
is_dst = time.daylight and time.localtime().tm_isdst > 0
offset = (time.altzone if is_dst else time.timezone)
import pdb
pdb.set_trace()
return ts + offset
class Aggregator(object):
def __init__(self, source, config, verbose_histogram):
self.worker = Worker(config, verbose_histogram)

View File

@ -44,8 +44,8 @@ class Plugin(AbstractPlugin):
def get_key():
return __file__
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.listeners = [] # [LoggingListener()]
self.reader = None
self.stats_reader = None

View File

@ -12,8 +12,8 @@ class Plugin(AbstractPlugin):
SECTION = "appium"
def __init__(self, core):
super(Plugin, self).__init__(core)
def __init__(self, core, config_section):
super(Plugin, self).__init__(core, config_section)
self.appium_cmd = None
self.appium_log = None
self.appium_port = None

View File

@ -17,8 +17,8 @@ class Plugin(AbstractPlugin, AggregateResultListener):
""" Plugin that accepts criterion classes and triggers autostop """
SECTION = 'autostop'
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
AggregateResultListener.__init__(self)
self.cause_criterion = None

View File

@ -16,8 +16,8 @@ class Plugin(AbstractPlugin):
def get_key():
return __file__
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.logfile = None
self.default_target = None
self.device_id = None

View File

@ -17,7 +17,7 @@ requests.packages.urllib3.disable_warnings()
class AbstractGun(AbstractPlugin):
def __init__(self, core):
super(AbstractGun, self).__init__(core)
super(AbstractGun, self).__init__(core, None)
self.results = None
@contextmanager

View File

@ -17,9 +17,9 @@ class Plugin(AbstractPlugin, GeneratorPlugin):
''' Big Fucking Gun plugin '''
SECTION = 'bfg'
def __init__(self, core):
def __init__(self, core, config_section):
self.log = logging.getLogger(__name__)
AbstractPlugin.__init__(self, core)
AbstractPlugin.__init__(self, core, config_section)
self.gun_type = None
self.start_time = time.time()
self.stepper_wrapper = StepperWrapper(self.core, Plugin.SECTION)

View File

@ -16,8 +16,8 @@ class Plugin(AbstractPlugin, AggregateResultListener):
''' Console plugin '''
SECTION = 'console'
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.screen = None
self.render_exception = None
self.console_markup = None

View File

@ -0,0 +1 @@
from .plugin import Plugin # noqa:F401

View File

@ -0,0 +1,167 @@
import ConfigParser
import argparse
import glob
import json
import logging
import os
import socket
import sys
import pwd
from StringIO import StringIO
from urlparse import urljoin
from datetime import datetime
import pkg_resources
from .client import KSHMAPIClient
from .plugin import LPJob
CONFIG_FILE = 'saved_conf.ini'
DATA_LOG = 'test_data.log'
MONITORING_LOG = 'monitoring.log'
SECTION = 'meta'
logger = logging.getLogger('')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(stream=sys.stdout)
handler.setLevel(logging.INFO)
logger.addHandler(handler)
verbose_handler = logging.FileHandler(
datetime.now().strftime("post_loader_%Y-%m-%d_%H-%M-%S.log"), 'w')
verbose_handler.setLevel(logging.DEBUG)
logger.addHandler(verbose_handler)
def read_config(shooting_dir):
config_file = glob.glob(os.path.join(shooting_dir, CONFIG_FILE))[0]
logger.info('Config file found: %s' % config_file)
config = ConfigParser.ConfigParser()
config.read(config_file)
return config
def get_lp_config(config):
"""
looks for config file in shooting_dir,
returns config dict of section 'meta'
:rtype: dict
"""
lp_config = dict(config.items(SECTION))
for key in sorted(lp_config.keys()):
logger.debug('%s: %s' % (key, lp_config[key]))
return lp_config
def check_log(log_name):
assert os.path.exists(log_name), \
'Data log {} not found\n'.format(log_name) + \
'JsonReport plugin should be enabled when launching Yandex-tank'
def upload_data(shooting_dir, log_name, lp_job):
data_log = os.path.join(shooting_dir, log_name)
check_log(data_log)
sys.stdout.write('Uploading test data')
with open(data_log, 'r') as f:
for line in f:
data = json.loads(line.strip())
lp_job.push_test_data(data['data'], data['stats'])
sys.stdout.write('.')
sys.stdout.flush()
sys.stdout.write('\n')
def upload_monitoring(shooting_dir, log_name, lp_job):
data_log = os.path.join(shooting_dir, log_name)
check_log(data_log)
sys.stdout.write('Uploading monitoring data')
with open(data_log, 'r') as f:
for line in f:
lp_job.push_monitoring_data(line.strip())
sys.stdout.write('.')
sys.stdout.flush()
sys.stdout.write('\n')
def send_config_snapshot(config, lp_job):
config.set(SECTION, 'launched_from', 'post-loader')
output = StringIO()
config.write(output)
lp_job.send_config_snapshot(output.getvalue())
def edit_metainfo(lp_config, lp_job):
lp_job.edit_metainfo(is_regression=lp_config.get('regress'),
regression_component=lp_config.get('component'),
cmdline=lp_config.get('cmdline'),
ammo_path=lp_config.get('ammo_path'),
loop_count=lp_config.get('loop_count'))
def get_plugin_dir(shooting_dir):
DIRNAME = 'lunapark'
parent = os.path.abspath(os.path.join(shooting_dir, os.pardir))
if os.path.basename(parent) == DIRNAME:
return parent
else:
plugin_dir = os.path.join(parent, DIRNAME)
if not os.path.exists(plugin_dir):
os.makedirs(plugin_dir)
return plugin_dir
def make_symlink(shooting_dir, name):
plugin_dir = get_plugin_dir(shooting_dir)
link_name = os.path.join(plugin_dir, str(name))
os.symlink(os.path.relpath(shooting_dir, plugin_dir), link_name)
logger.info('Symlink created: {}'.format(os.path.abspath(link_name)))
def post_loader():
parser = argparse.ArgumentParser()
parser.add_argument('shooting_dir',
help='Directory containing shooting artifacts')
shooting_dir = parser.parse_args().shooting_dir
assert os.path.exists(shooting_dir), 'Directory not found'
config = read_config(shooting_dir)
lp_config = get_lp_config(config)
api_client = KSHMAPIClient(base_url=lp_config['api_address'],
user_agent='Lunapark/{}'.format(
pkg_resources.require('yatank-internal-lunapark')[0].version)
# todo: add timeouts
)
lp_job = LPJob(
client=api_client,
target_host=lp_config.get('target_host'),
target_port=lp_config.get('target_port'),
person=lp_config.get(
'operator',
'') or pwd.getpwuid(
os.geteuid())[0],
task=lp_config['task'],
name=lp_config['job_name'],
description=lp_config['job_dsc'],
tank=socket.getfqdn())
edit_metainfo(lp_config, lp_job)
upload_data(shooting_dir, DATA_LOG, lp_job)
send_config_snapshot(config, lp_job)
try:
upload_monitoring(shooting_dir, MONITORING_LOG, lp_job)
except AssertionError as e:
logger.error(e)
lp_job.close(0)
make_symlink(shooting_dir, lp_job.number)
logger.info(
'LP job created: {}'.format(
urljoin(
api_client.base_url, str(
lp_job.number))))
if __name__ == '__main__':
post_loader()

View File

@ -0,0 +1,586 @@
import datetime
import json
import time
import traceback
import urllib
from future.moves.urllib.parse import urljoin
from builtins import range
import requests
import logging
from requests.exceptions import ConnectionError, Timeout
requests.packages.urllib3.disable_warnings()
logger = logging.getLogger(__name__) # pylint: disable=C0103
class KSHMAPIClient(object):
def __init__(
self,
base_url=None,
writer_url=None,
network_attempts=10,
api_attempts=10,
maintenance_attempts=40,
network_timeout=2,
api_timeout=5,
maintenance_timeout=15,
connection_timeout=5.0,
user_agent=None):
self.user_agent = user_agent
self.connection_timeout = connection_timeout
self._base_url = base_url
self.writer_url = writer_url
self.retry_timeout = 10
self.session = requests.Session()
self.session.verify = False
self.session.headers.update({"User-Agent": "tank"})
self.network_attempts = network_attempts
self.network_timeout = network_timeout
self.api_attempts = api_attempts
self.api_timeout = api_timeout
self.maintenance_attempts = maintenance_attempts
self.maintenance_timeout = maintenance_timeout
@property
def base_url(self):
if not self._base_url:
raise ValueError("Base url is not set")
else:
return self._base_url
@base_url.setter
def base_url(self, url):
self._base_url = url
class UnderMaintenance(Exception):
message = "KSHM is under maintenance"
class NotAvailable(Exception):
desc = "API is not available"
def __init__(self, request, response):
self.message = "%s\n%s\n%s" % (self.desc, request, response)
super(self.__class__, self).__init__(self.message)
class StoppedFromOnline(Exception):
message = "Shooting is stopped from online"
class JobNotCreated(Exception):
pass
class NetworkError(Exception):
pass
def set_api_timeout(self, timeout):
self.api_timeout = float(timeout)
def network_timeouts(self):
return (self.network_timeout for _ in range(self.network_attempts - 1))
def api_timeouts(self):
return (self.api_timeout for _ in range(self.api_attempts - 1))
def maintenance_timeouts(self):
return (
self.maintenance_timeout for _ in range(
self.maintenance_attempts - 1))
@staticmethod
def filter_headers(headers):
boring = ['X-Content-Security-Policy', 'Content-Security-Policy',
'Strict-Transport-Security', 'X-WebKit-CSP', 'Set-Cookie',
'X-DNS-Prefetch-Control', 'X-Frame-Options', 'P3P',
'X-Content-Type-Options', 'X-Download-Options',
'Surrogate-Control']
for h in boring:
if h in headers:
del (headers[h])
return headers
def __send_single_request(self, req, trace=False):
p = self.session.prepare_request(req)
if trace:
logger.debug("Making request: %s %s Headers: %s Body: %s",
p.method, p.url, p.headers, p.body)
resp = self.session.send(p, timeout=self.connection_timeout)
if trace:
logger.debug("Got response in %ss: %s %s Headers: %s Body: %s",
resp.elapsed.total_seconds(), resp.reason,
resp.status_code, self.filter_headers(resp.headers),
resp.content)
if resp.status_code in [500, 502, 503, 504]:
raise self.NotAvailable(
request="request: %s %s\n\tHeaders: %s\n\tBody: %s" %
(p.method,
p.url,
p.headers,
p.body),
response="Got response in %ss: %s %s\n\tHeaders: %s\n\tBody: %s" %
(resp.elapsed.total_seconds(),
resp.reason,
resp.status_code,
self.filter_headers(
resp.headers),
resp.content))
elif resp.status_code == 410:
raise self.StoppedFromOnline
elif resp.status_code == 423:
raise self.UnderMaintenance
else:
resp.raise_for_status()
return resp
def __make_api_request(
self,
http_method,
path,
data=None,
response_callback=lambda x: x,
writer=False,
trace=False,
json=None):
url = urljoin(self.base_url, path)
if json:
request = requests.Request(
http_method, url, json=json, headers={
'User-Agent': self.user_agent})
else:
request = requests.Request(
http_method, url, data=data, headers={
'User-Agent': self.user_agent})
network_timeouts = self.network_timeouts()
maintenance_timeouts = self.maintenance_timeouts()
while True:
try:
response = self.__send_single_request(request, trace=trace)
return response_callback(response)
except (Timeout, ConnectionError):
logger.warn(traceback.format_exc())
try:
timeout = next(network_timeouts)
logger.warn(
"Network error, will retry in %ss..." %
timeout)
time.sleep(timeout)
continue
except StopIteration:
raise self.NetworkError()
except self.UnderMaintenance as e:
try:
timeout = next(maintenance_timeouts)
logger.warn(
"KSHM is under maintenance, will retry in %ss..." %
timeout)
time.sleep(timeout)
continue
except StopIteration:
raise e
def __make_writer_request(
self,
params=None,
json=None,
http_method="POST",
trace=False):
'''
Send request to writer service.
'''
request = requests.Request(
http_method,
self.writer_url,
params=params,
json=json,
headers={
'User-Agent': self.user_agent})
network_timeouts = self.network_timeouts()
maintenance_timeouts = self.maintenance_timeouts()
while True:
try:
response = self.__send_single_request(request, trace=trace)
return response
except (Timeout, ConnectionError):
logger.warn(traceback.format_exc())
try:
timeout = next(network_timeouts)
logger.warn(
"Network error, will retry in %ss..." %
timeout)
time.sleep(timeout)
continue
except StopIteration:
raise self.NetworkError()
except self.UnderMaintenance as e:
try:
timeout = next(maintenance_timeouts)
logger.warn(
"Writer is under maintenance, will retry in %ss..." %
timeout)
time.sleep(timeout)
continue
except StopIteration:
raise e
def __get(self, addr, trace=False):
return self.__make_api_request(
'GET',
addr,
trace=trace,
response_callback=lambda r: json.loads(
r.content.decode('utf8')))
def __post_raw(self, addr, txt_data, trace=False):
return self.__make_api_request(
'POST', addr, txt_data, lambda r: r.content, trace=trace)
def __post(self, addr, data, trace=False):
return self.__make_api_request(
'POST',
addr,
json=data,
response_callback=lambda r: r.json(),
trace=trace)
def __put(self, addr, data, trace=False):
return self.__make_api_request(
'PUT',
addr,
json=data,
response_callback=lambda r: r.text,
trace=trace)
def __patch(self, addr, data, trace=False):
return self.__make_api_request(
'PATCH',
addr,
json=data,
response_callback=lambda r: r.text,
trace=trace)
def get_task_data(self, task, trace=False):
return self.__get("api/task/" + task + "/summary.json", trace=trace)
def new_job(
self,
task,
person,
tank,
target_host,
target_port,
loadscheme=None,
detailed_time=None,
notify_list=None,
trace=False):
"""
:return: job_nr, upload_token
:rtype: tuple
"""
if not notify_list:
notify_list = []
data = {
'task': task,
'person': person,
'tank': tank,
'host': target_host,
'port': target_port,
'loadscheme': loadscheme,
'detailed_time': detailed_time,
'notify': notify_list
}
logger.debug("Job create request: %s", data)
api_timeouts = self.api_timeouts()
while True:
try:
response = self.__post(
"api/job/create.json", data, trace=trace)[0]
# [{"upload_token": "1864a3b2547d40f19b5012eb038be6f6", "job": 904317}]
return response['job'], response['upload_token']
except (self.NotAvailable, self.StoppedFromOnline) as e:
try:
timeout = next(api_timeouts)
logger.warn("API error, will retry in %ss..." % timeout)
time.sleep(timeout)
continue
except StopIteration:
logger.warn('Failed to create job on lunapark')
raise self.JobNotCreated(e.message)
except Exception as e:
logger.warn('Failed to create job on lunapark')
logger.warn(e.message)
raise self.JobNotCreated()
def get_job_summary(self, jobno):
result = self.__get('api/job/' + str(jobno) + '/summary.json')
return result[0]
def close_job(self, jobno, retcode, trace=False):
params = {'exitcode': str(retcode)}
result = self.__get('api/job/' + str(jobno) + '/close.json?' +
urllib.urlencode(params), trace=trace)
return result[0]['success']
def edit_job_metainfo(
self,
jobno,
job_name,
job_dsc,
instances,
ammo_path,
loop_count,
version_tested,
is_regression,
component,
cmdline,
is_starred,
tank_type=0,
trace=False):
data = {
'name': job_name,
'description': job_dsc,
'instances': str(instances),
'ammo': ammo_path,
'loop': loop_count,
'version': version_tested,
'regression': str(is_regression),
'component': component,
'tank_type': int(tank_type),
'command_line': cmdline,
'starred': int(is_starred)
}
response = self.__post(
'api/job/' +
str(jobno) +
'/edit.json',
data,
trace=trace)
return response
def set_imbalance_and_dsc(self, jobno, rps, comment):
data = {}
if rps:
data['imbalance'] = rps
if comment:
res = self.get_job_summary(jobno)
data['description'] = (res['dsc'] + "\n" + comment).strip()
response = self.__post('api/job/' + str(jobno) + '/edit.json', data)
return response
def second_data_to_push_item(self, data, stat, timestamp, overall, case):
"""
@data: SecondAggregateDataItem
"""
api_data = {
'overall': overall,
'case': case,
'net_codes': [],
'http_codes': [],
'time_intervals': [],
'trail': {
'time': str(timestamp),
'reqps': stat["metrics"]["reqps"],
'resps': data["interval_real"]["len"],
'expect': data["interval_real"]["total"] / 1000.0 /
data["interval_real"]["len"],
'disper': 0,
'self_load':
0, # TODO abs(round(100 - float(data.selfload), 2)),
'input': data["size_in"]["total"],
'output': data["size_out"]["total"],
'connect_time': data["connect_time"]["total"] / 1000.0 /
data["connect_time"]["len"],
'send_time':
data["send_time"]["total"] / \
1000.0 / data["send_time"]["len"],
'latency':
data["latency"]["total"] / 1000.0 / data["latency"]["len"],
'receive_time': data["receive_time"]["total"] / 1000.0 /
data["receive_time"]["len"],
'threads': stat["metrics"]["instances"], # TODO
}
}
for q, value in zip(data["interval_real"]["q"]["q"],
data["interval_real"]["q"]["value"]):
api_data['trail']['q' + str(q)] = value / 1000.0
for code, cnt in data["net_code"]["count"].items():
api_data['net_codes'].append({'code': int(code),
'count': int(cnt)})
for code, cnt in data["proto_code"]["count"].items():
api_data['http_codes'].append({'code': int(code),
'count': int(cnt)})
api_data['time_intervals'] = self.convert_hist(data["interval_real"][
"hist"])
return api_data
def convert_hist(self, hist):
data = hist['data']
bins = hist['bins']
return [
{
"from": 0, # deprecated
"to": b / 1000.0,
"count": count,
} for b, count in zip(bins, data)
]
def push_test_data(
self,
jobno,
upload_token,
data_item,
stat_item,
trace=False):
items = []
uri = 'api/job/{0}/push_data.json?upload_token={1}'.format(
jobno, upload_token)
ts = data_item["ts"]
for case_name, case_data in data_item["tagged"].items():
if case_name == "":
case_name = "__NOTAG__"
if (len(case_name)) > 128:
raise RuntimeError('tag (case) name is too long: ' + case_name)
push_item = self.second_data_to_push_item(case_data, stat_item, ts,
0, case_name)
items.append(push_item)
overall = self.second_data_to_push_item(data_item["overall"],
stat_item, ts, 1, '')
items.append(overall)
api_timeouts = self.api_timeouts()
while True:
try:
if self.writer_url:
res = self.__make_writer_request(
params={
"jobno": jobno,
"upload_token": upload_token,
},
json={
"trail": items,
},
trace=trace)
logger.debug("Writer response: %s", res.text)
return res.json()["success"]
else:
res = self.__post(uri, items, trace=trace)
logger.debug("API response: %s", res)
success = int(res[0]['success'])
return success
except self.NotAvailable as e:
try:
timeout = next(api_timeouts)
logger.warn("API error, will retry in %ss...", timeout)
time.sleep(timeout)
continue
except StopIteration:
raise e
def push_monitoring_data(
self,
jobno,
upload_token,
send_data,
trace=False):
if send_data:
addr = "api/monitoring/receiver/push?job_id=%s&upload_token=%s" % (
jobno, upload_token)
api_timeouts = self.api_timeouts()
while True:
try:
if self.writer_url:
res = self.__make_writer_request(
params={
"jobno": jobno,
"upload_token": upload_token,
},
json={
"monitoring": send_data,
},
trace=trace)
logger.debug("Writer response: %s", res.text)
return res.json()["success"]
else:
res = self.__post_raw(
addr, json.dumps(send_data), trace=trace)
logger.debug("API response: %s", res)
success = res == 'ok'
return success
except self.NotAvailable as e:
try:
timeout = next(api_timeouts)
logger.warn("API error, will retry in %ss...", timeout)
time.sleep(timeout)
continue
except StopIteration:
raise e
def send_status(self, jobno, upload_token, status, trace=False):
addr = "api/v2/jobs/%s/?upload_token=%s" % (jobno, upload_token)
status_line = status.get("core", {}).get("stage", "unknown")
if "stepper" in status:
status_line += " %s" % status["stepper"].get("progress")
api_timeouts = self.api_timeouts()
while True:
try:
self.__patch(addr, {"status": status_line}, trace=trace)
return
except self.NotAvailable as e:
try:
timeout = next(api_timeouts)
logger.warn("API error, will retry in %ss...", timeout)
time.sleep(timeout)
continue
except StopIteration:
raise e
def send_console(self, jobno, console, trace=False):
if trace:
logger.debug("Sending console view [%s]: %s", len(console),
console[:64])
addr = "api/job/%s/console.txt" % jobno
self.__post_raw(addr, console, trace=trace)
def is_target_locked(self, target, trace=False):
addr = "api/server/lock.json?action=check&address=%s" % target
res = self.__get(addr, trace=trace)
return res[0]
def lock_target(self, target, duration, trace=False):
addr = "api/server/lock.json?action=lock&" + \
"address=%s&duration=%s&jobno=None" % \
(target, int(duration))
res = self.__get(addr, trace=trace)
return res[0]
def unlock_target(self, target):
addr = self.get_manual_unlock_link(target)
res = self.__get(addr)
return res[0]
def get_virtual_host_info(self, hostname):
addr = "api/server/virtual_host.json?hostname=%s" % hostname
res = self.__get(addr)
try:
return res[0]
except KeyError:
raise Exception(res['error'])
@staticmethod
def get_manual_unlock_link(target):
return "api/server/lock.json?action=unlock&address=%s" % target
def send_config_snapshot(self, jobno, config, trace=False):
logger.debug("Sending config snapshot")
addr = "api/job/%s/configinfo.txt" % jobno
self.__post_raw(addr, {"configinfo": config}, trace=trace)

View File

@ -0,0 +1,821 @@
# coding=utf-8
# TODO: make the next two lines unnecessary
# pylint: disable=line-too-long
# pylint: disable=missing-docstring
import copy
import datetime
import logging
import os
import pwd
import re
import sys
import time
from StringIO import StringIO
import pkg_resources
from queue import Empty, Queue
from builtins import str
import requests
import threading
from ...common.interfaces import AbstractPlugin,\
MonitoringDataListener, AggregateResultListener, AbstractInfoWidget
from ...common.util import expand_to_seconds
from ..Autostop import Plugin as AutostopPlugin
from ..Console import Plugin as ConsolePlugin
from .client import KSHMAPIClient
logger = logging.getLogger(__name__) # pylint: disable=C0103
class BackendTypes(object):
OVERLOAD = 'OVERLOAD'
LUNAPARK = 'LUNAPARK'
@classmethod
def identify_backend(cls, section_name):
patterns = [
(r'^overload\d*$', cls.OVERLOAD),
(r'^(meta|lunapark|lp)\d*$', cls.LUNAPARK)
]
for pattern, backend_type in patterns:
if re.match(pattern, section_name):
return backend_type
else:
raise KeyError('Config section name doesn\'t match any of the patterns:\n%s' %
'\n'.join([ptrn[0] for ptrn in patterns]))
pass
class Plugin(AbstractPlugin, AggregateResultListener,
MonitoringDataListener):
"""API Client class for Yandex KSHM web service"""
SECTION = 'meta'
RC_STOP_FROM_WEB = 8
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.data_queue = Queue()
self.monitoring_queue = Queue()
self.ignore_target_lock = None
self.jobno_file = None
self.lock_target_duration = None
self.mon = None
self.regression_component = None
self.retcode = -1
self.target = None
self.task_name = ''
self.token_file = None
self.version_tested = None
self.send_status_period = 10
self._generator_info = None
self._is_telegraf = None
@staticmethod
def get_key():
return __file__
def get_available_options(self):
opts = [
"api_address",
"writer_endpoint",
"task",
"job_name",
"job_dsc",
"notify",
"ver", "component",
"regress",
"operator",
"copy_config_to",
"jobno_file",
"ignore_target_lock",
"target_lock_duration",
"lock_targets",
"jobno",
"upload_token",
'connection_timeout',
'network_attempts',
'api_attempts',
'maintenance_attempts',
'network_timeout',
'api_timeout',
'maintenance_timeout',
'strict_lock',
'send_status_period',
'log_data_requests',
'log_monitoring_requests',
'log_status_requests',
'log_other_requests',
'threads_timeout'
]
return opts
def configure(self):
self.mon = self.core.job.monitoring_plugin
self.jobno_file = self.get_option("jobno_file", '')
self.regression_component = str(self.get_option("component", ''))
ignore_all_locks = self.core.get_option(self.core.SECTION,
"ignore_locks", '0')
self.ignore_target_lock = int(self.get_option("ignore_target_lock",
ignore_all_locks))
self.lock_target_duration = expand_to_seconds(self.get_option(
"target_lock_duration", "30m"))
self.send_status_period = expand_to_seconds(
self.get_option('send_status_period', '10'))
def check_task_is_open(self, task):
TASK_TIP = 'The task should be connected to Lunapark. Open startrek task page, click "actions" -> "load testing".'
logger.debug("Check if task %s is open", task)
try:
task_data = self.lp_job.get_task_data(task)[0]
try:
task_status = task_data['status']
if task_status == 'Open':
logger.info("Task %s is ok", task)
self.task_name = str(task_data['name'])
else:
logger.info("Task %s:" % task)
logger.info(task_data)
raise RuntimeError("Task is not open")
except KeyError:
try:
error = task_data['error']
raise RuntimeError(
"Task %s error: %s\n%s" %
(task, error, TASK_TIP))
except KeyError:
raise RuntimeError(
'Unknown task data format:\n{}'.format(task_data))
except requests.exceptions.HTTPError as ex:
logger.error("Failed to check task status for '%s': %s", task, ex)
if ex.response.status_code == 404:
raise RuntimeError("Task not found: %s\n%s" % (task, TASK_TIP))
elif ex.response.status_code == 500 or ex.response.status_code == 400:
raise RuntimeError(
"Unable to check task staus, id: %s, error code: %s" %
(task, ex.response.status_code))
raise ex
@staticmethod
def search_task_from_cwd(cwd):
issue = re.compile("^([A-Za-z]+-[0-9]+)(-.*)?")
while cwd:
logger.debug("Checking if dir is named like JIRA issue: %s", cwd)
if issue.match(os.path.basename(cwd)):
res = re.search(issue, os.path.basename(cwd))
return res.group(1).upper()
newdir = os.path.abspath(os.path.join(cwd, os.path.pardir))
if newdir == cwd:
break
else:
cwd = newdir
raise RuntimeError(
"task=dir requested, but no JIRA issue name in cwd: %s" %
os.getcwd())
def prepare_test(self):
task = self.core.job.task
if task == 'dir':
task = self.search_task_from_cwd(os.getcwd())
info = self.core.job.generator_plugin.get_info()
self.target = info.address
logger.info("Detected target: %s", self.target)
port = info.port
instances = info.instances
if info.ammo_file.startswith(
"http://") or info.ammo_file.startswith("https://"):
ammo_path = info.ammo_file
else:
ammo_path = os.path.realpath(info.ammo_file)
loadscheme = [] if isinstance(info.rps_schedule,
str) else info.rps_schedule
duration = int(info.duration)
if duration:
self.lock_target_duration = duration
loop_count = info.loop_count
self.lp_job = self.__get_lp_job(self.target, port, loadscheme)
self.locked_targets = self.check_and_lock_targets(strict=bool(
int(self.get_option('strict_lock', '0'))), ignore=self.ignore_target_lock)
try:
if self.lp_job._number:
self.make_symlink(self.lp_job._number)
self.check_task_is_open(task)
else:
self.check_task_is_open(task)
self.lp_job.create()
self.make_symlink(self.lp_job.number)
except (KSHMAPIClient.JobNotCreated, KSHMAPIClient.NotAvailable, KSHMAPIClient.NetworkError) as e:
logger.error(e.message)
logger.error(
'Failed to connect to Lunapark, disabling DataUploader')
self.start_test = lambda *a, **kw: None
self.post_process = lambda *a, **kw: None
self.on_aggregated_data = lambda *a, **kw: None
self.monitoring_data = lambda *a, **kw: None
return
cmdline = ' '.join(sys.argv)
self.lp_job.edit_metainfo(
instances=instances,
ammo_path=ammo_path,
loop_count=loop_count,
is_regression=self.get_option(
'regress',
'0'),
regression_component=self.regression_component,
cmdline=cmdline,
) # todo: tanktype?
self.core.job.subscribe_plugin(self)
try:
console = self.core.get_plugin_of_type(ConsolePlugin)
except KeyError:
logger.debug("Console plugin not found", exc_info=True)
console = None
if console:
console.add_info_widget(JobInfoWidget(self))
console.remote_translator = self
self.set_option('target_host', self.target)
self.set_option('target_port', port)
self.set_option('cmdline', cmdline)
self.set_option('ammo_path', ammo_path)
self.set_option('loop_count', loop_count)
self.__save_conf()
def start_test(self):
self.on_air = True
status_sender = threading.Thread(target=self.__send_status)
status_sender.daemon = True
status_sender.start()
self.status_sender = status_sender
upload = threading.Thread(target=self.__data_uploader)
upload.daemon = True
upload.start()
self.upload = upload
monitoring = threading.Thread(target=self.__monitoring_uploader)
monitoring.daemon = True
monitoring.start()
self.monitoring = monitoring
web_link = "%s%s" % (
self.lp_job.api_client.base_url, self.lp_job.number)
logger.info("Web link: %s", web_link)
self.publish("jobno", self.lp_job.number)
self.publish("web_link", web_link)
self.set_option("jobno", str(self.lp_job.number))
if self.jobno_file:
logger.debug("Saving jobno to: %s", self.jobno_file)
fdes = open(self.jobno_file, 'w')
fdes.write(str(self.lp_job.number))
fdes.close()
self.__save_conf()
def is_test_finished(self):
return self.retcode
def end_test(self, retcode):
self.on_air = False
self.monitoring_queue.put(None)
self.data_queue.put(None)
self.__save_conf()
timeout = int(self.get_option('threads_timeout', '60'))
logger.info(
'Waiting for sender threads to join for {} seconds ("meta.threads_timeout" config option)'.format(timeout))
self.monitoring.join(timeout=timeout)
if self.monitoring.isAlive():
logger.error('Monitoring thread joining timed out. Terminating.')
self.upload.join(timeout=timeout)
if self.upload.isAlive():
logger.error('Upload thread joining timed out. Terminating.')
self.unlock_targets(self.locked_targets)
return retcode
def post_process(self, rc):
try:
self.lp_job.close(rc)
except Exception: # pylint: disable=W0703
logger.warning("Failed to close job", exc_info=True)
logger.info(
"Web link: %s%s",
self.lp_job.api_client.base_url,
self.lp_job.number)
autostop = None
try:
autostop = self.core.get_plugin_of_type(AutostopPlugin)
except KeyError:
logger.debug("No autostop plugin loaded", exc_info=True)
if autostop and autostop.cause_criterion:
rps = 0
if autostop.cause_criterion.cause_second:
rps = autostop.cause_criterion.cause_second[
1]["metrics"]["reqps"]
if not rps:
rps = autostop.cause_criterion.cause_second[0][
"overall"]["interval_real"]["len"]
self.lp_job.set_imbalance_and_dsc(
int(rps), autostop.cause_criterion.explain())
else:
logger.debug("No autostop cause detected")
self.__save_conf()
return rc
def on_aggregated_data(self, data, stats):
"""
@data: aggregated data
@stats: stats about gun
"""
if self.lp_job.is_alive:
self.data_queue.put((data, stats))
def monitoring_data(self, data_list):
if self.lp_job.is_alive:
if len(data_list) > 0:
if self.is_telegraf:
# telegraf
self.monitoring_queue.put(data_list)
else:
# monitoring
[self.monitoring_queue.put(data) for data in data_list]
@property
def is_telegraf(self):
if self._is_telegraf is None:
self._is_telegraf = 'Telegraf' in self.core.job.monitoring_plugin.__module__
return self._is_telegraf
def _core_with_tank_api(self):
"""
Return True if we are running under Tank API
"""
api_found = False
try:
import yandex_tank_api.worker # pylint: disable=F0401
except ImportError:
logger.debug("Attempt to import yandex_tank_api.worker failed")
else:
api_found = isinstance(self.core, yandex_tank_api.worker.TankCore)
logger.debug("We are%s running under API server", '' if api_found else
' likely not')
return api_found
def __send_status(self):
logger.info('Status sender thread started')
lp_job = self.lp_job
while lp_job.is_alive and self.on_air:
try:
self.lp_job.send_status(self.core.status)
time.sleep(self.send_status_period)
except (KSHMAPIClient.NetworkError, KSHMAPIClient.NotAvailable) as e:
logger.warn('Failed to send status')
logger.debug(e.message)
break
except KSHMAPIClient.StoppedFromOnline:
logger.info("Test stopped from Lunapark")
lp_job.is_alive = False
self.retcode = 8
break
logger.info("Closing Status sender thread")
def __data_uploader(self):
logger.info('Data uploader thread started')
lp_job = self.lp_job
queue = self.data_queue
while lp_job.is_alive:
try:
entry = queue.get(timeout=1)
if entry is not None:
data, stats = entry
else:
logger.info("Data uploader queue returned None")
break
lp_job.push_test_data(data, stats)
except Empty:
continue
except KSHMAPIClient.StoppedFromOnline:
logger.info("Test stopped from Lunapark")
lp_job.is_alive = False
self.retcode = 8
break
except Exception as e:
logger.info("Mysterious exception: %s", e)
self.retcode = 8
break
logger.info("Closing Data uploader thread")
def __monitoring_uploader(self):
logger.info('Monitoring uploader thread started')
lp_job = self.lp_job
queue = self.monitoring_queue
while lp_job.is_alive:
try:
data = queue.get(timeout=1)
if data is not None:
lp_job.push_monitoring_data(data)
else:
logger.info('Monitoring queue returned None')
break
except Empty:
continue
except (KSHMAPIClient.NetworkError, KSHMAPIClient.NotAvailable, KSHMAPIClient.UnderMaintenance) as e:
logger.warn('Failed to push monitoring data')
logger.warn(e.message)
break
except KSHMAPIClient.StoppedFromOnline:
logger.info("Test stopped from Lunapark")
lp_job.is_alive = False
self.retcode = 8
break
logger.info('Closing Monitoring uploader thread')
def __save_conf(self):
config_copy = self.core.job.config_copy
if config_copy:
self.core.config.flush(config_copy)
config = copy.copy(self.core.config.config)
try:
config_filename = self.core.job.monitoring_plugin.config
if config_filename and config_filename not in ['none', 'auto']:
with open(config_filename) as config_file:
config.set(
self.core.job.monitoring_plugin.SECTION,
"config_contents",
config_file.read())
except Exception: # pylint: disable=W0703
logger.warning("Can't get monitoring config", exc_info=True)
output = StringIO()
config.write(output)
self.lp_job.send_config_snapshot(output.getvalue())
with open(os.path.join(self.core.artifacts_dir, 'saved_conf.ini'), 'w') as f:
config.write(f)
def send_console(self, text):
try:
self.lp_job.send_console(text)
except Exception: # pylint: disable=W0703
logger.debug("Can't send console snapshot: %s", exc_info=True)
def parse_lock_targets(self):
# prepare target lock list
locks_list_cfg = self.get_option('lock_targets', 'auto').strip()
def no_target():
logging.warn("Target lock set to 'auto', but no target info available")
return ''
locks_list = (self.target or no_target() if locks_list_cfg.lower() == 'auto' else locks_list_cfg).split('\n')
targets_to_lock = [host for host in locks_list if host]
return targets_to_lock
def lock_targets(self, targets_to_lock, ignore, strict):
locked_targets = [target for target in targets_to_lock
if self.lp_job.lock_target(target, self.lock_target_duration, ignore, strict)]
return locked_targets
def unlock_targets(self, locked_targets):
logger.info("Unlocking targets: %s", locked_targets)
for target in locked_targets:
logger.info(target)
self.lp_job.api_client.unlock_target(target)
def check_and_lock_targets(self, strict, ignore):
targets_list = self.parse_lock_targets()
logger.info('Locking targets: %s', targets_list)
locked_targets = self.lock_targets(targets_list, ignore=ignore, strict=strict)
logger.info('Locked targets: %s', locked_targets)
return locked_targets
def make_symlink(self, name):
PLUGIN_DIR = os.path.join(self.core.artifacts_base_dir, 'lunapark')
if not os.path.exists(PLUGIN_DIR):
os.makedirs(PLUGIN_DIR)
os.symlink(
os.path.relpath(
self.core.artifacts_dir,
PLUGIN_DIR),
os.path.join(
PLUGIN_DIR,
str(name)))
def _get_user_agent(self):
plugin_agent = 'Lunapark/{}'.format(
pkg_resources.require('yatank-internal-lunapark')[0].version)
return ' '.join((plugin_agent,
self.core.get_user_agent()))
def __get_operator(self):
try:
return self.get_option(
'operator',
'') or pwd.getpwuid(
os.geteuid())[0]
except:
logger.error(
"Couldn't get username from the OS. Please, set the 'meta.operator' option explicitly in your config file.")
raise
def __get_api_client(self):
return KSHMAPIClient(base_url=self.get_option('api_address'),
writer_url=self.get_option('writer_endpoint', ""),
network_attempts=int(self.get_option('network_attempts', 60)),
api_attempts=int(self.get_option('api_attempts', 60)),
maintenance_attempts=int(self.get_option('maintenance_attempts', 10)),
network_timeout=int(self.get_option('network_timeout', 10)),
api_timeout=int(self.get_option('api_timeout', 10)),
maintenance_timeout=int(self.get_option('maintenance_timeout', 60)),
connection_timeout=int(self.get_option('connection_timeout', 30)),
user_agent=self._get_user_agent())
def __get_lp_job(self, target, port, loadscheme):
api_client = self.__get_api_client()
return LPJob.from_core_job(self.core.job,
api_client,
target_host=target,
target_port=port,
number=self.get_option('jobno', ''),
token=self.get_option('upload_token', ''),
operator=self.__get_operator(),
notify_list=self.get_option("notify", '').split(' '),
loadscheme=loadscheme,
log_data_requests=bool(int(self.get_option('log_data_requests', '0'))),
log_monitoring_requests=bool(int(self.get_option('log_monitoring_requests', '0'))),
log_status_requests=bool(int(self.get_option('log_status_requests', '0'))),
log_other_requests=bool(int(self.get_option('log_other_requests', '0'))),
)
class JobInfoWidget(AbstractInfoWidget):
def __init__(self, sender):
# type: (Plugin) -> object
AbstractInfoWidget.__init__(self)
self.owner = sender
def get_index(self):
return 1
def render(self, screen):
template = "Author: " + screen.markup.RED + "%s" + \
screen.markup.RESET + \
"%s\n Job: %s %s\n Task: %s %s\n Web: %s%s"
data = (self.owner.lp_job.person[:1], self.owner.lp_job.person[1:],
self.owner.lp_job.number, self.owner.lp_job.name, self.owner.core.job.task,
# todo: task_name from api_client.get_task_data()
self.owner.core.job.task, self.owner.lp_job.api_client.base_url,
self.owner.lp_job.number)
return template % data
class LPJob(object):
def __init__(
self,
client,
target_host,
target_port,
person,
task,
name,
description,
tank,
log_data_requests=False,
log_other_requests=False,
log_status_requests=False,
log_monitoring_requests=False,
number=None,
token=None,
is_alive=True,
notify_list=None,
version=None,
detailed_time=None,
load_scheme=None):
"""
:param client: KSHMAPIClient
:param log_data_requests: bool
:param log_other_request: bool
:param log_status_requests: bool
:param log_monitoring_requests: bool
"""
assert bool(number) == bool(
token), 'Job number and upload token should come together'
self.log_other_requests = log_other_requests
self.log_data_requests = log_data_requests
self.log_status_requests = log_status_requests
self.log_monitoring_requests = log_monitoring_requests
self.name = name
self.tank = tank
self.target_host = target_host
self.target_port = target_port
self.person = person
self.task = task
self.is_alive = is_alive
self._number = number
self._token = token
self.api_client = client
self.notify_list = notify_list
self.description = description
self.version = version
self.detailed_time = detailed_time
self.load_scheme = load_scheme
self.is_finished = False
def push_test_data(self, data, stats):
if self.is_alive:
try:
self.api_client.push_test_data(
self.number, self.token, data, stats, trace=self.log_data_requests)
except (KSHMAPIClient.NotAvailable, KSHMAPIClient.NetworkError, KSHMAPIClient.UnderMaintenance):
logger.warn('Failed to push test data')
self.is_alive = False
def edit_metainfo(
self,
instances=0,
ammo_path=None,
loop_count=None,
is_regression=None,
regression_component=None,
cmdline=None,
is_starred=False,
tank_type=1):
try:
self.api_client.edit_job_metainfo(jobno=self.number,
job_name=self.name,
job_dsc=self.description,
instances=instances,
ammo_path=ammo_path,
loop_count=loop_count,
version_tested=self.version,
is_regression=is_regression,
component=regression_component,
cmdline=cmdline,
is_starred=is_starred,
tank_type=tank_type,
trace=self.log_other_requests)
except (KSHMAPIClient.NotAvailable, KSHMAPIClient.StoppedFromOnline, KSHMAPIClient.NetworkError,
KSHMAPIClient.UnderMaintenance) as e:
logger.warn('Failed to edit job metainfo on Lunapark')
logger.warn(e.message)
@property
def number(self):
if not self._number:
self.create()
return self._number
@property
def token(self):
if not self._token:
self.create()
return self._token
def close(self, rc):
return self.api_client.close_job(
self.number, rc, trace=self.log_other_requests)
def create(self):
self._number, self._token = self.api_client.new_job(task=self.task,
person=self.person,
tank=self.tank,
loadscheme=self.load_scheme,
target_host=self.target_host,
target_port=self.target_port,
detailed_time=self.detailed_time,
notify_list=self.notify_list,
trace=self.log_other_requests)
logger.info('Job created: {}'.format(self._number))
@classmethod
def from_core_job(
cls,
job,
client,
target_host,
target_port,
operator,
log_data_requests,
log_other_requests,
log_status_requests,
log_monitoring_requests,
number=None,
token=None,
is_alive=True,
notify_list=None,
loadscheme=None):
# type: (Job, KSHMAPIClient) -> LPJob
return cls(
target_host=target_host,
target_port=target_port,
number=number,
token=token,
is_alive=is_alive,
client=client,
person=operator,
task=job.task,
notify_list=notify_list,
tank=job.tank,
description=job.description,
name=job.name,
version=job.version,
load_scheme=loadscheme,
log_data_requests=log_data_requests,
log_other_requests=log_other_requests,
log_status_requests=log_status_requests,
log_monitoring_requests=log_monitoring_requests)
def send_status(self, status):
if self._number and self.is_alive:
self.api_client.send_status(
self.number,
self.token,
status,
trace=self.log_status_requests)
def get_task_data(self, task):
return self.api_client.get_task_data(
task, trace=self.log_other_requests)
def send_config_snapshot(self, config):
try:
if self._number:
self.api_client.send_config_snapshot(
self.number, config, trace=self.log_other_requests)
except Exception:
logger.debug("Can't send config snapshot: %s", exc_info=True)
def push_monitoring_data(self, data):
if self.is_alive:
self.api_client.push_monitoring_data(
self.number, self.token, data, trace=self.log_monitoring_requests)
def send_console(self, text):
return self.api_client.send_console(
self.number, text, trace=self.log_other_requests)
def lock_target(self, lock_target, lock_target_duration, ignore, strict):
while True:
try:
self.api_client.lock_target(
lock_target,
lock_target_duration,
trace=self.log_other_requests)
return True
except (KSHMAPIClient.NotAvailable, KSHMAPIClient.StoppedFromOnline) as e:
logger.info('Target is not locked due to %s', e.message)
if ignore:
logger.info('ignore_target_locks = 1')
return False
elif strict:
raise e
else:
logger.info('strict_lock = 0')
return False
except KSHMAPIClient.UnderMaintenance:
logger.info('Target is locked')
logger.info("Manual unlock link: %s%s", self.api_client.base_url,
self.api_client.get_manual_unlock_link(lock_target))
if ignore:
logger.info('ignore_target_locks = 1')
return False
time.sleep(10)
continue
def set_imbalance_and_dsc(self, rps, comment):
return self.api_client.set_imbalance_and_dsc(self.number, rps, comment)
def is_target_locked(self, host, strict):
while True:
try:
return self.api_client.is_target_locked(
host, trace=self.log_other_requests)
except KSHMAPIClient.UnderMaintenance:
logger.info('Target is locked, retrying...')
continue
except (KSHMAPIClient.StoppedFromOnline, KSHMAPIClient.NotAvailable, KSHMAPIClient.NetworkError) as e:
logger.warn('Can\'t check whether target is locked\n')
if strict:
logger.warn('Stopping test due to strict_lock')
raise
else:
logger.warn('strict_lock is False, proceeding')
return {'status': 'ok'}

View File

@ -0,0 +1,304 @@
import threading
from queue import Queue
import time
import pytest
from mock import patch, call, MagicMock
from requests import ConnectionError
from yandextank.plugins.DataUploader.client import KSHMAPIClient
from yandextank.plugins.DataUploader.plugin import online_uploader, LPJob
@pytest.mark.parametrize('job_number, token', [
(42, 'fooToken'),
(314, 'tokenBar')
])
class TestOnlineUploader(object):
def test_with_job_number(self, job_number, token):
data_set = ['data%d' % i for i in range(100)]
queue = Queue()
job = LPJob(job_number, token)
with patch.object(KSHMAPIClient, 'push_data') as push_data_mock:
thread = threading.Thread(
target=online_uploader,
name='Online uploader',
args=(queue, job))
thread.daemon = True
thread.start()
for data in data_set:
if job.is_alive:
queue.put(data)
else:
break
time.sleep(1)
push_data_mock.assert_has_calls(
calls=[
call(
data,
job_number,
token) for data in data_set])
def test_without_job_number(self, job_number, token):
data_set = ['data%d' % i for i in range(100)]
queue = Queue()
job = LPJob()
with patch.object(KSHMAPIClient, 'new_job', return_value=(job_number, token)) as new_job_mock:
with patch.object(KSHMAPIClient, 'push_data') as push_data_mock:
thread = threading.Thread(
target=online_uploader,
name='Online uploader',
args=(
queue,
job))
thread.daemon = True
thread.start()
for data in data_set:
if job.is_alive:
queue.put(data)
else:
break
time.sleep(1)
new_job_mock.assert_called_once_with(*[None] * 8)
push_data_mock.assert_has_calls(
calls=[call(data, job_number, token) for data in data_set])
@pytest.mark.parametrize('job_nr, upload_token', [
('101', 'hfh39fj'),
])
class TestClient(object):
TEST_DATA = {'tagged': {'case1': {u'size_in': {u'max': 0, u'total': 0, u'len': 4, u'min': 0},
u'latency': {u'max': 0, u'total': 0, u'len': 4, u'min': 0}, u'interval_real': {
u'q': {'q': [50, 75, 80, 85, 90, 95, 98, 99, 100],
'value': [484467.0, 688886.75, 736398.80000000005, 783910.84999999998, 831422.90000000002,
878934.94999999995, 907442.17999999993, 916944.58999999985, 926447.0]}, u'min': 196934,
u'max': 926447, u'len': 4,
u'hist': {'data': [1, 1, 1, 1], 'bins': [197000.0, 360000.0, 610000.0, 930000.0]}, u'total': 2092315},
u'interval_event': {u'max': 0, u'total': 0, u'len': 4, u'min': 0},
u'receive_time': {u'max': 0, u'total': 0, u'len': 4, u'min': 0},
u'connect_time': {u'max': 0, u'total': 0, u'len': 4, u'min': 0},
u'proto_code': {u'count': {'200': 4}},
u'size_out': {u'max': 0, u'total': 0, u'len': 4, u'min': 0},
u'send_time': {u'max': 0, u'total': 0, u'len': 4, u'min': 0},
u'net_code': {u'count': {'0': 4}}},
'default': {u'size_in': {u'max': 0, u'total': 0, u'len': 3, u'min': 0},
u'latency': {u'max': 0, u'total': 0, u'len': 3, u'min': 0}, u'interval_real': {
u'q': {'q': [50, 75, 80, 85, 90, 95, 98, 99, 100],
'value': [247863.0, 419279.5, 453562.80000000005, 487846.09999999998,
522129.40000000002, 556412.69999999995, 576982.68000000005,
583839.33999999997, 590696.0]}, u'min': 128669, u'max': 590696,
u'len': 3, u'hist': {'data': [1, 1, 1], 'bins': [129000.0, 248000.0, 595000.0]},
u'total': 967228},
u'interval_event': {u'max': 0, u'total': 0, u'len': 3, u'min': 0},
u'receive_time': {u'max': 0, u'total': 0, u'len': 3, u'min': 0},
u'connect_time': {u'max': 0, u'total': 0, u'len': 3, u'min': 0},
u'proto_code': {u'count': {'200': 3}},
u'size_out': {u'max': 0, u'total': 0, u'len': 3, u'min': 0},
u'send_time': {u'max': 0, u'total': 0, u'len': 3, u'min': 0},
u'net_code': {u'count': {'0': 3}}},
'case2': {u'size_in': {u'max': 0, u'total': 0, u'len': 3, u'min': 0},
u'latency': {u'max': 0, u'total': 0, u'len': 3, u'min': 0}, u'interval_real': {
u'q': {'q': [50, 75, 80, 85, 90, 95, 98, 99, 100],
'value': [366638.0, 431245.0, 444166.40000000002, 457087.79999999999,
470009.20000000001, 482930.59999999998, 490683.44,
493267.71999999997, 495852.0]}, u'min': 328929, u'max': 495852,
u'len': 3, u'hist': {'data': [1, 1, 1], 'bins': [329000.0, 367000.0, 496000.0]},
u'total': 1191419},
u'interval_event': {u'max': 0, u'total': 0, u'len': 3, u'min': 0},
u'receive_time': {u'max': 0, u'total': 0, u'len': 3, u'min': 0},
u'connect_time': {u'max': 0, u'total': 0, u'len': 3, u'min': 0},
u'proto_code': {u'count': {'200': 3}},
u'size_out': {u'max': 0, u'total': 0, u'len': 3, u'min': 0},
u'send_time': {u'max': 0, u'total': 0, u'len': 3, u'min': 0},
u'net_code': {u'count': {'0': 3}}}},
'overall': {u'size_in': {u'max': 0, u'total': 0, u'len': 10, u'min': 0},
u'latency': {u'max': 0, u'total': 0, u'len': 10, u'min': 0}, u'interval_real': {
u'q': {'q': [50, 75, 80, 85, 90, 95, 98, 99, 100],
'value': [362936.0, 566985.0, 594496.79999999993, 603048.59999999998,
641374.69999999995, 783910.84999999963, 869432.54000000004,
897939.77000000002, 926447.0]}, u'min': 128669, u'max': 926447, u'len': 10,
u'hist': {'data': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
'bins': [129000.0, 197000.0, 248000.0, 329000.0, 360000.0, 367000.0, 496000.0,
595000.0, 610000.0, 930000.0]}, u'total': 4250962},
u'interval_event': {u'max': 0, u'total': 0, u'len': 10, u'min': 0},
u'receive_time': {u'max': 0, u'total': 0, u'len': 10, u'min': 0},
u'connect_time': {u'max': 0, u'total': 0, u'len': 10, u'min': 0},
u'proto_code': {u'count': {'200': 10}},
u'size_out': {u'max': 0, u'total': 0, u'len': 10, u'min': 0},
u'send_time': {u'max': 0, u'total': 0, u'len': 10, u'min': 0},
u'net_code': {u'count': {'0': 10}}}, 'ts': 1476377527}
TEST_STATS = {'metrics': {'instances': 0, 'reqps': 0}, 'ts': 1476446024}
def test_new_job(self, job_nr, upload_token):
client = KSHMAPIClient(
base_url='https://lunapark.test.yandex-team.ru/')
with patch('requests.Session.send') as send_mock:
mock_response = MagicMock()
mock_response.json.return_value = [
{"upload_token": upload_token, "job": job_nr}]
send_mock.return_value = mock_response
assert client.new_job(
'LOAD-204',
'fomars',
'tank',
'target.host',
1234) == (
job_nr,
upload_token)
def test_new_job_retry_maintenance(self, job_nr, upload_token):
client = KSHMAPIClient(
base_url='https://lunapark.test.yandex-team.ru/',
maintenance_timeout=2)
with patch('requests.Session.send') as send_mock:
bad_response = MagicMock()
bad_response.status_code = 423
good_response = MagicMock()
good_response.json.return_value = [
{"upload_token": upload_token, "job": job_nr}]
send_mock.side_effect = [bad_response, good_response]
assert client.new_job(
'LOAD-204',
'fomars',
'tank',
'target.host',
1234) == (
job_nr,
upload_token)
def test_new_job_retry_network(self, job_nr, upload_token):
client = KSHMAPIClient(
base_url='https://lunapark.test.yandex-team.ru/')
with patch('requests.Session.send') as send_mock:
expected_response = MagicMock()
expected_response.json.return_value = [
{"upload_token": upload_token, "job": job_nr}]
send_mock.side_effect = [
ConnectionError,
ConnectionError,
expected_response]
assert client.new_job(
'LOAD-204',
'fomars',
'tank',
'target.host',
1234) == (
job_nr,
upload_token)
def test_new_job_retry_api(self, job_nr, upload_token):
client = KSHMAPIClient(
base_url='https://lunapark.test.yandex-team.ru/')
with patch('requests.Session.send') as send_mock:
bad_response = MagicMock()
bad_response.status_code = 500
good_response = MagicMock()
good_response.json.return_value = [
{"upload_token": upload_token, "job": job_nr}]
send_mock.side_effect = [bad_response, good_response]
assert client.new_job(
'LOAD-204',
'fomars',
'tank',
'target.host',
1234) == (
job_nr,
upload_token)
def test_new_job_unavailable(self, job_nr, upload_token):
client = KSHMAPIClient(
base_url='https://lunapark.test.yandex-team.ru/',
api_attempts=3,
api_timeout=1)
with patch('requests.Session.send') as send_mock:
bad_response = MagicMock()
bad_response.status_code = 500
good_response = MagicMock()
good_response.json.return_value = [
{"upload_token": upload_token, "job": job_nr}]
send_mock.side_effect = [
bad_response,
bad_response,
bad_response,
good_response]
with pytest.raises(KSHMAPIClient.JobNotCreated):
client.new_job(
'LOAD-204',
'fomars',
'tank',
'target.host',
1234)
def test_push_data(self, job_nr, upload_token):
client = KSHMAPIClient(
base_url='https://lunapark.test.yandex-team.ru/')
with patch('requests.Session.send') as send_mock:
mock_response = MagicMock()
mock_response.json.return_value = [{"success": 1}]
send_mock.return_value = mock_response
assert client.push_test_data(
job_nr, self.TEST_DATA, self.TEST_STATS) == 1
def test_push_data_retry_network(self, job_nr, upload_token):
client = KSHMAPIClient(
base_url='https://lunapark.test.yandex-team.ru/')
with patch('requests.Session.send') as send_mock:
expected_response = MagicMock()
expected_response.json.return_value = [{"success": 1}]
send_mock.side_effect = [
ConnectionError,
ConnectionError,
expected_response]
result = client.push_test_data(
job_nr, self.TEST_DATA, self.TEST_STATS)
send_mock.assert_called()
assert result == 1
def test_push_data_retry_api(self, job_nr, upload_token):
client = KSHMAPIClient(
base_url='https://lunapark.test.yandex-team.ru/')
with patch('requests.Session.send') as send_mock:
bad_response = MagicMock()
bad_response.status_code = 500
good_response = MagicMock()
good_response.json.return_value = [{"success": 1}]
send_mock.side_effect = [bad_response, good_response]
result = client.push_test_data(
job_nr, self.TEST_DATA, self.TEST_STATS)
send_mock.assert_called()
assert result == 1
def test_push_data_api_exception(self, job_nr, upload_token):
client = KSHMAPIClient(
base_url='https://lunapark.test.yandex-team.ru/',
api_timeout=1,
api_attempts=3)
with patch('requests.Session.send') as send_mock:
bad_response = MagicMock()
bad_response.status_code = 500
good_response = MagicMock()
good_response.json.return_value = [{"success": 1}]
send_mock.side_effect = [
bad_response,
bad_response,
bad_response,
good_response]
assert client.push_test_data(
job_nr, self.TEST_DATA, self.TEST_STATS) == 0

View File

@ -0,0 +1,31 @@
import pytest
from yandextank.plugins.DataUploader.plugin import BackendTypes
class TestBackendTypes(object):
@pytest.mark.parametrize('section_name, expected_type', [
('meta', BackendTypes.LUNAPARK),
('meta01', BackendTypes.LUNAPARK),
('lp', BackendTypes.LUNAPARK),
('lp01', BackendTypes.LUNAPARK),
('lunapark', BackendTypes.LUNAPARK),
('lunapark01', BackendTypes.LUNAPARK),
('overload', BackendTypes.OVERLOAD),
('overload01', BackendTypes.OVERLOAD)
])
def test_identify(self, section_name, expected_type):
assert BackendTypes.identify_backend(section_name) == expected_type
@pytest.mark.parametrize('section_name', [
'meta lunapark',
'meta ',
' lunapark',
'lp '
])
def test_exception(self, section_name):
with pytest.raises(KeyError) as excinfo:
BackendTypes.identify_backend(section_name)
assert 'section name' in str(excinfo.value)

View File

@ -22,8 +22,8 @@ class Plugin(AbstractPlugin, GeneratorPlugin):
""" JMeter tank plugin """
SECTION = 'jmeter'
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.jmeter_process = None
self.args = None
self.original_jmx = None

View File

@ -17,8 +17,8 @@ class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener):
# pylint:disable=R0902
SECTION = 'json_report'
def __init__(self, core):
super(Plugin, self).__init__(core)
def __init__(self, core, config_section):
super(Plugin, self).__init__(core, config_section)
self._is_telegraf = None
def get_available_options(self):

View File

@ -22,10 +22,10 @@ class Plugin(AbstractPlugin, GeneratorPlugin):
SECTION = "maven"
def __init__(self, core):
def __init__(self, core, config_section):
# FIXME python version 2.7 does not support this syntax. super() should
# have arguments in Python 2
super().__init__(core)
super(Plugin, self).__init__(core, config_section)
self.maven_cmd = "mvn"
self.process = None
self.process_stderr = None

View File

@ -24,8 +24,8 @@ class Plugin(AbstractPlugin):
SECTION = 'monitoring'
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.jobno = None
self.default_target = None
self.config = None

View File

@ -30,8 +30,8 @@ class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener):
SECTION = 'overload'
RC_STOP_FROM_WEB = 8
def __init__(self, core):
super(Plugin, self).__init__(core)
def __init__(self, core, config_section):
super(Plugin, self).__init__(core, config_section)
self.locks_list_dict = {}
self.api_client = OverloadClient()
self.jobno = None

View File

@ -16,13 +16,13 @@ logger = logging.getLogger(__name__)
class Plugin(AbstractPlugin, GeneratorPlugin):
''' Pandora load generator plugin '''
''' Pandora load generator plugin '''
OPTION_CONFIG = "config"
SECTION = "pandora"
def __init__(self, core):
super(Plugin, self).__init__(core)
def __init__(self, core, config_section):
super(Plugin, self).__init__(core, config_section)
self.buffered_seconds = 2
self.enum_ammo = False
self.process = None

View File

@ -26,8 +26,8 @@ class Plugin(AbstractPlugin, GeneratorPlugin):
OPTION_CONFIG = "config"
SECTION = PhantomConfig.SECTION
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.config = None
self.process = None

View File

@ -19,8 +19,8 @@ class Plugin(AbstractPlugin):
def get_key():
return __file__
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.hosts = []
self.port = None
self.logfile = None

View File

@ -5,8 +5,8 @@ from ...common.interfaces import AbstractPlugin
class Plugin(AbstractPlugin):
SECTION = 'rcassert'
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.ok_codes = []
self.fail_code = 10

View File

@ -16,9 +16,9 @@ class Plugin(AbstractPlugin):
def get_key():
return __file__
def __init__(self, core):
def __init__(self, core, config_section):
''' Constructor '''
AbstractPlugin.__init__(self, core)
AbstractPlugin.__init__(self, core, config_section)
self.interval = "10s"
self.disk_limit = 2048 # 2 GB
self.mem_limit = 512 # 0.5 GB

View File

@ -12,8 +12,8 @@ class Plugin(AbstractPlugin):
'''
SECTION = 'shellexec'
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.catch_out = False
self.end = None
self.poll = None

View File

@ -31,8 +31,8 @@ class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener):
SECTION = "svgreport"
def __init__(self, core):
super(Plugin, self).__init__(core)
def __init__(self, core, config_section):
super(Plugin, self).__init__(core, config_section)
def get_available_options(self):
return [_REPORT_FILE_OPTION]

View File

@ -32,8 +32,8 @@ class Plugin(AbstractPlugin):
SECTION = 'telegraf' # may be redefined to 'monitoring' sometimes.
def __init__(self, core):
super(Plugin, self).__init__(core)
def __init__(self, core, config_section):
super(Plugin, self).__init__(core, config_section)
self.jobno = None
self.default_target = None
self.default_config = "{path}/config/monitoring_default_config.xml".format(

View File

@ -16,8 +16,8 @@ class Plugin(AbstractPlugin, AbstractInfoWidget):
'''
SECTION = 'tips'
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
AbstractInfoWidget.__init__(self)
self.lines = [
l.decode('utf-8')