add timeout to feeder put

This commit is contained in:
Alexey Lavrenuke 2015-06-11 17:38:47 +03:00
parent 231ce4b69e
commit b7c12d0edb
2 changed files with 24 additions and 26 deletions

View File

@ -21,7 +21,7 @@ from contextlib import contextmanager
@contextmanager
def measure(marker, err, results):
def measure(marker, results):
start_time = time.time()
yield
response_time = int((time.time() - start_time) * 1000)
@ -29,7 +29,7 @@ def measure(marker, err, results):
marker, # marker
th.active_count(), # threads
response_time, # overall response time
err, # httpCode
200, # httpCode
0, # netCode
0, # sent
0, # received
@ -130,7 +130,7 @@ class CustomGun(AbstractPlugin):
self.module = __import__(module_name)
def shoot(self, missile, marker, results):
results.put(self.module.shoot(self, missile, marker), timeout=1)
self.module.shoot(self, missile, marker, results)
class HttpGun(AbstractPlugin):
SECTION = 'http_gun'
@ -163,6 +163,7 @@ class HttpGun(AbstractPlugin):
)
results.put((int(time.time()), data_item), timeout=1)
class ScenarioGun(AbstractPlugin):
SECTION = 'scenario_gun'
@ -171,16 +172,5 @@ class ScenarioGun(AbstractPlugin):
AbstractPlugin.__init__(self, core)
def shoot(self, missile, marker, results):
err = 0
with measure("root", err, results):
#resp = requests.get("http://bsgraphite-gfe01h.yandex.ru/")
#err = resp.status_code
time.sleep(1)
with measure("afdb_load", err, results):
#resp = requests.get("http://bsgraphite-gfe01h.yandex.ru/afdb_load/")
#err = resp.status_code
time.sleep(1)
with measure("afdb", err, results):
#resp = requests.get("http://bsgraphite-gfe01h.yandex.ru/afdb/")
#err = resp.status_code
time.sleep(1)
with measure("logon", results):
requests.get("http://google.com/")

View File

@ -38,6 +38,7 @@ Gun: {gun.__class__}
self.feeder = th.Thread(target=self._feed, name="Feeder")
self.zmq = zmq
self.workers_finished = False
self.start_time = time.time()
def start(self):
self.start_time = time.time()
@ -45,6 +46,13 @@ Gun: {gun.__class__}
process.daemon = True
process.start()
self.feeder.start()
try:
self.log.info("Waiting for workers")
map(lambda x: x.join(), self.pool)
self.log.info("All workers exited.")
self.workers_finished = True
except (KeyboardInterrupt, SystemExit):
self.quit.set()
def running(self):
return not self.workers_finished
@ -64,20 +72,20 @@ Gun: {gun.__class__}
for task in self.plan:
if self.quit.is_set():
self.log.info("Stop feeding: gonna quit")
# self.task_queue.close()
return
self.task_queue.put(task)
while 1:
try:
self.task_queue.put(task, timeout=1)
except Full:
if self.quit.is_set() or self.workers_finished:
return
self.log.info(
"Feeded all data. Publishing %d killer tasks" % (
self.threads * self.instances))
[self.task_queue.put(None) for _ in range(
self.threads * self.instances)]
try:
self.log.info("Waiting for workers")
map(lambda x: x.join(), self.pool)
self.log.info("All workers exited.")
self.workers_finished = True
except (KeyboardInterrupt, SystemExit):
self.quit.set()
[self.task_queue.put(None) for _ in xrange(
0, self.threads * self.instances)]
def _worker(self):
self.log.info(