Merge pull request #16521 from cachedout/gen_jid

Add prep_jid to returners
This commit is contained in:
Thomas S Hatch 2014-10-17 10:33:38 -06:00
commit f1bebf20da
18 changed files with 258 additions and 59 deletions

View File

@ -40,6 +40,9 @@ import socket
import struct
import time
# Import salt libs
import salt.utils
log = logging.getLogger(__name__)
# Define the module's virtual name
@ -67,8 +70,8 @@ def _carbon(host, port):
socket.IPPROTO_TCP)
carbon_sock.connect((host, port))
except socket.error as e:
log.error('Error connecting to {0}:{1}, {2}'.format(host, port, e))
except socket.error as err:
log.error('Error connecting to {0}:{1}, {2}'.format(host, port, err))
raise
else:
log.debug('Connected to carbon')
@ -203,3 +206,10 @@ def returner(ret):
log.debug('Sent {0} bytes to carbon'.format(sent_bytes))
total_sent_bytes += sent_bytes
def prep_jid(nocache): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''
return salt.utils.gen_jid()

View File

@ -22,6 +22,9 @@ Required python modules: pycassa
# Import python libs
import logging
# Import salt libs
import salt.utils
# Import third party libs
try:
import pycassa
@ -69,3 +72,11 @@ def returner(ret):
log.debug(columns)
ccf.insert(ret['jid'], columns)
def prep_jid(nocache): # pylint: disable=unused-argument
'''
Do any work necessary to prepare the jid for storage,
including returning a custom jid
'''
return salt.utils.gen_jid()

View File

