Merge pull request #12135 from jacksontj/job_cache

Master_job_caches
Thomas S Hatch 2014-04-23 13:55:51 -06:00
commit ff30f81933
13 changed files with 668 additions and 624 deletions

View File

@@ -117,6 +117,8 @@ class LocalClient(object):
self.opts['transport'],
listen=not self.opts.get('__worker', False))
self.mminion = salt.minion.MasterMinion(self.opts)
def __read_master_key(self):
'''
Read in the rotating master authentication key
@@ -740,89 +742,30 @@ class LocalClient(object):
if timeout is None:
timeout = self.opts['timeout']
fret = {}
inc_timeout = timeout
jid_dir = salt.utils.jid_dir(jid,
self.opts['cachedir'],
self.opts['hash_type'])
start = int(time.time())
# make sure minions is a set (since we do set operations on it)
minions = set(minions)
found = set()
wtag = os.path.join(jid_dir, 'wtag*')
# Check to see if the jid is real, if not return the empty dict
if not os.path.isdir(jid_dir):
yield {}
last_time = False
# Wait for the hosts to check in
while True:
for fn_ in os.listdir(jid_dir):
ret = {}
if fn_.startswith('.'):
continue
if fn_ not in found:
retp = os.path.join(jid_dir, fn_, 'return.p')
outp = os.path.join(jid_dir, fn_, 'out.p')
if not os.path.isfile(retp):
continue
while fn_ not in ret:
try:
check = True
ret_data = self.serial.load(
salt.utils.fopen(retp, 'rb')
)
if ret_data is None:
# Sometimes the ret data is read at the wrong
# time and returns None, do a quick re-read
if check:
continue
ret[fn_] = {'ret': ret_data}
if os.path.isfile(outp):
ret[fn_]['out'] = self.serial.load(
salt.utils.fopen(outp, 'rb')
)
except Exception:
pass
found.add(fn_)
fret.update(ret)
yield ret
if glob.glob(wtag) and int(time.time()) <= start + timeout + 1:
# The timeout +1 has not been reached and there is still a
# write tag for the syndic
continue
# start this before the cache lookup-- in case new stuff comes in
event_iter = self.get_event_iter_returns(jid, minions, timeout=timeout)
# get the info from the cache
ret = self.get_cache_returns(jid)
if ret != {}:
found.update(set(ret.keys()))
yield ret
# if you have all the returns, stop
if len(found.intersection(minions)) >= len(minions):
raise StopIteration()
# otherwise, get them from the event system
for event in event_iter:
if event != {}:
found.update(set(event.keys()))
yield event
if len(found.intersection(minions)) >= len(minions):
# All minions have returned, break out of the loop
break
if last_time:
if verbose:
if self.opts.get('minion_data_cache', False) \
or tgt_type in ('glob', 'pcre', 'list'):
if len(found.intersection(minions)) >= len(minions):
fail = sorted(list(minions.difference(found)))
for minion in fail:
yield({
minion: {
'out': 'no_return',
'ret': 'Minion did not return'
}
})
break
if int(time.time()) > start + timeout:
# The timeout has been reached, check the jid to see if the
# timeout needs to be increased
jinfo = self.gather_job_info(jid, tgt, tgt_type, minions - found, **kwargs)
more_time = False
for id_ in jinfo:
if jinfo[id_]:
if verbose:
print(
'Execution is still running on {0}'.format(id_)
)
more_time = True
if more_time:
timeout += inc_timeout
continue
else:
last_time = True
continue
time.sleep(0.01)
raise StopIteration()
def get_iter_returns(
self,
@@ -845,16 +788,15 @@ class LocalClient(object):
if timeout is None:
timeout = self.opts['timeout']
jid_dir = salt.utils.jid_dir(jid,
self.opts['cachedir'],
self.opts['hash_type'])
start = int(time.time())
timeout_at = start + timeout
found = set()
wtag = os.path.join(jid_dir, 'wtag*')
# Check to see if the jid is real, if not return the empty dict
if not os.path.isdir(jid_dir):
if not self.mminion.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}:
log.warning("jid does not exist")
yield {}
# stop the iteration, since the jid is invalid
raise StopIteration()
# Wait for the hosts to check in
syndic_wait = 0
last_time = False
@@ -911,10 +853,6 @@ class LocalClient(object):
jid, syndic_wait, datetime.fromtimestamp(timeout_at).time())
continue
break
if glob.glob(wtag) and int(time.time()) <= timeout_at + 1:
# The timeout +1 has not been reached and there is still a
# write tag for the syndic
continue
if last_time:
if len(found) < len(minions):
log.info('jid %s minions %s did not return in time',
@@ -949,9 +887,6 @@ class LocalClient(object):
minions = set(minions)
if timeout is None:
timeout = self.opts['timeout']
jid_dir = salt.utils.jid_dir(jid,
self.opts['cachedir'],
self.opts['hash_type'])
start = int(time.time())
timeout_at = start + timeout
log.debug("get_returns for jid %s sent to %s will timeout at %s",
@@ -959,11 +894,11 @@ class LocalClient(object):
found = set()
ret = {}
wtag = os.path.join(jid_dir, 'wtag*')
# Check to see if the jid is real, if not return the empty dict
if not os.path.isdir(jid_dir):
log.warning("jid_dir (%s) does not exist", jid_dir)
if not self.mminion.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}:
log.warning("jid does not exist")
return ret
# Wait for the hosts to check in
while True:
time_left = timeout_at - int(time.time())
@@ -982,10 +917,6 @@ class LocalClient(object):
# All minions have returned, break out of the loop
log.debug("jid %s found all minions", jid)
break
if glob.glob(wtag) and int(time.time()) <= timeout_at + 1:
# The timeout +1 has not been reached and there is still a
# write tag for the syndic
continue
if int(time.time()) > timeout_at:
log.info('jid %s minions %s did not return in time',
jid, (minions - found))
@@ -998,85 +929,69 @@ class LocalClient(object):
This method starts off a watcher looking at the return data for
a specified jid, it returns all of the information for the jid
'''
if timeout is None:
timeout = self.opts['timeout']
jid_dir = salt.utils.jid_dir(jid,
self.opts['cachedir'],
self.opts['hash_type'])
start = 999999999999
gstart = int(time.time())
# TODO: change this from ret to return... or the other way.
# It's inconsistent; we should pick one
ret = {}
wtag = os.path.join(jid_dir, 'wtag*')
# Check to see if the jid is real, if not return the empty dict
if not os.path.isdir(jid_dir):
# create the iterator-- since we want to get anyone in the middle
event_iter = self.get_event_iter_returns(jid, minions, timeout=timeout)
data = self.mminion.returners['{0}.get_jid'.format(self.opts['master_job_cache'])](jid)
for minion in data:
m_data = {}
if u'return' in data[minion]:
m_data['ret'] = data[minion].get(u'return')
else:
m_data['ret'] = data[minion].get('return')
if 'out' in data[minion]:
m_data['out'] = data[minion]['out']
if minion in ret:
ret[minion].update(m_data)
else:
ret[minion] = m_data
# if we have all the minion returns, let's just return
if len(set(ret.keys()).intersection(minions)) >= len(minions):
return ret
# Wait for the hosts to check in
while True:
for fn_ in os.listdir(jid_dir):
if fn_.startswith('.'):
continue
if fn_ not in ret:
retp = os.path.join(jid_dir, fn_, 'return.p')
outp = os.path.join(jid_dir, fn_, 'out.p')
if not os.path.isfile(retp):
continue
while fn_ not in ret:
try:
ret_data = self.serial.load(
salt.utils.fopen(retp, 'rb'))
ret[fn_] = {'ret': ret_data}
if os.path.isfile(outp):
ret[fn_]['out'] = self.serial.load(
salt.utils.fopen(outp, 'rb'))
except Exception:
pass
if ret and start == 999999999999:
start = int(time.time())
if glob.glob(wtag) and int(time.time()) <= start + timeout + 1:
# The timeout +1 has not been reached and there is still a
# write tag for the syndic
# otherwise let's use the listener we created above to get the rest
for event_ret in event_iter:
# if nothing in the event_ret, skip
if event_ret == {}:
time.sleep(0.02)
continue
for minion, m_data in event_ret.iteritems():
if minion in ret:
ret[minion].update(m_data)
else:
ret[minion] = m_data
# are we done yet?
if len(set(ret.keys()).intersection(minions)) >= len(minions):
return ret
if int(time.time()) > start + timeout:
return ret
if int(time.time()) > gstart + timeout and not ret:
# No minions have replied within the specified global timeout,
# return an empty dict
return ret
time.sleep(0.02)
# otherwise we hit the timeout, return what we have
return ret
def get_cache_returns(self, jid):
'''
Execute a single pass to gather the contents of the job cache
'''
ret = {}
jid_dir = salt.utils.jid_dir(jid,
self.opts['cachedir'],
self.opts['hash_type'])
# If someone asks for the cache returns before we created them, we don't
# want to explode
try:
for fn_ in os.listdir(jid_dir):
if fn_.startswith('.'):
continue
if fn_ not in ret:
retp = os.path.join(jid_dir, fn_, 'return.p')
outp = os.path.join(jid_dir, fn_, 'out.p')
if not os.path.isfile(retp):
continue
while fn_ not in ret:
try:
ret_data = self.serial.load(
salt.utils.fopen(retp, 'rb'))
ret[fn_] = {'ret': ret_data}
if os.path.isfile(outp):
ret[fn_]['out'] = self.serial.load(
salt.utils.fopen(outp, 'rb'))
except Exception:
pass
except IOError:
pass
data = self.mminion.returners['{0}.get_jid'.format(self.opts['master_job_cache'])](jid)
for minion in data:
m_data = {}
if u'return' in data[minion]:
m_data['ret'] = data[minion].get(u'return')
else:
m_data['ret'] = data[minion].get('return')
if 'out' in data[minion]:
m_data['out'] = data[minion]['out']
if minion in ret:
ret[minion].update(m_data)
else:
ret[minion] = m_data
return ret
@@ -1103,16 +1018,14 @@ class LocalClient(object):
if timeout is None:
timeout = self.opts['timeout']
jid_dir = salt.utils.jid_dir(jid,
self.opts['cachedir'],
self.opts['hash_type'])
start = int(time.time())
timeout_at = start + timeout
found = set()
ret = {}
wtag = os.path.join(jid_dir, 'wtag*')
# Check to see if the jid is real, if not return the empty dict
if not os.path.isdir(jid_dir):
if not self.mminion.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}:
log.warning("jid does not exist")
return ret
# Wait for the hosts to check in
while True:
@@ -1138,10 +1051,6 @@ class LocalClient(object):
if len(found.intersection(minions)) >= len(minions):
# All minions have returned, break out of the loop
break
if glob.glob(wtag) and int(time.time()) <= timeout_at + 1:
# The timeout +1 has not been reached and there is still a
# write tag for the syndic
continue
if int(time.time()) > timeout_at:
if verbose:
if self.opts.get('minion_data_cache', False) \
@@ -1187,16 +1096,17 @@ class LocalClient(object):
if timeout is None:
timeout = self.opts['timeout']
jid_dir = salt.utils.jid_dir(jid,
self.opts['cachedir'],
self.opts['hash_type'])
start = time.time()
timeout_at = start + timeout
found = set()
wtag = os.path.join(jid_dir, 'wtag*')
# Check to see if the jid is real, if not return the empty dict
if not os.path.isdir(jid_dir):
if not self.mminion.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}:
log.warning("jid does not exist")
yield {}
# stop the iteration, since the jid is invalid
raise StopIteration()
# Wait for the hosts to check in
syndic_wait = 0
last_time = False
@@ -1245,10 +1155,6 @@ class LocalClient(object):
timeout_at = time.time() + 1
continue
break
if glob.glob(wtag) and time.time() <= timeout_at + 1:
# The timeout +1 has not been reached and there is still a
# write tag for the syndic
continue
if last_time:
if verbose or show_timeout:
if self.opts.get('minion_data_cache', False) \
@@ -1290,13 +1196,14 @@ class LocalClient(object):
log.trace('entered - function get_event_iter_returns()')
if timeout is None:
timeout = self.opts['timeout']
jid_dir = salt.utils.jid_dir(jid,
self.opts['cachedir'],
self.opts['hash_type'])
found = set()
# Check to see if the jid is real, if not return the empty dict
if not os.path.isdir(jid_dir):
if not self.mminion.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}:
log.warning("jid does not exist")
yield {}
# stop the iteration, since the jid is invalid
raise StopIteration()
# Wait for the hosts to check in
while True:
raw = self.event.get_event(timeout)
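
Every method above follows the same replacement pattern: instead of walking a jid_dir on disk, the client asks whichever returner backs 'master_job_cache'. A minimal sketch of that lookup, assuming a configured master and the stock '/etc/salt/master' config path (both assumptions, not part of the diff):

    import salt.config
    import salt.minion

    opts = salt.config.master_config('/etc/salt/master')
    mminion = salt.minion.MasterMinion(opts)

    jid = '20140423135551123456'  # hypothetical jid

    # get_load returns {} for an unknown jid -- the new existence check
    load_fn = '{0}.get_load'.format(opts['master_job_cache'])
    if mminion.returners[load_fn](jid) == {}:
        print('jid does not exist')

    # get_jid returns whatever has come back so far, keyed by minion id
    jid_fn = '{0}.get_jid'.format(opts['master_job_cache'])
    for minion, data in mminion.returners[jid_fn](jid).items():
        print('{0}: {1}'.format(minion, data.get('return')))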

View File

@@ -236,6 +236,7 @@ class SSH(object):
),
}
self.serial = salt.payload.Serial(opts)
self.mminion = salt.minion.MasterMinion(self.opts)
def verify_env(self):
'''
@@ -422,52 +423,17 @@ class SSH(object):
'''
Cache the job information
'''
jid_dir = salt.utils.jid_dir(
jid,
self.opts['cachedir'],
self.opts['hash_type']
)
if not os.path.isdir(jid_dir):
log.error(
'An inconsistency occurred, a job was received with a job id '
'that is not present on the master: {0}'.format(jid)
)
return False
if os.path.exists(os.path.join(jid_dir, 'nocache')):
return
hn_dir = os.path.join(jid_dir, id_)
if not os.path.isdir(hn_dir):
os.makedirs(hn_dir)
# Otherwise the minion has already returned this jid and it should
# be dropped
else:
log.error(
'An extra return was detected from minion {0}, please verify '
'the minion, this could be a replay attack'.format(
id_
)
)
return False
self.serial.dump(
ret,
# Use atomic open here to avoid the file being read before it's
# completely written to. Refs #1935
salt.utils.atomicfile.atomic_open(
os.path.join(hn_dir, 'return.p'), 'w+b'
)
)
self.mminion.returners['{0}.returner'.format(self.opts['master_job_cache'])]({'jid': jid,
'id': id_,
'return': ret})
def run(self):
'''
Execute the overall routine
'''
jid = salt.utils.prep_jid(
self.opts['cachedir'],
self.opts['hash_type'],
self.opts['user'])
fstr = '{0}.prep_jid'.format(self.opts['master_job_cache'])
jid = self.mminion.returners[fstr]()
jid_dir = salt.utils.jid_dir(jid, self.opts['cachedir'], self.opts['hash_type'])
# Save the invocation information
arg_str = self.opts['arg_str']
@@ -489,17 +455,9 @@ class SSH(object):
'fun': fun,
'arg': args,
}
self.serial.dump(
job_load,
salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b')
)
# save the targets to a cache so we can see them in the UI
targets = self.targets.keys()
targets.sort()
self.serial.dump(
targets,
salt.utils.fopen(os.path.join(jid_dir, '.minions.p'), 'w+b')
)
# save load to the master job cache
self.mminion.returners['{0}.save_load'.format(self.opts['master_job_cache'])](jid, job_load)
if self.opts.get('verbose'):
msg = 'Executing job with jid {0}'.format(jid)
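
For reference, the load that cache_job now hands to '<master_job_cache>.returner' carries just three keys (the key names come from the call above; the concrete values here are invented for illustration):

    load = {
        'jid': '20140423135551123456',   # from '<returner>.prep_jid'
        'id': 'web01',                   # the salt-ssh target id
        'return': {'retcode': 0},        # the wrapped function's return data
    }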

View File

@@ -187,7 +187,7 @@ VALID_OPTS = {
'order_masters': bool,
'job_cache': bool,
'ext_job_cache': str,
'master_ext_job_cache': str,
'master_job_cache': str,
'minion_data_cache': bool,
'publish_session': int,
'reactor': list,
@@ -413,7 +413,7 @@ DEFAULT_MASTER_OPTS = {
'order_masters': False,
'job_cache': True,
'ext_job_cache': '',
'master_ext_job_cache': '',
'master_job_cache': 'local_cache',
'minion_data_cache': True,
'enforce_mine_cache': False,
'ipv6': False,
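
Any returner named in 'master_job_cache' has to satisfy the interface the call sites in this PR rely on. A hedged skeleton of that contract, inferred from those call sites (the docstring wording is illustrative, not from the diff):

    def prep_jid(nocache=False):
        '''Generate, reserve, and return a unique jid.'''

    def returner(load):
        '''Store one minion return; load carries jid, id, return and optionally out.'''

    def save_load(jid, clear_load):
        '''Store the invocation data (the "load") for a jid.'''

    def get_load(jid):
        '''Return the saved load for a jid, or {} if the jid is unknown.'''

    def get_jid(jid):
        '''Return {minion_id: {'return': ..., 'out': ...}} for a jid.'''

    def get_jids():
        '''Return metadata for all cached jids (used by the jobs runner).'''

    def clean_old_jobs():
        '''Optional: purge expired jobs. Only called if present; see clean_old_jobs() in the master startup code below.'''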

View File

@@ -8,8 +8,6 @@ involves preparing the three listeners and the workers needed by the master.
import os
import re
import logging
import shutil
import datetime
try:
import pwd
except ImportError:
@@ -100,43 +98,16 @@ def clean_old_jobs(opts):
'''
Clean out the old jobs from the job cache
'''
if opts['keep_jobs'] != 0:
jid_root = os.path.join(opts['cachedir'], 'jobs')
cur = datetime.datetime.now()
if os.path.exists(jid_root):
for top in os.listdir(jid_root):
t_path = os.path.join(jid_root, top)
for final in os.listdir(t_path):
f_path = os.path.join(t_path, final)
jid_file = os.path.join(f_path, 'jid')
if not os.path.isfile(jid_file):
# No jid file means corrupted cache entry, scrub it
shutil.rmtree(f_path)
else:
with salt.utils.fopen(jid_file, 'r') as fn_:
jid = fn_.read()
if len(jid) < 18:
# Invalid jid, scrub the dir
shutil.rmtree(f_path)
else:
# Parse the jid into a proper datetime object.
# We only parse down to the minute, since keep
# jobs is measured in hours, so a minute
# difference is not important.
try:
jidtime = datetime.datetime(int(jid[0:4]),
int(jid[4:6]),
int(jid[6:8]),
int(jid[8:10]),
int(jid[10:12]))
except ValueError as e:
# Invalid jid, scrub the dir
shutil.rmtree(f_path)
difference = cur - jidtime
hours_difference = difference.seconds / 3600.0
if hours_difference > opts['keep_jobs']:
shutil.rmtree(f_path)
# TODO: better way to not require creating the MasterMinion every time?
mminion = salt.minion.MasterMinion(
opts,
states=False,
rend=False,
)
# If the master job cache has a clean_old_jobs, call it
fstr = '{0}.clean_old_jobs'.format(opts['master_job_cache'])
if fstr in mminion.returners:
mminion.returners[fstr]()
def access_keys(opts):
@@ -505,64 +476,21 @@ class RemoteFuncs(object):
return False
if load['jid'] == 'req':
# The minion is returning a standalone job, request a jobid
load['jid'] = salt.utils.prep_jid(
self.opts['cachedir'],
self.opts['hash_type'],
load.get('nocache', False))
prep_fstr = '{0}.prep_jid'.format(self.opts['master_job_cache'])
load['jid'] = self.mminion.returners[prep_fstr](nocache=load.get('nocache', False))
# save the load, since we don't have it
saveload_fstr = '{0}.save_load'.format(self.opts['master_job_cache'])
self.mminion.returners[saveload_fstr](load['jid'], load)
log.info('Got return from {id} for job {jid}'.format(**load))
self.event.fire_event(load, load['jid']) # old dup event
self.event.fire_event(load, tagify([load['jid'], 'ret', load['id']], 'job'))
self.event.fire_ret_load(load)
if self.opts['master_ext_job_cache']:
fstr = '{0}.returner'.format(self.opts['master_ext_job_cache'])
self.mminion.returners[fstr](load)
return
if not self.opts['job_cache'] or self.opts.get('ext_job_cache'):
return
jid_dir = salt.utils.jid_dir(
load['jid'],
self.opts['cachedir'],
self.opts['hash_type']
)
if not os.path.isdir(jid_dir):
log.error(
'An inconsistency occurred, a job was received with a job id '
'that is not present on the master: {jid}'.format(**load)
)
return False
if os.path.exists(os.path.join(jid_dir, 'nocache')):
return
hn_dir = os.path.join(jid_dir, load['id'])
if not os.path.isdir(hn_dir):
os.makedirs(hn_dir)
# Otherwise the minion has already returned this jid and it should
# be dropped
else:
log.error(
'An extra return was detected from minion {0}, please verify '
'the minion, this could be a replay attack'.format(
load['id']
)
)
return False
self.serial.dump(
load['return'],
# Use atomic open here to avoid the file being read before it's
# completely written to. Refs #1935
salt.utils.atomicfile.atomic_open(
os.path.join(hn_dir, 'return.p'), 'w+b'
)
)
if 'out' in load:
self.serial.dump(
load['out'],
# Use atomic open here to avoid the file being read before
# it's completely written to. Refs #1935
salt.utils.atomicfile.atomic_open(
os.path.join(hn_dir, 'out.p'), 'w+b'
)
)
fstr = '{0}.returner'.format(self.opts['master_job_cache'])
self.mminion.returners[fstr](load)
def _syndic_return(self, load):
'''
@@ -572,29 +500,10 @@ class RemoteFuncs(object):
# Verify the load
if any(key not in load for key in ('return', 'jid', 'id')):
return None
# set the write flag
jid_dir = salt.utils.jid_dir(
load['jid'],
self.opts['cachedir'],
self.opts['hash_type']
)
if not os.path.isdir(jid_dir):
os.makedirs(jid_dir)
if 'load' in load:
with salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b') as fp_:
self.serial.dump(load['load'], fp_)
wtag = os.path.join(jid_dir, 'wtag_{0}'.format(load['id']))
try:
with salt.utils.fopen(wtag, 'w+') as fp_:
fp_.write('')
except (IOError, OSError):
log.error(
'Failed to commit the write tag for the syndic return, are '
'permissions correct in the cache dir: {0}?'.format(
self.opts['cachedir']
)
)
return False
# if we have a load, save it
if 'load' in load:
fstr = '{0}.save_load'.format(self.opts['master_job_cache'])
self.mminion.returners[fstr](load['jid'], load['load'])
# Format individual return loads
for key, item in load['return'].items():
@@ -604,8 +513,6 @@ class RemoteFuncs(object):
if 'out' in load:
ret['out'] = load['out']
self._return(ret)
if os.path.isfile(wtag):
os.remove(wtag)
def minion_runner(self, load):
'''
@@ -1326,17 +1233,9 @@ class LocalFuncs(object):
}
# Retrieve the jid
if not load['jid']:
load['jid'] = salt.utils.prep_jid(
self.opts['cachedir'],
self.opts['hash_type'],
extra.get('nocache', False)
)
fstr = '{0}.prep_jid'.format(self.opts['master_job_cache'])
load['jid'] = self.mminion.returners[fstr](nocache=extra.get('nocache', False))
self.event.fire_event({'minions': minions}, load['jid'])
jid_dir = salt.utils.jid_dir(
load['jid'],
self.opts['cachedir'],
self.opts['hash_type']
)
new_job_load = {
'jid': load['jid'],
@@ -1352,19 +1251,7 @@ class LocalFuncs(object):
self.event.fire_event(new_job_load, 'new_job') # old dup event
self.event.fire_event(new_job_load, tagify([load['jid'], 'new'], 'job'))
# Verify the jid dir
if not os.path.isdir(jid_dir):
os.makedirs(jid_dir)
# Save the invocation information
self.serial.dump(
load,
salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b')
)
# save the minions to a cache so we can see them in the UI
self.serial.dump(
minions,
salt.utils.fopen(os.path.join(jid_dir, '.minions.p'), 'w+b')
)
if self.opts['ext_job_cache']:
try:
fstr = '{0}.save_load'.format(self.opts['ext_job_cache'])
@@ -1381,6 +1268,23 @@ class LocalFuncs(object):
'The specified returner threw a stack trace:\n',
exc_info=True
)
# always write out to the master job cache
try:
fstr = '{0}.save_load'.format(self.opts['master_job_cache'])
self.mminion.returners[fstr](load['jid'], load)
except KeyError:
log.critical(
'The specified returner used for the master job cache '
'"{0}" does not have a save_load function!'.format(
self.opts['master_job_cache']
)
)
except Exception:
log.critical(
'The specified returner threw a stack trace:\n',
exc_info=True
)
# Set up the payload
payload = {'enc': 'aes'}
# Altering the contents of the publish load is serious!! Changes here

View File

@@ -1244,78 +1244,30 @@ class AESFuncs(object):
return False
if not salt.utils.verify.valid_id(self.opts, load['id']):
return False
new_loadp = False
if load['jid'] == 'req':
# The minion is returning a standalone job, request a jobid
load['arg'] = load.get('arg', load.get('fun_args', []))
load['tgt_type'] = 'glob'
load['tgt'] = load['id']
load['jid'] = salt.utils.prep_jid(
self.opts['cachedir'],
self.opts['hash_type'],
load.get('nocache', False))
new_loadp = load.get('nocache', True) and True
prep_fstr = '{0}.prep_jid'.format(self.opts['master_job_cache'])
load['jid'] = self.mminion.returners[prep_fstr](nocache=load.get('nocache', False))
# save the load, since we don't have it
saveload_fstr = '{0}.save_load'.format(self.opts['master_job_cache'])
self.mminion.returners[saveload_fstr](load['jid'], load)
log.info('Got return from {id} for job {jid}'.format(**load))
self.event.fire_event(load, load['jid']) # old dup event
self.event.fire_event(
load, tagify([load['jid'], 'ret', load['id']], 'job'))
self.event.fire_ret_load(load)
if self.opts['master_ext_job_cache']:
fstr = '{0}.returner'.format(self.opts['master_ext_job_cache'])
self.mminion.returners[fstr](load)
return
# if you have a job_cache or an ext_job_cache, don't write to the regular master cache
if not self.opts['job_cache'] or self.opts.get('ext_job_cache'):
return
jid_dir = salt.utils.jid_dir(
load['jid'],
self.opts['cachedir'],
self.opts['hash_type']
)
if os.path.exists(os.path.join(jid_dir, 'nocache')):
return
if new_loadp:
with salt.utils.fopen(
os.path.join(jid_dir, '.load.p'), 'w+b'
) as fp_:
self.serial.dump(load, fp_)
hn_dir = os.path.join(jid_dir, load['id'])
try:
os.mkdir(hn_dir)
except OSError as e:
if e.errno == errno.EEXIST:
# Minion has already returned this jid and it should be dropped
log.error(
'An extra return was detected from minion {0}, please verify '
'the minion, this could be a replay attack'.format(
load['id']
)
)
return False
elif e.errno == errno.ENOENT:
log.error(
'An inconsistency occurred, a job was received with a job id '
'that is not present on the master: {jid}'.format(**load)
)
return False
raise
self.serial.dump(
load['return'],
# Use atomic open here to avoid the file being read before it's
# completely written to. Refs #1935
salt.utils.atomicfile.atomic_open(
os.path.join(hn_dir, 'return.p'), 'w+b'
)
)
if 'out' in load:
self.serial.dump(
load['out'],
# Use atomic open here to avoid the file being read before
# it's completely written to. Refs #1935
salt.utils.atomicfile.atomic_open(
os.path.join(hn_dir, 'out.p'), 'w+b'
)
)
# otherwise, write to the master cache
fstr = '{0}.returner'.format(self.opts['master_job_cache'])
self.mminion.returners[fstr](load)
def _syndic_return(self, load):
'''
@@ -1325,31 +1277,10 @@ class AESFuncs(object):
# Verify the load
if any(key not in load for key in ('return', 'jid', 'id')):
return None
if not salt.utils.verify.valid_id(self.opts, load['id']):
return False
# set the write flag
jid_dir = salt.utils.jid_dir(
load['jid'],
self.opts['cachedir'],
self.opts['hash_type']
)
if not os.path.isdir(jid_dir):
os.makedirs(jid_dir)
if 'load' in load:
with salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b') as fp_:
self.serial.dump(load['load'], fp_)
wtag = os.path.join(jid_dir, 'wtag_{0}'.format(load['id']))
try:
with salt.utils.fopen(wtag, 'w+b') as fp_:
fp_.write('')
except (IOError, OSError):
log.error(
'Failed to commit the write tag for the syndic return, are '
'permissions correct in the cache dir: {0}?'.format(
self.opts['cachedir']
)
)
return False
# if we have a load, save it
if 'load' in load:
fstr = '{0}.save_load'.format(self.opts['master_job_cache'])
self.mminion.returners[fstr](load['jid'], load)
# Format individual return loads
for key, item in load['return'].items():
@@ -1359,8 +1290,6 @@ class AESFuncs(object):
if 'out' in load:
ret['out'] = load['out']
self._return(ret)
if os.path.isfile(wtag):
os.remove(wtag)
def minion_runner(self, clear_load):
'''
@@ -2537,17 +2466,9 @@ class ClearFuncs(object):
}
# Retrieve the jid
if not clear_load['jid']:
clear_load['jid'] = salt.utils.prep_jid(
self.opts['cachedir'],
self.opts['hash_type'],
extra.get('nocache', False)
)
fstr = '{0}.prep_jid'.format(self.opts['master_job_cache'])
clear_load['jid'] = self.mminion.returners[fstr](nocache=extra.get('nocache', False))
self.event.fire_event({'minions': minions}, clear_load['jid'])
jid_dir = salt.utils.jid_dir(
clear_load['jid'],
self.opts['cachedir'],
self.opts['hash_type']
)
new_job_load = {
'jid': clear_load['jid'],
@@ -2563,19 +2484,6 @@ class ClearFuncs(object):
self.event.fire_event(new_job_load, 'new_job') # old dup event
self.event.fire_event(new_job_load, tagify([clear_load['jid'], 'new'], 'job'))
# Verify the jid dir
if not os.path.isdir(jid_dir):
os.makedirs(jid_dir)
# Save the invocation information
self.serial.dump(
clear_load,
salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b')
)
# save the minions to a cache so we can see them in the UI
self.serial.dump(
minions,
salt.utils.fopen(os.path.join(jid_dir, '.minions.p'), 'w+b')
)
if self.opts['ext_job_cache']:
try:
fstr = '{0}.save_load'.format(self.opts['ext_job_cache'])
@@ -2592,6 +2500,24 @@ class ClearFuncs(object):
'The specified returner threw a stack trace:\n',
exc_info=True
)
# always write out to the master job cache
try:
fstr = '{0}.save_load'.format(self.opts['master_job_cache'])
self.mminion.returners[fstr](clear_load['jid'], clear_load)
except KeyError:
log.critical(
'The specified returner used for the master job cache '
'"{0}" does not have a save_load function!'.format(
self.opts['master_job_cache']
)
)
except Exception:
log.critical(
'The specified returner threw a stack trace:\n',
exc_info=True
)
# Set up the payload
payload = {'enc': 'aes'}
# Altering the contents of the publish load is serious!! Changes here

View File

@@ -1480,6 +1480,7 @@ class Syndic(Minion):
self._syndic = True
opts['loop_interval'] = 1
super(Syndic, self).__init__(opts)
self.mminion = salt.minion.MasterMinion(opts)
def _handle_aes(self, load, sig=None):
'''
@@ -1670,10 +1671,11 @@ class Syndic(Minion):
if not jdict:
jdict['__fun__'] = event['data'].get('fun')
jdict['__jid__'] = event['data']['jid']
jdict['__load__'] = salt.utils.jid_load(
event['data']['jid'],
self.local.opts['cachedir'],
self.opts['hash_type'])
jdict['__load__'] = {}
fstr = '{0}.get_jid'.format(self.opts['master_job_cache'])
jdict['__load__'].update(
self.mminion.returners[fstr](event['data']['jid'])
)
jdict[event['data']['id']] = event['data']['return']
else:
# Add generic event aggregation here

View File

@@ -27,7 +27,7 @@ STATIC = (
)
def display_output(data, out, opts=None):
def display_output(data, out=None, opts=None):
'''
Print the passed data using the desired output
'''

View File

@@ -0,0 +1,297 @@
# -*- coding: utf-8 -*-
'''
Return data to local job cache
'''
# Import python libs
import errno
import logging
import os
import shutil
import datetime
import hashlib
# Import salt libs
import salt.payload
import salt.utils
log = logging.getLogger(__name__)
# load is the published job
LOAD_P = '.load.p'
# the list of minions that the job is targeted to (best effort match on the master side)
MINIONS_P = '.minions.p'
# return is the "return" from the minion data
RETURN_P = 'return.p'
# out is the "out" from the minion data
OUT_P = 'out.p'
def _job_dir():
'''
Return root of the jobs cache directory
'''
return os.path.join(__opts__['cachedir'],
'jobs')
def _jid_dir(jid):
'''
Return the jid_dir for the given job id
'''
jid = str(jid)
jhash = getattr(hashlib, __opts__['hash_type'])(jid).hexdigest()
return os.path.join(_job_dir(),
jhash[:2],
jhash[2:])
def _walk_through(job_dir):
serial = salt.payload.Serial(__opts__)
for top in os.listdir(job_dir):
t_path = os.path.join(job_dir, top)
for final in os.listdir(t_path):
load_path = os.path.join(t_path, final, LOAD_P)
if not os.path.isfile(load_path):
continue
job = serial.load(salt.utils.fopen(load_path, 'rb'))
jid = job['jid']
yield jid, job, t_path, final
def _format_job_instance(job):
return {'Function': job.get('fun', 'unknown-function'),
'Arguments': list(job.get('arg', [])),
# unlikely but safeguard from invalid returns
'Target': job.get('tgt', 'unknown-target'),
'Target-type': job.get('tgt_type', []),
'User': job.get('user', 'root')}
def _format_jid_instance(jid, job):
ret = _format_job_instance(job)
ret.update({'StartTime': salt.utils.jid_to_time(jid)})
return ret
#TODO: add to returner docs-- this is a new one
def prep_jid(nocache=False):
'''
Return a job id and prepare the job id directory
This is the function responsible for making sure jids don't collide (unless it's passed a jid)
So do what you have to do to make sure that stays the case
'''
jid = salt.utils.gen_jid()
jid_dir_ = _jid_dir(jid)
# make sure we create the jid dir, otherwise someone else is using it,
# meaning we need a new jid
try:
os.makedirs(jid_dir_)
except OSError:
# TODO: some sort of sleep or something? Spinning is generally bad practice
return prep_jid(nocache=nocache)
with salt.utils.fopen(os.path.join(jid_dir_, 'jid'), 'w+') as fn_:
fn_.write(jid)
if nocache:
with salt.utils.fopen(os.path.join(jid_dir_, 'nocache'), 'w+') as fn_:
fn_.write('')
return jid
def returner(load):
'''
Return data to the local job cache
'''
serial = salt.payload.Serial(__opts__)
jid_dir = _jid_dir(load['jid'])
if os.path.exists(os.path.join(jid_dir, 'nocache')):
return
# do we need to rewrite the load?
if load['jid'] == 'req' and bool(load.get('nocache', True)):
with salt.utils.fopen(os.path.join(jid_dir, LOAD_P), 'w+b') as fp_:
serial.dump(load, fp_)
hn_dir = os.path.join(jid_dir, load['id'])
try:
os.mkdir(hn_dir)
except OSError as e:
if e.errno == errno.EEXIST:
# Minion has already returned this jid and it should be dropped
log.error(
'An extra return was detected from minion {0}, please verify '
'the minion, this could be a replay attack'.format(
load['id']
)
)
return False
elif e.errno == errno.ENOENT:
log.error(
'An inconsistency occurred, a job was received with a job id '
'that is not present in the local cache: {jid}'.format(**load)
)
return False
raise
serial.dump(
load['return'],
# Use atomic open here to avoid the file being read before it's
# completely written to. Refs #1935
salt.utils.atomicfile.atomic_open(
os.path.join(hn_dir, RETURN_P), 'w+b'
)
)
if 'out' in load:
serial.dump(
load['out'],
# Use atomic open here to avoid the file being read before
# it's completely written to. Refs #1935
salt.utils.atomicfile.atomic_open(
os.path.join(hn_dir, OUT_P), 'w+b'
)
)
def save_load(jid, clear_load):
'''
Save the load to the specified jid
'''
jid_dir = _jid_dir(clear_load['jid'])
serial = salt.payload.Serial(__opts__)
# if you have a tgt, save that for the UI etc
if 'tgt' in clear_load:
ckminions = salt.utils.minions.CkMinions(__opts__)
# Retrieve the minions list
minions = ckminions.check_minions(
clear_load['tgt'],
clear_load.get('tgt_type', 'glob')
)
# save the minions to a cache so we can see them in the UI
serial.dump(
minions,
salt.utils.fopen(os.path.join(jid_dir, MINIONS_P), 'w+b')
)
# Save the invocation information
serial.dump(
clear_load,
salt.utils.fopen(os.path.join(jid_dir, LOAD_P), 'w+b')
)
def get_load(jid):
'''
Return the load data that marks a specified jid
'''
jid_dir = _jid_dir(jid)
if not os.path.exists(jid_dir):
return {}
serial = salt.payload.Serial(__opts__)
ret = serial.load(salt.utils.fopen(os.path.join(jid_dir, LOAD_P), 'rb'))
minions_path = os.path.join(jid_dir, MINIONS_P)
if os.path.isfile(minions_path):
ret['Minions'] = serial.load(salt.utils.fopen(minions_path, 'rb'))
return ret
def get_jid(jid):
'''
Return the information returned when the specified job id was executed
'''
jid_dir = _jid_dir(jid)
serial = salt.payload.Serial(__opts__)
ret = {}
# Check to see if the jid is real, if not return the empty dict
if not os.path.isdir(jid_dir):
return ret
for fn_ in os.listdir(jid_dir):
if fn_.startswith('.'):
continue
if fn_ not in ret:
retp = os.path.join(jid_dir, fn_, RETURN_P)
outp = os.path.join(jid_dir, fn_, OUT_P)
if not os.path.isfile(retp):
continue
while fn_ not in ret:
try:
ret_data = serial.load(
salt.utils.fopen(retp, 'rb'))
ret[fn_] = {'return': ret_data}
if os.path.isfile(outp):
ret[fn_]['out'] = serial.load(
salt.utils.fopen(outp, 'rb'))
except Exception:
pass
return ret
def get_jids():
'''
Return a list of all job ids
'''
ret = {}
for jid, job, t_path, final in _walk_through(_job_dir()):
ret[jid] = _format_jid_instance(jid, job)
return ret
def clean_old_jobs():
'''
Clean out the old jobs from the job cache
'''
if __opts__['keep_jobs'] != 0:
cur = datetime.datetime.now()
jid_root = _job_dir()
if not os.path.exists(jid_root):
return
for top in os.listdir(jid_root):
t_path = os.path.join(jid_root, top)
for final in os.listdir(t_path):
f_path = os.path.join(t_path, final)
jid_file = os.path.join(f_path, 'jid')
if not os.path.isfile(jid_file):
# No jid file means corrupted cache entry, scrub it
shutil.rmtree(f_path)
else:
with salt.utils.fopen(jid_file, 'r') as fn_:
jid = fn_.read()
if len(jid) < 18:
# Invalid jid, scrub the dir
shutil.rmtree(f_path)
else:
# Parse the jid into a proper datetime object.
# We only parse down to the minute, since keep
# jobs is measured in hours, so a minute
# difference is not important.
try:
jidtime = datetime.datetime(int(jid[0:4]),
int(jid[4:6]),
int(jid[6:8]),
int(jid[8:10]),
int(jid[10:12]))
except ValueError as e:
# Invalid jid, scrub the dir
shutil.rmtree(f_path)
difference = cur - jidtime
hours_difference = difference.seconds / 3600.0
if hours_difference > __opts__['keep_jobs']:
shutil.rmtree(f_path)
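
The on-disk layout is the same one the master used before this refactor; a quick sketch of where a job lands, assuming the stock cachedir and the md5 hash_type default of this Salt generation (both assumptions, not stated in the diff):

    import hashlib
    import os

    cachedir = '/var/cache/salt/master'   # assumption: stock default
    jid = '20140423135551123456'          # hypothetical jid
    jhash = hashlib.md5(jid).hexdigest()  # mirrors _jid_dir() above

    jid_dir = os.path.join(cachedir, 'jobs', jhash[:2], jhash[2:])
    # <jid_dir>/jid                      plain-text jid, written by prep_jid()
    # <jid_dir>/.load.p                  LOAD_P, the published job
    # <jid_dir>/.minions.p               MINIONS_P, the matched target list
    # <jid_dir>/<minion_id>/return.p     RETURN_P, one dir per returning minion
    # <jid_dir>/<minion_id>/out.p        OUT_P, only when 'out' was returned
    print(jid_dir)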

View File

@@ -0,0 +1,111 @@
# -*- coding: utf-8 -*-
'''
Read/Write multiple returners
'''
# Import python libs
import logging
# Import salt libs
import salt.minion
log = logging.getLogger(__name__)
CONFIG_KEY = 'multi_returner'
# cache of the master minion for this returner
MMINION = None
def _mminion():
'''
Create a single mminion for this module to use, instead of reloading all the time
'''
global MMINION
if MMINION is None:
MMINION = salt.minion.MasterMinion(__opts__)
return MMINION
def prep_jid(nocache=False):
'''
Call prep_jid on all returners in multi_returner
TODO: finish this -- what to do when you get different jids from 2 returners?
Since our jids are time based, this makes the problem hard, because they
aren't unique, meaning that we have to make sure that no one else got the jid
and if they did we spin to get a new one, which means "locking" the jid in 2
returners is non-trivial
'''
jid = None
for returner in __opts__[CONFIG_KEY]:
if jid is None:
jid = _mminion().returners['{0}.prep_jid'.format(returner)](nocache=nocache)
else:
r_jid = _mminion().returners['{0}.prep_jid'.format(returner)](nocache=nocache)
if r_jid != jid:
print 'Uhh.... crud the jids do not match'
return jid
def returner(load):
'''
Write return to all returners in multi_returner
'''
for returner in __opts__[CONFIG_KEY]:
_mminion().returners['{0}.returner'.format(returner)](load)
def save_load(jid, clear_load):
'''
Write load to all returners in multi_returner
'''
for returner in __opts__[CONFIG_KEY]:
_mminion().returners['{0}.save_load'.format(returner)](jid, clear_load)
def get_load(jid):
'''
Merge the load data from all returners
'''
ret = {}
for returner in __opts__[CONFIG_KEY]:
ret.update(_mminion().returners['{0}.get_load'.format(returner)](jid))
return ret
def get_jid(jid):
'''
Merge the return data from all returners
'''
ret = {}
for returner in __opts__[CONFIG_KEY]:
ret.update(_mminion().returners['{0}.get_jid'.format(returner)](jid))
return ret
def get_jids():
'''
Return all job data from all returners
'''
ret = {}
for returner in __opts__[CONFIG_KEY]:
ret.update(_mminion().returners['{0}.get_jids'.format(returner)]())
return ret
def clean_old_jobs():
'''
Clean out the old jobs from all returners (if you have it)
'''
for returner in __opts__[CONFIG_KEY]:
fstr = '{0}.clean_old_jobs'.format(returner)
if fstr in _mminion().returners:
_mminion().returners[fstr]()
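
One behavior worth noting: get_load(), get_jid() and get_jids() above merge their results with dict.update(), so when two configured returners disagree about a key, the returner listed later in multi_returner wins. A toy illustration of that precedence (data values invented):

    def merge(partials):
        ret = {}
        for partial in partials:   # one dict per returner, in config order
            ret.update(partial)    # later entries overwrite earlier ones
        return ret

    print(merge([{'web01': {'return': 'stale'}},
                 {'web01': {'return': 'fresh'}, 'db01': {'return': True}}]))
    # -> {'web01': {'return': 'fresh'}, 'db01': {'return': True}}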

View File

@@ -37,19 +37,16 @@ def active():
ret[job['jid']].update({'Running': [{minion: job.get('pid', None)}], 'Returned': []})
else:
ret[job['jid']]['Running'].append({minion: job['pid']})
mminion = salt.minion.MasterMinion(__opts__)
for jid in ret:
jid_dir = salt.utils.jid_dir(
jid,
__opts__['cachedir'],
__opts__['hash_type'])
if not os.path.isdir(jid_dir):
continue
for minion in os.listdir(jid_dir):
if minion.startswith('.') or minion == 'jid':
continue
if os.path.exists(os.path.join(jid_dir, minion)):
returner = _get_returner((__opts__['ext_job_cache'], __opts__['master_job_cache']))
data = mminion.returners['{0}.get_jid'.format(returner)](jid)
for minion in data:
if minion not in ret[jid]['Returned']:
ret[jid]['Returned'].append(minion)
salt.output.display_output(ret, 'yaml', __opts__)
salt.output.display_output(ret, opts=__opts__)
return ret
@@ -64,32 +61,18 @@ def lookup_jid(jid, ext_source=None, output=True):
salt-run jobs.lookup_jid 20130916125524463507
'''
ret = {}
returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_ext_job_cache']))
if returner:
out = 'nested'
mminion = salt.minion.MasterMinion(__opts__)
data = mminion.returners['{0}.get_jid'.format(returner)](jid)
for minion in data:
if u'return' in data[minion]:
ret[minion] = data[minion].get(u'return')
else:
ret[minion] = data[minion].get('return')
if 'out' in data[minion]:
out = data[minion]['out']
salt.output.display_output(ret, out, __opts__)
return ret
# Fall back to the local job cache
client = salt.client.get_local_client(__opts__['conf_file'])
for mid, data in client.get_full_returns(jid, [], 0).items():
ret[mid] = data.get('ret')
if output:
salt.output.display_output(
{mid: ret[mid]},
data.get('out', None),
__opts__)
mminion = salt.minion.MasterMinion(__opts__)
returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache']))
data = mminion.returners['{0}.get_jid'.format(returner)](jid)
for minion in data:
if u'return' in data[minion]:
ret[minion] = data[minion].get(u'return')
else:
ret[minion] = data[minion].get('return')
if 'out' in data[minion]:
out = data[minion]['out']
salt.output.display_output(ret, opts=__opts__)
return ret
@@ -104,43 +87,13 @@ def list_job(jid, ext_source=None):
salt-run jobs.list_job 20130916125524463507
'''
ret = {'jid': jid}
returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_ext_job_cache']))
if returner:
out = 'nested'
mminion = salt.minion.MasterMinion(__opts__)
job = mminion.returners['{0}.get_load'.format(returner)](jid)
ret.update(_format_jid_instance(jid, job))
ret['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid)
salt.output.display_output(ret, out, __opts__)
return ret
mminion = salt.minion.MasterMinion(__opts__)
returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache']))
jid_dir = salt.utils.jid_dir(
jid,
__opts__['cachedir'],
__opts__['hash_type']
)
if not os.path.exists(jid_dir):
return ret
# we have to copy/paste this code, because we don't seem to have a good API
serial = salt.payload.Serial(__opts__)
# get the load info
load_path = os.path.join(jid_dir, '.load.p')
job = serial.load(salt.utils.fopen(load_path, 'rb'))
job = mminion.returners['{0}.get_load'.format(returner)](jid)
ret.update(_format_jid_instance(jid, job))
# get the hosts information using the localclient (instead of re-implementing the code...)
client = salt.client.get_local_client(__opts__['conf_file'])
ret['Result'] = {}
minions_path = os.path.join(jid_dir, '.minions.p')
if os.path.isfile(minions_path):
minions = serial.load(salt.utils.fopen(minions_path, 'rb'))
ret['Minions'] = minions
salt.output.display_output(ret, 'yaml', __opts__)
ret['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid)
salt.output.display_output(ret, opts=__opts__)
return ret
@@ -154,19 +107,12 @@ def list_jobs(ext_source=None):
salt-run jobs.list_jobs
'''
returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_ext_job_cache']))
if returner:
out = 'nested'
mminion = salt.minion.MasterMinion(__opts__)
ret = mminion.returners['{0}.get_jids'.format(returner)]()
salt.output.display_output(ret, out, __opts__)
return ret
returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache']))
mminion = salt.minion.MasterMinion(__opts__)
ret = mminion.returners['{0}.get_jids'.format(returner)]()
salt.output.display_output(ret, opts=__opts__)
ret = {}
job_dir = os.path.join(__opts__['cachedir'], 'jobs')
for jid, job, t_path, final in _walk_through(job_dir):
ret[jid] = _format_jid_instance(jid, job)
salt.output.display_output(ret, 'yaml', __opts__)
return ret
@@ -182,61 +128,37 @@ def print_job(jid, ext_source=None):
'''
ret = {}
returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_ext_job_cache']))
if returner:
out = 'nested'
mminion = salt.minion.MasterMinion(__opts__)
job = mminion.returners['{0}.get_load'.format(returner)](jid)
ret[jid] = _format_jid_instance(jid, job)
ret[jid]['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid)
salt.output.display_output(ret, out, __opts__)
return ret
returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache']))
mminion = salt.minion.MasterMinion(__opts__)
jid_dir = salt.utils.jid_dir(
jid,
__opts__['cachedir'],
__opts__['hash_type']
)
if not os.path.exists(jid_dir):
return ret
# we have to copy/paste this code, because we don't seem to have a good API
serial = salt.payload.Serial(__opts__)
# get the load info
load_path = os.path.join(jid_dir, '.load.p')
job = serial.load(salt.utils.fopen(load_path, 'rb'))
job = mminion.returners['{0}.get_load'.format(returner)](jid)
ret[jid] = _format_jid_instance(jid, job)
ret[jid]['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid)
salt.output.display_output(ret, opts=__opts__)
# get the hosts information using the localclient (instead of re-implementing the code...)
client = salt.client.get_local_client(__opts__['conf_file'])
ret[jid]['Result'] = {}
for mid, data in client.get_full_returns(jid, [], 0).items():
# have to make it say return so that it matches everyone else...
minion_data = {'return': data.get('ret')}
ret[jid]['Result'].update({mid: minion_data})
salt.output.display_output(ret, 'yaml', __opts__)
return ret
def _get_returner(returners):
def _get_returner(returner_types):
'''
Helper to iterate over returners and pick the first one
Helper to iterate over returner_types and pick the first one
'''
for ret in returners:
if ret:
return ret
for returner in returner_types:
if returner:
return returner
def _format_job_instance(job):
return {'Function': job.get('fun', 'unknown-function'),
'Arguments': list(job.get('arg', [])),
# unlikely but safeguard from invalid returns
'Target': job.get('tgt', 'unknown-target'),
'Target-type': job.get('tgt_type', []),
'User': job.get('user', 'root')}
ret = {'Function': job.get('fun', 'unknown-function'),
'Arguments': list(job.get('arg', [])),
# unlikely but safeguard from invalid returns
'Target': job.get('tgt', 'unknown-target'),
'Target-type': job.get('tgt_type', []),
'User': job.get('user', 'root')}
if 'Minions' in job:
ret['Minions'] = job['Minions']
return ret
def _format_jid_instance(jid, job):

View File

@@ -573,6 +573,12 @@ def prep_jid(cachedir, sum_type, user='root', nocache=False):
'''
Return a job id and prepare the job id directory
'''
salt.utils.warn_until(
'Boron',
'All job_cache management has been moved into the local_cache '
'returner, this util function will be removed-- please use '
'the returner'
)
jid = gen_jid()
jid_dir_ = jid_dir(jid, cachedir, sum_type)
@@ -596,6 +602,12 @@ def jid_dir(jid, cachedir, sum_type):
'''
Return the jid_dir for the given job id
'''
salt.utils.warn_until(
'Boron',
'All job_cache management has been moved into the local_cache '
'returner, this util function will be removed-- please use '
'the returner'
)
jid = str(jid)
jhash = getattr(hashlib, sum_type)(jid).hexdigest()
return os.path.join(cachedir, 'jobs', jhash[:2], jhash[2:])
@@ -605,6 +617,11 @@ def jid_load(jid, cachedir, sum_type, serial='msgpack'):
'''
Return the load data for a given job id
'''
salt.utils.warn_until(
'Boron',
'Getting the load has been moved into the returner interface '
'please get the data from the master_job_cache '
)
_dir = jid_dir(jid, cachedir, sum_type)
load_fn = os.path.join(_dir, '.load.p')
if not os.path.isfile(load_fn):
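
For third-party code the migration away from these deprecated helpers is mechanical; a hedged before/after, assuming an opts dict and a MasterMinion set up as in the earlier sketches:

    # before (deprecated, removal slated for Boron):
    #   jid = salt.utils.prep_jid(opts['cachedir'], opts['hash_type'], opts['user'])

    # after, through the master job cache returner:
    fstr = '{0}.prep_jid'.format(opts['master_job_cache'])
    jid = mminion.returners[fstr]()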

View File

@@ -22,7 +22,7 @@ class ManageTest(integration.ShellCase):
'''
ret = self.run_run_plus('jobs.active')
self.assertEqual(ret['fun'], {})
self.assertEqual(ret['out'], ['{}'])
self.assertEqual(ret['out'], [])
def test_lookup_jid(self):
'''

View File

@@ -116,9 +116,9 @@ class CallTest(integration.ShellCase, integration.ShellCaseCommonTestsMixIn):
if 'returnTOmaster' in j][0]
jid, idx = None, first_match[0]
while idx > 0:
jid = re.match("('|\")([0-9]+)('|\"):", jobs[idx])
jid = re.match("([0-9]+):", jobs[idx])
if jid:
jid = jid.group(2)
jid = jid.group(1)
break
idx -= 1
assert idx > 0