Updated elasticsearch return to address 3 things:

- fix: load.return multiple types issue in salt-master-job-cache index
- fix: [WARNING ] Returner unavailable: expected string or buffer message returned by get_load function
- feature: add date to salt-master-job-cache index if index_date is True
This commit is contained in:
Stephane OLIVIER 2018-10-11 10:42:17 +02:00
parent 8aa2b90813
commit db0135e297

View File

@ -226,7 +226,7 @@ def returner(ret):
) )
return return
if ret.get('return', None) is None: if ret.get('data', None) is None:
log.info( log.info(
'Won\'t push new data to Elasticsearch, job with jid=%s was ' 'Won\'t push new data to Elasticsearch, job with jid=%s was '
'not successful', job_id 'not successful', job_id
@ -254,23 +254,23 @@ def returner(ret):
'failed': 0, 'failed': 0,
} }
# Prepend each state execution key in ret['return'] with a zero-padded # Prepend each state execution key in ret['data'] with a zero-padded
# version of the '__run_num__' field allowing the states to be ordered # version of the '__run_num__' field allowing the states to be ordered
# more easily. Change the index to be # more easily. Change the index to be
# index to be '<index>-ordered' so as not to clash with the unsorted # index to be '<index>-ordered' so as not to clash with the unsorted
# index data format # index data format
if options['states_order_output'] and isinstance(ret['return'], dict): if options['states_order_output'] and isinstance(ret['data'], dict):
index = '{0}-ordered'.format(index) index = '{0}-ordered'.format(index)
max_chars = len(six.text_type(len(ret['return']))) max_chars = len(six.text_type(len(ret['data'])))
for uid, data in six.iteritems(ret['return']): for uid, data in six.iteritems(ret['data']):
# Skip keys we've already prefixed # Skip keys we've already prefixed
if uid.startswith(tuple('0123456789')): if uid.startswith(tuple('0123456789')):
continue continue
# Store the function being called as it's a useful key to search # Store the function being called as it's a useful key to search
decoded_uid = uid.split('_|-') decoded_uid = uid.split('_|-')
ret['return'][uid]['_func'] = '{0}.{1}'.format( ret['data'][uid]['_func'] = '{0}.{1}'.format(
decoded_uid[0], decoded_uid[0],
decoded_uid[-1] decoded_uid[-1]
) )
@ -281,17 +281,17 @@ def returner(ret):
uid, uid,
) )
ret['return'][new_uid] = ret['return'].pop(uid) ret['data'][new_uid] = ret['data'].pop(uid)
# Catch a state output that has failed and where the error message is # Catch a state output that has failed and where the error message is
# not in a dict as expected. This prevents elasticsearch from # not in a dict as expected. This prevents elasticsearch from
# complaining about a mapping error # complaining about a mapping error
elif not isinstance(ret['return'], dict): elif not isinstance(ret['data'], dict):
ret['return'] = {'return': ret['return']} ret['data'] = { job_fun_escaped: { 'return': ret['data'] } }
# Need to count state successes and failures # Need to count state successes and failures
if options['states_count']: if options['states_count']:
for state_data in ret['return'].values(): for state_data in ret['data'].values():
if state_data['result'] is False: if state_data['result'] is False:
counts['failed'] += 1 counts['failed'] += 1
else: else:
@ -320,7 +320,7 @@ def returner(ret):
'fun': job_fun, 'fun': job_fun,
'jid': job_id, 'jid': job_id,
'counts': counts, 'counts': counts,
'data': _convert_keys(ret['return']) 'data': _convert_keys(ret['data'])
} }
if options['debug_returner_payload']: if options['debug_returner_payload']:
@ -379,8 +379,21 @@ def save_load(jid, load, minions=None):
index = options['master_job_cache_index'] index = options['master_job_cache_index']
doc_type = options['master_job_cache_doc_type'] doc_type = options['master_job_cache_doc_type']
if options['index_date']:
index = '{0}-{1}'.format(index,
datetime.date.today().strftime('%Y.%m.%d'))
_ensure_index(index) _ensure_index(index)
# addressing multiple types (bool, string, dict, ...) issue in master_job_cache index for return key (#20826)
if not load.get('return', None) is None:
# if load.return is not a dict, moving the result to load.return.<job_fun_escaped>.return
if not isinstance(load['return'], dict):
job_fun_escaped = load['fun'].replace('.', '_')
load['return'] = { job_fun_escaped: { 'return': load['return'] } }
# rename load.return to load.data in order to have the same key in all indices (master_job_cache, job)
load['data'] = load.pop('return')
data = { data = {
'jid': jid, 'jid': jid,
'load': load, 'load': load,
@ -403,9 +416,14 @@ def get_load(jid):
index = options['master_job_cache_index'] index = options['master_job_cache_index']
doc_type = options['master_job_cache_doc_type'] doc_type = options['master_job_cache_doc_type']
if options['index_date']:
index = '{0}-{1}'.format(index,
datetime.date.today().strftime('%Y.%m.%d'))
data = __salt__['elasticsearch.document_get'](index=index, data = __salt__['elasticsearch.document_get'](index=index,
id=jid, id=jid,
doc_type=doc_type) doc_type=doc_type)
if data: if data:
return salt.utils.json.loads(data) # Use salt.utils.json.dumps to convert elasticsearch unicode json to standard json
return salt.utils.json.loads(salt.utils.json.dumps(data))
return {} return {}