Merge branch 'master' of github.com:yandex/yandex-tank

This commit is contained in:
Alexey Lavrenuke 2016-12-19 17:40:15 +03:00
commit 4f4ce80b15
4 changed files with 18 additions and 20 deletions

View File

@ -32,5 +32,5 @@ class TestDrain(object):
destination = Queue()
drain = Drain(source, destination)
drain.start()
drain.wait()
drain.join()
assert destination.qsize() == 1000000

View File

@ -31,7 +31,6 @@ class Drain(th.Thread):
super(Drain, self).__init__()
self.source = source
self.destination = destination
self._finished = th.Event()
self._interrupted = th.Event()
def run(self):
@ -39,10 +38,6 @@ class Drain(th.Thread):
self.destination.put(item)
if self._interrupted.is_set():
break
self._finished.set()
def wait(self, timeout=None):
self._finished.wait(timeout=timeout)
def close(self):
self._interrupted.set()

View File

@ -23,6 +23,16 @@ class LoggingListener(AggregateResultListener):
logger.info("Stats:\n%s", json.dumps(stats, indent=2))
def get_from_queue(queue):
data = []
for _ in range(queue.qsize()):
try:
data.append(queue.get_nowait())
except q.Empty:
break
return data
class Plugin(AbstractPlugin):
"""
Plugin that manages aggregation and stats collection
@ -83,18 +93,8 @@ class Plugin(AbstractPlugin):
"""
Collect data, cache it and send to listeners
"""
data = []
for _ in range(self.results.qsize()):
try:
data.append(self.results.get_nowait())
except q.Empty:
break
stats = []
for _ in range(self.stats.qsize()):
try:
stats += self.stats.get_nowait()
except q.Empty:
break
data = get_from_queue(self.results)
stats = get_from_queue(self.stats)
logger.debug("Data timestamps:\n%s" % [d.get('ts') for d in data])
logger.debug("Stats timestamps:\n%s" % [d.get('ts') for d in stats])
logger.debug("Data cache timestamps:\n%s" % self.data_cache.keys())
@ -126,11 +126,11 @@ class Plugin(AbstractPlugin):
if self.reader:
self.reader.close()
if self.drain:
self.drain.wait()
self.drain.join()
if self.stats_reader:
self.stats_reader.close()
if self.stats_drain:
self.stats_drain.wait()
self.stats_drain.join()
self._collect_data()
return retcode

View File

@ -51,6 +51,9 @@ class Plugin(AbstractPlugin, AggregateResultListener):
aggregator = self.core.get_plugin_of_type(AggregatorPlugin)
aggregator.add_result_listener(self)
import pdb
pdb.set_trace()
self.criterion_str = " ".join(self.get_option("autostop", '').split(
"\n"))
self._stop_report_path = os.path.join(