Merge branch 'develop' into patch-1

This commit is contained in:
Alexey Lavrenuke 2017-04-10 17:57:30 +03:00 committed by GitHub
commit e8d9544d7a
12 changed files with 218 additions and 165 deletions

View File

@ -1,4 +1,4 @@
# Yandex Tank [![Build Status](https://travis-ci.org/yandex/yandex-tank.svg?branch=master)](https://travis-ci.org/yandex/yandex-tank) [![gitter](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/yandex/yandex-tank?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
# Yandex Tank [![Build Status](https://travis-ci.org/yandex/yandex-tank.svg?branch=master)](https://travis-ci.org/yandex/yandex-tank) [![Gitter](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/yandex/yandex-tank?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
Yandex.Tank is an extensible open source load testing tool for advanced linux users which is especially good as a part of an automated load testing suite
@ -22,6 +22,9 @@ Yandex.Tank is an extensible open source load testing tool for advanced linux us
- [Stackoverflow](https://stackoverflow.com/) use `load-testing` + `yandex` tags
## Get help
Chat with authors and other performance specialists: [![Gitter](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/yandex/yandex-tank?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
## See also
- [Overload𝛃](https://overload.yandex.net/) performance analytics server

View File

@ -105,6 +105,41 @@ Example:
except Exception, ex:
logger.error('Error trying to perform a test: %s', ex)
exit codes
==========
.. code-block:: json
{
"0": "completed",
"1": "interrupted_generic_interrupt",
"2": "interrupted",
"3": "interrupted_active_task_not_found ",
"4": "interrupted_no_ammo_file",
"5": "interrupted_address_not_specified",
"6": "interrupted_cpu_or_disk_overload",
"7": "interrupted_unknown_config_parameter",
"8": "interrupted_stop_via_web",
"9": "interrupted",
"11": "interrupted_job_number_error",
"12": "interrupted_phantom_error",
"13": "interrupted_job_metainfo_error",
"14": "interrupted_target_monitoring_error",
"15": "interrupted_target_info_error",
"21": "autostop_time",
"22": "autostop_http",
"23": "autostop_net",
"24": "autostop_instances",
"25": "autostop_total_time",
"26": "autostop_total_http",
"27": "autostop_total_net",
"28": "autostop_negative_http",
"29": "autostop_negative_net",
"30": "autostop_http_trend",
"31": "autostop_metric_higher",
"32": "autostop_metric_lower"
}
***************
Load Generators
***************
@ -691,40 +726,40 @@ Disable phantom first (unless you really want to keep it active alongside at you
; Pandora config section:
[pandora]
; ammo file name
ammo=ammo.jsonline
; Pandora executable path
pandora_cmd=/usr/bin/pandora
; loop limit
loop=1000
; Enable/disable expvar monitoring
expvar = 1 ; default
; each user will maintain this schedule
user_schedule = periodic(1, 1, 100)
; Pandora config contents (json)
config_content = {
"pools": [
{
"name": "dummy pool",
"gun": {"type": "log"},
"ammo": {
"type": "dummy/log",
"AmmoLimit": 10000000
},
"result": {
"type": "log/phout",
"destination": "./phout.log"
},
"shared-limits": false,
"user-limiter": {
"type": "unlimited"
},
"startup-limiter": {
"type": "periodic",
"batch": 1,
"max": 5,
"period": "0.5s"
}
}]}
; users are started using this schedule
startup_schedule = periodic(1, 1, 100)
; if shared_schedule is false, then each user is independent,
; in other case they all hold to a common schedule
shared_schedule = 0
; target host and port
target=localhost:3000
Ammo format
-----------
Pandora currently supports only one ammo format: ``jsonline``, i.e. one json doc per line.
Example:
::
{"uri": "/00", "method": "GET", "headers": {"Host": "example.org", "User-Agent": "Pandora/0.0.1"}, "host": "example.org"}
{"uri": "/01", "method": "GET", "headers": {"Host": "example.org", "User-Agent": "Pandora/0.0.1"}, "host": "example.org"}
{"tag": "mytag", "uri": "/02", "method": "GET", "headers": {"Host": "example.org", "User-Agent": "Pandora/0.0.1"}, "host": "example.org"}
{"uri": "/03", "method": "GET", "headers": {"Host": "example.org", "User-Agent": "Pandora/0.0.1"}, "host": "example.org"}
Each json doc describes an HTTP request. Some of them may have a tag field, it will be used as other tags in other ammo formats.
; OR config file (yaml or json)
config_file = pandora_config.yml
Schedules
---------
@ -804,7 +839,7 @@ Example:
::
[tank]
; plugin is disabled by default, enable it:
plugin_overload=yandextank.plugins.Overload
plugin_uploader=yandextank.plugins.DataUploader overload
[overload]
token_file=token.txt

View File

@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name='yandextank',
version='1.8.33',
version='1.8.34',
description='a performance measurement tool',
longer_description='''
Yandex.Tank is a performance measurement and load testing automatization tool.

View File

@ -89,7 +89,7 @@ class TankCore(object):
def __init__(self, artifacts_base_dir=None, artifacts_dir_name=None):
self.config = ConfigManager()
self.status = {}
self.plugins = []
self.plugins = {}
self.artifacts_dir_name = artifacts_dir_name
self._artifacts_dir = None
self.artifact_files = {}
@ -173,6 +173,12 @@ class TankCore(object):
raise ValueError(
"Couldn't convert plugin path to new format:\n %s" %
plugin_path)
if plugin_path is "yandextank.plugins.Overload":
logger.warning(
"Deprecated plugin name: 'yandextank.plugins.Overload'\n"
"There is a new generic plugin now.\n"
"Correcting to 'yandextank.plugins.DataUploader overload'")
plugin_path = "yandextank.plugins.DataUploader overload"
try:
plugin = il.import_module(plugin_path)
except ImportError:
@ -206,7 +212,7 @@ class TankCore(object):
instance = getattr(
plugin, plugin_path.split('.')[-1] + 'Plugin')(self)
self.plugins.append(instance)
self.register_plugin(self.PLUGIN_PREFIX + plugin_name, instance)
logger.debug("Plugin instances: %s", self.plugins)
@ -252,7 +258,7 @@ class TankCore(object):
generator_plugin=gen,
tank=socket.getfqdn())
for plugin in self.plugins:
for plugin in self.plugins.values():
logger.debug("Configuring %s", plugin)
plugin.configure()
self.config.flush()
@ -263,7 +269,7 @@ class TankCore(object):
""" Call prepare_test() on all plugins """
logger.info("Preparing test...")
self.publish("core", "stage", "prepare")
for plugin in self.plugins:
for plugin in self.plugins.values():
logger.debug("Preparing %s", plugin)
plugin.prepare_test()
if self.flush_config_to:
@ -273,7 +279,7 @@ class TankCore(object):
""" Call start_test() on all plugins """
logger.info("Starting test...")
self.publish("core", "stage", "start")
for plugin in self.plugins:
for plugin in self.plugins.values():
logger.debug("Starting %s", plugin)
plugin.start_test()
if self.flush_config_to:
@ -292,7 +298,7 @@ class TankCore(object):
while not self.interrupted:
begin_time = time.time()
for plugin in self.plugins:
for plugin in self.plugins.values():
logger.debug("Polling %s", plugin)
retcode = plugin.is_test_finished()
if retcode >= 0:
@ -311,7 +317,7 @@ class TankCore(object):
logger.info("Finishing test...")
self.publish("core", "stage", "end")
for plugin in self.plugins:
for plugin in self.plugins.values():
logger.debug("Finalize %s", plugin)
try:
logger.debug("RC before: %s", retcode)
@ -335,7 +341,7 @@ class TankCore(object):
logger.info("Post-processing test...")
self.publish("core", "stage", "post_process")
for plugin in self.plugins:
for plugin in self.plugins.values():
logger.debug("Post-process %s", plugin)
try:
logger.debug("RC before: %s", retcode)
@ -425,7 +431,7 @@ class TankCore(object):
Retrieve a plugin of desired class, KeyError raised otherwise
"""
logger.debug("Searching for plugin: %s", plugin_class)
matches = [plugin for plugin in self.plugins if isinstance(plugin, plugin_class)]
matches = [plugin for plugin in self.plugins.values() if isinstance(plugin, plugin_class)]
if len(matches) > 0:
if len(matches) > 1:
logger.debug(
@ -435,6 +441,10 @@ class TankCore(object):
else:
raise KeyError("Requested plugin type not found: %s" % plugin_class)
def get_jobno(self, plugin_name='plugin_lunapark'):
uploader_plugin = self.plugins[plugin_name]
return uploader_plugin.lp_job.number
def __collect_file(self, filename, keep_original=False):
"""
Move or copy single file to artifacts dir
@ -556,7 +566,7 @@ class TankCore(object):
"""
logger.info("Close allocated resources...")
for plugin in self.plugins:
for plugin in self.plugins.values():
logger.debug("Close %s", plugin)
try:
plugin.close()
@ -589,6 +599,11 @@ class TankCore(object):
os_agent = 'OS/{}'.format(platform.platform())
return ' '.join((tank_agent, python_agent, os_agent))
def register_plugin(self, plugin_name, instance):
if self.plugins.get(plugin_name, None) is not None:
logger.exception('Plugins\' names should diverse')
self.plugins[plugin_name] = instance
class ConfigManager(object):
""" Option storage class """

View File

@ -21,7 +21,6 @@ class Plugin(AbstractPlugin, AggregateResultListener):
self.screen = None
self.render_exception = None
self.console_markup = None
self.remote_translator = None
self.info_panel_width = '33'
self.short_only = 0
# these three provide non-blocking console output
@ -71,9 +70,6 @@ class Plugin(AbstractPlugin, AggregateResultListener):
sys.stdout.write(self.__console_view)
sys.stdout.write(self.console_markup.TOTAL_RESET)
if self.remote_translator:
self.remote_translator.send_console(self.__console_view)
def is_test_finished(self):
if not self.__writer_thread:
self.__writer_event = threading.Event()

View File

@ -144,7 +144,9 @@ class APIClient(object):
response_callback=lambda x: x,
writer=False,
trace=False,
json=None):
json=None,
maintenance_timeouts=None,
maintenance_msg=None):
url = urljoin(self.base_url, path)
if json:
request = requests.Request(
@ -153,7 +155,8 @@ class APIClient(object):
request = requests.Request(
http_method, url, data=data, headers={'User-Agent': self.user_agent}, params=self.params)
network_timeouts = self.network_timeouts()
maintenance_timeouts = self.maintenance_timeouts()
maintenance_timeouts = maintenance_timeouts or self.maintenance_timeouts()
maintenance_msg = maintenance_msg or "%s is under maintenance" % (self._base_url)
while True:
try:
response = self.__send_single_request(request, trace=trace)
@ -172,8 +175,8 @@ class APIClient(object):
except self.UnderMaintenance as e:
try:
timeout = next(maintenance_timeouts)
logger.warn(
"%s is under maintenance, will retry in %ss..." % (self._base_url, timeout))
logger.warn(maintenance_msg)
logger.warn("Retrying in %ss..." % timeout)
time.sleep(timeout)
continue
except StopIteration:
@ -223,13 +226,15 @@ class APIClient(object):
except StopIteration:
raise e
def __get(self, addr, trace=False):
def __get(self, addr, trace=False, maintenance_timeouts=None, maintenance_msg=None):
return self.__make_api_request(
'GET',
addr,
trace=trace,
response_callback=lambda r: json.loads(
r.content.decode('utf8')))
response_callback=lambda r: json.loads(r.content.decode('utf8')),
maintenance_timeouts=maintenance_timeouts,
maintenance_msg=maintenance_msg
)
def __post_raw(self, addr, txt_data, trace=False):
return self.__make_api_request(
@ -542,23 +547,16 @@ class APIClient(object):
except StopIteration:
raise e
def send_console(self, jobno, console, trace=False):
if trace:
logger.debug("Sending console view [%s]: %s", len(console),
console[:64])
addr = "api/job/%s/console.txt" % jobno
self.__post_raw(addr, console, trace=trace)
def is_target_locked(self, target, trace=False):
addr = "api/server/lock.json?action=check&address=%s" % target
res = self.__get(addr, trace=trace)
return res[0]
def lock_target(self, target, duration, trace=False):
def lock_target(self, target, duration, trace=False, maintenance_timeouts=None, maintenance_msg=None):
addr = "api/server/lock.json?action=lock&" + \
"address=%s&duration=%s&jobno=None" % \
(target, int(duration))
res = self.__get(addr, trace=trace)
res = self.__get(addr, trace=trace, maintenance_timeouts=maintenance_timeouts, maintenance_msg=maintenance_msg)
return res[0]
def unlock_target(self, target):
@ -589,5 +587,8 @@ class OverloadClient(APIClient):
def send_status(self, jobno, upload_token, status, trace=False):
return
def lock_target(self, target, duration, trace=False):
def lock_target(self, target, duration, trace=False, **kwargs):
return
def unlock_target(self, *args, **kwargs):
return

View File

@ -62,7 +62,7 @@ class Plugin(AbstractPlugin, AggregateResultListener,
self.mon = None
self.regression_component = None
self.retcode = -1
self.target = None
self._target = None
self.task_name = ''
self.token_file = None
self.version_tested = None
@ -72,6 +72,7 @@ class Plugin(AbstractPlugin, AggregateResultListener,
self.backend_type = BackendTypes.identify_backend(self.SECTION)
self._task = None
self._api_token = ''
self._lp_job = None
@staticmethod
def get_key():
@ -181,9 +182,7 @@ class Plugin(AbstractPlugin, AggregateResultListener,
os.getcwd())
def prepare_test(self):
info = self.core.job.generator_plugin.get_info()
self.target = info.address
logger.info("Detected target: %s", self.target)
info = self.generator_info
port = info.port
instances = info.instances
if info.ammo_file.startswith(
@ -191,25 +190,24 @@ class Plugin(AbstractPlugin, AggregateResultListener,
ammo_path = info.ammo_file
else:
ammo_path = os.path.realpath(info.ammo_file)
loadscheme = [] if isinstance(info.rps_schedule,
str) else info.rps_schedule
duration = int(info.duration)
if duration:
self.lock_target_duration = duration
loop_count = info.loop_count
self.lp_job = self.__get_lp_job(self.target, port, loadscheme)
lp_job = self.lp_job
self.locked_targets = self.check_and_lock_targets(strict=bool(
int(self.get_option('strict_lock', '0'))), ignore=self.ignore_target_lock)
try:
if self.lp_job._number:
self.make_symlink(self.lp_job._number)
if lp_job._number:
self.make_symlink(lp_job._number)
self.check_task_is_open()
else:
self.check_task_is_open()
self.lp_job.create()
self.make_symlink(self.lp_job.number)
lp_job.create()
self.make_symlink(lp_job.number)
self.core.publish(self.SECTION, 'jobno', lp_job.number)
except (APIClient.JobNotCreated, APIClient.NotAvailable, APIClient.NetworkError) as e:
logger.error(e.message)
logger.error(
@ -221,7 +219,7 @@ class Plugin(AbstractPlugin, AggregateResultListener,
return
cmdline = ' '.join(sys.argv)
self.lp_job.edit_metainfo(
lp_job.edit_metainfo(
instances=instances,
ammo_path=ammo_path,
loop_count=loop_count,
@ -242,7 +240,6 @@ class Plugin(AbstractPlugin, AggregateResultListener,
if console:
console.add_info_widget(JobInfoWidget(self))
console.remote_translator = self
self.set_option('target_host', self.target)
self.set_option('target_port', port)
@ -469,12 +466,6 @@ class Plugin(AbstractPlugin, AggregateResultListener,
with open(os.path.join(self.core.artifacts_dir, 'saved_conf.ini'), 'w') as f:
config.write(f)
def send_console(self, text):
try:
self.lp_job.send_console(text)
except Exception: # pylint: disable=W0703
logger.debug("Can't send console snapshot: %s", exc_info=True)
def parse_lock_targets(self):
# prepare target lock list
locks_list_cfg = self.get_option('lock_targets', 'auto').strip()
@ -555,10 +546,22 @@ class Plugin(AbstractPlugin, AggregateResultListener,
user_agent=self._get_user_agent(),
api_token=self.api_token)
def __get_lp_job(self, target, port, loadscheme):
@property
def lp_job(self):
if self._lp_job is None:
self._lp_job = self.__get_lp_job()
return self._lp_job
def __get_lp_job(self):
api_client = self.__get_api_client()
info = self.generator_info
port = info.port
loadscheme = [] if isinstance(info.rps_schedule,
str) else info.rps_schedule
return LPJob(client=api_client,
target_host=target,
target_host=self.target,
target_port=port,
number=self.get_option('jobno', ''),
token=self.get_option('upload_token', ''),
@ -619,6 +622,19 @@ class Plugin(AbstractPlugin, AggregateResultListener,
)
raise RuntimeError("API token error")
@property
def generator_info(self):
if self._generator_info is None:
self._generator_info = self.core.job.generator_plugin.get_info()
return self._generator_info
@property
def target(self):
if self._target is None:
self._target = self.generator_info.address
logger.info("Detected target: %s", self.target)
return self._target
class JobInfoWidget(AbstractInfoWidget):
def __init__(self, sender):
@ -785,17 +801,17 @@ class LPJob(object):
self.api_client.push_monitoring_data(
self.number, self.token, data, trace=self.log_monitoring_requests)
def send_console(self, text):
return self.api_client.send_console(
self.number, text, trace=self.log_other_requests)
def lock_target(self, lock_target, lock_target_duration, ignore, strict):
lock_wait_timeout = 10
maintenance_timeouts = iter([0]) if ignore else iter(lambda: lock_wait_timeout, 0)
while True:
try:
self.api_client.lock_target(
lock_target,
lock_target_duration,
trace=self.log_other_requests)
self.api_client.lock_target(lock_target, lock_target_duration, trace=self.log_other_requests,
maintenance_timeouts=maintenance_timeouts,
maintenance_msg="Target is locked.\nManual unlock link: %s%s" % (
self.api_client.base_url,
self.api_client.get_manual_unlock_link(lock_target)
))
return True
except (APIClient.NotAvailable, APIClient.StoppedFromOnline) as e:
logger.info('Target is not locked due to %s', e.message)
@ -809,12 +825,11 @@ class LPJob(object):
return False
except APIClient.UnderMaintenance:
logger.info('Target is locked')
logger.info("Manual unlock link: %s%s", self.api_client.base_url,
self.api_client.get_manual_unlock_link(lock_target))
if ignore:
logger.info('ignore_target_locks = 1')
return False
time.sleep(10)
logger.info("Manual unlock link: %s%s", self.api_client.base_url,
self.api_client.get_manual_unlock_link(lock_target))
continue
def set_imbalance_and_dsc(self, rps, comment):

View File

@ -53,7 +53,7 @@ class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener):
if data
]
def end_test(self, retcode):
def post_process(self, retcode):
self.data_and_stats_stream.close()
self.monitoring_stream.close()
return retcode

View File

@ -3,9 +3,9 @@ import logging
import subprocess
import time
from ...common.interfaces import AbstractPlugin, AbstractInfoWidget, GeneratorPlugin
from ...common.interfaces import AbstractPlugin, \
AbstractInfoWidget, GeneratorPlugin
from .config import PoolConfig, PandoraConfig, parse_schedule
from .reader import PandoraStatsReader
from ..Aggregator import Plugin as AggregatorPlugin
from ..Console import Plugin as ConsolePlugin
@ -29,6 +29,8 @@ class Plugin(AbstractPlugin, GeneratorPlugin):
self.process_stderr = None
self.process_start_time = None
self.custom_config = False
self.sample_log = "./phout.log"
self.expvar = True
@staticmethod
def get_key():
@ -36,77 +38,43 @@ class Plugin(AbstractPlugin, GeneratorPlugin):
def get_available_options(self):
opts = [
"pandora_cmd", "buffered_seconds", "ammo", "loop", "sample_log",
"config_file", "startup_schedule", "user_schedule", "gun_type",
"custom_config"
"pandora_cmd", "buffered_seconds",
"config_content", "config_file",
"expvar"
]
return opts
def configure(self):
self.custom_config = self.get_option("custom_config", "0") == "1"
self.expvar = self.get_option("expvar", "1") == "1"
self.pandora_cmd = self.get_option("pandora_cmd", "pandora")
self.buffered_seconds = int(
self.get_option("buffered_seconds", self.buffered_seconds))
pool_config = PoolConfig()
ammo = self.get_option("ammo", "")
if ammo:
pool_config.set_ammo(ammo)
loop_limit = int(self.get_option("loop", "0"))
pool_config.set_loop(loop_limit)
self.sample_log = self.get_option("sample_log", "")
if not self.sample_log:
self.sample_log = self.core.mkstemp(".samples", "results_")
with open(self.sample_log, 'w'):
pass
self.core.add_artifact_file(self.sample_log)
pool_config.set_sample_log(self.sample_log)
startup_schedule = self.get_option("startup_schedule", "")
if startup_schedule:
pool_config.set_startup_schedule(parse_schedule(startup_schedule))
else:
raise RuntimeError("startup_schedule not specified")
user_schedule = self.get_option("user_schedule", "")
if user_schedule:
pool_config.set_user_schedule(parse_schedule(user_schedule))
else:
raise RuntimeError("user_schedule not specified")
shared_schedule = bool(int(self.get_option("shared_schedule", "1")))
pool_config.set_shared_schedule(shared_schedule)
target = self.get_option("target", "localhost:3000")
pool_config.set_target(target)
gun_type = self.get_option("gun_type", "http")
if gun_type == 'https':
pool_config.set_ssl(True)
logger.info("SSL is on")
gun_type = "http"
logger.info("Pandora gun type is: %s", gun_type)
pool_config.set_gun_type(gun_type)
ammo_type = self.get_option("ammo_type", "jsonline/http")
logger.info("Pandora ammo type is: %s", ammo_type)
pool_config.set_ammo_type(ammo_type)
self.pandora_config = PandoraConfig()
self.pandora_config.add_pool(pool_config)
self.pandora_config_file = self.get_option("config_file", "")
if not self.pandora_config_file:
if self.custom_config:
raise RuntimeError(
"You said you would like to use custom config,"
" but you didn't specify it")
config_content = self.get_option("config_content", "")
if config_content:
self.pandora_config_file = self.core.mkstemp(
".json", "pandora_config_")
self.core.add_artifact_file(self.pandora_config_file)
if not self.custom_config:
self.core.add_artifact_file(self.pandora_config_file)
with open(self.pandora_config_file, 'w') as config_file:
config_file.write(self.pandora_config.json())
config_file.write(config_content)
else:
config_file = self.get_option("config_file", "")
if not config_file:
raise RuntimeError(
"neither pandora config content"
"nor pandora config file is specified")
else:
extension = config_file.rsplit(".", 1)[1]
self.pandora_config_file = self.core.mkstemp(
"." + extension, "pandora_config_")
self.core.add_artifact_file(self.pandora_config_file)
with open(config_file, 'rb') as config:
config_content = config.read()
with open(self.pandora_config_file, 'wb') as config_file:
config_file.write(config_content)
def prepare_test(self):
aggregator = None
@ -120,7 +88,7 @@ class Plugin(AbstractPlugin, GeneratorPlugin):
"Linking sample and stats readers to aggregator. Reading samples from %s",
self.sample_log)
aggregator.reader = PhantomReader(self.sample_log)
aggregator.stats_reader = PandoraStatsReader()
aggregator.stats_reader = PandoraStatsReader(self.expvar)
try:
console = self.core.get_plugin_of_type(ConsolePlugin)

View File

@ -7,8 +7,21 @@ logger = logging.getLogger(__name__)
class PandoraStatsReader(object):
# TODO: maybe make stats collection asyncronous
def __init__(self, expvar):
self.closed = False
self.expvar = expvar
def next(self):
if self.closed:
raise StopIteration()
if not self.expvar:
return [{
'ts': int(time.time() - 1),
'metrics': {
'instances': 0,
'reqps': 0
}
}]
try:
pandora_response = requests.get("http://localhost:1234/debug/vars")
pandora_stat = pandora_response.json()
@ -40,7 +53,7 @@ class PandoraStatsReader(object):
}]
def close(self):
pass
self.closed = True
def __iter__(self):
return self

View File

@ -134,11 +134,20 @@ class MonitoringCollector(object):
def send_collected_data(self):
"""sends pending data set to listeners"""
data = copy.deepcopy(self.__collected_data)
data = self.__collected_data
self.__collected_data = []
for listener in self.listeners:
# deep copy to ensure each listener gets it's own copy
listener.monitoring_data(copy.deepcopy(data))
def not_empty(self):
return len(self.__collected_data) > 0
def send_rest_data(self):
while self.not_empty():
logger.info("Sending monitoring data rests...")
self.send_collected_data()
def hash_hostname(self, host):
if self.disguise_hostnames and host:
return hashlib.md5(host).hexdigest()

View File

@ -230,9 +230,7 @@ class Plugin(AbstractPlugin):
for log in self.monitoring.artifact_files:
self.core.add_artifact_file(log)
while self.monitoring.__collected_data:
logger.info("Sending monitoring data rests...")
self.monitoring.send_collected_data()
self.monitoring.send_rest_data()
if self.mon_saver:
self.mon_saver.close()
return retcode