@ -93,21 +93,21 @@ def _verify_views():
if VERIFIED_VIEWS or __opts__.get('couchbase.skip_verify_views', False):
return
cb = _get_connection()
cb_ = _get_connection()
ddoc = {'views': {'jids': {'map': "function (doc, meta) { if (meta.id.indexOf('/') === -1 && doc.load){ emit(meta.id, null) } }"},
'jid_returns': {'map': "function (doc, meta) { if (meta.id.indexOf('/') > -1){ key_parts = meta.id.split('/'); emit(key_parts[0], key_parts[1]); } }"}
}
}
try:
curr_ddoc = cb.design_get(DESIGN_NAME, use_devmode=False).value
curr_ddoc = cb_.design_get(DESIGN_NAME, use_devmode=False).value
if curr_ddoc['views'] == ddoc['views']:
VERIFIED_VIEWS = True
return
except couchbase.exceptions.HTTPError:
pass
cb.design_create(DESIGN_NAME, ddoc, use_devmode=False)
cb_.design_create(DESIGN_NAME, ddoc, use_devmode=False)
VERIFIED_VIEWS = True
@ -125,11 +125,11 @@ def prep_jid(nocache=False):
This is the function responsible for making sure jids don't collide (unless its passed a jid)
So do what you have to do to make sure that stays the case
'''
cb = _get_connection()
cb_ = _get_connection()
jid = salt.utils.gen_jid()
try:
cb.add(str(jid),
cb_.add(str(jid),
{'nocache': nocache},
ttl=_get_ttl(),
)
@ -143,9 +143,9 @@ def returner(load):
'''
Return data to the local job cache
'''
cb = _get_connection()
cb_ = _get_connection()
try:
jid_doc = cb.get(load['jid'])
jid_doc = cb_.get(load['jid'])
if jid_doc.value['nocache'] is True:
return
except couchbase.exceptions.NotFoundError:
@ -161,7 +161,7 @@ def returner(load):
if 'out' in load:
ret_doc['out'] = load['out']
cb.add(hn_key,
cb_.add(hn_key,
ret_doc,
ttl=_get_ttl(),
)
@ -179,10 +179,10 @@ def save_load(jid, clear_load):
'''
Save the load to the specified jid
'''
cb = _get_connection()
cb_ = _get_connection()
try:
jid_doc = cb.get(str(jid))
jid_doc = cb_.get(str(jid))
except couchbase.exceptions.NotFoundError:
log.warning('Could not write job cache file for jid: {0}'.format(jid))
return False
@ -200,7 +200,7 @@ def save_load(jid, clear_load):
jid_doc.value['load'] = clear_load
cb.replace(str(jid),
cb_.replace(str(jid),
jid_doc.value,
cas=jid_doc.cas,
ttl=_get_ttl()
@ -211,10 +211,10 @@ def get_load(jid):
'''
Return the load data that marks a specified jid
'''
cb = _get_connection()
cb_ = _get_connection()
try:
jid_doc = cb.get(str(jid))
jid_doc = cb_.get(str(jid))
except couchbase.exceptions.NotFoundError:
return {}
@ -229,12 +229,12 @@ def get_jid(jid):
'''
Return the information returned when the specified job id was executed
'''
cb = _get_connection()
cb_ = _get_connection()
_verify_views()
ret = {}
for result in cb.query(DESIGN_NAME, 'jid_returns', key=str(jid), include_docs=True):
for result in cb_.query(DESIGN_NAME, 'jid_returns', key=str(jid), include_docs=True):
ret[result.value] = result.doc.value
return ret
@ -244,18 +244,21 @@ def get_jids():
'''
Return a list of all job ids
'''
cb = _get_connection()
cb_ = _get_connection()
_verify_views()
ret = {}
for result in cb.query(DESIGN_NAME, 'jids', include_docs=True):
for result in cb_.query(DESIGN_NAME, 'jids', include_docs=True):
ret[result.key] = _format_jid_instance(result.key, result.doc.value['load'])
return ret
def _format_job_instance(job):
'''
Return a properly formatted job dict
'''
return {'Function': job.get('fun', 'unknown-function'),
'Arguments': list(job.get('arg', [])),
# unlikely but safeguard from invalid returns
@ -265,6 +268,9 @@ def _format_job_instance(job):
def _format_jid_instance(jid, job):
'''
Return a properly formated jid dict
'''
ret = _format_job_instance(job)
ret.update({'StartTime': salt.utils.jid_to_time(jid)})
return ret

View File

@ -10,11 +10,15 @@ couchdb.url: 'http://salt:5984/'
salt '*' test.ping --return couchdb
'''
# Import Python libs
import logging
import time
import urllib2
import json
# Import Salt libs
import salt.utils
log = logging.getLogger(__name__)
# Define the module's virtual name
@ -306,3 +310,10 @@ def set_salt_view():
.format(_response['error']))
return False
return True
def prep_jid(nocache): # pylint: disable=unused-argument
'''
Do any necessary pre-processing and return the jid to use
'''
return salt.utils.gen_jid()

View File

