Merge pull request #44769 from thatch45/pause_states

Pause states
This commit is contained in:
Thomas S Hatch 2017-12-01 15:56:56 -07:00 committed by GitHub
commit 6d815440f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 164 additions and 0 deletions

View File

@ -43,6 +43,7 @@ from salt.runners.state import orchestrate as _orchestrate
# Import 3rd-party libs
from salt.ext import six
import msgpack
__proxyenabled__ = ['*']
@ -165,6 +166,99 @@ def _snapper_post(opts, jid, pre_num):
log.error('Failed to create snapper pre snapshot for jid: {0}'.format(jid))
def pause(jid, state_id=None, duration=None):
'''
Set up a state id pause, this instructs a running state to pause at a given
state id. This needs to pass in the jid of the running state and can
optionally pass in a duration in seconds. If a state_id is not passed then
the jid referenced will be paused at the begining of the next state run.
The given state id is the id got a given state execution, so given a state
that looks like this:
.. code-block:: yaml
vim:
pkg.installed: []
The state_id to pass to `pause` is `vim`
CLI Examples:
.. code-block:: bash
salt '*' state.pause 20171130110407769519
salt '*' state.pause 20171130110407769519 vim
salt '*' state.pause 20171130110407769519 vim 20
'''
jid = str(jid)
if state_id is None:
state_id = '__all__'
pause_dir = os.path.join(__opts__[u'cachedir'], 'state_pause')
pause_path = os.path.join(pause_dir, jid)
if not os.path.exists(pause_dir):
try:
os.makedirs(pause_dir)
except OSError:
# File created in the gap
pass
data = {}
if os.path.exists(pause_path):
with salt.utils.files.fopen(pause_path, 'rb') as fp_:
data = msgpack.loads(fp_.read())
if state_id not in data:
data[state_id] = {}
if duration:
data[state_id]['duration'] = int(duration)
with salt.utils.files.fopen(pause_path, 'wb') as fp_:
fp_.write(msgpack.dumps(data))
def resume(jid, state_id=None):
'''
Remove a pause from a jid, allowing it to continue. If the state_id is
not specified then the a general pause will be resumed.
The given state_id is the id got a given state execution, so given a state
that looks like this:
.. code-block:: yaml
vim:
pkg.installed: []
The state_id to pass to `rm_pause` is `vim`
CLI Examples:
.. code-block:: bash
salt '*' state.resume 20171130110407769519
salt '*' state.resume 20171130110407769519 vim
'''
jid = str(jid)
if state_id is None:
state_id = '__all__'
pause_dir = os.path.join(__opts__[u'cachedir'], 'state_pause')
pause_path = os.path.join(pause_dir, jid)
if not os.path.exists(pause_dir):
try:
os.makedirs(pause_dir)
except OSError:
# File created in the gap
pass
data = {}
if os.path.exists(pause_path):
with salt.utils.files.fopen(pause_path, 'rb') as fp_:
data = msgpack.loads(fp_.read())
else:
return True
if state_id in data:
data.pop(state_id)
with salt.utils.files.fopen(pause_path, 'wb') as fp_:
fp_.write(msgpack.dumps(data))
def orchestrate(mods,
saltenv='base',
test=None,

View File

@ -15,6 +15,24 @@ from salt.exceptions import SaltInvocationError
LOGGER = logging.getLogger(__name__)
def set_pause(jid, state_id, duration=None):
'''
Set up a state id pause, this instructs a running state to pause at a given
state id. This needs to pass in the jid of the running state and can
optionally pass in a duration in seconds.
'''
minion = salt.minion.MasterMinion(__opts__)
minion['state.set_pause'](jid, state_id, duration)
def rm_pause(jid, state_id, duration=None):
'''
Remove a pause from a jid, allowing it to continue
'''
minion = salt.minion.MasterMinion(__opts__)
minion['state.rm_pause'](jid, state_id)
def orchestrate(mods,
saltenv='base',
test=None,

View File

@ -1918,6 +1918,8 @@ class State(object):
if self.mocked:
ret = mock_ret(cdata)
else:
# Check if this low chunk is paused
self.check_pause(low)
# Execute the state function
if not low.get(u'__prereq__') and low.get(u'parallel'):
# run the state call in parallel, but only if not in a prereq
@ -2127,6 +2129,48 @@ class State(object):
return not running[tag][u'result']
return False
def check_pause(self, low):
'''
Check to see if this low chunk has been paused
'''
if not self.jid:
# Can't pause on salt-ssh since we can't track continuous state
return
pause_path = os.path.join(self.opts[u'cachedir'], 'state_pause', self.jid)
start = time.time()
if os.path.isfile(pause_path):
try:
while True:
tries = 0
with salt.utils.files.fopen(pause_path, 'rb') as fp_:
try:
pdat = msgpack.loads(fp_.read())
except msgpack.UnpackValueError:
# Reading race condition
if tries > 10:
# Break out if there are a ton of read errors
return
tries += 1
time.sleep(1)
continue
id_ = low[u'__id__']
key = u''
if id_ in pdat:
key = id_
elif u'__all__' in pdat:
key = u'__all__'
if key:
if u'duration' in pdat[key]:
now = time.time()
if now - start > pdat[key][u'duration']:
return
else:
return
time.sleep(1)
except Exception as exc:
log.error('Failed to read in pause data for file located at: %s', pause_path)
return
def reconcile_procs(self, running):
'''
Check the running dict for processes and resolve them
@ -2682,6 +2726,14 @@ class State(object):
except OSError:
log.debug(u'File %s does not exist, no need to cleanup', accum_data_path)
_cleanup_accumulator_data()
if self.jid is not None:
pause_path = os.path.join(self.opts[u'cachedir'], u'state_pause', self.jid)
if os.path.isfile(pause_path):
try:
os.remove(pause_path)
except OSError:
# File is not present, all is well
pass
return ret