Gevent-based BFG worker

This commit is contained in:
Anton Kirilenko 2017-03-07 10:53:06 +00:00
parent 62cf5c8720
commit 94e2aebfa8
4 changed files with 128 additions and 5 deletions

View File

@ -579,6 +579,24 @@ How it works
.. image:: ./pic/tank-bfg.png
BFG Worker Type
-----------
By default, BFG will create lots of processes (number is defined by ``instances`` option).
Every process will execute requests in a single thread. These processes will comsume a lot of memory.
It's also possible to switch this behavior and use ``gevent`` to power up every worker process,
allowing it to have multiple concurrent threads executing HTTP requests.
With green worker, it's recommended to set ``instances`` to number of CPU cores,
and adjust the number of real threads by ``green_threads_per_instance`` option.
INI file section: **[bfg]**
:worker_type:
Set it to ``green`` to let every process have multiple concurrent green threads.
:green_threads_per_instance:
Number of green threads every worker process will execute. Only affects ``green`` worker type.
BFG Options
-----------

View File

@ -152,6 +152,7 @@ class TankCore(object):
self.taskset_affinity = self.get_option(self.SECTION, 'affinity', '')
options = self.config.get_options(self.SECTION, self.PLUGIN_PREFIX)
for (plugin_name, plugin) in options:
plugin_path, config_section = parse_plugin(plugin)
if not plugin_path:

View File

@ -7,7 +7,7 @@ from ...common.interfaces import AbstractPlugin, GeneratorPlugin
from .guns import LogGun, SqlGun, CustomGun, HttpGun, ScenarioGun, UltimateGun
from .reader import BfgReader, BfgStatsReader
from .widgets import BfgInfoWidget
from .worker import BFG
from .worker import BFGMultiprocessing, BFGGreen
from ..Aggregator import Plugin as AggregatorPlugin
from ..Console import Plugin as ConsolePlugin
from ...stepper import StepperWrapper
@ -69,11 +69,19 @@ class Plugin(AbstractPlugin, GeneratorPlugin):
cached_stpd = True
else:
cached_stpd = False
if self.get_option("worker_type", "") == "green":
BFG = BFGGreen
else:
BFG = BFGMultiprocessing
self.bfg = BFG(
gun=self.gun,
instances=self.stepper_wrapper.instances,
stpd_filename=self.stepper_wrapper.stpd,
cached_stpd=cached_stpd)
cached_stpd=cached_stpd,
green_threads_per_instance=int(self.get_option('green_threads_per_instance', 1000)),
)
aggregator = None
try:
aggregator = self.core.get_plugin_of_type(AggregatorPlugin)

View File

@ -9,13 +9,14 @@ from ...stepper import StpdReader
logger = logging.getLogger(__name__)
class BFG(object):
class BFGBase(object):
"""
A BFG load generator that manages multiple workers as processes and
threads in each of them and feeds them with tasks
"""
def __init__(self, gun, instances, stpd_filename, cached_stpd=False):
def __init__(self, gun, instances, stpd_filename, cached_stpd=False,
green_threads_per_instance=None):
logger.info(
"""
BFG using stpd from {stpd_filename}
@ -35,13 +36,14 @@ Gun: {gun.__class__.__name__}
self.cached_stpd = cached_stpd
self.stpd_filename = stpd_filename
self.pool = [
mp.Process(target=self._worker) for _ in xrange(0, self.instances)
mp.Process(target=self._worker) for _ in xrange(self.instances)
]
self.feeder = th.Thread(target=self._feed, name="Feeder")
self.feeder.daemon = True
self.workers_finished = False
self.start_time = None
self.plan = None
self.green_threads_per_instance = green_threads_per_instance
def start(self):
self.start_time = time.time()
@ -129,6 +131,12 @@ Gun: {gun.__class__.__name__}
map(lambda x: x.join(), self.pool)
self.workers_finished = True
class BFGMultiprocessing(BFGBase):
"""
Default worker type, creates process per worker,
every process executes requests synchronously inside.
"""
def _worker(self):
"""
A worker that does actual jobs
@ -176,3 +184,91 @@ Gun: {gun.__class__.__name__}
logger.exception("Couldn't finalize gun. Exit shooter process")
return
logger.debug("Exit shooter process")
class BFGGreen(BFGBase):
"""
Green version of the worker. Starts `self.instances` processes,
each of process has a pool of `self.green_threads_per_instance` green threads.
"""
def _worker(self):
from gevent import monkey, spawn
from gevent.queue import Queue as GreenQueue
# NOTE: Patching everything will conflict with multiprocessing
monkey.patch_all(thread=False, select=False)
logger.debug("Init shooter process")
try:
self.gun.setup()
except Exception:
logger.exception("Couldn't initialize gun. Exit shooter process")
return
self.green_queue = GreenQueue(self.green_threads_per_instance)
self.green_pool = [spawn(self._green_worker) for _ in xrange(0, self.green_threads_per_instance)]
# Keep track of tasks sent to greenlets. If all greenlets are busy -
# don't pull more tasks from the main queue, let other workers do that.
self._free_threads_count = self.green_threads_per_instance
while not self.quit.is_set():
while not self.task_queue.empty() and self._free_threads_count:
try:
task = self.task_queue.get_nowait()
except Empty:
continue
self._free_threads_count -= 1
if not task:
logger.debug("Got killer task.")
self.quit.set()
break
self.green_queue.put(task)
time.sleep(0.1)
map(lambda g: g.join(), self.green_pool)
try:
self.gun.teardown()
except Exception:
logger.exception("Couldn't finalize gun. Exit shooter process")
return
logger.debug("Exit shooter process")
def _green_worker(self):
"""
A worker that does actual jobs
"""
while not self.quit.is_set():
try:
task = self.green_queue.get(timeout=1)
timestamp, missile, marker = task
planned_time = self.start_time + (timestamp / 1000.0)
delay = planned_time - time.time()
if delay > 0:
time.sleep(delay)
try:
with self.instance_counter.get_lock():
self.instance_counter.value += 1
self.gun.shoot(missile, marker)
finally:
with self.instance_counter.get_lock():
self.instance_counter.value -= 1
self._free_threads_count += 1
except (KeyboardInterrupt, SystemExit):
break
except Empty:
continue
except Full:
logger.warning("Couldn't put to result queue because it's full")
except Exception:
logger.exception("Bfg shoot exception")