@ -30,8 +30,12 @@ In order to have the returner apply to all minions:
ext_job_cache: elasticsearch
'''
# Import Python libs
import datetime
# Import Salt libs
import salt.utils
__virtualname__ = 'elasticsearch'
try:
@ -48,7 +52,9 @@ except ImportError:
def _create_index(client, index):
# create empty index
'''
Create empty index
'''
client.indices.create(
index=index,
body={
@ -92,20 +98,36 @@ def __virtual__():
def _get_pickler():
'''
Return a picker instance
'''
return Pickler(max_depth=5)
def _get_instance():
'''
Return the elasticsearch instance
'''
return elasticsearch.Elasticsearch([__salt__['config.get']('elasticsearch:host')])
def returner(ret):
es = _get_instance()
_create_index(es, __salt__['config.get']('elasticsearch:index'))
r = ret
'''
Process the return from Salt
'''
es_ = _get_instance()
_create_index(es_, __salt__['config.get']('elasticsearch:index'))
the_time = datetime.datetime.now().isoformat()
r['@timestamp'] = the_time
es.index(index=__salt__['config.get']('elasticsearch:index'),
ret['@timestamp'] = the_time
es_.index(index=__salt__['config.get']('elasticsearch:index'),
doc_type='returner',
body=_get_pickler().flatten(r),
body=_get_pickler().flatten(ret),
)
def prep_jid(nocache): # pylint: disable=unused-argument
'''
Prepare the jid, including doing any pre-processing and
returning the jid to use
'''
return salt.utils.gen_jid()

View File

@ -52,6 +52,8 @@ try:
except ImportError:
HAS_LIBS = False
import salt.utils
log = logging.getLogger(__name__)
# Define the module's virtual name
@ -164,3 +166,10 @@ def get_minions():
comps = str(item.key).split('/')
ret.append(comps[-1])
return ret
def prep_jid(nocache): # pylint: disable=unused-argument
'''
Pre-process the JID and return the JID to use
'''
return salt.utils.gen_jid()

View File

@ -48,6 +48,9 @@ def _jid_dir(jid):
def _walk_through(job_dir):
'''
Walk though the jid dir and look for jobs
'''
serial = salt.payload.Serial(__opts__)
for top in os.listdir(job_dir):
@ -65,6 +68,9 @@ def _walk_through(job_dir):
def _format_job_instance(job):
'''
Format the job instance correctly
'''
return {'Function': job.get('fun', 'unknown-function'),
'Arguments': list(job.get('arg', [])),
# unlikely but safeguard from invalid returns
@ -74,6 +80,9 @@ def _format_job_instance(job):
def _format_jid_instance(jid, job):
'''
Format the jid correctly
'''
ret = _format_job_instance(job)
ret.update({'StartTime': salt.utils.jid_to_time(jid)})
return ret
@ -125,8 +134,8 @@ def returner(load):
try:
os.mkdir(hn_dir)
except OSError as e:
if e.errno == errno.EEXIST:
except OSError as err:
if err.errno == errno.EEXIST:
# Minion has already returned this jid and it should be dropped
log.error(
'An extra return was detected from minion {0}, please verify '
@ -135,7 +144,7 @@ def returner(load):
)
)
return False
elif e.errno == errno.ENOENT:
elif err.errno == errno.ENOENT:
log.error(
'An inconsistency occurred, a job was received with a job id '
'that is not present in the local cache: {jid}'.format(**load)
@ -253,7 +262,7 @@ def get_jids():
Return a list of all job ids
'''
ret = {}
for jid, job, t_path, final in _walk_through(_job_dir()):
for jid, job, _, _ in _walk_through(_job_dir()):
ret[jid] = _format_jid_instance(jid, job)
return ret
@ -294,7 +303,7 @@ def clean_old_jobs():
int(jid[6:8]),
int(jid[8:10]),
int(jid[10:12]))
except ValueError as e:
except ValueError:
# Invalid jid, scrub the dir
shutil.rmtree(f_path)
difference = cur - jidtime

View File

