Merge pull request #22196 from rallytime/bp-cassandra-prs

Backport cassandra_cql module and returner to 2015.2
This commit is contained in:
Thomas S Hatch 2015-03-31 11:02:54 -06:00
commit d329943107
20 changed files with 1118 additions and 18 deletions

View File

@ -0,0 +1,706 @@
# -*- coding: utf-8 -*-
'''
Cassandra Database Module
.. versionadded:: 2015.2.0
:depends: DataStax Python Driver for Apache Cassandra
https://github.com/datastax/python-driver
pip install cassandra-driver
:referenced by: Salt's cassandra_cql returner
:configuration:
The Cassandra cluster members and connection port can either be specified
in the master or minion config, the minion's pillar or be passed to the module.
Example configuration in the config for a single node:
.. code-block:: yaml
cassandra:
cluster: 192.168.50.10
port: 9000
Example configuration in the config for a cluster:
.. code-block:: yaml
cassandra:
cluster:
- 192.168.50.10
- 192.168.50.11
- 192.168.50.12
port: 9000
username: cas_admin
'''
# Import Python Libs
from __future__ import absolute_import
import logging
import json
# Import Salt Libs
from salt.exceptions import CommandExecutionError
from salt.ext import six
log = logging.getLogger(__name__)
__virtualname__ = 'cassandra_cql'
HAS_DRIVER = False
try:
# pylint: disable=import-error,no-name-in-module
from cassandra.cluster import Cluster
from cassandra.cluster import NoHostAvailable
from cassandra.connection import ConnectionException, ConnectionShutdown
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory
# pylint: enable=import-error,no-name-in-module
HAS_DRIVER = True
except ImportError:
pass
def __virtual__():
'''
Return virtual name of the module only if the python driver can be loaded.
:return: The virtual name of the module.
:rtype: str
'''
if HAS_DRIVER:
return __virtualname__
return False
def _load_properties(property_name, config_option, set_default=False, default=None):
'''
Load properties for the cassandra module from config or pillar.
:param property_name: The property to load.
:type property_name: str or list of str
:param config_option: The name of the config option.
:type config_option: str
:param set_default: Should a default be set if not found in config.
:type set_default: bool
:param default: The default value to be set.
:type default: str
:return: The property fetched from the configuration or default.
:rtype: str or list of str
'''
if not property_name:
log.debug("No property specified in function, trying to load from salt configuration")
try:
options = __salt__['config.option']('cassandra')
except BaseException as e:
log.error("Failed to get cassandra config options. Reason: {0}".format(str(e)))
raise
loaded_property = options.get(config_option)
if not loaded_property:
if set_default:
log.debug('Setting default Cassandra {0} to {1}'.format(config_option, default))
loaded_property = default
else:
log.error('No cassandra {0} specified in the configuration or passed to the module.'.format(config_option))
raise CommandExecutionError("ERROR: Cassandra {0} cannot be empty.".format(config_option))
return loaded_property
return property_name
def _connect(contact_points=None, port=None, cql_user=None, cql_pass=None):
'''
Connect to a Cassandra cluster.
:param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
:type contact_points: str or list of str
:param cql_user: The Cassandra user if authentication is turned on.
:type cql_user: str
:param cql_pass: The Cassandra user password if authentication is turned on.
:type cql_pass: str
:param port: The Cassandra cluster port, defaults to None.
:type port: int
:return: The session and cluster objects.
:rtype: cluster object, session object
'''
contact_points = _load_properties(property_name=contact_points, config_option='cluster')
contact_points = contact_points if isinstance(contact_points, list) else contact_points.split(',')
port = _load_properties(property_name=port, config_option='port', set_default=True, default=9042)
cql_user = _load_properties(property_name=cql_user, config_option='username', set_default=True, default="cassandra")
cql_pass = _load_properties(property_name=cql_pass, config_option='password', set_default=True, default="cassandra")
try:
auth_provider = PlainTextAuthProvider(username=cql_user, password=cql_pass)
cluster = Cluster(contact_points, port=port, auth_provider=auth_provider)
session = cluster.connect()
log.debug('Successfully connected to Cassandra cluster at {0}'.format(contact_points))
return cluster, session
except (ConnectionException, ConnectionShutdown, NoHostAvailable):
log.error('Could not connect to Cassandra cluster at {0}'.format(contact_points))
raise CommandExecutionError('ERROR: Could not connect to Cassandra cluster.')
def cql_query(query, contact_points=None, port=None, cql_user=None, cql_pass=None):
'''
Run a query on a Cassandra cluster and return a dictionary.
:param query: The query to execute.
:type query: str
:param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
:type contact_points: str | list[str]
:param cql_user: The Cassandra user if authentication is turned on.
:type cql_user: str
:param cql_pass: The Cassandra user password if authentication is turned on.
:type cql_pass: str
:param port: The Cassandra cluster port, defaults to None.
:type port: int
:param params: The parameters for the query, optional.
:type params: str
:return: A dictionary from the return values of the query
:rtype: list[dict]
'''
try:
cluster, session = _connect(contact_points=contact_points, port=port, cql_user=cql_user, cql_pass=cql_pass)
except CommandExecutionError:
log.critical('Could not get Cassandra cluster session.')
raise
except BaseException as e:
log.critical('Unexpected error while getting Cassandra cluster session: {0}'.format(str(e)))
raise
session.row_factory = dict_factory
ret = []
try:
results = session.execute(query)
except BaseException as e:
log.error('Failed to execute query: {0}\n reason: {1}'.format(query, str(e)))
msg = "ERROR: Cassandra query failed: {0} reason: {1}".format(query, str(e))
raise CommandExecutionError(msg)
if results:
for result in results:
values = {}
for key, value in result.iteritems():
# Salt won't return dictionaries with odd types like uuid.UUID
if not isinstance(value, six.text_type):
value = str(value)
values[key] = value
ret.append(values)
cluster.shutdown()
return ret
def version(contact_points=None, port=None, cql_user=None, cql_pass=None):
'''
Show the Cassandra version.
:param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
:type contact_points: str | list[str]
:param cql_user: The Cassandra user if authentication is turned on.
:type cql_user: str
:param cql_pass: The Cassandra user password if authentication is turned on.
:type cql_pass: str
:param port: The Cassandra cluster port, defaults to None.
:type port: int
:return: The version for this Cassandra cluster.
:rtype: str
CLI Example:
.. code-block:: bash
salt 'minion1' cassandra.version
salt 'minion1' cassandra.version contact_points=minion1
'''
query = '''select release_version
from system.local
limit 1;'''
try:
ret = cql_query(query, contact_points, port, cql_user, cql_pass)
except CommandExecutionError:
log.critical('Could not get Cassandra version.')
raise
except BaseException as e:
log.critical('Unexpected error while getting Cassandra version: {0}'.format(str(e)))
raise
return ret[0].get('release_version')
def info(contact_points=None, port=None, cql_user=None, cql_pass=None):
'''
Show the Cassandra information for this cluster.
:param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
:type contact_points: str | list[str]
:param cql_user: The Cassandra user if authentication is turned on.
:type cql_user: str
:param cql_pass: The Cassandra user password if authentication is turned on.
:type cql_pass: str
:param port: The Cassandra cluster port, defaults to None.
:type port: int
:return: The information for this Cassandra cluster.
:rtype: dict
CLI Example:
.. code-block:: bash
salt 'minion1' cassandra.info
salt 'minion1' cassandra.info contact_points=minion1
'''
query = '''select cluster_name,
data_center,
partitioner,
host_id,
rack,
release_version,
cql_version,
schema_version,
thrift_version
from system.local
limit 1;'''
ret = {}
try:
ret = cql_query(query, contact_points, port, cql_user, cql_pass)
except CommandExecutionError:
log.critical('Could not list Cassandra info.')
raise
except BaseException as e:
log.critical('Unexpected error while listing Cassandra info: {0}'.format(str(e)))
raise
return ret
def list_keyspaces(contact_points=None, port=None, cql_user=None, cql_pass=None):
'''
List keyspaces in a Cassandra cluster.
:param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
:type contact_points: str | list[str]
:param cql_user: The Cassandra user if authentication is turned on.
:type cql_user: str
:param cql_pass: The Cassandra user password if authentication is turned on.
:type cql_pass: str
:param port: The Cassandra cluster port, defaults to None.
:type port: int
:return: The keyspaces in this Cassandra cluster.
:rtype: list[dict]
CLI Example:
.. code-block:: bash
salt 'minion1' cassandra.list_keyspaces
salt 'minion1' cassandra.list_keyspaces contact_points=minion1 port=9000
'''
query = '''select keyspace_name
from system.schema_keyspaces;'''
ret = {}
try:
ret = cql_query(query, contact_points, port, cql_user, cql_pass)
except CommandExecutionError:
log.critical('Could not list keyspaces.')
raise
except BaseException as e:
log.critical('Unexpected error while listing keyspaces: {0}'.format(str(e)))
raise
return ret
def list_column_families(keyspace=None, contact_points=None, port=None, cql_user=None, cql_pass=None):
'''
List column families in a Cassandra cluster for all keyspaces or just the provided one.
:param keyspace The keyspace to provide the column families for, optional.
:type keyspace: str
:param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
:type contact_points: str | list[str]
:param cql_user: The Cassandra user if authentication is turned on.
:type cql_user: str
:param cql_pass: The Cassandra user password if authentication is turned on.
:type cql_pass: str
:param port: The Cassandra cluster port, defaults to None.
:type port: int
:return: The column families in this Cassandra cluster.
:rtype: list[dict]
CLI Example:
.. code-block:: bash
salt 'minion1' cassandra.list_column_families
salt 'minion1' cassandra.list_column_families contact_points=minion1
salt 'minion1' cassandra.list_column_families keyspace=system
'''
where_clause = "where keyspace_name = '{0}'".format(keyspace) if keyspace else ""
query = '''select columnfamily_name
from system.schema_columnfamilies
{0};'''.format(where_clause)
ret = {}
try:
ret = cql_query(query, contact_points, port, cql_user, cql_pass)
except CommandExecutionError:
log.critical('Could not list column families.')
raise
except BaseException as e:
log.critical('Unexpected error while listing column families: {0}'.format(str(e)))
raise
return ret
def keyspace_exists(keyspace, contact_points=None, port=None, cql_user=None, cql_pass=None):
'''
Check if a keyspace exists in a Cassandra cluster.
:param keyspace The keyspace name to check for.
:type keyspace: str
:param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
:type contact_points: str | list[str]
:param cql_user: The Cassandra user if authentication is turned on.
:type cql_user: str
:param cql_pass: The Cassandra user password if authentication is turned on.
:type cql_pass: str
:param port: The Cassandra cluster port, defaults to None.
:type port: int
:return: The info for the keyspace or False if it does not exist.
:rtype: dict
CLI Example:
.. code-block:: bash
salt 'minion1' cassandra.keyspace_exists keyspace=system
salt 'minion1' cassandra.list_keyspaces keyspace=system contact_points=minion1
'''
# Only project the keyspace_name to make the query efficien.
# Like an echo
query = '''select keyspace_name
from system.schema_keyspaces
where keyspace_name = '{0}';'''.format(keyspace)
try:
ret = cql_query(query, contact_points, port, cql_user, cql_pass)
except CommandExecutionError:
log.critical('Could not determine if keyspace exists.')
raise
except BaseException as e:
log.critical('Unexpected error while determining if keyspace exists: {0}'.format(str(e)))
raise
return True if ret else False
def create_keyspace(keyspace, replication_strategy='SimpleStrategy', replication_factor=1, replication_datacenters=None,
contact_points=None, port=None, cql_user=None, cql_pass=None):
'''
Create a new keyspace in Cassandra.
:param keyspace: The keyspace name
:type keyspace: str
:param replication_strategy: either `SimpleStrategy` or `NetworkTopologyStrategy`
:type replication_strategy: str
:param replication_factor: number of replicas of data on multiple nodes. not used if using NetworkTopologyStrategy
:type replication_factor: int
:param replication_datacenters: string or dict of datacenter names to replication factors, required if using
NetworkTopologyStrategy (will be a dict if coming from state file).
:type replication_datacenters: str | dict[str, int]
:param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
:type contact_points: str | list[str]
:param cql_user: The Cassandra user if authentication is turned on.
:type cql_user: str
:param cql_pass: The Cassandra user password if authentication is turned on.
:type cql_pass: str
:param port: The Cassandra cluster port, defaults to None.
:type port: int
:return: The info for the keyspace or False if it does not exist.
:rtype: dict
.. code-block:: bash
salt 'minion1' cassandra.create_keyspace keyspace=newkeyspace
salt 'minion1' cassandra.create_keyspace keyspace=newkeyspace replication_strategy=NetworkTopologyStrategy \
replication_datacenters='{"datacenter_1": 3, "datacenter_2": 2}'
'''
existing_keyspace = keyspace_exists(keyspace, contact_points, port)
if not existing_keyspace:
# Add the strategy, replication_factor, etc.
replication_map = {
'class': replication_strategy
}
if replication_datacenters:
if isinstance(replication_datacenters, six.string_types):
try:
replication_datacenter_map = json.loads(replication_datacenters)
replication_map.update(**replication_datacenter_map)
except BaseException: # pylint: disable=W0703
log.error("Could not load json replication_datacenters.")
return False
else:
replication_map.update(**replication_datacenters)
else:
replication_map['replication_factor'] = replication_factor
query = '''create keyspace {0}
with replication = {1}
and durable_writes = true;'''.format(keyspace, replication_map)
try:
cql_query(query, contact_points, port, cql_user, cql_pass)
except CommandExecutionError:
log.critical('Could not create keyspace.')
raise
except BaseException as e:
log.critical('Unexpected error while creating keyspace: {0}'.format(str(e)))
raise
def drop_keyspace(keyspace, contact_points=None, port=None, cql_user=None, cql_pass=None):
'''
Drop a keyspace if it exists in a Cassandra cluster.
:param keyspace The keyspace to drop.
:type keyspace: str
:param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
:type contact_points: str | list[str]
:param cql_user: The Cassandra user if authentication is turned on.
:type cql_user: str
:param cql_pass: The Cassandra user password if authentication is turned on.
:type cql_pass: str
:param port: The Cassandra cluster port, defaults to None.
:type port: int
:return: The info for the keyspace or False if it does not exist.
:rtype: dict
CLI Example:
.. code-block:: bash
salt 'minion1' cassandra.drop_keyspace keyspace=test
salt 'minion1' cassandra.drop_keyspace keyspace=test contact_points=minion1
'''
existing_keyspace = keyspace_exists(keyspace, contact_points, port)
if existing_keyspace:
query = '''drop keyspace {0};'''.format(keyspace)
try:
cql_query(query, contact_points, port, cql_user, cql_pass)
except CommandExecutionError:
log.critical('Could not drop keyspace.')
raise
except BaseException as e:
log.critical('Unexpected error while dropping keyspace: {0}'.format(str(e)))
raise
return True
def list_users(contact_points=None, port=None, cql_user=None, cql_pass=None):
'''
List existing users in this Cassandra cluster.
:param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
:type contact_points: str | list[str]
:param port: The Cassandra cluster port, defaults to None.
:type port: int
:param cql_user: The Cassandra user if authentication is turned on.
:type cql_user: str
:param cql_pass: The Cassandra user password if authentication is turned on.
:type cql_pass: str
:return: The list of existing users.
:rtype: dict
.. code-block:: bash
salt 'minion1' cassandra.list_users
salt 'minion1' cassandra.list_users contact_points=minion1
'''
query = "list users;"
ret = {}
try:
ret = cql_query(query, contact_points, port, cql_user, cql_pass)
except CommandExecutionError:
log.critical('Could not list users.')
raise
except BaseException as e:
log.critical('Unexpected error while listing users: {0}'.format(str(e)))
raise
return ret
def create_user(username, password, superuser=False, contact_points=None, port=None, cql_user=None, cql_pass=None):
'''
Create a new cassandra user with credentials and superuser status.
:param username: The name of the new user.
:type username: str
:param password: The password of the new user.
:type password: str
:param superuser: Is the new user going to be a superuser? default: False
:type superuser: bool
:param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
:type contact_points: str | list[str]
:param cql_user: The Cassandra user if authentication is turned on.
:type cql_user: str
:param cql_pass: The Cassandra user password if authentication is turned on.
:type cql_pass: str
:param port: The Cassandra cluster port, defaults to None.
:type port: int
:return:
:rtype:
.. code-block:: bash
salt 'minion1' cassandra.create_user username=joe password=secret
salt 'minion1' cassandra.create_user username=joe password=secret superuser=True
salt 'minion1' cassandra.create_user username=joe password=secret superuser=True contact_points=minion1
'''
superuser_cql = 'superuser' if superuser else 'nosuperuser'
query = '''create user if not exists {0} with password '{1}' {2};'''.format(username, password, superuser_cql)
log.debug("Attempting to create a new user with username={0} superuser={1}".format(username, superuser_cql))
# The create user query doesn't actually return anything if the query succeeds.
# If the query fails, catch the exception, log a messange and raise it again.
try:
cql_query(query, contact_points, port, cql_user, cql_pass)
except CommandExecutionError:
log.critical('Could not create user.')
raise
except BaseException as e:
log.critical('Unexpected error while creating user: {0}'.format(str(e)))
raise
return True
def list_permissions(username=None, resource=None, resource_type='keyspace', permission=None, contact_points=None,
port=None, cql_user=None, cql_pass=None):
'''
List permissions.
:param username: The name of the user to list permissions for.
:type username: str
:param resource: The resource (keyspace or table), if None, permissions for all resources are listed.
:type resource: str
:param resource_type: The resource_type (keyspace or table), defaults to 'keyspace'.
:type resource_type: str
:param permission: A permission name (e.g. select), if None, all permissions are listed.
:type permission: str
:param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
:type contact_points: str | list[str]
:param cql_user: The Cassandra user if authentication is turned on.
:type cql_user: str
:param cql_pass: The Cassandra user password if authentication is turned on.
:type cql_pass: str
:param port: The Cassandra cluster port, defaults to None.
:type port: int
:return: Dictionary of permissions.
:rtype: dict
.. code-block:: bash
salt 'minion1' cassandra.list_permissions
salt 'minion1' cassandra.list_permissions username=joe resource=test_keyspace permission=select
salt 'minion1' cassandra.list_permissions username=joe resource=test_table resource_type=table \
permission=select contact_points=minion1
'''
keyspace_cql = "{0} {1}".format(resource_type, resource) if resource else "all keyspaces"
permission_cql = "{0} permission".format(permission) if permission else "all permissions"
query = "list {0} on {1}".format(permission_cql, keyspace_cql)
if username:
query = "{0} of {1}".format(query, username)
log.debug("Attempting to list permissions with query '{0}'".format(query))
ret = {}
try:
ret = cql_query(query, contact_points, port, cql_user, cql_pass)
except CommandExecutionError:
log.critical('Could not list permissions.')
raise
except BaseException as e:
log.critical('Unexpected error while listing permissions: {0}'.format(str(e)))
raise
return ret
def grant_permission(username, resource=None, resource_type='keyspace', permission=None, contact_points=None, port=None,
cql_user=None, cql_pass=None):
'''
Grant permissions to a user.
:param username: The name of the user to grant permissions to.
:type username: str
:param resource: The resource (keyspace or table), if None, permissions for all resources are granted.
:type resource: str
:param resource_type: The resource_type (keyspace or table), defaults to 'keyspace'.
:type resource_type: str
:param permission: A permission name (e.g. select), if None, all permissions are granted.
:type permission: str
:param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
:type contact_points: str | list[str]
:param cql_user: The Cassandra user if authentication is turned on.
:type cql_user: str
:param cql_pass: The Cassandra user password if authentication is turned on.
:type cql_pass: str
:param port: The Cassandra cluster port, defaults to None.
:type port: int
:return:
:rtype:
.. code-block:: bash
salt 'minion1' cassandra.grant_permission
salt 'minion1' cassandra.grant_permission username=joe resource=test_keyspace permission=select
salt 'minion1' cassandra.grant_permission username=joe resource=test_table resource_type=table \
permission=select contact_points=minion1
'''
permission_cql = "grant {0}".format(permission) if permission else "grant all permissions"
resource_cql = "on {0} {1}".format(resource_type, resource) if resource else "on all keyspaces"
query = "{0} {1} to {2}".format(permission_cql, resource_cql, username)
log.debug("Attempting to grant permissions with query '{0}'".format(query))
try:
cql_query(query, contact_points, port, cql_user, cql_pass)
except CommandExecutionError:
log.critical('Could not grant permissions.')
raise
except BaseException as e:
log.critical('Unexpected error while granting permissions: {0}'.format(str(e)))
raise
return True

