Merge pull request #34476 from kriberg/update-postgres-returner

Updated postgres returner to support new API
This commit is contained in:
Mike Place 2016-07-27 11:29:25 -06:00 committed by GitHub
commit e28d2bc5ed

View File

@ -57,17 +57,37 @@ correctly:
DROP TABLE IF EXISTS salt_returns;
CREATE TABLE salt_returns (
added TIMESTAMP WITH TIME ZONE DEFAULT now(),
fun text NOT NULL,
jid varchar(20) NOT NULL,
fun varchar(50) NOT NULL,
jid varchar(255) NOT NULL,
return text NOT NULL,
id text NOT NULL,
success boolean
full_ret text,
id varchar(255) NOT NULL,
success varchar(10) NOT NULL,
alter_time TIMESTAMP WITH TIME ZONE DEFAULT now()
);
CREATE INDEX ON salt_returns (added);
CREATE INDEX ON salt_returns (id);
CREATE INDEX ON salt_returns (jid);
CREATE INDEX ON salt_returns (fun);
CREATE INDEX idx_salt_returns_id ON salt_returns (id);
CREATE INDEX idx_salt_returns_jid ON salt_returns (jid);
CREATE INDEX idx_salt_returns_fun ON salt_returns (fun);
CREATE INDEX idx_salt_returns_updated ON salt_returns (alter_time);
--
-- Table structure for table `salt_events`
--
DROP TABLE IF EXISTS salt_events;
DROP SEQUENCE IF EXISTS seq_salt_events_id;
CREATE SEQUENCE seq_salt_events_id;
CREATE TABLE salt_events (
id BIGINT NOT NULL UNIQUE DEFAULT nextval('seq_salt_events_id'),
tag varchar(255) NOT NULL,
data text NOT NULL,
alter_time TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
master_id varchar(255) NOT NULL
);
CREATE INDEX idx_salt_events_tag on salt_events (tag);
EOF
Required python modules: psycopg2
@ -101,10 +121,15 @@ from __future__ import absolute_import
# Import python libs
import json
import sys
import time
import logging
from contextlib import contextmanager
# Import Salt libs
import salt.utils.jid
import salt.returners
import salt.exceptions
# Import third party libs
try:
@ -115,6 +140,8 @@ except ImportError:
__virtualname__ = 'postgres'
log = logging.getLogger(__name__)
def __virtual__():
if not HAS_POSTGRES:
@ -126,6 +153,12 @@ def _get_options(ret=None):
'''
Get the postgres options from salt.
'''
defaults = {'host': 'localhost',
'user': 'salt',
'passwd': 'salt',
'db': 'salt',
'port': 5432}
attrs = {'host': 'host',
'user': 'user',
'passwd': 'passwd',
@ -136,69 +169,105 @@ def _get_options(ret=None):
ret,
attrs,
__salt__=__salt__,
__opts__=__opts__)
__opts__=__opts__,
defaults=defaults)
# Ensure port is an int
if 'port' in _options:
_options['port'] = int(_options['port'])
return _options
def _get_conn(ret=None):
@contextmanager
def _get_serv(ret=None, commit=False):
'''
Return a postgres connection.
Return a Pg cursor
'''
_options = _get_options(ret)
try:
conn = psycopg2.connect(host=_options.get('host'),
user=_options.get('user'),
password=_options.get('passwd'),
database=_options.get('db'),
port=_options.get('port'))
host = _options.get('host')
user = _options.get('user')
passwd = _options.get('passwd')
datab = _options.get('db')
port = _options.get('port')
except psycopg2.OperationalError as exc:
raise salt.exceptions.SaltMasterError('postgres returner could not connect to database: {exc}'.format(exc=exc))
return psycopg2.connect(
host=host,
user=user,
password=passwd,
database=datab,
port=port)
cursor = conn.cursor()
def _close_conn(conn):
'''
Close the Postgres connection
'''
conn.commit()
conn.close()
try:
yield cursor
except psycopg2.DatabaseError as err:
error = err.args
sys.stderr.write(str(error))
cursor.execute("ROLLBACK")
raise err
else:
if commit:
cursor.execute("COMMIT")
else:
cursor.execute("ROLLBACK")
finally:
conn.close()
def returner(ret):
'''
Return data to a postgres server
'''
conn = _get_conn(ret)
cur = conn.cursor()
sql = '''INSERT INTO salt_returns
(fun, jid, return, id, success)
VALUES (%s, %s, %s, %s, %s)'''
cur.execute(
sql, (
ret['fun'],
ret['jid'],
json.dumps(ret['return']),
ret['id'],
ret['success']
)
)
_close_conn(conn)
try:
with _get_serv(ret, commit=True) as cur:
sql = '''INSERT INTO salt_returns
(fun, jid, return, id, success, full_ret)
VALUES (%s, %s, %s, %s, %s, %s)'''
cur.execute(
sql, (
ret['fun'],
ret['jid'],
json.dumps(ret['return']),
ret['id'],
ret.get('success', False),
json.dumps(ret)))
except salt.exceptions.SaltMasterError:
log.critical('Could not store return with postgres returner. PostgreSQL server unavailable.')
def save_load(jid, load, minions=None):
def event_return(events):
'''
Return event to Pg server
Requires that configuration be enabled via 'event_return'
option in master config.
'''
with _get_serv(events, commit=True) as cur:
for event in events:
tag = event.get('tag', '')
data = event.get('data', '')
sql = '''INSERT INTO salt_events (tag, data, master_id)
VALUES (%s, %s, %s)'''
cur.execute(sql, (tag,
json.dumps(data),
__opts__['id']))
def save_load(jid, load, minions=None): # pylint: disable=unused-argument
'''
Save the load to the specified jid id
'''
conn = _get_conn(ret=None)
cur = conn.cursor()
sql = '''INSERT INTO jids (jid, load) VALUES (%s, %s)'''
with _get_serv(commit=True) as cur:
cur.execute(sql, (jid, json.dumps(load)))
_close_conn(conn)
sql = '''INSERT INTO jids
(jid, load)
VALUES (%s, %s)'''
try:
cur.execute(sql, (jid,
json.dumps(load)))
except psycopg2.IntegrityError:
# https://github.com/saltstack/salt/issues/22171
# Without this try:except: we get tons of duplicate entry errors
# which result in job returns not being stored properly
pass
def save_minions(jid, minions): # pylint: disable=unused-argument
@ -212,92 +281,90 @@ def get_load(jid):
'''
Return the load data that marks a specified jid
'''
conn = _get_conn(ret=None)
cur = conn.cursor()
sql = '''SELECT load FROM jids WHERE jid = %s;'''
cur.execute(sql, (jid,))
data = cur.fetchone()
if data:
return json.loads(data[0])
_close_conn(conn)
return {}
with _get_serv(ret=None, commit=True) as cur:
sql = '''SELECT load FROM jids WHERE jid = %s;'''
cur.execute(sql, (jid,))
data = cur.fetchone()
if data:
return json.loads(data[0])
return {}
def get_jid(jid):
'''
Return the information returned when the specified job id was executed
'''
conn = _get_conn(ret=None)
cur = conn.cursor()
sql = '''SELECT id, full_ret FROM salt_returns WHERE jid = %s'''
with _get_serv(ret=None, commit=True) as cur:
cur.execute(sql, (jid,))
data = cur.fetchall()
ret = {}
if data:
for minion, full_ret in data:
ret[minion] = json.loads(full_ret)
_close_conn(conn)
return ret
sql = '''SELECT id, full_ret FROM salt_returns
WHERE jid = %s'''
cur.execute(sql, (jid,))
data = cur.fetchall()
ret = {}
if data:
for minion, full_ret in data:
ret[minion] = json.loads(full_ret)
return ret
def get_fun(fun):
'''
Return a dict of the last function called for all minions
'''
conn = _get_conn(ret=None)
cur = conn.cursor()
sql = '''SELECT s.id,s.jid, s.full_ret
FROM salt_returns s
JOIN ( SELECT MAX(jid) AS jid FROM salt_returns GROUP BY fun, id) max
ON s.jid = max.jid
WHERE s.fun = %s
'''
with _get_serv(ret=None, commit=True) as cur:
cur.execute(sql, (fun,))
data = cur.fetchall()
sql = '''SELECT s.id,s.jid, s.full_ret
FROM salt_returns s
JOIN ( SELECT MAX(`jid`) as jid
from salt_returns GROUP BY fun, id) max
ON s.jid = max.jid
WHERE s.fun = %s
'''
ret = {}
if data:
for minion, _, full_ret in data:
ret[minion] = json.loads(full_ret)
_close_conn(conn)
return ret
cur.execute(sql, (fun,))
data = cur.fetchall()
ret = {}
if data:
for minion, _, full_ret in data:
ret[minion] = json.loads(full_ret)
return ret
def get_jids():
'''
Return a list of all job ids
'''
conn = _get_conn(ret=None)
cur = conn.cursor()
sql = '''SELECT jid, load FROM jids'''
with _get_serv(ret=None, commit=True) as cur:
cur.execute(sql)
data = cur.fetchall()
ret = {}
for jid, load in data:
ret[jid] = salt.utils.jid.format_jid_instance(jid, json.loads(load))
_close_conn(conn)
return ret
sql = '''SELECT jid, load
FROM jids'''
cur.execute(sql)
data = cur.fetchall()
ret = {}
for jid, load in data:
ret[jid] = salt.utils.jid.format_jid_instance(jid,
json.loads(load))
return ret
def get_minions():
'''
Return a list of minions
'''
conn = _get_conn(ret=None)
cur = conn.cursor()
sql = '''SELECT DISTINCT id FROM salt_returns'''
with _get_serv(ret=None, commit=True) as cur:
cur.execute(sql)
data = cur.fetchall()
ret = []
for minion in data:
ret.append(minion[0])
_close_conn(conn)
return ret
sql = '''SELECT DISTINCT id
FROM salt_returns'''
cur.execute(sql)
data = cur.fetchall()
ret = []
for minion in data:
ret.append(minion[0])
return ret
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument