move workers to ipc instread of the thread bridge, the master no longer

uses python threading at all!
This commit is contained in:
Thomas S Hatch 2011-08-26 22:47:55 -06:00
parent da653d5e08
commit a2a431e2e2

View File

@ -9,7 +9,6 @@ import shutil
import hashlib
import logging
import tempfile
import threading
import multiprocessing
import time
import datetime
@ -165,41 +164,16 @@ class ReqServer(object):
self.aes_funcs = aes_funcs
self.clear_funcs = clear_funcs
self.master_key = mkey
self.context = zmq.Context(1)
self.context = zmq.Context(self.opts['worker_threads'])
# Prepare the zeromq sockets
self.uri = 'tcp://%(interface)s:%(ret_port)s' % self.opts
self.clients = self.context.socket(zmq.XREP)
self.workers = self.context.socket(zmq.XREQ)
self.w_uri = 'inproc://workers'
self.w_uri = 'ipc:///tmp/.salt.ipc'
# Prepare the aes key
self.key = key
self.crypticle = crypticle
def __worker(self, ind):
'''
Starts up a worker thread
'''
in_socket = self.context.socket(zmq.REP)
in_socket.connect(self.w_uri)
m_worker = MWorker(self.opts,
ind,
self.master_key,
self.key,
self.crypticle,
self.aes_funcs,
self.clear_funcs)
work_port = m_worker.port
m_worker.start()
out_socket = self.context.socket(zmq.REQ)
out_socket.connect('tcp://127.0.0.1:%s' % work_port)
while True:
package = in_socket.recv()
out_socket.send(package)
ret = out_socket.recv()
in_socket.send(ret)
def __bind(self):
'''
Binds the reply server
@ -207,12 +181,17 @@ class ReqServer(object):
log.info('Setting up the master communication server')
self.clients.bind(self.uri)
self.workers.bind(self.w_uri)
for ind in range(int(self.opts['worker_threads'])):
threading.Thread(target=lambda: self.__worker(ind)).start()
time.sleep(0.1)
log.info('Starting Salt worker process {}'.format(ind))
MWorker(self.opts,
self.master_key,
self.key,
self.crypticle,
self.aes_funcs,
self.clear_funcs).start()
self.workers.bind(self.w_uri)
zmq.device(zmq.QUEUE, self.clients, self.workers)
def start_publisher(self):
@ -237,7 +216,6 @@ class MWorker(multiprocessing.Process):
'''
def __init__(self,
opts,
ind,
mkey,
key,
crypticle,
@ -246,7 +224,6 @@ class MWorker(multiprocessing.Process):
multiprocessing.Process.__init__(self)
self.opts = opts
self.crypticle = crypticle
self.port = str(ind + int(self.opts['worker_start_port']))
self.aes_funcs = aes_funcs
self.clear_funcs = clear_funcs
@ -256,7 +233,8 @@ class MWorker(multiprocessing.Process):
'''
context = zmq.Context(1)
socket = context.socket(zmq.REP)
socket.bind('tcp://127.0.0.1:%s' % self.port)
log.info('Worker binding to socket /tmp/.salt.ipc')
socket.connect('ipc:///tmp/.salt.ipc')
while True:
package = socket.recv()