mirror of
https://github.com/valitydev/salt.git
synced 2024-11-07 17:09:03 +00:00
Make non-async runner executions happen in a thread instead of a daemonized process.
This means we are backwards compatible with builtin print (yay!) and that you can control-c out of the execution (since it will kill the thread too). Conflicts: salt/client/mixins.py
This commit is contained in:
parent
12de6b5dfb
commit
f25505f0dd
@ -120,7 +120,7 @@ class LocalClient(object):
|
||||
)
|
||||
self.opts = salt.config.client_config(c_path)
|
||||
self.serial = salt.payload.Serial(self.opts)
|
||||
self.salt_user = self.__get_user()
|
||||
self.salt_user = salt.utils.get_specific_user()
|
||||
self.skip_perm_errors = skip_perm_errors
|
||||
self.key = self.__read_master_key()
|
||||
self.event = salt.utils.event.get_event(
|
||||
@ -156,20 +156,6 @@ class LocalClient(object):
|
||||
# Fall back to eauth
|
||||
return ''
|
||||
|
||||
def __get_user(self):
|
||||
'''
|
||||
Determine the current user running the salt command
|
||||
'''
|
||||
user = salt.utils.get_user()
|
||||
# if our user is root, look for other ways to figure out
|
||||
# who we are
|
||||
env_vars = ('SUDO_USER',)
|
||||
if user == 'root' or user == self.opts['user']:
|
||||
for evar in env_vars:
|
||||
if evar in os.environ:
|
||||
return 'sudo_{0}'.format(os.environ[evar])
|
||||
return user
|
||||
|
||||
def _convert_range_to_list(self, tgt):
|
||||
'''
|
||||
convert a seco.range range into a list target
|
||||
|
@ -276,12 +276,13 @@ class AsyncClientMixin(object):
|
||||
client = None
|
||||
tag_prefix = None
|
||||
|
||||
def _proc_function(self, fun, low, user, tag, jid):
|
||||
def _proc_function(self, fun, low, user, tag, jid, daemonize=True):
|
||||
'''
|
||||
Run this method in a multiprocess target to execute the function in a
|
||||
multiprocess and fire the return data on the event bus
|
||||
'''
|
||||
salt.utils.daemonize()
|
||||
if daemonize:
|
||||
salt.utils.daemonize()
|
||||
|
||||
# pack a few things into low
|
||||
low['__jid__'] = jid
|
||||
@ -310,20 +311,24 @@ class AsyncClientMixin(object):
|
||||
'''
|
||||
return self.master_call(**low)
|
||||
|
||||
def _gen_async_pub(self):
|
||||
jid = salt.utils.jid.gen_jid()
|
||||
tag = tagify(jid, prefix=self.tag_prefix)
|
||||
return {'tag': tag, 'jid': jid}
|
||||
|
||||
def async(self, fun, low, user='UNKNOWN'):
|
||||
'''
|
||||
Execute the function in a multiprocess and return the event tag to use
|
||||
to watch for the return
|
||||
'''
|
||||
jid = salt.utils.jid.gen_jid()
|
||||
tag = tagify(jid, prefix=self.tag_prefix)
|
||||
async_pub = self._gen_async_pub()
|
||||
|
||||
proc = multiprocessing.Process(
|
||||
target=self._proc_function,
|
||||
args=(fun, low, user, tag, jid))
|
||||
args=(fun, low, user, async_pub['tag'], async_pub['jid']))
|
||||
proc.start()
|
||||
proc.join() # MUST join, otherwise we leave zombies all over
|
||||
return {'tag': tag, 'jid': jid}
|
||||
return async_pub
|
||||
|
||||
def print_async_event(self, suffix, event):
|
||||
'''
|
||||
|
@ -7,6 +7,8 @@ Execute salt convenience routines
|
||||
from __future__ import print_function
|
||||
from __future__ import absolute_import
|
||||
import logging
|
||||
import threading
|
||||
import os
|
||||
|
||||
# Import salt libs
|
||||
import salt.exceptions
|
||||
@ -137,21 +139,40 @@ class Runner(RunnerClient):
|
||||
low['args'] = args
|
||||
low['kwargs'] = kwargs
|
||||
|
||||
async_pub = self.async(self.opts['fun'], low)
|
||||
user = salt.utils.get_specific_user()
|
||||
|
||||
# Run the runner!
|
||||
if self.opts.get('async', False):
|
||||
async_pub = self.async(self.opts['fun'], low, user=user)
|
||||
log.info('Running in async mode. Results of this execution may '
|
||||
'be collected by attaching to the master event bus or '
|
||||
'by examing the master job cache, if configured. '
|
||||
'This execution is running under tag {tag}'.format(**async_pub))
|
||||
return async_pub['jid'] # return the jid
|
||||
|
||||
# otherwise run it in a thread, so you can stop it *and* so we
|
||||
# can see raw "print" (the builtin)
|
||||
async_pub = self._gen_async_pub()
|
||||
t = threading.Thread(target=self._proc_function,
|
||||
args=(self.opts['fun'],
|
||||
low,
|
||||
user,
|
||||
async_pub['tag'],
|
||||
async_pub['jid'],
|
||||
False, # Don't daemonize
|
||||
))
|
||||
# Daemon thread, so if someone throws a keyboard exception
|
||||
# we will stop execution in the thread
|
||||
t.daemon = True
|
||||
t.start()
|
||||
|
||||
# output rets if you have some
|
||||
for suffix, event in self.get_async_returns(async_pub['tag'], event=self.event):
|
||||
if not self.opts.get('quiet', False):
|
||||
self.print_async_event(suffix, event)
|
||||
if suffix == 'ret':
|
||||
ret = event['return']
|
||||
t.join()
|
||||
|
||||
except salt.exceptions.SaltException as exc:
|
||||
ret = str(exc)
|
||||
|
@ -239,6 +239,19 @@ def get_user():
|
||||
else:
|
||||
return getpass.getuser()
|
||||
|
||||
def get_specific_user():
|
||||
'''
|
||||
Get a user name for publishing. If you find the user is "root" attempt to be
|
||||
more specific
|
||||
'''
|
||||
user = get_user()
|
||||
env_vars = ('SUDO_USER',)
|
||||
if user == 'root' or user == self.opts['user']:
|
||||
for evar in env_vars:
|
||||
if evar in os.environ:
|
||||
return 'sudo_{0}'.format(os.environ[evar])
|
||||
return user
|
||||
|
||||
|
||||
def daemonize(redirect_out=True):
|
||||
'''
|
||||
|
Loading…
Reference in New Issue
Block a user