This commit is contained in:
Arseniy Fomchenko 2017-03-22 19:30:18 +03:00
parent c8e5b41ea9
commit d0f4716066
7 changed files with 39 additions and 735 deletions

View File

@ -15,7 +15,7 @@ from urlparse import urljoin
from datetime import datetime
import pkg_resources
from .client import KSHMAPIClient
from .client import APIClient
from .plugin import LPJob
CONFIG_FILE = 'saved_conf.ini'
@ -130,7 +130,7 @@ def post_loader():
config = read_config(shooting_dir)
lp_config = get_lp_config(config)
api_client = KSHMAPIClient(base_url=lp_config['api_address'],
api_client = APIClient(base_url=lp_config['api_address'],
user_agent='Lunapark/{}'.format(
pkg_resources.require('yatank-internal-lunapark')[0].version)
# todo: add timeouts

View File

@ -15,7 +15,7 @@ requests.packages.urllib3.disable_warnings()
logger = logging.getLogger(__name__) # pylint: disable=C0103
class KSHMAPIClient(object):
class APIClient(object):
def __init__(
self,
@ -60,7 +60,7 @@ class KSHMAPIClient(object):
self._base_url = url
class UnderMaintenance(Exception):
message = "KSHM is under maintenance"
message = "API is under maintenance"
class NotAvailable(Exception):
desc = "API is not available"
@ -174,8 +174,7 @@ class KSHMAPIClient(object):
try:
timeout = next(maintenance_timeouts)
logger.warn(
"KSHM is under maintenance, will retry in %ss..." %
timeout)
"%s is under maintenance, will retry in %ss..." % (self._base_url, timeout))
time.sleep(timeout)
continue
except StopIteration:
@ -586,7 +585,7 @@ class KSHMAPIClient(object):
self.__post_raw(addr, {"configinfo": config}, trace=trace)
class OverloadClient(KSHMAPIClient):
class OverloadClient(APIClient):
def send_status(self, jobno, upload_token, status, trace=False):
return

View File

@ -22,7 +22,7 @@ from ...common.interfaces import AbstractPlugin, \
from ...common.util import expand_to_seconds
from ..Autostop import Plugin as AutostopPlugin
from ..Console import Plugin as ConsolePlugin
from .client import KSHMAPIClient, OverloadClient
from .client import APIClient, OverloadClient
logger = logging.getLogger(__name__) # pylint: disable=C0103
@ -48,7 +48,6 @@ class BackendTypes(object):
class Plugin(AbstractPlugin, AggregateResultListener,
MonitoringDataListener):
"""API Client class for Yandex KSHM web service"""
SECTION = 'meta'
RC_STOP_FROM_WEB = 8
VERSION = '3.0'
@ -211,7 +210,7 @@ class Plugin(AbstractPlugin, AggregateResultListener,
self.check_task_is_open()
self.lp_job.create()
self.make_symlink(self.lp_job.number)
except (KSHMAPIClient.JobNotCreated, KSHMAPIClient.NotAvailable, KSHMAPIClient.NetworkError) as e:
except (APIClient.JobNotCreated, APIClient.NotAvailable, APIClient.NetworkError) as e:
logger.error(e.message)
logger.error(
'Failed to connect to Lunapark, disabling DataUploader')
@ -385,11 +384,11 @@ class Plugin(AbstractPlugin, AggregateResultListener,
try:
self.lp_job.send_status(self.core.status)
time.sleep(self.send_status_period)
except (KSHMAPIClient.NetworkError, KSHMAPIClient.NotAvailable) as e:
except (APIClient.NetworkError, APIClient.NotAvailable) as e:
logger.warn('Failed to send status')
logger.debug(e.message)
break
except KSHMAPIClient.StoppedFromOnline:
except APIClient.StoppedFromOnline:
logger.info("Test stopped from Lunapark")
lp_job.is_alive = False
self.retcode = 8
@ -411,7 +410,7 @@ class Plugin(AbstractPlugin, AggregateResultListener,
lp_job.push_test_data(data, stats)
except Empty:
continue
except KSHMAPIClient.StoppedFromOnline:
except APIClient.StoppedFromOnline:
logger.info("Test stopped from Lunapark")
lp_job.is_alive = False
self.retcode = 8
@ -436,11 +435,11 @@ class Plugin(AbstractPlugin, AggregateResultListener,
break
except Empty:
continue
except (KSHMAPIClient.NetworkError, KSHMAPIClient.NotAvailable, KSHMAPIClient.UnderMaintenance) as e:
except (APIClient.NetworkError, APIClient.NotAvailable, APIClient.UnderMaintenance) as e:
logger.warn('Failed to push monitoring data')
logger.warn(e.message)
break
except KSHMAPIClient.StoppedFromOnline:
except APIClient.StoppedFromOnline:
logger.info("Test stopped from Lunapark")
lp_job.is_alive = False
self.retcode = 8
@ -537,7 +536,7 @@ class Plugin(AbstractPlugin, AggregateResultListener,
def __get_api_client(self):
if self.backend_type == BackendTypes.LUNAPARK:
client = KSHMAPIClient
client = APIClient
self._api_token = None
elif self.backend_type == BackendTypes.OVERLOAD:
client = OverloadClient
@ -667,7 +666,7 @@ class LPJob(object):
detailed_time=None,
load_scheme=None):
"""
:param client: KSHMAPIClient
:param client: APIClient
:param log_data_requests: bool
:param log_other_request: bool
:param log_status_requests: bool
@ -701,7 +700,7 @@ class LPJob(object):
try:
self.api_client.push_test_data(
self.number, self.token, data, stats, trace=self.log_data_requests)
except (KSHMAPIClient.NotAvailable, KSHMAPIClient.NetworkError, KSHMAPIClient.UnderMaintenance):
except (APIClient.NotAvailable, APIClient.NetworkError, APIClient.UnderMaintenance):
logger.warn('Failed to push test data')
self.is_alive = False
@ -729,8 +728,8 @@ class LPJob(object):
is_starred=is_starred,
tank_type=tank_type,
trace=self.log_other_requests)
except (KSHMAPIClient.NotAvailable, KSHMAPIClient.StoppedFromOnline, KSHMAPIClient.NetworkError,
KSHMAPIClient.UnderMaintenance) as e:
except (APIClient.NotAvailable, APIClient.StoppedFromOnline, APIClient.NetworkError,
APIClient.UnderMaintenance) as e:
logger.warn('Failed to edit job metainfo on Lunapark')
logger.warn(e.message)
@ -799,7 +798,7 @@ class LPJob(object):
lock_target_duration,
trace=self.log_other_requests)
return True
except (KSHMAPIClient.NotAvailable, KSHMAPIClient.StoppedFromOnline) as e:
except (APIClient.NotAvailable, APIClient.StoppedFromOnline) as e:
logger.info('Target is not locked due to %s', e.message)
if ignore:
logger.info('ignore_target_locks = 1')
@ -809,7 +808,7 @@ class LPJob(object):
else:
logger.info('strict_lock = 0')
return False
except KSHMAPIClient.UnderMaintenance:
except APIClient.UnderMaintenance:
logger.info('Target is locked')
logger.info("Manual unlock link: %s%s", self.api_client.base_url,
self.api_client.get_manual_unlock_link(lock_target))
@ -827,10 +826,10 @@ class LPJob(object):
try:
return self.api_client.is_target_locked(
host, trace=self.log_other_requests)
except KSHMAPIClient.UnderMaintenance:
except APIClient.UnderMaintenance:
logger.info('Target is locked, retrying...')
continue
except (KSHMAPIClient.StoppedFromOnline, KSHMAPIClient.NotAvailable, KSHMAPIClient.NetworkError) as e:
except (APIClient.StoppedFromOnline, APIClient.NotAvailable, APIClient.NetworkError) as e:
logger.warn('Can\'t check whether target is locked\n')
if strict:
logger.warn('Stopping test due to strict_lock')

View File

@ -4,7 +4,7 @@ import time
import pytest
from mock import patch, call, MagicMock
from requests import ConnectionError
from yandextank.plugins.DataUploader.client import KSHMAPIClient
from yandextank.plugins.DataUploader.client import APIClient
from yandextank.plugins.DataUploader.plugin import online_uploader, LPJob
@ -21,7 +21,7 @@ class TestOnlineUploader(object):
queue = Queue()
job = LPJob(job_number, token)
with patch.object(KSHMAPIClient, 'push_data') as push_data_mock:
with patch.object(APIClient, 'push_data') as push_data_mock:
thread = threading.Thread(
target=online_uploader,
@ -49,8 +49,8 @@ class TestOnlineUploader(object):
queue = Queue()
job = LPJob()
with patch.object(KSHMAPIClient, 'new_job', return_value=(job_number, token)) as new_job_mock:
with patch.object(KSHMAPIClient, 'push_data') as push_data_mock:
with patch.object(APIClient, 'new_job', return_value=(job_number, token)) as new_job_mock:
with patch.object(APIClient, 'push_data') as push_data_mock:
thread = threading.Thread(
target=online_uploader,
@ -139,7 +139,7 @@ class TestClient(object):
TEST_STATS = {'metrics': {'instances': 0, 'reqps': 0}, 'ts': 1476446024}
def test_new_job(self, job_nr, upload_token):
client = KSHMAPIClient(
client = APIClient(
base_url='https://lunapark.test.yandex-team.ru/')
with patch('requests.Session.send') as send_mock:
mock_response = MagicMock()
@ -156,7 +156,7 @@ class TestClient(object):
upload_token)
def test_new_job_retry_maintenance(self, job_nr, upload_token):
client = KSHMAPIClient(
client = APIClient(
base_url='https://lunapark.test.yandex-team.ru/',
maintenance_timeout=2)
with patch('requests.Session.send') as send_mock:
@ -177,7 +177,7 @@ class TestClient(object):
upload_token)
def test_new_job_retry_network(self, job_nr, upload_token):
client = KSHMAPIClient(
client = APIClient(
base_url='https://lunapark.test.yandex-team.ru/')
with patch('requests.Session.send') as send_mock:
expected_response = MagicMock()
@ -198,7 +198,7 @@ class TestClient(object):
upload_token)
def test_new_job_retry_api(self, job_nr, upload_token):
client = KSHMAPIClient(
client = APIClient(
base_url='https://lunapark.test.yandex-team.ru/')
with patch('requests.Session.send') as send_mock:
bad_response = MagicMock()
@ -218,7 +218,7 @@ class TestClient(object):
upload_token)
def test_new_job_unavailable(self, job_nr, upload_token):
client = KSHMAPIClient(
client = APIClient(
base_url='https://lunapark.test.yandex-team.ru/',
api_attempts=3,
api_timeout=1)
@ -234,7 +234,7 @@ class TestClient(object):
bad_response,
good_response]
with pytest.raises(KSHMAPIClient.JobNotCreated):
with pytest.raises(APIClient.JobNotCreated):
client.new_job(
'LOAD-204',
'fomars',
@ -243,7 +243,7 @@ class TestClient(object):
1234)
def test_push_data(self, job_nr, upload_token):
client = KSHMAPIClient(
client = APIClient(
base_url='https://lunapark.test.yandex-team.ru/')
with patch('requests.Session.send') as send_mock:
mock_response = MagicMock()
@ -254,7 +254,7 @@ class TestClient(object):
job_nr, self.TEST_DATA, self.TEST_STATS) == 1
def test_push_data_retry_network(self, job_nr, upload_token):
client = KSHMAPIClient(
client = APIClient(
base_url='https://lunapark.test.yandex-team.ru/')
with patch('requests.Session.send') as send_mock:
expected_response = MagicMock()
@ -270,7 +270,7 @@ class TestClient(object):
assert result == 1
def test_push_data_retry_api(self, job_nr, upload_token):
client = KSHMAPIClient(
client = APIClient(
base_url='https://lunapark.test.yandex-team.ru/')
with patch('requests.Session.send') as send_mock:
bad_response = MagicMock()
@ -285,7 +285,7 @@ class TestClient(object):
assert result == 1
def test_push_data_api_exception(self, job_nr, upload_token):
client = KSHMAPIClient(
client = APIClient(
base_url='https://lunapark.test.yandex-team.ru/',
api_timeout=1,
api_attempts=3)

View File

@ -1 +0,0 @@
from .plugin import Plugin # noqa:F401

View File

@ -1,319 +0,0 @@
import datetime
import json
import time
import urllib
import requests
import logging
requests.packages.urllib3.disable_warnings()
logger = logging.getLogger(__name__) # pylint: disable=C0103
class OverloadClient(object):
def __init__(self):
self.address = None
self.token = None
self.upload_token = ''
self.api_token = ""
self.api_timeout = None
self.session = requests.Session()
self.session.verify = False
self.session.headers.update({"User-Agent": "tank"})
if "https" in requests.utils.getproxies():
logger.info("Connecting via proxy %s" % requests.utils.getproxies()['https'])
self.session.proxies = requests.utils.getproxies()
else:
logger.info("Proxy not set")
def set_api_address(self, addr):
self.address = addr
def set_api_timeout(self, timeout):
self.api_timeout = float(timeout)
def set_api_token(self, api_token):
self.api_token = api_token
def get_raw(self, addr):
if not self.address:
raise ValueError("Can't request unknown address")
addr = self.address + addr
logger.debug("Making request to: %s", addr)
req = requests.Request('GET', addr)
prepared = self.session.prepare_request(req)
resp = self.session.send(prepared, timeout=self.api_timeout)
resp.raise_for_status()
resp_data = resp.content.strip()
logger.debug("Raw response: %s", resp_data)
return resp_data
def get(self, addr):
resp = self.get_raw(addr)
response = json.loads(resp)
logger.debug("Response: %s", response)
return response
def post_raw(self, addr, txt_data):
if not self.address:
raise ValueError("Can't request unknown address")
addr = self.address + addr
logger.debug("Making POST request to: %s", addr)
req = requests.Request("POST", addr, data=txt_data)
prepared = self.session.prepare_request(req)
resp = self.session.send(prepared, timeout=self.api_timeout)
resp.raise_for_status()
logger.debug("Response: %s", resp.content)
return resp.content
def post(self, addr, data):
addr = self.address + addr
json_data = json.dumps(data, indent=2)
logger.debug("Making POST request to: %s\n%s", addr, json_data)
req = requests.Request("POST", addr, data=json_data)
prepared = self.session.prepare_request(req)
resp = self.session.send(prepared, timeout=self.api_timeout)
resp.raise_for_status()
logger.debug("Response: %s", resp.content)
return resp.json()
def get_task_data(self, task):
return self.get("api/task/" + task + "/summary.json")
def new_job(
self, task, person, tank, target_host, target_port, loadscheme,
detailed_time, notify_list):
data = {
'task': task,
'person': person,
'tank': tank,
'host': target_host,
'port': target_port,
'loadscheme': loadscheme,
'detailed_time': detailed_time,
'notify': notify_list,
}
logger.debug("Job create request: %s", data)
while True:
try:
response = self.post(
"api/job/create.json?api_token=" + self.api_token, data)
self.upload_token = response[0].get('upload_token', '')
return response[0]['job']
except requests.exceptions.HTTPError as ex:
logger.debug("Got error for job create request: %s", ex)
if ex.response.status_code == 423:
logger.warn(
"Overload is under maintenance, will retry in 5s...")
time.sleep(5)
else:
raise ex
raise RuntimeError("Unreachable point hit")
def get_job_summary(self, jobno):
result = self.get(
'api/job/' + str(jobno) + "/summary.json?api_token=" +
self.api_token)
return result[0]
def close_job(self, jobno, retcode):
params = {
'exitcode': str(retcode),
'api_token': self.api_token,
}
result = self.get(
'api/job/' + str(jobno) + '/close.json?' + urllib.urlencode(params))
return result[0]['success']
def edit_job_metainfo(
self, jobno, job_name, job_dsc, instances, ammo_path, loop_count,
version_tested, is_regression, component, tank_type, cmdline,
is_starred):
data = {
'name': job_name,
'description': job_dsc,
'instances': str(instances),
'ammo': ammo_path,
'loop': loop_count,
'version': version_tested,
'regression': str(is_regression),
'component': component,
'tank_type': int(tank_type),
'command_line': cmdline,
'starred': int(is_starred),
}
response = self.post(
'api/job/' + str(jobno) + "/edit.json?api_token=" + self.api_token,
data)
return response
def set_imbalance_and_dsc(self, jobno, rps, comment):
data = {}
if rps:
data['imbalance'] = rps
if comment:
data['description'] = comment.strip()
response = self.post(
'api/job/' + str(jobno) + "/set_imbalance.json?api_token=" +
self.api_token, data)
return response
def second_data_to_push_item(self, data, stat, timestamp, overall, case):
"""
@data: SecondAggregateDataItem
"""
api_data = {
'overall': overall,
'case': case,
'net_codes': [],
'http_codes': [],
'time_intervals': [],
'trail': {
'time': str(timestamp),
'reqps': stat["metrics"]["reqps"],
'resps': data["interval_real"]["len"],
'expect': data["interval_real"]["total"] / 1000.0 /
data["interval_real"]["len"],
'disper': 0,
'self_load':
0, # TODO abs(round(100 - float(data.selfload), 2)),
'input': data["size_in"]["total"],
'output': data["size_out"]["total"],
'connect_time': data["connect_time"]["total"] / 1000.0 /
data["connect_time"]["len"],
'send_time':
data["send_time"]["total"] / 1000.0 / data["send_time"]["len"],
'latency':
data["latency"]["total"] / 1000.0 / data["latency"]["len"],
'receive_time': data["receive_time"]["total"] / 1000.0 /
data["receive_time"]["len"],
'threads': stat["metrics"]["instances"], # TODO
}
}
for q, value in zip(
data["interval_real"]["q"]["q"],
data["interval_real"]["q"]["value"]):
api_data['trail']['q' + str(q)] = value / 1000.0
for code, cnt in data["net_code"]["count"].iteritems():
api_data['net_codes'].append({'code': int(code), 'count': int(cnt)})
for code, cnt in data["proto_code"]["count"].iteritems():
api_data['http_codes'].append({
'code': int(code),
'count': int(cnt)
})
api_data['time_intervals'] = self.convert_hist(
data["interval_real"]["hist"])
return api_data
def convert_hist(self, hist):
data = hist['data']
bins = hist['bins']
return [
{
"from": 0, # deprecated
"to": b / 1000.0,
"count": count,
} for b, count in zip(bins, data)
]
def push_test_data(self, jobno, data_item, stat_item):
items = []
uri = 'api/job/{0}/push_data.json?upload_token={1}'.format(
jobno, self.upload_token)
ts = datetime.datetime.fromtimestamp(data_item["ts"])
for case_name, case_data in data_item["tagged"].iteritems():
if case_name == "":
case_name = "__EMPTY__"
if (len(case_name)) > 128:
raise RuntimeError('tag (case) name is too long: ' + case_name)
push_item = self.second_data_to_push_item(
case_data, stat_item, ts, 0, case_name)
items.append(push_item)
overall = self.second_data_to_push_item(
data_item["overall"], stat_item, ts, 1, '')
items.append(overall)
while True:
try:
res = self.post(uri, items)
break
except requests.exceptions.HTTPError as ex:
if ex.response.status_code == 400:
logger.error('Bad request to %s: %s', uri, ex)
return 0
elif ex.response.status_code == 410:
logger.info("Test has been stopped by Overload server")
return 0
else:
logger.warn(
"Unknown HTTP error while sending second data. "
"Retry in 10 sec: %s", ex)
time.sleep(10) # FIXME this makes all plugins freeze
except requests.exceptions.RequestException as ex:
logger.warn(
"Failed to push second data to API,"
" retry in 10 sec: %s", ex)
time.sleep(10) # FIXME this makes all plugins freeze
except Exception: # pylint: disable=W0703
# something nasty happened, but we don't want to fail here
logger.exception(
"Unknown exception while pushing second data to API")
return 0
try:
success = int(res[0]['success'])
except Exception: # pylint: disable=W0703
logger.warning("Malformed response from API: %s", res)
success = 0
return success
def push_monitoring_data(self, jobno, send_data):
if send_data:
addr = "api/monitoring/receiver/push?job_id=%s&upload_token=%s" % (
jobno, self.upload_token)
while True:
try:
self.post_raw(addr, send_data)
return
except requests.exceptions.HTTPError as ex:
if ex.response.status_code == 400:
logger.error('Bad request to %s: %s', addr, ex)
break
elif ex.response.status_code == 410:
logger.info("Test has been stopped by Overload server")
return
else:
logger.warning(
'Unknown http code while sending monitoring data,'
' retry in 10s: %s', ex)
time.sleep(10) # FIXME this makes all plugins freeze
except requests.exceptions.RequestException as ex:
logger.warning(
'Problems sending monitoring data,'
' retry in 10s: %s', ex)
time.sleep(10) # FIXME this makes all plugins freeze
except Exception: # pylint: disable=W0703
# something irrecoverable happened
logger.exception(
"Unknown exception while pushing monitoring data to API")
return
def send_console(self, jobno, console):
logger.debug(
"Sending console view [%s]: %s", len(console), console[:64])
addr = ("api/job/%s/console.txt?api_token=" % jobno) + self.api_token,
self.post_raw(addr, {"console": console, })
def send_config_snapshot(self, jobno, config):
logger.debug("Sending config snapshot")
addr = ("api/job/%s/configinfo.txt?api_token=" % jobno) + self.api_token
self.post_raw(addr, {"configinfo": config, })

View File

@ -1,374 +0,0 @@
# TODO: make the next two lines unnecessary
# pylint: disable=line-too-long
# pylint: disable=missing-docstring
import StringIO
import copy
import json
import logging
import os
import pwd
import socket
import sys
from .client import OverloadClient
from ..Autostop import Plugin as AutostopPlugin
from ..Console import Plugin as ConsolePlugin
from ..JMeter import Plugin as JMeterPlugin
from ..Monitoring import Plugin as MonitoringPlugin
# from ..Pandora import Plugin as PandoraPlugin
from ..Phantom import Plugin as PhantomPlugin
from ...common.interfaces import AbstractPlugin,\
MonitoringDataListener, AggregateResultListener, AbstractInfoWidget
logger = logging.getLogger(__name__) # pylint: disable=C0103
class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener):
"""
Yandex Overload analytics service client (https://overload.yandex.net)
"""
SECTION = 'overload'
RC_STOP_FROM_WEB = 8
def __init__(self, core, config_section):
super(Plugin, self).__init__(core, config_section)
self.locks_list_dict = {}
self.api_client = OverloadClient()
self.jobno = None
self.operator = ''
self.retcode = -1
self.copy_config = None
self.jobno_file = None
self.target = None
self.lock_target_duration = None
self.locks_list_cfg = None
self.task = None
self.job_name = None
self.job_dsc = None
self.notify_list = None
self.version_tested = None
self.regression_component = None
self.is_regression = None
self.ignore_target_lock = None
self.port = None
self.mon = None
@staticmethod
def get_key():
return __file__
def get_available_options(self):
opts = [
"api_address",
"task",
"job_name",
"job_dsc",
"notify",
"ver",
]
opts += [
"component",
"regress",
"operator",
"copy_config_to",
"jobno_file",
]
opts += ["token_file"]
return opts
@staticmethod
def read_token(filename):
if filename:
logger.debug("Trying to read token from %s", filename)
try:
with open(filename, 'r') as handle:
data = handle.read().strip()
logger.info(
"Read authentication token from %s, "
"token length is %d bytes", filename, len(str(data)))
except IOError:
logger.error(
"Failed to read Overload API token from %s", filename)
logger.info(
"Get your Overload API token from https://overload.yandex.net and provide it via 'overload.token_file' parameter"
)
raise RuntimeError("API token error")
return data
else:
logger.error("Overload API token filename is not defined")
logger.info(
"Get your Overload API token from https://overload.yandex.net and provide it via 'overload.token_file' parameter"
)
raise RuntimeError("API token error")
def configure(self):
aggregator = self.core.job.aggregator_plugin
aggregator.add_result_listener(self)
self.api_client.set_api_address(self.get_option("api_address"))
self.api_client.set_api_timeout(self.get_option("api_timeout", 30))
self.api_client.set_api_token(
self.read_token(self.get_option("token_file", "")))
self.task = self.get_option("task", "DEFAULT")
self.job_name = unicode(
self.get_option("job_name", "none").decode("utf8"))
if self.job_name == "ask" and sys.stdin.isatty():
self.job_name = unicode(
raw_input("Please, enter job_name: ").decode("utf8"))
self.job_dsc = unicode(self.get_option("job_dsc", "").decode("utf8"))
if self.job_dsc == "ask" and sys.stdin.isatty():
self.job_dsc = unicode(
raw_input("Please, enter job_dsc: ").decode("utf8"))
self.notify_list = self.get_option("notify", "").split(" ")
self.version_tested = unicode(self.get_option("ver", ""))
self.regression_component = unicode(self.get_option("component", ""))
self.is_regression = self.get_option("regress", "0")
self.operator = self.get_option("operator", self.operator)
if not self.operator:
try:
# Clouds and some virtual envs may fail this
self.operator = pwd.getpwuid(os.geteuid())[0]
except:
logger.warning('failed to getpwuid.', exc_into=True)
self.operator = 'unknown'
self.copy_config = self.get_option("copy_config_to", "")
self.jobno_file = self.get_option("jobno_file", "")
if self.core.job.monitoring_plugin:
self.mon = self.core.job.monitoring_plugin
if self.mon.monitoring:
self.mon.monitoring.add_listener(self)
self.__save_conf()
def prepare_test(self):
try:
console = self.core.get_plugin_of_type(ConsolePlugin)
except KeyError:
logger.debug("Console plugin not found", exc_info=True)
console = None
if console:
console.add_info_widget(JobInfoWidget(self))
console.remote_translator = self
try:
phantom = self.core.get_plugin_of_type(PhantomPlugin)
info = phantom.get_info()
self.target = info.address
except (KeyError, AttributeError) as ex:
logger.debug("No phantom plugin to get target info: %s", ex)
self.target = socket.getfqdn()
self.__save_conf()
def start_test(self):
try:
phantom = self.core.get_plugin_of_type(PhantomPlugin)
info = phantom.get_info()
self.target = info.address
port = info.port
instances = info.instances
tank_type = 1 if info.tank_type == "http" else 2
# FIXME why don't we use resource_opener here?
if info.ammo_file.startswith(
"http://") or info.ammo_file.startswith("https://"):
ammo_path = info.ammo_file
else:
ammo_path = os.path.realpath(info.ammo_file)
loadscheme = [] if isinstance(
info.rps_schedule, unicode) else info.rps_schedule
loop_count = info.loop_count
except (KeyError, AttributeError) as ex:
logger.debug("No phantom plugin to get target info: %s", ex)
self.target = socket.getfqdn()
port = 80
instances = 1
tank_type = 1
ammo_path = ''
loadscheme = []
loop_count = 0
try:
jmeter = self.core.get_plugin_of_type(JMeterPlugin)
ammo_path = jmeter.original_jmx
except KeyError as ex:
logger.debug("No jmeter plugin to get info: %s", ex)
# try:
# pandora = self.core.get_plugin_of_type(PandoraPlugin)
# # TODO: get info from Pandora here
# except KeyError as ex:
# logger.debug("No pandora plugin to get info: %s", ex)
detailed_field = "interval_real"
logger.info("Detected target: %s", self.target)
self.jobno = self.api_client.new_job(
self.task, self.operator,
socket.getfqdn(), self.target, port, loadscheme, detailed_field,
self.notify_list)
web_link = "%s%s" % (self.api_client.address, self.jobno)
logger.info("Web link: %s", web_link)
self.publish("jobno", self.jobno)
self.publish("web_link", web_link)
self.make_symlink(self.jobno)
self.set_option("jobno", str(self.jobno))
if self.jobno_file:
logger.debug("Saving jobno to: %s", self.jobno_file)
fdes = open(self.jobno_file, 'w')
fdes.write(str(self.jobno))
fdes.close()
self.api_client.edit_job_metainfo(
self.jobno, self.job_name, self.job_dsc, instances, ammo_path,
loop_count, self.version_tested, self.is_regression,
self.regression_component, tank_type, " ".join(sys.argv), 0)
self.__save_conf()
def is_test_finished(self):
return self.retcode
def end_test(self, retcode):
self.__save_conf()
return retcode
def post_process(self, rc):
if self.jobno:
try:
self.api_client.close_job(self.jobno, rc)
except Exception: # pylint: disable=W0703
logger.warning("Failed to close job", exc_info=True)
logger.info("Web link: %s%s", self.api_client.address, self.jobno)
autostop = None
try:
autostop = self.core.get_plugin_of_type(AutostopPlugin)
except KeyError:
logger.debug("No autostop plugin loaded", exc_info=True)
if autostop and autostop.cause_criterion:
rps = 0
if autostop.cause_criterion.cause_second:
rps = autostop.cause_criterion.cause_second[1]["metrics"][
"reqps"]
if not rps:
rps = autostop.cause_criterion.cause_second[0][
"overall"]["interval_real"]["len"]
self.api_client.set_imbalance_and_dsc(
self.jobno, rps, autostop.cause_criterion.explain())
else:
logger.debug("No autostop cause detected")
self.__save_conf()
return rc
def __send_data(self, data_item, stat_item):
if self.retcode < 0 and not self.api_client.push_test_data(
self.jobno, data_item, stat_item):
logger.warn("The test was stopped from Web interface")
self.retcode = self.RC_STOP_FROM_WEB
def on_aggregated_data(self, data, stats):
"""
@data: aggregated data
@stats: stats about gun
"""
if not self.jobno:
logger.warning("No jobNo gained yet")
return
self.__send_data(data, stats)
def monitoring_data(self, data_list):
if not self.jobno:
logger.debug("No jobNo gained yet")
return
if self.retcode < 0:
if "Telegraf" in self.core.job.monitoring_plugin.__module__:
self.api_client.push_monitoring_data(
self.jobno, json.dumps(data_list))
elif "Monitoring" in self.core.job.monitoring_plugin.__module__:
[
self.api_client.push_monitoring_data(self.jobno, data)
for data in data_list if data
]
else:
logger.warn("The test was stopped from Web interface")
def __save_conf(self):
if self.copy_config:
self.core.config.flush(self.copy_config)
config = copy.copy(self.core.config.config)
try:
mon = self.core.get_plugin_of_type(MonitoringPlugin)
config_filename = mon.config
if config_filename and config_filename not in ['none', 'auto']:
with open(config_filename) as config_file:
config.set(
MonitoringPlugin.SECTION, "config_contents",
config_file.read())
except Exception: # pylint: disable=W0703
logger.debug("Can't get monitoring config", exc_info=True)
output = StringIO.StringIO()
config.write(output)
if self.jobno:
try:
self.api_client.send_config_snapshot(
self.jobno, output.getvalue())
except Exception: # pylint: disable=W0703
logger.debug("Can't send config snapshot: %s", exc_info=True)
def send_console(self, text):
try:
self.api_client.send_console(self.jobno, text)
except Exception: # pylint: disable=W0703
logger.debug("Can't send console snapshot: %s", exc_info=True)
def make_symlink(self, name):
PLUGIN_DIR = os.path.join(self.core.artifacts_base_dir, self.SECTION)
if not os.path.exists(PLUGIN_DIR):
os.makedirs(PLUGIN_DIR)
os.symlink(self.core.artifacts_dir, os.path.join(PLUGIN_DIR, str(name)))
def _core_with_tank_api(self):
"""
Return True if we are running under Tank API
"""
api_found = False
try:
import yandex_tank_api.worker # pylint: disable=F0401
except ImportError:
logger.debug("Attempt to import yandex_tank_api.worker failed")
else:
api_found = isinstance(self.core, yandex_tank_api.worker.TankCore)
logger.debug(
"We are%s running under API server", ""
if api_found else " likely not")
return api_found
class JobInfoWidget(AbstractInfoWidget):
def __init__(self, sender):
AbstractInfoWidget.__init__(self)
self.owner = sender
def get_index(self):
return 1
def render(self, screen):
template = "Author: " + screen.markup.RED + "%s" + \
screen.markup.RESET + \
"%s\n Job: %s %s\n Web: %s%s"
data = (
self.owner.operator[:1], self.owner.operator[1:], self.owner.jobno,
self.owner.job_name, self.owner.api_client.address,
self.owner.jobno)
return template % data