From 3ed1dc035c85010bff5f40a07549575d44c65199 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 18 Apr 2014 14:43:27 -0700 Subject: [PATCH 01/20] Major rework of the master job cache. This moves the local cache on the master off into a returner (local_cache) and changes everyone to use that interface. --- salt/client/__init__.py | 291 ++++++++++++---------------------- salt/client/ssh/__init__.py | 54 +------ salt/config.py | 4 +- salt/daemons/masterapi.py | 118 ++++---------- salt/master.py | 129 ++++----------- salt/minion.py | 8 +- salt/returners/local_cache.py | 218 +++++++++++++++++++++++++ salt/runners/jobs.py | 149 +++++------------ salt/utils/__init__.py | 5 + salt/utils/returners.py | 17 ++ 10 files changed, 451 insertions(+), 542 deletions(-) create mode 100644 salt/returners/local_cache.py create mode 100644 salt/utils/returners.py diff --git a/salt/client/__init__.py b/salt/client/__init__.py index 66891b2002..3a100f76af 100644 --- a/salt/client/__init__.py +++ b/salt/client/__init__.py @@ -35,6 +35,7 @@ import salt.utils import salt.utils.args import salt.utils.event import salt.utils.minions +import salt.utils.returners import salt.utils.verify import salt.syspaths as syspaths from salt.exceptions import ( @@ -117,6 +118,8 @@ class LocalClient(object): self.opts['transport'], listen=not self.opts.get('__worker', False)) + self.mminion = salt.minion.MasterMinion(self.opts) + def __read_master_key(self): ''' Read in the rotating master authentication key @@ -740,89 +743,31 @@ class LocalClient(object): if timeout is None: timeout = self.opts['timeout'] fret = {} - inc_timeout = timeout - jid_dir = salt.utils.jid_dir(jid, - self.opts['cachedir'], - self.opts['hash_type']) - start = int(time.time()) + # make sure the minions is a set (since we do set operations on it) + minions = set(minions) + found = set() - wtag = os.path.join(jid_dir, 'wtag*') - # Check to see if the jid is real, if not return the empty dict - if not os.path.isdir(jid_dir): - 
yield {} - last_time = False - # Wait for the hosts to check in - while True: - for fn_ in os.listdir(jid_dir): - ret = {} - if fn_.startswith('.'): - continue - if fn_ not in found: - retp = os.path.join(jid_dir, fn_, 'return.p') - outp = os.path.join(jid_dir, fn_, 'out.p') - if not os.path.isfile(retp): - continue - while fn_ not in ret: - try: - check = True - ret_data = self.serial.load( - salt.utils.fopen(retp, 'rb') - ) - if ret_data is None: - # Sometimes the ret data is read at the wrong - # time and returns None, do a quick re-read - if check: - continue - ret[fn_] = {'ret': ret_data} - if os.path.isfile(outp): - ret[fn_]['out'] = self.serial.load( - salt.utils.fopen(outp, 'rb') - ) - except Exception: - pass - found.add(fn_) - fret.update(ret) - yield ret - if glob.glob(wtag) and int(time.time()) <= start + timeout + 1: - # The timeout +1 has not been reached and there is still a - # write tag for the syndic - continue + # start this before the cache lookup-- in case new stuff comes in + event_iter = self.get_event_iter_returns(jid, minions, timeout=timeout) + + # get the info from the cache + ret = self.get_cache_returns(jid) + if ret != {}: + found.update(set(ret.keys())) + yield ret + + # if you have all the returns, stop + if len(found.intersection(minions)) >= len(minions): + raise StopIteration() + + # otherwise, get them from the event system + for event in event_iter: + if event != {}: + found.update(set(event.keys())) + yield event if len(found.intersection(minions)) >= len(minions): - # All minions have returned, break out of the loop - break - if last_time: - if verbose: - if self.opts.get('minion_data_cache', False) \ - or tgt_type in ('glob', 'pcre', 'list'): - if len(found.intersection(minions)) >= len(minions): - fail = sorted(list(minions.difference(found))) - for minion in fail: - yield({ - minion: { - 'out': 'no_return', - 'ret': 'Minion did not return' - } - }) - break - if int(time.time()) > start + timeout: - # The timeout has been 
reached, check the jid to see if the - # timeout needs to be increased - jinfo = self.gather_job_info(jid, tgt, tgt_type, minions - found, **kwargs) - more_time = False - for id_ in jinfo: - if jinfo[id_]: - if verbose: - print( - 'Execution is still running on {0}'.format(id_) - ) - more_time = True - if more_time: - timeout += inc_timeout - continue - else: - last_time = True - continue - time.sleep(0.01) + raise StopIteration() + def get_iter_returns( self, @@ -845,16 +790,15 @@ class LocalClient(object): if timeout is None: timeout = self.opts['timeout'] - jid_dir = salt.utils.jid_dir(jid, - self.opts['cachedir'], - self.opts['hash_type']) start = int(time.time()) timeout_at = start + timeout found = set() - wtag = os.path.join(jid_dir, 'wtag*') # Check to see if the jid is real, if not return the empty dict - if not os.path.isdir(jid_dir): + if not salt.utils.returners.valid_jid(jid, self.opts['master_job_caches'], self.mminion): + log.warning("jid does not exist") yield {} + # stop the iteration, since the jid is invalid + raise StopIteration() # Wait for the hosts to check in syndic_wait = 0 last_time = False @@ -911,10 +855,6 @@ class LocalClient(object): jid, syndic_wait, datetime.fromtimestamp(timeout_at).time()) continue break - if glob.glob(wtag) and int(time.time()) <= timeout_at + 1: - # The timeout +1 has not been reached and there is still a - # write tag for the syndic - continue if last_time: if len(found) < len(minions): log.info('jid %s minions %s did not return in time', @@ -949,9 +889,6 @@ class LocalClient(object): minions = set(minions) if timeout is None: timeout = self.opts['timeout'] - jid_dir = salt.utils.jid_dir(jid, - self.opts['cachedir'], - self.opts['hash_type']) start = int(time.time()) timeout_at = start + timeout log.debug("get_returns for jid %s sent to %s will timeout at %s", @@ -959,11 +896,11 @@ class LocalClient(object): found = set() ret = {} - wtag = os.path.join(jid_dir, 'wtag*') # Check to see if the jid is real, if not 
return the empty dict - if not os.path.isdir(jid_dir): - log.warning("jid_dir (%s) does not exist", jid_dir) + if not salt.utils.returners.valid_jid(jid, self.opts['master_job_caches'], self.mminion): + log.warning("jid does not exist") return ret + # Wait for the hosts to check in while True: time_left = timeout_at - int(time.time()) @@ -982,10 +919,6 @@ class LocalClient(object): # All minions have returned, break out of the loop log.debug("jid %s found all minions", jid) break - if glob.glob(wtag) and int(time.time()) <= timeout_at + 1: - # The timeout +1 has not been reached and there is still a - # write tag for the syndic - continue if int(time.time()) > timeout_at: log.info('jid %s minions %s did not return in time', jid, (minions - found)) @@ -998,85 +931,71 @@ class LocalClient(object): This method starts off a watcher looking at the return data for a specified jid, it returns all of the information for the jid ''' - if timeout is None: - timeout = self.opts['timeout'] - jid_dir = salt.utils.jid_dir(jid, - self.opts['cachedir'], - self.opts['hash_type']) - start = 999999999999 - gstart = int(time.time()) + # TODO: change this from ret to return... or the other way. 
+ # Its inconsistent, we should pick one + ret = {} - wtag = os.path.join(jid_dir, 'wtag*') - # Check to see if the jid is real, if not return the empty dict - if not os.path.isdir(jid_dir): + # create the iterator-- since we want to get anyone in the middle + event_iter = self.get_event_iter_returns(jid, minions, timeout=timeout) + + for returner in self.opts['master_job_caches']: + data = self.mminion.returners['{0}.get_jid'.format(returner)](jid) + for minion in data: + m_data = {} + if u'return' in data[minion]: + m_data['ret'] = data[minion].get(u'return') + else: + m_data['ret'] = data[minion].get('return') + if 'out' in data[minion]: + md_data['out'] = data[minion]['out'] + if minion in ret: + ret[minion].update(m_data) + else: + ret[minion] = m_data + + # if we have all the minion returns, lets just return + if len(set(ret.keys()).intersection(minions)) >= len(minions): return ret - # Wait for the hosts to check in - while True: - for fn_ in os.listdir(jid_dir): - if fn_.startswith('.'): - continue - if fn_ not in ret: - retp = os.path.join(jid_dir, fn_, 'return.p') - outp = os.path.join(jid_dir, fn_, 'out.p') - if not os.path.isfile(retp): - continue - while fn_ not in ret: - try: - ret_data = self.serial.load( - salt.utils.fopen(retp, 'rb')) - ret[fn_] = {'ret': ret_data} - if os.path.isfile(outp): - ret[fn_]['out'] = self.serial.load( - salt.utils.fopen(outp, 'rb')) - except Exception: - pass - if ret and start == 999999999999: - start = int(time.time()) - if glob.glob(wtag) and int(time.time()) <= start + timeout + 1: - # The timeout +1 has not been reached and there is still a - # write tag for the syndic + + # otherwise lets use the listener we created above to get the rest + for event_ret in event_iter: + # if nothing in the event_ret, skip + if event_ret == {}: + time.sleep(0.02) continue + for minion, m_data in event_ret.iteritems(): + if minion in ret: + ret[minion].update(m_data) + else: + ret[minion] = m_data + + # are we done yet? 
if len(set(ret.keys()).intersection(minions)) >= len(minions): return ret - if int(time.time()) > start + timeout: - return ret - if int(time.time()) > gstart + timeout and not ret: - # No minions have replied within the specified global timeout, - # return an empty dict - return ret - time.sleep(0.02) + + # otherwise we hit the timeout, return what we have + return ret def get_cache_returns(self, jid): ''' Execute a single pass to gather the contents of the job cache ''' ret = {} - jid_dir = salt.utils.jid_dir(jid, - self.opts['cachedir'], - self.opts['hash_type']) - # If someone asks for the cache returns before we created them, we don't - # want to explode - try: - for fn_ in os.listdir(jid_dir): - if fn_.startswith('.'): - continue - if fn_ not in ret: - retp = os.path.join(jid_dir, fn_, 'return.p') - outp = os.path.join(jid_dir, fn_, 'out.p') - if not os.path.isfile(retp): - continue - while fn_ not in ret: - try: - ret_data = self.serial.load( - salt.utils.fopen(retp, 'rb')) - ret[fn_] = {'ret': ret_data} - if os.path.isfile(outp): - ret[fn_]['out'] = self.serial.load( - salt.utils.fopen(outp, 'rb')) - except Exception: - pass - except IOError: - pass + + for returner in self.opts['master_job_caches']: + data = self.mminion.returners['{0}.get_jid'.format(returner)](jid) + for minion in data: + m_data = {} + if u'return' in data[minion]: + m_data['ret'] = data[minion].get(u'return') + else: + m_data['ret'] = data[minion].get('return') + if 'out' in data[minion]: + md_data['out'] = data[minion]['out'] + if minion in ret: + ret[minion].update(m_data) + else: + ret[minion] = m_data return ret @@ -1103,16 +1022,14 @@ class LocalClient(object): if timeout is None: timeout = self.opts['timeout'] - jid_dir = salt.utils.jid_dir(jid, - self.opts['cachedir'], - self.opts['hash_type']) + start = int(time.time()) timeout_at = start + timeout found = set() ret = {} - wtag = os.path.join(jid_dir, 'wtag*') # Check to see if the jid is real, if not return the empty dict - if 
not os.path.isdir(jid_dir): + if not salt.utils.returners.valid_jid(jid, self.opts['master_job_caches'], self.mminion): + log.warning("jid does not exist") return ret # Wait for the hosts to check in while True: @@ -1138,10 +1055,6 @@ class LocalClient(object): if len(found.intersection(minions)) >= len(minions): # All minions have returned, break out of the loop break - if glob.glob(wtag) and int(time.time()) <= timeout_at + 1: - # The timeout +1 has not been reached and there is still a - # write tag for the syndic - continue if int(time.time()) > timeout_at: if verbose: if self.opts.get('minion_data_cache', False) \ @@ -1187,16 +1100,17 @@ class LocalClient(object): if timeout is None: timeout = self.opts['timeout'] - jid_dir = salt.utils.jid_dir(jid, - self.opts['cachedir'], - self.opts['hash_type']) + start = time.time() timeout_at = start + timeout found = set() - wtag = os.path.join(jid_dir, 'wtag*') # Check to see if the jid is real, if not return the empty dict - if not os.path.isdir(jid_dir): + if not salt.utils.returners.valid_jid(jid, self.opts['master_job_caches'], self.mminion): + log.warning("jid does not exist") yield {} + # stop the iteration, since the jid is invalid + raise StopIteration() + # Wait for the hosts to check in syndic_wait = 0 last_time = False @@ -1245,10 +1159,6 @@ class LocalClient(object): timeout_at = time.time() + 1 continue break - if glob.glob(wtag) and time.time() <= timeout_at + 1: - # The timeout +1 has not been reached and there is still a - # write tag for the syndic - continue if last_time: if verbose or show_timeout: if self.opts.get('minion_data_cache', False) \ @@ -1290,13 +1200,14 @@ class LocalClient(object): log.trace('entered - function get_event_iter_returns()') if timeout is None: timeout = self.opts['timeout'] - jid_dir = salt.utils.jid_dir(jid, - self.opts['cachedir'], - self.opts['hash_type']) + found = set() # Check to see if the jid is real, if not return the empty dict - if not os.path.isdir(jid_dir): + 
if not salt.utils.returners.valid_jid(jid, self.opts['master_job_caches'], self.mminion): + log.warning("jid does not exist") yield {} + # stop the iteration, since the jid is invalid + raise StopIteration() # Wait for the hosts to check in while True: raw = self.event.get_event(timeout) diff --git a/salt/client/ssh/__init__.py b/salt/client/ssh/__init__.py index 24ea03fac4..43d0a5b3e9 100644 --- a/salt/client/ssh/__init__.py +++ b/salt/client/ssh/__init__.py @@ -237,6 +237,7 @@ class SSH(object): ), } self.serial = salt.payload.Serial(opts) + self.mminion = salt.minion.MasterMinion(self.opts) def verify_env(self): ''' @@ -423,41 +424,8 @@ class SSH(object): ''' Cache the job information ''' - jid_dir = salt.utils.jid_dir( - jid, - self.opts['cachedir'], - self.opts['hash_type'] - ) - if not os.path.isdir(jid_dir): - log.error( - 'An inconsistency occurred, a job was received with a job id ' - 'that is not present on the master: {0}'.format(jid) - ) - return False - if os.path.exists(os.path.join(jid_dir, 'nocache')): - return - hn_dir = os.path.join(jid_dir, id_) - if not os.path.isdir(hn_dir): - os.makedirs(hn_dir) - # Otherwise the minion has already returned this jid and it should - # be dropped - else: - log.error( - 'An extra return was detected from minion {0}, please verify ' - 'the minion, this could be a replay attack'.format( - id_ - ) - ) - return False - - self.serial.dump( - ret, - # Use atomic open here to avoid the file being read before it's - # completely written to. 
Refs #1935 - salt.utils.atomicfile.atomic_open( - os.path.join(hn_dir, 'return.p'), 'w+b' - ) - ) + for returner in self.opts['master_job_caches']: + self.mminion.returners['{0}.returner'.format(returner)]({'jid': jid, 'id': id_, 'return': ret}) def run(self): ''' @@ -468,7 +436,6 @@ class SSH(object): self.opts['hash_type'], self.opts['user']) - jid_dir = salt.utils.jid_dir(jid, self.opts['cachedir'], self.opts['hash_type']) # Save the invocation information arg_str = self.opts['arg_str'] @@ -490,17 +457,10 @@ class SSH(object): 'fun': fun, 'arg': args, } - self.serial.dump( - job_load, - salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b') - ) - # save the targets to a cache so we can see them in the UI - targets = self.targets.keys() - targets.sort() - self.serial.dump( - targets, - salt.utils.fopen(os.path.join(jid_dir, '.minions.p'), 'w+b') - ) + + for returner in self.opts['master_job_caches']: + self.mminion.returners['{0}.save_load'.format(returner)](jid, job_load) + if self.opts.get('verbose'): msg = 'Executing job with jid {0}'.format(jid) diff --git a/salt/config.py b/salt/config.py index 7ae45f6f51..5289d8ed7b 100644 --- a/salt/config.py +++ b/salt/config.py @@ -187,7 +187,7 @@ VALID_OPTS = { 'order_masters': bool, 'job_cache': bool, 'ext_job_cache': str, - 'master_ext_job_cache': str, + 'master_job_caches': list, 'minion_data_cache': bool, 'publish_session': int, 'reactor': list, @@ -413,7 +413,7 @@ DEFAULT_MASTER_OPTS = { 'order_masters': False, 'job_cache': True, 'ext_job_cache': '', - 'master_ext_job_cache': '', + 'master_job_caches': ['local_cache'], 'minion_data_cache': True, 'enforce_mine_cache': False, 'ipv6': False, diff --git a/salt/daemons/masterapi.py b/salt/daemons/masterapi.py index fd457d78dc..bf6b035110 100644 --- a/salt/daemons/masterapi.py +++ b/salt/daemons/masterapi.py @@ -513,56 +513,13 @@ class RemoteFuncs(object): self.event.fire_event(load, load['jid']) # old dup event self.event.fire_event(load, tagify([load['jid'], 'ret', 
load['id']], 'job')) self.event.fire_ret_load(load) - if self.opts['master_ext_job_cache']: - fstr = '{0}.returner'.format(self.opts['master_ext_job_cache']) - self.mminion.returners[fstr](load) - return if not self.opts['job_cache'] or self.opts.get('ext_job_cache'): return - jid_dir = salt.utils.jid_dir( - load['jid'], - self.opts['cachedir'], - self.opts['hash_type'] - ) - if not os.path.isdir(jid_dir): - log.error( - 'An inconsistency occurred, a job was received with a job id ' - 'that is not present on the master: {jid}'.format(**load) - ) - return False - if os.path.exists(os.path.join(jid_dir, 'nocache')): - return - hn_dir = os.path.join(jid_dir, load['id']) - if not os.path.isdir(hn_dir): - os.makedirs(hn_dir) - # Otherwise the minion has already returned this jid and it should - # be dropped - else: - log.error( - 'An extra return was detected from minion {0}, please verify ' - 'the minion, this could be a replay attack'.format( - load['id'] - ) - ) - return False + for returner in self.opts['master_job_caches']: + fstr = '{0}.returner'.format(returner) + self.mminion.returners[fstr](load) + return - self.serial.dump( - load['return'], - # Use atomic open here to avoid the file being read before it's - # completely written to. Refs #1935 - salt.utils.atomicfile.atomic_open( - os.path.join(hn_dir, 'return.p'), 'w+b' - ) - ) - if 'out' in load: - self.serial.dump( - load['out'], - # Use atomic open here to avoid the file being read before - # it's completely written to. 
Refs #1935 - salt.utils.atomicfile.atomic_open( - os.path.join(hn_dir, 'out.p'), 'w+b' - ) - ) def _syndic_return(self, load): ''' @@ -572,29 +529,11 @@ class RemoteFuncs(object): # Verify the load if any(key not in load for key in ('return', 'jid', 'id')): return None - # set the write flag - jid_dir = salt.utils.jid_dir( - load['jid'], - self.opts['cachedir'], - self.opts['hash_type'] - ) - if not os.path.isdir(jid_dir): - os.makedirs(jid_dir) - if 'load' in load: - with salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b') as fp_: - self.serial.dump(load['load'], fp_) - wtag = os.path.join(jid_dir, 'wtag_{0}'.format(load['id'])) - try: - with salt.utils.fopen(wtag, 'w+') as fp_: - fp_.write('') - except (IOError, OSError): - log.error( - 'Failed to commit the write tag for the syndic return, are ' - 'permissions correct in the cache dir: {0}?'.format( - self.opts['cachedir'] - ) - ) - return False + # if we have a load, save it + if 'load' in load: + for returner in self.opts['master_job_caches']: + fstr = '{0}.save_load'.format(returner) + self.mminion.returners[fstr](load['jid'], load['load']) # Format individual return loads for key, item in load['return'].items(): @@ -604,8 +543,6 @@ class RemoteFuncs(object): if 'out' in load: ret['out'] = load['out'] self._return(ret) - if os.path.isfile(wtag): - os.remove(wtag) def minion_runner(self, load): ''' @@ -1332,11 +1269,6 @@ class LocalFuncs(object): extra.get('nocache', False) ) self.event.fire_event({'minions': minions}, load['jid']) - jid_dir = salt.utils.jid_dir( - load['jid'], - self.opts['cachedir'], - self.opts['hash_type'] - ) new_job_load = { 'jid': load['jid'], @@ -1352,23 +1284,11 @@ class LocalFuncs(object): self.event.fire_event(new_job_load, 'new_job') # old dup event self.event.fire_event(new_job_load, tagify([load['jid'], 'new'], 'job')) - # Verify the jid dir - if not os.path.isdir(jid_dir): - os.makedirs(jid_dir) # Save the invocation information - self.serial.dump( - load, - 
salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b') - ) - # save the minions to a cache so we can see in the UI - self.serial.dump( - minions, - salt.utils.fopen(os.path.join(jid_dir, '.minions.p'), 'w+b') - ) if self.opts['ext_job_cache']: try: fstr = '{0}.save_load'.format(self.opts['ext_job_cache']) - self.mminion.returners[fstr](load['jid'], load) + self.mminion.returners[fstr](clear_load['jid'], clear_load) except KeyError: log.critical( 'The specified returner used for the external job cache ' @@ -1381,6 +1301,24 @@ class LocalFuncs(object): 'The specified returner threw a stack trace:\n', exc_info=True ) + + # always write out to the master job caches + for returner in self.opts['master_job_caches']: + try: + fstr = '{0}.save_load'.format(returner) + self.mminion.returners[fstr](clear_load['jid'], clear_load) + except KeyError: + log.critical( + 'The specified returner used for the external job cache ' + '"{0}" does not have a save_load function!'.format( + returner + ) + ) + except Exception: + log.critical( + 'The specified returner threw a stack trace:\n', + exc_info=True + ) # Set up the payload payload = {'enc': 'aes'} # Altering the contents of the publish load is serious!! 
Changes here diff --git a/salt/master.py b/salt/master.py index 1377486d90..138dde4f62 100644 --- a/salt/master.py +++ b/salt/master.py @@ -1259,62 +1259,16 @@ class AESFuncs(object): self.event.fire_event( load, tagify([load['jid'], 'ret', load['id']], 'job')) self.event.fire_ret_load(load) - if self.opts['master_ext_job_cache']: - fstr = '{0}.returner'.format(self.opts['master_ext_job_cache']) - self.mminion.returners[fstr](load) - return + + # if you have a job_cache, or an ext_job_cache, don't write to the regular master cache if not self.opts['job_cache'] or self.opts.get('ext_job_cache'): return - jid_dir = salt.utils.jid_dir( - load['jid'], - self.opts['cachedir'], - self.opts['hash_type'] - ) - if os.path.exists(os.path.join(jid_dir, 'nocache')): - return - if new_loadp: - with salt.utils.fopen( - os.path.join(jid_dir, '.load.p'), 'w+b' - ) as fp_: - self.serial.dump(load, fp_) - hn_dir = os.path.join(jid_dir, load['id']) - try: - os.mkdir(hn_dir) - except OSError as e: - if e.errno == errno.EEXIST: - # Minion has already returned this jid and it should be dropped - log.error( - 'An extra return was detected from minion {0}, please verify ' - 'the minion, this could be a replay attack'.format( - load['id'] - ) - ) - return False - elif e.errno == errno.ENOENT: - log.error( - 'An inconsistency occurred, a job was received with a job id ' - 'that is not present on the master: {jid}'.format(**load) - ) - return False - raise - self.serial.dump( - load['return'], - # Use atomic open here to avoid the file being read before it's - # completely written to. Refs #1935 - salt.utils.atomicfile.atomic_open( - os.path.join(hn_dir, 'return.p'), 'w+b' - ) - ) - if 'out' in load: - self.serial.dump( - load['out'], - # Use atomic open here to avoid the file being read before - # it's completely written to. 
Refs #1935 - salt.utils.atomicfile.atomic_open( - os.path.join(hn_dir, 'out.p'), 'w+b' - ) - ) + # otherwise, write to the master cache + for returner in self.opts['master_job_caches']: + fstr = '{0}.returner'.format(returner) + self.mminion.returners[fstr](load) + def _syndic_return(self, load): ''' @@ -1324,31 +1278,11 @@ class AESFuncs(object): # Verify the load if any(key not in load for key in ('return', 'jid', 'id')): return None - if not salt.utils.verify.valid_id(self.opts, load['id']): - return False - # set the write flag - jid_dir = salt.utils.jid_dir( - load['jid'], - self.opts['cachedir'], - self.opts['hash_type'] - ) - if not os.path.isdir(jid_dir): - os.makedirs(jid_dir) - if 'load' in load: - with salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b') as fp_: - self.serial.dump(load['load'], fp_) - wtag = os.path.join(jid_dir, 'wtag_{0}'.format(load['id'])) - try: - with salt.utils.fopen(wtag, 'w+b') as fp_: - fp_.write('') - except (IOError, OSError): - log.error( - 'Failed to commit the write tag for the syndic return, are ' - 'permissions correct in the cache dir: {0}?'.format( - self.opts['cachedir'] - ) - ) - return False + # if we have a load, save it + if 'load' in load: + for returner in self.opts['master_job_caches']: + fstr = '{0}.save_load'.format(returner) + self.mminion.returners[fstr](load['jid'], load) # Format individual return loads for key, item in load['return'].items(): @@ -1358,8 +1292,6 @@ class AESFuncs(object): if 'out' in load: ret['out'] = load['out'] self._return(ret) - if os.path.isfile(wtag): - os.remove(wtag) def minion_runner(self, clear_load): ''' @@ -2542,11 +2474,6 @@ class ClearFuncs(object): extra.get('nocache', False) ) self.event.fire_event({'minions': minions}, clear_load['jid']) - jid_dir = salt.utils.jid_dir( - clear_load['jid'], - self.opts['cachedir'], - self.opts['hash_type'] - ) new_job_load = { 'jid': clear_load['jid'], @@ -2562,19 +2489,6 @@ class ClearFuncs(object): 
self.event.fire_event(new_job_load, 'new_job') # old dup event self.event.fire_event(new_job_load, tagify([clear_load['jid'], 'new'], 'job')) - # Verify the jid dir - if not os.path.isdir(jid_dir): - os.makedirs(jid_dir) - # Save the invocation information - self.serial.dump( - clear_load, - salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b') - ) - # save the minions to a cache so we can see in the UI - self.serial.dump( - minions, - salt.utils.fopen(os.path.join(jid_dir, '.minions.p'), 'w+b') - ) if self.opts['ext_job_cache']: try: fstr = '{0}.save_load'.format(self.opts['ext_job_cache']) @@ -2591,6 +2505,25 @@ class ClearFuncs(object): 'The specified returner threw a stack trace:\n', exc_info=True ) + + # always write out to the master job caches + for returner in self.opts['master_job_caches']: + try: + fstr = '{0}.save_load'.format(returner) + self.mminion.returners[fstr](clear_load['jid'], clear_load) + except KeyError: + log.critical( + 'The specified returner used for the external job cache ' + '"{0}" does not have a save_load function!'.format( + returner + ) + ) + except Exception: + log.critical( + 'The specified returner threw a stack trace:\n', + exc_info=True + ) + # Set up the payload payload = {'enc': 'aes'} # Altering the contents of the publish load is serious!! 
Changes here diff --git a/salt/minion.py b/salt/minion.py index 8a7b1b8ffa..fc577a49b2 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -1475,6 +1475,7 @@ class Syndic(Minion): self._syndic = True opts['loop_interval'] = 1 super(Syndic, self).__init__(opts) + self.mminion = salt.minion.MasterMinion(opts) def _handle_aes(self, load, sig=None): ''' @@ -1665,10 +1666,9 @@ class Syndic(Minion): if not jdict: jdict['__fun__'] = event['data'].get('fun') jdict['__jid__'] = event['data']['jid'] - jdict['__load__'] = salt.utils.jid_load( - event['data']['jid'], - self.local.opts['cachedir'], - self.opts['hash_type']) + jdict['__load__'] = {} + for returner in self.opts['master_job_caches']: + jdict['__load__'].update(self.mminion.returners['{0}.get_jid'.format(returner)](event['data']['jid'])) jdict[event['data']['id']] = event['data']['return'] else: # Add generic event aggregation here diff --git a/salt/returners/local_cache.py b/salt/returners/local_cache.py new file mode 100644 index 0000000000..c5d944fb0b --- /dev/null +++ b/salt/returners/local_cache.py @@ -0,0 +1,218 @@ +# -*- coding: utf-8 -*- +''' +Return data to local job cache + +''' + +# Import python libs +import json +import errno +import logging +import os +import time +import glob +from copy import deepcopy + +# Import salt libs +import salt.payload +import salt.utils + +log = logging.getLogger(__name__) + + +def _jid_dir(jid, makedirs=False): + ''' + Return the jid_dir + ''' + jid_dir = salt.utils.jid_dir( + jid, + __opts__['cachedir'], + __opts__['hash_type'] + ) + + # TODO: remove, this is just to make sure we don't have other refs to another place + jid_dir = os.path.join('/tmp/jids', str(jid)) + if makedirs and not os.path.isdir(jid_dir): + os.makedirs(jid_dir) + + return jid_dir + +def _walk_through(job_dir): + serial = salt.payload.Serial(__opts__) + + for top in os.listdir(job_dir): + t_path = os.path.join(job_dir, top) + + for final in os.listdir(t_path): + load_path = os.path.join(t_path, final, 
'.load.p') + + if not os.path.isfile(load_path): + continue + + job = serial.load(salt.utils.fopen(load_path, 'rb')) + jid = job['jid'] + yield jid, job, t_path, final + +def _format_job_instance(job): + return {'Function': job.get('fun', 'unknown-function'), + 'Arguments': list(job.get('arg', [])), + # unlikely but safeguard from invalid returns + 'Target': job.get('tgt', 'unknown-target'), + 'Target-type': job.get('tgt_type', []), + 'User': job.get('user', 'root')} + + +def _format_jid_instance(jid, job): + ret = _format_job_instance(job) + ret.update({'StartTime': salt.utils.jid_to_time(jid)}) + return ret + + +def returner(load): + ''' + Return data to the local job cache + ''' + new_loadp = False + if load['jid'] == 'req': + new_loadp = load.get('nocache', True) and True + + serial = salt.payload.Serial(__opts__) + jid_dir = _jid_dir(load['jid'], makedirs=True) + if os.path.exists(os.path.join(jid_dir, 'nocache')): + return + if new_loadp: + with salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b') as fp_: + serial.dump(load, fp_) + hn_dir = os.path.join(jid_dir, load['id']) + + try: + os.mkdir(hn_dir) + except OSError as e: + if e.errno == errno.EEXIST: + # Minion has already returned this jid and it should be dropped + log.error( + 'An extra return was detected from minion {0}, please verify ' + 'the minion, this could be a replay attack'.format( + load['id'] + ) + ) + return False + elif e.errno == errno.ENOENT: + log.error( + 'An inconsistency occurred, a job was received with a job id ' + 'that is not present in the local cache: {jid}'.format(**load) + ) + return False + raise + + serial.dump( + load['return'], + # Use atomic open here to avoid the file being read before it's + # completely written to. Refs #1935 + salt.utils.atomicfile.atomic_open( + os.path.join(hn_dir, 'return.p'), 'w+b' + ) + ) + + if 'out' in load: + serial.dump( + load['out'], + # Use atomic open here to avoid the file being read before + # it's completely written to. 
Refs #1935 + salt.utils.atomicfile.atomic_open( + os.path.join(hn_dir, 'out.p'), 'w+b' + ) + ) + + +def save_load(jid, clear_load): + ''' + Save the load to the specified jid + ''' + jid_dir = _jid_dir(clear_load['jid'], makedirs=True) + + + serial = salt.payload.Serial(__opts__) + + # if you have a tgt, save that for the UI etc + if 'tgt' in clear_load: + ckminions = salt.utils.minions.CkMinions(__opts__) + # Retrieve the minions list + minions = ckminions.check_minions( + clear_load['tgt'], + clear_load.get('tgt_type', 'glob') + ) + # save the minions to a cache so we can see in the UI + serial.dump( + minions, + salt.utils.fopen(os.path.join(jid_dir, '.minions.p'), 'w+b') + ) + + # Save the invocation information + serial.dump( + clear_load, + salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b') + ) + + + +def get_load(jid): + ''' + Return the load data that marks a specified jid + ''' + jid_dir = _jid_dir(jid) + if not os.path.exists(jid_dir): + return {} + serial = salt.payload.Serial(__opts__) + + ret = serial.load(salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'rb')) + + minions_path = os.path.join(jid_dir, '.minions.p') + if os.path.isfile(minions_path): + ret['Minions'] = serial.load(salt.utils.fopen(minions_path, 'rb')) + + return ret + + +def get_jid(jid): + ''' + Return the information returned when the specified job id was executed + ''' + jid_dir = _jid_dir(jid) + serial = salt.payload.Serial(__opts__) + + ret = {} + # Check to see if the jid is real, if not return the empty dict + if not os.path.isdir(jid_dir): + return ret + for fn_ in os.listdir(jid_dir): + if fn_.startswith('.'): + continue + if fn_ not in ret: + retp = os.path.join(jid_dir, fn_, 'return.p') + outp = os.path.join(jid_dir, fn_, 'out.p') + if not os.path.isfile(retp): + continue + while fn_ not in ret: + try: + ret_data = serial.load( + salt.utils.fopen(retp, 'rb')) + ret[fn_] = {'return': ret_data} + if os.path.isfile(outp): + ret[fn_]['out'] = serial.load( + 
salt.utils.fopen(outp, 'rb')) + except Exception: + pass + return ret + + +def get_jids(): + ''' + Return a list of all job ids + ''' + ret = {} + job_dir = os.path.join(__opts__['cachedir'], 'jobs') + for jid, job, t_path, final in _walk_through(job_dir): + ret[jid] = _format_jid_instance(jid, job) + return ret + diff --git a/salt/runners/jobs.py b/salt/runners/jobs.py index 8065b10441..cc5aa15e43 100644 --- a/salt/runners/jobs.py +++ b/salt/runners/jobs.py @@ -37,18 +37,16 @@ def active(): ret[job['jid']].update({'Running': [{minion: job.get('pid', None)}], 'Returned': []}) else: ret[job['jid']]['Running'].append({minion: job['pid']}) + + mminion = salt.minion.MasterMinion(__opts__) for jid in ret: - jid_dir = salt.utils.jid_dir( - jid, - __opts__['cachedir'], - __opts__['hash_type']) - if not os.path.isdir(jid_dir): - continue - for minion in os.listdir(jid_dir): - if minion.startswith('.') or minion == 'jid': - continue - if os.path.exists(os.path.join(jid_dir, minion)): - ret[jid]['Returned'].append(minion) + returners = _get_returner((__opts__['ext_job_cache'], __opts__['master_job_caches'])) + for returner in returners: + data = mminion.returners['{0}.get_jid'.format(returner)](jid) + for minion in data: + if minion not in ret[jid]['Returned']: + ret[jid]['Returned'].append(minion) + salt.output.display_output(ret, 'yaml', __opts__) return ret @@ -64,10 +62,10 @@ def lookup_jid(jid, ext_source=None, output=True): salt-run jobs.lookup_jid 20130916125524463507 ''' ret = {} - returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_ext_job_cache'])) - if returner: + mminion = salt.minion.MasterMinion(__opts__) + returners = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_caches'])) + for returner in returners: out = 'nested' - mminion = salt.minion.MasterMinion(__opts__) data = mminion.returners['{0}.get_jid'.format(returner)](jid) for minion in data: if u'return' in data[minion]: @@ -77,19 +75,6 @@ def 
lookup_jid(jid, ext_source=None, output=True): if 'out' in data[minion]: out = data[minion]['out'] salt.output.display_output(ret, out, __opts__) - return ret - - # Fall back to the local job cache - client = salt.client.get_local_client(__opts__['conf_file']) - - for mid, data in client.get_full_returns(jid, [], 0).items(): - ret[mid] = data.get('ret') - if output: - salt.output.display_output( - {mid: ret[mid]}, - data.get('out', None), - __opts__) - return ret @@ -104,43 +89,14 @@ def list_job(jid, ext_source=None): salt-run jobs.list_job 20130916125524463507 ''' ret = {'jid': jid} - returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_ext_job_cache'])) - if returner: + mminion = salt.minion.MasterMinion(__opts__) + returners = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_caches'])) + for returner in returners: out = 'nested' - mminion = salt.minion.MasterMinion(__opts__) job = mminion.returners['{0}.get_load'.format(returner)](jid) ret.update(_format_jid_instance(jid, job)) ret['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid) salt.output.display_output(ret, out, __opts__) - return ret - - jid_dir = salt.utils.jid_dir( - jid, - __opts__['cachedir'], - __opts__['hash_type'] - ) - - if not os.path.exists(jid_dir): - return ret - - # we have to copy/paste this code, because we don't seem to have a good API - serial = salt.payload.Serial(__opts__) - - # get the load info - load_path = os.path.join(jid_dir, '.load.p') - job = serial.load(salt.utils.fopen(load_path, 'rb')) - ret.update(_format_jid_instance(jid, job)) - - # get the hosts information using the localclient (instead of re-implementing the code...) 
- client = salt.client.get_local_client(__opts__['conf_file']) - - ret['Result'] = {} - minions_path = os.path.join(jid_dir, '.minions.p') - if os.path.isfile(minions_path): - minions = serial.load(salt.utils.fopen(minions_path, 'rb')) - ret['Minions'] = minions - - salt.output.display_output(ret, 'yaml', __opts__) return ret @@ -154,19 +110,12 @@ def list_jobs(ext_source=None): salt-run jobs.list_jobs ''' - returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_ext_job_cache'])) - if returner: + returners = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_caches'])) + mminion = salt.minion.MasterMinion(__opts__) + for returner in returners: out = 'nested' - mminion = salt.minion.MasterMinion(__opts__) ret = mminion.returners['{0}.get_jids'.format(returner)]() salt.output.display_output(ret, out, __opts__) - return ret - - ret = {} - job_dir = os.path.join(__opts__['cachedir'], 'jobs') - for jid, job, t_path, final in _walk_through(job_dir): - ret[jid] = _format_jid_instance(jid, job) - salt.output.display_output(ret, 'yaml', __opts__) return ret @@ -182,61 +131,39 @@ def print_job(jid, ext_source=None): ''' ret = {} - returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_ext_job_cache'])) - if returner and False: + returners = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_caches'])) + mminion = salt.minion.MasterMinion(__opts__) + for returner in returners: out = 'nested' - mminion = salt.minion.MasterMinion(__opts__) job = mminion.returners['{0}.get_load'.format(returner)](jid) ret[jid] = _format_jid_instance(jid, job) ret[jid]['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid) salt.output.display_output(ret, out, __opts__) - return ret - - jid_dir = salt.utils.jid_dir( - jid, - __opts__['cachedir'], - __opts__['hash_type'] - ) - - if not os.path.exists(jid_dir): - return ret - - # we have to copy/paste this code, because we don't seem to 
have a good API - serial = salt.payload.Serial(__opts__) - - # get the load info - load_path = os.path.join(jid_dir, '.load.p') - job = serial.load(salt.utils.fopen(load_path, 'rb')) - ret[jid] = _format_jid_instance(jid, job) - - # get the hosts information using the localclient (instead of re-implementing the code...) - client = salt.client.get_local_client(__opts__['conf_file']) - - ret[jid]['Result'] = {} - for mid, data in client.get_full_returns(jid, [], 0).items(): - # have to make it say return so that it matches everyone else... - minion_data = {'return': data.get('ret')} - ret[jid]['Result'].update({mid: minion_data}) - salt.output.display_output(ret, 'yaml', __opts__) return ret -def _get_returner(returners): +def _get_returner(returner_types): ''' - Helper to iterate over retuerners and pick the first one + Helper to iterate over retuerner_types and pick the first one ''' - for ret in returners: - if ret: - return ret + for returners in returner_types: + if returners: + if type(returners) != list: + return [returners] + return returners def _format_job_instance(job): - return {'Function': job.get('fun', 'unknown-function'), - 'Arguments': list(job.get('arg', [])), - # unlikely but safeguard from invalid returns - 'Target': job.get('tgt', 'unknown-target'), - 'Target-type': job.get('tgt_type', []), - 'User': job.get('user', 'root')} + ret = {'Function': job.get('fun', 'unknown-function'), + 'Arguments': list(job.get('arg', [])), + # unlikely but safeguard from invalid returns + 'Target': job.get('tgt', 'unknown-target'), + 'Target-type': job.get('tgt_type', []), + 'User': job.get('user', 'root')} + + if 'Minions' in job: + ret['Minions'] = job['Minions'] + return ret def _format_jid_instance(jid, job): diff --git a/salt/utils/__init__.py b/salt/utils/__init__.py index 717ea8b401..f63d2c4ade 100644 --- a/salt/utils/__init__.py +++ b/salt/utils/__init__.py @@ -605,6 +605,11 @@ def jid_load(jid, cachedir, sum_type, serial='msgpack'): ''' Return the load 
data for a given job id ''' + salt.utils.warn_until( + 'Boron', + 'Getting the load has been moved into the returner interface ' + 'please get the data from the returners listedin master_job_caches ' + ) _dir = jid_dir(jid, cachedir, sum_type) load_fn = os.path.join(_dir, '.load.p') if not os.path.isfile(load_fn): diff --git a/salt/utils/returners.py b/salt/utils/returners.py new file mode 100644 index 0000000000..88c978f249 --- /dev/null +++ b/salt/utils/returners.py @@ -0,0 +1,17 @@ +''' +Helper functions for returners +''' + + +def valid_jid(jid, returners, mminion): + ''' + Return boolean of wether this jid exists in any of the returners passed in + ''' + valid_jid = False + for returner in returners: + if mminion.returners['{0}.get_load'.format(returner)](jid) != {}: + valid_jid = True + break + return valid_jid + + From efaf4ccc435e571ba8f4ac1718439072544d51a7 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 18 Apr 2014 14:55:57 -0700 Subject: [PATCH 02/20] Move the job cleanup into the returner --- salt/daemons/masterapi.py | 46 ++++++++--------------------------- salt/returners/local_cache.py | 42 ++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 36 deletions(-) diff --git a/salt/daemons/masterapi.py b/salt/daemons/masterapi.py index bf6b035110..c7172fc925 100644 --- a/salt/daemons/masterapi.py +++ b/salt/daemons/masterapi.py @@ -100,43 +100,17 @@ def clean_old_jobs(opts): ''' Clean out the old jobs from the job cache ''' - if opts['keep_jobs'] != 0: - jid_root = os.path.join(opts['cachedir'], 'jobs') - cur = datetime.datetime.now() + mminion = salt.minion.MasterMinion( + self.opts, + states=False, + rend=False, + ) + # for all master_job_caches, clean out the old jobs-- if they implemented it + for returner in opts['master_job_caches']: + fstr = '{0}.clean_old_jobs'.format(returner) + if fstr in self.mminion.returners: + self.mminion.returners[fstr]() - if os.path.exists(jid_root): - for top in os.listdir(jid_root): - t_path = 
os.path.join(jid_root, top) - for final in os.listdir(t_path): - f_path = os.path.join(t_path, final) - jid_file = os.path.join(f_path, 'jid') - if not os.path.isfile(jid_file): - # No jid file means corrupted cache entry, scrub it - shutil.rmtree(f_path) - else: - with salt.utils.fopen(jid_file, 'r') as fn_: - jid = fn_.read() - if len(jid) < 18: - # Invalid jid, scrub the dir - shutil.rmtree(f_path) - else: - # Parse the jid into a proper datetime object. - # We only parse down to the minute, since keep - # jobs is measured in hours, so a minute - # difference is not important. - try: - jidtime = datetime.datetime(int(jid[0:4]), - int(jid[4:6]), - int(jid[6:8]), - int(jid[8:10]), - int(jid[10:12])) - except ValueError as e: - # Invalid jid, scrub the dir - shutil.rmtree(f_path) - difference = cur - jidtime - hours_difference = difference.seconds / 3600.0 - if hours_difference > opts['keep_jobs']: - shutil.rmtree(f_path) def access_keys(opts): diff --git a/salt/returners/local_cache.py b/salt/returners/local_cache.py index c5d944fb0b..bc0e2da4bc 100644 --- a/salt/returners/local_cache.py +++ b/salt/returners/local_cache.py @@ -216,3 +216,45 @@ def get_jids(): ret[jid] = _format_jid_instance(jid, job) return ret +def clean_old_jobs(): + ''' + Clean out the old jobs from the job cache + ''' + if __opts__['keep_jobs'] != 0: + jid_root = os.path.join(__opts__['cachedir'], 'jobs') + cur = datetime.datetime.now() + + if os.path.exists(jid_root): + for top in os.listdir(jid_root): + t_path = os.path.join(jid_root, top) + for final in os.listdir(t_path): + f_path = os.path.join(t_path, final) + jid_file = os.path.join(f_path, 'jid') + if not os.path.isfile(jid_file): + # No jid file means corrupted cache entry, scrub it + shutil.rmtree(f_path) + else: + with salt.utils.fopen(jid_file, 'r') as fn_: + jid = fn_.read() + if len(jid) < 18: + # Invalid jid, scrub the dir + shutil.rmtree(f_path) + else: + # Parse the jid into a proper datetime object. 
+ # We only parse down to the minute, since keep + # jobs is measured in hours, so a minute + # difference is not important. + try: + jidtime = datetime.datetime(int(jid[0:4]), + int(jid[4:6]), + int(jid[6:8]), + int(jid[8:10]), + int(jid[10:12])) + except ValueError as e: + # Invalid jid, scrub the dir + shutil.rmtree(f_path) + difference = cur - jidtime + hours_difference = difference.seconds / 3600.0 + if hours_difference > __opts__['keep_jobs']: + shutil.rmtree(f_path) + From aed8057753ac8a87fecb83f0cdec373af18121e1 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 18 Apr 2014 15:03:21 -0700 Subject: [PATCH 03/20] Move the .p file names into global variables within the module-- so they are in one place --- salt/daemons/masterapi.py | 2 -- salt/returners/local_cache.py | 44 +++++++++++++++++++++++------------ 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/salt/daemons/masterapi.py b/salt/daemons/masterapi.py index c7172fc925..413a5a6fb3 100644 --- a/salt/daemons/masterapi.py +++ b/salt/daemons/masterapi.py @@ -8,8 +8,6 @@ involves preparing the three listeners and the workers needed by the master. 
import os import re import logging -import shutil -import datetime try: import pwd except ImportError: diff --git a/salt/returners/local_cache.py b/salt/returners/local_cache.py index bc0e2da4bc..70460a4f4c 100644 --- a/salt/returners/local_cache.py +++ b/salt/returners/local_cache.py @@ -11,6 +11,8 @@ import logging import os import time import glob +import shutil +import datetime from copy import deepcopy # Import salt libs @@ -19,6 +21,19 @@ import salt.utils log = logging.getLogger(__name__) +''' +Filenames for the cache +''' +# load is the published job +LOAD_P = '.load.p' +# the list of minions that the job is targeted to (best effort match on the master side) +MINIONS_P = '.minions.p' +# return is the "return" from the minion data +RETURN_P = 'return.p' +# out is the "out" from the minion data +OUT_P = 'out.p' + + def _jid_dir(jid, makedirs=False): ''' @@ -44,7 +59,7 @@ def _walk_through(job_dir): t_path = os.path.join(job_dir, top) for final in os.listdir(t_path): - load_path = os.path.join(t_path, final, '.load.p') + load_path = os.path.join(t_path, final, LOAD_P) if not os.path.isfile(load_path): continue @@ -72,17 +87,16 @@ def returner(load): ''' Return data to the local job cache ''' - new_loadp = False - if load['jid'] == 'req': - new_loadp = load.get('nocache', True) and True - serial = salt.payload.Serial(__opts__) jid_dir = _jid_dir(load['jid'], makedirs=True) if os.path.exists(os.path.join(jid_dir, 'nocache')): return - if new_loadp: - with salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b') as fp_: + + # do we need to rewrite the load? + if load['jid'] == 'req' and bool(load.get('nocache', True)): + with salt.utils.fopen(os.path.join(jid_dir, LOAD_P), 'w+b') as fp_: serial.dump(load, fp_) + hn_dir = os.path.join(jid_dir, load['id']) try: @@ -110,7 +124,7 @@ def returner(load): # Use atomic open here to avoid the file being read before it's # completely written to. 
Refs #1935 salt.utils.atomicfile.atomic_open( - os.path.join(hn_dir, 'return.p'), 'w+b' + os.path.join(hn_dir, RETURN_P), 'w+b' ) ) @@ -120,7 +134,7 @@ def returner(load): # Use atomic open here to avoid the file being read before # it's completely written to. Refs #1935 salt.utils.atomicfile.atomic_open( - os.path.join(hn_dir, 'out.p'), 'w+b' + os.path.join(hn_dir, OUT_P), 'w+b' ) ) @@ -145,13 +159,13 @@ def save_load(jid, clear_load): # save the minions to a cache so we can see in the UI serial.dump( minions, - salt.utils.fopen(os.path.join(jid_dir, '.minions.p'), 'w+b') + salt.utils.fopen(os.path.join(jid_dir, MINIONS_P), 'w+b') ) # Save the invocation information serial.dump( clear_load, - salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'w+b') + salt.utils.fopen(os.path.join(jid_dir, LOAD_P), 'w+b') ) @@ -165,9 +179,9 @@ def get_load(jid): return {} serial = salt.payload.Serial(__opts__) - ret = serial.load(salt.utils.fopen(os.path.join(jid_dir, '.load.p'), 'rb')) + ret = serial.load(salt.utils.fopen(os.path.join(jid_dir, LOAD_P), 'rb')) - minions_path = os.path.join(jid_dir, '.minions.p') + minions_path = os.path.join(jid_dir, MINIONS_P) if os.path.isfile(minions_path): ret['Minions'] = serial.load(salt.utils.fopen(minions_path, 'rb')) @@ -189,8 +203,8 @@ def get_jid(jid): if fn_.startswith('.'): continue if fn_ not in ret: - retp = os.path.join(jid_dir, fn_, 'return.p') - outp = os.path.join(jid_dir, fn_, 'out.p') + retp = os.path.join(jid_dir, fn_, RETURN_P) + outp = os.path.join(jid_dir, fn_, OUT_P) if not os.path.isfile(retp): continue while fn_ not in ret: From f4d97798cff66dc014c236329ff192c850920e69 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 18 Apr 2014 15:08:57 -0700 Subject: [PATCH 04/20] Consolidate job dir into a private function --- salt/returners/local_cache.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/salt/returners/local_cache.py b/salt/returners/local_cache.py index 70460a4f4c..ffa87490e4 
100644 --- a/salt/returners/local_cache.py +++ b/salt/returners/local_cache.py @@ -33,7 +33,11 @@ RETURN_P = 'return.p' # out is the "out" from the minion data OUT_P = 'out.p' - +def _job_dir(): + ''' + Return root of the jobs cache directory + ''' + return os.path.join(__opts__['cachedir'], 'jobs') def _jid_dir(jid, makedirs=False): ''' @@ -225,8 +229,7 @@ def get_jids(): Return a list of all job ids ''' ret = {} - job_dir = os.path.join(__opts__['cachedir'], 'jobs') - for jid, job, t_path, final in _walk_through(job_dir): + for jid, job, t_path, final in _walk_through(_job_dir()): ret[jid] = _format_jid_instance(jid, job) return ret @@ -235,10 +238,9 @@ def clean_old_jobs(): Clean out the old jobs from the job cache ''' if __opts__['keep_jobs'] != 0: - jid_root = os.path.join(__opts__['cachedir'], 'jobs') cur = datetime.datetime.now() - if os.path.exists(jid_root): + if os.path.exists(_job_dir()): for top in os.listdir(jid_root): t_path = os.path.join(jid_root, top) for final in os.listdir(t_path): From a99a3d004e5014f53fe79dac2b71e3c5104dd722 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 18 Apr 2014 15:17:10 -0700 Subject: [PATCH 05/20] Remove local jid_dir override from testing --- salt/daemons/masterapi.py | 1 - salt/returners/local_cache.py | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/salt/daemons/masterapi.py b/salt/daemons/masterapi.py index 413a5a6fb3..83e359b3c5 100644 --- a/salt/daemons/masterapi.py +++ b/salt/daemons/masterapi.py @@ -490,7 +490,6 @@ class RemoteFuncs(object): for returner in self.opts['master_job_caches']: fstr = '{0}.returner'.format(returner) self.mminion.returners[fstr](load) - return def _syndic_return(self, load): diff --git a/salt/returners/local_cache.py b/salt/returners/local_cache.py index ffa87490e4..4d5ce29682 100644 --- a/salt/returners/local_cache.py +++ b/salt/returners/local_cache.py @@ -41,7 +41,7 @@ def _job_dir(): def _jid_dir(jid, makedirs=False): ''' - Return the jid_dir + Return 
the jid_dir, and optionally create it ''' jid_dir = salt.utils.jid_dir( jid, @@ -49,8 +49,6 @@ def _jid_dir(jid, makedirs=False): __opts__['hash_type'] ) - # TODO: remove, this is just to make sure we don't have other refs to another place - jid_dir = os.path.join('/tmp/jids', str(jid)) if makedirs and not os.path.isdir(jid_dir): os.makedirs(jid_dir) From 45581819a7817ff3ed2304b267e93f93907e2e45 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 18 Apr 2014 15:52:35 -0700 Subject: [PATCH 06/20] Remove some self references, and remove an indent level on clean_old_jobs --- salt/daemons/masterapi.py | 6 ++-- salt/returners/local_cache.py | 63 ++++++++++++++++++----------------- 2 files changed, 36 insertions(+), 33 deletions(-) diff --git a/salt/daemons/masterapi.py b/salt/daemons/masterapi.py index 83e359b3c5..b80ef5c101 100644 --- a/salt/daemons/masterapi.py +++ b/salt/daemons/masterapi.py @@ -99,15 +99,15 @@ def clean_old_jobs(opts): Clean out the old jobs from the job cache ''' mminion = salt.minion.MasterMinion( - self.opts, + opts, states=False, rend=False, ) # for all master_job_caches, clean out the old jobs-- if they implemented it for returner in opts['master_job_caches']: fstr = '{0}.clean_old_jobs'.format(returner) - if fstr in self.mminion.returners: - self.mminion.returners[fstr]() + if fstr in mminion.returners: + mminion.returners[fstr]() diff --git a/salt/returners/local_cache.py b/salt/returners/local_cache.py index 4d5ce29682..52482d0b5a 100644 --- a/salt/returners/local_cache.py +++ b/salt/returners/local_cache.py @@ -45,7 +45,7 @@ def _jid_dir(jid, makedirs=False): ''' jid_dir = salt.utils.jid_dir( jid, - __opts__['cachedir'], + os.path.join(__opts__['cachedir'], 'NEWPATHFORTESTING'), __opts__['hash_type'] ) @@ -238,37 +238,40 @@ def clean_old_jobs(): if __opts__['keep_jobs'] != 0: cur = datetime.datetime.now() - if os.path.exists(_job_dir()): - for top in os.listdir(jid_root): - t_path = os.path.join(jid_root, top) - for final in 
os.listdir(t_path): - f_path = os.path.join(t_path, final) - jid_file = os.path.join(f_path, 'jid') - if not os.path.isfile(jid_file): - # No jid file means corrupted cache entry, scrub it + jid_root = _job_dir() + if not os.path.exists(jid_root): + return + + for top in os.listdir(jid_root): + t_path = os.path.join(jid_root, top) + for final in os.listdir(t_path): + f_path = os.path.join(t_path, final) + jid_file = os.path.join(f_path, 'jid') + if not os.path.isfile(jid_file): + # No jid file means corrupted cache entry, scrub it + shutil.rmtree(f_path) + else: + with salt.utils.fopen(jid_file, 'r') as fn_: + jid = fn_.read() + if len(jid) < 18: + # Invalid jid, scrub the dir shutil.rmtree(f_path) else: - with salt.utils.fopen(jid_file, 'r') as fn_: - jid = fn_.read() - if len(jid) < 18: + # Parse the jid into a proper datetime object. + # We only parse down to the minute, since keep + # jobs is measured in hours, so a minute + # difference is not important. + try: + jidtime = datetime.datetime(int(jid[0:4]), + int(jid[4:6]), + int(jid[6:8]), + int(jid[8:10]), + int(jid[10:12])) + except ValueError as e: # Invalid jid, scrub the dir shutil.rmtree(f_path) - else: - # Parse the jid into a proper datetime object. - # We only parse down to the minute, since keep - # jobs is measured in hours, so a minute - # difference is not important. 
- try: - jidtime = datetime.datetime(int(jid[0:4]), - int(jid[4:6]), - int(jid[6:8]), - int(jid[8:10]), - int(jid[10:12])) - except ValueError as e: - # Invalid jid, scrub the dir - shutil.rmtree(f_path) - difference = cur - jidtime - hours_difference = difference.seconds / 3600.0 - if hours_difference > __opts__['keep_jobs']: - shutil.rmtree(f_path) + difference = cur - jidtime + hours_difference = difference.seconds / 3600.0 + if hours_difference > __opts__['keep_jobs']: + shutil.rmtree(f_path) From 571d6550828381c697a9a5416aacf20291c35694 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 18 Apr 2014 16:08:16 -0700 Subject: [PATCH 07/20] Misc pylint cleanups and a few variable fixes ;) --- salt/client/__init__.py | 5 ++--- salt/daemons/masterapi.py | 6 ++---- salt/returners/local_cache.py | 11 ++++------- salt/utils/returners.py | 3 +-- 4 files changed, 9 insertions(+), 16 deletions(-) diff --git a/salt/client/__init__.py b/salt/client/__init__.py index 3a100f76af..39a812b923 100644 --- a/salt/client/__init__.py +++ b/salt/client/__init__.py @@ -768,7 +768,6 @@ class LocalClient(object): if len(found.intersection(minions)) >= len(minions): raise StopIteration() - def get_iter_returns( self, jid, @@ -947,7 +946,7 @@ class LocalClient(object): else: m_data['ret'] = data[minion].get('return') if 'out' in data[minion]: - md_data['out'] = data[minion]['out'] + m_data['out'] = data[minion]['out'] if minion in ret: ret[minion].update(m_data) else: @@ -991,7 +990,7 @@ class LocalClient(object): else: m_data['ret'] = data[minion].get('return') if 'out' in data[minion]: - md_data['out'] = data[minion]['out'] + m_data['out'] = data[minion]['out'] if minion in ret: ret[minion].update(m_data) else: diff --git a/salt/daemons/masterapi.py b/salt/daemons/masterapi.py index b80ef5c101..3820a37c4a 100644 --- a/salt/daemons/masterapi.py +++ b/salt/daemons/masterapi.py @@ -110,7 +110,6 @@ def clean_old_jobs(opts): mminion.returners[fstr]() - def access_keys(opts): ''' A key 
needs to be placed in the filesystem with permissions 0400 so @@ -491,7 +490,6 @@ class RemoteFuncs(object): fstr = '{0}.returner'.format(returner) self.mminion.returners[fstr](load) - def _syndic_return(self, load): ''' Receive a syndic minion return and format it to look like returns from @@ -1259,7 +1257,7 @@ class LocalFuncs(object): if self.opts['ext_job_cache']: try: fstr = '{0}.save_load'.format(self.opts['ext_job_cache']) - self.mminion.returners[fstr](clear_load['jid'], clear_load) + self.mminion.returners[fstr](load['jid'], load) except KeyError: log.critical( 'The specified returner used for the external job cache ' @@ -1277,7 +1275,7 @@ class LocalFuncs(object): for returner in self.opts['master_job_caches']: try: fstr = '{0}.save_load'.format(returner) - self.mminion.returners[fstr](clear_load['jid'], clear_load) + self.mminion.returners[fstr](load['jid'], load) except KeyError: log.critical( 'The specified returner used for the external job cache ' diff --git a/salt/returners/local_cache.py b/salt/returners/local_cache.py index 52482d0b5a..b80e702b6b 100644 --- a/salt/returners/local_cache.py +++ b/salt/returners/local_cache.py @@ -5,15 +5,11 @@ Return data to local job cache ''' # Import python libs -import json import errno import logging import os -import time -import glob import shutil import datetime -from copy import deepcopy # Import salt libs import salt.payload @@ -39,6 +35,7 @@ def _job_dir(): ''' return os.path.join(__opts__['cachedir'], 'jobs') + def _jid_dir(jid, makedirs=False): ''' Return the jid_dir, and optionally create it @@ -54,6 +51,7 @@ def _jid_dir(jid, makedirs=False): return jid_dir + def _walk_through(job_dir): serial = salt.payload.Serial(__opts__) @@ -70,6 +68,7 @@ def _walk_through(job_dir): jid = job['jid'] yield jid, job, t_path, final + def _format_job_instance(job): return {'Function': job.get('fun', 'unknown-function'), 'Arguments': list(job.get('arg', [])), @@ -147,7 +146,6 @@ def save_load(jid, clear_load): ''' 
jid_dir = _jid_dir(clear_load['jid'], makedirs=True) - serial = salt.payload.Serial(__opts__) # if you have a tgt, save that for the UI etc @@ -171,7 +169,6 @@ def save_load(jid, clear_load): ) - def get_load(jid): ''' Return the load data that marks a specified jid @@ -231,6 +228,7 @@ def get_jids(): ret[jid] = _format_jid_instance(jid, job) return ret + def clean_old_jobs(): ''' Clean out the old jobs from the job cache @@ -274,4 +272,3 @@ def clean_old_jobs(): hours_difference = difference.seconds / 3600.0 if hours_difference > __opts__['keep_jobs']: shutil.rmtree(f_path) - diff --git a/salt/utils/returners.py b/salt/utils/returners.py index 88c978f249..3de5eaad1c 100644 --- a/salt/utils/returners.py +++ b/salt/utils/returners.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- ''' Helper functions for returners ''' @@ -13,5 +14,3 @@ def valid_jid(jid, returners, mminion): valid_jid = True break return valid_jid - - From c212a2e6c92092d6017ee35b24ab85d884663884 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 18 Apr 2014 16:22:17 -0700 Subject: [PATCH 08/20] Pylint cleanup --- salt/client/ssh/__init__.py | 1 - salt/master.py | 1 - salt/returners/local_cache.py | 4 +--- 3 files changed, 1 insertion(+), 5 deletions(-) diff --git a/salt/client/ssh/__init__.py b/salt/client/ssh/__init__.py index 43d0a5b3e9..ec1f94946a 100644 --- a/salt/client/ssh/__init__.py +++ b/salt/client/ssh/__init__.py @@ -461,7 +461,6 @@ class SSH(object): for returner in self.opts['master_job_caches']: self.mminion.returners['{0}.save_load'.format(returner)](jid, job_load) - if self.opts.get('verbose'): msg = 'Executing job with jid {0}'.format(jid) print(msg) diff --git a/salt/master.py b/salt/master.py index 138dde4f62..cf88ad121e 100644 --- a/salt/master.py +++ b/salt/master.py @@ -1269,7 +1269,6 @@ class AESFuncs(object): fstr = '{0}.returner'.format(returner) self.mminion.returners[fstr](load) - def _syndic_return(self, load): ''' Receive a syndic minion return and format it to look 
like returns from diff --git a/salt/returners/local_cache.py b/salt/returners/local_cache.py index b80e702b6b..c1af27a6c9 100644 --- a/salt/returners/local_cache.py +++ b/salt/returners/local_cache.py @@ -17,9 +17,6 @@ import salt.utils log = logging.getLogger(__name__) -''' -Filenames for the cache -''' # load is the published job LOAD_P = '.load.p' # the list of minions that the job is targeted to (best effort match on the master side) @@ -29,6 +26,7 @@ RETURN_P = 'return.p' # out is the "out" from the minion data OUT_P = 'out.p' + def _job_dir(): ''' Return root of the jobs cache directory From 22f6cca94ac4333a3b5a257903156f3df04b11d9 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Sat, 19 Apr 2014 09:29:32 -0700 Subject: [PATCH 09/20] Change from master_job_caches to master_job_cache. If we want multiple returners we can create a returner that does that. --- salt/client/__init__.py | 65 ++++++++++++++--------------- salt/client/ssh/__init__.py | 9 ++-- salt/config.py | 4 +- salt/daemons/masterapi.py | 52 +++++++++++------------ salt/master.py | 39 ++++++++--------- salt/minion.py | 6 ++- salt/runners/jobs.py | 83 ++++++++++++++++++------------------- salt/utils/__init__.py | 8 +++- salt/utils/returners.py | 16 ------- 9 files changed, 133 insertions(+), 149 deletions(-) delete mode 100644 salt/utils/returners.py diff --git a/salt/client/__init__.py b/salt/client/__init__.py index 39a812b923..50b0a20f0d 100644 --- a/salt/client/__init__.py +++ b/salt/client/__init__.py @@ -35,7 +35,6 @@ import salt.utils import salt.utils.args import salt.utils.event import salt.utils.minions -import salt.utils.returners import salt.utils.verify import salt.syspaths as syspaths from salt.exceptions import ( @@ -793,7 +792,7 @@ class LocalClient(object): timeout_at = start + timeout found = set() # Check to see if the jid is real, if not return the empty dict - if not salt.utils.returners.valid_jid(jid, self.opts['master_job_caches'], self.mminion): + if not 
self.mminion.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}: log.warning("jid does not exist") yield {} # stop the iteration, since the jid is invalid @@ -896,7 +895,7 @@ class LocalClient(object): found = set() ret = {} # Check to see if the jid is real, if not return the empty dict - if not salt.utils.returners.valid_jid(jid, self.opts['master_job_caches'], self.mminion): + if not self.mminion.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}: log.warning("jid does not exist") return ret @@ -937,20 +936,19 @@ class LocalClient(object): # create the iterator-- since we want to get anyone in the middle event_iter = self.get_event_iter_returns(jid, minions, timeout=timeout) - for returner in self.opts['master_job_caches']: - data = self.mminion.returners['{0}.get_jid'.format(returner)](jid) - for minion in data: - m_data = {} - if u'return' in data[minion]: - m_data['ret'] = data[minion].get(u'return') - else: - m_data['ret'] = data[minion].get('return') - if 'out' in data[minion]: - m_data['out'] = data[minion]['out'] - if minion in ret: - ret[minion].update(m_data) - else: - ret[minion] = m_data + data = self.mminion.returners['{0}.get_jid'.format(self.opts['master_job_cache'])](jid) + for minion in data: + m_data = {} + if u'return' in data[minion]: + m_data['ret'] = data[minion].get(u'return') + else: + m_data['ret'] = data[minion].get('return') + if 'out' in data[minion]: + m_data['out'] = data[minion]['out'] + if minion in ret: + ret[minion].update(m_data) + else: + ret[minion] = m_data # if we have all the minion returns, lets just return if len(set(ret.keys()).intersection(minions)) >= len(minions): @@ -981,20 +979,19 @@ class LocalClient(object): ''' ret = {} - for returner in self.opts['master_job_caches']: - data = self.mminion.returners['{0}.get_jid'.format(returner)](jid) - for minion in data: - m_data = {} - if u'return' in data[minion]: - m_data['ret'] = data[minion].get(u'return') - else: - 
m_data['ret'] = data[minion].get('return') - if 'out' in data[minion]: - m_data['out'] = data[minion]['out'] - if minion in ret: - ret[minion].update(m_data) - else: - ret[minion] = m_data + data = self.mminion.returners['{0}.get_jid'.format(self.opts['master_job_cache'])](jid) + for minion in data: + m_data = {} + if u'return' in data[minion]: + m_data['ret'] = data[minion].get(u'return') + else: + m_data['ret'] = data[minion].get('return') + if 'out' in data[minion]: + m_data['out'] = data[minion]['out'] + if minion in ret: + ret[minion].update(m_data) + else: + ret[minion] = m_data return ret @@ -1027,7 +1024,7 @@ class LocalClient(object): found = set() ret = {} # Check to see if the jid is real, if not return the empty dict - if not salt.utils.returners.valid_jid(jid, self.opts['master_job_caches'], self.mminion): + if not self.mminion.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}: log.warning("jid does not exist") return ret # Wait for the hosts to check in @@ -1104,7 +1101,7 @@ class LocalClient(object): timeout_at = start + timeout found = set() # Check to see if the jid is real, if not return the empty dict - if not salt.utils.returners.valid_jid(jid, self.opts['master_job_caches'], self.mminion): + if not self.mminion.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}: log.warning("jid does not exist") yield {} # stop the iteration, since the jid is invalid @@ -1202,7 +1199,7 @@ class LocalClient(object): found = set() # Check to see if the jid is real, if not return the empty dict - if not salt.utils.returners.valid_jid(jid, self.opts['master_job_caches'], self.mminion): + if not self.mminion.returners['{0}.get_load'.format(self.opts['master_job_cache'])](jid) != {}: log.warning("jid does not exist") yield {} # stop the iteration, since the jid is invalid diff --git a/salt/client/ssh/__init__.py b/salt/client/ssh/__init__.py index ec1f94946a..b6fb3a4da7 100644 --- a/salt/client/ssh/__init__.py +++ 
b/salt/client/ssh/__init__.py @@ -424,8 +424,9 @@ class SSH(object): ''' Cache the job information ''' - for returner in self.opts['master_job_caches']: - self.mminion.returners['{0}.returner'.format(returner)]({'jid': jid, 'id': id_, 'return': ret}) + self.mminion.returners['{0}.returner'.format(self.opts['master_job_cache'])]({'jid': jid, + 'id': id_, + 'return': ret}) def run(self): ''' @@ -458,8 +459,8 @@ class SSH(object): 'arg': args, } - for returner in self.opts['master_job_caches']: - self.mminion.returners['{0}.save_load'.format(returner)](jid, job_load) + # save load to the master job cache + self.mminion.returners['{0}.save_load'.format(self.opts['master_job_cache'])](jid, job_load) if self.opts.get('verbose'): msg = 'Executing job with jid {0}'.format(jid) diff --git a/salt/config.py b/salt/config.py index 5289d8ed7b..89d63d16bb 100644 --- a/salt/config.py +++ b/salt/config.py @@ -187,7 +187,7 @@ VALID_OPTS = { 'order_masters': bool, 'job_cache': bool, 'ext_job_cache': str, - 'master_job_caches': list, + 'master_job_cache': str, 'minion_data_cache': bool, 'publish_session': int, 'reactor': list, @@ -413,7 +413,7 @@ DEFAULT_MASTER_OPTS = { 'order_masters': False, 'job_cache': True, 'ext_job_cache': '', - 'master_job_caches': ['local_cache'], + 'master_job_cache': 'local_cache', 'minion_data_cache': True, 'enforce_mine_cache': False, 'ipv6': False, diff --git a/salt/daemons/masterapi.py b/salt/daemons/masterapi.py index 3820a37c4a..4b2a3f00d6 100644 --- a/salt/daemons/masterapi.py +++ b/salt/daemons/masterapi.py @@ -98,16 +98,16 @@ def clean_old_jobs(opts): ''' Clean out the old jobs from the job cache ''' + # TODO: better way to not require creating the masterminion every time? 
mminion = salt.minion.MasterMinion( opts, states=False, rend=False, ) - # for all master_job_caches, clean out the old jobs-- if they implemented it - for returner in opts['master_job_caches']: - fstr = '{0}.clean_old_jobs'.format(returner) - if fstr in mminion.returners: - mminion.returners[fstr]() + # If the master job cache has a clean_old_jobs, call it + fstr = '{0}.clean_old_jobs'.format(opts['master_job_cache']) + if fstr in mminion.returners: + mminion.returners[fstr]() def access_keys(opts): @@ -486,9 +486,9 @@ class RemoteFuncs(object): self.event.fire_ret_load(load) if not self.opts['job_cache'] or self.opts.get('ext_job_cache'): return - for returner in self.opts['master_job_caches']: - fstr = '{0}.returner'.format(returner) - self.mminion.returners[fstr](load) + + fstr = '{0}.returner'.format(self.opts['master_job_cache']) + self.mminion.returners[fstr](load) def _syndic_return(self, load): ''' @@ -500,9 +500,8 @@ class RemoteFuncs(object): return None # if we have a load, save it if 'load' in load: - for returner in self.opts['master_job_caches']: - fstr = '{0}.save_load'.format(returner) - self.mminion.returners[fstr](load['jid'], load['load']) + fstr = '{0}.save_load'.format(self.opts['master_job_cache']) + self.mminion.returners[fstr](load['jid'], load['load']) # Format individual return loads for key, item in load['return'].items(): @@ -1271,23 +1270,22 @@ class LocalFuncs(object): exc_info=True ) - # always write out to the master job caches - for returner in self.opts['master_job_caches']: - try: - fstr = '{0}.save_load'.format(returner) - self.mminion.returners[fstr](load['jid'], load) - except KeyError: - log.critical( - 'The specified returner used for the external job cache ' - '"{0}" does not have a save_load function!'.format( - returner - ) - ) - except Exception: - log.critical( - 'The specified returner threw a stack trace:\n', - exc_info=True + # always write out to the master job cache + try: + fstr = 
'{0}.save_load'.format(self.opts['master_job_cache']) + self.mminion.returners[fstr](load['jid'], load) + except KeyError: + log.critical( + 'The specified returner used for the external job cache ' + '"{0}" does not have a save_load function!'.format( + returner ) + ) + except Exception: + log.critical( + 'The specified returner threw a stack trace:\n', + exc_info=True + ) # Set up the payload payload = {'enc': 'aes'} # Altering the contents of the publish load is serious!! Changes here diff --git a/salt/master.py b/salt/master.py index cf88ad121e..10829f1dd4 100644 --- a/salt/master.py +++ b/salt/master.py @@ -1265,9 +1265,8 @@ class AESFuncs(object): return # otherwise, write to the master cache - for returner in self.opts['master_job_caches']: - fstr = '{0}.returner'.format(returner) - self.mminion.returners[fstr](load) + fstr = '{0}.returner'.format(self.opts['master_job_cache']) + self.mminion.returners[fstr](load) def _syndic_return(self, load): ''' @@ -1279,9 +1278,8 @@ class AESFuncs(object): return None # if we have a load, save it if 'load' in load: - for returner in self.opts['master_job_caches']: - fstr = '{0}.save_load'.format(returner) - self.mminion.returners[fstr](load['jid'], load) + fstr = '{0}.save_load'.format(self.opts['master_job_cache']) + self.mminion.returners[fstr](load['jid'], load) # Format individual return loads for key, item in load['return'].items(): @@ -2506,22 +2504,21 @@ class ClearFuncs(object): ) # always write out to the master job caches - for returner in self.opts['master_job_caches']: - try: - fstr = '{0}.save_load'.format(returner) - self.mminion.returners[fstr](clear_load['jid'], clear_load) - except KeyError: - log.critical( - 'The specified returner used for the external job cache ' - '"{0}" does not have a save_load function!'.format( - returner - ) - ) - except Exception: - log.critical( - 'The specified returner threw a stack trace:\n', - exc_info=True + try: + fstr = 
'{0}.save_load'.format(self.opts['master_job_cache']) + self.mminion.returners[fstr](clear_load['jid'], clear_load) + except KeyError: + log.critical( + 'The specified returner used for the external job cache ' + '"{0}" does not have a save_load function!'.format( + returner ) + ) + except Exception: + log.critical( + 'The specified returner threw a stack trace:\n', + exc_info=True + ) # Set up the payload payload = {'enc': 'aes'} diff --git a/salt/minion.py b/salt/minion.py index fc577a49b2..791f76700f 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -1667,8 +1667,10 @@ class Syndic(Minion): jdict['__fun__'] = event['data'].get('fun') jdict['__jid__'] = event['data']['jid'] jdict['__load__'] = {} - for returner in self.opts['master_job_caches']: - jdict['__load__'].update(self.mminion.returners['{0}.get_jid'.format(returner)](event['data']['jid'])) + fstr = '{0}.get_jid'.format(self.opts['master_job_cache']) + jdict['__load__'].update( + self.mminion.returners[fstr](event['data']['jid']) + ) jdict[event['data']['id']] = event['data']['return'] else: # Add generic event aggregation here diff --git a/salt/runners/jobs.py b/salt/runners/jobs.py index cc5aa15e43..8c29e7a871 100644 --- a/salt/runners/jobs.py +++ b/salt/runners/jobs.py @@ -40,12 +40,11 @@ def active(): mminion = salt.minion.MasterMinion(__opts__) for jid in ret: - returners = _get_returner((__opts__['ext_job_cache'], __opts__['master_job_caches'])) - for returner in returners: - data = mminion.returners['{0}.get_jid'.format(returner)](jid) - for minion in data: - if minion not in ret[jid]['Returned']: - ret[jid]['Returned'].append(minion) + returner = _get_returner((__opts__['ext_job_cache'], __opts__['master_job_cache'])) + data = mminion.returners['{0}.get_jid'.format(returner)](jid) + for minion in data: + if minion not in ret[jid]['Returned']: + ret[jid]['Returned'].append(minion) salt.output.display_output(ret, 'yaml', __opts__) return ret @@ -63,18 +62,18 @@ def lookup_jid(jid, ext_source=None, 
output=True): ''' ret = {} mminion = salt.minion.MasterMinion(__opts__) - returners = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_caches'])) - for returner in returners: - out = 'nested' - data = mminion.returners['{0}.get_jid'.format(returner)](jid) - for minion in data: - if u'return' in data[minion]: - ret[minion] = data[minion].get(u'return') - else: - ret[minion] = data[minion].get('return') - if 'out' in data[minion]: - out = data[minion]['out'] - salt.output.display_output(ret, out, __opts__) + returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache'])) + + out = 'nested' + data = mminion.returners['{0}.get_jid'.format(returner)](jid) + for minion in data: + if u'return' in data[minion]: + ret[minion] = data[minion].get(u'return') + else: + ret[minion] = data[minion].get('return') + if 'out' in data[minion]: + out = data[minion]['out'] + salt.output.display_output(ret, out, __opts__) return ret @@ -90,13 +89,13 @@ def list_job(jid, ext_source=None): ''' ret = {'jid': jid} mminion = salt.minion.MasterMinion(__opts__) - returners = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_caches'])) - for returner in returners: - out = 'nested' - job = mminion.returners['{0}.get_load'.format(returner)](jid) - ret.update(_format_jid_instance(jid, job)) - ret['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid) - salt.output.display_output(ret, out, __opts__) + returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache'])) + + out = 'nested' + job = mminion.returners['{0}.get_load'.format(returner)](jid) + ret.update(_format_jid_instance(jid, job)) + ret['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid) + salt.output.display_output(ret, out, __opts__) return ret @@ -110,12 +109,13 @@ def list_jobs(ext_source=None): salt-run jobs.list_jobs ''' - returners = _get_returner((__opts__['ext_job_cache'], ext_source, 
__opts__['master_job_caches'])) + returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache'])) mminion = salt.minion.MasterMinion(__opts__) - for returner in returners: - out = 'nested' - ret = mminion.returners['{0}.get_jids'.format(returner)]() - salt.output.display_output(ret, out, __opts__) + + out = 'nested' + ret = mminion.returners['{0}.get_jids'.format(returner)]() + salt.output.display_output(ret, out, __opts__) + return ret @@ -131,14 +131,15 @@ def print_job(jid, ext_source=None): ''' ret = {} - returners = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_caches'])) + returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache'])) mminion = salt.minion.MasterMinion(__opts__) - for returner in returners: - out = 'nested' - job = mminion.returners['{0}.get_load'.format(returner)](jid) - ret[jid] = _format_jid_instance(jid, job) - ret[jid]['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid) - salt.output.display_output(ret, out, __opts__) + + out = 'nested' + job = mminion.returners['{0}.get_load'.format(returner)](jid) + ret[jid] = _format_jid_instance(jid, job) + ret[jid]['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid) + salt.output.display_output(ret, out, __opts__) + return ret @@ -146,11 +147,9 @@ def _get_returner(returner_types): ''' Helper to iterate over retuerner_types and pick the first one ''' - for returners in returner_types: - if returners: - if type(returners) != list: - return [returners] - return returners + for returner in returner_types: + if returner: + return returner def _format_job_instance(job): diff --git a/salt/utils/__init__.py b/salt/utils/__init__.py index f63d2c4ade..56f2be60b8 100644 --- a/salt/utils/__init__.py +++ b/salt/utils/__init__.py @@ -573,6 +573,12 @@ def prep_jid(cachedir, sum_type, user='root', nocache=False): ''' Return a job id and prepare the job id directory ''' + 
salt.utils.warn_until( + 'Boron', + 'All job_cache management has been moved into the local_cache ' + 'returner, this util function will be removed-- please use ' + 'the returner' + ) jid = gen_jid() jid_dir_ = jid_dir(jid, cachedir, sum_type) @@ -608,7 +614,7 @@ def jid_load(jid, cachedir, sum_type, serial='msgpack'): salt.utils.warn_until( 'Boron', 'Getting the load has been moved into the returner interface ' - 'please get the data from the returners listedin master_job_caches ' + 'please get the data from the master_job_cache ' ) _dir = jid_dir(jid, cachedir, sum_type) load_fn = os.path.join(_dir, '.load.p') diff --git a/salt/utils/returners.py b/salt/utils/returners.py deleted file mode 100644 index 3de5eaad1c..0000000000 --- a/salt/utils/returners.py +++ /dev/null @@ -1,16 +0,0 @@ -# -*- coding: utf-8 -*- -''' -Helper functions for returners -''' - - -def valid_jid(jid, returners, mminion): - ''' - Return boolean of wether this jid exists in any of the returners passed in - ''' - valid_jid = False - for returner in returners: - if mminion.returners['{0}.get_load'.format(returner)](jid) != {}: - valid_jid = True - break - return valid_jid From 3332c82ff9f6c38f28284894d980d08bfde11376 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Sat, 19 Apr 2014 09:41:25 -0700 Subject: [PATCH 10/20] Add a multi_returner which will read/write from a list of returners --- salt/returners/multi_returner.py | 90 ++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 salt/returners/multi_returner.py diff --git a/salt/returners/multi_returner.py b/salt/returners/multi_returner.py new file mode 100644 index 0000000000..7a713bb757 --- /dev/null +++ b/salt/returners/multi_returner.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +''' +Read/Write multiple returners + +''' + +# Import python libs +import logging + +# Import salt libs +import salt.minion + +log = logging.getLogger(__name__) + +CONFIG_KEY = 'multi_returner' + +# cache of the master mininon 
for this returner +MMINION = None + + +def _mminion(): + ''' + Create a single mminion for this module to use, instead of reloading all the time + ''' + global MMINION + + if MMINION is None: + MMINION = salt.minion.MasterMinion(__opts__) + + return MMINION + + +def returner(load): + ''' + Write return to all returners in multi_returner + ''' + + for returner in __opts__[CONFIG_KEY]: + _mminion().returners['{0}.returner'.format(returner)](load) + + +def save_load(jid, clear_load): + ''' + Write load to all returners in multi_returner + ''' + for returner in __opts__[CONFIG_KEY]: + _mminion().returners['{0}.save_load'.format(returner)](jid, clear_load) + + +def get_load(jid): + ''' + Merge the load data from all returners + ''' + ret = {} + for returner in __opts__[CONFIG_KEY]: + ret.update(_mminion().returners['{0}.get_load'.format(returner)](jid)) + + return ret + + +def get_jid(jid): + ''' + Merge the return data from all returners + ''' + ret = {} + for returner in __opts__[CONFIG_KEY]: + ret.update(_mminion().returners['{0}.get_jid'.format(returner)](jid)) + + return ret + + +def get_jids(): + ''' + Return all job data from all returners + ''' + ret = {} + for returner in __opts__[CONFIG_KEY]: + ret.update(_mminion().returners['{0}.get_jids'.format(returner)]()) + + return ret + + +def clean_old_jobs(): + ''' + Clean out the old jobs from all returners (if you have it) + ''' + for returner in __opts__[CONFIG_KEY]: + fstr = '{0}.clean_old_jobs'.format(returner) + if fstr in _mminion().returners: + _mminion().returners[fstr]() From 0693e506c8ed6fcb6de9eed80732930f0c570168 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Sat, 19 Apr 2014 10:10:46 -0700 Subject: [PATCH 11/20] pylint cleanup --- salt/client/ssh/__init__.py | 4 ++-- salt/daemons/masterapi.py | 4 ++-- salt/master.py | 4 ++-- salt/returners/multi_returner.py | 12 ++++++------ salt/utils/__init__.py | 3 +++ 5 files changed, 15 insertions(+), 12 deletions(-) diff --git a/salt/client/ssh/__init__.py 
b/salt/client/ssh/__init__.py index b6fb3a4da7..d2f371bc35 100644 --- a/salt/client/ssh/__init__.py +++ b/salt/client/ssh/__init__.py @@ -424,8 +424,8 @@ class SSH(object): ''' Cache the job information ''' - self.mminion.returners['{0}.returner'.format(self.opts['master_job_cache'])]({'jid': jid, - 'id': id_, + self.mminion.returners['{0}.returner'.format(self.opts['master_job_cache'])]({'jid': jid, + 'id': id_, 'return': ret}) def run(self): diff --git a/salt/daemons/masterapi.py b/salt/daemons/masterapi.py index 4b2a3f00d6..3e7d555cbc 100644 --- a/salt/daemons/masterapi.py +++ b/salt/daemons/masterapi.py @@ -1276,9 +1276,9 @@ class LocalFuncs(object): self.mminion.returners[fstr](load['jid'], load) except KeyError: log.critical( - 'The specified returner used for the external job cache ' + 'The specified returner used for the master job cache ' '"{0}" does not have a save_load function!'.format( - returner + self.opts['master_job_cache'] ) ) except Exception: diff --git a/salt/master.py b/salt/master.py index 10829f1dd4..c97cb9e9a2 100644 --- a/salt/master.py +++ b/salt/master.py @@ -2509,9 +2509,9 @@ class ClearFuncs(object): self.mminion.returners[fstr](clear_load['jid'], clear_load) except KeyError: log.critical( - 'The specified returner used for the external job cache ' + 'The specified returner used for the master job cache ' '"{0}" does not have a save_load function!'.format( - returner + self.opts['master_job_cache'] ) ) except Exception: diff --git a/salt/returners/multi_returner.py b/salt/returners/multi_returner.py index 7a713bb757..6ea744314f 100644 --- a/salt/returners/multi_returner.py +++ b/salt/returners/multi_returner.py @@ -26,15 +26,15 @@ def _mminion(): if MMINION is None: MMINION = salt.minion.MasterMinion(__opts__) - + return MMINION - + def returner(load): ''' Write return to all returners in multi_returner ''' - + for returner in __opts__[CONFIG_KEY]: _mminion().returners['{0}.returner'.format(returner)](load) @@ -54,7 +54,7 @@ def 
get_load(jid): ret = {} for returner in __opts__[CONFIG_KEY]: ret.update(_mminion().returners['{0}.get_load'.format(returner)](jid)) - + return ret @@ -65,7 +65,7 @@ def get_jid(jid): ret = {} for returner in __opts__[CONFIG_KEY]: ret.update(_mminion().returners['{0}.get_jid'.format(returner)](jid)) - + return ret @@ -76,7 +76,7 @@ def get_jids(): ret = {} for returner in __opts__[CONFIG_KEY]: ret.update(_mminion().returners['{0}.get_jids'.format(returner)]()) - + return ret diff --git a/salt/utils/__init__.py b/salt/utils/__init__.py index 56f2be60b8..644c08ea54 100644 --- a/salt/utils/__init__.py +++ b/salt/utils/__init__.py @@ -573,12 +573,15 @@ def prep_jid(cachedir, sum_type, user='root', nocache=False): ''' Return a job id and prepare the job id directory ''' + # TODO: re-enable + ''' salt.utils.warn_until( 'Boron', 'All job_cache management has been moved into the local_cache ' 'returner, this util function will be removed-- please use ' 'the returner' ) + ''' jid = gen_jid() jid_dir_ = jid_dir(jid, cachedir, sum_type) From c1c09d8e3c4a429b96f9e123f6f3fb31e207446c Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Sat, 19 Apr 2014 10:34:13 -0700 Subject: [PATCH 12/20] Move prep_jid into the returner interface --- salt/client/ssh/__init__.py | 6 ++-- salt/daemons/masterapi.py | 13 +++------ salt/master.py | 13 +++------ salt/returners/local_cache.py | 55 +++++++++++++++++++++++++---------- salt/utils/__init__.py | 9 ++++-- 5 files changed, 56 insertions(+), 40 deletions(-) diff --git a/salt/client/ssh/__init__.py b/salt/client/ssh/__init__.py index d2f371bc35..6e6c6f2c24 100644 --- a/salt/client/ssh/__init__.py +++ b/salt/client/ssh/__init__.py @@ -432,10 +432,8 @@ class SSH(object): ''' Execute the overall routine ''' - jid = salt.utils.prep_jid( - self.opts['cachedir'], - self.opts['hash_type'], - self.opts['user']) + fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + jid = self.mminion.returners[fstr]() # Save the invocation information arg_str 
= self.opts['arg_str'] diff --git a/salt/daemons/masterapi.py b/salt/daemons/masterapi.py index 3e7d555cbc..9ab151e90e 100644 --- a/salt/daemons/masterapi.py +++ b/salt/daemons/masterapi.py @@ -476,10 +476,8 @@ class RemoteFuncs(object): return False if load['jid'] == 'req': # The minion is returning a standalone job, request a jobid - load['jid'] = salt.utils.prep_jid( - self.opts['cachedir'], - self.opts['hash_type'], - load.get('nocache', False)) + fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + load['jid'] = self.mminion.returners[fstr](nocache=load.get('nocache', False)) log.info('Got return from {id} for job {jid}'.format(**load)) self.event.fire_event(load, load['jid']) # old dup event self.event.fire_event(load, tagify([load['jid'], 'ret', load['id']], 'job')) @@ -1231,11 +1229,8 @@ class LocalFuncs(object): } # Retrieve the jid if not load['jid']: - load['jid'] = salt.utils.prep_jid( - self.opts['cachedir'], - self.opts['hash_type'], - extra.get('nocache', False) - ) + fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + load['jid'] = self.mminion.returners[fstr](nocache=extra.get('nocache', False)) self.event.fire_event({'minions': minions}, load['jid']) new_job_load = { diff --git a/salt/master.py b/salt/master.py index c97cb9e9a2..d01816ae70 100644 --- a/salt/master.py +++ b/salt/master.py @@ -1249,10 +1249,8 @@ class AESFuncs(object): load['arg'] = load.get('arg', load.get('fun_args', [])) load['tgt_type'] = 'glob' load['tgt'] = load['id'] - load['jid'] = salt.utils.prep_jid( - self.opts['cachedir'], - self.opts['hash_type'], - load.get('nocache', False)) + fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + load['jid'] = self.mminion.returners[fstr](nocache=load.get('nocache', False)) new_loadp = load.get('nocache', True) and True log.info('Got return from {id} for job {jid}'.format(**load)) self.event.fire_event(load, load['jid']) # old dup event @@ -2465,11 +2463,8 @@ class ClearFuncs(object): } # Retrieve the jid if 
not clear_load['jid']: - clear_load['jid'] = salt.utils.prep_jid( - self.opts['cachedir'], - self.opts['hash_type'], - extra.get('nocache', False) - ) + fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + clear_load['jid'] = self.mminion.returners[fstr](nocache=extra.get('nocache', False)) self.event.fire_event({'minions': minions}, clear_load['jid']) new_job_load = { diff --git a/salt/returners/local_cache.py b/salt/returners/local_cache.py index c1af27a6c9..62e9a42289 100644 --- a/salt/returners/local_cache.py +++ b/salt/returners/local_cache.py @@ -10,6 +10,7 @@ import logging import os import shutil import datetime +import hashlib # Import salt libs import salt.payload @@ -33,21 +34,18 @@ def _job_dir(): ''' return os.path.join(__opts__['cachedir'], 'jobs') - -def _jid_dir(jid, makedirs=False): +def _jid_dir(jid): ''' - Return the jid_dir, and optionally create it + Return the jid_dir for the given job id ''' - jid_dir = salt.utils.jid_dir( - jid, - os.path.join(__opts__['cachedir'], 'NEWPATHFORTESTING'), - __opts__['hash_type'] - ) - - if makedirs and not os.path.isdir(jid_dir): - os.makedirs(jid_dir) - - return jid_dir + jid = str(jid) + jhash = getattr(hashlib, __opts__['hash_type'])(jid).hexdigest() + return os.path.join(__opts__['cachedir'], + # TODO: remove this string... + 'NEWPATHFORTESTING', + 'jobs', + jhash[:2], + jhash[2:]) def _walk_through(job_dir): @@ -82,12 +80,39 @@ def _format_jid_instance(jid, job): return ret +#TODO: add to returner docs-- this is a new one +def prep_jid(nocache=False): + ''' + Return a job id and prepare the job id directory + ''' + jid = salt.utils.gen_jid() + + jid_dir_ = _jid_dir(jid) + + # make sure we create the jid dir, otherwise someone else is using it, + # meaning we need a new jid + try: + os.makedirs(jid_dir_) + except OSError: + # TODO: some sort of sleep? + # or maybe add a random number to the end of the jid?? 
weird to spin + return prep_jid(nocache=nocache) + + with salt.utils.fopen(os.path.join(jid_dir_, 'jid'), 'w+') as fn_: + fn_.write(jid) + if nocache: + with salt.utils.fopen(os.path.join(jid_dir_, 'nocache'), 'w+') as fn_: + fn_.write('') + + return jid + + def returner(load): ''' Return data to the local job cache ''' serial = salt.payload.Serial(__opts__) - jid_dir = _jid_dir(load['jid'], makedirs=True) + jid_dir = _jid_dir(load['jid']) if os.path.exists(os.path.join(jid_dir, 'nocache')): return @@ -142,7 +167,7 @@ def save_load(jid, clear_load): ''' Save the load to the specified jid ''' - jid_dir = _jid_dir(clear_load['jid'], makedirs=True) + jid_dir = _jid_dir(clear_load['jid']) serial = salt.payload.Serial(__opts__) diff --git a/salt/utils/__init__.py b/salt/utils/__init__.py index 644c08ea54..743c190f1a 100644 --- a/salt/utils/__init__.py +++ b/salt/utils/__init__.py @@ -573,15 +573,12 @@ def prep_jid(cachedir, sum_type, user='root', nocache=False): ''' Return a job id and prepare the job id directory ''' - # TODO: re-enable - ''' salt.utils.warn_until( 'Boron', 'All job_cache management has been moved into the local_cache ' 'returner, this util function will be removed-- please use ' 'the returner' ) - ''' jid = gen_jid() jid_dir_ = jid_dir(jid, cachedir, sum_type) @@ -605,6 +602,12 @@ def jid_dir(jid, cachedir, sum_type): ''' Return the jid_dir for the given job id ''' + salt.utils.warn_until( + 'Boron', + 'All job_cache management has been moved into the local_cache ' + 'returner, this util function will be removed-- please use ' + 'the returner' + ) jid = str(jid) jhash = getattr(hashlib, sum_type)(jid).hexdigest() return os.path.join(cachedir, 'jobs', jhash[:2], jhash[2:]) From a07e20d2ad779b609a9aa51147fcb615e6069761 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Sat, 19 Apr 2014 10:34:13 -0700 Subject: [PATCH 13/20] Move prep_jid into the returner interface --- salt/client/ssh/__init__.py | 6 ++-- salt/daemons/masterapi.py | 13 +++----- 
salt/master.py | 13 +++----- salt/returners/local_cache.py | 57 ++++++++++++++++++++++++++--------- salt/utils/__init__.py | 9 ++++-- 5 files changed, 58 insertions(+), 40 deletions(-) diff --git a/salt/client/ssh/__init__.py b/salt/client/ssh/__init__.py index d2f371bc35..6e6c6f2c24 100644 --- a/salt/client/ssh/__init__.py +++ b/salt/client/ssh/__init__.py @@ -432,10 +432,8 @@ class SSH(object): ''' Execute the overall routine ''' - jid = salt.utils.prep_jid( - self.opts['cachedir'], - self.opts['hash_type'], - self.opts['user']) + fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + jid = self.mminion.returners[fstr]() # Save the invocation information arg_str = self.opts['arg_str'] diff --git a/salt/daemons/masterapi.py b/salt/daemons/masterapi.py index 3e7d555cbc..9ab151e90e 100644 --- a/salt/daemons/masterapi.py +++ b/salt/daemons/masterapi.py @@ -476,10 +476,8 @@ class RemoteFuncs(object): return False if load['jid'] == 'req': # The minion is returning a standalone job, request a jobid - load['jid'] = salt.utils.prep_jid( - self.opts['cachedir'], - self.opts['hash_type'], - load.get('nocache', False)) + fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + load['jid'] = self.mminion.returners[fstr](nocache=load.get('nocache', False)) log.info('Got return from {id} for job {jid}'.format(**load)) self.event.fire_event(load, load['jid']) # old dup event self.event.fire_event(load, tagify([load['jid'], 'ret', load['id']], 'job')) @@ -1231,11 +1229,8 @@ class LocalFuncs(object): } # Retrieve the jid if not load['jid']: - load['jid'] = salt.utils.prep_jid( - self.opts['cachedir'], - self.opts['hash_type'], - extra.get('nocache', False) - ) + fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + load['jid'] = self.mminion.returners[fstr](nocache=extra.get('nocache', False)) self.event.fire_event({'minions': minions}, load['jid']) new_job_load = { diff --git a/salt/master.py b/salt/master.py index c97cb9e9a2..d01816ae70 100644 --- 
a/salt/master.py +++ b/salt/master.py @@ -1249,10 +1249,8 @@ class AESFuncs(object): load['arg'] = load.get('arg', load.get('fun_args', [])) load['tgt_type'] = 'glob' load['tgt'] = load['id'] - load['jid'] = salt.utils.prep_jid( - self.opts['cachedir'], - self.opts['hash_type'], - load.get('nocache', False)) + fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + load['jid'] = self.mminion.returners[fstr](nocache=load.get('nocache', False)) new_loadp = load.get('nocache', True) and True log.info('Got return from {id} for job {jid}'.format(**load)) self.event.fire_event(load, load['jid']) # old dup event @@ -2465,11 +2463,8 @@ class ClearFuncs(object): } # Retrieve the jid if not clear_load['jid']: - clear_load['jid'] = salt.utils.prep_jid( - self.opts['cachedir'], - self.opts['hash_type'], - extra.get('nocache', False) - ) + fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + clear_load['jid'] = self.mminion.returners[fstr](nocache=extra.get('nocache', False)) self.event.fire_event({'minions': minions}, clear_load['jid']) new_job_load = { diff --git a/salt/returners/local_cache.py b/salt/returners/local_cache.py index c1af27a6c9..abce6d658b 100644 --- a/salt/returners/local_cache.py +++ b/salt/returners/local_cache.py @@ -10,6 +10,7 @@ import logging import os import shutil import datetime +import hashlib # Import salt libs import salt.payload @@ -33,21 +34,18 @@ def _job_dir(): ''' return os.path.join(__opts__['cachedir'], 'jobs') - -def _jid_dir(jid, makedirs=False): +def _jid_dir(jid): ''' - Return the jid_dir, and optionally create it + Return the jid_dir for the given job id ''' - jid_dir = salt.utils.jid_dir( - jid, - os.path.join(__opts__['cachedir'], 'NEWPATHFORTESTING'), - __opts__['hash_type'] - ) - - if makedirs and not os.path.isdir(jid_dir): - os.makedirs(jid_dir) - - return jid_dir + jid = str(jid) + jhash = getattr(hashlib, __opts__['hash_type'])(jid).hexdigest() + return os.path.join(__opts__['cachedir'], + # TODO: remove this 
string... + 'NEWPATHFORTESTING', + 'jobs', + jhash[:2], + jhash[2:]) def _walk_through(job_dir): @@ -82,12 +80,41 @@ def _format_jid_instance(jid, job): return ret +#TODO: add to returner docs-- this is a new one +def prep_jid(nocache=False): + ''' + Return a job id and prepare the job id directory + This is the function responsible for making sure jids don't collide + So do what you have to do to make sure that stays the case + ''' + jid = salt.utils.gen_jid() + + jid_dir_ = _jid_dir(jid) + + # make sure we create the jid dir, otherwise someone else is using it, + # meaning we need a new jid + try: + os.makedirs(jid_dir_) + except OSError: + # TODO: some sort of sleep? + # or maybe add a random number to the end of the jid?? weird to spin + return prep_jid(nocache=nocache) + + with salt.utils.fopen(os.path.join(jid_dir_, 'jid'), 'w+') as fn_: + fn_.write(jid) + if nocache: + with salt.utils.fopen(os.path.join(jid_dir_, 'nocache'), 'w+') as fn_: + fn_.write('') + + return jid + + def returner(load): ''' Return data to the local job cache ''' serial = salt.payload.Serial(__opts__) - jid_dir = _jid_dir(load['jid'], makedirs=True) + jid_dir = _jid_dir(load['jid']) if os.path.exists(os.path.join(jid_dir, 'nocache')): return @@ -142,7 +169,7 @@ def save_load(jid, clear_load): ''' Save the load to the specified jid ''' - jid_dir = _jid_dir(clear_load['jid'], makedirs=True) + jid_dir = _jid_dir(clear_load['jid']) serial = salt.payload.Serial(__opts__) diff --git a/salt/utils/__init__.py b/salt/utils/__init__.py index 644c08ea54..743c190f1a 100644 --- a/salt/utils/__init__.py +++ b/salt/utils/__init__.py @@ -573,15 +573,12 @@ def prep_jid(cachedir, sum_type, user='root', nocache=False): ''' Return a job id and prepare the job id directory ''' - # TODO: re-enable - ''' salt.utils.warn_until( 'Boron', 'All job_cache management has been moved into the local_cache ' 'returner, this util function will be removed-- please use ' 'the returner' ) - ''' jid = gen_jid() jid_dir_ = 
jid_dir(jid, cachedir, sum_type) @@ -605,6 +602,12 @@ def jid_dir(jid, cachedir, sum_type): ''' Return the jid_dir for the given job id ''' + salt.utils.warn_until( + 'Boron', + 'All job_cache management has been moved into the local_cache ' + 'returner, this util function will be removed-- please use ' + 'the returner' + ) jid = str(jid) jhash = getattr(hashlib, sum_type)(jid).hexdigest() return os.path.join(cachedir, 'jobs', jhash[:2], jhash[2:]) From 52fe44d8bffa21e0ac8f054a99c09c2af34feae2 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Sat, 19 Apr 2014 11:01:58 -0700 Subject: [PATCH 14/20] Add prep_jid to multi_returner --- salt/returners/local_cache.py | 6 +++--- salt/returners/multi_returner.py | 22 +++++++++++++++++++++- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/salt/returners/local_cache.py b/salt/returners/local_cache.py index abce6d658b..afe36989b6 100644 --- a/salt/returners/local_cache.py +++ b/salt/returners/local_cache.py @@ -34,6 +34,7 @@ def _job_dir(): ''' return os.path.join(__opts__['cachedir'], 'jobs') + def _jid_dir(jid): ''' Return the jid_dir for the given job id @@ -84,7 +85,7 @@ def _format_jid_instance(jid, job): def prep_jid(nocache=False): ''' Return a job id and prepare the job id directory - This is the function responsible for making sure jids don't collide + This is the function responsible for making sure jids don't collide (unless its passed a jid) So do what you have to do to make sure that stays the case ''' jid = salt.utils.gen_jid() @@ -96,8 +97,7 @@ def prep_jid(nocache=False): try: os.makedirs(jid_dir_) except OSError: - # TODO: some sort of sleep? - # or maybe add a random number to the end of the jid?? weird to spin + # TODO: some sort of sleep or something? 
Spinning is generally bad practice
         return prep_jid(nocache=nocache)
 
     with salt.utils.fopen(os.path.join(jid_dir_, 'jid'), 'w+') as fn_:
diff --git a/salt/returners/multi_returner.py b/salt/returners/multi_returner.py
index 6ea744314f..7c75ae8e41 100644
--- a/salt/returners/multi_returner.py
+++ b/salt/returners/multi_returner.py
@@ -30,11 +30,31 @@ def _mminion():
     return MMINION
 
 
+def prep_jid(nocache=False):
+    '''
+    Call prep_jid on all returners in multi_returner
+
+    TODO: finish this, what to do when you get different jids from 2 returners...
+    since our jids are time based, this makes this problem hard, because they
+    aren't unique, meaning that we have to make sure that no one else got the jid
+    and if they did we spin to get a new one, which means "locking" the jid in 2
+    returners is non-trivial
+    '''
+
+    jid = None
+    for returner in __opts__[CONFIG_KEY]:
+        if jid is None:
+            jid = _mminion().returners['{0}.prep_jid'.format(returner)](nocache=nocache)
+        else:
+            r_jid = _mminion().returners['{0}.prep_jid'.format(returner)](nocache=nocache)
+            if r_jid != jid:
+                print 'Uhh.... crud the jids do not match'
+    return jid
+
 def returner(load):
     '''
     Write return to all returners in multi_returner
     '''
-
     for returner in __opts__[CONFIG_KEY]:
         _mminion().returners['{0}.returner'.format(returner)](load)
 
From 342126da0137d8c0cabd5b710e4a34a8bd445e65 Mon Sep 17 00:00:00 2001
From: Thomas Jackson
Date: Sat, 19 Apr 2014 11:22:16 -0700
Subject: [PATCH 15/20] Make sure to save the load on minion returns from
 standalone jobs (e.g.
salt-call) --- salt/master.py | 10 ++++++---- salt/returners/local_cache.py | 10 +++++----- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/salt/master.py b/salt/master.py index d01816ae70..85a870cf9f 100644 --- a/salt/master.py +++ b/salt/master.py @@ -1243,15 +1243,17 @@ class AESFuncs(object): return False if not salt.utils.verify.valid_id(self.opts, load['id']): return False - new_loadp = False if load['jid'] == 'req': # The minion is returning a standalone job, request a jobid load['arg'] = load.get('arg', load.get('fun_args', [])) load['tgt_type'] = 'glob' load['tgt'] = load['id'] - fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) - load['jid'] = self.mminion.returners[fstr](nocache=load.get('nocache', False)) - new_loadp = load.get('nocache', True) and True + prep_fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + load['jid'] = self.mminion.returners[prep_fstr](nocache=load.get('nocache', False)) + + # save the load, since we don't have it + saveload_fstr = '{0}.save_load'.format(self.opts['master_job_cache']) + self.mminion.returners[saveload_fstr](load['jid'], load) log.info('Got return from {id} for job {jid}'.format(**load)) self.event.fire_event(load, load['jid']) # old dup event self.event.fire_event( diff --git a/salt/returners/local_cache.py b/salt/returners/local_cache.py index afe36989b6..0eec7a110d 100644 --- a/salt/returners/local_cache.py +++ b/salt/returners/local_cache.py @@ -32,7 +32,10 @@ def _job_dir(): ''' Return root of the jobs cache directory ''' - return os.path.join(__opts__['cachedir'], 'jobs') + return os.path.join(__opts__['cachedir'], + # TODO: remove this string + 'NEWPATHFORTESTING', + 'jobs') def _jid_dir(jid): @@ -41,10 +44,7 @@ def _jid_dir(jid): ''' jid = str(jid) jhash = getattr(hashlib, __opts__['hash_type'])(jid).hexdigest() - return os.path.join(__opts__['cachedir'], - # TODO: remove this string... 
- 'NEWPATHFORTESTING', - 'jobs', + return os.path.join(_job_dir(), jhash[:2], jhash[2:]) From b59400b99354812fdfac39bd2bf1199ae47d54d7 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Sat, 19 Apr 2014 11:23:15 -0700 Subject: [PATCH 16/20] Appease pylint :) --- salt/returners/multi_returner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/salt/returners/multi_returner.py b/salt/returners/multi_returner.py index 7c75ae8e41..87434827e3 100644 --- a/salt/returners/multi_returner.py +++ b/salt/returners/multi_returner.py @@ -51,6 +51,7 @@ def prep_jid(nocache=False): print 'Uhh.... crud the jids do not match' return jid + def returner(load): ''' Write return to all returners in multi_returner From 1f8219dc31c3a21860970e3c18cc4543b6429adf Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Sat, 19 Apr 2014 12:28:11 -0700 Subject: [PATCH 17/20] Save the load on the masterapi as well --- salt/daemons/masterapi.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/salt/daemons/masterapi.py b/salt/daemons/masterapi.py index 9ab151e90e..3c7c8eadc0 100644 --- a/salt/daemons/masterapi.py +++ b/salt/daemons/masterapi.py @@ -476,8 +476,12 @@ class RemoteFuncs(object): return False if load['jid'] == 'req': # The minion is returning a standalone job, request a jobid - fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) - load['jid'] = self.mminion.returners[fstr](nocache=load.get('nocache', False)) + prep_fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + load['jid'] = self.mminion.returners[prep_fstr](nocache=load.get('nocache', False)) + + # save the load, since we don't have it + saveload_fstr = '{0}.save_load'.format(self.opts['master_job_cache']) + self.mminion.returners[saveload_fstr](load['jid'], load) log.info('Got return from {id} for job {jid}'.format(**load)) self.event.fire_event(load, load['jid']) # old dup event self.event.fire_event(load, tagify([load['jid'], 'ret', load['id']], 'job')) From 
df4dd4ec18084c0f4cbc633993d4616d84137957 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Sat, 19 Apr 2014 13:33:25 -0700 Subject: [PATCH 18/20] Fix failing shell test case-- and change up jobs runner to use the "output" config instead of hard coding what the option should be. In addition i changed the display_output function to default the "out" to None, since that means "use config" down inside. --- salt/output/__init__.py | 2 +- salt/runners/jobs.py | 14 +++++--------- tests/integration/shell/call.py | 4 ++-- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/salt/output/__init__.py b/salt/output/__init__.py index 75964c9cee..776d652574 100644 --- a/salt/output/__init__.py +++ b/salt/output/__init__.py @@ -27,7 +27,7 @@ STATIC = ( ) -def display_output(data, out, opts=None): +def display_output(data, out=None, opts=None): ''' Print the passed data using the desired output ''' diff --git a/salt/runners/jobs.py b/salt/runners/jobs.py index 8c29e7a871..67517bdc08 100644 --- a/salt/runners/jobs.py +++ b/salt/runners/jobs.py @@ -46,7 +46,7 @@ def active(): if minion not in ret[jid]['Returned']: ret[jid]['Returned'].append(minion) - salt.output.display_output(ret, 'yaml', __opts__) + salt.output.display_output(ret, opts=__opts__) return ret @@ -64,7 +64,6 @@ def lookup_jid(jid, ext_source=None, output=True): mminion = salt.minion.MasterMinion(__opts__) returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache'])) - out = 'nested' data = mminion.returners['{0}.get_jid'.format(returner)](jid) for minion in data: if u'return' in data[minion]: @@ -73,7 +72,7 @@ def lookup_jid(jid, ext_source=None, output=True): ret[minion] = data[minion].get('return') if 'out' in data[minion]: out = data[minion]['out'] - salt.output.display_output(ret, out, __opts__) + salt.output.display_output(ret, opts=__opts__) return ret @@ -91,11 +90,10 @@ def list_job(jid, ext_source=None): mminion = salt.minion.MasterMinion(__opts__) returner = 
_get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache'])) - out = 'nested' job = mminion.returners['{0}.get_load'.format(returner)](jid) ret.update(_format_jid_instance(jid, job)) ret['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid) - salt.output.display_output(ret, out, __opts__) + salt.output.display_output(ret, opts=__opts__) return ret @@ -112,9 +110,8 @@ def list_jobs(ext_source=None): returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache'])) mminion = salt.minion.MasterMinion(__opts__) - out = 'nested' ret = mminion.returners['{0}.get_jids'.format(returner)]() - salt.output.display_output(ret, out, __opts__) + salt.output.display_output(ret, opts=__opts__) return ret @@ -134,11 +131,10 @@ def print_job(jid, ext_source=None): returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache'])) mminion = salt.minion.MasterMinion(__opts__) - out = 'nested' job = mminion.returners['{0}.get_load'.format(returner)](jid) ret[jid] = _format_jid_instance(jid, job) ret[jid]['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid) - salt.output.display_output(ret, out, __opts__) + salt.output.display_output(ret, opts=__opts__) return ret diff --git a/tests/integration/shell/call.py b/tests/integration/shell/call.py index bda7d9e2dc..65dc54d76b 100644 --- a/tests/integration/shell/call.py +++ b/tests/integration/shell/call.py @@ -116,9 +116,9 @@ class CallTest(integration.ShellCase, integration.ShellCaseCommonTestsMixIn): if 'returnTOmaster' in j][0] jid, idx = None, first_match[0] while idx > 0: - jid = re.match("('|\")([0-9]+)('|\"):", jobs[idx]) + jid = re.match("([0-9]+):", jobs[idx]) if jid: - jid = jid.group(2) + jid = jid.group(1) break idx -= 1 assert idx > 0 From 05f0969ee8b9374c2fe5bce2c753fb4619432f0d Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Sat, 19 Apr 2014 15:59:13 -0700 Subject: [PATCH 19/20] Fix the output now that we are using 
the default output (nested) instead of hard coding it to yaml --- tests/integration/runners/jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/runners/jobs.py b/tests/integration/runners/jobs.py index 4aacb9bbe1..2356dda045 100644 --- a/tests/integration/runners/jobs.py +++ b/tests/integration/runners/jobs.py @@ -22,7 +22,7 @@ class ManageTest(integration.ShellCase): ''' ret = self.run_run_plus('jobs.active') self.assertEqual(ret['fun'], {}) - self.assertEqual(ret['out'], ['{}']) + self.assertEqual(ret['out'], []) def test_lookup_jid(self): ''' From f9725066caa67028fc8929c57b4e2009b006a54f Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Sat, 19 Apr 2014 18:50:45 -0700 Subject: [PATCH 20/20] Remove new path naming, tests pass now :) --- salt/returners/local_cache.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/salt/returners/local_cache.py b/salt/returners/local_cache.py index 0eec7a110d..978b8524cd 100644 --- a/salt/returners/local_cache.py +++ b/salt/returners/local_cache.py @@ -33,8 +33,6 @@ def _job_dir(): Return root of the jobs cache directory ''' return os.path.join(__opts__['cachedir'], - # TODO: remove this string - 'NEWPATHFORTESTING', 'jobs')