Merge branch 'timeout-for-run-command' of git://github.com/hulu/salt into hulu-timeout-for-run-command

Conflicts:
	salt/states/cmd.py
This commit is contained in:
Thomas S Hatch 2013-06-11 16:30:41 -06:00
commit 36bf8122e9
5 changed files with 124 additions and 20 deletions

View File

@ -83,6 +83,11 @@ class SaltReqTimeoutError(SaltException):
Thrown when a salt master request call fails to return within the timeout Thrown when a salt master request call fails to return within the timeout
''' '''
class TimedProcTimeoutError(SaltException):
'''
Thrown when a timed subprocess does not terminate within the timeout,
or if the specified timeout is not an int or a float
'''
class EauthAuthenticationError(SaltException): class EauthAuthenticationError(SaltException):
''' '''

View File

@ -17,7 +17,9 @@ import yaml
# Import salt libs # Import salt libs
import salt.utils import salt.utils
import salt.utils.timed_subprocess
from salt.exceptions import CommandExecutionError from salt.exceptions import CommandExecutionError
import salt.exceptions
import salt.grains.extra import salt.grains.extra
# Only available on POSIX systems, nonfatal on windows # Only available on POSIX systems, nonfatal on windows
@ -163,7 +165,8 @@ def _run(cmd,
env=(), env=(),
rstrip=True, rstrip=True,
template=None, template=None,
umask=None): umask=None,
timeout=None):
''' '''
Do the DRY thing and only call subprocess.Popen() once Do the DRY thing and only call subprocess.Popen() once
''' '''
@ -295,8 +298,18 @@ def _run(cmd,
kwargs['close_fds'] = True kwargs['close_fds'] = True
# This is where the magic happens # This is where the magic happens
proc = subprocess.Popen(cmd, **kwargs) proc = salt.utils.timed_subprocess.TimedProc(cmd, **kwargs)
out, err = proc.communicate() try:
proc.wait(timeout)
except salt.exceptions.TimedProcTimeoutError, e:
ret['stdout'] = e.message
ret['stderr'] = ''
ret['pid'] = proc.process.pid
# ok return code for timeouts?
ret['retcode'] = 1
return ret
out, err = proc.stdout, proc.stderr
if rstrip: if rstrip:
if out is not None: if out is not None:
@ -306,8 +319,8 @@ def _run(cmd,
ret['stdout'] = out ret['stdout'] = out
ret['stderr'] = err ret['stderr'] = err
ret['pid'] = proc.pid ret['pid'] = proc.process.pid
ret['retcode'] = proc.returncode ret['retcode'] = proc.process.returncode
return ret return ret
@ -317,7 +330,8 @@ def _run_quiet(cmd,
shell=DEFAULT_SHELL, shell=DEFAULT_SHELL,
env=(), env=(),
template=None, template=None,
umask=None): umask=None,
timeout=None):
''' '''
Helper for running commands quietly for minion startup Helper for running commands quietly for minion startup
''' '''
@ -329,7 +343,8 @@ def _run_quiet(cmd,
shell=shell, shell=shell,
env=env, env=env,
template=template, template=template,
umask=umask)['stdout'] umask=umask,
timeout=timeout)['stdout']
def _run_all_quiet(cmd, def _run_all_quiet(cmd,
@ -338,7 +353,8 @@ def _run_all_quiet(cmd,
shell=DEFAULT_SHELL, shell=DEFAULT_SHELL,
env=(), env=(),
template=None, template=None,
umask=None): umask=None,
timeout=None):
''' '''
Helper for running commands quietly for minion startup. Helper for running commands quietly for minion startup.
Returns a dict of return data Returns a dict of return data
@ -350,7 +366,8 @@ def _run_all_quiet(cmd,
env=env, env=env,
quiet=True, quiet=True,
template=template, template=template,
umask=umask) umask=umask,
timeout=timeout)
def run(cmd, def run(cmd,
@ -362,6 +379,7 @@ def run(cmd,
rstrip=True, rstrip=True,
umask=None, umask=None,
quiet=False, quiet=False,
timeout=None,
**kwargs): **kwargs):
''' '''
Execute the passed command and return the output as a string Execute the passed command and return the output as a string
@ -386,7 +404,8 @@ def run(cmd,
template=template, template=template,
rstrip=rstrip, rstrip=rstrip,
umask=umask, umask=umask,
quiet=quiet)['stdout'] quiet=quiet,
timeout=timeout)['stdout']
if not quiet: if not quiet:
log.debug('output: {0}'.format(out)) log.debug('output: {0}'.format(out))
return out return out
@ -401,6 +420,7 @@ def run_stdout(cmd,
rstrip=True, rstrip=True,
umask=None, umask=None,
quiet=False, quiet=False,
timeout=None,
**kwargs): **kwargs):
''' '''
Execute a command, and only return the standard out Execute a command, and only return the standard out
@ -424,7 +444,8 @@ def run_stdout(cmd,
template=template, template=template,
rstrip=rstrip, rstrip=rstrip,
umask=umask, umask=umask,
quiet=quiet)["stdout"] quiet=quiet,
timeout=timeout)["stdout"]
if not quiet: if not quiet:
log.debug('stdout: {0}'.format(stdout)) log.debug('stdout: {0}'.format(stdout))
return stdout return stdout
@ -439,6 +460,7 @@ def run_stderr(cmd,
rstrip=True, rstrip=True,
umask=None, umask=None,
quiet=False, quiet=False,
timeout=None,
**kwargs): **kwargs):
''' '''
Execute a command and only return the standard error Execute a command and only return the standard error
@ -462,7 +484,8 @@ def run_stderr(cmd,
template=template, template=template,
rstrip=rstrip, rstrip=rstrip,
umask=umask, umask=umask,
quiet=quiet)["stderr"] quiet=quiet,
timeout=timeout)["stderr"]
if not quiet: if not quiet:
log.debug('stderr: {0}'.format(stderr)) log.debug('stderr: {0}'.format(stderr))
return stderr return stderr
@ -477,6 +500,7 @@ def run_all(cmd,
rstrip=True, rstrip=True,
umask=None, umask=None,
quiet=False, quiet=False,
timeout=None,
**kwargs): **kwargs):
''' '''
Execute the passed command and return a dict of return data Execute the passed command and return a dict of return data
@ -500,7 +524,8 @@ def run_all(cmd,
template=template, template=template,
rstrip=rstrip, rstrip=rstrip,
umask=umask, umask=umask,
quiet=quiet) quiet=quiet,
timeout=timeout)
if not quiet: if not quiet:
if ret['retcode'] != 0: if ret['retcode'] != 0:
@ -528,7 +553,8 @@ def retcode(cmd,
env=(), env=(),
template=None, template=None,
umask=None, umask=None,
quiet=False): quiet=False,
timeout=None):
''' '''
Execute a shell command and return the command's return code. Execute a shell command and return the command's return code.
@ -551,7 +577,8 @@ def retcode(cmd,
env=env, env=env,
template=template, template=template,
umask=umask, umask=umask,
quiet=quiet)['retcode'] quiet=quiet,
timeout=timeout)['retcode']
def script( def script(
@ -563,6 +590,7 @@ def script(
env='base', env='base',
template='jinja', template='jinja',
umask=None, umask=None,
timeout=None,
**kwargs): **kwargs):
''' '''
Download a script from a remote location and execute the script locally. Download a script from a remote location and execute the script locally.
@ -599,7 +627,8 @@ def script(
quiet=kwargs.get('quiet', False), quiet=kwargs.get('quiet', False),
runas=runas, runas=runas,
shell=shell, shell=shell,
umask=umask umask=umask,
timeout=timeout
) )
os.remove(path) os.remove(path)
return ret return ret
@ -613,6 +642,7 @@ def script_retcode(
env='base', env='base',
template='jinja', template='jinja',
umask=None, umask=None,
timeout=None,
**kwargs): **kwargs):
''' '''
Download a script from a remote location and execute the script locally. Download a script from a remote location and execute the script locally.
@ -638,6 +668,7 @@ def script_retcode(
env, env,
template, template,
umask=umask, umask=umask,
timeout=timeout,
**kwargs)['retcode'] **kwargs)['retcode']

View File

@ -367,6 +367,7 @@ def run(name,
stateful=False, stateful=False,
umask=None, umask=None,
quiet=False, quiet=False,
timeout=None,
**kwargs): **kwargs):
''' '''
Run a command if certain circumstances are met Run a command if certain circumstances are met
@ -410,6 +411,10 @@ def run(name,
quiet quiet
The command will be executed quietly, meaning no log entries of the The command will be executed quietly, meaning no log entries of the
actual command or its return data actual command or its return data
timeout
If the command has not terminated after timeout seconds, send the
subprocess sigterm, and if sigterm is ignored, follow up with sigkill
''' '''
ret = {'name': name, ret = {'name': name,
'changes': {}, 'changes': {},
@ -481,7 +486,7 @@ def run(name,
# Wow, we passed the test, run this sucker! # Wow, we passed the test, run this sucker!
if not __opts__['test']: if not __opts__['test']:
try: try:
cmd_all = __salt__['cmd.run_all'](name, **cmd_kwargs) cmd_all = __salt__['cmd.run_all'](name, timeout=timeout, **cmd_kwargs)
except CommandExecutionError as err: except CommandExecutionError as err:
ret['comment'] = str(err) ret['comment'] = str(err)
return ret return ret
@ -511,6 +516,7 @@ def script(name,
env=None, env=None,
stateful=False, stateful=False,
umask=None, umask=None,
timeout=None,
**kwargs): **kwargs):
''' '''
Download a script from a remote source and execute it. The name can be the Download a script from a remote source and execute it. The name can be the
@ -562,6 +568,11 @@ def script(name,
stateful stateful
The command being executed is expected to return data about executing The command being executed is expected to return data about executing
a state a state
timeout
If the command has not terminated after timeout seconds, send the
subprocess sigterm, and if sigterm is ignored, follow up with sigkill
''' '''
ret = {'changes': {}, ret = {'changes': {},
'comment': '', 'comment': '',
@ -588,7 +599,8 @@ def script(name,
'group': group, 'group': group,
'cwd': cwd, 'cwd': cwd,
'template': template, 'template': template,
'umask': umask}) 'umask': umask,
'timeout': timeout})
run_check_cmd_kwargs = { run_check_cmd_kwargs = {
'cwd': cwd, 'cwd': cwd,
@ -599,11 +611,11 @@ def script(name,
# Change the source to be the name arg if it is not specified # Change the source to be the name arg if it is not specified
if source is None: if source is None:
source = name source = name
# If script args present split from name and define args # If script args present split from name and define args
if len(name.split()) > 1: if len(name.split()) > 1:
cmd_kwargs.update({'args': name.split(' ', 1)[1]}) cmd_kwargs.update({'args': name.split(' ', 1)[1]})
try: try:
cret = _run_check( cret = _run_check(
run_check_cmd_kwargs, onlyif, unless, group run_check_cmd_kwargs, onlyif, unless, group

View File

@ -0,0 +1,45 @@
"""For running command line executables with a timeout"""
import subprocess
import threading
import salt.exceptions
class TimedProc(object):
'''
Create a TimedProc object, calls subprocess.Popen with passed args and **kwargs
'''
def __init__(self, args, **kwargs):
self.command = args
self.process = subprocess.Popen(args, **kwargs)
def wait(self, timeout=None):
'''
wait for subprocess to terminate and return subprocess' return code.
If timeout is reached, throw TimedProcTimeoutError
'''
def receive():
(self.stdout, self.stderr) = self.process.communicate()
if timeout:
if not isinstance(timeout, (int,float)):
raise salt.exceptions.TimedProcTimeoutError('Error: timeout must be a number')
rt = threading.Thread(target=receive)
rt.start()
rt.join(timeout)
if rt.isAlive():
# Subprocess cleanup (best effort)
self.process.kill()
def terminate():
if rt.isAlive():
self.process.terminate()
threading.Timer(10, terminate)
raise salt.exceptions.TimedProcTimeoutError('%s : Timed out after %s seconds' % (
self.command,
str(timeout),
))
else:
receive()
return self.process.returncode

View File

@ -180,6 +180,17 @@ sys.stdout.write('cheese')
runas=runas).strip() runas=runas).strip()
self.assertEqual(result, expected_result) self.assertEqual(result, expected_result)
def test_timeout(self):
'''
cmd.run trigger timeout
'''
self.assertTrue('Timed out' in self.run_function('cmd.run', ['sleep 2 && echo hello', 'timeout=1']))
def test_timeout_success(self):
'''
cmd.run sufficient timeout to succeed
'''
self.assertTrue('hello' == self.run_function('cmd.run', ['sleep 1 && echo hello', 'timeout=2']))
if __name__ == '__main__': if __name__ == '__main__':
from integration import run_tests from integration import run_tests