merged master

This commit is contained in:
Arseniy Fomchenko 2016-10-25 13:45:00 +03:00
commit b2ece72045
11 changed files with 109 additions and 68 deletions

View File

@ -9,5 +9,5 @@ class PluginNotPrepared(Exception):
"""
Can't find plugin's info in core.job
"""
def __init__(self, msg):
def __init__(self, msg=None):
self.message = "%s\n%s" % (self.__doc__, msg)

View File

@ -1,4 +1,5 @@
import logging
import os
class AbstractPlugin(object):
@ -172,3 +173,27 @@ class AbstractCriterion(object):
def get_type_string():
""" returns string that used as config name for criterion """
raise NotImplementedError("Abstract methods requires overriding")
class GeneratorPlugin(object):
DEFAULT_INFO = {'target': 'undefined',
'port': 80,
'instances': 1,
'ammo_file': '',
'rps_schedule': [],
'duration': 0,
'loop_count': 0}
class Info(object):
def __init__(self, target, port, instances, ammo_file, rps_schedule, duration, loop_count):
self.target = target
self.port = port
self.instances = instances
self.ammo_file = ammo_file
self.rps_schedule = rps_schedule
self.duration = duration
self.loop_count = loop_count
def get_info(self):
# type: () -> Info
return self.Info(**self.DEFAULT_INFO)

View File

