Close ZMQ socket's when finished with them and term() contexts too.

This commit is contained in:
Pedro Algarvio 2012-12-03 23:34:34 +00:00
parent 2246bf3d33
commit 955c06abc2
4 changed files with 82 additions and 30 deletions

View File

@ -926,9 +926,14 @@ class LocalClient(object):
if expr_form == 'nodegroup':
if tgt not in self.opts['nodegroups']:
conf_file = self.opts.get('conf_file', 'the master config file')
err = 'Node group {0} unavailable in {1}'.format(tgt, conf_file)
raise SaltInvocationError(err)
conf_file = self.opts.get(
'conf_file', 'the master config file'
)
raise SaltInvocationError(
'Node group {0} unavailable in {1}'.format(
tgt, conf_file
)
)
tgt = salt.utils.minions.nodegroup_comp(
tgt,
self.opts['nodegroups']
@ -978,9 +983,13 @@ class LocalClient(object):
payload_kwargs['to'] = timeout
sreq = salt.payload.SREQ(
'tcp://{0[interface]}:{0[ret_port]}'.format(self.opts),
)
'tcp://{0[interface]}:{0[ret_port]}'.format(self.opts),
)
payload = sreq.send('clear', payload_kwargs)
# We have the payload, let's get rid of SREQ fast(GC'ed faster)
del(sreq)
if not payload:
return payload
return {'jid': payload['load']['jid'],
@ -1035,6 +1044,7 @@ class FunctionWrapper(dict):
args.append('{0}={1}'.format(_key, _val))
return self.local.cmd(self.minion, key, args)
class Caller(object):
'''
Create an object used to call salt functions directly on a minion

View File

@ -326,9 +326,11 @@ class Publisher(multiprocessing.Process):
continue
raise exc
except KeyboardInterrupt:
#except KeyboardInterrupt:
finally:
pub_sock.close()
pull_sock.close()
context.term()
class ReqServer(object):
@ -409,6 +411,11 @@ class ReqServer(object):
'''
self.__bind()
def __del__(self):
self.clients.close()
self.workers.close()
self.context.term()
class MWorker(multiprocessing.Process):
'''
@ -451,8 +458,10 @@ class MWorker(multiprocessing.Process):
if exc.errno == errno.EINTR:
continue
raise exc
except KeyboardInterrupt:
finally:
#except KeyboardInterrupt:
socket.close()
context.term()
def _handle_payload(self, payload):
'''
@ -1045,14 +1054,18 @@ class AESFuncs(object):
else:
ret_form = 'clean'
if ret_form == 'clean':
return self.local.get_returns(
try:
return self.local.get_returns(
jid,
self.ckminions.check_minions(
clear_load['tgt'],
expr_form
),
),
timeout
)
)
finally:
pub_sock.close()
context.term()
elif ret_form == 'full':
ret = self.local.get_full_returns(
jid,
@ -1063,7 +1076,11 @@ class AESFuncs(object):
timeout
)
ret['__jid__'] = jid
return ret
try:
return ret
finally:
pub_sock.close()
context.term()
def run_func(self, func, load):
'''
@ -1647,6 +1664,14 @@ class ClearFuncs(object):
load['tgt'],
load.get('tgt_type', 'glob')
)
return {'enc': 'clear',
'load': {'jid': clear_load['jid'],
'minions': minions}}
try:
return {
'enc': 'clear',
'load': {
'jid': clear_load['jid'],
'minions': minions
}
}
finally:
pub_sock.close()
context.term()

View File

@ -123,8 +123,8 @@ class SREQ(object):
def __init__(self, master, id_='', serial='msgpack', linger=0):
self.master = master
self.serial = Serial(serial)
context = zmq.Context()
self.socket = context.socket(zmq.REQ)
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.linger = linger
if id_:
self.socket.setsockopt(zmq.IDENTITY, id_)
@ -147,9 +147,10 @@ class SREQ(object):
else:
break
tried += 1
ret = self.serial.loads(self.socket.recv())
self.poller.unregister(self.socket)
return ret
try:
return self.serial.loads(self.socket.recv())
finally:
self.poller.unregister(self.socket)
def send_auto(self, payload):
'''
@ -158,3 +159,7 @@ class SREQ(object):
enc = payload.get('enc', 'clear')
load = payload.get('load', {})
return self.send(enc, load)
def __del__(self):
self.socket.close()
self.context.term()

View File

@ -33,6 +33,7 @@ from salt._compat import string_types
log = logging.getLogger(__name__)
class SaltEvent(object):
'''
The base class used to manage salt events
@ -133,11 +134,13 @@ class SaltEvent(object):
if data is None:
continue
yield data
except KeyboardInterrupt:
finally:
#except KeyboardInterrupt:
if self.cpub:
self.sub.close()
if self.cpush:
self.push.close()
self.context.term()
def fire_event(self, data, tag=''):
'''
@ -150,6 +153,13 @@ class SaltEvent(object):
self.push.send(event)
return True
def __del__(self):
if self.cpub:
self.sub.close()
if self.cpush:
self.push.close()
self.context.term()
class MasterEvent(SaltEvent):
'''
@ -181,20 +191,20 @@ class EventPublisher(Process):
Bind the pub and pull sockets for events
'''
# Set up the context
context = zmq.Context(1)
self.context = zmq.Context(1)
# Prepare the master event publisher
epub_sock = context.socket(zmq.PUB)
self.epub_sock = self.context.socket(zmq.PUB)
epub_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'master_event_pub.ipc')
)
# Prepare master event pull socket
epull_sock = context.socket(zmq.PULL)
self.epull_sock = self.context.socket(zmq.PULL)
epull_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'master_event_pull.ipc')
)
# Start the master event publisher
epub_sock.bind(epub_uri)
epull_sock.bind(epull_uri)
self.epub_sock.bind(epub_uri)
self.epull_sock.bind(epull_uri)
# Restrict access to the sockets
pub_mode = 448
if self.opts.get('client_acl') or self.opts.get('external_auth'):
@ -215,15 +225,17 @@ class EventPublisher(Process):
# Catch and handle EINTR from when this process is sent
# SIGUSR1 gracefully so we don't choke and die horribly
try:
package = epull_sock.recv()
epub_sock.send(package)
package = self.epull_sock.recv()
self.epub_sock.send(package)
except zmq.ZMQError as exc:
if exc.errno == errno.EINTR:
continue
raise exc
except KeyboardInterrupt:
epub_sock.close()
epull_sock.close()
finally:
#except KeyboardInterrupt:
self.epub_sock.close()
self.epull_sock.close()
self.context.term()
class Reactor(multiprocessing.Process, salt.state.Compiler):