Merge pull request #280 from fomars/disk_writer

Disk writer
Alexey Lavrenuke 2016-10-25 21:08:56 +03:00 committed by GitHub
commit ecc1345590
9 changed files with 83 additions and 13 deletions

View File

@@ -44,7 +44,7 @@ analytic tools for the results they produce.
     entry_points={
         'console_scripts': [
             'yandex-tank = yandextank.core.cli:main',
-            'yandex-tank-check-ssh = yandextank.core.util:check_ssh_connection',
+            'yandex-tank-check-ssh = yandextank.common.util:check_ssh_connection',
         ],
     },
     package_data={
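
The only functional change here is the target of the `yandex-tank-check-ssh` console script, which follows `util` from `yandextank.core` to `yandextank.common`. A minimal sketch of how one could confirm the new entry-point string resolves (assuming the package is installed; `pkg_resources` is what setuptools-generated scripts use underneath):

```python
# Sketch: parse and resolve the console_scripts entry exactly as written
# in setup.py; resolve() raises ImportError if the module path is stale.
import pkg_resources

ep = pkg_resources.EntryPoint.parse(
    'yandex-tank-check-ssh = yandextank.common.util:check_ssh_connection')
func = ep.resolve()
print(func.__module__)  # expected: yandextank.common.util
```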

View File

@@ -11,9 +11,11 @@ import tempfile
 import time
 import traceback
 import uuid
+import configparser
+from builtins import str
 from configparser import NoSectionError
 from yandextank.common.exceptions import PluginNotPrepared
 from yandextank.common.interfaces import GeneratorPlugin
@@ -38,12 +40,23 @@ class Job(object):
         self.monitoring_plugin = monitoring_plugin
         self.aggregator_plugin = aggregator_plugin
         self.tank = tank
+        self._phantom_info = None
         self.generator_plugin = generator_plugin

+    def subscribe_plugin(self, plugin):
+        self.aggregator_plugin.add_result_listener(plugin)
+        self.monitoring_plugin.monitoring.add_listener(plugin)
+
+    @property
+    def phantom_info(self):
+        if self._phantom_info is None:
+            raise PluginNotPrepared
+        return self._phantom_info
+
+    @phantom_info.setter
+    def phantom_info(self, info):
+        self._phantom_info = info


 class TankCore(object):
     """
@@ -186,8 +199,8 @@ class TankCore(object):
             self.log.warning("Load generator not found:", exc_info=True)
             gen = None

-        self.job = Job(name=str(self.get_option(self.SECTION_META, "job_name", 'none').decode('utf8')),
-                       description=str(self.get_option(self.SECTION_META, "job_dsc", '').decode('utf8')),
+        self.job = Job(name=str(self.get_option(self.SECTION_META, "job_name", 'none')),
+                       description=str(self.get_option(self.SECTION_META, "job_dsc", '')),
                        task=str(self.get_option(self.SECTION_META, 'task', 'dir')),
                        version=str(self.get_option(self.SECTION_META, 'ver', '')),
                        config_copy=self.get_option(self.SECTION_META, 'copy_config_to', 'config_copy'),
@@ -550,7 +563,7 @@ class ConfigManager(object):
         filename = self.file
         if filename:
-            with open(filename, 'wb') as handle:
+            with open(filename, 'w') as handle:
                 self.config.write(handle)

     def get_options(self, section, prefix=''):
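
The `'wb'` to `'w'` switch is a Python 3 fix: `ConfigParser.write()` emits `str`, and writing `str` to a binary-mode handle raises `TypeError`. A minimal sketch:

```python
# Why the mode flips from 'wb' to 'w': on Python 3, ConfigParser.write()
# produces str, which a binary-mode file object rejects with TypeError.
import configparser

config = configparser.ConfigParser()
config['phantom'] = {'rps_schedule': 'line(1,100,60s)'}  # made-up section

with open('tank.ini', 'w') as handle:  # text mode: OK on Python 3
    config.write(handle)
```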

View File

@@ -50,7 +50,7 @@ class Plugin(AbstractPlugin):
     def configure(self):
         self.aggregator_config = json.loads(resource_string(
-            __name__, 'config/phout.json'))
+            __name__, 'config/phout.json').decode('utf8'))
         verbose_histogram_option = self.get_option("verbose_histogram", "0")
         self.verbose_histogram = (
             verbose_histogram_option.lower() == "true") or (
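
`resource_string()` always returns `bytes`, and on the Python 3 versions current at the time (before 3.6) `json.loads()` accepted only `str`, hence the explicit decode. A self-contained sketch, with inline bytes standing in for `resource_string()`'s return value (the JSON content is made up):

```python
import json

raw = b'{"interval": "1s"}'               # what resource_string() returns: bytes
aggregator_config = json.loads(raw.decode('utf8'))  # decode first on Python < 3.6
print(aggregator_config['interval'])
```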

View File

@@ -0,0 +1 @@
+from .plugin import Plugin

View File

@@ -0,0 +1,55 @@
+# TODO: make the next two lines unnecessary
+# pylint: disable=line-too-long
+# pylint: disable=missing-docstring
+import logging
+import os
+
+from ..Aggregator import Plugin as AggregatorPlugin
+from ..Monitoring import Plugin as MonitoringPlugin
+from ..Telegraf import Plugin as TelegrafPlugin
+from ...common.interfaces import AbstractPlugin, MonitoringDataListener, AggregateResultListener
+
+logger = logging.getLogger(__name__)  # pylint: disable=C0103
+
+
+class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener):
+    # pylint:disable=R0902
+    SECTION = 'json_report'
+
+    def get_available_options(self):
+        return ['monitoring_log', 'test_data_log', 'test_stats_log']
+
+    def configure(self):
+        self.monitoring_logger = self.create_file_logger('monitoring',
+                                                         self.get_option('monitoring_log',
+                                                                         'monitoring.log'))
+        self.aggregator_data_logger = self.create_file_logger('aggregator_data',
+                                                              self.get_option('test_data_log',
+                                                                              'test_data.log'))
+        self.stats_logger = self.create_file_logger('stats',
+                                                    self.get_option('test_stats_log',
+                                                                    'test_stats.log'))
+        self.core.job.subscribe_plugin(self)
+
+    def create_file_logger(self, logger_name, file_name, formatter=None):
+        loggr = logging.getLogger(logger_name)
+        loggr.setLevel(logging.INFO)
+        handler = logging.FileHandler(os.path.join(self.core.artifacts_dir, file_name), mode='w')
+        handler.setLevel(logging.INFO)
+        if formatter:
+            handler.setFormatter(formatter)
+        loggr.addHandler(handler)
+        loggr.propagate = False
+        return loggr
+
+    def on_aggregated_data(self, data, stats):
+        """
+        @data: aggregated data
+        @stats: stats about gun
+        """
+        self.aggregator_data_logger.info(data)
+        self.stats_logger.info(stats)
+
+    def monitoring_data(self, data_list):
+        self.monitoring_logger.info(data_list)
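
Given `SECTION = 'json_report'`, the plugin is driven from a `[json_report]` section of the tank config. The reusable piece is `create_file_logger`, which fans each data stream out to its own file and sets `propagate = False` so records stay out of the root logger and the console. A standalone sketch of that pattern (writing to the current directory instead of the tank's artifacts dir):

```python
import logging

def create_file_logger(logger_name, file_name, formatter=None):
    # One named logger per output file; INFO level on both ends.
    loggr = logging.getLogger(logger_name)
    loggr.setLevel(logging.INFO)
    handler = logging.FileHandler(file_name, mode='w')
    handler.setLevel(logging.INFO)
    if formatter:
        handler.setFormatter(formatter)
    loggr.addHandler(handler)
    loggr.propagate = False  # keep records out of the root logger
    return loggr

stats = create_file_logger('stats', 'test_stats.log')
stats.info({'rps': 100, 'instances': 10})  # one record repr per line
```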

View File

@@ -136,6 +136,8 @@ class Plugin(AbstractPlugin, GeneratorPlugin):
             logger.debug("Console not found: %s", ex)
             console = None

+        self.core.job.phantom_info = self.phantom.get_info()
+
         if console and aggregator:
             widget = PhantomProgressBarWidget(self)
             console.add_info_widget(widget)

View File

@@ -19,12 +19,11 @@ class Plugin(AbstractPlugin, AbstractInfoWidget):
     def __init__(self, core):
         AbstractPlugin.__init__(self, core)
         AbstractInfoWidget.__init__(self)
-        self.lines = resource_stream(__name__, "config/tips.txt").readlines()
+        self.lines = [l.decode('utf-8') for l in resource_stream(__name__, "config/tips.txt").readlines()]
         self.disable = 0
         line = random.choice(self.lines)
-        self.section = line[:line.index(':')]
-        self.tip = line[line.index(':') + 1:].strip()
+        self.section, self.tip = [_.strip() for _ in line.split(':', 1)]
         self.probability = 0.0

     @staticmethod
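
The tips parser rewrite replaces two `index()`/slice passes with a single `split(':', 1)`: `maxsplit=1` splits only on the first colon, so colons inside the tip text survive, and both halves are stripped in one comprehension. For example (the tip line is made up):

```python
line = "Console: press 'h' for help: it works\n"
section, tip = [_.strip() for _ in line.split(':', 1)]
print(section)  # Console
print(tip)      # press 'h' for help: it works
```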

View File

@@ -29,9 +29,9 @@ class StpdReader(object):
     def __iter__(self):
         def read_chunk_header(ammo_file):
             chunk_header = ''
-            while chunk_header is '':
-                line = ammo_file.readline()
-                if line is '':
+            while not chunk_header:
+                line = ammo_file.readline().decode('utf8')
+                if not line:
                     return line  # EOF
                 chunk_header = line.strip('\r\n')
             return chunk_header
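
Two fixes land together here: `is ''` tested object identity rather than equality (it only ever worked thanks to CPython interning the empty string), and once the ammo file is read as bytes, `readline()` can never be identical to a `str` anyway. Decoding first and testing truthiness handles both. A self-contained sketch over an in-memory buffer (the sample ammo payload is made up):

```python
import io

# Binary source, as an stpd ammo file would be: decode each line, then use
# plain truthiness for the EOF test ('' and b'' are both falsy).
ammo_file = io.BytesIO(b'10 case1\nGET / HTTP/1.0\r\n\r\n')
line = ammo_file.readline().decode('utf8')
while line:
    print(repr(line.strip('\r\n')))
    line = ammo_file.readline().decode('utf8')
# at EOF readline() returns b'', decoded to '' -> loop terminates
```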

View File

@@ -253,7 +253,7 @@ class StepperWrapper(object):
         hashed_str += sep + \
             ';'.join(self.uris) + sep + ';'.join(self.headers)
         self.log.debug("stpd-hash source: %s", hashed_str)
-        hasher.update(hashed_str)
+        hasher.update(hashed_str.encode('utf8'))
         if not os.path.exists(self.cache_dir):
             os.makedirs(self.cache_dir)
         stpd = self.cache_dir + '/' + \
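
The last fix is the same bytes/str boundary once more: Python 3 `hashlib` objects accept only bytes, so the stpd cache-key source must be encoded before hashing. A sketch (the sample source string is made up):

```python
import hashlib

# hashlib's update() requires bytes on Python 3; passing str raises TypeError.
hasher = hashlib.md5()
hashed_str = 'line(1,100,60s)#/uri1;/uri2'  # hypothetical stpd-hash source
hasher.update(hashed_str.encode('utf8'))
print(hasher.hexdigest())  # stable cache key for the generated stpd file
```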