@ -15,6 +15,7 @@ python2-memcache uses 'localhost' and '11211' as syntax on connection.
# Import python libs
import json
import logging
import salt.utils
log = logging.getLogger(__name__)
@ -51,7 +52,7 @@ def _get_serv():
log.error('Host or port not defined in salt config')
return
#Combine host and port to conform syntax of python memcache client
memcacheoptions = (host, port)
memcacheoptions = (host, str(port))
return memcache.Client([':'.join(memcacheoptions)], debug=0)
## TODO: make memcacheoptions cluster aware
@ -61,6 +62,13 @@ def _get_serv():
# an integer weight value.
def prep_jid(nocache): # pylint: disable=unused-argument
'''
Pre-process the jid and return the jid to use
'''
return salt.utils.gen_jid()
def returner(ret):
'''
Return data to a memcache data store

View File

@ -26,6 +26,9 @@ in the future and should not be considered API stable yet.
# Import python libs
import logging
# Import Salt libs
import salt.utils
# Import third party libs
try:
import pymongo
@ -46,6 +49,9 @@ def __virtual__():
def _remove_dots(src):
'''
Remove the dots from the given data structure
'''
output = {}
for key, val in src.iteritems():
if isinstance(val, dict):
@ -61,19 +67,19 @@ def _get_conn():
if 'config.option' in __salt__:
host = __salt__['config.option']('mongo.host')
port = __salt__['config.option']('mongo.port')
db = __salt__['config.option']('mongo.db')
db_ = __salt__['config.option']('mongo.db')
user = __salt__['config.option']('mongo.user')
password = __salt__['config.option']('mongo.password')
else:
cfg = __opts__
host = cfg.get('mongo.host', None)
port = cfg.get('mongo.port', None)
db = cfg.get('mongo.db', None)
db_ = cfg.get('mongo.db', None)
user = cfg.get('mongo.user', None)
password = cfg.get('mongo.password', None)
conn = pymongo.Connection(host, port)
mdb = conn[db]
mdb = conn[db_]
if user and password:
mdb.authenticate(user, password)
@ -84,7 +90,7 @@ def returner(ret):
'''
Return data to a mongodb server
'''
conn, mdb = _get_conn()
_, mdb = _get_conn()
col = mdb[ret['id']]
if isinstance(ret['return'], dict):
@ -103,7 +109,7 @@ def save_load(jid, load):
'''
Save the load for a given job id
'''
conn, mdb = _get_conn()
_, mdb = _get_conn()
col = mdb[jid]
col.insert(load)
@ -112,7 +118,7 @@ def get_load(jid):
'''
Return the load associated with a given job id
'''
conn, mdb = _get_conn()
_, mdb = _get_conn()
return mdb[jid].find_one()
@ -120,7 +126,7 @@ def get_jid(jid):
'''
Return the return information associated with a jid
'''
conn, mdb = _get_conn()
_, mdb = _get_conn()
ret = {}
for collection in mdb.collection_names():
rdata = mdb[collection].find_one({jid: {'$exists': 'true'}})
@ -133,7 +139,7 @@ def get_fun(fun):
'''
Return the most recent jobs that have executed the named function
'''
conn, mdb = _get_conn()
_, mdb = _get_conn()
ret = {}
for collection in mdb.collection_names():
rdata = mdb[collection].find_one({'fun': fun})
@ -146,7 +152,7 @@ def get_minions():
'''
Return a list of minions
'''
conn, mdb = _get_conn()
_, mdb = _get_conn()
ret = []
for name in mdb.collection_names():
if len(name) == 20:
@ -163,7 +169,7 @@ def get_jids():
'''
Return a list of job ids
'''
conn, mdb = _get_conn()
_, mdb = _get_conn()
ret = []
for name in mdb.collection_names():
if len(name) == 20:
@ -173,3 +179,10 @@ def get_jids():
except ValueError:
pass
return ret
def prep_jid(nocache): # pylint: disable=unused-argument
'''
Pre-process the jid and return the jid to use
'''
return salt.utils.gen_jid()

View File

@ -23,6 +23,9 @@ to the minion config files::
# Import python libs
import logging
# import Salt libs
import salt.utils
# Import third party libs
try:
import pymongo
@ -41,6 +44,9 @@ def __virtual__():
def _remove_dots(src):
'''
Remove dots from the given data structure
'''
output = {}
for key, val in src.iteritems():
if isinstance(val, dict):
@ -56,19 +62,19 @@ def _get_conn():
if 'config.option' in __salt__:
host = __salt__['config.option']('mongo.host')
port = __salt__['config.option']('mongo.port')
db = __salt__['config.option']('mongo.db')
db_ = __salt__['config.option']('mongo.db')
user = __salt__['config.option']('mongo.user')
password = __salt__['config.option']('mongo.password')
else:
cfg = __opts__
host = cfg.get('mongo.host', None)
port = cfg.get('mongo.port', None)
db = cfg.get('mongo.db', None)
db_ = cfg.get('mongo.db', None)
user = cfg.get('mongo.user', None)
password = cfg.get('mongo.password', None)
conn = pymongo.Connection(host, port)
mdb = conn[db]
mdb = conn[db_]
if user and password:
mdb.authenticate(user, password)
@ -79,7 +85,7 @@ def returner(ret):
'''
Return data to a mongodb server
'''
conn, mdb = _get_conn()
_, mdb = _get_conn()
col = mdb[ret['id']]
if isinstance(ret['return'], dict):
@ -98,7 +104,7 @@ def get_jid(jid):
'''
Return the return information associated with a jid
'''
conn, mdb = _get_conn()
_, mdb = _get_conn()
ret = {}
for collection in mdb.collection_names():
rdata = mdb[collection].find_one({jid: {'$exists': 'true'}})
@ -111,10 +117,17 @@ def get_fun(fun):
'''
Return the most recent jobs that have executed the named function
'''
conn, mdb = _get_conn()
_, mdb = _get_conn()
ret = {}
for collection in mdb.collection_names():
rdata = mdb[collection].find_one({'fun': fun})
if rdata:
ret[collection] = rdata
return ret
def prep_jid(nocache): # pylint: disable=unused-argument
'''
Pre-process the jid and return the jid to use
'''
return salt.utils.gen_jid()

View File

@ -119,7 +119,11 @@ def _get_serv(commit=False):
Return a mysql cursor
'''
_options = _get_options()
conn = MySQLdb.connect(host=_options['host'], user=_options['user'], passwd=_options['pass'], db=_options['db'], port=_options['port'])
conn = MySQLdb.connect(host=_options['host'],
user=_options['user'],
passwd=_options['pass'],
db=_options['db'],
port=_options['port'])
cursor = conn.cursor()
try:
yield cursor
@ -147,8 +151,10 @@ def returner(ret):
VALUES (%s, %s, %s, %s, %s, %s)'''
cur.execute(sql, (ret['fun'], ret['jid'],
json.dumps(ret['return']), ret['id'],
ret['success'], json.dumps(ret)))
json.dumps(ret['return']),
ret['id'],
ret['success'],
json.dumps(ret)))
def save_load(jid, load):
@ -217,7 +223,7 @@ def get_fun(fun):
ret = {}
if data:
for minion, jid, full_ret in data:
for minion, _, full_ret in data:
ret[minion] = json.loads(full_ret)
return ret

View File

@ -101,6 +101,9 @@ correctly. Replace with equivalent SQL for other ODBC-compliant servers::
# Import python libs
import json
# Import Salt libs
import salt.utils
# FIXME We'll need to handle this differently for Windows.
# Import third party libs
try:
@ -135,6 +138,9 @@ def _get_conn():
def _close_conn(conn):
'''
Close the MySQL connection
'''
conn.commit()
conn.close()
@ -225,7 +231,7 @@ def get_fun(fun):
ret = {}
if data:
for minion, jid, retval in data:
for minion, _, retval in data:
ret[minion] = json.loads(retval)
_close_conn(conn)
return ret
@ -263,3 +269,10 @@ def get_minions():
ret.append(minion[0])
_close_conn(conn)
return ret
def prep_jid(nocache): # pylint: disable=unused-argument
'''
Do any jid pre-processing and return the jid to use
'''
return salt.utils.gen_jid()

View File

@ -66,6 +66,9 @@ Required python modules: psycopg2
# Import python libs
import json
# Import Salt libs
import salt.utils
# Import third party libs
try:
import psycopg2
@ -103,6 +106,9 @@ def _get_conn():
def _close_conn(conn):
'''
Close the Postgres connection
'''
conn.commit()
conn.close()
@ -192,7 +198,7 @@ def get_fun(fun):
ret = {}
if data:
for minion, jid, full_ret in data:
for minion, _, full_ret in data:
ret[minion] = json.loads(full_ret)
_close_conn(conn)
return ret
@ -230,3 +236,10 @@ def get_minions():
ret.append(minion[0])
_close_conn(conn)
return ret
def prep_jid(nocache): # pylint: disable=unused-argument
'''
Do any pre-processing necessary and return the jid to use
'''
return salt.utils.gen_jid()

View File

@ -18,6 +18,9 @@ config, these are the defaults:
# Import python libs
import json
# Import Salt libs
import salt.utils
# Import third party libs
try:
import redis
@ -128,3 +131,10 @@ def get_minions():
'''
serv = _get_serv()
return list(serv.smembers('minions'))
def prep_jid(nocache): # pylint: disable=unused-argument
'''
Do any pre-processing necessary and return the jid to use
'''
return salt.utils.gen_jid()

