diff --git a/yandextank/common/exceptions.py b/yandextank/common/exceptions.py index 5eaee83..68b4a23 100644 --- a/yandextank/common/exceptions.py +++ b/yandextank/common/exceptions.py @@ -9,5 +9,5 @@ class PluginNotPrepared(Exception): """ Can't find plugin's info in core.job """ - def __init__(self, msg): + def __init__(self, msg=None): self.message = "%s\n%s" % (self.__doc__, msg) \ No newline at end of file diff --git a/yandextank/common/interfaces.py b/yandextank/common/interfaces.py index 6f97fad..4c988a6 100644 --- a/yandextank/common/interfaces.py +++ b/yandextank/common/interfaces.py @@ -1,4 +1,5 @@ import logging +import os class AbstractPlugin(object): @@ -172,3 +173,27 @@ class AbstractCriterion(object): def get_type_string(): """ returns string that used as config name for criterion """ raise NotImplementedError("Abstract methods requires overriding") + + +class GeneratorPlugin(object): + DEFAULT_INFO = {'target': 'undefined', + 'port': 80, + 'instances': 1, + 'ammo_file': '', + 'rps_schedule': [], + 'duration': 0, + 'loop_count': 0} + + class Info(object): + def __init__(self, target, port, instances, ammo_file, rps_schedule, duration, loop_count): + self.target = target + self.port = port + self.instances = instances + self.ammo_file = ammo_file + self.rps_schedule = rps_schedule + self.duration = duration + self.loop_count = loop_count + + def get_info(self): + # type: () -> Info + return self.Info(**self.DEFAULT_INFO) diff --git a/yandextank/core/tankcore.py b/yandextank/core/tankcore.py index 7fc3bab..9e585bc 100644 --- a/yandextank/core/tankcore.py +++ b/yandextank/core/tankcore.py @@ -16,6 +16,9 @@ from builtins import str from configparser import NoSectionError from yandextank.common.exceptions import PluginNotPrepared +from configparser import NoSectionError +from yandextank.common.exceptions import PluginNotPrepared +from yandextank.common.interfaces import GeneratorPlugin from ..common.util import update_status, execute, pid_exists @@ -26,8 +29,9 @@ from ..plugins.Telegraf import Plugin as TelegrafPlugin class Job(object): - def __init__(self, name, description, task, version, config_copy, monitoring_plugin, aggregator_plugin, tank): - # type: (str, str, str, str, str, MonitoringPlugin, AggregatorPlugin, str) -> Job + def __init__(self, name, description, task, version, config_copy, monitoring_plugin, aggregator_plugin, tank, + generator_plugin=None): + # type: (str, str, str, str, str, MonitoringPlugin, AggregatorPlugin, GeneratorPlugin) -> Job self.name = name self.description = description self.task = task @@ -37,10 +41,11 @@ class Job(object): self.aggregator_plugin = aggregator_plugin self.tank = tank self._phantom_info = None + self.generator_plugin = generator_plugin def subscribe_plugin(self, plugin): self.aggregator_plugin.add_result_listener(plugin) - self.monitoring_plugin.add_listener(plugin) + self.monitoring_plugin.monitoring.add_listener(plugin) @property def phantom_info(self): @@ -69,7 +74,8 @@ class TankCore(object): self.config = ConfigManager() self.status = {} self.plugins = [] - self.artifacts_dir = None + self.artifacts_dir_name = None + self._artifacts_dir = None self.artifact_files = {} self.artifacts_base_dir = '.' self.manual_start = False @@ -118,7 +124,7 @@ class TankCore(object): base_dir = self.get_option(self.SECTION, "artifacts_base_dir", self.artifacts_base_dir) self.artifacts_base_dir = os.path.expanduser(base_dir) - self.artifacts_dir = self.get_option(self.SECTION, "artifacts_dir", "") + self.artifacts_dir_name = self.get_option(self.SECTION, "artifacts_dir", "") self.taskset_path = self.get_option(self.SECTION, 'taskset_path', 'taskset') self.taskset_affinity = self.get_option(self.SECTION, 'affinity', '') @@ -138,7 +144,7 @@ class TankCore(object): plugin_path) if plugin_path.startswith("Tank/Plugins/"): plugin_path = "yandextank.plugins." + \ - plugin_path.split('/')[-1].split('.')[0] + plugin_path.split('/')[-1].split('.')[0] self.log.warning("Converted plugin path to %s", plugin_path) else: @@ -172,13 +178,13 @@ class TankCore(object): try: mon = self.get_plugin_of_type(TelegrafPlugin) except KeyError: - self.log.debug("Telegraf plugin not found:", exc_info=True) + self.log.debug("Telegraf plugin not found:", exc_info=True) try: mon = self.get_plugin_of_type(MonitoringPlugin) except KeyError: self.log.debug("Monitoring plugin not found:", exc_info=True) mon = None - + # aggregator plugin try: aggregator = self.get_plugin_of_type(AggregatorPlugin) @@ -186,6 +192,13 @@ class TankCore(object): self.log.warning("Aggregator plugin not found:", exc_info=True) aggregator = None + # generator plugin + try: + gen = self.get_plugin_of_type(GeneratorPlugin) + except KeyError: + self.log.warning("Load generator not found:", exc_info=True) + gen = None + self.job = Job(name=str(self.get_option(self.SECTION_META, "job_name", 'none').decode('utf8')), description=str(self.get_option(self.SECTION_META, "job_dsc", '').decode('utf8')), task=str(self.get_option(self.SECTION_META, 'task', 'dir')), @@ -193,6 +206,7 @@ class TankCore(object): config_copy=self.get_option(self.SECTION_META, 'copy_config_to', 'config_copy'), monitoring_plugin=mon, aggregator_plugin=aggregator, + generator_plugin=gen, tank=socket.getfqdn()) for plugin in self.plugins: @@ -316,18 +330,6 @@ class TankCore(object): def __collect_artifacts(self): self.log.debug("Collecting artifacts") - if not self.artifacts_dir: - date_str = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S.") - self.artifacts_dir = tempfile.mkdtemp("", date_str, - self.artifacts_base_dir) - else: - self.artifacts_dir = os.path.expanduser(self.artifacts_dir) - - if not os.path.isdir(self.artifacts_dir): - os.makedirs(self.artifacts_dir) - - os.chmod(self.artifacts_dir, 0o755) - self.log.info("Artifacts dir: %s", self.artifacts_dir) for filename, keep in self.artifact_files.items(): try: @@ -398,10 +400,6 @@ class TankCore(object): """ Move or copy single file to artifacts dir """ - if not self.artifacts_dir: - self.log.warning("No artifacts dir configured") - return - dest = self.artifacts_dir + '/' + os.path.basename(filename) self.log.debug("Collecting file: %s to %s", filename, dest) if not filename or not os.path.exists(filename): @@ -434,7 +432,7 @@ class TankCore(object): try: section = option_str[:option_str.index('.')] option = option_str[ - option_str.index('.') + 1:option_str.index('=')] + option_str.index('.') + 1:option_str.index('=')] except ValueError: section = default_section option = option_str[:option_str.index('=')] @@ -525,6 +523,20 @@ class TankCore(object): self.log.debug("Failed closing plugin: %s", traceback.format_exc(ex)) + @property + def artifacts_dir(self): + if not self.artifacts_dir_name: + date_str = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S.") + self.artifacts_dir_name = tempfile.mkdtemp("", date_str, + self.artifacts_base_dir) + if not self._artifacts_dir: + self._artifacts_dir = os.path.expanduser(self.artifacts_dir_name) + if not os.path.isdir(self.artifacts_dir): + os.makedirs(self.artifacts_dir) + os.chmod(self._artifacts_dir, 0o755) + self._artifacts_dir = os.path.abspath(self._artifacts_dir) + return self._artifacts_dir + class ConfigManager(object): """ Option storage class """ diff --git a/yandextank/plugins/Bfg/plugin.py b/yandextank/plugins/Bfg/plugin.py index 43f6079..103117b 100644 --- a/yandextank/plugins/Bfg/plugin.py +++ b/yandextank/plugins/Bfg/plugin.py @@ -2,7 +2,7 @@ import logging import time import pip -from ...common.interfaces import AbstractPlugin +from ...common.interfaces import AbstractPlugin, GeneratorPlugin from .guns import LogGun, SqlGun, CustomGun, HttpGun, ScenarioGun, UltimateGun from .reader import BfgReader, BfgStatsReader @@ -13,7 +13,7 @@ from ..Console import Plugin as ConsolePlugin from ...stepper import StepperWrapper -class Plugin(AbstractPlugin): +class Plugin(AbstractPlugin, GeneratorPlugin): ''' Big Fucking Gun plugin ''' SECTION = 'bfg' diff --git a/yandextank/plugins/JMeter/plugin.py b/yandextank/plugins/JMeter/plugin.py index b7e1314..880de11 100644 --- a/yandextank/plugins/JMeter/plugin.py +++ b/yandextank/plugins/JMeter/plugin.py @@ -8,7 +8,7 @@ import time from pkg_resources import resource_string from ...common.util import splitstring -from ...common.interfaces import AbstractPlugin, AggregateResultListener, AbstractInfoWidget +from ...common.interfaces import AbstractPlugin, AggregateResultListener, AbstractInfoWidget, GeneratorPlugin from .reader import JMeterReader from ..Aggregator import Plugin as AggregatorPlugin @@ -18,7 +18,7 @@ from ..Console import screen as ConsoleScreen logger = logging.getLogger(__name__) -class Plugin(AbstractPlugin): +class Plugin(AbstractPlugin, GeneratorPlugin): """ JMeter tank plugin """ SECTION = 'jmeter' diff --git a/yandextank/plugins/JsonReport/plugin.py b/yandextank/plugins/JsonReport/plugin.py index 34b9be3..d46e866 100644 --- a/yandextank/plugins/JsonReport/plugin.py +++ b/yandextank/plugins/JsonReport/plugin.py @@ -21,24 +21,25 @@ class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener): return [] def configure(self): - try: - aggregator = self.core.get_plugin_of_type(AggregatorPlugin) - except KeyError: - logger.debug("Aggregator plugin not found", exc_info=True) - else: - aggregator.add_result_listener(self) - - try: - self.mon = self.core.get_plugin_of_type(TelegrafPlugin) - except KeyError: - logger.debug("Telegraf plugin not found:", exc_info=True) - try: - self.mon = self.core.get_plugin_of_type(MonitoringPlugin) - except KeyError: - logger.debug("Monitoring plugin not found:", exc_info=True) - - if self.mon and self.mon.monitoring: - self.mon.monitoring.add_listener(self) + # try: + # aggregator = self.core.get_plugin_of_type(AggregatorPlugin) + # except KeyError: + # logger.debug("Aggregator plugin not found", exc_info=True) + # else: + # aggregator.add_result_listener(self) + # + # try: + # self.mon = self.core.get_plugin_of_type(TelegrafPlugin) + # except KeyError: + # logger.debug("Telegraf plugin not found:", exc_info=True) + # try: + # self.mon = self.core.get_plugin_of_type(MonitoringPlugin) + # except KeyError: + # logger.debug("Monitoring plugin not found:", exc_info=True) + # + # if self.mon and self.mon.monitoring: + # self.mon.monitoring.add_listener(self) + self.core.job.subscribe_plugin(self) def on_aggregated_data(self, data, stats): """ diff --git a/yandextank/plugins/Overload/plugin.py b/yandextank/plugins/Overload/plugin.py index 71e1ec7..169a752 100644 --- a/yandextank/plugins/Overload/plugin.py +++ b/yandextank/plugins/Overload/plugin.py @@ -10,8 +10,6 @@ import pwd import socket import sys -from ...common.interfaces import AbstractPlugin, MonitoringDataListener, AggregateResultListener, AbstractInfoWidget - from .client import OverloadClient from ..Aggregator import Plugin as AggregatorPlugin from ..Autostop import Plugin as AutostopPlugin @@ -21,6 +19,7 @@ from ..Monitoring import Plugin as MonitoringPlugin from ..Pandora import Plugin as PandoraPlugin from ..Phantom import Plugin as PhantomPlugin from ..Telegraf import Plugin as TelegrafPlugin +from ...common.interfaces import AbstractPlugin, MonitoringDataListener, AggregateResultListener, AbstractInfoWidget logger = logging.getLogger(__name__) # pylint: disable=C0103 @@ -229,9 +228,9 @@ class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener): logger.info("Web link: %s", web_link) self.publish("jobno", self.jobno) self.publish("web_link", web_link) - if not self._core_with_tank_api(): - self.core.artifacts_dir = self.core.artifacts_base_dir + \ - '/' + str(self.jobno) + + self.make_symlink(self.jobno) + self.set_option("jobno", self.jobno) if self.jobno_file: logger.debug("Saving jobno to: %s", self.jobno_file) @@ -351,6 +350,12 @@ class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener): except Exception: # pylint: disable=W0703 logger.debug("Can't send console snapshot: %s", exc_info=True) + def make_symlink(self, name): + PLUGIN_DIR = os.path.join(self.core.artifacts_base_dir, self.SECTION) + if not os.path.exists(PLUGIN_DIR): + os.makedirs(PLUGIN_DIR) + os.symlink(self.core.artifacts_dir, os.path.join(PLUGIN_DIR, str(name))) + class JobInfoWidget(AbstractInfoWidget): def __init__(self, sender): diff --git a/yandextank/plugins/Pandora/plugin.py b/yandextank/plugins/Pandora/plugin.py index 7424372..19d08ce 100644 --- a/yandextank/plugins/Pandora/plugin.py +++ b/yandextank/plugins/Pandora/plugin.py @@ -3,7 +3,7 @@ import logging import subprocess import time -from ...common.interfaces import AbstractPlugin, AbstractInfoWidget +from ...common.interfaces import AbstractPlugin, AbstractInfoWidget, GeneratorPlugin from .config import PoolConfig, PandoraConfig, parse_schedule from .reader import PandoraStatsReader @@ -15,7 +15,7 @@ from ..Phantom import PhantomReader logger = logging.getLogger(__name__) -class Plugin(AbstractPlugin): +class Plugin(AbstractPlugin, GeneratorPlugin): ''' Pandora load generator plugin ''' OPTION_CONFIG = "config" @@ -157,9 +157,6 @@ class Plugin(AbstractPlugin): logger.debug("Seems subprocess finished OK") return retcode - def get_info(self): - return None - class PandoraInfoWidget(AbstractInfoWidget): ''' Right panel widget ''' diff --git a/yandextank/plugins/Phantom/plugin.py b/yandextank/plugins/Phantom/plugin.py index 92dc773..79a4a8d 100644 --- a/yandextank/plugins/Phantom/plugin.py +++ b/yandextank/plugins/Phantom/plugin.py @@ -8,7 +8,7 @@ import subprocess import time from ...common.util import execute, expand_to_seconds -from ...common.interfaces import AbstractPlugin, AbstractCriterion +from ...common.interfaces import AbstractPlugin, AbstractCriterion, GeneratorPlugin from .reader import PhantomReader, PhantomStatsReader from .utils import PhantomConfig @@ -20,7 +20,7 @@ from ..Console import Plugin as ConsolePlugin logger = logging.getLogger(__name__) -class Plugin(AbstractPlugin): +class Plugin(AbstractPlugin, GeneratorPlugin): """ Plugin for running phantom tool """ OPTION_CONFIG = "config" diff --git a/yandextank/stepper/load_plan.py b/yandextank/stepper/load_plan.py index 751856f..c3e23f3 100644 --- a/yandextank/stepper/load_plan.py +++ b/yandextank/stepper/load_plan.py @@ -40,7 +40,7 @@ class Const(object): return int(self.duration / 1000 * self.rps) def get_rps_list(self): - return [(int(self.rps), self.duration / 1000.)] + return [(int(self.rps), self.duration / 1000)] def __repr__(self): return 'const(%s, %s)' % (self.rps, self.duration / 1000) diff --git a/yandextank/stepper/tests/test_load_plan.py b/yandextank/stepper/tests/test_load_plan.py index bf82d78..712794f 100644 --- a/yandextank/stepper/tests/test_load_plan.py +++ b/yandextank/stepper/tests/test_load_plan.py @@ -11,10 +11,10 @@ class TestLine(object): assert rps_list[-1][0] == 100 -@pytest.mark.parametrize("rps, duration", [ - (100, 300), - (0, 300), - (100, 0) +@pytest.mark.parametrize("rps, duration, rps_list", [ + (100, 3000, [(100, 3)]), + (0, 3000, [(0, 3)]), + (100, 0, [(100, 0)]) ]) class TestConst(object): @pytest.mark.parametrize("check_point, expected", [ @@ -23,11 +23,12 @@ class TestConst(object): (lambda duration: duration + 1, lambda rps: 0), (lambda duration: -1, lambda rps: 0) ]) - def test_rps_at(self, rps, duration, check_point, expected): + def test_rps_at(self, rps, duration, rps_list, check_point, expected): assert Const(rps, duration).rps_at(check_point(duration)) == expected(rps) - def test_get_rps_list(self, rps, duration): - assert Const(rps, duration).get_rps_list() == [(rps, duration / 1000.)] + def test_get_rps_list(self, rps, duration, rps_list): + assert Const(rps, duration).get_rps_list() == rps_list + assert isinstance(rps_list[0][1], int) class TestLineNew(object):