View File

@ -29,7 +29,7 @@ For each of those items we process, it depends on the object type:
- A list has the first entry used as the query, the second as the pillar depth.
- A mapping uses the keys "query" and "depth" as the tuple
You can retrieve as many fields as you like, how the get used depends on the
You can retrieve as many fields as you like, how they get used depends on the
exact settings.
Configuring the mysql ext_pillar

View File

@ -250,7 +250,7 @@ def returner(ret):
total_sent_bytes += sent_bytes
def prep_jid(nocache, passed_jid=None): # pylint: disable=unused-argument
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''

View File

@ -0,0 +1,394 @@
# -*- coding: utf-8 -*-
'''
Return data to a cassandra server
.. versionadded:: 2015.2.0
:maintainer: Corin Kochenower<ckochenower@saltstack.com>
:maturity: new as of 2015.2
:depends: salt.modules.cassandra_cql
:depends: DataStax Python Driver for Apache Cassandra
https://github.com/datastax/python-driver
pip install cassandra-driver
:platform: all
:configuration:
To enable this returner, the minion will need the DataStax Python Driver
for Apache Cassandra ( https://github.com/datastax/python-driver )
installed and the following values configured in the minion or master
config. The list of cluster IPs must include at least one cassandra node
IP address. No assumption or default will be used for the cluster IPs.
The cluster IPs will be tried in the order listed. The port, username,
and password values shown below will be the assumed defaults if you do
not provide values.::
cassandra:
cluster:
- 192.168.50.11
- 192.168.50.12
- 192.168.50.13
port: 9042
username: salt
password: salt
Use the following cassandra database schema::
CREATE KEYSPACE IF NOT EXISTS salt
WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};
CREATE USER IF NOT EXISTS salt WITH PASSWORD 'salt' NOSUPERUSER;
GRANT ALL ON KEYSPACE salt TO salt;
USE salt;
CREATE KEYSPACE IF NOT EXISTS salt WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
CREATE TABLE IF NOT EXISTS salt.salt_returns (
jid text PRIMARY KEY,
alter_time timestamp,
full_ret text,
fun text,
id text,
return text,
success boolean
);
CREATE INDEX IF NOT EXISTS fun ON salt.salt_returns (fun);
CREATE INDEX IF NOT EXISTS id ON salt.salt_returns (id);
CREATE TABLE IF NOT EXISTS salt.minions (
id text PRIMARY KEY,
last_fun text
);
CREATE TABLE IF NOT EXISTS salt.jids (
jid text PRIMARY KEY,
load text
);
CREATE TABLE IF NOT EXISTS salt.salt_events (
id timeuuid PRIMARY KEY,
alter_time timestamp,
data text,
master_id text,
tag text
);
CREATE INDEX IF NOT EXISTS tag ON salt.salt_events (tag);
Required python modules: cassandra-driver
To use the cassandra returner, append '--return cassandra' to the salt command. ex:
salt '*' test.ping --return cassandra
'''
from __future__ import absolute_import
# Let's not allow PyLint complain about string substitution
# pylint: disable=W1321,E1321
# Import python libs
import json
import logging
import uuid
import time
# Import salt libs
import salt.returners
import salt.utils.jid
import salt.exceptions
from salt.exceptions import CommandExecutionError
# Import third party libs
try:
# The following imports are not directly required by this module. Rather,
# they are required by the modules/cassandra_cql execution module, on which
# this module depends.
#
# This returner cross-calls the cassandra_cql execution module using the __salt__ dunder.
#
# The modules/cassandra_cql execution module will not load if the DataStax Python Driver
# for Apache Cassandra is not installed.
#
# This module will try to load all of the 3rd party dependencies on which the
# modules/cassandra_cql execution module depends.
#
# Effectively, if the DataStax Python Driver for Apache Cassandra is not
# installed, both the modules/cassandra_cql execution module and this returner module
# will not be loaded by Salt's loader system.
# pylint: disable=unused-import
from cassandra.cluster import Cluster
from cassandra.cluster import NoHostAvailable
from cassandra.connection import ConnectionException, ConnectionShutdown
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory
# pylint: enable=unused-import
HAS_CASSANDRA_DRIVER = True
except ImportError as e:
HAS_CASSANDRA_DRIVER = False
log = logging.getLogger(__name__)
# Define the module's virtual name
#
# The 'cassandra' __virtualname__ is already taken by the
# returners/cassandra_return module, which utilizes nodetool. This module
# cross-calls the modules/cassandra_cql execution module, which uses the
# DataStax Python Driver for Apache Cassandra. Namespacing allows both the
# modules/cassandra_cql and returners/cassandra_cql modules to use the
# virtualname 'cassandra_cql'.
__virtualname__ = 'cassandra_cql'
def __virtual__():
if not HAS_CASSANDRA_DRIVER:
return False
return True
def returner(ret):
'''
Return data to one of potentially many clustered cassandra nodes
'''
query = '''INSERT INTO salt.salt_returns (
jid, alter_time, full_ret, fun, id, return, success
) VALUES (
'{0}', '{1}', '{2}', '{3}', '{4}', '{5}', {6}
);'''.format(
ret['jid'],
int(time.time() * 1000),
json.dumps(ret),
ret['fun'],
ret['id'],
json.dumps(ret['return']),
ret.get('success', False),
)
# cassandra_cql.cql_query may raise a CommandExecutionError
try:
__salt__['cassandra_cql.cql_query'](query)
except CommandExecutionError:
log.critical('Could not insert into salt_returns with Cassandra returner.')
raise
except Exception as e:
log.critical('Unexpected error while inserting into salt_returns: {0}'.format(str(e)))
raise
# Store the last function called by the minion
# The data in salt.minions will be used by get_fun and get_minions
query = '''INSERT INTO salt.minions (
id, last_fun
) VALUES (
'{0}', '{1}'
);'''.format(ret['id'], ret['fun'])
# cassandra_cql.cql_query may raise a CommandExecutionError
try:
__salt__['cassandra_cql.cql_query'](query)
except CommandExecutionError:
log.critical('Could not store minion ID with Cassandra returner.')
raise
except Exception as e:
log.critical('Unexpected error while inserting minion ID into the minions table: {0}'.format(str(e)))
raise
def event_return(events):
'''
Return event to one of potentially many clustered cassandra nodes
Requires that configuration be enabled via 'event_return'
option in master config.
Cassandra does not support an auto-increment feature due to the
highly inefficient nature of creating a monotonically increasing
number accross all nodes in a distributed database. Each event
will be assigned a uuid by the connecting client.
'''
for event in events:
tag = event.get('tag', '')
data = event.get('data', '')
query = '''INSERT INTO salt.salt_events (
id, alter_time, data, master_id, tag
) VALUES (
{0}, {1}, '{2}', '{3}', '{4}'
);'''.format(str(uuid.uuid1()),
int(time.time() * 1000),
json.dumps(data),
__opts__['id'],
tag)
# cassandra_cql.cql_query may raise a CommandExecutionError
try:
__salt__['cassandra_cql.cql_query'](query)
except CommandExecutionError:
log.critical('Could not store events with Cassandra returner.')
raise
except Exception as e:
log.critical('Unexpected error while inserting into salt_events: {0}'.format(str(e)))
raise
def save_load(jid, load):
'''
Save the load to the specified jid id
'''
query = '''INSERT INTO salt.jids (
jid, load
) VALUES (
'{0}', '{1}'
);'''.format(jid, json.dumps(load))
# cassandra_cql.cql_query may raise a CommandExecutionError
try:
__salt__['cassandra_cql.cql_query'](query)
except CommandExecutionError:
log.critical('Could not save load in jids table.')
raise
except Exception as e:
log.critical('Unexpected error while inserting into jids: {0}'.format(str(e)))
raise
# salt-run jobs.list_jobs FAILED
def get_load(jid):
'''
Return the load data that marks a specified jid
'''
query = '''SELECT load FROM salt.jids WHERE jid = '{0}';'''.format(jid)
ret = {}
# cassandra_cql.cql_query may raise a CommandExecutionError
try:
data = __salt__['cassandra_cql.cql_query'](query)
if data:
load = data[0].get('load')
if load:
ret = json.loads(load)
except CommandExecutionError:
log.critical('Could not get load from jids table.')
raise
except Exception as e:
log.critical('Unexpected error while getting load from jids: {0}'.format(str(e)))
raise
return ret
# salt-call ret.get_jid cassandra_cql 20150327234537907315 PASSED
def get_jid(jid):
'''
Return the information returned when the specified job id was executed
'''
query = '''SELECT id, full_ret FROM salt.salt_returns WHERE jid = '{0}';'''.format(jid)
ret = {}
# cassandra_cql.cql_query may raise a CommandExecutionError
try:
data = __salt__['cassandra_cql.cql_query'](query)
if data:
for row in data:
minion = row.get('id')
full_ret = row.get('full_ret')
if minion and full_ret:
ret[minion] = json.loads(full_ret)
except CommandExecutionError:
log.critical('Could not select job specific information.')
raise
except Exception as e:
log.critical('Unexpected error while getting job specific information: {0}'.format(str(e)))
raise
return ret
# salt-call ret.get_fun cassandra_cql test.ping PASSED
def get_fun(fun):
'''
Return a dict of the last function called for all minions
'''
query = '''SELECT id, last_fun FROM salt.minions where last_fun = '{0}';'''.format(fun)
ret = {}
# cassandra_cql.cql_query may raise a CommandExecutionError
try:
data = __salt__['cassandra_cql.cql_query'](query)
if data:
for row in data:
minion = row.get('id')
last_fun = row.get('last_fun')
if minion and last_fun:
ret[minion] = last_fun
except CommandExecutionError:
log.critical('Could not get the list of minions.')
raise
except Exception as e:
log.critical('Unexpected error while getting list of minions: {0}'.format(str(e)))
raise
return ret
# salt-call ret.get_jids cassandra_cql PASSED
def get_jids():
'''
Return a list of all job ids
'''
query = '''SELECT DISTINCT jid FROM salt.jids;'''
ret = []
#import pdb; pdb.set_trace()
# cassandra_cql.cql_query may raise a CommandExecutionError
try:
data = __salt__['cassandra_cql.cql_query'](query)
if data:
for row in data:
jid = row.get('jid')
if jid:
ret.append(jid)
except CommandExecutionError:
log.critical('Could not get a list of all job ids.')
raise
except Exception as e:
log.critical('Unexpected error while getting list of all job ids: {0}'.format(str(e)))
raise
return ret
# salt-call ret.get_minions cassandra_cql PASSED
def get_minions():
'''
Return a list of minions
'''
query = '''SELECT DISTINCT id FROM salt.minions;'''
ret = []
# cassandra_cql.cql_query may raise a CommandExecutionError
try:
data = __salt__['cassandra_cql.cql_query'](query)
if data:
for row in data:
minion = row.get('id')
if minion:
ret.append(minion)
except CommandExecutionError:
log.critical('Could not get the list of minions.')
raise
except Exception as e:
log.critical('Unexpected error while getting list of minions: {0}'.format(str(e)))
raise
return ret
def prep_jid(nocache, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''
return passed_jid if passed_jid is not None else salt.utils.jid.gen_jid()

View File

@ -75,7 +75,7 @@ def returner(ret):
ccf.insert(ret['jid'], columns)
def prep_jid(nocache, passed_jid=None): # pylint: disable=unused-argument
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''

View File

@ -362,7 +362,7 @@ def set_salt_view():
return True
def prep_jid(nocache, passed_jid=None): # pylint: disable=unused-argument
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''

View File

@ -78,7 +78,7 @@ def save_load(jid, load):
'which responded with {1}'.format(signal[0], signal[1]))
def prep_jid(nocache, passed_jid=None):
def prep_jid(nocache=False, passed_jid=None):
'''
Do any work necessary to prepare a JID, including sending a custom ID
'''

