From d0f47160662a7629f69179f315d59e8bce6f7653 Mon Sep 17 00:00:00 2001 From: Arseniy Fomchenko Date: Wed, 22 Mar 2017 19:30:18 +0300 Subject: [PATCH] no kshm --- yandextank/plugins/DataUploader/cli.py | 10 +- yandextank/plugins/DataUploader/client.py | 9 +- yandextank/plugins/DataUploader/plugin.py | 33 +- .../plugins/DataUploader/tests/test_foo.py | 28 +- yandextank/plugins/Overload/__init__.py | 1 - yandextank/plugins/Overload/client.py | 319 --------------- yandextank/plugins/Overload/plugin.py | 374 ------------------ 7 files changed, 39 insertions(+), 735 deletions(-) delete mode 100644 yandextank/plugins/Overload/__init__.py delete mode 100644 yandextank/plugins/Overload/client.py delete mode 100644 yandextank/plugins/Overload/plugin.py diff --git a/yandextank/plugins/DataUploader/cli.py b/yandextank/plugins/DataUploader/cli.py index ffb04c5..eaf4a1e 100644 --- a/yandextank/plugins/DataUploader/cli.py +++ b/yandextank/plugins/DataUploader/cli.py @@ -15,7 +15,7 @@ from urlparse import urljoin from datetime import datetime import pkg_resources -from .client import KSHMAPIClient +from .client import APIClient from .plugin import LPJob CONFIG_FILE = 'saved_conf.ini' @@ -130,11 +130,11 @@ def post_loader(): config = read_config(shooting_dir) lp_config = get_lp_config(config) - api_client = KSHMAPIClient(base_url=lp_config['api_address'], - user_agent='Lunapark/{}'.format( + api_client = APIClient(base_url=lp_config['api_address'], + user_agent='Lunapark/{}'.format( pkg_resources.require('yatank-internal-lunapark')[0].version) - # todo: add timeouts - ) + # todo: add timeouts + ) lp_job = LPJob( client=api_client, target_host=lp_config.get('target_host'), diff --git a/yandextank/plugins/DataUploader/client.py b/yandextank/plugins/DataUploader/client.py index 24ac0f4..6480777 100644 --- a/yandextank/plugins/DataUploader/client.py +++ b/yandextank/plugins/DataUploader/client.py @@ -15,7 +15,7 @@ requests.packages.urllib3.disable_warnings() logger = logging.getLogger(__name__) # pylint: disable=C0103 -class KSHMAPIClient(object): +class APIClient(object): def __init__( self, @@ -60,7 +60,7 @@ class KSHMAPIClient(object): self._base_url = url class UnderMaintenance(Exception): - message = "KSHM is under maintenance" + message = "API is under maintenance" class NotAvailable(Exception): desc = "API is not available" @@ -174,8 +174,7 @@ class KSHMAPIClient(object): try: timeout = next(maintenance_timeouts) logger.warn( - "KSHM is under maintenance, will retry in %ss..." % - timeout) + "%s is under maintenance, will retry in %ss..." % (self._base_url, timeout)) time.sleep(timeout) continue except StopIteration: @@ -586,7 +585,7 @@ class KSHMAPIClient(object): self.__post_raw(addr, {"configinfo": config}, trace=trace) -class OverloadClient(KSHMAPIClient): +class OverloadClient(APIClient): def send_status(self, jobno, upload_token, status, trace=False): return diff --git a/yandextank/plugins/DataUploader/plugin.py b/yandextank/plugins/DataUploader/plugin.py index a0cf69a..eb7cbbb 100644 --- a/yandextank/plugins/DataUploader/plugin.py +++ b/yandextank/plugins/DataUploader/plugin.py @@ -22,7 +22,7 @@ from ...common.interfaces import AbstractPlugin, \ from ...common.util import expand_to_seconds from ..Autostop import Plugin as AutostopPlugin from ..Console import Plugin as ConsolePlugin -from .client import KSHMAPIClient, OverloadClient +from .client import APIClient, OverloadClient logger = logging.getLogger(__name__) # pylint: disable=C0103 @@ -48,7 +48,6 @@ class BackendTypes(object): class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener): - """API Client class for Yandex KSHM web service""" SECTION = 'meta' RC_STOP_FROM_WEB = 8 VERSION = '3.0' @@ -211,7 +210,7 @@ class Plugin(AbstractPlugin, AggregateResultListener, self.check_task_is_open() self.lp_job.create() self.make_symlink(self.lp_job.number) - except (KSHMAPIClient.JobNotCreated, KSHMAPIClient.NotAvailable, KSHMAPIClient.NetworkError) as e: + except (APIClient.JobNotCreated, APIClient.NotAvailable, APIClient.NetworkError) as e: logger.error(e.message) logger.error( 'Failed to connect to Lunapark, disabling DataUploader') @@ -385,11 +384,11 @@ class Plugin(AbstractPlugin, AggregateResultListener, try: self.lp_job.send_status(self.core.status) time.sleep(self.send_status_period) - except (KSHMAPIClient.NetworkError, KSHMAPIClient.NotAvailable) as e: + except (APIClient.NetworkError, APIClient.NotAvailable) as e: logger.warn('Failed to send status') logger.debug(e.message) break - except KSHMAPIClient.StoppedFromOnline: + except APIClient.StoppedFromOnline: logger.info("Test stopped from Lunapark") lp_job.is_alive = False self.retcode = 8 @@ -411,7 +410,7 @@ class Plugin(AbstractPlugin, AggregateResultListener, lp_job.push_test_data(data, stats) except Empty: continue - except KSHMAPIClient.StoppedFromOnline: + except APIClient.StoppedFromOnline: logger.info("Test stopped from Lunapark") lp_job.is_alive = False self.retcode = 8 @@ -436,11 +435,11 @@ class Plugin(AbstractPlugin, AggregateResultListener, break except Empty: continue - except (KSHMAPIClient.NetworkError, KSHMAPIClient.NotAvailable, KSHMAPIClient.UnderMaintenance) as e: + except (APIClient.NetworkError, APIClient.NotAvailable, APIClient.UnderMaintenance) as e: logger.warn('Failed to push monitoring data') logger.warn(e.message) break - except KSHMAPIClient.StoppedFromOnline: + except APIClient.StoppedFromOnline: logger.info("Test stopped from Lunapark") lp_job.is_alive = False self.retcode = 8 @@ -537,7 +536,7 @@ class Plugin(AbstractPlugin, AggregateResultListener, def __get_api_client(self): if self.backend_type == BackendTypes.LUNAPARK: - client = KSHMAPIClient + client = APIClient self._api_token = None elif self.backend_type == BackendTypes.OVERLOAD: client = OverloadClient @@ -667,7 +666,7 @@ class LPJob(object): detailed_time=None, load_scheme=None): """ - :param client: KSHMAPIClient + :param client: APIClient :param log_data_requests: bool :param log_other_request: bool :param log_status_requests: bool @@ -701,7 +700,7 @@ class LPJob(object): try: self.api_client.push_test_data( self.number, self.token, data, stats, trace=self.log_data_requests) - except (KSHMAPIClient.NotAvailable, KSHMAPIClient.NetworkError, KSHMAPIClient.UnderMaintenance): + except (APIClient.NotAvailable, APIClient.NetworkError, APIClient.UnderMaintenance): logger.warn('Failed to push test data') self.is_alive = False @@ -729,8 +728,8 @@ class LPJob(object): is_starred=is_starred, tank_type=tank_type, trace=self.log_other_requests) - except (KSHMAPIClient.NotAvailable, KSHMAPIClient.StoppedFromOnline, KSHMAPIClient.NetworkError, - KSHMAPIClient.UnderMaintenance) as e: + except (APIClient.NotAvailable, APIClient.StoppedFromOnline, APIClient.NetworkError, + APIClient.UnderMaintenance) as e: logger.warn('Failed to edit job metainfo on Lunapark') logger.warn(e.message) @@ -799,7 +798,7 @@ class LPJob(object): lock_target_duration, trace=self.log_other_requests) return True - except (KSHMAPIClient.NotAvailable, KSHMAPIClient.StoppedFromOnline) as e: + except (APIClient.NotAvailable, APIClient.StoppedFromOnline) as e: logger.info('Target is not locked due to %s', e.message) if ignore: logger.info('ignore_target_locks = 1') @@ -809,7 +808,7 @@ class LPJob(object): else: logger.info('strict_lock = 0') return False - except KSHMAPIClient.UnderMaintenance: + except APIClient.UnderMaintenance: logger.info('Target is locked') logger.info("Manual unlock link: %s%s", self.api_client.base_url, self.api_client.get_manual_unlock_link(lock_target)) @@ -827,10 +826,10 @@ class LPJob(object): try: return self.api_client.is_target_locked( host, trace=self.log_other_requests) - except KSHMAPIClient.UnderMaintenance: + except APIClient.UnderMaintenance: logger.info('Target is locked, retrying...') continue - except (KSHMAPIClient.StoppedFromOnline, KSHMAPIClient.NotAvailable, KSHMAPIClient.NetworkError) as e: + except (APIClient.StoppedFromOnline, APIClient.NotAvailable, APIClient.NetworkError) as e: logger.warn('Can\'t check whether target is locked\n') if strict: logger.warn('Stopping test due to strict_lock') diff --git a/yandextank/plugins/DataUploader/tests/test_foo.py b/yandextank/plugins/DataUploader/tests/test_foo.py index 0244c70..2ade328 100644 --- a/yandextank/plugins/DataUploader/tests/test_foo.py +++ b/yandextank/plugins/DataUploader/tests/test_foo.py @@ -4,7 +4,7 @@ import time import pytest from mock import patch, call, MagicMock from requests import ConnectionError -from yandextank.plugins.DataUploader.client import KSHMAPIClient +from yandextank.plugins.DataUploader.client import APIClient from yandextank.plugins.DataUploader.plugin import online_uploader, LPJob @@ -21,7 +21,7 @@ class TestOnlineUploader(object): queue = Queue() job = LPJob(job_number, token) - with patch.object(KSHMAPIClient, 'push_data') as push_data_mock: + with patch.object(APIClient, 'push_data') as push_data_mock: thread = threading.Thread( target=online_uploader, @@ -49,8 +49,8 @@ class TestOnlineUploader(object): queue = Queue() job = LPJob() - with patch.object(KSHMAPIClient, 'new_job', return_value=(job_number, token)) as new_job_mock: - with patch.object(KSHMAPIClient, 'push_data') as push_data_mock: + with patch.object(APIClient, 'new_job', return_value=(job_number, token)) as new_job_mock: + with patch.object(APIClient, 'push_data') as push_data_mock: thread = threading.Thread( target=online_uploader, @@ -139,7 +139,7 @@ class TestClient(object): TEST_STATS = {'metrics': {'instances': 0, 'reqps': 0}, 'ts': 1476446024} def test_new_job(self, job_nr, upload_token): - client = KSHMAPIClient( + client = APIClient( base_url='https://lunapark.test.yandex-team.ru/') with patch('requests.Session.send') as send_mock: mock_response = MagicMock() @@ -156,7 +156,7 @@ class TestClient(object): upload_token) def test_new_job_retry_maintenance(self, job_nr, upload_token): - client = KSHMAPIClient( + client = APIClient( base_url='https://lunapark.test.yandex-team.ru/', maintenance_timeout=2) with patch('requests.Session.send') as send_mock: @@ -177,7 +177,7 @@ class TestClient(object): upload_token) def test_new_job_retry_network(self, job_nr, upload_token): - client = KSHMAPIClient( + client = APIClient( base_url='https://lunapark.test.yandex-team.ru/') with patch('requests.Session.send') as send_mock: expected_response = MagicMock() @@ -198,7 +198,7 @@ class TestClient(object): upload_token) def test_new_job_retry_api(self, job_nr, upload_token): - client = KSHMAPIClient( + client = APIClient( base_url='https://lunapark.test.yandex-team.ru/') with patch('requests.Session.send') as send_mock: bad_response = MagicMock() @@ -218,7 +218,7 @@ class TestClient(object): upload_token) def test_new_job_unavailable(self, job_nr, upload_token): - client = KSHMAPIClient( + client = APIClient( base_url='https://lunapark.test.yandex-team.ru/', api_attempts=3, api_timeout=1) @@ -234,7 +234,7 @@ class TestClient(object): bad_response, good_response] - with pytest.raises(KSHMAPIClient.JobNotCreated): + with pytest.raises(APIClient.JobNotCreated): client.new_job( 'LOAD-204', 'fomars', @@ -243,7 +243,7 @@ class TestClient(object): 1234) def test_push_data(self, job_nr, upload_token): - client = KSHMAPIClient( + client = APIClient( base_url='https://lunapark.test.yandex-team.ru/') with patch('requests.Session.send') as send_mock: mock_response = MagicMock() @@ -254,7 +254,7 @@ class TestClient(object): job_nr, self.TEST_DATA, self.TEST_STATS) == 1 def test_push_data_retry_network(self, job_nr, upload_token): - client = KSHMAPIClient( + client = APIClient( base_url='https://lunapark.test.yandex-team.ru/') with patch('requests.Session.send') as send_mock: expected_response = MagicMock() @@ -270,7 +270,7 @@ class TestClient(object): assert result == 1 def test_push_data_retry_api(self, job_nr, upload_token): - client = KSHMAPIClient( + client = APIClient( base_url='https://lunapark.test.yandex-team.ru/') with patch('requests.Session.send') as send_mock: bad_response = MagicMock() @@ -285,7 +285,7 @@ class TestClient(object): assert result == 1 def test_push_data_api_exception(self, job_nr, upload_token): - client = KSHMAPIClient( + client = APIClient( base_url='https://lunapark.test.yandex-team.ru/', api_timeout=1, api_attempts=3) diff --git a/yandextank/plugins/Overload/__init__.py b/yandextank/plugins/Overload/__init__.py deleted file mode 100644 index eb4ac99..0000000 --- a/yandextank/plugins/Overload/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .plugin import Plugin # noqa:F401 diff --git a/yandextank/plugins/Overload/client.py b/yandextank/plugins/Overload/client.py deleted file mode 100644 index 172844e..0000000 --- a/yandextank/plugins/Overload/client.py +++ /dev/null @@ -1,319 +0,0 @@ -import datetime -import json -import time -import urllib -import requests -import logging - -requests.packages.urllib3.disable_warnings() -logger = logging.getLogger(__name__) # pylint: disable=C0103 - - -class OverloadClient(object): - def __init__(self): - self.address = None - self.token = None - self.upload_token = '' - self.api_token = "" - self.api_timeout = None - self.session = requests.Session() - self.session.verify = False - self.session.headers.update({"User-Agent": "tank"}) - if "https" in requests.utils.getproxies(): - logger.info("Connecting via proxy %s" % requests.utils.getproxies()['https']) - self.session.proxies = requests.utils.getproxies() - else: - logger.info("Proxy not set") - - def set_api_address(self, addr): - self.address = addr - - def set_api_timeout(self, timeout): - self.api_timeout = float(timeout) - - def set_api_token(self, api_token): - self.api_token = api_token - - def get_raw(self, addr): - if not self.address: - raise ValueError("Can't request unknown address") - - addr = self.address + addr - logger.debug("Making request to: %s", addr) - req = requests.Request('GET', addr) - prepared = self.session.prepare_request(req) - resp = self.session.send(prepared, timeout=self.api_timeout) - resp.raise_for_status() - resp_data = resp.content.strip() - logger.debug("Raw response: %s", resp_data) - return resp_data - - def get(self, addr): - resp = self.get_raw(addr) - response = json.loads(resp) - logger.debug("Response: %s", response) - return response - - def post_raw(self, addr, txt_data): - if not self.address: - raise ValueError("Can't request unknown address") - - addr = self.address + addr - logger.debug("Making POST request to: %s", addr) - req = requests.Request("POST", addr, data=txt_data) - prepared = self.session.prepare_request(req) - resp = self.session.send(prepared, timeout=self.api_timeout) - resp.raise_for_status() - logger.debug("Response: %s", resp.content) - return resp.content - - def post(self, addr, data): - addr = self.address + addr - json_data = json.dumps(data, indent=2) - logger.debug("Making POST request to: %s\n%s", addr, json_data) - req = requests.Request("POST", addr, data=json_data) - prepared = self.session.prepare_request(req) - resp = self.session.send(prepared, timeout=self.api_timeout) - resp.raise_for_status() - logger.debug("Response: %s", resp.content) - return resp.json() - - def get_task_data(self, task): - return self.get("api/task/" + task + "/summary.json") - - def new_job( - self, task, person, tank, target_host, target_port, loadscheme, - detailed_time, notify_list): - data = { - 'task': task, - 'person': person, - 'tank': tank, - 'host': target_host, - 'port': target_port, - 'loadscheme': loadscheme, - 'detailed_time': detailed_time, - 'notify': notify_list, - } - - logger.debug("Job create request: %s", data) - while True: - try: - response = self.post( - "api/job/create.json?api_token=" + self.api_token, data) - self.upload_token = response[0].get('upload_token', '') - return response[0]['job'] - except requests.exceptions.HTTPError as ex: - logger.debug("Got error for job create request: %s", ex) - if ex.response.status_code == 423: - logger.warn( - "Overload is under maintenance, will retry in 5s...") - time.sleep(5) - else: - raise ex - - raise RuntimeError("Unreachable point hit") - - def get_job_summary(self, jobno): - result = self.get( - 'api/job/' + str(jobno) + "/summary.json?api_token=" + - self.api_token) - return result[0] - - def close_job(self, jobno, retcode): - params = { - 'exitcode': str(retcode), - 'api_token': self.api_token, - } - - result = self.get( - 'api/job/' + str(jobno) + '/close.json?' + urllib.urlencode(params)) - return result[0]['success'] - - def edit_job_metainfo( - self, jobno, job_name, job_dsc, instances, ammo_path, loop_count, - version_tested, is_regression, component, tank_type, cmdline, - is_starred): - data = { - 'name': job_name, - 'description': job_dsc, - 'instances': str(instances), - 'ammo': ammo_path, - 'loop': loop_count, - 'version': version_tested, - 'regression': str(is_regression), - 'component': component, - 'tank_type': int(tank_type), - 'command_line': cmdline, - 'starred': int(is_starred), - } - - response = self.post( - 'api/job/' + str(jobno) + "/edit.json?api_token=" + self.api_token, - data) - return response - - def set_imbalance_and_dsc(self, jobno, rps, comment): - data = {} - if rps: - data['imbalance'] = rps - if comment: - data['description'] = comment.strip() - - response = self.post( - 'api/job/' + str(jobno) + "/set_imbalance.json?api_token=" + - self.api_token, data) - return response - - def second_data_to_push_item(self, data, stat, timestamp, overall, case): - """ - @data: SecondAggregateDataItem - """ - api_data = { - 'overall': overall, - 'case': case, - 'net_codes': [], - 'http_codes': [], - 'time_intervals': [], - 'trail': { - 'time': str(timestamp), - 'reqps': stat["metrics"]["reqps"], - 'resps': data["interval_real"]["len"], - 'expect': data["interval_real"]["total"] / 1000.0 / - data["interval_real"]["len"], - 'disper': 0, - 'self_load': - 0, # TODO abs(round(100 - float(data.selfload), 2)), - 'input': data["size_in"]["total"], - 'output': data["size_out"]["total"], - 'connect_time': data["connect_time"]["total"] / 1000.0 / - data["connect_time"]["len"], - 'send_time': - data["send_time"]["total"] / 1000.0 / data["send_time"]["len"], - 'latency': - data["latency"]["total"] / 1000.0 / data["latency"]["len"], - 'receive_time': data["receive_time"]["total"] / 1000.0 / - data["receive_time"]["len"], - 'threads': stat["metrics"]["instances"], # TODO - } - } - - for q, value in zip( - data["interval_real"]["q"]["q"], - data["interval_real"]["q"]["value"]): - api_data['trail']['q' + str(q)] = value / 1000.0 - - for code, cnt in data["net_code"]["count"].iteritems(): - api_data['net_codes'].append({'code': int(code), 'count': int(cnt)}) - - for code, cnt in data["proto_code"]["count"].iteritems(): - api_data['http_codes'].append({ - 'code': int(code), - 'count': int(cnt) - }) - - api_data['time_intervals'] = self.convert_hist( - data["interval_real"]["hist"]) - return api_data - - def convert_hist(self, hist): - data = hist['data'] - bins = hist['bins'] - return [ - { - "from": 0, # deprecated - "to": b / 1000.0, - "count": count, - } for b, count in zip(bins, data) - ] - - def push_test_data(self, jobno, data_item, stat_item): - items = [] - uri = 'api/job/{0}/push_data.json?upload_token={1}'.format( - jobno, self.upload_token) - ts = datetime.datetime.fromtimestamp(data_item["ts"]) - for case_name, case_data in data_item["tagged"].iteritems(): - if case_name == "": - case_name = "__EMPTY__" - if (len(case_name)) > 128: - raise RuntimeError('tag (case) name is too long: ' + case_name) - push_item = self.second_data_to_push_item( - case_data, stat_item, ts, 0, case_name) - items.append(push_item) - overall = self.second_data_to_push_item( - data_item["overall"], stat_item, ts, 1, '') - items.append(overall) - - while True: - try: - res = self.post(uri, items) - break - except requests.exceptions.HTTPError as ex: - if ex.response.status_code == 400: - logger.error('Bad request to %s: %s', uri, ex) - return 0 - elif ex.response.status_code == 410: - logger.info("Test has been stopped by Overload server") - return 0 - else: - logger.warn( - "Unknown HTTP error while sending second data. " - "Retry in 10 sec: %s", ex) - time.sleep(10) # FIXME this makes all plugins freeze - except requests.exceptions.RequestException as ex: - logger.warn( - "Failed to push second data to API," - " retry in 10 sec: %s", ex) - time.sleep(10) # FIXME this makes all plugins freeze - except Exception: # pylint: disable=W0703 - # something nasty happened, but we don't want to fail here - logger.exception( - "Unknown exception while pushing second data to API") - return 0 - try: - success = int(res[0]['success']) - except Exception: # pylint: disable=W0703 - logger.warning("Malformed response from API: %s", res) - success = 0 - return success - - def push_monitoring_data(self, jobno, send_data): - if send_data: - addr = "api/monitoring/receiver/push?job_id=%s&upload_token=%s" % ( - jobno, self.upload_token) - while True: - try: - self.post_raw(addr, send_data) - return - except requests.exceptions.HTTPError as ex: - if ex.response.status_code == 400: - logger.error('Bad request to %s: %s', addr, ex) - break - elif ex.response.status_code == 410: - logger.info("Test has been stopped by Overload server") - return - else: - logger.warning( - 'Unknown http code while sending monitoring data,' - ' retry in 10s: %s', ex) - time.sleep(10) # FIXME this makes all plugins freeze - except requests.exceptions.RequestException as ex: - logger.warning( - 'Problems sending monitoring data,' - ' retry in 10s: %s', ex) - time.sleep(10) # FIXME this makes all plugins freeze - except Exception: # pylint: disable=W0703 - # something irrecoverable happened - logger.exception( - "Unknown exception while pushing monitoring data to API") - return - - def send_console(self, jobno, console): - logger.debug( - "Sending console view [%s]: %s", len(console), console[:64]) - addr = ("api/job/%s/console.txt?api_token=" % jobno) + self.api_token, - self.post_raw(addr, {"console": console, }) - - def send_config_snapshot(self, jobno, config): - logger.debug("Sending config snapshot") - addr = ("api/job/%s/configinfo.txt?api_token=" % jobno) + self.api_token - self.post_raw(addr, {"configinfo": config, }) diff --git a/yandextank/plugins/Overload/plugin.py b/yandextank/plugins/Overload/plugin.py deleted file mode 100644 index e8e9d17..0000000 --- a/yandextank/plugins/Overload/plugin.py +++ /dev/null @@ -1,374 +0,0 @@ -# TODO: make the next two lines unnecessary -# pylint: disable=line-too-long -# pylint: disable=missing-docstring -import StringIO -import copy -import json -import logging -import os -import pwd -import socket -import sys - -from .client import OverloadClient -from ..Autostop import Plugin as AutostopPlugin -from ..Console import Plugin as ConsolePlugin -from ..JMeter import Plugin as JMeterPlugin -from ..Monitoring import Plugin as MonitoringPlugin -# from ..Pandora import Plugin as PandoraPlugin -from ..Phantom import Plugin as PhantomPlugin -from ...common.interfaces import AbstractPlugin,\ - MonitoringDataListener, AggregateResultListener, AbstractInfoWidget - -logger = logging.getLogger(__name__) # pylint: disable=C0103 - - -class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener): - """ - Yandex Overload analytics service client (https://overload.yandex.net) - """ - SECTION = 'overload' - RC_STOP_FROM_WEB = 8 - - def __init__(self, core, config_section): - super(Plugin, self).__init__(core, config_section) - self.locks_list_dict = {} - self.api_client = OverloadClient() - self.jobno = None - self.operator = '' - self.retcode = -1 - self.copy_config = None - self.jobno_file = None - self.target = None - self.lock_target_duration = None - self.locks_list_cfg = None - self.task = None - self.job_name = None - self.job_dsc = None - self.notify_list = None - self.version_tested = None - self.regression_component = None - self.is_regression = None - self.ignore_target_lock = None - self.port = None - self.mon = None - - @staticmethod - def get_key(): - return __file__ - - def get_available_options(self): - opts = [ - "api_address", - "task", - "job_name", - "job_dsc", - "notify", - "ver", - ] - opts += [ - "component", - "regress", - "operator", - "copy_config_to", - "jobno_file", - ] - opts += ["token_file"] - return opts - - @staticmethod - def read_token(filename): - if filename: - logger.debug("Trying to read token from %s", filename) - try: - with open(filename, 'r') as handle: - data = handle.read().strip() - logger.info( - "Read authentication token from %s, " - "token length is %d bytes", filename, len(str(data))) - except IOError: - logger.error( - "Failed to read Overload API token from %s", filename) - logger.info( - "Get your Overload API token from https://overload.yandex.net and provide it via 'overload.token_file' parameter" - ) - raise RuntimeError("API token error") - return data - else: - logger.error("Overload API token filename is not defined") - logger.info( - "Get your Overload API token from https://overload.yandex.net and provide it via 'overload.token_file' parameter" - ) - raise RuntimeError("API token error") - - def configure(self): - aggregator = self.core.job.aggregator_plugin - aggregator.add_result_listener(self) - - self.api_client.set_api_address(self.get_option("api_address")) - self.api_client.set_api_timeout(self.get_option("api_timeout", 30)) - self.api_client.set_api_token( - self.read_token(self.get_option("token_file", ""))) - self.task = self.get_option("task", "DEFAULT") - self.job_name = unicode( - self.get_option("job_name", "none").decode("utf8")) - if self.job_name == "ask" and sys.stdin.isatty(): - self.job_name = unicode( - raw_input("Please, enter job_name: ").decode("utf8")) - self.job_dsc = unicode(self.get_option("job_dsc", "").decode("utf8")) - if self.job_dsc == "ask" and sys.stdin.isatty(): - self.job_dsc = unicode( - raw_input("Please, enter job_dsc: ").decode("utf8")) - self.notify_list = self.get_option("notify", "").split(" ") - self.version_tested = unicode(self.get_option("ver", "")) - self.regression_component = unicode(self.get_option("component", "")) - self.is_regression = self.get_option("regress", "0") - self.operator = self.get_option("operator", self.operator) - if not self.operator: - try: - # Clouds and some virtual envs may fail this - self.operator = pwd.getpwuid(os.geteuid())[0] - except: - logger.warning('failed to getpwuid.', exc_into=True) - self.operator = 'unknown' - self.copy_config = self.get_option("copy_config_to", "") - self.jobno_file = self.get_option("jobno_file", "") - - if self.core.job.monitoring_plugin: - self.mon = self.core.job.monitoring_plugin - if self.mon.monitoring: - self.mon.monitoring.add_listener(self) - - self.__save_conf() - - def prepare_test(self): - try: - console = self.core.get_plugin_of_type(ConsolePlugin) - except KeyError: - logger.debug("Console plugin not found", exc_info=True) - console = None - - if console: - console.add_info_widget(JobInfoWidget(self)) - console.remote_translator = self - - try: - phantom = self.core.get_plugin_of_type(PhantomPlugin) - info = phantom.get_info() - self.target = info.address - except (KeyError, AttributeError) as ex: - logger.debug("No phantom plugin to get target info: %s", ex) - self.target = socket.getfqdn() - - self.__save_conf() - - def start_test(self): - try: - phantom = self.core.get_plugin_of_type(PhantomPlugin) - info = phantom.get_info() - self.target = info.address - port = info.port - instances = info.instances - tank_type = 1 if info.tank_type == "http" else 2 - # FIXME why don't we use resource_opener here? - if info.ammo_file.startswith( - "http://") or info.ammo_file.startswith("https://"): - ammo_path = info.ammo_file - else: - ammo_path = os.path.realpath(info.ammo_file) - loadscheme = [] if isinstance( - info.rps_schedule, unicode) else info.rps_schedule - loop_count = info.loop_count - except (KeyError, AttributeError) as ex: - logger.debug("No phantom plugin to get target info: %s", ex) - self.target = socket.getfqdn() - port = 80 - instances = 1 - tank_type = 1 - ammo_path = '' - loadscheme = [] - loop_count = 0 - - try: - jmeter = self.core.get_plugin_of_type(JMeterPlugin) - ammo_path = jmeter.original_jmx - except KeyError as ex: - logger.debug("No jmeter plugin to get info: %s", ex) - - # try: - # pandora = self.core.get_plugin_of_type(PandoraPlugin) - # # TODO: get info from Pandora here - # except KeyError as ex: - # logger.debug("No pandora plugin to get info: %s", ex) - - detailed_field = "interval_real" - - logger.info("Detected target: %s", self.target) - - self.jobno = self.api_client.new_job( - self.task, self.operator, - socket.getfqdn(), self.target, port, loadscheme, detailed_field, - self.notify_list) - web_link = "%s%s" % (self.api_client.address, self.jobno) - logger.info("Web link: %s", web_link) - self.publish("jobno", self.jobno) - self.publish("web_link", web_link) - self.make_symlink(self.jobno) - self.set_option("jobno", str(self.jobno)) - if self.jobno_file: - logger.debug("Saving jobno to: %s", self.jobno_file) - fdes = open(self.jobno_file, 'w') - fdes.write(str(self.jobno)) - fdes.close() - - self.api_client.edit_job_metainfo( - self.jobno, self.job_name, self.job_dsc, instances, ammo_path, - loop_count, self.version_tested, self.is_regression, - self.regression_component, tank_type, " ".join(sys.argv), 0) - - self.__save_conf() - - def is_test_finished(self): - return self.retcode - - def end_test(self, retcode): - self.__save_conf() - return retcode - - def post_process(self, rc): - if self.jobno: - try: - self.api_client.close_job(self.jobno, rc) - except Exception: # pylint: disable=W0703 - logger.warning("Failed to close job", exc_info=True) - - logger.info("Web link: %s%s", self.api_client.address, self.jobno) - - autostop = None - try: - autostop = self.core.get_plugin_of_type(AutostopPlugin) - except KeyError: - logger.debug("No autostop plugin loaded", exc_info=True) - - if autostop and autostop.cause_criterion: - rps = 0 - if autostop.cause_criterion.cause_second: - rps = autostop.cause_criterion.cause_second[1]["metrics"][ - "reqps"] - if not rps: - rps = autostop.cause_criterion.cause_second[0][ - "overall"]["interval_real"]["len"] - self.api_client.set_imbalance_and_dsc( - self.jobno, rps, autostop.cause_criterion.explain()) - - else: - logger.debug("No autostop cause detected") - self.__save_conf() - return rc - - def __send_data(self, data_item, stat_item): - if self.retcode < 0 and not self.api_client.push_test_data( - self.jobno, data_item, stat_item): - logger.warn("The test was stopped from Web interface") - self.retcode = self.RC_STOP_FROM_WEB - - def on_aggregated_data(self, data, stats): - """ - @data: aggregated data - @stats: stats about gun - """ - if not self.jobno: - logger.warning("No jobNo gained yet") - return - self.__send_data(data, stats) - - def monitoring_data(self, data_list): - if not self.jobno: - logger.debug("No jobNo gained yet") - return - - if self.retcode < 0: - if "Telegraf" in self.core.job.monitoring_plugin.__module__: - self.api_client.push_monitoring_data( - self.jobno, json.dumps(data_list)) - elif "Monitoring" in self.core.job.monitoring_plugin.__module__: - [ - self.api_client.push_monitoring_data(self.jobno, data) - for data in data_list if data - ] - else: - logger.warn("The test was stopped from Web interface") - - def __save_conf(self): - if self.copy_config: - self.core.config.flush(self.copy_config) - - config = copy.copy(self.core.config.config) - - try: - mon = self.core.get_plugin_of_type(MonitoringPlugin) - config_filename = mon.config - if config_filename and config_filename not in ['none', 'auto']: - with open(config_filename) as config_file: - config.set( - MonitoringPlugin.SECTION, "config_contents", - config_file.read()) - except Exception: # pylint: disable=W0703 - logger.debug("Can't get monitoring config", exc_info=True) - - output = StringIO.StringIO() - config.write(output) - if self.jobno: - try: - self.api_client.send_config_snapshot( - self.jobno, output.getvalue()) - except Exception: # pylint: disable=W0703 - logger.debug("Can't send config snapshot: %s", exc_info=True) - - def send_console(self, text): - try: - self.api_client.send_console(self.jobno, text) - except Exception: # pylint: disable=W0703 - logger.debug("Can't send console snapshot: %s", exc_info=True) - - def make_symlink(self, name): - PLUGIN_DIR = os.path.join(self.core.artifacts_base_dir, self.SECTION) - if not os.path.exists(PLUGIN_DIR): - os.makedirs(PLUGIN_DIR) - os.symlink(self.core.artifacts_dir, os.path.join(PLUGIN_DIR, str(name))) - - def _core_with_tank_api(self): - """ - Return True if we are running under Tank API - """ - api_found = False - try: - import yandex_tank_api.worker # pylint: disable=F0401 - except ImportError: - logger.debug("Attempt to import yandex_tank_api.worker failed") - else: - api_found = isinstance(self.core, yandex_tank_api.worker.TankCore) - logger.debug( - "We are%s running under API server", "" - if api_found else " likely not") - return api_found - - -class JobInfoWidget(AbstractInfoWidget): - def __init__(self, sender): - AbstractInfoWidget.__init__(self) - self.owner = sender - - def get_index(self): - return 1 - - def render(self, screen): - template = "Author: " + screen.markup.RED + "%s" + \ - screen.markup.RESET + \ - "%s\n Job: %s %s\n Web: %s%s" - data = ( - self.owner.operator[:1], self.owner.operator[1:], self.owner.jobno, - self.owner.job_name, self.owner.api_client.address, - self.owner.jobno) - - return template % data