Merge pull request #113 from direvius/zmq

Zmq
This commit is contained in:
Alexey Lavrenuke 2014-10-13 17:15:27 +04:00
commit 61e4bff168
3 changed files with 37 additions and 6 deletions

View File

@ -42,6 +42,7 @@ class BFGPlugin(AbstractPlugin):
self.stepper_wrapper.read_config()
def prepare_test(self):
self.log.info(self.get_option("ammo_type"))
self.stepper_wrapper.prepare_stepper()
gun_type = self.get_option("gun_type")
if gun_type in self.gun_classes:
@ -60,6 +61,7 @@ class BFGPlugin(AbstractPlugin):
threads=self.get_option("threads", '10'),
stpd_filename=self.stepper_wrapper.stpd,
cached_stpd=cached_stpd,
zmq=self.get_option("zmq", '0') == '1'
)
aggregator = None
try:

View File

@ -1,6 +1,7 @@
import logging
import time
from Tank.stepper import StpdReader
from zmq_reader import ZmqReader
import multiprocessing as mp
import threading as th
from Queue import Empty, Full
@ -10,7 +11,7 @@ def signal_handler(signum, frame):
class BFG(object):
def __init__(self, gun, instances, threads, stpd_filename, cached_stpd=False):
def __init__(self, gun, instances, threads, stpd_filename, zmq=False, cached_stpd=False):
self.log = logging.getLogger(__name__)
self.log.info(
"BFG using stpd from %s", stpd_filename)
@ -24,6 +25,7 @@ class BFG(object):
self.stpd_filename = stpd_filename
self.pool = [mp.Process(target=self._worker) for i in xrange(0, self.instances)]
self.feeder = th.Thread(target=self._feed)
self.zmq = zmq
def start(self):
self.start_time = time.time()
@ -37,9 +39,12 @@ class BFG(object):
self.quit.set()
def _feed(self):
plan = StpdReader(self.stpd_filename)
if self.cached_stpd:
plan = list(plan)
if self.zmq:
plan = ZmqReader()
else:
plan = StpdReader(self.stpd_filename)
if self.cached_stpd:
plan = list(plan)
for task in plan:
if self.quit.is_set():
self.log.info("Stop feeding: gonna quit")
@ -79,8 +84,9 @@ class BFG(object):
except (KeyboardInterrupt, SystemExit):
self.quit.set()
except Empty:
self.log.info("Empty queue. Exiting thread.")
return
if self.quit.is_set():
self.log.info("Empty queue. Exiting thread.")
return
except Full:
self.log.warning("Couldn't put to result queue because it's full")
self.log.debug("Exiting shooter thread...")

View File

@ -0,0 +1,23 @@
import zmq
import logging
import json
class ZmqReader(object):
'''Read missiles from zmq'''
def __init__(self):
self.queue = "tcp://127.0.0.1:43000" #queue
self.log = logging.getLogger(__name__)
context = zmq.Context()
self.socket = context.socket(zmq.PULL)
self.socket.connect(self.queue)
self.log.info("ZMQ: Waiting for missiles from '%s'" % self.queue)
def __iter__(self):
while True:
try:
data = self.socket.recv()
yield(tuple(json.loads(data)))
except Exception as e:
print(e)