diff --git a/salt/client/__init__.py b/salt/client/__init__.py index 66891b2002..50b0a20f0d 100644 --- a/salt/client/__init__.py +++ b/salt/client/__init__.py @@ -117,6 +117,8 @@ class LocalClient(object): self.opts['transport'], listen=not self.opts.get('__worker', False)) + self.mminion = salt.minion.MasterMinion(self.opts) + def __read_master_key(self): ''' Read in the rotating master authentication key @@ -740,89 +742,30 @@ class LocalClient(object): if timeout is None: timeout = self.opts['timeout'] fret = {} - inc_timeout = timeout - jid_dir = salt.utils.jid_dir(jid, - self.opts['cachedir'], - self.opts['hash_type']) - start = int(time.time()) + # make sure the minions is a set (since we do set operations on it) + minions = set(minions) + found = set() - wtag = os.path.join(jid_dir, 'wtag*') - # Check to see if the jid is real, if not return the empty dict - if not os.path.isdir(jid_dir): - yield {} - last_time = False - # Wait for the hosts to check in - while True: - for fn_ in os.listdir(jid_dir): - ret = {} - if fn_.startswith('.'): - continue - if fn_ not in found: - retp = os.path.join(jid_dir, fn_, 'return.p') - outp = os.path.join(jid_dir, fn_, 'out.p') - if not os.path.isfile(retp): - continue - while fn_ not in ret: - try: - check = True - ret_data = self.serial.load( - salt.utils.fopen(retp, 'rb') - ) - if ret_data is None: - # Sometimes the ret data is read at the wrong - # time and returns None, do a quick re-read - if check: - continue - ret[fn_] = {'ret': ret_data} - if os.path.isfile(outp): - ret[fn_]['out'] = self.serial.load( - salt.utils.fopen(outp, 'rb') - ) - except Exception: - pass - found.add(fn_) - fret.update(ret) - yield ret - if glob.glob(wtag) and int(time.time()) <= start + timeout + 1: - # The timeout +1 has not been reached and there is still a - # write tag for the syndic - continue + # start this before the cache lookup-- in case new stuff comes in + event_iter = self.get_event_iter_returns(jid, minions, timeout=timeout) + + # get the info from the cache + ret = self.get_cache_returns(jid) + if ret != {}: + found.update(set(ret.keys())) + yield ret + + # if you have all the returns, stop + if len(found.intersection(minions)) >= len(minions): + raise StopIteration() + + # otherwise, get them from the event system + for event in event_iter: + if event != {}: + found.update(set(event.keys())) + yield event if len(found.intersection(minions)) >= len(minions): - # All minions have returned, break out of the loop - break - if last_time: - if verbose: - if self.opts.get('minion_data_cache', False) \ - or tgt_type in ('glob', 'pcre', 'list'): - if len(found.intersection(minions)) >= len(minions): - fail = sorted(list(minions.difference(found))) - for minion in fail: - yield({ - minion: { - 'out': 'no_return', - 'ret': 'Minion did not return' - } - }) - break - if int(time.time()) > start + timeout: - # The timeout has been reached, check the jid to see if the - # timeout needs to be increased - jinfo = self.gather_job_info(jid, tgt, tgt_type, minions - found, **kwargs) - more_time = False - for id_ in jinfo: - if jinfo[id_]: - if verbose: - print( - 'Execution is still running on {0}'.format(id_) - ) - more_time = True - if more_time: - timeout += inc_timeout - continue - else: - last_time = True - continue - time.sleep(0.01) + raise StopIteration() def get_iter_returns( self, @@ -845,16 +788,15 @@ class LocalClient(object): if timeout is None: timeout = self.opts['timeout'] - jid_dir = salt.utils.jid_dir(jid, - self.opts['cachedir'], - self.opts['hash_type']) start = int(time.time()) timeout_at = start + timeout found = set() - wtag = os.path.join(jid_dir, 'wtag*') # Check to see if the jid is real, if not return the empty dict - if not os.path.isdir(jid_dir): + if not self.mminion.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}: + log.warning("jid does not exist") yield {} + # stop the iteration, since the jid is invalid + raise StopIteration() # Wait for the hosts to check in syndic_wait = 0 last_time = False @@ -911,10 +853,6 @@ class LocalClient(object): jid, syndic_wait, datetime.fromtimestamp(timeout_at).time()) continue break - if glob.glob(wtag) and int(time.time()) <= timeout_at + 1: - # The timeout +1 has not been reached and there is still a - # write tag for the syndic - continue if last_time: if len(found) < len(minions): log.info('jid %s minions %s did not return in time', @@ -949,9 +887,6 @@ class LocalClient(object): minions = set(minions) if timeout is None: timeout = self.opts['timeout'] - jid_dir = salt.utils.jid_dir(jid, - self.opts['cachedir'], - self.opts['hash_type']) start = int(time.time()) timeout_at = start + timeout log.debug("get_returns for jid %s sent to %s will timeout at %s", @@ -959,11 +894,11 @@ class LocalClient(object): found = set() ret = {} - wtag = os.path.join(jid_dir, 'wtag*') # Check to see if the jid is real, if not return the empty dict - if not os.path.isdir(jid_dir): - log.warning("jid_dir (%s) does not exist", jid_dir) + if not self.mminion.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}: + log.warning("jid does not exist") return ret + # Wait for the hosts to check in while True: time_left = timeout_at - int(time.time()) @@ -982,10 +917,6 @@ class LocalClient(object): # All minions have returned, break out of the loop log.debug("jid %s found all minions", jid) break - if glob.glob(wtag) and int(time.time()) <= timeout_at + 1: - # The timeout +1 has not been reached and there is still a - # write tag for the syndic - continue if int(time.time()) > timeout_at: log.info('jid %s minions %s did not return in time', jid, (minions - found)) @@ -998,85 +929,69 @@ class LocalClient(object): This method starts off a watcher looking at the return data for a specified jid, it returns all of the information for the jid ''' - if timeout is None: - timeout = self.opts['timeout'] - jid_dir = salt.utils.jid_dir(jid, - self.opts['cachedir'], - self.opts['hash_type']) - start = 999999999999 - gstart = int(time.time()) + # TODO: change this from ret to return... or the other way. + # Its inconsistent, we should pick one + ret = {} - wtag = os.path.join(jid_dir, 'wtag*') - # Check to see if the jid is real, if not return the empty dict - if not os.path.isdir(jid_dir): + # create the iterator-- since we want to get anyone in the middle + event_iter = self.get_event_iter_returns(jid, minions, timeout=timeout) + + data = self.mminion.returners['{0}.get_jid'.format(self.opts['master_job_cache'])](jid) + for minion in data: + m_data = {} + if u'return' in data[minion]: + m_data['ret'] = data[minion].get(u'return') + else: + m_data['ret'] = data[minion].get('return') + if 'out' in data[minion]: + m_data['out'] = data[minion]['out'] + if minion in ret: + ret[minion].update(m_data) + else: + ret[minion] = m_data + + # if we have all the minion returns, lets just return + if len(set(ret.keys()).intersection(minions)) >= len(minions): return ret - # Wait for the hosts to check in - while True: - for fn_ in os.listdir(jid_dir): - if fn_.startswith('.'): - continue - if fn_ not in ret: - retp = os.path.join(jid_dir, fn_, 'return.p') - outp = os.path.join(jid_dir, fn_, 'out.p') - if not os.path.isfile(retp): - continue - while fn_ not in ret: - try: - ret_data = self.serial.load( - salt.utils.fopen(retp, 'rb')) - ret[fn_] = {'ret': ret_data} - if os.path.isfile(outp): - ret[fn_]['out'] = self.serial.load( - salt.utils.fopen(outp, 'rb')) - except Exception: - pass - if ret and start == 999999999999: - start = int(time.time()) - if glob.glob(wtag) and int(time.time()) <= start + timeout + 1: - # The timeout +1 has not been reached and there is still a - # write tag for the syndic + + # otherwise lets use the listener we created above to get the rest + for event_ret in event_iter: + # if nothing in the event_ret, skip + if event_ret == {}: + time.sleep(0.02) continue + for minion, m_data in event_ret.iteritems(): + if minion in ret: + ret[minion].update(m_data) + else: + ret[minion] = m_data + + # are we done yet? if len(set(ret.keys()).intersection(minions)) >= len(minions): return ret - if int(time.time()) > start + timeout: - return ret - if int(time.time()) > gstart + timeout and not ret: - # No minions have replied within the specified global timeout, - # return an empty dict - return ret - time.sleep(0.02) + + # otherwise we hit the timeout, return what we have + return ret def get_cache_returns(self, jid): ''' Execute a single pass to gather the contents of the job cache ''' ret = {} - jid_dir = salt.utils.jid_dir(jid, - self.opts['cachedir'], - self.opts['hash_type']) - # If someone asks for the cache returns before we created them, we don't - # want to explode - try: - for fn_ in os.listdir(jid_dir): - if fn_.startswith('.'): - continue - if fn_ not in ret: - retp = os.path.join(jid_dir, fn_, 'return.p') - outp = os.path.join(jid_dir, fn_, 'out.p') - if not os.path.isfile(retp): - continue - while fn_ not in ret: - try: - ret_data = self.serial.load( - salt.utils.fopen(retp, 'rb')) - ret[fn_] = {'ret': ret_data} - if os.path.isfile(outp): - ret[fn_]['out'] = self.serial.load( - salt.utils.fopen(outp, 'rb')) - except Exception: - pass - except IOError: - pass + + data = self.mminion.returners['{0}.get_jid'.format(self.opts['master_job_cache'])](jid) + for minion in data: + m_data = {} + if u'return' in data[minion]: + m_data['ret'] = data[minion].get(u'return') + else: + m_data['ret'] = data[minion].get('return') + if 'out' in data[minion]: + m_data['out'] = data[minion]['out'] + if minion in ret: + ret[minion].update(m_data) + else: + ret[minion] = m_data return ret @@ -1103,16 +1018,14 @@ class LocalClient(object): if timeout is None: timeout = self.opts['timeout'] - jid_dir = salt.utils.jid_dir(jid, - self.opts['cachedir'], - self.opts['hash_type']) + start = int(time.time()) timeout_at = start + timeout found = set() ret = {} - wtag = os.path.join(jid_dir, 'wtag*') # Check to see if the jid is real, if not return the empty dict - if not os.path.isdir(jid_dir): + if not self.mminion.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}: + log.warning("jid does not exist") return ret # Wait for the hosts to check in while True: @@ -1138,10 +1051,6 @@ class LocalClient(object): if len(found.intersection(minions)) >= len(minions): # All minions have returned, break out of the loop break - if glob.glob(wtag) and int(time.time()) <= timeout_at + 1: - # The timeout +1 has not been reached and there is still a - # write tag for the syndic - continue if int(time.time()) > timeout_at: if verbose: if self.opts.get('minion_data_cache', False) \ @@ -1187,16 +1096,17 @@ class LocalClient(object): if timeout is None: timeout = self.opts['timeout'] - jid_dir = salt.utils.jid_dir(jid, - self.opts['cachedir'], - self.opts['hash_type']) + start = time.time() timeout_at = start + timeout found = set() - wtag = os.path.join(jid_dir, 'wtag*') # Check to see if the jid is real, if not return the empty dict - if not os.path.isdir(jid_dir): + if not self.mminion.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}: + log.warning("jid does not exist") yield {} + # stop the iteration, since the jid is invalid + raise StopIteration() + # Wait for the hosts to check in syndic_wait = 0 last_time = False @@ -1245,10 +1155,6 @@ class LocalClient(object): timeout_at = time.time() + 1 continue break - if glob.glob(wtag) and time.time() <= timeout_at + 1: - # The timeout +1 has not been reached and there is still a - # write tag for the syndic - continue if last_time: if verbose or show_timeout: if self.opts.get('minion_data_cache', False) \ @@ -1290,13 +1196,14 @@ class LocalClient(object): log.trace('entered - function get_event_iter_returns()') if timeout is None: timeout = self.opts['timeout'] - jid_dir = salt.utils.jid_dir(jid, - self.opts['cachedir'], - self.opts['hash_type']) + found = set() # Check to see if the jid is real, if not return the empty dict - if not os.path.isdir(jid_dir): + if not self.mminion.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}: + log.warning("jid does not exist") yield {} + # stop the iteration, since the jid is invalid + raise StopIteration() # Wait for the hosts to check in while True: raw = self.event.get_event(timeout) diff --git a/salt/client/ssh/__init__.py b/salt/client/ssh/__init__.py index 40123c8fda..8524d1a1f0 100644 --- a/salt/client/ssh/__init__.py +++ b/salt/client/ssh/__init__.py @@ -236,6 +236,7 @@ class SSH(object): ), } self.serial = salt.payload.Serial(opts) + self.mminion = salt.minion.MasterMinion(self.opts) def verify_env(self): ''' @@ -422,52 +423,17 @@ class SSH(object): ''' Cache the job information ''' - jid_dir = salt.utils.jid_dir( - jid, - self.opts['cachedir'], - self.opts['hash_type'] - ) - if not os.path.isdir(jid_dir): - log.error( - 'An inconsistency occurred, a job was received with a job id ' - 'that is not present on the master: {0}'.format(jid) - ) - return False - if os.path.exists(os.path.join(jid_dir, 'nocache')): - return - hn_dir = os.path.join(jid_dir, id_) - if not os.path.isdir(hn_dir): - os.makedirs(hn_dir) - # Otherwise the minion has already returned this jid and it should - # be dropped - else: - log.error( - 'An extra return was detected from minion {0}, please verify ' - 'the minion, this could be a replay attack'.format( - id_ - ) - ) - return False - - self.serial.dump( - ret, - # Use atomic open here to avoid the file being read before it's - # completely written to. Refs #1935 - salt.utils.atomicfile.atomic_open( - os.path.join(hn_dir, 'return.p'), 'w+b' - ) - ) + self.mminion.returners['{0}.returner'.format(self.opts['master_job_cache'])]({'jid': jid, + 'id': id_, + 'return': ret}) def run(self): ''' Execute the overall routine ''' - jid = salt.utils.prep_jid( - self.opts['cachedir'], - self.opts['hash_type'], - self.opts['user']) + fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + jid = self.mminion.returners[fstr]() - jid_dir = salt.utils.jid_dir(jid, self.opts['cachedir'], self.opts['hash_type']) # Save the invocation information arg_str = self.opts['arg_str'] @@ -489,17 +455,9 @@ class SSH(object): 'fun': fun, 'arg': args, } - self.serial.dump( - job_load, - salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b') - ) - # save the targets to a cache so we can see them in the UI - targets = self.targets.keys() - targets.sort() - self.serial.dump( - targets, - salt.utils.fopen(os.path.join(jid_dir, '.minions.p'), 'w+b') - ) + + # save load to the master job cache + self.mminion.returners['{0}.save_load'.format(self.opts['master_job_cache'])](jid, job_load) if self.opts.get('verbose'): msg = 'Executing job with jid {0}'.format(jid) diff --git a/salt/config.py b/salt/config.py index 6d4cc9ea5b..b7437c874d 100644 --- a/salt/config.py +++ b/salt/config.py @@ -187,7 +187,7 @@ VALID_OPTS = { 'order_masters': bool, 'job_cache': bool, 'ext_job_cache': str, - 'master_ext_job_cache': str, + 'master_job_cache': str, 'minion_data_cache': bool, 'publish_session': int, 'reactor': list, @@ -413,7 +413,7 @@ DEFAULT_MASTER_OPTS = { 'order_masters': False, 'job_cache': True, 'ext_job_cache': '', - 'master_ext_job_cache': '', + 'master_job_cache': 'local_cache', 'minion_data_cache': True, 'enforce_mine_cache': False, 'ipv6': False, diff --git a/salt/daemons/masterapi.py b/salt/daemons/masterapi.py index fd457d78dc..3c7c8eadc0 100644 --- a/salt/daemons/masterapi.py +++ b/salt/daemons/masterapi.py @@ -8,8 +8,6 @@ involves preparing the three listeners and the workers needed by the master. import os import re import logging -import shutil -import datetime try: import pwd except ImportError: @@ -100,43 +98,16 @@ def clean_old_jobs(opts): ''' Clean out the old jobs from the job cache ''' - if opts['keep_jobs'] != 0: - jid_root = os.path.join(opts['cachedir'], 'jobs') - cur = datetime.datetime.now() - - if os.path.exists(jid_root): - for top in os.listdir(jid_root): - t_path = os.path.join(jid_root, top) - for final in os.listdir(t_path): - f_path = os.path.join(t_path, final) - jid_file = os.path.join(f_path, 'jid') - if not os.path.isfile(jid_file): - # No jid file means corrupted cache entry, scrub it - shutil.rmtree(f_path) - else: - with salt.utils.fopen(jid_file, 'r') as fn_: - jid = fn_.read() - if len(jid) < 18: - # Invalid jid, scrub the dir - shutil.rmtree(f_path) - else: - # Parse the jid into a proper datetime object. - # We only parse down to the minute, since keep - # jobs is measured in hours, so a minute - # difference is not important. - try: - jidtime = datetime.datetime(int(jid[0:4]), - int(jid[4:6]), - int(jid[6:8]), - int(jid[8:10]), - int(jid[10:12])) - except ValueError as e: - # Invalid jid, scrub the dir - shutil.rmtree(f_path) - difference = cur - jidtime - hours_difference = difference.seconds / 3600.0 - if hours_difference > opts['keep_jobs']: - shutil.rmtree(f_path) + # TODO: better way to not require creating the masterminion every time? + mminion = salt.minion.MasterMinion( + opts, + states=False, + rend=False, + ) + # If the master job cache has a clean_old_jobs, call it + fstr = '{0}.clean_old_jobs'.format(opts['master_job_cache']) + if fstr in mminion.returners: + mminion.returners[fstr]() def access_keys(opts): @@ -505,64 +476,21 @@ class RemoteFuncs(object): return False if load['jid'] == 'req': # The minion is returning a standalone job, request a jobid - load['jid'] = salt.utils.prep_jid( - self.opts['cachedir'], - self.opts['hash_type'], - load.get('nocache', False)) + prep_fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + load['jid'] = self.mminion.returners[prep_fstr](nocache=load.get('nocache', False)) + + # save the load, since we don't have it + saveload_fstr = '{0}.save_load'.format(self.opts['master_job_cache']) + self.mminion.returners[saveload_fstr](load['jid'], load) log.info('Got return from {id} for job {jid}'.format(**load)) self.event.fire_event(load, load['jid']) # old dup event self.event.fire_event(load, tagify([load['jid'], 'ret', load['id']], 'job')) self.event.fire_ret_load(load) - if self.opts['master_ext_job_cache']: - fstr = '{0}.returner'.format(self.opts['master_ext_job_cache']) - self.mminion.returners[fstr](load) - return if not self.opts['job_cache'] or self.opts.get('ext_job_cache'): return - jid_dir = salt.utils.jid_dir( - load['jid'], - self.opts['cachedir'], - self.opts['hash_type'] - ) - if not os.path.isdir(jid_dir): - log.error( - 'An inconsistency occurred, a job was received with a job id ' - 'that is not present on the master: {jid}'.format(**load) - ) - return False - if os.path.exists(os.path.join(jid_dir, 'nocache')): - return - hn_dir = os.path.join(jid_dir, load['id']) - if not os.path.isdir(hn_dir): - os.makedirs(hn_dir) - # Otherwise the minion has already returned this jid and it should - # be dropped - else: - log.error( - 'An extra return was detected from minion {0}, please verify ' - 'the minion, this could be a replay attack'.format( - load['id'] - ) - ) - return False - self.serial.dump( - load['return'], - # Use atomic open here to avoid the file being read before it's - # completely written to. Refs #1935 - salt.utils.atomicfile.atomic_open( - os.path.join(hn_dir, 'return.p'), 'w+b' - ) - ) - if 'out' in load: - self.serial.dump( - load['out'], - # Use atomic open here to avoid the file being read before - # it's completely written to. Refs #1935 - salt.utils.atomicfile.atomic_open( - os.path.join(hn_dir, 'out.p'), 'w+b' - ) - ) + fstr = '{0}.returner'.format(self.opts['master_job_cache']) + self.mminion.returners[fstr](load) def _syndic_return(self, load): ''' @@ -572,29 +500,10 @@ class RemoteFuncs(object): # Verify the load if any(key not in load for key in ('return', 'jid', 'id')): return None - # set the write flag - jid_dir = salt.utils.jid_dir( - load['jid'], - self.opts['cachedir'], - self.opts['hash_type'] - ) - if not os.path.isdir(jid_dir): - os.makedirs(jid_dir) - if 'load' in load: - with salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b') as fp_: - self.serial.dump(load['load'], fp_) - wtag = os.path.join(jid_dir, 'wtag_{0}'.format(load['id'])) - try: - with salt.utils.fopen(wtag, 'w+') as fp_: - fp_.write('') - except (IOError, OSError): - log.error( - 'Failed to commit the write tag for the syndic return, are ' - 'permissions correct in the cache dir: {0}?'.format( - self.opts['cachedir'] - ) - ) - return False + # if we have a load, save it + if 'load' in load: + fstr = '{0}.save_load'.format(self.opts['master_job_cache']) + self.mminion.returners[fstr](load['jid'], load['load']) # Format individual return loads for key, item in load['return'].items(): @@ -604,8 +513,6 @@ class RemoteFuncs(object): if 'out' in load: ret['out'] = load['out'] self._return(ret) - if os.path.isfile(wtag): - os.remove(wtag) def minion_runner(self, load): ''' @@ -1326,17 +1233,9 @@ class LocalFuncs(object): } # Retrieve the jid if not load['jid']: - load['jid'] = salt.utils.prep_jid( - self.opts['cachedir'], - self.opts['hash_type'], - extra.get('nocache', False) - ) + fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + load['jid'] = self.mminion.returners[fstr](nocache=extra.get('nocache', False)) self.event.fire_event({'minions': minions}, load['jid']) - jid_dir = salt.utils.jid_dir( - load['jid'], - self.opts['cachedir'], - self.opts['hash_type'] - ) new_job_load = { 'jid': load['jid'], @@ -1352,19 +1251,7 @@ class LocalFuncs(object): self.event.fire_event(new_job_load, 'new_job') # old dup event self.event.fire_event(new_job_load, tagify([load['jid'], 'new'], 'job')) - # Verify the jid dir - if not os.path.isdir(jid_dir): - os.makedirs(jid_dir) # Save the invocation information - self.serial.dump( - load, - salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b') - ) - # save the minions to a cache so we can see in the UI - self.serial.dump( - minions, - salt.utils.fopen(os.path.join(jid_dir, '.minions.p'), 'w+b') - ) if self.opts['ext_job_cache']: try: fstr = '{0}.save_load'.format(self.opts['ext_job_cache']) @@ -1381,6 +1268,23 @@ class LocalFuncs(object): 'The specified returner threw a stack trace:\n', exc_info=True ) + + # always write out to the master job cache + try: + fstr = '{0}.save_load'.format(self.opts['master_job_cache']) + self.mminion.returners[fstr](load['jid'], load) + except KeyError: + log.critical( + 'The specified returner used for the master job cache ' + '"{0}" does not have a save_load function!'.format( + self.opts['master_job_cache'] + ) + ) + except Exception: + log.critical( + 'The specified returner threw a stack trace:\n', + exc_info=True + ) # Set up the payload payload = {'enc': 'aes'} # Altering the contents of the publish load is serious!! Changes here diff --git a/salt/master.py b/salt/master.py index 126b1b273b..5aa9efe6a3 100644 --- a/salt/master.py +++ b/salt/master.py @@ -1244,78 +1244,30 @@ class AESFuncs(object): return False if not salt.utils.verify.valid_id(self.opts, load['id']): return False - new_loadp = False if load['jid'] == 'req': # The minion is returning a standalone job, request a jobid load['arg'] = load.get('arg', load.get('fun_args', [])) load['tgt_type'] = 'glob' load['tgt'] = load['id'] - load['jid'] = salt.utils.prep_jid( - self.opts['cachedir'], - self.opts['hash_type'], - load.get('nocache', False)) - new_loadp = load.get('nocache', True) and True + prep_fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + load['jid'] = self.mminion.returners[prep_fstr](nocache=load.get('nocache', False)) + + # save the load, since we don't have it + saveload_fstr = '{0}.save_load'.format(self.opts['master_job_cache']) + self.mminion.returners[saveload_fstr](load['jid'], load) log.info('Got return from {id} for job {jid}'.format(**load)) self.event.fire_event(load, load['jid']) # old dup event self.event.fire_event( load, tagify([load['jid'], 'ret', load['id']], 'job')) self.event.fire_ret_load(load) - if self.opts['master_ext_job_cache']: - fstr = '{0}.returner'.format(self.opts['master_ext_job_cache']) - self.mminion.returners[fstr](load) - return + + # if you have a job_cache, or an ext_job_cache, don't write to the regular master cache if not self.opts['job_cache'] or self.opts.get('ext_job_cache'): return - jid_dir = salt.utils.jid_dir( - load['jid'], - self.opts['cachedir'], - self.opts['hash_type'] - ) - if os.path.exists(os.path.join(jid_dir, 'nocache')): - return - if new_loadp: - with salt.utils.fopen( - os.path.join(jid_dir, '.load.p'), 'w+b' - ) as fp_: - self.serial.dump(load, fp_) - hn_dir = os.path.join(jid_dir, load['id']) - try: - os.mkdir(hn_dir) - except OSError as e: - if e.errno == errno.EEXIST: - # Minion has already returned this jid and it should be dropped - log.error( - 'An extra return was detected from minion {0}, please verify ' - 'the minion, this could be a replay attack'.format( - load['id'] - ) - ) - return False - elif e.errno == errno.ENOENT: - log.error( - 'An inconsistency occurred, a job was received with a job id ' - 'that is not present on the master: {jid}'.format(**load) - ) - return False - raise - self.serial.dump( - load['return'], - # Use atomic open here to avoid the file being read before it's - # completely written to. Refs #1935 - salt.utils.atomicfile.atomic_open( - os.path.join(hn_dir, 'return.p'), 'w+b' - ) - ) - if 'out' in load: - self.serial.dump( - load['out'], - # Use atomic open here to avoid the file being read before - # it's completely written to. Refs #1935 - salt.utils.atomicfile.atomic_open( - os.path.join(hn_dir, 'out.p'), 'w+b' - ) - ) + # otherwise, write to the master cache + fstr = '{0}.returner'.format(self.opts['master_job_cache']) + self.mminion.returners[fstr](load) def _syndic_return(self, load): ''' @@ -1325,31 +1277,10 @@ class AESFuncs(object): # Verify the load if any(key not in load for key in ('return', 'jid', 'id')): return None - if not salt.utils.verify.valid_id(self.opts, load['id']): - return False - # set the write flag - jid_dir = salt.utils.jid_dir( - load['jid'], - self.opts['cachedir'], - self.opts['hash_type'] - ) - if not os.path.isdir(jid_dir): - os.makedirs(jid_dir) - if 'load' in load: - with salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b') as fp_: - self.serial.dump(load['load'], fp_) - wtag = os.path.join(jid_dir, 'wtag_{0}'.format(load['id'])) - try: - with salt.utils.fopen(wtag, 'w+b') as fp_: - fp_.write('') - except (IOError, OSError): - log.error( - 'Failed to commit the write tag for the syndic return, are ' - 'permissions correct in the cache dir: {0}?'.format( - self.opts['cachedir'] - ) - ) - return False + # if we have a load, save it + if 'load' in load: + fstr = '{0}.save_load'.format(self.opts['master_job_cache']) + self.mminion.returners[fstr](load['jid'], load) # Format individual return loads for key, item in load['return'].items(): @@ -1359,8 +1290,6 @@ class AESFuncs(object): if 'out' in load: ret['out'] = load['out'] self._return(ret) - if os.path.isfile(wtag): - os.remove(wtag) def minion_runner(self, clear_load): ''' @@ -2537,17 +2466,9 @@ class ClearFuncs(object): } # Retrieve the jid if not clear_load['jid']: - clear_load['jid'] = salt.utils.prep_jid( - self.opts['cachedir'], - self.opts['hash_type'], - extra.get('nocache', False) - ) + fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + clear_load['jid'] = self.mminion.returners[fstr](nocache=extra.get('nocache', False)) self.event.fire_event({'minions': minions}, clear_load['jid']) - jid_dir = salt.utils.jid_dir( - clear_load['jid'], - self.opts['cachedir'], - self.opts['hash_type'] - ) new_job_load = { 'jid': clear_load['jid'], @@ -2563,19 +2484,6 @@ class ClearFuncs(object): self.event.fire_event(new_job_load, 'new_job') # old dup event self.event.fire_event(new_job_load, tagify([clear_load['jid'], 'new'], 'job')) - # Verify the jid dir - if not os.path.isdir(jid_dir): - os.makedirs(jid_dir) - # Save the invocation information - self.serial.dump( - clear_load, - salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b') - ) - # save the minions to a cache so we can see in the UI - self.serial.dump( - minions, - salt.utils.fopen(os.path.join(jid_dir, '.minions.p'), 'w+b') - ) if self.opts['ext_job_cache']: try: fstr = '{0}.save_load'.format(self.opts['ext_job_cache']) @@ -2592,6 +2500,24 @@ class ClearFuncs(object): 'The specified returner threw a stack trace:\n', exc_info=True ) + + # always write out to the master job caches + try: + fstr = '{0}.save_load'.format(self.opts['master_job_cache']) + self.mminion.returners[fstr](clear_load['jid'], clear_load) + except KeyError: + log.critical( + 'The specified returner used for the master job cache ' + '"{0}" does not have a save_load function!'.format( + self.opts['master_job_cache'] + ) + ) + except Exception: + log.critical( + 'The specified returner threw a stack trace:\n', + exc_info=True + ) + # Set up the payload payload = {'enc': 'aes'} # Altering the contents of the publish load is serious!! Changes here diff --git a/salt/minion.py b/salt/minion.py index 4586bc7970..5694f7f5b0 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -1480,6 +1480,7 @@ class Syndic(Minion): self._syndic = True opts['loop_interval'] = 1 super(Syndic, self).__init__(opts) + self.mminion = salt.minion.MasterMinion(opts) def _handle_aes(self, load, sig=None): ''' @@ -1670,10 +1671,11 @@ class Syndic(Minion): if not jdict: jdict['__fun__'] = event['data'].get('fun') jdict['__jid__'] = event['data']['jid'] - jdict['__load__'] = salt.utils.jid_load( - event['data']['jid'], - self.local.opts['cachedir'], - self.opts['hash_type']) + jdict['__load__'] = {} + fstr = '{0}.get_jid'.format(self.opts['master_job_cache']) + jdict['__load__'].update( + self.mminion.returners[fstr](event['data']['jid']) + ) jdict[event['data']['id']] = event['data']['return'] else: # Add generic event aggregation here diff --git a/salt/output/__init__.py b/salt/output/__init__.py index 75964c9cee..776d652574 100644 --- a/salt/output/__init__.py +++ b/salt/output/__init__.py @@ -27,7 +27,7 @@ STATIC = ( ) -def display_output(data, out, opts=None): +def display_output(data, out=None, opts=None): ''' Print the passed data using the desired output ''' diff --git a/salt/returners/local_cache.py b/salt/returners/local_cache.py new file mode 100644 index 0000000000..978b8524cd --- /dev/null +++ b/salt/returners/local_cache.py @@ -0,0 +1,297 @@ +# -*- coding: utf-8 -*- +''' +Return data to local job cache + +''' + +# Import python libs +import errno +import logging +import os +import shutil +import datetime +import hashlib + +# Import salt libs +import salt.payload +import salt.utils + +log = logging.getLogger(__name__) + +# load is the published job +LOAD_P = '.load.p' +# the list of minions that the job is targeted to (best effort match on the master side) +MINIONS_P = '.minions.p' +# return is the "return" from the minion data +RETURN_P = 'return.p' +# out is the "out" from the minion data +OUT_P = 'out.p' + + +def _job_dir(): + ''' + Return root of the jobs cache directory + ''' + return os.path.join(__opts__['cachedir'], + 'jobs') + + +def _jid_dir(jid): + ''' + Return the jid_dir for the given job id + ''' + jid = str(jid) + jhash = getattr(hashlib, __opts__['hash_type'])(jid).hexdigest() + return os.path.join(_job_dir(), + jhash[:2], + jhash[2:]) + + +def _walk_through(job_dir): + serial = salt.payload.Serial(__opts__) + + for top in os.listdir(job_dir): + t_path = os.path.join(job_dir, top) + + for final in os.listdir(t_path): + load_path = os.path.join(t_path, final, LOAD_P) + + if not os.path.isfile(load_path): + continue + + job = serial.load(salt.utils.fopen(load_path, 'rb')) + jid = job['jid'] + yield jid, job, t_path, final + + +def _format_job_instance(job): + return {'Function': job.get('fun', 'unknown-function'), + 'Arguments': list(job.get('arg', [])), + # unlikely but safeguard from invalid returns + 'Target': job.get('tgt', 'unknown-target'), + 'Target-type': job.get('tgt_type', []), + 'User': job.get('user', 'root')} + + +def _format_jid_instance(jid, job): + ret = _format_job_instance(job) + ret.update({'StartTime': salt.utils.jid_to_time(jid)}) + return ret + + +#TODO: add to returner docs-- this is a new one +def prep_jid(nocache=False): + ''' + Return a job id and prepare the job id directory + This is the function responsible for making sure jids don't collide (unless its passed a jid) + So do what you have to do to make sure that stays the case + ''' + jid = salt.utils.gen_jid() + + jid_dir_ = _jid_dir(jid) + + # make sure we create the jid dir, otherwise someone else is using it, + # meaning we need a new jid + try: + os.makedirs(jid_dir_) + except OSError: + # TODO: some sort of sleep or something? Spinning is generally bad practice + return prep_jid(nocache=nocache) + + with salt.utils.fopen(os.path.join(jid_dir_, 'jid'), 'w+') as fn_: + fn_.write(jid) + if nocache: + with salt.utils.fopen(os.path.join(jid_dir_, 'nocache'), 'w+') as fn_: + fn_.write('') + + return jid + + +def returner(load): + ''' + Return data to the local job cache + ''' + serial = salt.payload.Serial(__opts__) + jid_dir = _jid_dir(load['jid']) + if os.path.exists(os.path.join(jid_dir, 'nocache')): + return + + # do we need to rewrite the load? + if load['jid'] == 'req' and bool(load.get('nocache', True)): + with salt.utils.fopen(os.path.join(jid_dir, LOAD_P), 'w+b') as fp_: + serial.dump(load, fp_) + + hn_dir = os.path.join(jid_dir, load['id']) + + try: + os.mkdir(hn_dir) + except OSError as e: + if e.errno == errno.EEXIST: + # Minion has already returned this jid and it should be dropped + log.error( + 'An extra return was detected from minion {0}, please verify ' + 'the minion, this could be a replay attack'.format( + load['id'] + ) + ) + return False + elif e.errno == errno.ENOENT: + log.error( + 'An inconsistency occurred, a job was received with a job id ' + 'that is not present in the local cache: {jid}'.format(**load) + ) + return False + raise + + serial.dump( + load['return'], + # Use atomic open here to avoid the file being read before it's + # completely written to. Refs #1935 + salt.utils.atomicfile.atomic_open( + os.path.join(hn_dir, RETURN_P), 'w+b' + ) + ) + + if 'out' in load: + serial.dump( + load['out'], + # Use atomic open here to avoid the file being read before + # it's completely written to. Refs #1935 + salt.utils.atomicfile.atomic_open( + os.path.join(hn_dir, OUT_P), 'w+b' + ) + ) + + +def save_load(jid, clear_load): + ''' + Save the load to the specified jid + ''' + jid_dir = _jid_dir(clear_load['jid']) + + serial = salt.payload.Serial(__opts__) + + # if you have a tgt, save that for the UI etc + if 'tgt' in clear_load: + ckminions = salt.utils.minions.CkMinions(__opts__) + # Retrieve the minions list + minions = ckminions.check_minions( + clear_load['tgt'], + clear_load.get('tgt_type', 'glob') + ) + # save the minions to a cache so we can see in the UI + serial.dump( + minions, + salt.utils.fopen(os.path.join(jid_dir, MINIONS_P), 'w+b') + ) + + # Save the invocation information + serial.dump( + clear_load, + salt.utils.fopen(os.path.join(jid_dir, LOAD_P), 'w+b') + ) + + +def get_load(jid): + ''' + Return the load data that marks a specified jid + ''' + jid_dir = _jid_dir(jid) + if not os.path.exists(jid_dir): + return {} + serial = salt.payload.Serial(__opts__) + + ret = serial.load(salt.utils.fopen(os.path.join(jid_dir, LOAD_P), 'rb')) + + minions_path = os.path.join(jid_dir, MINIONS_P) + if os.path.isfile(minions_path): + ret['Minions'] = serial.load(salt.utils.fopen(minions_path, 'rb')) + + return ret + + +def get_jid(jid): + ''' + Return the information returned when the specified job id was executed + ''' + jid_dir = _jid_dir(jid) + serial = salt.payload.Serial(__opts__) + + ret = {} + # Check to see if the jid is real, if not return the empty dict + if not os.path.isdir(jid_dir): + return ret + for fn_ in os.listdir(jid_dir): + if fn_.startswith('.'): + continue + if fn_ not in ret: + retp = os.path.join(jid_dir, fn_, RETURN_P) + outp = os.path.join(jid_dir, fn_, OUT_P) + if not os.path.isfile(retp): + continue + while fn_ not in ret: + try: + ret_data = serial.load( + salt.utils.fopen(retp, 'rb')) + ret[fn_] = {'return': ret_data} + if os.path.isfile(outp): + ret[fn_]['out'] = serial.load( + salt.utils.fopen(outp, 'rb')) + except Exception: + pass + return ret + + +def get_jids(): + ''' + Return a list of all job ids + ''' + ret = {} + for jid, job, t_path, final in _walk_through(_job_dir()): + ret[jid] = _format_jid_instance(jid, job) + return ret + + +def clean_old_jobs(): + ''' + Clean out the old jobs from the job cache + ''' + if __opts__['keep_jobs'] != 0: + cur = datetime.datetime.now() + + jid_root = _job_dir() + if not os.path.exists(jid_root): + return + + for top in os.listdir(jid_root): + t_path = os.path.join(jid_root, top) + for final in os.listdir(t_path): + f_path = os.path.join(t_path, final) + jid_file = os.path.join(f_path, 'jid') + if not os.path.isfile(jid_file): + # No jid file means corrupted cache entry, scrub it + shutil.rmtree(f_path) + else: + with salt.utils.fopen(jid_file, 'r') as fn_: + jid = fn_.read() + if len(jid) < 18: + # Invalid jid, scrub the dir + shutil.rmtree(f_path) + else: + # Parse the jid into a proper datetime object. + # We only parse down to the minute, since keep + # jobs is measured in hours, so a minute + # difference is not important. + try: + jidtime = datetime.datetime(int(jid[0:4]), + int(jid[4:6]), + int(jid[6:8]), + int(jid[8:10]), + int(jid[10:12])) + except ValueError as e: + # Invalid jid, scrub the dir + shutil.rmtree(f_path) + difference = cur - jidtime + hours_difference = difference.seconds / 3600.0 + if hours_difference > __opts__['keep_jobs']: + shutil.rmtree(f_path) diff --git a/salt/returners/multi_returner.py b/salt/returners/multi_returner.py new file mode 100644 index 0000000000..87434827e3 --- /dev/null +++ b/salt/returners/multi_returner.py @@ -0,0 +1,111 @@ +# -*- coding: utf-8 -*- +''' +Read/Write multiple returners + +''' + +# Import python libs +import logging + +# Import salt libs +import salt.minion + +log = logging.getLogger(__name__) + +CONFIG_KEY = 'multi_returner' + +# cache of the master mininon for this returner +MMINION = None + + +def _mminion(): + ''' + Create a single mminion for this module to use, instead of reloading all the time + ''' + global MMINION + + if MMINION is None: + MMINION = salt.minion.MasterMinion(__opts__) + + return MMINION + + +def prep_jid(nocache=False): + ''' + Call both with prep_jid on all returners in multi_returner + + TODO: finish this, what do do when you get different jids from 2 returners... + since our jids are time based, this make this problem hard, beacuse they + aren't unique, meaning that we have to make sure that no one else got the jid + and if they did we spin to get a new one, which means "locking" the jid in 2 + returners is non-trivial + ''' + + jid = None + for returner in __opts__[CONFIG_KEY]: + if jid is None: + jid = _mminion().returners['{0}.prep_jid'.format(returner)](nocache=nocache) + else: + r_jid = _mminion().returners['{0}.prep_jid'.format(returner)](nocache=nocache) + if r_jid != jid: + print 'Uhh.... crud the jids do not match' + return jid + + +def returner(load): + ''' + Write return to all returners in multi_returner + ''' + for returner in __opts__[CONFIG_KEY]: + _mminion().returners['{0}.returner'.format(returner)](load) + + +def save_load(jid, clear_load): + ''' + Write load to all returners in multi_returner + ''' + for returner in __opts__[CONFIG_KEY]: + _mminion().returners['{0}.save_load'.format(returner)](jid, clear_load) + + +def get_load(jid): + ''' + Merge the load data from all returners + ''' + ret = {} + for returner in __opts__[CONFIG_KEY]: + ret.update(_mminion().returners['{0}.get_load'.format(returner)](jid)) + + return ret + + +def get_jid(jid): + ''' + Merge the return data from all returners + ''' + ret = {} + for returner in __opts__[CONFIG_KEY]: + ret.update(_mminion().returners['{0}.get_jid'.format(returner)](jid)) + + return ret + + +def get_jids(): + ''' + Return all job data from all returners + ''' + ret = {} + for returner in __opts__[CONFIG_KEY]: + ret.update(_mminion().returners['{0}.get_jids'.format(returner)]()) + + return ret + + +def clean_old_jobs(): + ''' + Clean out the old jobs from all returners (if you have it) + ''' + for returner in __opts__[CONFIG_KEY]: + fstr = '{0}.clean_old_jobs'.format(returner) + if fstr in _mminion().returners: + _mminion().returners[fstr]() diff --git a/salt/runners/jobs.py b/salt/runners/jobs.py index c5ce2a373d..67517bdc08 100644 --- a/salt/runners/jobs.py +++ b/salt/runners/jobs.py @@ -37,19 +37,16 @@ def active(): ret[job['jid']].update({'Running': [{minion: job.get('pid', None)}], 'Returned': []}) else: ret[job['jid']]['Running'].append({minion: job['pid']}) + + mminion = salt.minion.MasterMinion(__opts__) for jid in ret: - jid_dir = salt.utils.jid_dir( - jid, - __opts__['cachedir'], - __opts__['hash_type']) - if not os.path.isdir(jid_dir): - continue - for minion in os.listdir(jid_dir): - if minion.startswith('.') or minion == 'jid': - continue - if os.path.exists(os.path.join(jid_dir, minion)): + returner = _get_returner((__opts__['ext_job_cache'], __opts__['master_job_cache'])) + data = mminion.returners['{0}.get_jid'.format(returner)](jid) + for minion in data: + if minion not in ret[jid]['Returned']: ret[jid]['Returned'].append(minion) - salt.output.display_output(ret, 'yaml', __opts__) + + salt.output.display_output(ret, opts=__opts__) return ret @@ -64,32 +61,18 @@ def lookup_jid(jid, ext_source=None, output=True): salt-run jobs.lookup_jid 20130916125524463507 ''' ret = {} - returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_ext_job_cache'])) - if returner: - out = 'nested' - mminion = salt.minion.MasterMinion(__opts__) - data = mminion.returners['{0}.get_jid'.format(returner)](jid) - for minion in data: - if u'return' in data[minion]: - ret[minion] = data[minion].get(u'return') - else: - ret[minion] = data[minion].get('return') - if 'out' in data[minion]: - out = data[minion]['out'] - salt.output.display_output(ret, out, __opts__) - return ret - - # Fall back to the local job cache - client = salt.client.get_local_client(__opts__['conf_file']) - - for mid, data in client.get_full_returns(jid, [], 0).items(): - ret[mid] = data.get('ret') - if output: - salt.output.display_output( - {mid: ret[mid]}, - data.get('out', None), - __opts__) + mminion = salt.minion.MasterMinion(__opts__) + returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache'])) + data = mminion.returners['{0}.get_jid'.format(returner)](jid) + for minion in data: + if u'return' in data[minion]: + ret[minion] = data[minion].get(u'return') + else: + ret[minion] = data[minion].get('return') + if 'out' in data[minion]: + out = data[minion]['out'] + salt.output.display_output(ret, opts=__opts__) return ret @@ -104,43 +87,13 @@ def list_job(jid, ext_source=None): salt-run jobs.list_job 20130916125524463507 ''' ret = {'jid': jid} - returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_ext_job_cache'])) - if returner: - out = 'nested' - mminion = salt.minion.MasterMinion(__opts__) - job = mminion.returners['{0}.get_load'.format(returner)](jid) - ret.update(_format_jid_instance(jid, job)) - ret['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid) - salt.output.display_output(ret, out, __opts__) - return ret + mminion = salt.minion.MasterMinion(__opts__) + returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache'])) - jid_dir = salt.utils.jid_dir( - jid, - __opts__['cachedir'], - __opts__['hash_type'] - ) - - if not os.path.exists(jid_dir): - return ret - - # we have to copy/paste this code, because we don't seem to have a good API - serial = salt.payload.Serial(__opts__) - - # get the load info - load_path = os.path.join(jid_dir, '.load.p') - job = serial.load(salt.utils.fopen(load_path, 'rb')) + job = mminion.returners['{0}.get_load'.format(returner)](jid) ret.update(_format_jid_instance(jid, job)) - - # get the hosts information using the localclient (instead of re-implementing the code...) - client = salt.client.get_local_client(__opts__['conf_file']) - - ret['Result'] = {} - minions_path = os.path.join(jid_dir, '.minions.p') - if os.path.isfile(minions_path): - minions = serial.load(salt.utils.fopen(minions_path, 'rb')) - ret['Minions'] = minions - - salt.output.display_output(ret, 'yaml', __opts__) + ret['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid) + salt.output.display_output(ret, opts=__opts__) return ret @@ -154,19 +107,12 @@ def list_jobs(ext_source=None): salt-run jobs.list_jobs ''' - returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_ext_job_cache'])) - if returner: - out = 'nested' - mminion = salt.minion.MasterMinion(__opts__) - ret = mminion.returners['{0}.get_jids'.format(returner)]() - salt.output.display_output(ret, out, __opts__) - return ret + returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache'])) + mminion = salt.minion.MasterMinion(__opts__) + + ret = mminion.returners['{0}.get_jids'.format(returner)]() + salt.output.display_output(ret, opts=__opts__) - ret = {} - job_dir = os.path.join(__opts__['cachedir'], 'jobs') - for jid, job, t_path, final in _walk_through(job_dir): - ret[jid] = _format_jid_instance(jid, job) - salt.output.display_output(ret, 'yaml', __opts__) return ret @@ -182,61 +128,37 @@ def print_job(jid, ext_source=None): ''' ret = {} - returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_ext_job_cache'])) - if returner: - out = 'nested' - mminion = salt.minion.MasterMinion(__opts__) - job = mminion.returners['{0}.get_load'.format(returner)](jid) - ret[jid] = _format_jid_instance(jid, job) - ret[jid]['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid) - salt.output.display_output(ret, out, __opts__) - return ret + returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache'])) + mminion = salt.minion.MasterMinion(__opts__) - jid_dir = salt.utils.jid_dir( - jid, - __opts__['cachedir'], - __opts__['hash_type'] - ) - - if not os.path.exists(jid_dir): - return ret - - # we have to copy/paste this code, because we don't seem to have a good API - serial = salt.payload.Serial(__opts__) - - # get the load info - load_path = os.path.join(jid_dir, '.load.p') - job = serial.load(salt.utils.fopen(load_path, 'rb')) + job = mminion.returners['{0}.get_load'.format(returner)](jid) ret[jid] = _format_jid_instance(jid, job) + ret[jid]['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid) + salt.output.display_output(ret, opts=__opts__) - # get the hosts information using the localclient (instead of re-implementing the code...) - client = salt.client.get_local_client(__opts__['conf_file']) - - ret[jid]['Result'] = {} - for mid, data in client.get_full_returns(jid, [], 0).items(): - # have to make it say return so that it matches everyone else... - minion_data = {'return': data.get('ret')} - ret[jid]['Result'].update({mid: minion_data}) - salt.output.display_output(ret, 'yaml', __opts__) return ret -def _get_returner(returners): +def _get_returner(returner_types): ''' - Helper to iterate over retuerners and pick the first one + Helper to iterate over retuerner_types and pick the first one ''' - for ret in returners: - if ret: - return ret + for returner in returner_types: + if returner: + return returner def _format_job_instance(job): - return {'Function': job.get('fun', 'unknown-function'), - 'Arguments': list(job.get('arg', [])), - # unlikely but safeguard from invalid returns - 'Target': job.get('tgt', 'unknown-target'), - 'Target-type': job.get('tgt_type', []), - 'User': job.get('user', 'root')} + ret = {'Function': job.get('fun', 'unknown-function'), + 'Arguments': list(job.get('arg', [])), + # unlikely but safeguard from invalid returns + 'Target': job.get('tgt', 'unknown-target'), + 'Target-type': job.get('tgt_type', []), + 'User': job.get('user', 'root')} + + if 'Minions' in job: + ret['Minions'] = job['Minions'] + return ret def _format_jid_instance(jid, job): diff --git a/salt/utils/__init__.py b/salt/utils/__init__.py index 717ea8b401..743c190f1a 100644 --- a/salt/utils/__init__.py +++ b/salt/utils/__init__.py @@ -573,6 +573,12 @@ def prep_jid(cachedir, sum_type, user='root', nocache=False): ''' Return a job id and prepare the job id directory ''' + salt.utils.warn_until( + 'Boron', + 'All job_cache management has been moved into the local_cache ' + 'returner, this util function will be removed-- please use ' + 'the returner' + ) jid = gen_jid() jid_dir_ = jid_dir(jid, cachedir, sum_type) @@ -596,6 +602,12 @@ def jid_dir(jid, cachedir, sum_type): ''' Return the jid_dir for the given job id ''' + salt.utils.warn_until( + 'Boron', + 'All job_cache management has been moved into the local_cache ' + 'returner, this util function will be removed-- please use ' + 'the returner' + ) jid = str(jid) jhash = getattr(hashlib, sum_type)(jid).hexdigest() return os.path.join(cachedir, 'jobs', jhash[:2], jhash[2:]) @@ -605,6 +617,11 @@ def jid_load(jid, cachedir, sum_type, serial='msgpack'): ''' Return the load data for a given job id ''' + salt.utils.warn_until( + 'Boron', + 'Getting the load has been moved into the returner interface ' + 'please get the data from the master_job_cache ' + ) _dir = jid_dir(jid, cachedir, sum_type) load_fn = os.path.join(_dir, '.load.p') if not os.path.isfile(load_fn): diff --git a/tests/integration/runners/jobs.py b/tests/integration/runners/jobs.py index 4aacb9bbe1..2356dda045 100644 --- a/tests/integration/runners/jobs.py +++ b/tests/integration/runners/jobs.py @@ -22,7 +22,7 @@ class ManageTest(integration.ShellCase): ''' ret = self.run_run_plus('jobs.active') self.assertEqual(ret['fun'], {}) - self.assertEqual(ret['out'], ['{}']) + self.assertEqual(ret['out'], []) def test_lookup_jid(self): ''' diff --git a/tests/integration/shell/call.py b/tests/integration/shell/call.py index bda7d9e2dc..65dc54d76b 100644 --- a/tests/integration/shell/call.py +++ b/tests/integration/shell/call.py @@ -116,9 +116,9 @@ class CallTest(integration.ShellCase, integration.ShellCaseCommonTestsMixIn): if 'returnTOmaster' in j][0] jid, idx = None, first_match[0] while idx > 0: - jid = re.match("('|\")([0-9]+)('|\"):", jobs[idx]) + jid = re.match("([0-9]+):", jobs[idx]) if jid: - jid = jid.group(2) + jid = jid.group(1) break idx -= 1 assert idx > 0