Merge pull request #355 from yandex/release

v1.8.34
This commit is contained in:
Alexey Lavrenuke 2017-04-10 17:53:58 +03:00 committed by GitHub
commit d0d5abd81d
41 changed files with 2280 additions and 911 deletions

View File

@ -1,4 +1,4 @@
# Yandex Tank [![Gitter](https://badges.gitter.im/Join Chat.svg)](https://gitter.im/yandex/yandex-tank?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
# Yandex Tank [![Gitter](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/yandex/yandex-tank?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![Build Status](https://secure.travis-ci.org/yandex/yandex-tank.png?branch=master)](http://travis-ci.org/yandex/yandex-tank)
@ -22,7 +22,7 @@ Yandex.Tank is an extensible open source load testing tool for advanced linux us
Installation at [ReadTheDocs](http://yandextank.readthedocs.org/en/latest/install.html).
## Get help
Chat with authors and other performance specialists: [![Gitter](https://badges.gitter.im/Join Chat.svg)](https://gitter.im/yandex/yandex-tank?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
Chat with authors and other performance specialists: [![Gitter](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/yandex/yandex-tank?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
Documentation at [ReadTheDocs](https://yandextank.readthedocs.org/en/latest/).

View File

@ -105,6 +105,41 @@ Example:
except Exception, ex:
logger.error('Error trying to perform a test: %s', ex)
Exit codes
==========
.. code-block:: json
{
"0": "completed",
"1": "interrupted_generic_interrupt",
"2": "interrupted",
"3": "interrupted_active_task_not_found ",
"4": "interrupted_no_ammo_file",
"5": "interrupted_address_not_specified",
"6": "interrupted_cpu_or_disk_overload",
"7": "interrupted_unknown_config_parameter",
"8": "interrupted_stop_via_web",
"9": "interrupted",
"11": "interrupted_job_number_error",
"12": "interrupted_phantom_error",
"13": "interrupted_job_metainfo_error",
"14": "interrupted_target_monitoring_error",
"15": "interrupted_target_info_error",
"21": "autostop_time",
"22": "autostop_http",
"23": "autostop_net",
"24": "autostop_instances",
"25": "autostop_total_time",
"26": "autostop_total_http",
"27": "autostop_total_net",
"28": "autostop_negative_http",
"29": "autostop_negative_net",
"30": "autostop_http_trend",
"31": "autostop_metric_higher",
"32": "autostop_metric_lower"
}
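These exit codes let a wrapper script report why a shooting ended. A minimal sketch in Python (the dictionary is a subset of the table above; the command line and config name are illustrative):
.. code-block:: python
import subprocess

# Subset of the exit code table above; extend with the remaining codes as needed.
EXIT_CODES = {
    0: "completed",
    1: "interrupted_generic_interrupt",
    2: "interrupted",
    21: "autostop_time",
    22: "autostop_http",
}

rc = subprocess.call(["yandex-tank", "-c", "load.ini"])  # illustrative invocation
print("Tank finished: {} ({})".format(EXIT_CODES.get(rc, "unknown"), rc))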
***************
Load Generators
***************
@ -579,6 +614,24 @@ How it works
.. image:: ./pic/tank-bfg.png
BFG Worker Type
---------------
By default, BFG creates many worker processes (their number is defined by the ``instances`` option).
Each process executes requests in a single thread, and together these processes consume a lot of memory.
It is also possible to switch this behavior and use ``gevent`` to power every worker process,
allowing it to run multiple concurrent green threads that execute HTTP requests.
With the green worker it is recommended to set ``instances`` to the number of CPU cores
and to adjust the number of real threads with the ``green_threads_per_instance`` option (see the example below).
INI file section: **[bfg]**
:worker_type:
Set it to ``green`` to let every process have multiple concurrent green threads.
:green_threads_per_instance:
Number of green threads every worker process will execute. Only affects ``green`` worker type.
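For example, a minimal ``[bfg]`` section enabling the green worker might look like this (option names are taken from the list above, the values are illustrative):
::
[bfg]
worker_type = green
; one worker process per CPU core
instances = 8
green_threads_per_instance = 1000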
BFG Options
-----------
@ -673,40 +726,40 @@ Disable phantom first (unless you really want to keep it active alongside at you
; Pandora config section:
[pandora]
; ammo file name
ammo=ammo.jsonline
; Pandora executable path
pandora_cmd=/usr/bin/pandora
; loop limit
loop=1000
; Enable/disable expvar monitoring
expvar = 1 ; default
; each user will maintain this schedule
user_schedule = periodic(1, 1, 100)
; Pandora config contents (json)
config_content = {
"pools": [
{
"name": "dummy pool",
"gun": {"type": "log"},
"ammo": {
"type": "dummy/log",
"AmmoLimit": 10000000
},
"result": {
"type": "log/phout",
"destination": "./phout.log"
},
"shared-limits": false,
"user-limiter": {
"type": "unlimited"
},
"startup-limiter": {
"type": "periodic",
"batch": 1,
"max": 5,
"period": "0.5s"
}
}]}
; users are started using this schedule
startup_schedule = periodic(1, 1, 100)
; if shared_schedule is false, then each user is independent,
; in other case they all hold to a common schedule
shared_schedule = 0
; target host and port
target=localhost:3000
Ammo format
-----------
Pandora currently supports only one ammo format: ``jsonline``, i.e. one json doc per line.
Example:
::
{"uri": "/00", "method": "GET", "headers": {"Host": "example.org", "User-Agent": "Pandora/0.0.1"}, "host": "example.org"}
{"uri": "/01", "method": "GET", "headers": {"Host": "example.org", "User-Agent": "Pandora/0.0.1"}, "host": "example.org"}
{"tag": "mytag", "uri": "/02", "method": "GET", "headers": {"Host": "example.org", "User-Agent": "Pandora/0.0.1"}, "host": "example.org"}
{"uri": "/03", "method": "GET", "headers": {"Host": "example.org", "User-Agent": "Pandora/0.0.1"}, "host": "example.org"}
Each JSON document describes an HTTP request. A document may also carry a ``tag`` field, which is used the same way as tags in other ammo formats.
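Such a file can be generated with a few lines of Python (a sketch; the request fields are the ones shown in the example above):
.. code-block:: python
import json

# Write two GET requests in Pandora's jsonline ammo format (one JSON doc per line).
requests = [
    {"uri": "/00", "method": "GET",
     "headers": {"Host": "example.org", "User-Agent": "Pandora/0.0.1"},
     "host": "example.org"},
    {"tag": "mytag", "uri": "/02", "method": "GET",
     "headers": {"Host": "example.org", "User-Agent": "Pandora/0.0.1"},
     "host": "example.org"},
]
with open("ammo.jsonline", "w") as ammo:
    for doc in requests:
        ammo.write(json.dumps(doc) + "\n")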
; OR config file (yaml or json)
config_file = pandora_config.yml
Schedules
---------
@ -786,7 +839,7 @@ Example:
::
[tank]
; plugin is disabled by default, enable it:
plugin_overload=yandextank.plugins.Overload
plugin_uploader=yandextank.plugins.DataUploader overload
[overload]
token_file=token.txt

View File

@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name='yandextank',
version='1.8.33',
version='1.8.34',
description='a performance measurement tool',
longer_description='''
Yandex.Tank is a performance measurement and load testing automatization tool.
@ -60,5 +60,6 @@ analytic tools for the results they produce.
'yandextank.plugins.Pandora': ['config/*'],
'yandextank.plugins.Aggregator': ['config/*'],
'yandextank.plugins.Telegraf': ['config/*'],
'yandextank.plugins.Android': ['binary/*'],
},
use_2to3=False, )

View File

@ -13,13 +13,15 @@ class AbstractPlugin(object):
should point to __file__ magic constant """
raise TypeError("Abstract method needs to be overridden")
def __init__(self, core):
def __init__(self, core, config_section):
"""
@type core: TankCore
"""
self.log = logging.getLogger(__name__)
self.core = core
if config_section:
self.SECTION = config_section
def configure(self):
""" A stage to read config values and instantiate objects """

View File

@ -25,6 +25,7 @@ from ..common.resource import manager as resource
from ..plugins.Aggregator import Plugin as AggregatorPlugin
from ..plugins.Monitoring import Plugin as MonitoringPlugin
from ..plugins.Telegraf import Plugin as TelegrafPlugin
if sys.version_info[0] < 3:
import ConfigParser
else:
@ -36,22 +37,12 @@ logger = logging.getLogger(__name__)
class Job(object):
def __init__(
self,
name,
description,
task,
version,
config_copy,
monitoring_plugin,
aggregator_plugin,
tank,
generator_plugin=None):
# type: (unicode, unicode, unicode, unicode, unicode, MonitoringPlugin,
# AggregatorPlugin, GeneratorPlugin) -> Job
self.name = name
self.description = description
self.task = task
self.version = version
self.config_copy = config_copy
self.monitoring_plugin = monitoring_plugin
self.aggregator_plugin = aggregator_plugin
self.tank = tank
@ -76,6 +67,14 @@ class Job(object):
self._phantom_info = info
def parse_plugin(s):
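# Split a plugin spec like "yandextank.plugins.DataUploader overload"
# into (plugin_path, config_section); the config section part is optional.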
try:
plugin, config_section = s.split()
except ValueError:
plugin, config_section = s, None
return plugin, config_section
class TankCore(object):
"""
JMeter + dstat inspired :)
@ -90,7 +89,7 @@ class TankCore(object):
def __init__(self, artifacts_base_dir=None, artifacts_dir_name=None):
self.config = ConfigManager()
self.status = {}
self.plugins = []
self.plugins = {}
self.artifacts_dir_name = artifacts_dir_name
self._artifacts_dir = None
self.artifact_files = {}
@ -153,7 +152,9 @@ class TankCore(object):
self.taskset_affinity = self.get_option(self.SECTION, 'affinity', '')
options = self.config.get_options(self.SECTION, self.PLUGIN_PREFIX)
for (plugin_name, plugin_path) in options:
for (plugin_name, plugin) in options:
plugin_path, config_section = parse_plugin(plugin)
if not plugin_path:
logger.debug("Seems the plugin '%s' was disabled", plugin_name)
continue
@ -172,6 +173,12 @@ class TankCore(object):
raise ValueError(
"Couldn't convert plugin path to new format:\n %s" %
plugin_path)
if plugin_path == "yandextank.plugins.Overload":
logger.warning(
"Deprecated plugin name: 'yandextank.plugins.Overload'\n"
"There is a new generic plugin now.\n"
"Correcting to 'yandextank.plugins.DataUploader overload'")
plugin_path = "yandextank.plugins.DataUploader overload"
try:
plugin = il.import_module(plugin_path)
except ImportError:
@ -197,7 +204,7 @@ class TankCore(object):
logger.warning("Patched plugin path: %s", plugin_path)
plugin = il.import_module(plugin_path)
try:
instance = getattr(plugin, 'Plugin')(self)
instance = getattr(plugin, 'Plugin')(self, config_section)
except:
logger.warning(
"Deprecated plugin classname: %s. Should be 'Plugin'",
@ -205,7 +212,7 @@ class TankCore(object):
instance = getattr(
plugin, plugin_path.split('.')[-1] + 'Plugin')(self)
self.plugins.append(instance)
self.register_plugin(self.PLUGIN_PREFIX + plugin_name, instance)
logger.debug("Plugin instances: %s", self.plugins)
@ -246,23 +253,12 @@ class TankCore(object):
logger.warning("Load generator not found:", exc_info=True)
gen = None
self.job = Job(
name=self.get_option(self.SECTION_META, "job_name",
'none').decode('utf8'),
description=self.get_option(self.SECTION_META, "job_dsc",
'').decode('utf8'),
task=self.get_option(self.SECTION_META, 'task',
'dir').decode('utf8'),
version=self.get_option(self.SECTION_META, 'ver',
'').decode('utf8'),
config_copy=self.get_option(
self.SECTION_META, 'copy_config_to', 'config_copy'),
monitoring_plugin=mon,
aggregator_plugin=aggregator,
generator_plugin=gen,
tank=socket.getfqdn())
self.job = Job(monitoring_plugin=mon,
aggregator_plugin=aggregator,
generator_plugin=gen,
tank=socket.getfqdn())
for plugin in self.plugins:
for plugin in self.plugins.values():
logger.debug("Configuring %s", plugin)
plugin.configure()
self.config.flush()
@ -273,7 +269,7 @@ class TankCore(object):
""" Call prepare_test() on all plugins """
logger.info("Preparing test...")
self.publish("core", "stage", "prepare")
for plugin in self.plugins:
for plugin in self.plugins.values():
logger.debug("Preparing %s", plugin)
plugin.prepare_test()
if self.flush_config_to:
@ -283,7 +279,7 @@ class TankCore(object):
""" Call start_test() on all plugins """
logger.info("Starting test...")
self.publish("core", "stage", "start")
for plugin in self.plugins:
for plugin in self.plugins.values():
logger.debug("Starting %s", plugin)
plugin.start_test()
if self.flush_config_to:
@ -302,7 +298,7 @@ class TankCore(object):
while not self.interrupted:
begin_time = time.time()
for plugin in self.plugins:
for plugin in self.plugins.values():
logger.debug("Polling %s", plugin)
retcode = plugin.is_test_finished()
if retcode >= 0:
@ -321,7 +317,7 @@ class TankCore(object):
logger.info("Finishing test...")
self.publish("core", "stage", "end")
for plugin in self.plugins:
for plugin in self.plugins.values():
logger.debug("Finalize %s", plugin)
try:
logger.debug("RC before: %s", retcode)
@ -345,7 +341,7 @@ class TankCore(object):
logger.info("Post-processing test...")
self.publish("core", "stage", "post_process")
for plugin in self.plugins:
for plugin in self.plugins.values():
logger.debug("Post-process %s", plugin)
try:
logger.debug("RC before: %s", retcode)
@ -435,10 +431,7 @@ class TankCore(object):
Retrieve a plugin of desired class, KeyError raised otherwise
"""
logger.debug("Searching for plugin: %s", plugin_class)
matches = [
plugin for plugin in self.plugins
if isinstance(plugin, plugin_class)
]
matches = [plugin for plugin in self.plugins.values() if isinstance(plugin, plugin_class)]
if len(matches) > 0:
if len(matches) > 1:
logger.debug(
@ -448,6 +441,10 @@ class TankCore(object):
else:
raise KeyError("Requested plugin type not found: %s" % plugin_class)
def get_jobno(self, plugin_name='plugin_lunapark'):
uploader_plugin = self.plugins[plugin_name]
return uploader_plugin.lp_job.number
def __collect_file(self, filename, keep_original=False):
"""
Move or copy single file to artifacts dir
@ -569,7 +566,7 @@ class TankCore(object):
"""
logger.info("Close allocated resources...")
for plugin in self.plugins:
for plugin in self.plugins.values():
logger.debug("Close %s", plugin)
try:
plugin.close()
@ -602,6 +599,11 @@ class TankCore(object):
os_agent = 'OS/{}'.format(platform.platform())
return ' '.join((tank_agent, python_agent, os_agent))
def register_plugin(self, plugin_name, instance):
if self.plugins.get(plugin_name, None) is not None:
logger.error("Plugin names must be unique, '%s' is already registered", plugin_name)
self.plugins[plugin_name] = instance
class ConfigManager(object):
""" Option storage class """
@ -613,9 +615,7 @@ class ConfigManager(object):
def load_files(self, configs):
""" Read configs set into storage """
logger.debug("Reading configs: %s", configs)
config_filenames = [
resource.resource_filename(config) for config in configs
]
config_filenames = [resource.resource_filename(config) for config in configs]
try:
self.config.read(config_filenames)
except Exception as ex:

View File

@ -130,6 +130,15 @@ class DataPoller(object):
time.sleep(self.poll_period)
def to_utc(ts):
# dst = daylight saving time
is_dst = time.daylight and time.localtime().tm_isdst > 0
offset = (time.altzone if is_dst else time.timezone)
return ts + offset
class Aggregator(object):
def __init__(self, source, config, verbose_histogram):
self.worker = Worker(config, verbose_histogram)

View File

@ -44,8 +44,8 @@ class Plugin(AbstractPlugin):
def get_key():
return __file__
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.listeners = [] # [LoggingListener()]
self.reader = None
self.stats_reader = None

View File

@ -0,0 +1 @@
from plugin import Plugin # noqa:F401

Binary file not shown.

View File

@ -0,0 +1,209 @@
import logging
import subprocess
import time
import urllib
import sys
import glob
import os
from multiprocessing import Process
from signal import SIGKILL
try:
from volta.analysis import grab, uploader
except Exception:
raise RuntimeError("Please install volta. https://github.com/yandex-load/volta")
from pkg_resources import resource_filename
from ...common.interfaces import AbstractPlugin, GeneratorPlugin
from .reader import AndroidReader, AndroidStatsReader
logger = logging.getLogger(__name__)
class Plugin(AbstractPlugin, GeneratorPlugin):
SECTION = "android"
SECTION_META = "meta"
def __init__(self, core, config_section):
super(Plugin, self).__init__(core, config_section)
self.apk_path = None
self.test_path = None
self.package = None
self.package_test = None
self.clazz = None
self.device = None
self.test_runner = None
self.process_test = None
self.process_stderr = None
self.process_grabber = None
self.apk = "./app.apk"
self.test = "./app-test.apk"
self.grab_log = "./output.bin"
self.event_log = "./events.log"
@staticmethod
def get_key():
return __file__
def get_available_options(self):
opts = ["package", "test_package", "apk", "test_apk", "class", "test_runner"]
return opts
def configure(self):
# plugin part
self.apk_path = self.get_option("apk")
self.test_path = self.get_option("test_apk")
self.clazz = self.get_option("class")
self.package = self.get_option("package")
self.package_test = self.get_option("test_package")
self.test_runner = self.get_option("test_runner")
def prepare_test(self):
aggregator = self.core.job.aggregator_plugin
if aggregator:
aggregator.reader = AndroidReader()
aggregator.stats_reader = AndroidStatsReader()
ports = None
logger.info("Trying to find device")
if sys.platform.startswith('linux'):
ports = glob.glob('/dev/ttyUSB[0-9]*')
elif sys.platform.startswith('darwin'):
ports = glob.glob('/dev/cu.wchusbserial[0-9]*')
else:
logger.info('Your OS is not supported yet')
logger.info("Ports = " + ''.join(ports))
try:
self.device = [port for port in ports if 'Bluetooth' not in port][0]
logger.info("Found device = " + self.device)
except Exception:
logger.info("Device not found")
logger.info("Download apk...")
urllib.urlretrieve(self.apk_path, self.apk)
logger.info("Download test...")
urllib.urlretrieve(self.test_path, self.test)
logger.info("Uninstall the lightning...")
subprocess.check_output(["adb", "uninstall", "net.yandex.overload.lightning"])
logger.info("Uninstall the app...")
subprocess.check_output(["adb", "uninstall", self.package])
logger.info("Uninstall the test...")
subprocess.check_output(["adb", "uninstall", self.package_test])
lightning = resource_filename(__name__, 'binary/lightning.apk')
logger.info("Get from resources " + lightning)
logger.info("Install the lightning...")
subprocess.check_output(["adb", "install", lightning])
logger.info("Install the apk...")
subprocess.check_output(["adb", "install", self.apk])
logger.info("Install the test...")
subprocess.check_output(["adb", "install", self.test])
logger.info("Clear logcat...")
subprocess.check_output(["adb", "logcat", "-c"])
def start_test(self):
if self.device:
logger.info("Start grabber...")
args = {
'device': self.device,
'seconds': 10800,
'output': self.grab_log,
'debug': False,
'binary': False
}
self.process_grabber = Process(target=grab.main, args=(args,))
self.process_grabber.start()
process_stderr_file = self.core.mkstemp(".log", "android_")
self.core.add_artifact_file(process_stderr_file)
self.process_stderr = open(process_stderr_file, 'w')
logger.info("Start flashlight...")
args = ["adb", "shell", "am", "start", "-n",
"net.yandex.overload.lightning/net.yandex.overload.lightning.MainActivity"]
subprocess.Popen(args)
time.sleep(12)
args = ["adb", "shell", "am", "instrument", "-w", "-e", "class", self.clazz,
'{package}/{runner}'.format(package=self.package_test, runner=self.test_runner)]
logger.info("Starting: %s", args)
self.process_test = subprocess.Popen(
args,
stderr=self.process_stderr,
stdout=self.process_stderr,
close_fds=True
)
def is_test_finished(self):
retcode = self.process_test.poll()
if retcode is not None:
logger.info("Subprocess done its work with exit code: %s", retcode)
return abs(retcode)
else:
return -1
def end_test(self, retcode):
if self.process_grabber:
logger.info("Kill grabber...")
os.kill(self.process_grabber.pid, SIGKILL)
logger.info("Get logcat dump...")
subprocess.check_call('adb logcat -d > {file}'.format(file=self.event_log), shell=True)
if os.path.exists(self.grab_log):
logger.info("Upload logs...")
args = {
'filename': self.grab_log,
'events': self.event_log,
'samplerate': 500,
'slope': 1,
'offset': 0,
'bynary': False,
'job_config': {
'task': self.core.get_option(self.SECTION_META, 'task').decode('utf8'),
'jobname': self.core.get_option(self.SECTION_META, 'job_name').decode('utf8'),
'dsc': self.core.get_option(self.SECTION_META, 'job_dsc').decode('utf8'),
'component': self.core.get_option('meta', 'component')
}
}
process_uploader = Process(target=uploader.main, args=(args,))
process_uploader.start()
process_uploader.join()
if self.process_test and self.process_test.poll() is None:
logger.info("Terminating tests with PID %s", self.process_test.pid)
self.process_test.terminate()
if self.process_stderr:
self.process_stderr.close()
logger.info("Uninstall the app...")
subprocess.check_output(["adb", "uninstall", self.package])
logger.info("Uninstall the test...")
subprocess.check_output(["adb", "uninstall", self.package_test])
return retcode
def get_info(self):
return AndroidInfo()
class AndroidInfo(object):
def __init__(self):
self.address = ''
self.port = 80
self.ammo_file = ''
self.duration = 0
self.loop_count = 1
self.instances = 1
self.rps_schedule = ''

View File

@ -0,0 +1,14 @@
class AndroidReader(object):
def close(self):
pass
def __iter__(self):
yield None
class AndroidStatsReader(object):
def close(self):
pass
def __iter__(self):
yield None

View File

@ -12,8 +12,8 @@ class Plugin(AbstractPlugin):
SECTION = "appium"
def __init__(self, core):
super(Plugin, self).__init__(core)
def __init__(self, core, config_section):
super(Plugin, self).__init__(core, config_section)
self.appium_cmd = None
self.appium_log = None
self.appium_port = None

View File

@ -17,8 +17,8 @@ class Plugin(AbstractPlugin, AggregateResultListener):
""" Plugin that accepts criterion classes and triggers autostop """
SECTION = 'autostop'
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
AggregateResultListener.__init__(self)
self.cause_criterion = None

View File

@ -16,8 +16,8 @@ class Plugin(AbstractPlugin):
def get_key():
return __file__
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.logfile = None
self.default_target = None
self.device_id = None

View File

@ -17,7 +17,7 @@ requests.packages.urllib3.disable_warnings()
class AbstractGun(AbstractPlugin):
def __init__(self, core):
super(AbstractGun, self).__init__(core)
super(AbstractGun, self).__init__(core, None)
self.results = None
@contextmanager

View File

@ -7,7 +7,7 @@ from ...common.interfaces import AbstractPlugin, GeneratorPlugin
from .guns import LogGun, SqlGun, CustomGun, HttpGun, ScenarioGun, UltimateGun
from .reader import BfgReader, BfgStatsReader
from .widgets import BfgInfoWidget
from .worker import BFG
from .worker import BFGMultiprocessing, BFGGreen
from ..Aggregator import Plugin as AggregatorPlugin
from ..Console import Plugin as ConsolePlugin
from ...stepper import StepperWrapper
@ -17,9 +17,9 @@ class Plugin(AbstractPlugin, GeneratorPlugin):
''' Big Fucking Gun plugin '''
SECTION = 'bfg'
def __init__(self, core):
def __init__(self, core, config_section):
self.log = logging.getLogger(__name__)
AbstractPlugin.__init__(self, core)
AbstractPlugin.__init__(self, core, config_section)
self.gun_type = None
self.start_time = time.time()
self.stepper_wrapper = StepperWrapper(self.core, Plugin.SECTION)
@ -69,11 +69,19 @@ class Plugin(AbstractPlugin, GeneratorPlugin):
cached_stpd = True
else:
cached_stpd = False
if self.get_option("worker_type", "") == "green":
BFG = BFGGreen
else:
BFG = BFGMultiprocessing
self.bfg = BFG(
gun=self.gun,
instances=self.stepper_wrapper.instances,
stpd_filename=self.stepper_wrapper.stpd,
cached_stpd=cached_stpd)
cached_stpd=cached_stpd,
green_threads_per_instance=int(self.get_option('green_threads_per_instance', 1000)),
)
aggregator = None
try:
aggregator = self.core.get_plugin_of_type(AggregatorPlugin)

View File

@ -9,13 +9,14 @@ from ...stepper import StpdReader
logger = logging.getLogger(__name__)
class BFG(object):
class BFGBase(object):
"""
A BFG load generator that manages multiple workers as processes and
threads in each of them and feeds them with tasks
"""
def __init__(self, gun, instances, stpd_filename, cached_stpd=False):
def __init__(self, gun, instances, stpd_filename, cached_stpd=False,
green_threads_per_instance=None):
logger.info(
"""
BFG using stpd from {stpd_filename}
@ -35,13 +36,14 @@ Gun: {gun.__class__.__name__}
self.cached_stpd = cached_stpd
self.stpd_filename = stpd_filename
self.pool = [
mp.Process(target=self._worker) for _ in xrange(0, self.instances)
mp.Process(target=self._worker) for _ in xrange(self.instances)
]
self.feeder = th.Thread(target=self._feed, name="Feeder")
self.feeder.daemon = True
self.workers_finished = False
self.start_time = None
self.plan = None
self.green_threads_per_instance = green_threads_per_instance
def start(self):
self.start_time = time.time()
@ -129,6 +131,12 @@ Gun: {gun.__class__.__name__}
map(lambda x: x.join(), self.pool)
self.workers_finished = True
class BFGMultiprocessing(BFGBase):
"""
Default worker type, creates process per worker,
every process executes requests synchronously inside.
"""
def _worker(self):
"""
A worker that does actual jobs
@ -176,3 +184,91 @@ Gun: {gun.__class__.__name__}
logger.exception("Couldn't finalize gun. Exit shooter process")
return
logger.debug("Exit shooter process")
class BFGGreen(BFGBase):
"""
Green version of the worker. Starts `self.instances` processes,
each of process has a pool of `self.green_threads_per_instance` green threads.
"""
def _worker(self):
from gevent import monkey, spawn
from gevent.queue import Queue as GreenQueue
# NOTE: Patching everything will conflict with multiprocessing
monkey.patch_all(thread=False, select=False)
logger.debug("Init shooter process")
try:
self.gun.setup()
except Exception:
logger.exception("Couldn't initialize gun. Exit shooter process")
return
self.green_queue = GreenQueue(self.green_threads_per_instance)
self.green_pool = [spawn(self._green_worker) for _ in xrange(0, self.green_threads_per_instance)]
# Keep track of tasks sent to greenlets. If all greenlets are busy -
# don't pull more tasks from the main queue, let other workers do that.
self._free_threads_count = self.green_threads_per_instance
while not self.quit.is_set():
while not self.task_queue.empty() and self._free_threads_count:
try:
task = self.task_queue.get_nowait()
except Empty:
continue
self._free_threads_count -= 1
if not task:
logger.debug("Got killer task.")
self.quit.set()
break
self.green_queue.put(task)
time.sleep(0.1)
map(lambda g: g.join(), self.green_pool)
try:
self.gun.teardown()
except Exception:
logger.exception("Couldn't finalize gun. Exit shooter process")
return
logger.debug("Exit shooter process")
def _green_worker(self):
"""
A worker that does actual jobs
"""
while not self.quit.is_set():
try:
task = self.green_queue.get(timeout=1)
timestamp, missile, marker = task
planned_time = self.start_time + (timestamp / 1000.0)
delay = planned_time - time.time()
if delay > 0:
time.sleep(delay)
try:
with self.instance_counter.get_lock():
self.instance_counter.value += 1
self.gun.shoot(missile, marker)
finally:
with self.instance_counter.get_lock():
self.instance_counter.value -= 1
self._free_threads_count += 1
except (KeyboardInterrupt, SystemExit):
break
except Empty:
continue
except Full:
logger.warning("Couldn't put to result queue because it's full")
except Exception:
logger.exception("Bfg shoot exception")

View File

@ -16,12 +16,11 @@ class Plugin(AbstractPlugin, AggregateResultListener):
''' Console plugin '''
SECTION = 'console'
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.screen = None
self.render_exception = None
self.console_markup = None
self.remote_translator = None
self.info_panel_width = '33'
self.short_only = 0
# these three provide non-blocking console output
@ -71,9 +70,6 @@ class Plugin(AbstractPlugin, AggregateResultListener):
sys.stdout.write(self.__console_view)
sys.stdout.write(self.console_markup.TOTAL_RESET)
if self.remote_translator:
self.remote_translator.send_console(self.__console_view)
def is_test_finished(self):
if not self.__writer_thread:
self.__writer_event = threading.Event()

View File

@ -0,0 +1,165 @@
import ConfigParser
import argparse
import glob
import json
import logging
import os
import socket
import sys
import pwd
from StringIO import StringIO
from urlparse import urljoin
from datetime import datetime
import pkg_resources
from .client import APIClient
from .plugin import LPJob
CONFIG_FILE = 'saved_conf.ini'
DATA_LOG = 'test_data.log'
MONITORING_LOG = 'monitoring.log'
SECTION = 'meta'
logger = logging.getLogger('')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(stream=sys.stdout)
handler.setLevel(logging.INFO)
logger.addHandler(handler)
verbose_handler = logging.FileHandler(
datetime.now().strftime("post_loader_%Y-%m-%d_%H-%M-%S.log"), 'w')
verbose_handler.setLevel(logging.DEBUG)
logger.addHandler(verbose_handler)
def read_config(shooting_dir):
config_file = glob.glob(os.path.join(shooting_dir, CONFIG_FILE))[0]
logger.info('Config file found: %s' % config_file)
config = ConfigParser.ConfigParser()
config.read(config_file)
return config
def get_lp_config(config):
"""
looks for config file in shooting_dir,
returns config dict of section 'meta'
:rtype: dict
"""
lp_config = dict(config.items(SECTION))
for key in sorted(lp_config.keys()):
logger.debug('%s: %s' % (key, lp_config[key]))
return lp_config
def check_log(log_name):
assert os.path.exists(log_name), \
'Data log {} not found\n'.format(log_name) + \
'JsonReport plugin should be enabled when launching Yandex-tank'
def upload_data(shooting_dir, log_name, lp_job):
data_log = os.path.join(shooting_dir, log_name)
check_log(data_log)
sys.stdout.write('Uploading test data')
with open(data_log, 'r') as f:
for line in f:
data = json.loads(line.strip())
lp_job.push_test_data(data['data'], data['stats'])
sys.stdout.write('.')
sys.stdout.flush()
sys.stdout.write('\n')
def upload_monitoring(shooting_dir, log_name, lp_job):
data_log = os.path.join(shooting_dir, log_name)
check_log(data_log)
sys.stdout.write('Uploading monitoring data')
with open(data_log, 'r') as f:
for line in f:
lp_job.push_monitoring_data(line.strip())
sys.stdout.write('.')
sys.stdout.flush()
sys.stdout.write('\n')
def send_config_snapshot(config, lp_job):
config.set(SECTION, 'launched_from', 'post-loader')
output = StringIO()
config.write(output)
lp_job.send_config_snapshot(output.getvalue())
def edit_metainfo(lp_config, lp_job):
lp_job.edit_metainfo(is_regression=lp_config.get('regress'),
regression_component=lp_config.get('component'),
cmdline=lp_config.get('cmdline'),
ammo_path=lp_config.get('ammo_path'),
loop_count=lp_config.get('loop_count'))
def get_plugin_dir(shooting_dir):
DIRNAME = 'lunapark'
parent = os.path.abspath(os.path.join(shooting_dir, os.pardir))
if os.path.basename(parent) == DIRNAME:
return parent
else:
plugin_dir = os.path.join(parent, DIRNAME)
if not os.path.exists(plugin_dir):
os.makedirs(plugin_dir)
return plugin_dir
def make_symlink(shooting_dir, name):
plugin_dir = get_plugin_dir(shooting_dir)
link_name = os.path.join(plugin_dir, str(name))
os.symlink(os.path.relpath(shooting_dir, plugin_dir), link_name)
logger.info('Symlink created: {}'.format(os.path.abspath(link_name)))
def post_loader():
parser = argparse.ArgumentParser()
parser.add_argument('shooting_dir',
help='Directory containing shooting artifacts')
shooting_dir = parser.parse_args().shooting_dir
assert os.path.exists(shooting_dir), 'Directory not found'
config = read_config(shooting_dir)
lp_config = get_lp_config(config)
api_client = APIClient(base_url=lp_config['api_address'],
user_agent='Lunapark/{}'.format(pkg_resources.require('yatank-internal-lunapark')[0].version)
# todo: add timeouts
)
lp_job = LPJob(
client=api_client,
target_host=lp_config.get('target_host'),
target_port=lp_config.get('target_port'),
person=lp_config.get(
'operator',
'') or pwd.getpwuid(
os.geteuid())[0],
task=lp_config['task'],
name=lp_config['job_name'],
description=lp_config['job_dsc'],
tank=socket.getfqdn())
edit_metainfo(lp_config, lp_job)
upload_data(shooting_dir, DATA_LOG, lp_job)
send_config_snapshot(config, lp_job)
try:
upload_monitoring(shooting_dir, MONITORING_LOG, lp_job)
except AssertionError as e:
logger.error(e)
lp_job.close(0)
make_symlink(shooting_dir, lp_job.number)
logger.info(
'LP job created: {}'.format(
urljoin(
api_client.base_url, str(
lp_job.number))))
if __name__ == '__main__':
post_loader()

View File

@ -0,0 +1,594 @@
import json
import time
import traceback
import urllib
from future.moves.urllib.parse import urljoin
from builtins import range
import requests
import logging
from requests.exceptions import ConnectionError, Timeout
requests.packages.urllib3.disable_warnings()
logger = logging.getLogger(__name__) # pylint: disable=C0103
class APIClient(object):
def __init__(
self,
base_url=None,
writer_url=None,
network_attempts=10,
api_attempts=10,
maintenance_attempts=40,
network_timeout=2,
api_timeout=5,
maintenance_timeout=15,
connection_timeout=5.0,
user_agent=None,
api_token=None):
self.user_agent = user_agent
self.connection_timeout = connection_timeout
self._base_url = base_url
self.writer_url = writer_url
self.retry_timeout = 10
self.session = requests.Session()
self.session.verify = False
self.session.headers.update({"User-Agent": "tank"})
self.network_attempts = network_attempts
self.network_timeout = network_timeout
self.api_attempts = api_attempts
self.api_timeout = api_timeout
self.maintenance_attempts = maintenance_attempts
self.maintenance_timeout = maintenance_timeout
self.params = {'api_token': api_token} if api_token else {}
@property
def base_url(self):
if not self._base_url:
raise ValueError("Base url is not set")
else:
return self._base_url
@base_url.setter
def base_url(self, url):
self._base_url = url
class UnderMaintenance(Exception):
message = "API is under maintenance"
class NotAvailable(Exception):
desc = "API is not available"
def __init__(self, request, response):
self.message = "%s\n%s\n%s" % (self.desc, request, response)
super(self.__class__, self).__init__(self.message)
class StoppedFromOnline(Exception):
message = "Shooting is stopped from online"
class JobNotCreated(Exception):
pass
class NetworkError(Exception):
pass
def set_api_timeout(self, timeout):
self.api_timeout = float(timeout)
def network_timeouts(self):
return (self.network_timeout for _ in range(self.network_attempts - 1))
def api_timeouts(self):
return (self.api_timeout for _ in range(self.api_attempts - 1))
def maintenance_timeouts(self):
return (
self.maintenance_timeout for _ in range(
self.maintenance_attempts - 1))
@staticmethod
def filter_headers(headers):
boring = ['X-Content-Security-Policy', 'Content-Security-Policy',
'Strict-Transport-Security', 'X-WebKit-CSP', 'Set-Cookie',
'X-DNS-Prefetch-Control', 'X-Frame-Options', 'P3P',
'X-Content-Type-Options', 'X-Download-Options',
'Surrogate-Control']
for h in boring:
if h in headers:
del (headers[h])
return headers
def __send_single_request(self, req, trace=False):
p = self.session.prepare_request(req)
if trace:
logger.debug("Making request: %s %s Headers: %s Body: %s",
p.method, p.url, p.headers, p.body)
resp = self.session.send(p, timeout=self.connection_timeout)
if trace:
logger.debug("Got response in %ss: %s %s Headers: %s Body: %s",
resp.elapsed.total_seconds(), resp.reason,
resp.status_code, self.filter_headers(resp.headers),
resp.content)
if resp.status_code in [500, 502, 503, 504]:
raise self.NotAvailable(
request="request: %s %s\n\tHeaders: %s\n\tBody: %s" %
(p.method,
p.url,
p.headers,
p.body),
response="Got response in %ss: %s %s\n\tHeaders: %s\n\tBody: %s" %
(resp.elapsed.total_seconds(),
resp.reason,
resp.status_code,
self.filter_headers(
resp.headers),
resp.content))
elif resp.status_code == 410:
raise self.StoppedFromOnline
elif resp.status_code == 423:
raise self.UnderMaintenance
else:
resp.raise_for_status()
return resp
def __make_api_request(
self,
http_method,
path,
data=None,
response_callback=lambda x: x,
writer=False,
trace=False,
json=None,
maintenance_timeouts=None,
maintenance_msg=None):
url = urljoin(self.base_url, path)
if json:
request = requests.Request(
http_method, url, json=json, headers={'User-Agent': self.user_agent}, params=self.params)
else:
request = requests.Request(
http_method, url, data=data, headers={'User-Agent': self.user_agent}, params=self.params)
network_timeouts = self.network_timeouts()
maintenance_timeouts = maintenance_timeouts or self.maintenance_timeouts()
maintenance_msg = maintenance_msg or "%s is under maintenance" % (self._base_url)
while True:
try:
response = self.__send_single_request(request, trace=trace)
return response_callback(response)
except (Timeout, ConnectionError):
logger.warn(traceback.format_exc())
try:
timeout = next(network_timeouts)
logger.warn(
"Network error, will retry in %ss..." %
timeout)
time.sleep(timeout)
continue
except StopIteration:
raise self.NetworkError()
except self.UnderMaintenance as e:
try:
timeout = next(maintenance_timeouts)
logger.warn(maintenance_msg)
logger.warn("Retrying in %ss..." % timeout)
time.sleep(timeout)
continue
except StopIteration:
raise e
def __make_writer_request(
self,
params=None,
json=None,
http_method="POST",
trace=False):
'''
Send request to writer service.
'''
request = requests.Request(
http_method,
self.writer_url,
params=params,
json=json,
headers={
'User-Agent': self.user_agent})
network_timeouts = self.network_timeouts()
maintenance_timeouts = self.maintenance_timeouts()
while True:
try:
response = self.__send_single_request(request, trace=trace)
return response
except (Timeout, ConnectionError):
logger.warn(traceback.format_exc())
try:
timeout = next(network_timeouts)
logger.warn(
"Network error, will retry in %ss..." %
timeout)
time.sleep(timeout)
continue
except StopIteration:
raise self.NetworkError()
except self.UnderMaintenance as e:
try:
timeout = next(maintenance_timeouts)
logger.warn(
"Writer is under maintenance, will retry in %ss..." %
timeout)
time.sleep(timeout)
continue
except StopIteration:
raise e
def __get(self, addr, trace=False, maintenance_timeouts=None, maintenance_msg=None):
return self.__make_api_request(
'GET',
addr,
trace=trace,
response_callback=lambda r: json.loads(r.content.decode('utf8')),
maintenance_timeouts=maintenance_timeouts,
maintenance_msg=maintenance_msg
)
def __post_raw(self, addr, txt_data, trace=False):
return self.__make_api_request(
'POST', addr, txt_data, lambda r: r.content, trace=trace)
def __post(self, addr, data, trace=False):
return self.__make_api_request(
'POST',
addr,
json=data,
response_callback=lambda r: r.json(),
trace=trace)
def __put(self, addr, data, trace=False):
return self.__make_api_request(
'PUT',
addr,
json=data,
response_callback=lambda r: r.text,
trace=trace)
def __patch(self, addr, data, trace=False):
return self.__make_api_request(
'PATCH',
addr,
json=data,
response_callback=lambda r: r.text,
trace=trace)
def get_task_data(self, task, trace=False):
return self.__get("api/task/" + task + "/summary.json", trace=trace)
def new_job(
self,
task,
person,
tank,
target_host,
target_port,
loadscheme=None,
detailed_time=None,
notify_list=None,
trace=False):
"""
:return: job_nr, upload_token
:rtype: tuple
"""
if not notify_list:
notify_list = []
data = {
'task': task,
'person': person,
'tank': tank,
'host': target_host,
'port': target_port,
'loadscheme': loadscheme,
'detailed_time': detailed_time,
'notify': notify_list
}
logger.debug("Job create request: %s", data)
api_timeouts = self.api_timeouts()
while True:
try:
response = self.__post(
"api/job/create.json", data, trace=trace)[0]
# [{"upload_token": "1864a3b2547d40f19b5012eb038be6f6", "job": 904317}]
return response['job'], response['upload_token']
except (self.NotAvailable, self.StoppedFromOnline) as e:
try:
timeout = next(api_timeouts)
logger.warn("API error, will retry in %ss..." % timeout)
time.sleep(timeout)
continue
except StopIteration:
logger.warn('Failed to create job on lunapark')
raise self.JobNotCreated(e.message)
except Exception as e:
logger.warn('Failed to create job on lunapark')
logger.warn(e.message)
raise self.JobNotCreated()
def get_job_summary(self, jobno):
result = self.__get('api/job/' + str(jobno) + '/summary.json')
return result[0]
def close_job(self, jobno, retcode, trace=False):
params = {'exitcode': str(retcode)}
result = self.__get('api/job/' + str(jobno) + '/close.json?' +
urllib.urlencode(params), trace=trace)
return result[0]['success']
def edit_job_metainfo(
self,
jobno,
job_name,
job_dsc,
instances,
ammo_path,
loop_count,
version_tested,
is_regression,
component,
cmdline,
is_starred,
tank_type=0,
trace=False):
data = {
'name': job_name,
'description': job_dsc,
'instances': str(instances),
'ammo': ammo_path,
'loop': loop_count,
'version': version_tested,
'regression': str(is_regression),
'component': component,
'tank_type': int(tank_type),
'command_line': cmdline,
'starred': int(is_starred)
}
response = self.__post(
'api/job/' +
str(jobno) +
'/edit.json',
data,
trace=trace)
return response
def set_imbalance_and_dsc(self, jobno, rps, comment):
data = {}
if rps:
data['imbalance'] = rps
if comment:
res = self.get_job_summary(jobno)
data['description'] = (res['dsc'] + "\n" + comment).strip()
response = self.__post('api/job/' + str(jobno) + '/edit.json', data)
return response
def second_data_to_push_item(self, data, stat, timestamp, overall, case):
"""
@data: SecondAggregateDataItem
"""
api_data = {
'overall': overall,
'case': case,
'net_codes': [],
'http_codes': [],
'time_intervals': [],
'trail': {
'time': str(timestamp),
'reqps': stat["metrics"]["reqps"],
'resps': data["interval_real"]["len"],
'expect': data["interval_real"]["total"] / 1000.0 /
data["interval_real"]["len"],
'disper': 0,
'self_load':
0, # TODO abs(round(100 - float(data.selfload), 2)),
'input': data["size_in"]["total"],
'output': data["size_out"]["total"],
'connect_time': data["connect_time"]["total"] / 1000.0 /
data["connect_time"]["len"],
'send_time':
data["send_time"]["total"] / \
1000.0 / data["send_time"]["len"],
'latency':
data["latency"]["total"] / 1000.0 / data["latency"]["len"],
'receive_time': data["receive_time"]["total"] / 1000.0 /
data["receive_time"]["len"],
'threads': stat["metrics"]["instances"], # TODO
}
}
for q, value in zip(data["interval_real"]["q"]["q"],
data["interval_real"]["q"]["value"]):
api_data['trail']['q' + str(q)] = value / 1000.0
for code, cnt in data["net_code"]["count"].items():
api_data['net_codes'].append({'code': int(code),
'count': int(cnt)})
for code, cnt in data["proto_code"]["count"].items():
api_data['http_codes'].append({'code': int(code),
'count': int(cnt)})
api_data['time_intervals'] = self.convert_hist(data["interval_real"][
"hist"])
return api_data
def convert_hist(self, hist):
data = hist['data']
bins = hist['bins']
return [
{
"from": 0, # deprecated
"to": b / 1000.0,
"count": count,
} for b, count in zip(bins, data)
]
def push_test_data(
self,
jobno,
upload_token,
data_item,
stat_item,
trace=False):
items = []
uri = 'api/job/{0}/push_data.json?upload_token={1}'.format(
jobno, upload_token)
ts = data_item["ts"]
for case_name, case_data in data_item["tagged"].items():
if case_name == "":
case_name = "__NOTAG__"
if (len(case_name)) > 128:
raise RuntimeError('tag (case) name is too long: ' + case_name)
push_item = self.second_data_to_push_item(case_data, stat_item, ts,
0, case_name)
items.append(push_item)
overall = self.second_data_to_push_item(data_item["overall"],
stat_item, ts, 1, '')
items.append(overall)
api_timeouts = self.api_timeouts()
while True:
try:
if self.writer_url:
res = self.__make_writer_request(
params={
"jobno": jobno,
"upload_token": upload_token,
},
json={
"trail": items,
},
trace=trace)
logger.debug("Writer response: %s", res.text)
return res.json()["success"]
else:
res = self.__post(uri, items, trace=trace)
logger.debug("API response: %s", res)
success = int(res[0]['success'])
return success
except self.NotAvailable as e:
try:
timeout = next(api_timeouts)
logger.warn("API error, will retry in %ss...", timeout)
time.sleep(timeout)
continue
except StopIteration:
raise e
def push_monitoring_data(
self,
jobno,
upload_token,
send_data,
trace=False):
if send_data:
addr = "api/monitoring/receiver/push?job_id=%s&upload_token=%s" % (
jobno, upload_token)
api_timeouts = self.api_timeouts()
while True:
try:
if self.writer_url:
res = self.__make_writer_request(
params={
"jobno": jobno,
"upload_token": upload_token,
},
json={
"monitoring": send_data,
},
trace=trace)
logger.debug("Writer response: %s", res.text)
return res.json()["success"]
else:
res = self.__post_raw(
addr, json.dumps(send_data), trace=trace)
logger.debug("API response: %s", res)
success = res == 'ok'
return success
except self.NotAvailable as e:
try:
timeout = next(api_timeouts)
logger.warn("API error, will retry in %ss...", timeout)
time.sleep(timeout)
continue
except StopIteration:
raise e
def send_status(self, jobno, upload_token, status, trace=False):
addr = "api/v2/jobs/%s/?upload_token=%s" % (jobno, upload_token)
status_line = status.get("core", {}).get("stage", "unknown")
if "stepper" in status:
status_line += " %s" % status["stepper"].get("progress")
api_timeouts = self.api_timeouts()
while True:
try:
self.__patch(addr, {"status": status_line}, trace=trace)
return
except self.NotAvailable as e:
try:
timeout = next(api_timeouts)
logger.warn("API error, will retry in %ss...", timeout)
time.sleep(timeout)
continue
except StopIteration:
raise e
def is_target_locked(self, target, trace=False):
addr = "api/server/lock.json?action=check&address=%s" % target
res = self.__get(addr, trace=trace)
return res[0]
def lock_target(self, target, duration, trace=False, maintenance_timeouts=None, maintenance_msg=None):
addr = "api/server/lock.json?action=lock&" + \
"address=%s&duration=%s&jobno=None" % \
(target, int(duration))
res = self.__get(addr, trace=trace, maintenance_timeouts=maintenance_timeouts, maintenance_msg=maintenance_msg)
return res[0]
def unlock_target(self, target):
addr = self.get_manual_unlock_link(target)
res = self.__get(addr)
return res[0]
def get_virtual_host_info(self, hostname):
addr = "api/server/virtual_host.json?hostname=%s" % hostname
res = self.__get(addr)
try:
return res[0]
except KeyError:
raise Exception(res['error'])
@staticmethod
def get_manual_unlock_link(target):
return "api/server/lock.json?action=unlock&address=%s" % target
def send_config_snapshot(self, jobno, config, trace=False):
logger.debug("Sending config snapshot")
addr = "api/job/%s/configinfo.txt" % jobno
self.__post_raw(addr, {"configinfo": config}, trace=trace)
class OverloadClient(APIClient):
def send_status(self, jobno, upload_token, status, trace=False):
return
def lock_target(self, target, duration, trace=False, **kwargs):
return
def unlock_target(self, *args, **kwargs):
return

View File

@ -0,0 +1,853 @@
# coding=utf-8
# TODO: make the next two lines unnecessary
# pylint: disable=line-too-long
# pylint: disable=missing-docstring
import copy
import logging
import os
import pwd
import re
import sys
import time
from StringIO import StringIO
from urlparse import urljoin
from queue import Empty, Queue
from builtins import str
import requests
import threading
from ...common.interfaces import AbstractPlugin, \
MonitoringDataListener, AggregateResultListener, AbstractInfoWidget
from ...common.util import expand_to_seconds
from ..Autostop import Plugin as AutostopPlugin
from ..Console import Plugin as ConsolePlugin
from .client import APIClient, OverloadClient
logger = logging.getLogger(__name__) # pylint: disable=C0103
class BackendTypes(object):
OVERLOAD = 'OVERLOAD'
LUNAPARK = 'LUNAPARK'
@classmethod
def identify_backend(cls, section_name):
patterns = [
(r'^overload(-\d+)?$', cls.OVERLOAD),
(r'^(meta|lunapark|lp)(-\d+)?$', cls.LUNAPARK)
]
for pattern, backend_type in patterns:
if re.match(pattern, section_name):
return backend_type
else:
raise KeyError('Config section name doesn\'t match any of the patterns:\n%s' %
'\n'.join([ptrn[0] for ptrn in patterns]))
class Plugin(AbstractPlugin, AggregateResultListener,
MonitoringDataListener):
SECTION = 'meta'
RC_STOP_FROM_WEB = 8
VERSION = '3.0'
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.data_queue = Queue()
self.monitoring_queue = Queue()
self.ignore_target_lock = None
self.jobno_file = None
self.lock_target_duration = None
self.mon = None
self.regression_component = None
self.retcode = -1
self._target = None
self.task_name = ''
self.token_file = None
self.version_tested = None
self.send_status_period = 10
self._generator_info = None
self._is_telegraf = None
self.backend_type = BackendTypes.identify_backend(self.SECTION)
self._task = None
self._api_token = ''
self._lp_job = None
@staticmethod
def get_key():
return __file__
def get_available_options(self):
opts = [
"api_address",
"writer_endpoint",
"task",
"job_name",
"job_dsc",
"notify",
"ver", "component",
"regress",
"operator",
"copy_config_to",
"jobno_file",
"ignore_target_lock",
"target_lock_duration",
"lock_targets",
"jobno",
"upload_token",
'connection_timeout',
'network_attempts',
'api_attempts',
'maintenance_attempts',
'network_timeout',
'api_timeout',
'maintenance_timeout',
'strict_lock',
'send_status_period',
'log_data_requests',
'log_monitoring_requests',
'log_status_requests',
'log_other_requests',
'threads_timeout'
]
return opts
def configure(self):
self.mon = self.core.job.monitoring_plugin
self.jobno_file = self.get_option("jobno_file", '')
self.regression_component = str(self.get_option("component", ''))
ignore_all_locks = self.core.get_option(self.core.SECTION,
"ignore_locks", '0')
self.ignore_target_lock = int(self.get_option("ignore_target_lock",
ignore_all_locks))
self.lock_target_duration = expand_to_seconds(self.get_option(
"target_lock_duration", "30m"))
self.send_status_period = expand_to_seconds(
self.get_option('send_status_period', '10'))
def check_task_is_open(self):
if self.backend_type == BackendTypes.OVERLOAD:
return
TASK_TIP = 'The task should be connected to Lunapark. Open startrek task page, click "actions" -> "load testing".'
logger.debug("Check if task %s is open", self.task)
try:
task_data = self.lp_job.get_task_data(self.task)[0]
try:
task_status = task_data['status']
if task_status == 'Open':
logger.info("Task %s is ok", self.task)
self.task_name = str(task_data['name'])
else:
logger.info("Task %s:" % self.task)
logger.info(task_data)
raise RuntimeError("Task is not open")
except KeyError:
try:
error = task_data['error']
raise RuntimeError(
"Task %s error: %s\n%s" %
(self.task, error, TASK_TIP))
except KeyError:
raise RuntimeError(
'Unknown task data format:\n{}'.format(task_data))
except requests.exceptions.HTTPError as ex:
logger.error("Failed to check task status for '%s': %s", self.task, ex)
if ex.response.status_code == 404:
raise RuntimeError("Task not found: %s\n%s" % (self.task, TASK_TIP))
elif ex.response.status_code == 500 or ex.response.status_code == 400:
raise RuntimeError(
"Unable to check task staus, id: %s, error code: %s" %
(self.task, ex.response.status_code))
raise ex
@staticmethod
def search_task_from_cwd(cwd):
issue = re.compile("^([A-Za-z]+-[0-9]+)(-.*)?")
while cwd:
logger.debug("Checking if dir is named like JIRA issue: %s", cwd)
if issue.match(os.path.basename(cwd)):
res = re.search(issue, os.path.basename(cwd))
return res.group(1).upper()
newdir = os.path.abspath(os.path.join(cwd, os.path.pardir))
if newdir == cwd:
break
else:
cwd = newdir
raise RuntimeError(
"task=dir requested, but no JIRA issue name in cwd: %s" %
os.getcwd())
def prepare_test(self):
info = self.generator_info
port = info.port
instances = info.instances
if info.ammo_file.startswith(
"http://") or info.ammo_file.startswith("https://"):
ammo_path = info.ammo_file
else:
ammo_path = os.path.realpath(info.ammo_file)
duration = int(info.duration)
if duration:
self.lock_target_duration = duration
loop_count = info.loop_count
lp_job = self.lp_job
self.locked_targets = self.check_and_lock_targets(strict=bool(
int(self.get_option('strict_lock', '0'))), ignore=self.ignore_target_lock)
try:
if lp_job._number:
self.make_symlink(lp_job._number)
self.check_task_is_open()
else:
self.check_task_is_open()
lp_job.create()
self.make_symlink(lp_job.number)
self.core.publish(self.SECTION, 'jobno', lp_job.number)
except (APIClient.JobNotCreated, APIClient.NotAvailable, APIClient.NetworkError) as e:
logger.error(e.message)
logger.error(
'Failed to connect to Lunapark, disabling DataUploader')
self.start_test = lambda *a, **kw: None
self.post_process = lambda *a, **kw: None
self.on_aggregated_data = lambda *a, **kw: None
self.monitoring_data = lambda *a, **kw: None
return
cmdline = ' '.join(sys.argv)
lp_job.edit_metainfo(
instances=instances,
ammo_path=ammo_path,
loop_count=loop_count,
is_regression=self.get_option(
'regress',
'0'),
regression_component=self.regression_component,
cmdline=cmdline,
) # todo: tanktype?
self.core.job.subscribe_plugin(self)
try:
console = self.core.get_plugin_of_type(ConsolePlugin)
except KeyError:
logger.debug("Console plugin not found", exc_info=True)
console = None
if console:
console.add_info_widget(JobInfoWidget(self))
self.set_option('target_host', self.target)
self.set_option('target_port', port)
self.set_option('cmdline', cmdline)
self.set_option('ammo_path', ammo_path)
self.set_option('loop_count', loop_count)
self.__save_conf()
def start_test(self):
self.on_air = True
status_sender = threading.Thread(target=self.__send_status)
status_sender.daemon = True
status_sender.start()
self.status_sender = status_sender
upload = threading.Thread(target=self.__data_uploader)
upload.daemon = True
upload.start()
self.upload = upload
monitoring = threading.Thread(target=self.__monitoring_uploader)
monitoring.daemon = True
monitoring.start()
self.monitoring = monitoring
web_link = urljoin(self.lp_job.api_client.base_url, str(self.lp_job.number))
logger.info("Web link: %s", web_link)
self.publish("jobno", self.lp_job.number)
self.publish("web_link", web_link)
self.set_option("jobno", str(self.lp_job.number))
if self.jobno_file:
logger.debug("Saving jobno to: %s", self.jobno_file)
fdes = open(self.jobno_file, 'w')
fdes.write(str(self.lp_job.number))
fdes.close()
self.__save_conf()
def is_test_finished(self):
return self.retcode
def end_test(self, retcode):
self.on_air = False
self.monitoring_queue.put(None)
self.data_queue.put(None)
self.__save_conf()
timeout = int(self.get_option('threads_timeout', '60'))
logger.info(
'Waiting for sender threads to join for {} seconds ("meta.threads_timeout" config option)'.format(timeout))
self.monitoring.join(timeout=timeout)
if self.monitoring.isAlive():
logger.error('Monitoring thread joining timed out. Terminating.')
self.upload.join(timeout=timeout)
if self.upload.isAlive():
logger.error('Upload thread joining timed out. Terminating.')
self.unlock_targets(self.locked_targets)
return retcode
def post_process(self, rc):
try:
self.lp_job.close(rc)
except Exception: # pylint: disable=W0703
logger.warning("Failed to close job", exc_info=True)
logger.info(
"Web link: %s%s",
self.lp_job.api_client.base_url,
self.lp_job.number)
autostop = None
try:
autostop = self.core.get_plugin_of_type(AutostopPlugin)
except KeyError:
logger.debug("No autostop plugin loaded", exc_info=True)
if autostop and autostop.cause_criterion:
rps = 0
if autostop.cause_criterion.cause_second:
rps = autostop.cause_criterion.cause_second[
1]["metrics"]["reqps"]
if not rps:
rps = autostop.cause_criterion.cause_second[0][
"overall"]["interval_real"]["len"]
self.lp_job.set_imbalance_and_dsc(
int(rps), autostop.cause_criterion.explain())
else:
logger.debug("No autostop cause detected")
self.__save_conf()
return rc
def on_aggregated_data(self, data, stats):
"""
@data: aggregated data
@stats: stats about gun
"""
if self.lp_job.is_alive:
self.data_queue.put((data, stats))
def monitoring_data(self, data_list):
if self.lp_job.is_alive:
if len(data_list) > 0:
if self.is_telegraf:
# telegraf
self.monitoring_queue.put(data_list)
else:
# monitoring
[self.monitoring_queue.put(data) for data in data_list]
@property
def is_telegraf(self):
if self._is_telegraf is None:
self._is_telegraf = 'Telegraf' in self.core.job.monitoring_plugin.__module__
return self._is_telegraf
def _core_with_tank_api(self):
"""
Return True if we are running under Tank API
"""
api_found = False
try:
import yandex_tank_api.worker # pylint: disable=F0401
except ImportError:
logger.debug("Attempt to import yandex_tank_api.worker failed")
else:
api_found = isinstance(self.core, yandex_tank_api.worker.TankCore)
logger.debug("We are%s running under API server", '' if api_found else ' likely not')
return api_found
def __send_status(self):
logger.info('Status sender thread started')
lp_job = self.lp_job
while lp_job.is_alive and self.on_air:
try:
self.lp_job.send_status(self.core.status)
time.sleep(self.send_status_period)
except (APIClient.NetworkError, APIClient.NotAvailable) as e:
logger.warn('Failed to send status')
logger.debug(e.message)
break
except APIClient.StoppedFromOnline:
logger.info("Test stopped from Lunapark")
lp_job.is_alive = False
self.retcode = 8
break
logger.info("Closing Status sender thread")
def __data_uploader(self):
logger.info('Data uploader thread started')
lp_job = self.lp_job
queue = self.data_queue
while lp_job.is_alive:
try:
entry = queue.get(timeout=1)
if entry is not None:
data, stats = entry
else:
logger.info("Data uploader queue returned None")
break
lp_job.push_test_data(data, stats)
except Empty:
continue
except APIClient.StoppedFromOnline:
logger.info("Test stopped from Lunapark")
lp_job.is_alive = False
self.retcode = 8
break
except Exception as e:
logger.info("Mysterious exception: %s", e)
self.retcode = 8
break
logger.info("Closing Data uploader thread")
def __monitoring_uploader(self):
logger.info('Monitoring uploader thread started')
lp_job = self.lp_job
queue = self.monitoring_queue
while lp_job.is_alive:
try:
data = queue.get(timeout=1)
if data is not None:
lp_job.push_monitoring_data(data)
else:
logger.info('Monitoring queue returned None')
break
except Empty:
continue
except (APIClient.NetworkError, APIClient.NotAvailable, APIClient.UnderMaintenance) as e:
logger.warn('Failed to push monitoring data')
logger.warn(e.message)
break
except APIClient.StoppedFromOnline:
logger.info("Test stopped from Lunapark")
lp_job.is_alive = False
self.retcode = 8
break
logger.info('Closing Monitoring uploader thread')
def __save_conf(self):
config_copy = self.get_option('copy_config_to', '')
if config_copy:
self.core.config.flush(config_copy)
config = copy.copy(self.core.config.config)
try:
config_filename = self.core.job.monitoring_plugin.config
if config_filename and config_filename not in ['none', 'auto']:
with open(config_filename) as config_file:
config.set(
self.core.job.monitoring_plugin.SECTION,
"config_contents",
config_file.read())
except Exception: # pylint: disable=W0703
logger.warning("Can't get monitoring config", exc_info=True)
output = StringIO()
config.write(output)
self.lp_job.send_config_snapshot(output.getvalue())
with open(os.path.join(self.core.artifacts_dir, 'saved_conf.ini'), 'w') as f:
config.write(f)
def parse_lock_targets(self):
# prepare target lock list
locks_list_cfg = self.get_option('lock_targets', 'auto').strip()
def no_target():
logger.warning("Target lock set to 'auto', but no target info available")
return ''
locks_list = (self.target or no_target() if locks_list_cfg.lower() == 'auto' else locks_list_cfg).split('\n')
targets_to_lock = [host for host in locks_list if host]
return targets_to_lock
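For illustration, a minimal sketch of the filtering `parse_lock_targets` applies to the `lock_targets` option: the value is either the literal `auto` (fall back to the detected target) or a newline-separated host list, and empty entries are dropped. The hostnames below are hypothetical.

.. code-block:: python

    # Hypothetical option value, as it might appear in an INI config:
    locks_list_cfg = "backend01.example.net\n\nbackend02.example.net"

    # Same filtering as parse_lock_targets(): split on newlines, drop empties.
    targets_to_lock = [host for host in locks_list_cfg.split('\n') if host]
    print(targets_to_lock)  # ['backend01.example.net', 'backend02.example.net']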
def lock_targets(self, targets_to_lock, ignore, strict):
locked_targets = [target for target in targets_to_lock
if self.lp_job.lock_target(target, self.lock_target_duration, ignore, strict)]
return locked_targets
def unlock_targets(self, locked_targets):
logger.info("Unlocking targets: %s", locked_targets)
for target in locked_targets:
logger.info(target)
self.lp_job.api_client.unlock_target(target)
def check_and_lock_targets(self, strict, ignore):
targets_list = self.parse_lock_targets()
logger.info('Locking targets: %s', targets_list)
locked_targets = self.lock_targets(targets_list, ignore=ignore, strict=strict)
logger.info('Locked targets: %s', locked_targets)
return locked_targets
def make_symlink(self, name):
PLUGIN_DIR = os.path.join(self.core.artifacts_base_dir, 'lunapark')
if not os.path.exists(PLUGIN_DIR):
os.makedirs(PLUGIN_DIR)
os.symlink(
os.path.relpath(
self.core.artifacts_dir,
PLUGIN_DIR),
os.path.join(
PLUGIN_DIR,
str(name)))
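``make_symlink`` publishes each test's artifacts directory under a stable ``lunapark/<job number>`` path by linking to it with a relative target. A minimal sketch of the same relpath-based symlink, with hypothetical directories and job number:

.. code-block:: python

    import os

    # Hypothetical directories, mirroring core.artifacts_base_dir / artifacts_dir.
    artifacts_base_dir = '/var/lib/tank/artifacts'
    artifacts_dir = os.path.join(artifacts_base_dir, '2017-04-10_12-00-00')
    plugin_dir = os.path.join(artifacts_base_dir, 'lunapark')

    if not os.path.exists(plugin_dir):
        os.makedirs(plugin_dir)

    # Relative link target ('../2017-04-10_12-00-00'), so the tree stays valid if moved.
    os.symlink(os.path.relpath(artifacts_dir, plugin_dir),
               os.path.join(plugin_dir, '12345'))  # 12345 = hypothetical job number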
def _get_user_agent(self):
plugin_agent = 'Uploader/{}'.format(self.VERSION)
return ' '.join((plugin_agent,
self.core.get_user_agent()))
def __get_operator(self):
try:
return self.get_option(
'operator',
'') or pwd.getpwuid(
os.geteuid())[0]
except:
logger.error(
"Couldn't get username from the OS. Please, set the 'meta.operator' option explicitly in your config file.")
raise
def __get_api_client(self):
if self.backend_type == BackendTypes.LUNAPARK:
client = APIClient
self._api_token = None
elif self.backend_type == BackendTypes.OVERLOAD:
client = OverloadClient
self._api_token = self.read_token(self.get_option("token_file", ""))
else:
raise RuntimeError("Backend type doesn't match any of the expected")
return client(base_url=self.get_option('api_address'),
writer_url=self.get_option('writer_endpoint', ""),
network_attempts=int(self.get_option('network_attempts', 60)),
api_attempts=int(self.get_option('api_attempts', 60)),
maintenance_attempts=int(self.get_option('maintenance_attempts', 10)),
network_timeout=int(self.get_option('network_timeout', 10)),
api_timeout=int(self.get_option('api_timeout', 10)),
maintenance_timeout=int(self.get_option('maintenance_timeout', 60)),
connection_timeout=int(self.get_option('connection_timeout', 30)),
user_agent=self._get_user_agent(),
api_token=self.api_token)
@property
def lp_job(self):
if self._lp_job is None:
self._lp_job = self.__get_lp_job()
return self._lp_job
def __get_lp_job(self):
api_client = self.__get_api_client()
info = self.generator_info
port = info.port
loadscheme = [] if isinstance(info.rps_schedule,
str) else info.rps_schedule
return LPJob(client=api_client,
target_host=self.target,
target_port=port,
number=self.get_option('jobno', ''),
token=self.get_option('upload_token', ''),
person=self.__get_operator(),
task=self.task,
name=self.get_option('job_name', 'none').decode('utf8'),
description=self.get_option('job_dsc', '').decode('utf8'),
tank=self.core.job.tank,
notify_list=self.get_option("notify", '').split(' '),
load_scheme=loadscheme,
log_data_requests=bool(int(self.get_option('log_data_requests', '0'))),
log_monitoring_requests=bool(int(self.get_option('log_monitoring_requests', '0'))),
log_status_requests=bool(int(self.get_option('log_status_requests', '0'))),
log_other_requests=bool(int(self.get_option('log_other_requests', '0'))), )
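Note how the boolean logging options are read above: INI values arrive as strings, so the code goes through ``int()`` before ``bool()``; ``bool('0')`` alone would be truthy. A quick illustration:

.. code-block:: python

    # INI option values are strings, so bool() alone is misleading:
    print(bool('0'))       # True  -- any non-empty string is truthy
    print(bool(int('0')))  # False -- what the plugin actually wants
    print(bool(int('1')))  # True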
@property
def task(self):
if self._task is None:
task = self.get_option('task', '')
if task == 'dir':
task = self.search_task_from_cwd(os.getcwd())
self._task = task
return self._task
@property
def api_token(self):
if self._api_token == '':
if self.backend_type == BackendTypes.LUNAPARK:
self._api_token = None
elif self.backend_type == BackendTypes.OVERLOAD:
self._api_token = self.read_token(self.get_option("token_file", ""))
else:
raise RuntimeError("Backend type doesn't match any of the expected")
return self._api_token
@staticmethod
def read_token(filename):
if filename:
logger.debug("Trying to read token from %s", filename)
try:
with open(filename, 'r') as handle:
data = handle.read().strip()
logger.info(
"Read authentication token from %s, "
"token length is %d bytes", filename, len(str(data)))
except IOError:
logger.error(
"Failed to read Overload API token from %s", filename)
logger.info(
"Get your Overload API token from https://overload.yandex.net and provide it via 'overload.token_file' parameter"
)
raise RuntimeError("API token error")
return data
else:
logger.error("Overload API token filename is not defined")
logger.info(
"Get your Overload API token from https://overload.yandex.net and provide it via 'overload.token_file' parameter"
)
raise RuntimeError("API token error")
@property
def generator_info(self):
if self._generator_info is None:
self._generator_info = self.core.job.generator_plugin.get_info()
return self._generator_info
@property
def target(self):
if self._target is None:
self._target = self.generator_info.address
logger.info("Detected target: %s", self.target)
return self._target
class JobInfoWidget(AbstractInfoWidget):
def __init__(self, sender):
# type: (Plugin) -> object
AbstractInfoWidget.__init__(self)
self.owner = sender
def get_index(self):
return 1
def render(self, screen):
template = "Author: " + screen.markup.RED + "%s" + \
screen.markup.RESET + \
"%s\n Job: %s %s\n Task: %s %s\n Web: %s%s"
data = (self.owner.lp_job.person[:1], self.owner.lp_job.person[1:],
self.owner.lp_job.number, self.owner.lp_job.name, self.owner.lp_job.task,
# todo: task_name from api_client.get_task_data()
self.owner.lp_job.task, self.owner.lp_job.api_client.base_url,
self.owner.lp_job.number)
return template % data
class LPJob(object):
def __init__(
self,
client,
target_host,
target_port,
person,
task,
name,
description,
tank,
log_data_requests=False,
log_other_requests=False,
log_status_requests=False,
log_monitoring_requests=False,
number=None,
token=None,
is_alive=True,
notify_list=None,
version=None,
detailed_time=None,
load_scheme=None):
"""
:param client: APIClient
:param log_data_requests: bool
:param log_other_requests: bool
:param log_status_requests: bool
:param log_monitoring_requests: bool
"""
assert bool(number) == bool(
token), 'Job number and upload token should come together'
self.log_other_requests = log_other_requests
self.log_data_requests = log_data_requests
self.log_status_requests = log_status_requests
self.log_monitoring_requests = log_monitoring_requests
self.name = name
self.tank = tank
self.target_host = target_host
self.target_port = target_port
self.person = person
self.task = task
self.is_alive = is_alive
self._number = number
self._token = token
self.api_client = client
self.notify_list = notify_list
self.description = description
self.version = version
self.detailed_time = detailed_time
self.load_scheme = load_scheme
self.is_finished = False
def push_test_data(self, data, stats):
if self.is_alive:
try:
self.api_client.push_test_data(
self.number, self.token, data, stats, trace=self.log_data_requests)
except (APIClient.NotAvailable, APIClient.NetworkError, APIClient.UnderMaintenance):
logger.warn('Failed to push test data')
self.is_alive = False
def edit_metainfo(
self,
instances=0,
ammo_path=None,
loop_count=None,
is_regression=None,
regression_component=None,
cmdline=None,
is_starred=False,
tank_type=1):
try:
self.api_client.edit_job_metainfo(jobno=self.number,
job_name=self.name,
job_dsc=self.description,
instances=instances,
ammo_path=ammo_path,
loop_count=loop_count,
version_tested=self.version,
is_regression=is_regression,
component=regression_component,
cmdline=cmdline,
is_starred=is_starred,
tank_type=tank_type,
trace=self.log_other_requests)
except (APIClient.NotAvailable, APIClient.StoppedFromOnline, APIClient.NetworkError,
APIClient.UnderMaintenance) as e:
logger.warn('Failed to edit job metainfo on Lunapark')
logger.warn(e.message)
@property
def number(self):
if not self._number:
self.create()
return self._number
@property
def token(self):
if not self._token:
self.create()
return self._token
def close(self, rc):
return self.api_client.close_job(
self.number, rc, trace=self.log_other_requests)
def create(self):
self._number, self._token = self.api_client.new_job(task=self.task,
person=self.person,
tank=self.tank,
loadscheme=self.load_scheme,
target_host=self.target_host,
target_port=self.target_port,
detailed_time=self.detailed_time,
notify_list=self.notify_list,
trace=self.log_other_requests)
logger.info('Job created: {}'.format(self._number))
def send_status(self, status):
if self._number and self.is_alive:
self.api_client.send_status(
self.number,
self.token,
status,
trace=self.log_status_requests)
def get_task_data(self, task):
return self.api_client.get_task_data(
task, trace=self.log_other_requests)
def send_config_snapshot(self, config):
try:
if self._number:
self.api_client.send_config_snapshot(
self.number, config, trace=self.log_other_requests)
except Exception:
logger.debug("Can't send config snapshot: %s", exc_info=True)
def push_monitoring_data(self, data):
if self.is_alive:
self.api_client.push_monitoring_data(
self.number, self.token, data, trace=self.log_monitoring_requests)
def lock_target(self, lock_target, lock_target_duration, ignore, strict):
lock_wait_timeout = 10
maintenance_timeouts = iter([0]) if ignore else iter(lambda: lock_wait_timeout, 0)
while True:
try:
self.api_client.lock_target(lock_target, lock_target_duration, trace=self.log_other_requests,
maintenance_timeouts=maintenance_timeouts,
maintenance_msg="Target is locked.\nManual unlock link: %s%s" % (
self.api_client.base_url,
self.api_client.get_manual_unlock_link(lock_target)
))
return True
except (APIClient.NotAvailable, APIClient.StoppedFromOnline) as e:
logger.info('Target is not locked due to %s', e.message)
if ignore:
logger.info('ignore_target_locks = 1')
return False
elif strict:
raise e
else:
logger.info('strict_lock = 0')
return False
except APIClient.UnderMaintenance:
logger.info('Target is locked')
if ignore:
logger.info('ignore_target_locks = 1')
return False
logger.info("Manual unlock link: %s%s", self.api_client.base_url,
self.api_client.get_manual_unlock_link(lock_target))
continue
def set_imbalance_and_dsc(self, rps, comment):
return self.api_client.set_imbalance_and_dsc(self.number, rps, comment)
def is_target_locked(self, host, strict):
while True:
try:
return self.api_client.is_target_locked(
host, trace=self.log_other_requests)
except APIClient.UnderMaintenance:
logger.info('Target is locked, retrying...')
continue
except (APIClient.StoppedFromOnline, APIClient.NotAvailable, APIClient.NetworkError):
logger.warn('Can\'t check whether target is locked\n')
if strict:
logger.warn('Stopping test due to strict_lock')
raise
else:
logger.warn('strict_lock is False, proceeding')
return {'status': 'ok'}
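The retry behaviour of ``lock_target`` above hinges on the two-argument form of ``iter()``: with ``ignore`` set, the maintenance timeouts are a single zero (one immediate give-up), otherwise ``iter(lambda: lock_wait_timeout, 0)`` yields the 10-second timeout forever, so the client keeps waiting while the target stays locked. A small sketch of the two iterators, independent of the API client:

.. code-block:: python

    from itertools import islice

    lock_wait_timeout = 10
    ignore = False

    # Same expression as in LPJob.lock_target():
    maintenance_timeouts = iter([0]) if ignore else iter(lambda: lock_wait_timeout, 0)

    # iter(callable, sentinel) calls the callable until it returns the sentinel;
    # lambda: 10 never returns 0, so this is an endless stream of 10s timeouts.
    print(list(islice(maintenance_timeouts, 5)))  # [10, 10, 10, 10, 10]

    # With ignore=True there is exactly one timeout of 0 seconds:
    print(list(iter([0])))  # [0]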

View File

@ -0,0 +1,32 @@
import pytest
from yandextank.plugins.DataUploader.plugin import BackendTypes
class TestBackendTypes(object):
@pytest.mark.parametrize('section_name, expected_type', [
('meta', BackendTypes.LUNAPARK),
('meta-01', BackendTypes.LUNAPARK),
('lp', BackendTypes.LUNAPARK),
('lp-01', BackendTypes.LUNAPARK),
('lunapark', BackendTypes.LUNAPARK),
('lunapark-1', BackendTypes.LUNAPARK),
('overload', BackendTypes.OVERLOAD),
('overload-01', BackendTypes.OVERLOAD)
])
def test_identify(self, section_name, expected_type):
assert BackendTypes.identify_backend(section_name) == expected_type
@pytest.mark.parametrize('section_name', [
'meta lunapark',
'meta ',
' lunapark',
'lp ',
'meta-'
])
def test_exception(self, section_name):
with pytest.raises(KeyError) as excinfo:
BackendTypes.identify_backend(section_name)
assert 'section name' in str(excinfo.value)
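The test above pins down the expected behaviour of ``BackendTypes.identify_backend``: a section name is a known prefix (``meta``, ``lp``, ``lunapark``, ``overload``), optionally followed by a dash and a numeric suffix, and anything else raises a ``KeyError`` mentioning the section name. A hypothetical implementation consistent with these cases (the real one lives in ``yandextank/plugins/DataUploader/plugin.py`` and may differ):

.. code-block:: python

    import re


    class BackendTypes(object):
        LUNAPARK = 'LUNAPARK'
        OVERLOAD = 'OVERLOAD'

        @classmethod
        def identify_backend(cls, section_name):
            # Hypothetical mapping, reconstructed from the parametrized test above.
            patterns = [
                (r'^(meta|lp|lunapark)(-\d+)?$', cls.LUNAPARK),
                (r'^overload(-\d+)?$', cls.OVERLOAD),
            ]
            for pattern, backend_type in patterns:
                if re.match(pattern, section_name):
                    return backend_type
            raise KeyError(
                'section name %r does not match any known backend' % section_name)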

View File

@ -22,8 +22,8 @@ class Plugin(AbstractPlugin, GeneratorPlugin):
""" JMeter tank plugin """
SECTION = 'jmeter'
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.jmeter_process = None
self.args = None
self.original_jmx = None

View File

@ -17,8 +17,8 @@ class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener):
# pylint:disable=R0902
SECTION = 'json_report'
def __init__(self, core):
super(Plugin, self).__init__(core)
def __init__(self, core, config_section):
super(Plugin, self).__init__(core, config_section)
self._is_telegraf = None
def get_available_options(self):
@ -53,7 +53,7 @@ class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener):
if data
]
def end_test(self, retcode):
def post_process(self, retcode):
self.data_and_stats_stream.close()
self.monitoring_stream.close()
return retcode

View File

@ -22,10 +22,10 @@ class Plugin(AbstractPlugin, GeneratorPlugin):
SECTION = "maven"
def __init__(self, core):
def __init__(self, core, config_section):
# FIXME python version 2.7 does not support this syntax. super() should
# have arguments in Python 2
super().__init__(core)
super(Plugin, self).__init__(core, config_section)
self.maven_cmd = "mvn"
self.process = None
self.process_stderr = None

View File

@ -1,6 +1,7 @@
"""Target monitoring via SSH"""
import base64
import getpass
import hashlib
import logging
import os.path
import re
@ -206,7 +207,7 @@ class AgentClient(object):
class MonitoringCollector(object):
"""Aggregate data from several collectors"""
def __init__(self):
def __init__(self, disguise_hostnames):
self.config = None
self.default_target = None
self.agents = []
@ -220,6 +221,7 @@ class MonitoringCollector(object):
self.filter_mask = defaultdict(str)
self.ssh_timeout = 5
self.load_start_time = None
self.disguise_hostnames = disguise_hostnames
def add_listener(self, obj):
self.listeners.append(obj)
@ -265,10 +267,12 @@ class MonitoringCollector(object):
lines = block.split("\n")
for data in lines:
logger.debug("Got data from agent: %s", data.strip())
self.send_data.append(
self.filter_unused_data(
self.filter_conf, self.filter_mask, data))
self.hash_hostnames(
self.filter_unused_data(
self.filter_conf, self.filter_mask, data)))
logger.debug("Data after filtering: %s", self.send_data)
if not self.first_data_received and self.send_data:
@ -504,6 +508,21 @@ class MonitoringCollector(object):
else:
return ''
def hash_hostnames(self, data):
"""
'bus-receiver02g.load.maps.yandex.net;1491233043;659;83;480;21052.0820312;19541.8710938;476.0859375;87840.6210938;13228.0;8241.0;2.15557638238;1.15588878475;96.4698531709;0.0624804748516;39313;61537;0;8192;0.34;1.06;1.19;2;0;0;0;0;0'
'start;bus-receiver02g.load.maps.yandex.net;1491233263;Net_closewait;Net_estab;Net_timewait;'
"""
if not self.disguise_hostnames or not data:
return data
else:
data_entries = data.split(';')
if data_entries[0] == 'start':
data_entries[1] = hashlib.md5(data_entries[1]).hexdigest()
else:
data_entries[0] = hashlib.md5(data_entries[0]).hexdigest()
return ';'.join(data_entries)
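A short sketch of what ``hash_hostnames`` does to an agent line when ``disguise_hostnames`` is enabled: the hostname field (the first field, or the second for ``start;`` header lines) is replaced with its md5 hex digest, everything else is left untouched. Written against Python 2, like the surrounding code, where ``hashlib.md5()`` accepts a plain ``str``:

.. code-block:: python

    import hashlib

    # Hypothetical agent line: hostname, timestamp, then metric values.
    line = 'web01.example.net;1491233043;659;83;480'

    fields = line.split(';')
    fields[0] = hashlib.md5(fields[0]).hexdigest()  # Python 2: str is bytes
    print(';'.join(fields))
    # The hostname is now a 32-character hex digest; the rest is unchanged.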
class StdOutPrintMon(MonitoringDataListener):
"""Simple listener, writing data to stdout"""

View File

@ -24,13 +24,13 @@ class Plugin(AbstractPlugin):
SECTION = 'monitoring'
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.jobno = None
self.default_target = None
self.config = None
self.process = None
self.monitoring = MonitoringCollector()
self.monitoring = MonitoringCollector(disguise_hostnames=bool(int(self.get_option('disguise_hostnames', '0'))))
self.die_on_fail = True
self.data_file = None
self.mon_saver = None
@ -47,7 +47,12 @@ class Plugin(AbstractPlugin):
"load_start_time = %s" % self.monitoring.load_start_time)
def get_available_options(self):
return ["config", "default_target", 'ssh_timeout']
return [
"config",
"default_target",
'ssh_timeout',
'disguise_hostnames'
]
def configure(self):
self.config = self.get_option("config", 'auto').strip()

View File

@ -1,319 +0,0 @@
import datetime
import json
import time
import urllib
import requests
import logging
requests.packages.urllib3.disable_warnings()
logger = logging.getLogger(__name__) # pylint: disable=C0103
class OverloadClient(object):
def __init__(self):
self.address = None
self.token = None
self.upload_token = ''
self.api_token = ""
self.api_timeout = None
self.session = requests.Session()
self.session.verify = False
self.session.headers.update({"User-Agent": "tank"})
if "https" in requests.utils.getproxies():
logger.info("Connecting via proxy %s" % requests.utils.getproxies()['https'])
self.session.proxies = requests.utils.getproxies()
else:
logger.info("Proxy not set")
def set_api_address(self, addr):
self.address = addr
def set_api_timeout(self, timeout):
self.api_timeout = float(timeout)
def set_api_token(self, api_token):
self.api_token = api_token
def get_raw(self, addr):
if not self.address:
raise ValueError("Can't request unknown address")
addr = self.address + addr
logger.debug("Making request to: %s", addr)
req = requests.Request('GET', addr)
prepared = self.session.prepare_request(req)
resp = self.session.send(prepared, timeout=self.api_timeout)
resp.raise_for_status()
resp_data = resp.content.strip()
logger.debug("Raw response: %s", resp_data)
return resp_data
def get(self, addr):
resp = self.get_raw(addr)
response = json.loads(resp)
logger.debug("Response: %s", response)
return response
def post_raw(self, addr, txt_data):
if not self.address:
raise ValueError("Can't request unknown address")
addr = self.address + addr
logger.debug("Making POST request to: %s", addr)
req = requests.Request("POST", addr, data=txt_data)
prepared = self.session.prepare_request(req)
resp = self.session.send(prepared, timeout=self.api_timeout)
resp.raise_for_status()
logger.debug("Response: %s", resp.content)
return resp.content
def post(self, addr, data):
addr = self.address + addr
json_data = json.dumps(data, indent=2)
logger.debug("Making POST request to: %s\n%s", addr, json_data)
req = requests.Request("POST", addr, data=json_data)
prepared = self.session.prepare_request(req)
resp = self.session.send(prepared, timeout=self.api_timeout)
resp.raise_for_status()
logger.debug("Response: %s", resp.content)
return resp.json()
def get_task_data(self, task):
return self.get("api/task/" + task + "/summary.json")
def new_job(
self, task, person, tank, target_host, target_port, loadscheme,
detailed_time, notify_list):
data = {
'task': task,
'person': person,
'tank': tank,
'host': target_host,
'port': target_port,
'loadscheme': loadscheme,
'detailed_time': detailed_time,
'notify': notify_list,
}
logger.debug("Job create request: %s", data)
while True:
try:
response = self.post(
"api/job/create.json?api_token=" + self.api_token, data)
self.upload_token = response[0].get('upload_token', '')
return response[0]['job']
except requests.exceptions.HTTPError as ex:
logger.debug("Got error for job create request: %s", ex)
if ex.response.status_code == 423:
logger.warn(
"Overload is under maintenance, will retry in 5s...")
time.sleep(5)
else:
raise ex
raise RuntimeError("Unreachable point hit")
def get_job_summary(self, jobno):
result = self.get(
'api/job/' + str(jobno) + "/summary.json?api_token=" +
self.api_token)
return result[0]
def close_job(self, jobno, retcode):
params = {
'exitcode': str(retcode),
'api_token': self.api_token,
}
result = self.get(
'api/job/' + str(jobno) + '/close.json?' + urllib.urlencode(params))
return result[0]['success']
def edit_job_metainfo(
self, jobno, job_name, job_dsc, instances, ammo_path, loop_count,
version_tested, is_regression, component, tank_type, cmdline,
is_starred):
data = {
'name': job_name,
'description': job_dsc,
'instances': str(instances),
'ammo': ammo_path,
'loop': loop_count,
'version': version_tested,
'regression': str(is_regression),
'component': component,
'tank_type': int(tank_type),
'command_line': cmdline,
'starred': int(is_starred),
}
response = self.post(
'api/job/' + str(jobno) + "/edit.json?api_token=" + self.api_token,
data)
return response
def set_imbalance_and_dsc(self, jobno, rps, comment):
data = {}
if rps:
data['imbalance'] = rps
if comment:
data['description'] = comment.strip()
response = self.post(
'api/job/' + str(jobno) + "/set_imbalance.json?api_token=" +
self.api_token, data)
return response
def second_data_to_push_item(self, data, stat, timestamp, overall, case):
"""
@data: SecondAggregateDataItem
"""
api_data = {
'overall': overall,
'case': case,
'net_codes': [],
'http_codes': [],
'time_intervals': [],
'trail': {
'time': str(timestamp),
'reqps': stat["metrics"]["reqps"],
'resps': data["interval_real"]["len"],
'expect': data["interval_real"]["total"] / 1000.0 /
data["interval_real"]["len"],
'disper': 0,
'self_load':
0, # TODO abs(round(100 - float(data.selfload), 2)),
'input': data["size_in"]["total"],
'output': data["size_out"]["total"],
'connect_time': data["connect_time"]["total"] / 1000.0 /
data["connect_time"]["len"],
'send_time':
data["send_time"]["total"] / 1000.0 / data["send_time"]["len"],
'latency':
data["latency"]["total"] / 1000.0 / data["latency"]["len"],
'receive_time': data["receive_time"]["total"] / 1000.0 /
data["receive_time"]["len"],
'threads': stat["metrics"]["instances"], # TODO
}
}
for q, value in zip(
data["interval_real"]["q"]["q"],
data["interval_real"]["q"]["value"]):
api_data['trail']['q' + str(q)] = value / 1000.0
for code, cnt in data["net_code"]["count"].iteritems():
api_data['net_codes'].append({'code': int(code), 'count': int(cnt)})
for code, cnt in data["proto_code"]["count"].iteritems():
api_data['http_codes'].append({
'code': int(code),
'count': int(cnt)
})
api_data['time_intervals'] = self.convert_hist(
data["interval_real"]["hist"])
return api_data
def convert_hist(self, hist):
data = hist['data']
bins = hist['bins']
return [
{
"from": 0, # deprecated
"to": b / 1000.0,
"count": count,
} for b, count in zip(bins, data)
]
def push_test_data(self, jobno, data_item, stat_item):
items = []
uri = 'api/job/{0}/push_data.json?upload_token={1}'.format(
jobno, self.upload_token)
ts = datetime.datetime.fromtimestamp(data_item["ts"])
for case_name, case_data in data_item["tagged"].iteritems():
if case_name == "":
case_name = "__EMPTY__"
if (len(case_name)) > 128:
raise RuntimeError('tag (case) name is too long: ' + case_name)
push_item = self.second_data_to_push_item(
case_data, stat_item, ts, 0, case_name)
items.append(push_item)
overall = self.second_data_to_push_item(
data_item["overall"], stat_item, ts, 1, '')
items.append(overall)
while True:
try:
res = self.post(uri, items)
break
except requests.exceptions.HTTPError as ex:
if ex.response.status_code == 400:
logger.error('Bad request to %s: %s', uri, ex)
return 0
elif ex.response.status_code == 410:
logger.info("Test has been stopped by Overload server")
return 0
else:
logger.warn(
"Unknown HTTP error while sending second data. "
"Retry in 10 sec: %s", ex)
time.sleep(10) # FIXME this makes all plugins freeze
except requests.exceptions.RequestException as ex:
logger.warn(
"Failed to push second data to API,"
" retry in 10 sec: %s", ex)
time.sleep(10) # FIXME this makes all plugins freeze
except Exception: # pylint: disable=W0703
# something nasty happened, but we don't want to fail here
logger.exception(
"Unknown exception while pushing second data to API")
return 0
try:
success = int(res[0]['success'])
except Exception: # pylint: disable=W0703
logger.warning("Malformed response from API: %s", res)
success = 0
return success
def push_monitoring_data(self, jobno, send_data):
if send_data:
addr = "api/monitoring/receiver/push?job_id=%s&upload_token=%s" % (
jobno, self.upload_token)
while True:
try:
self.post_raw(addr, send_data)
return
except requests.exceptions.HTTPError as ex:
if ex.response.status_code == 400:
logger.error('Bad request to %s: %s', addr, ex)
break
elif ex.response.status_code == 410:
logger.info("Test has been stopped by Overload server")
return
else:
logger.warning(
'Unknown http code while sending monitoring data,'
' retry in 10s: %s', ex)
time.sleep(10) # FIXME this makes all plugins freeze
except requests.exceptions.RequestException as ex:
logger.warning(
'Problems sending monitoring data,'
' retry in 10s: %s', ex)
time.sleep(10) # FIXME this makes all plugins freeze
except Exception: # pylint: disable=W0703
# something irrecoverable happened
logger.exception(
"Unknown exception while pushing monitoring data to API")
return
def send_console(self, jobno, console):
logger.debug(
"Sending console view [%s]: %s", len(console), console[:64])
addr = ("api/job/%s/console.txt?api_token=" % jobno) + self.api_token,
self.post_raw(addr, {"console": console, })
def send_config_snapshot(self, jobno, config):
logger.debug("Sending config snapshot")
addr = ("api/job/%s/configinfo.txt?api_token=" % jobno) + self.api_token
self.post_raw(addr, {"configinfo": config, })

View File

@ -1,374 +0,0 @@
# TODO: make the next two lines unnecessary
# pylint: disable=line-too-long
# pylint: disable=missing-docstring
import StringIO
import copy
import json
import logging
import os
import pwd
import socket
import sys
from .client import OverloadClient
from ..Autostop import Plugin as AutostopPlugin
from ..Console import Plugin as ConsolePlugin
from ..JMeter import Plugin as JMeterPlugin
from ..Monitoring import Plugin as MonitoringPlugin
# from ..Pandora import Plugin as PandoraPlugin
from ..Phantom import Plugin as PhantomPlugin
from ...common.interfaces import AbstractPlugin,\
MonitoringDataListener, AggregateResultListener, AbstractInfoWidget
logger = logging.getLogger(__name__) # pylint: disable=C0103
class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener):
"""
Yandex Overload analytics service client (https://overload.yandex.net)
"""
SECTION = 'overload'
RC_STOP_FROM_WEB = 8
def __init__(self, core):
super(Plugin, self).__init__(core)
self.locks_list_dict = {}
self.api_client = OverloadClient()
self.jobno = None
self.operator = ''
self.retcode = -1
self.copy_config = None
self.jobno_file = None
self.target = None
self.lock_target_duration = None
self.locks_list_cfg = None
self.task = None
self.job_name = None
self.job_dsc = None
self.notify_list = None
self.version_tested = None
self.regression_component = None
self.is_regression = None
self.ignore_target_lock = None
self.port = None
self.mon = None
@staticmethod
def get_key():
return __file__
def get_available_options(self):
opts = [
"api_address",
"task",
"job_name",
"job_dsc",
"notify",
"ver",
]
opts += [
"component",
"regress",
"operator",
"copy_config_to",
"jobno_file",
]
opts += ["token_file"]
return opts
@staticmethod
def read_token(filename):
if filename:
logger.debug("Trying to read token from %s", filename)
try:
with open(filename, 'r') as handle:
data = handle.read().strip()
logger.info(
"Read authentication token from %s, "
"token length is %d bytes", filename, len(str(data)))
except IOError:
logger.error(
"Failed to read Overload API token from %s", filename)
logger.info(
"Get your Overload API token from https://overload.yandex.net and provide it via 'overload.token_file' parameter"
)
raise RuntimeError("API token error")
return data
else:
logger.error("Overload API token filename is not defined")
logger.info(
"Get your Overload API token from https://overload.yandex.net and provide it via 'overload.token_file' parameter"
)
raise RuntimeError("API token error")
def configure(self):
aggregator = self.core.job.aggregator_plugin
aggregator.add_result_listener(self)
self.api_client.set_api_address(self.get_option("api_address"))
self.api_client.set_api_timeout(self.get_option("api_timeout", 30))
self.api_client.set_api_token(
self.read_token(self.get_option("token_file", "")))
self.task = self.get_option("task", "DEFAULT")
self.job_name = unicode(
self.get_option("job_name", "none").decode("utf8"))
if self.job_name == "ask" and sys.stdin.isatty():
self.job_name = unicode(
raw_input("Please, enter job_name: ").decode("utf8"))
self.job_dsc = unicode(self.get_option("job_dsc", "").decode("utf8"))
if self.job_dsc == "ask" and sys.stdin.isatty():
self.job_dsc = unicode(
raw_input("Please, enter job_dsc: ").decode("utf8"))
self.notify_list = self.get_option("notify", "").split(" ")
self.version_tested = unicode(self.get_option("ver", ""))
self.regression_component = unicode(self.get_option("component", ""))
self.is_regression = self.get_option("regress", "0")
self.operator = self.get_option("operator", self.operator)
if not self.operator:
try:
# Clouds and some virtual envs may fail this
self.operator = pwd.getpwuid(os.geteuid())[0]
except:
logger.warning('failed to getpwuid.', exc_info=True)
self.operator = 'unknown'
self.copy_config = self.get_option("copy_config_to", "")
self.jobno_file = self.get_option("jobno_file", "")
if self.core.job.monitoring_plugin:
self.mon = self.core.job.monitoring_plugin
if self.mon.monitoring:
self.mon.monitoring.add_listener(self)
self.__save_conf()
def prepare_test(self):
try:
console = self.core.get_plugin_of_type(ConsolePlugin)
except KeyError:
logger.debug("Console plugin not found", exc_info=True)
console = None
if console:
console.add_info_widget(JobInfoWidget(self))
console.remote_translator = self
try:
phantom = self.core.get_plugin_of_type(PhantomPlugin)
info = phantom.get_info()
self.target = info.address
except (KeyError, AttributeError) as ex:
logger.debug("No phantom plugin to get target info: %s", ex)
self.target = socket.getfqdn()
self.__save_conf()
def start_test(self):
try:
phantom = self.core.get_plugin_of_type(PhantomPlugin)
info = phantom.get_info()
self.target = info.address
port = info.port
instances = info.instances
tank_type = 1 if info.tank_type == "http" else 2
# FIXME why don't we use resource_opener here?
if info.ammo_file.startswith(
"http://") or info.ammo_file.startswith("https://"):
ammo_path = info.ammo_file
else:
ammo_path = os.path.realpath(info.ammo_file)
loadscheme = [] if isinstance(
info.rps_schedule, unicode) else info.rps_schedule
loop_count = info.loop_count
except (KeyError, AttributeError) as ex:
logger.debug("No phantom plugin to get target info: %s", ex)
self.target = socket.getfqdn()
port = 80
instances = 1
tank_type = 1
ammo_path = ''
loadscheme = []
loop_count = 0
try:
jmeter = self.core.get_plugin_of_type(JMeterPlugin)
ammo_path = jmeter.original_jmx
except KeyError as ex:
logger.debug("No jmeter plugin to get info: %s", ex)
# try:
# pandora = self.core.get_plugin_of_type(PandoraPlugin)
# # TODO: get info from Pandora here
# except KeyError as ex:
# logger.debug("No pandora plugin to get info: %s", ex)
detailed_field = "interval_real"
logger.info("Detected target: %s", self.target)
self.jobno = self.api_client.new_job(
self.task, self.operator,
socket.getfqdn(), self.target, port, loadscheme, detailed_field,
self.notify_list)
web_link = "%s%s" % (self.api_client.address, self.jobno)
logger.info("Web link: %s", web_link)
self.publish("jobno", self.jobno)
self.publish("web_link", web_link)
self.make_symlink(self.jobno)
self.set_option("jobno", str(self.jobno))
if self.jobno_file:
logger.debug("Saving jobno to: %s", self.jobno_file)
fdes = open(self.jobno_file, 'w')
fdes.write(str(self.jobno))
fdes.close()
self.api_client.edit_job_metainfo(
self.jobno, self.job_name, self.job_dsc, instances, ammo_path,
loop_count, self.version_tested, self.is_regression,
self.regression_component, tank_type, " ".join(sys.argv), 0)
self.__save_conf()
def is_test_finished(self):
return self.retcode
def end_test(self, retcode):
self.__save_conf()
return retcode
def post_process(self, rc):
if self.jobno:
try:
self.api_client.close_job(self.jobno, rc)
except Exception: # pylint: disable=W0703
logger.warning("Failed to close job", exc_info=True)
logger.info("Web link: %s%s", self.api_client.address, self.jobno)
autostop = None
try:
autostop = self.core.get_plugin_of_type(AutostopPlugin)
except KeyError:
logger.debug("No autostop plugin loaded", exc_info=True)
if autostop and autostop.cause_criterion:
rps = 0
if autostop.cause_criterion.cause_second:
rps = autostop.cause_criterion.cause_second[1]["metrics"][
"reqps"]
if not rps:
rps = autostop.cause_criterion.cause_second[0][
"overall"]["interval_real"]["len"]
self.api_client.set_imbalance_and_dsc(
self.jobno, rps, autostop.cause_criterion.explain())
else:
logger.debug("No autostop cause detected")
self.__save_conf()
return rc
def __send_data(self, data_item, stat_item):
if self.retcode < 0 and not self.api_client.push_test_data(
self.jobno, data_item, stat_item):
logger.warn("The test was stopped from Web interface")
self.retcode = self.RC_STOP_FROM_WEB
def on_aggregated_data(self, data, stats):
"""
@data: aggregated data
@stats: stats about gun
"""
if not self.jobno:
logger.warning("No jobNo gained yet")
return
self.__send_data(data, stats)
def monitoring_data(self, data_list):
if not self.jobno:
logger.debug("No jobNo gained yet")
return
if self.retcode < 0:
if "Telegraf" in self.core.job.monitoring_plugin.__module__:
self.api_client.push_monitoring_data(
self.jobno, json.dumps(data_list))
elif "Monitoring" in self.core.job.monitoring_plugin.__module__:
[
self.api_client.push_monitoring_data(self.jobno, data)
for data in data_list if data
]
else:
logger.warn("The test was stopped from Web interface")
def __save_conf(self):
if self.copy_config:
self.core.config.flush(self.copy_config)
config = copy.copy(self.core.config.config)
try:
mon = self.core.get_plugin_of_type(MonitoringPlugin)
config_filename = mon.config
if config_filename and config_filename not in ['none', 'auto']:
with open(config_filename) as config_file:
config.set(
MonitoringPlugin.SECTION, "config_contents",
config_file.read())
except Exception: # pylint: disable=W0703
logger.debug("Can't get monitoring config", exc_info=True)
output = StringIO.StringIO()
config.write(output)
if self.jobno:
try:
self.api_client.send_config_snapshot(
self.jobno, output.getvalue())
except Exception: # pylint: disable=W0703
logger.debug("Can't send config snapshot: %s", exc_info=True)
def send_console(self, text):
try:
self.api_client.send_console(self.jobno, text)
except Exception: # pylint: disable=W0703
logger.debug("Can't send console snapshot: %s", exc_info=True)
def make_symlink(self, name):
PLUGIN_DIR = os.path.join(self.core.artifacts_base_dir, self.SECTION)
if not os.path.exists(PLUGIN_DIR):
os.makedirs(PLUGIN_DIR)
os.symlink(self.core.artifacts_dir, os.path.join(PLUGIN_DIR, str(name)))
def _core_with_tank_api(self):
"""
Return True if we are running under Tank API
"""
api_found = False
try:
import yandex_tank_api.worker # pylint: disable=F0401
except ImportError:
logger.debug("Attempt to import yandex_tank_api.worker failed")
else:
api_found = isinstance(self.core, yandex_tank_api.worker.TankCore)
logger.debug(
"We are%s running under API server", ""
if api_found else " likely not")
return api_found
class JobInfoWidget(AbstractInfoWidget):
def __init__(self, sender):
AbstractInfoWidget.__init__(self)
self.owner = sender
def get_index(self):
return 1
def render(self, screen):
template = "Author: " + screen.markup.RED + "%s" + \
screen.markup.RESET + \
"%s\n Job: %s %s\n Web: %s%s"
data = (
self.owner.operator[:1], self.owner.operator[1:], self.owner.jobno,
self.owner.job_name, self.owner.api_client.address,
self.owner.jobno)
return template % data

View File

@ -3,9 +3,9 @@ import logging
import subprocess
import time
from ...common.interfaces import AbstractPlugin, AbstractInfoWidget, GeneratorPlugin
from ...common.interfaces import AbstractPlugin, \
AbstractInfoWidget, GeneratorPlugin
from .config import PoolConfig, PandoraConfig, parse_schedule
from .reader import PandoraStatsReader
from ..Aggregator import Plugin as AggregatorPlugin
from ..Console import Plugin as ConsolePlugin
@ -16,19 +16,21 @@ logger = logging.getLogger(__name__)
class Plugin(AbstractPlugin, GeneratorPlugin):
''' Pandora load generator plugin '''
''' Pandora load generator plugin '''
OPTION_CONFIG = "config"
SECTION = "pandora"
def __init__(self, core):
super(Plugin, self).__init__(core)
def __init__(self, core, config_section):
super(Plugin, self).__init__(core, config_section)
self.buffered_seconds = 2
self.enum_ammo = False
self.process = None
self.process_stderr = None
self.process_start_time = None
self.custom_config = False
self.sample_log = "./phout.log"
self.expvar = True
@staticmethod
def get_key():
@ -36,77 +38,43 @@ class Plugin(AbstractPlugin, GeneratorPlugin):
def get_available_options(self):
opts = [
"pandora_cmd", "buffered_seconds", "ammo", "loop", "sample_log",
"config_file", "startup_schedule", "user_schedule", "gun_type",
"custom_config"
"pandora_cmd", "buffered_seconds",
"config_content", "config_file",
"expvar"
]
return opts
def configure(self):
self.custom_config = self.get_option("custom_config", "0") == "1"
self.expvar = self.get_option("expvar", "1") == "1"
self.pandora_cmd = self.get_option("pandora_cmd", "pandora")
self.buffered_seconds = int(
self.get_option("buffered_seconds", self.buffered_seconds))
pool_config = PoolConfig()
ammo = self.get_option("ammo", "")
if ammo:
pool_config.set_ammo(ammo)
loop_limit = int(self.get_option("loop", "0"))
pool_config.set_loop(loop_limit)
self.sample_log = self.get_option("sample_log", "")
if not self.sample_log:
self.sample_log = self.core.mkstemp(".samples", "results_")
with open(self.sample_log, 'w'):
pass
self.core.add_artifact_file(self.sample_log)
pool_config.set_sample_log(self.sample_log)
startup_schedule = self.get_option("startup_schedule", "")
if startup_schedule:
pool_config.set_startup_schedule(parse_schedule(startup_schedule))
else:
raise RuntimeError("startup_schedule not specified")
user_schedule = self.get_option("user_schedule", "")
if user_schedule:
pool_config.set_user_schedule(parse_schedule(user_schedule))
else:
raise RuntimeError("user_schedule not specified")
shared_schedule = bool(int(self.get_option("shared_schedule", "1")))
pool_config.set_shared_schedule(shared_schedule)
target = self.get_option("target", "localhost:3000")
pool_config.set_target(target)
gun_type = self.get_option("gun_type", "http")
if gun_type == 'https':
pool_config.set_ssl(True)
logger.info("SSL is on")
gun_type = "http"
logger.info("Pandora gun type is: %s", gun_type)
pool_config.set_gun_type(gun_type)
ammo_type = self.get_option("ammo_type", "jsonline/http")
logger.info("Pandora ammo type is: %s", ammo_type)
pool_config.set_ammo_type(ammo_type)
self.pandora_config = PandoraConfig()
self.pandora_config.add_pool(pool_config)
self.pandora_config_file = self.get_option("config_file", "")
if not self.pandora_config_file:
if self.custom_config:
raise RuntimeError(
"You said you would like to use custom config,"
" but you didn't specify it")
config_content = self.get_option("config_content", "")
if config_content:
self.pandora_config_file = self.core.mkstemp(
".json", "pandora_config_")
self.core.add_artifact_file(self.pandora_config_file)
if not self.custom_config:
self.core.add_artifact_file(self.pandora_config_file)
with open(self.pandora_config_file, 'w') as config_file:
config_file.write(self.pandora_config.json())
config_file.write(config_content)
else:
config_file = self.get_option("config_file", "")
if not config_file:
raise RuntimeError(
"neither pandora config content"
"nor pandora config file is specified")
else:
extension = config_file.rsplit(".", 1)[1]
self.pandora_config_file = self.core.mkstemp(
"." + extension, "pandora_config_")
self.core.add_artifact_file(self.pandora_config_file)
with open(config_file, 'rb') as config:
config_content = config.read()
with open(self.pandora_config_file, 'wb') as config_file:
config_file.write(config_content)
def prepare_test(self):
aggregator = None
@ -120,7 +88,7 @@ class Plugin(AbstractPlugin, GeneratorPlugin):
"Linking sample and stats readers to aggregator. Reading samples from %s",
self.sample_log)
aggregator.reader = PhantomReader(self.sample_log)
aggregator.stats_reader = PandoraStatsReader()
aggregator.stats_reader = PandoraStatsReader(self.expvar)
try:
console = self.core.get_plugin_of_type(ConsolePlugin)

View File

@ -7,8 +7,21 @@ logger = logging.getLogger(__name__)
class PandoraStatsReader(object):
# TODO: maybe make stats collection asynchronous
def __init__(self, expvar):
self.closed = False
self.expvar = expvar
def next(self):
if self.closed:
raise StopIteration()
if not self.expvar:
return [{
'ts': int(time.time() - 1),
'metrics': {
'instances': 0,
'reqps': 0
}
}]
try:
pandora_response = requests.get("http://localhost:1234/debug/vars")
pandora_stat = pandora_response.json()
@ -40,7 +53,7 @@ class PandoraStatsReader(object):
}]
def close(self):
pass
self.closed = True
def __iter__(self):
return self

View File

@ -26,8 +26,8 @@ class Plugin(AbstractPlugin, GeneratorPlugin):
OPTION_CONFIG = "config"
SECTION = PhantomConfig.SECTION
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.config = None
self.process = None

View File

@ -19,8 +19,8 @@ class Plugin(AbstractPlugin):
def get_key():
return __file__
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.hosts = []
self.port = None
self.logfile = None

View File

@ -5,8 +5,8 @@ from ...common.interfaces import AbstractPlugin
class Plugin(AbstractPlugin):
SECTION = 'rcassert'
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.ok_codes = []
self.fail_code = 10

View File

@ -16,9 +16,9 @@ class Plugin(AbstractPlugin):
def get_key():
return __file__
def __init__(self, core):
def __init__(self, core, config_section):
''' Constructor '''
AbstractPlugin.__init__(self, core)
AbstractPlugin.__init__(self, core, config_section)
self.interval = "10s"
self.disk_limit = 2048 # 2 GB
self.mem_limit = 512 # 0.5 GB

View File

@ -12,8 +12,8 @@ class Plugin(AbstractPlugin):
'''
SECTION = 'shellexec'
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
self.catch_out = False
self.end = None
self.poll = None

View File

@ -31,8 +31,8 @@ class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener):
SECTION = "svgreport"
def __init__(self, core):
super(Plugin, self).__init__(core)
def __init__(self, core, config_section):
super(Plugin, self).__init__(core, config_section)
def get_available_options(self):
return [_REPORT_FILE_OPTION]

View File

@ -1,8 +1,11 @@
"""Monitoring collector """
import hashlib
import logging
import sys
import time
import copy
from ...common.interfaces import MonitoringDataListener
from ..Telegraf.client import SSHClient, LocalhostClient
@ -26,14 +29,15 @@ class MonitoringCollector(object):
"""
def __init__(self):
def __init__(self, disguise_hostnames):
self.disguise_hostnames = disguise_hostnames
self.config = None
self.default_target = None
self.agents = []
self.agent_sessions = []
self.listeners = []
self.first_data_received = False
self.send_data = []
self.__collected_data = []
self.artifact_files = []
self.load_start_time = None
self.config_manager = ConfigManager()
@ -92,21 +96,21 @@ class MonitoringCollector(object):
ready_to_send = {
"timestamp": int(ts),
"data": {
agent.host: {
self.hash_hostname(agent.host): {
"comment": agent.config.comment,
"metrics": prepared_results
}
}
}
self.send_data.append(ready_to_send)
self.__collected_data.append(ready_to_send)
logger.debug(
'Polling/decoding agents data took: %.2fms',
(time.time() - start_time) * 1000)
collected_data_length = len(self.send_data)
collected_data_length = len(self.__collected_data)
if not self.first_data_received and self.send_data:
if not self.first_data_received and self.__collected_data:
self.first_data_received = True
logger.info("Monitoring received first data.")
else:
@ -130,11 +134,25 @@ class MonitoringCollector(object):
def send_collected_data(self):
"""sends pending data set to listeners"""
[
listener.monitoring_data(self.send_data)
for listener in self.listeners
]
self.send_data = []
data = self.__collected_data
self.__collected_data = []
for listener in self.listeners:
# deep copy to ensure each listener gets its own copy
listener.monitoring_data(copy.deepcopy(data))
def not_empty(self):
return len(self.__collected_data) > 0
def send_rest_data(self):
while self.not_empty():
logger.info("Sending monitoring data rests...")
self.send_collected_data()
def hash_hostname(self, host):
if self.disguise_hostnames and host:
return hashlib.md5(host).hexdigest()
else:
return host
class StdOutPrintMon(MonitoringDataListener):

View File

@ -9,6 +9,9 @@ import json
import logging
import os
import time
from copy import deepcopy
from ...common.resource import manager as resource
from ...common.interfaces import MonitoringDataListener, \
AbstractPlugin, AbstractInfoWidget
@ -32,15 +35,15 @@ class Plugin(AbstractPlugin):
SECTION = 'telegraf' # may be redefined to 'monitoring' sometimes.
def __init__(self, core):
super(Plugin, self).__init__(core)
def __init__(self, core, config_section=None):
super(Plugin, self).__init__(core, config_section)
self.jobno = None
self.default_target = None
self.default_config = "{path}/config/monitoring_default_config.xml".format(
path=os.path.dirname(__file__))
self.config = None
self.process = None
self.monitoring = MonitoringCollector()
self.monitoring = MonitoringCollector(disguise_hostnames=bool(int(self.get_option('disguise_hostnames', '0'))))
self.die_on_fail = True
self.data_file = None
self.mon_saver = None
@ -56,7 +59,12 @@ class Plugin(AbstractPlugin):
"load_start_time = %s", self.monitoring.load_start_time)
def get_available_options(self):
return ["config", "default_target", "ssh_timeout"]
return [
"config",
"default_target",
"ssh_timeout",
"disguise_hostnames"
]
def __detect_configuration(self):
"""
@ -222,9 +230,7 @@ class Plugin(AbstractPlugin):
for log in self.monitoring.artifact_files:
self.core.add_artifact_file(log)
while self.monitoring.send_data:
logger.info("Sending monitoring data rests...")
self.monitoring.send_collected_data()
self.monitoring.send_rest_data()
if self.mon_saver:
self.mon_saver.close()
return retcode
@ -325,13 +331,12 @@ class MonitoringWidget(AbstractInfoWidget, MonitoringDataListener):
return "Monitoring is " + screen.markup.RED + "offline" + screen.markup.RESET
else:
res = "Monitoring is " + screen.markup.GREEN + \
"online" + screen.markup.RESET + ":\n"
"online" + screen.markup.RESET + ":\n"
for hostname, metrics in self.data.items():
tm_stamp = datetime.datetime.fromtimestamp(
float(self.time[hostname])).strftime('%H:%M:%S')
res += (
" " + screen.markup.CYAN + "%s" + screen.markup.RESET +
" at %s:\n") % (hostname, tm_stamp)
res += (" " + screen.markup.CYAN + "%s" + screen.markup.RESET +
" at %s:\n") % (hostname, tm_stamp)
for metric, value in sorted(metrics.iteritems()):
if self.sign[hostname][metric] > 0:
value = screen.markup.YELLOW + value + screen.markup.RESET
@ -365,10 +370,11 @@ class AbstractMetricCriterion(AbstractCriterion, MonitoringDataListener):
self.last_second = None
self.seconds_count = 0
def monitoring_data(self, block):
def monitoring_data(self, _block):
if self.triggered:
return
block = deepcopy(_block)
for chunk in block:
host = chunk['data'].keys()[0]
data = chunk['data'][host]['metrics']

View File

@ -16,8 +16,8 @@ class Plugin(AbstractPlugin, AbstractInfoWidget):
'''
SECTION = 'tips'
def __init__(self, core):
AbstractPlugin.__init__(self, core)
def __init__(self, core, config_section):
AbstractPlugin.__init__(self, core, config_section)
AbstractInfoWidget.__init__(self)
self.lines = [
l.decode('utf-8')