@ -16,6 +16,9 @@ from builtins import str
from configparser import NoSectionError
from yandextank.common.exceptions import PluginNotPrepared
from configparser import NoSectionError
from yandextank.common.exceptions import PluginNotPrepared
from yandextank.common.interfaces import GeneratorPlugin
from ..common.util import update_status, execute, pid_exists
@ -26,8 +29,9 @@ from ..plugins.Telegraf import Plugin as TelegrafPlugin
class Job(object):
def __init__(self, name, description, task, version, config_copy, monitoring_plugin, aggregator_plugin, tank):
# type: (str, str, str, str, str, MonitoringPlugin, AggregatorPlugin, str) -> Job
def __init__(self, name, description, task, version, config_copy, monitoring_plugin, aggregator_plugin, tank,
generator_plugin=None):
# type: (str, str, str, str, str, MonitoringPlugin, AggregatorPlugin, GeneratorPlugin) -> Job
self.name = name
self.description = description
self.task = task
@ -37,10 +41,11 @@ class Job(object):
self.aggregator_plugin = aggregator_plugin
self.tank = tank
self._phantom_info = None
self.generator_plugin = generator_plugin
def subscribe_plugin(self, plugin):
self.aggregator_plugin.add_result_listener(plugin)
self.monitoring_plugin.add_listener(plugin)
self.monitoring_plugin.monitoring.add_listener(plugin)
@property
def phantom_info(self):
@ -69,7 +74,8 @@ class TankCore(object):
self.config = ConfigManager()
self.status = {}
self.plugins = []
self.artifacts_dir = None
self.artifacts_dir_name = None
self._artifacts_dir = None
self.artifact_files = {}
self.artifacts_base_dir = '.'
self.manual_start = False
@ -118,7 +124,7 @@ class TankCore(object):
base_dir = self.get_option(self.SECTION, "artifacts_base_dir",
self.artifacts_base_dir)
self.artifacts_base_dir = os.path.expanduser(base_dir)
self.artifacts_dir = self.get_option(self.SECTION, "artifacts_dir", "")
self.artifacts_dir_name = self.get_option(self.SECTION, "artifacts_dir", "")
self.taskset_path = self.get_option(self.SECTION, 'taskset_path',
'taskset')
self.taskset_affinity = self.get_option(self.SECTION, 'affinity', '')
@ -138,7 +144,7 @@ class TankCore(object):
plugin_path)
if plugin_path.startswith("Tank/Plugins/"):
plugin_path = "yandextank.plugins." + \
plugin_path.split('/')[-1].split('.')[0]
plugin_path.split('/')[-1].split('.')[0]
self.log.warning("Converted plugin path to %s",
plugin_path)
else:
@ -172,13 +178,13 @@ class TankCore(object):
try:
mon = self.get_plugin_of_type(TelegrafPlugin)
except KeyError:
self.log.debug("Telegraf plugin not found:", exc_info=True)
self.log.debug("Telegraf plugin not found:", exc_info=True)
try:
mon = self.get_plugin_of_type(MonitoringPlugin)
except KeyError:
self.log.debug("Monitoring plugin not found:", exc_info=True)
mon = None
# aggregator plugin
try:
aggregator = self.get_plugin_of_type(AggregatorPlugin)
@ -186,6 +192,13 @@ class TankCore(object):
self.log.warning("Aggregator plugin not found:", exc_info=True)
aggregator = None
# generator plugin
try:
gen = self.get_plugin_of_type(GeneratorPlugin)
except KeyError:
self.log.warning("Load generator not found:", exc_info=True)
gen = None
self.job = Job(name=str(self.get_option(self.SECTION_META, "job_name", 'none').decode('utf8')),
description=str(self.get_option(self.SECTION_META, "job_dsc", '').decode('utf8')),
task=str(self.get_option(self.SECTION_META, 'task', 'dir')),
@ -193,6 +206,7 @@ class TankCore(object):
config_copy=self.get_option(self.SECTION_META, 'copy_config_to', 'config_copy'),
monitoring_plugin=mon,
aggregator_plugin=aggregator,
generator_plugin=gen,
tank=socket.getfqdn())
for plugin in self.plugins:
@ -316,18 +330,6 @@ class TankCore(object):
def __collect_artifacts(self):
self.log.debug("Collecting artifacts")
if not self.artifacts_dir:
date_str = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S.")
self.artifacts_dir = tempfile.mkdtemp("", date_str,
self.artifacts_base_dir)
else:
self.artifacts_dir = os.path.expanduser(self.artifacts_dir)
if not os.path.isdir(self.artifacts_dir):
os.makedirs(self.artifacts_dir)
os.chmod(self.artifacts_dir, 0o755)
self.log.info("Artifacts dir: %s", self.artifacts_dir)
for filename, keep in self.artifact_files.items():
try:
@ -398,10 +400,6 @@ class TankCore(object):
"""
Move or copy single file to artifacts dir
"""
if not self.artifacts_dir:
self.log.warning("No artifacts dir configured")
return
dest = self.artifacts_dir + '/' + os.path.basename(filename)
self.log.debug("Collecting file: %s to %s", filename, dest)
if not filename or not os.path.exists(filename):
@ -434,7 +432,7 @@ class TankCore(object):
try:
section = option_str[:option_str.index('.')]
option = option_str[
option_str.index('.') + 1:option_str.index('=')]
option_str.index('.') + 1:option_str.index('=')]
except ValueError:
section = default_section
option = option_str[:option_str.index('=')]
@ -525,6 +523,20 @@ class TankCore(object):
self.log.debug("Failed closing plugin: %s",
traceback.format_exc(ex))
@property
def artifacts_dir(self):
if not self.artifacts_dir_name:
date_str = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S.")
self.artifacts_dir_name = tempfile.mkdtemp("", date_str,
self.artifacts_base_dir)
if not self._artifacts_dir:
self._artifacts_dir = os.path.expanduser(self.artifacts_dir_name)
if not os.path.isdir(self.artifacts_dir):
os.makedirs(self.artifacts_dir)
os.chmod(self._artifacts_dir, 0o755)
self._artifacts_dir = os.path.abspath(self._artifacts_dir)
return self._artifacts_dir
class ConfigManager(object):
""" Option storage class """

View File