View File

@ -24,8 +24,12 @@ The tags list (optional) specifies grains items that will be used as sentry tags
in the sentry ui.
'''
# Import Python libs
import logging
# Import Salt libs
import salt.utils
try:
from raven import Client
has_raven = True
@ -50,6 +54,9 @@ def returner(ret):
messages will be reported at info level.
'''
def connect_sentry(message, result):
'''
Connect to the Sentry server
'''
pillar_data = __salt__['pillar.raw']()
grains = __salt__['grains.items']()
sentry_data = {
@ -102,3 +109,10 @@ def returner(ret):
'Can\'t run connect_sentry: {0}'.format(err),
exc_info=True
)
def prep_jid(nocache): # pylint: disable=unused-argument
'''
Do any necessary pre-processing and then return the jid to use
'''
return salt.utils.gen_jid()

View File

@ -51,6 +51,9 @@ import logging
import smtplib
from email.utils import formatdate
# Import Salt libs
import salt.utils
try:
import gnupg
HAS_GNUPG = True
@ -105,10 +108,10 @@ def returner(ret):
log.debug("smtp_return: Subject is '{0}'".format(subject))
content = ('id: {0}\r\n'
'function: {1}\r\n'
'function args: {2}\r\n'
'jid: {3}\r\n'
'return: {4}\r\n').format(
'function: {1}\r\n'
'function args: {2}\r\n'
'jid: {3}\r\n'
'return: {4}\r\n').format(
ret.get('id'),
ret.get('fun'),
ret.get('fun_args'),
@ -123,7 +126,8 @@ def returner(ret):
content = str(encrypted_data)
else:
log.error('smtp_return: Encryption failed, only an error message will be sent')
content = 'Encryption failed, the return data was not sent.\r\n\r\n{0}\r\n{1}'.format(encrypted_data.status, encrypted_data.stderr)
content = 'Encryption failed, the return data was not sent.\r\n\r\n{0}\r\n{1}'.format(
encrypted_data.status, encrypted_data.stderr)
message = ('From: {0}\r\n'
'To: {1}\r\n'
@ -147,3 +151,10 @@ def returner(ret):
server.sendmail(from_addr, to_addrs, message)
log.debug('smtp_return: Message sent.')
server.quit()
def prep_jid(nocache): # pylint: disable=unused-argument
'''
Do any necessary pre-processing and return the jid to use
'''
return salt.utils.gen_jid()

View File

@ -54,6 +54,9 @@ import logging
import json
import datetime
# Import Salt libs
import salt.utils
# Better safe than sorry here. Even though sqlite3 is included in python
try:
import sqlite3
@ -242,3 +245,10 @@ def get_minions():
ret.append(minion[0])
_close_conn(conn)
return ret
def prep_jid(nocache): # pylint: disable=unused-argument
'''
Do any necessary pre-processing and then return the jid to use
'''
return salt.utils.gen_jid()

View File

@ -21,6 +21,9 @@ try:
except ImportError:
HAS_SYSLOG = False
# Import Salt libs
import salt.utils
# Define the module's virtual name
__virtualname__ = 'syslog'
@ -36,3 +39,10 @@ def returner(ret):
Return data to the local syslog
'''
syslog.syslog(syslog.LOG_INFO, 'salt-minion: {0}'.format(json.dumps(ret)))
def prep_jid(nocache): # pylint: disable=unused-argument
'''
Do any necessary pre-preocessing and then return the jid to use
'''
return salt.utils.gen_jid()