salt/tests/integration/modules/test_mysql.py

1757 lines
60 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import logging
# Import Salt Testing libs
from tests.support.case import ModuleCase
from tests.support.unit import skipIf
from tests.support.helpers import destructiveTest
2017-04-02 16:09:47 +00:00
from tests.support.mixins import SaltReturnAssertsMixin
# Import salt libs
Use explicit unicode strings + break up salt.utils This PR is part of what will be an ongoing effort to use explicit unicode strings in Salt. Because Python 3 does not suport Python 2's raw unicode string syntax (i.e. `ur'\d+'`), we must use `salt.utils.locales.sdecode()` to ensure that the raw string is unicode. However, because of how `salt/utils/__init__.py` has evolved into the hulking monstrosity it is today, this means importing a large module in places where it is not needed, which could negatively impact performance. For this reason, this PR also breaks out some of the functions from `salt/utils/__init__.py` into new/existing modules under `salt/utils/`. The long term goal will be that the modules within this directory do not depend on importing `salt.utils`. A summary of the changes in this PR is as follows: * Moves the following functions from `salt.utils` to new locations (including a deprecation warning if invoked from `salt.utils`): `to_bytes`, `to_str`, `to_unicode`, `str_to_num`, `is_quoted`, `dequote`, `is_hex`, `is_bin_str`, `rand_string`, `contains_whitespace`, `clean_kwargs`, `invalid_kwargs`, `which`, `which_bin`, `path_join`, `shlex_split`, `rand_str`, `is_windows`, `is_proxy`, `is_linux`, `is_darwin`, `is_sunos`, `is_smartos`, `is_smartos_globalzone`, `is_smartos_zone`, `is_freebsd`, `is_netbsd`, `is_openbsd`, `is_aix` * Moves the functions already deprecated by @rallytime to the bottom of `salt/utils/__init__.py` for better organization, so we can keep the deprecated ones separate from the ones yet to be deprecated as we continue to break up `salt.utils` * Updates `salt/*.py` and all files under `salt/client/` to use explicit unicode string literals. * Gets rid of implicit imports of `salt.utils` (e.g. `from salt.utils import foo` becomes `import salt.utils.foo as foo`). * Renames the `test.rand_str` function to `test.random_hash` to more accurately reflect what it does * Modifies `salt.utils.stringutils.random()` (née `salt.utils.rand_string()`) such that it returns a string matching the passed size. Previously this function would get `size` bytes from `os.urandom()`, base64-encode it, and return the result, which would in most cases not be equal to the passed size.
2017-07-25 01:47:15 +00:00
import salt.utils.path
from salt.modules import mysql as mysqlmod
2014-11-21 19:47:43 +00:00
# Import 3rd-party libs
Use explicit unicode strings + break up salt.utils This PR is part of what will be an ongoing effort to use explicit unicode strings in Salt. Because Python 3 does not suport Python 2's raw unicode string syntax (i.e. `ur'\d+'`), we must use `salt.utils.locales.sdecode()` to ensure that the raw string is unicode. However, because of how `salt/utils/__init__.py` has evolved into the hulking monstrosity it is today, this means importing a large module in places where it is not needed, which could negatively impact performance. For this reason, this PR also breaks out some of the functions from `salt/utils/__init__.py` into new/existing modules under `salt/utils/`. The long term goal will be that the modules within this directory do not depend on importing `salt.utils`. A summary of the changes in this PR is as follows: * Moves the following functions from `salt.utils` to new locations (including a deprecation warning if invoked from `salt.utils`): `to_bytes`, `to_str`, `to_unicode`, `str_to_num`, `is_quoted`, `dequote`, `is_hex`, `is_bin_str`, `rand_string`, `contains_whitespace`, `clean_kwargs`, `invalid_kwargs`, `which`, `which_bin`, `path_join`, `shlex_split`, `rand_str`, `is_windows`, `is_proxy`, `is_linux`, `is_darwin`, `is_sunos`, `is_smartos`, `is_smartos_globalzone`, `is_smartos_zone`, `is_freebsd`, `is_netbsd`, `is_openbsd`, `is_aix` * Moves the functions already deprecated by @rallytime to the bottom of `salt/utils/__init__.py` for better organization, so we can keep the deprecated ones separate from the ones yet to be deprecated as we continue to break up `salt.utils` * Updates `salt/*.py` and all files under `salt/client/` to use explicit unicode string literals. * Gets rid of implicit imports of `salt.utils` (e.g. `from salt.utils import foo` becomes `import salt.utils.foo as foo`). * Renames the `test.rand_str` function to `test.random_hash` to more accurately reflect what it does * Modifies `salt.utils.stringutils.random()` (née `salt.utils.rand_string()`) such that it returns a string matching the passed size. Previously this function would get `size` bytes from `os.urandom()`, base64-encode it, and return the result, which would in most cases not be equal to the passed size.
2017-07-25 01:47:15 +00:00
from salt.ext import six
2014-11-21 19:47:43 +00:00
from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
log = logging.getLogger(__name__)
NO_MYSQL = False
try:
2014-11-21 19:47:43 +00:00
import MySQLdb # pylint: disable=import-error,unused-import
except Exception:
NO_MYSQL = True
Use explicit unicode strings + break up salt.utils This PR is part of what will be an ongoing effort to use explicit unicode strings in Salt. Because Python 3 does not suport Python 2's raw unicode string syntax (i.e. `ur'\d+'`), we must use `salt.utils.locales.sdecode()` to ensure that the raw string is unicode. However, because of how `salt/utils/__init__.py` has evolved into the hulking monstrosity it is today, this means importing a large module in places where it is not needed, which could negatively impact performance. For this reason, this PR also breaks out some of the functions from `salt/utils/__init__.py` into new/existing modules under `salt/utils/`. The long term goal will be that the modules within this directory do not depend on importing `salt.utils`. A summary of the changes in this PR is as follows: * Moves the following functions from `salt.utils` to new locations (including a deprecation warning if invoked from `salt.utils`): `to_bytes`, `to_str`, `to_unicode`, `str_to_num`, `is_quoted`, `dequote`, `is_hex`, `is_bin_str`, `rand_string`, `contains_whitespace`, `clean_kwargs`, `invalid_kwargs`, `which`, `which_bin`, `path_join`, `shlex_split`, `rand_str`, `is_windows`, `is_proxy`, `is_linux`, `is_darwin`, `is_sunos`, `is_smartos`, `is_smartos_globalzone`, `is_smartos_zone`, `is_freebsd`, `is_netbsd`, `is_openbsd`, `is_aix` * Moves the functions already deprecated by @rallytime to the bottom of `salt/utils/__init__.py` for better organization, so we can keep the deprecated ones separate from the ones yet to be deprecated as we continue to break up `salt.utils` * Updates `salt/*.py` and all files under `salt/client/` to use explicit unicode string literals. * Gets rid of implicit imports of `salt.utils` (e.g. `from salt.utils import foo` becomes `import salt.utils.foo as foo`). * Renames the `test.rand_str` function to `test.random_hash` to more accurately reflect what it does * Modifies `salt.utils.stringutils.random()` (née `salt.utils.rand_string()`) such that it returns a string matching the passed size. Previously this function would get `size` bytes from `os.urandom()`, base64-encode it, and return the result, which would in most cases not be equal to the passed size.
2017-07-25 01:47:15 +00:00
if not salt.utils.path.which('mysqladmin'):
NO_MYSQL = True
2013-12-01 01:34:05 +00:00
2013-12-04 13:31:17 +00:00
@skipIf(
NO_MYSQL,
'Please install MySQL bindings and a MySQL Server before running'
'MySQL integration tests.'
)
class MysqlModuleDbTest(ModuleCase, SaltReturnAssertsMixin):
2013-12-04 13:31:17 +00:00
'''
Module testing database creation on a real MySQL Server.
'''
2013-12-01 01:34:05 +00:00
user = 'root'
password = 'poney'
@destructiveTest
def setUp(self):
'''
Test presence of MySQL server, enforce a root password
2013-12-01 01:34:05 +00:00
'''
super(MysqlModuleDbTest, self).setUp()
NO_MYSQL_SERVER = True
# now ensure we know the mysql root password
# one of theses two at least should work
ret1 = self.run_state(
2013-12-01 01:34:05 +00:00
'cmd.run',
name='mysqladmin --host="localhost" -u '
+ self.user
+ ' flush-privileges password "'
+ self.password
+ '"'
)
ret2 = self.run_state(
2013-12-01 01:34:05 +00:00
'cmd.run',
name='mysqladmin --host="localhost" -u '
+ self.user
+ ' --password="'
+ self.password
+ '" flush-privileges password "'
+ self.password
+ '"'
)
key, value = ret2.popitem()
if value['result']:
NO_MYSQL_SERVER = False
else:
self.skipTest('No MySQL Server running, or no root access on it.')
2013-12-11 13:40:42 +00:00
def _db_creation_loop(self,
db_name,
returning_name,
test_conn=False,
**kwargs):
'''
2013-12-04 13:31:17 +00:00
Used in db testCase, create, check exists, check in list and removes.
'''
ret = self.run_function(
'mysql.db_create',
name=db_name,
**kwargs
)
2013-12-04 13:31:17 +00:00
self.assertEqual(
True,
ret,
'Problem while creating db for db name: \'{0}\''.format(db_name)
2013-12-04 13:31:17 +00:00
)
# test db exists
ret = self.run_function(
'mysql.db_exists',
name=db_name,
**kwargs
)
2013-12-04 13:31:17 +00:00
self.assertEqual(
True,
ret,
'Problem while testing db exists for db name: \'{0}\''.format(db_name)
2013-12-04 13:31:17 +00:00
)
# List db names to ensure db is created with the right utf8 string
ret = self.run_function(
'mysql.db_list',
**kwargs
)
if not isinstance(ret, list):
raise AssertionError(
2013-12-04 13:31:17 +00:00
('Unexpected query result while retrieving databases list'
' \'{0}\' for \'{1}\' test').format(
ret,
db_name
)
)
2013-12-04 13:31:17 +00:00
self.assertIn(
returning_name,
ret,
('Problem while testing presence of db name in db lists'
' for db name: \'{0}\' in list \'{1}\'').format(
db_name,
ret
2013-12-04 13:31:17 +00:00
))
if test_conn:
# test connections on database with root user
ret = self.run_function(
'mysql.query',
database=db_name,
query='SELECT 1',
**kwargs
)
if not isinstance(ret, dict) or 'results' not in ret:
raise AssertionError(
('Unexpected result while testing connection'
' on database : {0}').format(
repr(db_name)
)
)
self.assertEqual([['1']], ret['results'])
# Now remove database
ret = self.run_function(
'mysql.db_remove',
name=db_name,
**kwargs
)
2013-12-04 13:31:17 +00:00
self.assertEqual(
True,
ret,
'Problem while removing db for db name: \'{0}\''.format(db_name)
2013-12-04 13:31:17 +00:00
)
@destructiveTest
def test_database_creation_level1(self):
'''
Create database, test presence, then drop db. All theses with complex names.
'''
# name with space
2013-12-02 13:06:35 +00:00
db_name = 'foo 1'
self._db_creation_loop(db_name=db_name,
returning_name=db_name,
test_conn=True,
connection_user=self.user,
connection_pass=self.password
)
# ```````
# create
# also with character_set and collate only
ret = self.run_function(
'mysql.db_create',
name='foo`2',
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password
)
2013-12-02 10:33:13 +00:00
self.assertEqual(True, ret)
# test db exists
ret = self.run_function(
'mysql.db_exists',
name='foo`2',
connection_user=self.user,
connection_pass=self.password
)
2013-12-02 10:33:13 +00:00
self.assertEqual(True, ret)
# redoing the same should fail
# even with other character sets or collations
ret = self.run_function(
'mysql.db_create',
name='foo`2',
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password
)
2013-12-02 10:33:13 +00:00
self.assertEqual(False, ret)
# redoing the same should fail
ret = self.run_function(
'mysql.db_create',
name='foo`2',
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password
)
2013-12-02 10:33:13 +00:00
self.assertEqual(False, ret)
# Now remove database
ret = self.run_function(
'mysql.db_remove',
name='foo`2',
connection_user=self.user,
connection_pass=self.password
)
2013-12-02 10:33:13 +00:00
self.assertEqual(True, ret)
# '''''''
# create
# also with character_set only
2013-12-02 13:06:35 +00:00
db_name = "foo'3"
self._db_creation_loop(db_name=db_name,
returning_name=db_name,
test_conn=True,
character_set='utf8',
connection_user=self.user,
connection_pass=self.password
)
# """"""""
# also with collate only
2013-12-02 13:06:35 +00:00
db_name = 'foo"4'
self._db_creation_loop(db_name=db_name,
returning_name=db_name,
test_conn=True,
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password
)
# fuzzy
2013-12-02 13:06:35 +00:00
db_name = '<foo` --"5>'
self._db_creation_loop(db_name=db_name,
returning_name=db_name,
test_conn=True,
connection_user=self.user,
connection_pass=self.password
)
@destructiveTest
def test_mysql_dbname_character_percent(self):
'''
Play with the '%' character problems
This character should be escaped in the form '%%' on queries, but only
when theses queries have arguments. It is also a special character
in LIKE SQL queries. Finally it is used to indicate query arguments.
'''
db_name1 = "foo%1_"
db_name2 = "foo%12"
ret = self.run_function(
'mysql.db_create',
name=db_name1,
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password
)
self.assertEqual(True, ret)
ret = self.run_function(
'mysql.db_create',
name=db_name2,
connection_user=self.user,
connection_pass=self.password
)
self.assertEqual(True, ret)
ret = self.run_function(
'mysql.db_remove',
name=db_name1,
connection_user=self.user,
connection_pass=self.password
)
self.assertEqual(True, ret)
ret = self.run_function(
'mysql.db_exists',
name=db_name1,
connection_user=self.user,
connection_pass=self.password
)
self.assertEqual(False, ret)
ret = self.run_function(
'mysql.db_exists',
name=db_name2,
connection_user=self.user,
connection_pass=self.password
)
self.assertEqual(True, ret)
ret = self.run_function(
'mysql.db_remove',
name=db_name2,
connection_user=self.user,
connection_pass=self.password
)
self.assertEqual(True, ret)
@destructiveTest
def test_database_creation_utf8(self):
'''
Test support of utf8 in database names
'''
# Simple accents : using utf8 string
2013-12-02 13:06:35 +00:00
db_name_unicode = u'notam\xe9rican'
# same as 'notamérican' because of file encoding
# but ensure it on this test
2013-12-02 13:06:35 +00:00
db_name_utf8 = 'notam\xc3\xa9rican'
# FIXME: MySQLdb problems on conn strings containing
# utf-8 on user name of db name prevent conn test
self._db_creation_loop(db_name=db_name_utf8,
returning_name=db_name_utf8,
test_conn=False,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
# test unicode entry will also return utf8 name
self._db_creation_loop(db_name=db_name_unicode,
returning_name=db_name_utf8,
test_conn=False,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
# Using more complex unicode characters:
2013-12-02 13:06:35 +00:00
db_name_unicode = u'\u6a19\u6e96\u8a9e'
# same as '標準語' because of file encoding
# but ensure it on this test
2013-12-02 13:06:35 +00:00
db_name_utf8 = '\xe6\xa8\x99\xe6\xba\x96\xe8\xaa\x9e'
self._db_creation_loop(db_name=db_name_utf8,
returning_name=db_name_utf8,
test_conn=False,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
# test unicode entry will also return utf8 name
self._db_creation_loop(db_name=db_name_unicode,
returning_name=db_name_utf8,
test_conn=False,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
@destructiveTest
def test_database_maintenance(self):
'''
Test maintenance operations on a created database
'''
dbname = u"foo%'-- `\"'"
# create database
# but first silently try to remove it
# in case of previous tests failures
ret = self.run_function(
'mysql.db_remove',
name=dbname,
connection_user=self.user,
connection_pass=self.password
)
ret = self.run_function(
'mysql.db_create',
name=dbname,
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password
)
2013-12-02 10:33:13 +00:00
self.assertEqual(True, ret)
# test db exists
ret = self.run_function(
'mysql.db_exists',
name=dbname,
connection_user=self.user,
connection_pass=self.password
)
2013-12-02 10:33:13 +00:00
self.assertEqual(True, ret)
# Create 3 tables
tablenames = {'A%table "`1': 'MYISAM',
'B%table \'`2': 'InnoDB',
2013-12-04 13:31:17 +00:00
'Ctable --`3': 'MEMORY'
}
2014-11-21 19:47:43 +00:00
for tablename, engine in sorted(six.iteritems(tablenames)):
# prepare queries
create_query = ('CREATE TABLE {tblname} ('
' id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,'
' data VARCHAR(100)) ENGINE={engine};'.format(
tblname=mysqlmod.quote_identifier(tablename),
engine=engine,
))
insert_query = ('INSERT INTO {tblname} (data)'
' VALUES '.format(
2013-12-04 13:31:17 +00:00
tblname=mysqlmod.quote_identifier(tablename)
))
delete_query = ('DELETE from {tblname}'
' order by rand() limit 50;'.format(
2013-12-04 13:31:17 +00:00
tblname=mysqlmod.quote_identifier(tablename)
))
for x in range(100):
insert_query += "('foo"+six.text_type(x)+"'),"
insert_query += "('bar');"
# populate database
log.info('Adding table \'%s\'', tablename)
ret = self.run_function(
'mysql.query',
database=dbname,
query=create_query,
connection_user=self.user,
connection_pass=self.password
)
if not isinstance(ret, dict) or 'rows affected' not in ret:
raise AssertionError(
2013-12-04 13:31:17 +00:00
('Unexpected query result while populating test table'
' \'{0}\' : \'{1}\'').format(
tablename,
ret,
)
)
2013-12-02 10:33:13 +00:00
self.assertEqual(ret['rows affected'], 0)
log.info('Populating table \'%s\'', tablename)
ret = self.run_function(
'mysql.query',
database=dbname,
query=insert_query,
connection_user=self.user,
connection_pass=self.password
)
if not isinstance(ret, dict) or 'rows affected' not in ret:
raise AssertionError(
2013-12-04 13:31:17 +00:00
('Unexpected query result while populating test table'
' \'{0}\' : \'{1}\'').format(
tablename,
ret,
)
)
2013-12-02 10:33:13 +00:00
self.assertEqual(ret['rows affected'], 101)
log.info('Removing some rows on table\'%s\'', tablename)
ret = self.run_function(
'mysql.query',
database=dbname,
query=delete_query,
connection_user=self.user,
connection_pass=self.password
)
if not isinstance(ret, dict) or 'rows affected' not in ret:
raise AssertionError(
('Unexpected query result while removing rows on test table'
' \'{0}\' : \'{1}\'').format(
tablename,
ret,
)
)
2013-12-02 10:33:13 +00:00
self.assertEqual(ret['rows affected'], 50)
# test check/repair/opimize on 1 table
tablename = 'A%table "`1'
ret = self.run_function(
'mysql.db_check',
name=dbname,
table=tablename,
connection_user=self.user,
connection_pass=self.password
)
# Note that returned result does not quote_identifier of table and db
2013-12-12 22:13:20 +00:00
self.assertEqual(ret, [{'Table': dbname+'.'+tablename,
2013-12-04 13:31:17 +00:00
'Msg_text': 'OK',
'Msg_type': 'status',
'Op': 'check'}]
)
ret = self.run_function(
'mysql.db_repair',
name=dbname,
table=tablename,
connection_user=self.user,
connection_pass=self.password
)
# Note that returned result does not quote_identifier of table and db
2013-12-04 13:31:17 +00:00
self.assertEqual(ret, [{'Table': dbname+'.'+tablename,
'Msg_text': 'OK',
'Msg_type': 'status',
'Op': 'repair'}]
)
ret = self.run_function(
'mysql.db_optimize',
name=dbname,
table=tablename,
connection_user=self.user,
connection_pass=self.password
)
# Note that returned result does not quote_identifier of table and db
2013-12-04 13:31:17 +00:00
self.assertEqual(ret, [{'Table': dbname+'.'+tablename,
'Msg_text': 'OK',
'Msg_type': 'status',
'Op': 'optimize'}]
)
# test check/repair/opimize on all tables
ret = self.run_function(
'mysql.db_check',
name=dbname,
connection_user=self.user,
connection_pass=self.password
)
2013-12-02 10:33:13 +00:00
expected = []
2014-11-21 19:47:43 +00:00
for tablename, engine in sorted(six.iteritems(tablenames)):
if engine is 'MEMORY':
expected.append([{
'Table': dbname+'.'+tablename,
2013-12-04 13:31:17 +00:00
'Msg_text': ("The storage engine for the table doesn't"
" support check"),
'Msg_type': 'note',
'Op': 'check'
}])
else:
expected.append([{
'Table': dbname+'.'+tablename,
'Msg_text': 'OK',
'Msg_type': 'status',
'Op': 'check'
}])
2013-12-02 10:33:13 +00:00
self.assertEqual(ret, expected)
ret = self.run_function(
'mysql.db_repair',
name=dbname,
connection_user=self.user,
connection_pass=self.password
)
2013-12-02 10:33:13 +00:00
expected = []
2014-11-21 19:47:43 +00:00
for tablename, engine in sorted(six.iteritems(tablenames)):
if engine is 'MYISAM':
expected.append([{
'Table': dbname+'.'+tablename,
'Msg_text': 'OK',
'Msg_type': 'status',
'Op': 'repair'
}])
else:
expected.append([{
'Table': dbname+'.'+tablename,
2013-12-04 13:31:17 +00:00
'Msg_text': ("The storage engine for the table doesn't"
" support repair"),
'Msg_type': 'note',
'Op': 'repair'
}])
2013-12-02 10:33:13 +00:00
self.assertEqual(ret, expected)
ret = self.run_function(
'mysql.db_optimize',
name=dbname,
connection_user=self.user,
connection_pass=self.password
)
2013-12-02 10:33:13 +00:00
expected = []
2014-11-21 19:47:43 +00:00
for tablename, engine in sorted(six.iteritems(tablenames)):
if engine is 'MYISAM':
expected.append([{
'Table': dbname+'.'+tablename,
'Msg_text': 'OK',
'Msg_type': 'status',
'Op': 'optimize'
}])
elif engine is 'InnoDB':
expected.append([{
'Table': dbname+'.'+tablename,
'Msg_text': ("Table does not support optimize, "
"doing recreate + analyze instead"),
'Msg_type': 'note',
'Op': 'optimize'
},
{
'Table': dbname+'.'+tablename,
'Msg_text': 'OK',
'Msg_type': 'status',
'Op': 'optimize'
}])
elif engine is 'MEMORY':
expected.append([{
'Table': dbname+'.'+tablename,
2013-12-04 13:31:17 +00:00
'Msg_text': ("The storage engine for the table doesn't"
" support optimize"),
'Msg_type': 'note',
'Op': 'optimize'
}])
2013-12-02 10:33:13 +00:00
self.assertEqual(ret, expected)
# Teardown, remove database
ret = self.run_function(
'mysql.db_remove',
name=dbname,
connection_user=self.user,
connection_pass=self.password
)
2013-12-02 10:33:13 +00:00
self.assertEqual(True, ret)
2013-12-04 13:31:17 +00:00
@skipIf(
NO_MYSQL,
'Please install MySQL bindings and a MySQL Server before running'
'MySQL integration tests.'
)
class MysqlModuleUserTest(ModuleCase, SaltReturnAssertsMixin):
2013-12-04 13:31:17 +00:00
'''
User Creation and connection tests
'''
user = 'root'
password = 'poney'
@destructiveTest
def setUp(self):
'''
Test presence of MySQL server, enforce a root password
'''
super(MysqlModuleUserTest, self).setUp()
NO_MYSQL_SERVER = True
# now ensure we know the mysql root password
# one of theses two at least should work
ret1 = self.run_state(
'cmd.run',
name='mysqladmin --host="localhost" -u '
+ self.user
+ ' flush-privileges password "'
+ self.password
+ '"'
)
ret2 = self.run_state(
'cmd.run',
name='mysqladmin --host="localhost" -u '
+ self.user
+ ' --password="'
+ self.password
+ '" flush-privileges password "'
+ self.password
+ '"'
)
key, value = ret2.popitem()
if value['result']:
NO_MYSQL_SERVER = False
else:
self.skipTest('No MySQL Server running, or no root access on it.')
2013-12-04 13:47:26 +00:00
def _userCreationLoop(self,
2013-12-04 13:31:17 +00:00
uname,
host,
password=None,
new_password=None,
new_password_hash=None,
**kwargs):
'''
Perform some tests around creation of the given user
'''
# First silently remove it, in case of
ret = self.run_function(
'mysql.user_remove',
user=uname,
host=host,
**kwargs
)
# creation
ret = self.run_function(
'mysql.user_create',
user=uname,
host=host,
password=password,
**kwargs
)
self.assertEqual(True, ret, ('Calling user_create on'
' user \'{0}\' did not return True: {1}').format(
uname,
repr(ret)
))
# double creation failure
ret = self.run_function(
'mysql.user_create',
user=uname,
host=host,
password=password,
**kwargs
)
self.assertEqual(False, ret, ('Calling user_create a second time on'
' user \'{0}\' did not return False: {1}').format(
uname,
repr(ret)
))
# Alter password
if new_password is not None or new_password_hash is not None:
ret = self.run_function(
'mysql.user_chpass',
user=uname,
host=host,
password=new_password,
password_hash=new_password_hash,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
self.assertEqual(True, ret, ('Calling user_chpass on'
' user \'{0}\' did not return True: {1}').format(
uname,
repr(ret)
))
def _chck_userinfo(self, user, host, check_user, check_hash):
'''
Internal routine to check user_info returned results
'''
ret = self.run_function(
'mysql.user_info',
user=user,
host=host,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
if not isinstance(ret, dict):
raise AssertionError(
'Unexpected result while retrieving user_info for '
'\'{0}\''.format(user)
)
2013-12-04 13:31:17 +00:00
self.assertEqual(ret['Host'], host)
self.assertEqual(ret['Password'], check_hash)
self.assertEqual(ret['User'], check_user)
def _chk_remove_user(self, user, host, **kwargs):
'''
Internal routine to check user_remove
'''
ret = self.run_function(
'mysql.user_remove',
user=user,
host=host,
**kwargs
)
2013-12-04 13:31:17 +00:00
self.assertEqual(True, ret, ('Assertion failed while removing user'
' \'{0}\' on host \'{1}\': {2}').format(
user,
host,
repr(ret)
))
@destructiveTest
def test_user_management(self):
'''
Test various users creation settings
'''
# Create users with rights on this database
# and rights on other databases
user1 = "user '1"
user1_pwd = 'pwd`\'"1b'
user1_pwd_hash = '*4DF33B3B12E43384677050A818327877FAB2F4BA'
# this is : user "2'標
user2 = 'user "2\'\xe6\xa8\x99'
user2_pwd = 'user "2\'\xe6\xa8\x99b'
user2_pwd_hash = '*3A38A7B94B024B983687BB9B44FB60B7AA38FE61'
user3 = 'user "3;,?:@=&/'
user3_pwd = 'user "3;,?:@=&/'
user3_pwd_hash = '*AA3B1D4105A45D381C23A5C221C47EA349E1FD7D'
# this is : user ":=;4標 in unicode instead of utf-8
2013-12-12 22:13:20 +00:00
# if unicode char is counted as 1 char we hit the max user
# size (16)
user4 = u'user":;,?:@=&/4\u6a19'
user4_utf8 = 'user":;,?:@=&/4\xe6\xa8\x99'
user4_pwd = 'user "4;,?:@=&/'
user4_pwd_hash = '*FC8EF8DBF27628E4E113359F8E7478D5CF3DD57C'
user5 = u'user ``"5'
user5_utf8 = 'user ``"5'
# this is 標標標\
user5_pwd = '\xe6\xa8\x99\xe6\xa8\x99\\'
# this is password('標標\\')
user5_pwd_hash = '*3752E65CDD8751AF8D889C62CFFC6C998B12C376'
user6 = u'user %--"6'
user6_utf8 = 'user %--"6'
# this is : --'"% SIX標b
user6_pwd_u = u' --\'"% SIX\u6a19b'
user6_pwd_utf8 = ' --\'"% SIX\xe6\xa8\x99b'
# this is password(' --\'"% SIX標b')
user6_pwd_hash = '*90AE800593E2D407CD9E28CCAFBE42D17EEA5369'
2013-12-04 13:47:26 +00:00
self._userCreationLoop(
uname=user1,
host='localhost',
password='pwd`\'"1',
new_password='pwd`\'"1b',
connection_user=self.user,
connection_pass=self.password
)
# Now check for results
ret = self.run_function(
'mysql.user_exists',
user=user1,
host='localhost',
password=user1_pwd,
password_hash=None,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
self.assertEqual(True, ret, ('Testing final user \'{0}\' on host \'{1}\''
' existence failed').format(user1, 'localhost')
)
2013-12-04 13:47:26 +00:00
self._userCreationLoop(
uname=user2,
host='localhost',
password=None,
# this is his name hash : user "2'標
password_hash='*EEF6F854748ACF841226BB1C2422BEC70AE7F1FF',
# and this is the same with a 'b' added
new_password_hash=user2_pwd_hash,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
# user2 can connect from other places with other password
2013-12-04 13:47:26 +00:00
self._userCreationLoop(
uname=user2,
host='10.0.0.1',
allow_passwordless=True,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
2013-12-04 13:47:26 +00:00
self._userCreationLoop(
uname=user2,
host='10.0.0.2',
allow_passwordless=True,
unix_socket=True,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
# Now check for results
ret = self.run_function(
'mysql.user_exists',
user=user2,
host='localhost',
password=None,
password_hash=user2_pwd_hash,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
self.assertEqual(True, ret, ('Testing final user \'{0}\' on host \'{1}\''
' failed').format(user2, 'localhost')
)
ret = self.run_function(
'mysql.user_exists',
user=user2,
host='10.0.0.1',
allow_passwordless=True,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
self.assertEqual(True, ret, ('Testing final user \'{0}\' on host \'{1}\''
' without password failed').format(user2, '10.0.0.1')
)
ret = self.run_function(
'mysql.user_exists',
user=user2,
host='10.0.0.2',
allow_passwordless=True,
unix_socket=True,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
self.assertEqual(True, ret, ('Testing final user \'{0}\' on host \'{1}\''
' without password failed').format(user2, '10.0.0.2')
)
# Empty password is not passwordless (or is it a bug?)
2013-12-04 13:47:26 +00:00
self._userCreationLoop(
uname=user3,
host='localhost',
password='',
connection_user=self.user,
connection_pass=self.password
)
# user 3 on another host with a password
2013-12-04 13:47:26 +00:00
self._userCreationLoop(
uname=user3,
host='%',
password='foo',
new_password=user3_pwd,
connection_user=self.user,
connection_pass=self.password
)
# Now check for results
ret = self.run_function(
'mysql.user_exists',
user=user3,
host='localhost',
password='',
connection_user=self.user,
connection_pass=self.password
)
self.assertEqual(True, ret, ('Testing final user \'{0}\' on host \'{1}\''
' without empty password failed').format(user3, 'localhost')
)
ret = self.run_function(
'mysql.user_exists',
user=user3,
host='%',
password=user3_pwd,
connection_user=self.user,
connection_pass=self.password
)
self.assertEqual(True, ret, ('Testing final user \'{0}\' on host \'{1}\''
' with password failed').format(user3, '%')
)
# check unicode name, and password > password_hash
2013-12-04 13:47:26 +00:00
self._userCreationLoop(
uname=user4,
host='%',
password=user4_pwd,
# this is password('foo')
password_hash='*F3A2A51A9B0F2BE2468926B4132313728C250DBF',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
# Now check for results
ret = self.run_function(
'mysql.user_exists',
user=user4_utf8,
host='%',
password=user4_pwd,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
self.assertEqual(True, ret, ('Testing final user \'{0}\' on host \'{1}\''
' with password take from password and not password_hash'
' failed').format(user4_utf8, '%')
)
2013-12-04 13:47:26 +00:00
self._userCreationLoop(
uname=user5,
host='localhost',
password='\xe6\xa8\x99\xe6\xa8\x99',
new_password=user5_pwd,
unix_socket=True,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
ret = self.run_function(
'mysql.user_exists',
user=user5_utf8,
host='localhost',
password=user5_pwd,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
self.assertEqual(True, ret, ('Testing final user \'{0}\' on host \'{1}\''
' with utf8 password failed').format(user5_utf8, 'localhost')
)
# for this one we give password in unicode and check it in utf-8
2013-12-04 13:47:26 +00:00
self._userCreationLoop(
uname=user6,
host='10.0.0.1',
password=' foobar',
new_password=user6_pwd_u,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
# Now check for results
ret = self.run_function(
'mysql.user_exists',
user=user6_utf8,
host='10.0.0.1',
password=user6_pwd_utf8,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
self.assertEqual(True, ret, ('Testing final user \'{0}\' on host \'{1}\''
2013-12-12 22:13:20 +00:00
' with unicode password failed').format(user6_utf8, '10.0.0.1')
)
# Final result should be:
# mysql> select Host, User, Password from user where user like 'user%';
2013-12-04 13:31:17 +00:00
# +--------------------+-----------+-------------------------------+
# | User | Host | Password |
# +--------------------+-----------+-------------------------------+
# | user "2'標 | 10.0.0.1 | |
# | user "2'標 | 10.0.0.2 | |
# | user "2'標 | localhost | *3A38A7B94B0(...)60B7AA38FE61 |
# | user "3;,?:@=&/ | % | *AA3B1D4105(...)47EA349E1FD7D |
# | user "3;,?:@=&/ | localhost | |
# | user %--"6 | 10.0.0.1 | *90AE800593(...)E42D17EEA5369 |
# | user '1 | localhost | *4DF33B3B1(...)327877FAB2F4BA |
# | user ``"5 | localhost | *3752E65CD(...)FC6C998B12C376 |
# | user":;,?:@=&/4標 | % | *FC8EF8DBF(...)7478D5CF3DD57C |
# +--------------------+-----------+-------------------------------+
self._chck_userinfo(user=user2,
host='10.0.0.1',
check_user=user2,
check_hash=''
)
self._chck_userinfo(user=user2,
host='10.0.0.2',
check_user=user2,
check_hash=''
)
self._chck_userinfo(user=user2,
host='localhost',
check_user=user2,
check_hash=user2_pwd_hash
)
self._chck_userinfo(user=user3,
host='%',
check_user=user3,
check_hash=user3_pwd_hash
)
self._chck_userinfo(user=user3,
host='localhost',
check_user=user3,
check_hash=''
)
self._chck_userinfo(user=user4,
host='%',
check_user=user4_utf8,
check_hash=user4_pwd_hash
)
self._chck_userinfo(user=user6,
host='10.0.0.1',
check_user=user6_utf8,
check_hash=user6_pwd_hash
)
self._chck_userinfo(user=user1,
host='localhost',
check_user=user1,
check_hash=user1_pwd_hash
)
self._chck_userinfo(user=user5,
host='localhost',
check_user=user5_utf8,
check_hash=user5_pwd_hash
)
# check user_list function
ret = self.run_function(
'mysql.user_list',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
self.assertIn({'Host': 'localhost', 'User': user1}, ret)
self.assertIn({'Host': 'localhost', 'User': user2}, ret)
self.assertIn({'Host': '10.0.0.1', 'User': user2}, ret)
self.assertIn({'Host': '10.0.0.2', 'User': user2}, ret)
self.assertIn({'Host': '%', 'User': user3}, ret)
self.assertIn({'Host': 'localhost', 'User': user3}, ret)
self.assertIn({'Host': '%', 'User': user4_utf8}, ret)
self.assertIn({'Host': 'localhost', 'User': user5_utf8}, ret)
self.assertIn({'Host': '10.0.0.1', 'User': user6_utf8}, ret)
2013-12-13 03:51:28 +00:00
# And finally, test connections on MySQL with theses users
ret = self.run_function(
'mysql.query',
database='information_schema',
query='SELECT 1',
connection_user=user1,
connection_pass='pwd`\'"1b',
connection_host='localhost'
)
if not isinstance(ret, dict) or 'results' not in ret:
raise AssertionError(
('Unexpected result while testing connection'
' with user \'{0}\': {1}').format(
user1,
repr(ret)
)
)
self.assertEqual([['1']], ret['results'])
2013-12-12 22:13:20 +00:00
# FIXME: still failing, but works by hand...
# mysql --user="user \"2'標" --password="user \"2'標b" information_schema
# Seems to be a python-mysql library problem with user names containing
# utf8 characters
# @see https://github.com/farcepest/MySQLdb1/issues/40
#import urllib
#ret = self.run_function(
# 'mysql.query',
# database='information_schema',
# query='SELECT 1',
# connection_user=urllib.quote_plus(user2),
# connection_pass=urllib.quote_plus(user2_pwd),
# connection_host='localhost',
# connection_charset='utf8',
# saltenv={"LC_ALL": "en_US.utf8"}
#)
#if not isinstance(ret, dict) or 'results' not in ret:
# raise AssertionError(
# ('Unexpected result while testing connection'
# ' with user \'{0}\': {1}').format(
# user2,
# repr(ret)
# )
# )
#self.assertEqual([['1']], ret['results'])
ret = self.run_function(
'mysql.query',
database='information_schema',
query='SELECT 1',
connection_user=user3,
connection_pass='',
connection_host='localhost',
)
if not isinstance(ret, dict) or 'results' not in ret:
raise AssertionError(
('Unexpected result while testing connection'
' with user \'{0}\': {1}').format(
user3,
repr(ret)
)
)
self.assertEqual([['1']], ret['results'])
# FIXME: Failing
#ret = self.run_function(
# 'mysql.query',
# database='information_schema',
# query='SELECT 1',
# connection_user=user4_utf8,
# connection_pass=user4_pwd,
# connection_host='localhost',
# connection_charset='utf8',
# saltenv={"LC_ALL": "en_US.utf8"}
#)
#if not isinstance(ret, dict) or 'results' not in ret:
# raise AssertionError(
# ('Unexpected result while testing connection'
# ' with user \'{0}\': {1}').format(
# user4_utf8,
# repr(ret)
# )
# )
#self.assertEqual([['1']], ret['results'])
ret = self.run_function(
'mysql.query',
database='information_schema',
query='SELECT 1',
connection_user=user5_utf8,
connection_pass=user5_pwd,
connection_host='localhost',
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
if not isinstance(ret, dict) or 'results' not in ret:
raise AssertionError(
('Unexpected result while testing connection'
' with user \'{0}\': {1}').format(
user5_utf8,
repr(ret)
)
)
self.assertEqual([['1']], ret['results'])
# Teardown by deleting with user_remove
self._chk_remove_user(user=user2,
host='10.0.0.1',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
self._chk_remove_user(user=user2,
host='10.0.0.2',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
self._chk_remove_user(user=user2,
host='localhost',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
self._chk_remove_user(user=user3,
host='%',
connection_user=self.user,
connection_pass=self.password,
)
self._chk_remove_user(user=user3,
host='localhost',
connection_user=self.user,
connection_pass=self.password,
)
self._chk_remove_user(user=user4,
host='%',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
self._chk_remove_user(user=user6,
host='10.0.0.1',
connection_user=self.user,
connection_pass=self.password,
)
self._chk_remove_user(user=user1,
host='localhost',
connection_user=self.user,
connection_pass=self.password,
)
self._chk_remove_user(user=user5,
host='localhost',
connection_user=self.user,
connection_pass=self.password,
)
# Final verification of the cleanup
ret = self.run_function(
'mysql.user_list',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
self.assertNotIn({'Host': 'localhost', 'User': user1}, ret)
self.assertNotIn({'Host': 'localhost', 'User': user2}, ret)
self.assertNotIn({'Host': '10.0.0.1', 'User': user2}, ret)
self.assertNotIn({'Host': '10.0.0.2', 'User': user2}, ret)
self.assertNotIn({'Host': '%', 'User': user3}, ret)
self.assertNotIn({'Host': 'localhost', 'User': user3}, ret)
self.assertNotIn({'Host': '%', 'User': user4_utf8}, ret)
self.assertNotIn({'Host': 'localhost', 'User': user5_utf8}, ret)
self.assertNotIn({'Host': '10.0.0.1', 'User': user6_utf8}, ret)
@skipIf(
NO_MYSQL,
'Please install MySQL bindings and a MySQL Server before running'
'MySQL integration tests.'
)
class MysqlModuleUserGrantTest(ModuleCase, SaltReturnAssertsMixin):
'''
User Creation and connection tests
'''
user = 'root'
password = 'poney'
2013-12-09 19:48:36 +00:00
# yep, theses are valid MySQL db names
# very special chars are _ % and .
testdb1 = 'tes.t\'"saltdb'
testdb2 = 't_st `(:=salt%b)'
testdb3 = 'test `(:=salteeb)'
test_file_query_db = 'test_query'
2013-12-09 19:48:36 +00:00
table1 = 'foo'
table2 = "foo `\'%_bar"
users = {
'user1': {
'name': 'foo',
'pwd': 'bar',
},
'user2': {
2013-12-09 19:48:36 +00:00
'name': 'user ";--,?:&/\\',
2013-12-09 15:27:26 +00:00
'pwd': '";--(),?:@=&/\\',
},
2013-12-09 15:27:26 +00:00
# this is : passwd 標標
'user3': {
2013-12-09 19:48:36 +00:00
'name': 'user( @ )=foobar',
2013-12-09 15:27:26 +00:00
'pwd': '\xe6\xa8\x99\xe6\xa8\x99',
},
# this is : user/password containing 標標
'user4': {
'name': 'user \xe6\xa8\x99',
'pwd': '\xe6\xa8\x99\xe6\xa8\x99',
},
}
@destructiveTest
def setUp(self):
'''
Test presence of MySQL server, enforce a root password, create users
'''
2013-12-09 19:48:36 +00:00
super(MysqlModuleUserGrantTest, self).setUp()
NO_MYSQL_SERVER = True
# now ensure we know the mysql root password
# one of theses two at least should work
ret1 = self.run_state(
'cmd.run',
name='mysqladmin --host="localhost" -u '
+ self.user
+ ' flush-privileges password "'
+ self.password
+ '"'
)
ret2 = self.run_state(
'cmd.run',
name='mysqladmin --host="localhost" -u '
+ self.user
+ ' --password="'
+ self.password
+ '" flush-privileges password "'
+ self.password
+ '"'
)
key, value = ret2.popitem()
if value['result']:
NO_MYSQL_SERVER = False
else:
self.skipTest('No MySQL Server running, or no root access on it.')
2013-12-09 15:27:26 +00:00
# Create some users and a test db
2014-11-21 19:47:43 +00:00
for user, userdef in six.iteritems(self.users):
2013-12-09 15:27:26 +00:00
self._userCreation(uname=userdef['name'], password=userdef['pwd'])
2013-12-09 19:48:36 +00:00
self.run_function(
'mysql.db_create',
name=self.testdb1,
connection_user=self.user,
connection_pass=self.password,
)
self.run_function(
'mysql.db_create',
name=self.testdb2,
connection_user=self.user,
connection_pass=self.password,
)
create_query = ('CREATE TABLE {tblname} ('
2013-12-09 19:48:36 +00:00
' id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,'
' data VARCHAR(100)) ENGINE={engine};'.format(
2013-12-09 19:48:36 +00:00
tblname=mysqlmod.quote_identifier(self.table1),
engine='MYISAM',
))
log.info('Adding table \'%s\'', self.table1)
2013-12-09 19:48:36 +00:00
self.run_function(
'mysql.query',
database=self.testdb2,
query=create_query,
connection_user=self.user,
connection_pass=self.password
)
create_query = ('CREATE TABLE {tblname} ('
2013-12-09 19:48:36 +00:00
' id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,'
' data VARCHAR(100)) ENGINE={engine};'.format(
2013-12-09 19:48:36 +00:00
tblname=mysqlmod.quote_identifier(self.table2),
engine='MYISAM',
))
log.info('Adding table \'%s\'', self.table2)
2013-12-09 19:48:36 +00:00
self.run_function(
'mysql.query',
database=self.testdb2,
query=create_query,
connection_user=self.user,
connection_pass=self.password
)
@destructiveTest
def tearDown(self):
'''
2013-12-09 15:27:26 +00:00
Removes created users and db
'''
2014-11-21 19:47:43 +00:00
for user, userdef in six.iteritems(self.users):
2013-12-11 13:40:42 +00:00
self._userRemoval(uname=userdef['name'], password=userdef['pwd'])
2013-12-09 19:48:36 +00:00
self.run_function(
'mysql.db_remove',
name=self.testdb1,
connection_user=self.user,
connection_pass=self.password,
)
2013-12-09 19:48:36 +00:00
self.run_function(
'mysql.db_remove',
name=self.testdb2,
connection_user=self.user,
connection_pass=self.password,
)
self.run_function(
'mysql.db_remove',
name=self.test_file_query_db,
connection_user=self.user,
connection_pass=self.password,
)
def _userCreation(self,
uname,
password=None):
'''
Create a test user
'''
self.run_function(
'mysql.user_create',
user=uname,
host='localhost',
password=password,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
def _userRemoval(self,
uname,
password=None):
2013-12-09 15:27:26 +00:00
'''
Removes a test user
'''
self.run_function(
'mysql.user_remove',
user=uname,
host='localhost',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
2013-12-09 15:27:26 +00:00
def _addGrantRoutine(self,
2013-12-09 19:48:36 +00:00
grant,
user,
db,
grant_option=False,
escape=True,
2013-12-09 15:27:26 +00:00
**kwargs):
'''
2013-12-09 19:48:36 +00:00
Perform some tests around creation of the given grants
2013-12-09 15:27:26 +00:00
'''
ret = self.run_function(
'mysql.grant_add',
2013-12-09 19:48:36 +00:00
grant=grant,
database=db,
user=user,
grant_option=grant_option,
escape=escape,
2013-12-09 15:27:26 +00:00
**kwargs
)
self.assertEqual(True, ret, ('Calling grant_add on'
' user \'{0}\' and grants \'{1}\' did not return True: {2}').format(
2013-12-09 19:48:36 +00:00
user,
grant,
2013-12-09 15:27:26 +00:00
repr(ret)
))
ret = self.run_function(
'mysql.grant_exists',
grant=grant,
database=db,
user=user,
grant_option=grant_option,
escape=escape,
**kwargs
)
self.assertEqual(True, ret, ('Calling grant_exists on'
' user \'{0}\' and grants \'{1}\' did not return True: {2}').format(
user,
grant,
repr(ret)
))
@destructiveTest
2013-12-04 21:43:27 +00:00
def testGrants(self):
'''
2013-12-04 21:43:27 +00:00
Test user grant methods
'''
2013-12-09 19:48:36 +00:00
self._addGrantRoutine(
2013-12-12 22:13:20 +00:00
grant='SELECT, INSERT,UPDATE, CREATE',
2013-12-09 19:48:36 +00:00
user=self.users['user1']['name'],
db=self.testdb1 + '.*',
grant_option=True,
escape=True,
connection_user=self.user,
connection_pass=self.password
)
self._addGrantRoutine(
2013-12-12 22:13:20 +00:00
grant='INSERT, SELECT',
2013-12-09 19:48:36 +00:00
user=self.users['user1']['name'],
db=self.testdb2 + '.' + self.table1,
grant_option=True,
escape=True,
connection_user=self.user,
connection_pass=self.password
)
self._addGrantRoutine(
2013-12-12 22:13:20 +00:00
grant=' SELECT, UPDATE,DELETE, CREATE TEMPORARY TABLES',
2013-12-09 19:48:36 +00:00
user=self.users['user2']['name'],
db=self.testdb1 + '.*',
grant_option=True,
escape=True,
connection_user=self.user,
connection_pass=self.password
)
self._addGrantRoutine(
2013-12-12 22:13:20 +00:00
grant='select, ALTER,CREATE TEMPORARY TABLES, EXECUTE ',
2013-12-09 19:48:36 +00:00
user=self.users['user3']['name'],
db=self.testdb1 + '.*',
grant_option=True,
escape=True,
connection_user=self.user,
connection_pass=self.password
)
self._addGrantRoutine(
2013-12-12 22:13:20 +00:00
grant='SELECT, INSERT',
2013-12-09 19:48:36 +00:00
user=self.users['user4']['name'],
db=self.testdb2 + '.' + self.table2,
grant_option=False,
escape=True,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8'
2013-12-09 19:48:36 +00:00
)
self._addGrantRoutine(
2013-12-12 22:13:20 +00:00
grant='CREATE',
2013-12-09 19:48:36 +00:00
user=self.users['user4']['name'],
db=self.testdb2 + '.*',
grant_option=False,
escape=True,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8'
)
self._addGrantRoutine(
2013-12-12 22:13:20 +00:00
grant='SELECT, INSERT',
user=self.users['user4']['name'],
db=self.testdb2 + '.' + self.table1,
grant_option=False,
escape=True,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8'
)
# '' is valid for anonymous users
self._addGrantRoutine(
2013-12-12 22:13:20 +00:00
grant='DELETE',
user='',
db=self.testdb3 + '.*',
grant_option=False,
escape=True,
connection_user=self.user,
connection_pass=self.password
)
# Check result for users
ret = self.run_function(
'mysql.user_grants',
user=self.users['user1']['name'],
host='localhost',
connection_user=self.user,
connection_pass=self.password
)
self.assertEqual(ret, [
"GRANT USAGE ON *.* TO 'foo'@'localhost'",
('GRANT SELECT, INSERT, UPDATE, CREATE ON '
'`tes.t\'"saltdb`.* TO \'foo\'@\'localhost\' WITH GRANT OPTION'),
("GRANT SELECT, INSERT ON `t_st ``(:=salt%b)`.`foo`"
" TO 'foo'@'localhost' WITH GRANT OPTION")
])
ret = self.run_function(
'mysql.user_grants',
user=self.users['user2']['name'],
host='localhost',
connection_user=self.user,
connection_pass=self.password
)
self.assertEqual(ret, [
'GRANT USAGE ON *.* TO \'user ";--,?:&/\\\'@\'localhost\'',
('GRANT SELECT, UPDATE, DELETE, CREATE TEMPORARY TABLES ON `tes.t\''
'"saltdb`.* TO \'user ";--,?:&/\\\'@\'localhost\''
' WITH GRANT OPTION')
])
ret = self.run_function(
'mysql.user_grants',
user=self.users['user3']['name'],
host='localhost',
connection_user=self.user,
connection_pass=self.password
)
self.assertEqual(ret, [
"GRANT USAGE ON *.* TO 'user( @ )=foobar'@'localhost'",
('GRANT SELECT, ALTER, CREATE TEMPORARY TABLES, EXECUTE ON '
'`tes.t\'"saltdb`.* TO \'user( @ )=foobar\'@\'localhost\' '
'WITH GRANT OPTION')
])
ret = self.run_function(
'mysql.user_grants',
user=self.users['user4']['name'],
host='localhost',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8'
)
self.assertEqual(ret, [
"GRANT USAGE ON *.* TO 'user \xe6\xa8\x99'@'localhost'",
(r"GRANT CREATE ON `t\_st ``(:=salt\%b)`.* TO "
"'user \xe6\xa8\x99'@'localhost'"),
("GRANT SELECT, INSERT ON `t_st ``(:=salt%b)`.`foo ``'%_bar` TO "
"'user \xe6\xa8\x99'@'localhost'"),
("GRANT SELECT, INSERT ON `t_st ``(:=salt%b)`.`foo` TO "
"'user \xe6\xa8\x99'@'localhost'"),
])
ret = self.run_function(
'mysql.user_grants',
user='',
host='localhost',
connection_user=self.user,
2013-12-09 19:48:36 +00:00
connection_pass=self.password
)
self.assertEqual(ret, [
"GRANT USAGE ON *.* TO ''@'localhost'",
"GRANT DELETE ON `test ``(:=salteeb)`.* TO ''@'localhost'"
])
@skipIf(
NO_MYSQL,
'Please install MySQL bindings and a MySQL Server before running'
'MySQL integration tests.'
)
class MysqlModuleFileQueryTest(ModuleCase, SaltReturnAssertsMixin):
'''
Test file query module
'''
user = 'root'
password = 'poney'
testdb = 'test_file_query'
@destructiveTest
def setUp(self):
'''
Test presence of MySQL server, enforce a root password, create users
'''
super(MysqlModuleFileQueryTest, self).setUp()
NO_MYSQL_SERVER = True
# now ensure we know the mysql root password
# one of theses two at least should work
ret1 = self.run_state(
'cmd.run',
name='mysqladmin --host="localhost" -u '
+ self.user
+ ' flush-privileges password "'
+ self.password
+ '"'
)
ret2 = self.run_state(
'cmd.run',
name='mysqladmin --host="localhost" -u '
+ self.user
+ ' --password="'
+ self.password
+ '" flush-privileges password "'
+ self.password
+ '"'
)
key, value = ret2.popitem()
if value['result']:
NO_MYSQL_SERVER = False
else:
self.skipTest('No MySQL Server running, or no root access on it.')
# Create some users and a test db
self.run_function(
'mysql.db_create',
name=self.testdb,
connection_user=self.user,
connection_pass=self.password,
connection_db='mysql',
)
@destructiveTest
def tearDown(self):
'''
Removes created users and db
'''
self.run_function(
'mysql.db_remove',
name=self.testdb,
connection_user=self.user,
connection_pass=self.password,
connection_db='mysql',
)
@destructiveTest
def test_update_file_query(self):
'''
Test query without any output
'''
ret = self.run_function(
'mysql.file_query',
database=self.testdb,
file_name='salt://mysql/update_query.sql',
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password
)
self.assertTrue('query time' in ret)
ret.pop('query time')
self.assertEqual(ret, {'rows affected': 2})
@destructiveTest
def test_select_file_query(self):
'''
Test query with table output
'''
ret = self.run_function(
'mysql.file_query',
database=self.testdb,
file_name='salt://mysql/select_query.sql',
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password
)
expected = {
'rows affected': 5,
'rows returned': 4,
'results': [
[
['2'],
['3'],
['4'],
['5']
]
],
'columns': [
['a']
],
}
self.assertTrue('query time' in ret)
ret.pop('query time')
self.assertEqual(ret, expected)