@ -2,7 +2,7 @@ import logging
import time
import pip
from ...common.interfaces import AbstractPlugin
from ...common.interfaces import AbstractPlugin, GeneratorPlugin
from .guns import LogGun, SqlGun, CustomGun, HttpGun, ScenarioGun, UltimateGun
from .reader import BfgReader, BfgStatsReader
@ -13,7 +13,7 @@ from ..Console import Plugin as ConsolePlugin
from ...stepper import StepperWrapper
class Plugin(AbstractPlugin):
class Plugin(AbstractPlugin, GeneratorPlugin):
''' Big Fucking Gun plugin '''
SECTION = 'bfg'

View File

@ -8,7 +8,7 @@ import time
from pkg_resources import resource_string
from ...common.util import splitstring
from ...common.interfaces import AbstractPlugin, AggregateResultListener, AbstractInfoWidget
from ...common.interfaces import AbstractPlugin, AggregateResultListener, AbstractInfoWidget, GeneratorPlugin
from .reader import JMeterReader
from ..Aggregator import Plugin as AggregatorPlugin
@ -18,7 +18,7 @@ from ..Console import screen as ConsoleScreen
logger = logging.getLogger(__name__)
class Plugin(AbstractPlugin):
class Plugin(AbstractPlugin, GeneratorPlugin):
""" JMeter tank plugin """
SECTION = 'jmeter'

View File