View File

@ -149,7 +149,7 @@ def returner(ret):
)
def prep_jid(nocache, passed_jid=None): # pylint: disable=unused-argument
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''

View File

@ -170,7 +170,7 @@ def get_minions():
return ret
def prep_jid(nocache, passed_jid=None): # pylint: disable=unused-argument
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''

View File

@ -104,7 +104,7 @@ def _get_serv(ret):
# an integer weight value.
def prep_jid(nocache, passed_jid=None): # pylint: disable=unused-argument
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''

View File

@ -221,7 +221,7 @@ def get_jids():
return ret
def prep_jid(nocache, passed_jid=None): # pylint: disable=unused-argument
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''

View File

@ -169,7 +169,7 @@ def get_fun(fun):
return ret
def prep_jid(nocache, passed_jid=None): # pylint: disable=unused-argument
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''

View File

@ -332,7 +332,7 @@ def get_minions():
return ret
def prep_jid(nocache, passed_jid=None): # pylint: disable=unused-argument
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''

View File

@ -309,7 +309,7 @@ def get_minions():
return ret
def prep_jid(nocache, passed_jid=None): # pylint: disable=unused-argument
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''

View File

@ -285,7 +285,7 @@ def get_minions():
return ret
def prep_jid(nocache, passed_jid=None): # pylint: disable=unused-argument
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''

View File

@ -171,7 +171,7 @@ def get_minions():
return list(serv.smembers('minions'))
def prep_jid(nocache, passed_jid=None): # pylint: disable=unused-argument
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''

View File

@ -114,7 +114,7 @@ def returner(ret):
)
def prep_jid(nocache, passed_jid=None): # pylint: disable=unused-argument
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''

View File

@ -188,7 +188,7 @@ def returner(ret):
server.quit()
def prep_jid(nocache, passed_jid=None): # pylint: disable=unused-argument
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''

View File

@ -284,7 +284,7 @@ def get_minions():
return ret
def prep_jid(nocache, passed_jid=None): # pylint: disable=unused-argument
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''

View File

@ -44,7 +44,7 @@ def returner(ret):
syslog.syslog(syslog.LOG_INFO, 'salt-minion: {0}'.format(json.dumps(ret)))
def prep_jid(nocache, passed_jid=None): # pylint: disable=unused-argument
def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
'''
Do any work necessary to prepare a JID, including sending a custom id
'''