@ -21,24 +21,25 @@ class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener):
return []
def configure(self):
try:
aggregator = self.core.get_plugin_of_type(AggregatorPlugin)
except KeyError:
logger.debug("Aggregator plugin not found", exc_info=True)
else:
aggregator.add_result_listener(self)
try:
self.mon = self.core.get_plugin_of_type(TelegrafPlugin)
except KeyError:
logger.debug("Telegraf plugin not found:", exc_info=True)
try:
self.mon = self.core.get_plugin_of_type(MonitoringPlugin)
except KeyError:
logger.debug("Monitoring plugin not found:", exc_info=True)
if self.mon and self.mon.monitoring:
self.mon.monitoring.add_listener(self)
# try:
# aggregator = self.core.get_plugin_of_type(AggregatorPlugin)
# except KeyError:
# logger.debug("Aggregator plugin not found", exc_info=True)
# else:
# aggregator.add_result_listener(self)
#
# try:
# self.mon = self.core.get_plugin_of_type(TelegrafPlugin)
# except KeyError:
# logger.debug("Telegraf plugin not found:", exc_info=True)
# try:
# self.mon = self.core.get_plugin_of_type(MonitoringPlugin)
# except KeyError:
# logger.debug("Monitoring plugin not found:", exc_info=True)
#
# if self.mon and self.mon.monitoring:
# self.mon.monitoring.add_listener(self)
self.core.job.subscribe_plugin(self)
def on_aggregated_data(self, data, stats):
"""

View File

@ -10,8 +10,6 @@ import pwd
import socket
import sys
from ...common.interfaces import AbstractPlugin, MonitoringDataListener, AggregateResultListener, AbstractInfoWidget
from .client import OverloadClient
from ..Aggregator import Plugin as AggregatorPlugin
from ..Autostop import Plugin as AutostopPlugin
@ -21,6 +19,7 @@ from ..Monitoring import Plugin as MonitoringPlugin
from ..Pandora import Plugin as PandoraPlugin
from ..Phantom import Plugin as PhantomPlugin
from ..Telegraf import Plugin as TelegrafPlugin
from ...common.interfaces import AbstractPlugin, MonitoringDataListener, AggregateResultListener, AbstractInfoWidget
logger = logging.getLogger(__name__) # pylint: disable=C0103
@ -229,9 +228,9 @@ class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener):
logger.info("Web link: %s", web_link)
self.publish("jobno", self.jobno)
self.publish("web_link", web_link)
if not self._core_with_tank_api():
self.core.artifacts_dir = self.core.artifacts_base_dir + \
'/' + str(self.jobno)
self.make_symlink(self.jobno)
self.set_option("jobno", self.jobno)
if self.jobno_file:
logger.debug("Saving jobno to: %s", self.jobno_file)
@ -351,6 +350,12 @@ class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener):
except Exception: # pylint: disable=W0703
logger.debug("Can't send console snapshot: %s", exc_info=True)
def make_symlink(self, name):
PLUGIN_DIR = os.path.join(self.core.artifacts_base_dir, self.SECTION)
if not os.path.exists(PLUGIN_DIR):
os.makedirs(PLUGIN_DIR)
os.symlink(self.core.artifacts_dir, os.path.join(PLUGIN_DIR, str(name)))
class JobInfoWidget(AbstractInfoWidget):
def __init__(self, sender):

View File

@ -3,7 +3,7 @@ import logging
import subprocess
import time
from ...common.interfaces import AbstractPlugin, AbstractInfoWidget
from ...common.interfaces import AbstractPlugin, AbstractInfoWidget, GeneratorPlugin
from .config import PoolConfig, PandoraConfig, parse_schedule
from .reader import PandoraStatsReader
@ -15,7 +15,7 @@ from ..Phantom import PhantomReader
logger = logging.getLogger(__name__)
class Plugin(AbstractPlugin):
class Plugin(AbstractPlugin, GeneratorPlugin):
''' Pandora load generator plugin '''
OPTION_CONFIG = "config"
@ -157,9 +157,6 @@ class Plugin(AbstractPlugin):
logger.debug("Seems subprocess finished OK")
return retcode
def get_info(self):
return None
class PandoraInfoWidget(AbstractInfoWidget):
''' Right panel widget '''

View File

@ -8,7 +8,7 @@ import subprocess
import time
from ...common.util import execute, expand_to_seconds
from ...common.interfaces import AbstractPlugin, AbstractCriterion
from ...common.interfaces import AbstractPlugin, AbstractCriterion, GeneratorPlugin
from .reader import PhantomReader, PhantomStatsReader
from .utils import PhantomConfig
@ -20,7 +20,7 @@ from ..Console import Plugin as ConsolePlugin
logger = logging.getLogger(__name__)
class Plugin(AbstractPlugin):
class Plugin(AbstractPlugin, GeneratorPlugin):
""" Plugin for running phantom tool """
OPTION_CONFIG = "config"

View File

@ -40,7 +40,7 @@ class Const(object):
return int(self.duration / 1000 * self.rps)
def get_rps_list(self):
return [(int(self.rps), self.duration / 1000.)]
return [(int(self.rps), self.duration / 1000)]
def __repr__(self):
return 'const(%s, %s)' % (self.rps, self.duration / 1000)

View File

@ -11,10 +11,10 @@ class TestLine(object):
assert rps_list[-1][0] == 100
@pytest.mark.parametrize("rps, duration", [
(100, 300),
(0, 300),
(100, 0)
@pytest.mark.parametrize("rps, duration, rps_list", [
(100, 3000, [(100, 3)]),
(0, 3000, [(0, 3)]),
(100, 0, [(100, 0)])
])
class TestConst(object):
@pytest.mark.parametrize("check_point, expected", [
@ -23,11 +23,12 @@ class TestConst(object):
(lambda duration: duration + 1, lambda rps: 0),
(lambda duration: -1, lambda rps: 0)
])
def test_rps_at(self, rps, duration, check_point, expected):
def test_rps_at(self, rps, duration, rps_list, check_point, expected):
assert Const(rps, duration).rps_at(check_point(duration)) == expected(rps)
def test_get_rps_list(self, rps, duration):
assert Const(rps, duration).get_rps_list() == [(rps, duration / 1000.)]
def test_get_rps_list(self, rps, duration, rps_list):
assert Const(rps, duration).get_rps_list() == rps_list
assert isinstance(rps_list[0][1], int)
class TestLineNew(object):