mirror of
https://github.com/valitydev/salt.git
synced 2024-11-09 01:36:48 +00:00
1757 lines
60 KiB
Python
1757 lines
60 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Import python libs
|
|
from __future__ import absolute_import, print_function, unicode_literals
|
|
import logging
|
|
|
|
# Import Salt Testing libs
|
|
from tests.support.case import ModuleCase
|
|
from tests.support.unit import skipIf
|
|
from tests.support.helpers import destructiveTest
|
|
from tests.support.mixins import SaltReturnAssertsMixin
|
|
|
|
# Import salt libs
|
|
import salt.utils.path
|
|
from salt.modules import mysql as mysqlmod
|
|
|
|
# Import 3rd-party libs
|
|
from salt.ext import six
|
|
from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
|
|
|
|
# Module-level logger used by the tests below.
log = logging.getLogger(__name__)

# NO_MYSQL ends up True when either the MySQLdb bindings cannot be imported
# or no `mysqladmin` executable is found by salt.utils.path.which(); the
# test classes in this module are skipped on that flag.
try:
    import MySQLdb  # pylint: disable=import-error,unused-import
except Exception:
    # Any import-time failure is treated as "bindings unavailable".
    NO_MYSQL = True
else:
    NO_MYSQL = False

if not salt.utils.path.which('mysqladmin'):
    NO_MYSQL = True
|
|
|
|
|
|
@skipIf(
    NO_MYSQL,
    'Please install MySQL bindings and a MySQL Server before running '
    'MySQL integration tests.'
)
class MysqlModuleDbTest(ModuleCase, SaltReturnAssertsMixin):
    '''
    Module testing database creation on a real MySQL Server.

    Every test talks to the server as ``user`` / ``password`` below; setUp
    tries to enforce that password through ``mysqladmin`` and skips the test
    when that is not possible.
    '''

    # Credentials used for every connection made by these tests.
    user = 'root'
    password = 'poney'

    @destructiveTest
    def setUp(self):
        '''
        Test presence of MySQL server, enforce a root password
        '''
        super(MysqlModuleDbTest, self).setUp()
        # Now ensure we know the mysql root password.
        # One of these two commands at least should work: the first one does
        # not send any password, the second one sends the expected password.
        # Only the outcome of the second command is inspected.
        self.run_state(
            'cmd.run',
            name='mysqladmin --host="localhost" -u {0}'
                 ' flush-privileges password "{1}"'.format(
                     self.user,
                     self.password
                 )
        )
        ret2 = self.run_state(
            'cmd.run',
            name='mysqladmin --host="localhost" -u {0} --password="{1}"'
                 ' flush-privileges password "{1}"'.format(
                     self.user,
                     self.password
                 )
        )
        # The state return is a dict; take one entry and read its 'result'.
        _, value = ret2.popitem()
        if not value['result']:
            self.skipTest('No MySQL Server running, or no root access on it.')

    @staticmethod
    def _maintenance_row(dbname, tablename, operation,
                         msg_text='OK', msg_type='status'):
        '''
        Build one expected result row for db_check/db_repair/db_optimize.

        Note that the returned 'Table' value is the plain ``db.table``
        concatenation, without quote_identifier applied.
        '''
        return {
            'Table': dbname + '.' + tablename,
            'Msg_text': msg_text,
            'Msg_type': msg_type,
            'Op': operation,
        }

    def _db_creation_loop(self,
                          db_name,
                          returning_name,
                          test_conn=False,
                          **kwargs):
        '''
        Used in db testCase, create, check exists, check in list and removes.

        db_name
            Name handed to mysql.db_create / db_exists / db_remove.

        returning_name
            Name expected to be present in the mysql.db_list result.

        test_conn
            When True, also run ``SELECT 1`` through mysql.query using
            ``db_name`` as the database.

        kwargs
            Forwarded unchanged to every mysql.* call (connection settings,
            character_set, collate, ...).
        '''
        ret = self.run_function(
            'mysql.db_create',
            name=db_name,
            **kwargs
        )
        self.assertEqual(
            True,
            ret,
            'Problem while creating db for db name: \'{0}\''.format(db_name)
        )
        # test db exists
        ret = self.run_function(
            'mysql.db_exists',
            name=db_name,
            **kwargs
        )
        self.assertEqual(
            True,
            ret,
            'Problem while testing db exists for db name: \'{0}\''.format(db_name)
        )
        # List db names to ensure db is created with the right utf8 string
        ret = self.run_function(
            'mysql.db_list',
            **kwargs
        )
        if not isinstance(ret, list):
            raise AssertionError(
                ('Unexpected query result while retrieving databases list'
                 ' \'{0}\' for \'{1}\' test').format(
                     ret,
                     db_name
                 )
            )
        self.assertIn(
            returning_name,
            ret,
            ('Problem while testing presence of db name in db lists'
             ' for db name: \'{0}\' in list \'{1}\'').format(
                 db_name,
                 ret
             ))

        if test_conn:
            # test connections on database with root user
            ret = self.run_function(
                'mysql.query',
                database=db_name,
                query='SELECT 1',
                **kwargs
            )
            if not isinstance(ret, dict) or 'results' not in ret:
                raise AssertionError(
                    ('Unexpected result while testing connection'
                     ' on database : {0}').format(
                         repr(db_name)
                     )
                )
            self.assertEqual([['1']], ret['results'])

        # Now remove database
        ret = self.run_function(
            'mysql.db_remove',
            name=db_name,
            **kwargs
        )
        self.assertEqual(
            True,
            ret,
            'Problem while removing db for db name: \'{0}\''.format(db_name)
        )

    @destructiveTest
    def test_database_creation_level1(self):
        '''
        Create database, test presence, then drop db. All these with complex names.
        '''
        conn = {
            'connection_user': self.user,
            'connection_pass': self.password,
        }
        # name with space
        db_name = 'foo 1'
        self._db_creation_loop(db_name=db_name,
                               returning_name=db_name,
                               test_conn=True,
                               **conn)

        # ```````
        # create
        # also with character_set and collate only
        ret = self.run_function(
            'mysql.db_create',
            name='foo`2',
            character_set='utf8',
            collate='utf8_general_ci',
            **conn
        )
        self.assertEqual(True, ret)
        # test db exists
        ret = self.run_function(
            'mysql.db_exists',
            name='foo`2',
            **conn
        )
        self.assertEqual(True, ret)
        # Redoing the same creation should fail, and keep failing when
        # attempted once more.
        for _ in range(2):
            ret = self.run_function(
                'mysql.db_create',
                name='foo`2',
                character_set='utf8',
                collate='utf8_general_ci',
                **conn
            )
            self.assertEqual(False, ret)
        # Now remove database
        ret = self.run_function(
            'mysql.db_remove',
            name='foo`2',
            **conn
        )
        self.assertEqual(True, ret)

        # '''''''
        # create
        # also with character_set only
        db_name = "foo'3"
        self._db_creation_loop(db_name=db_name,
                               returning_name=db_name,
                               test_conn=True,
                               character_set='utf8',
                               **conn)

        # """"""""
        # also with collate only
        db_name = 'foo"4'
        self._db_creation_loop(db_name=db_name,
                               returning_name=db_name,
                               test_conn=True,
                               collate='utf8_general_ci',
                               **conn)
        # fuzzy
        db_name = '<foo` --"5>'
        self._db_creation_loop(db_name=db_name,
                               returning_name=db_name,
                               test_conn=True,
                               **conn)

    @destructiveTest
    def test_mysql_dbname_character_percent(self):
        '''
        Play with the '%' character problems

        This character should be escaped in the form '%%' on queries, but only
        when these queries have arguments. It is also a special character
        in LIKE SQL queries. Finally it is used to indicate query arguments.
        '''
        conn = {
            'connection_user': self.user,
            'connection_pass': self.password,
        }
        db_name1 = "foo%1_"
        db_name2 = "foo%12"
        ret = self.run_function(
            'mysql.db_create',
            name=db_name1,
            character_set='utf8',
            collate='utf8_general_ci',
            **conn
        )
        self.assertEqual(True, ret)
        ret = self.run_function(
            'mysql.db_create',
            name=db_name2,
            **conn
        )
        self.assertEqual(True, ret)
        # Removing the first database must leave the second one untouched.
        ret = self.run_function(
            'mysql.db_remove',
            name=db_name1,
            **conn
        )
        self.assertEqual(True, ret)
        ret = self.run_function(
            'mysql.db_exists',
            name=db_name1,
            **conn
        )
        self.assertEqual(False, ret)
        ret = self.run_function(
            'mysql.db_exists',
            name=db_name2,
            **conn
        )
        self.assertEqual(True, ret)
        ret = self.run_function(
            'mysql.db_remove',
            name=db_name2,
            **conn
        )
        self.assertEqual(True, ret)

    @destructiveTest
    def test_database_creation_utf8(self):
        '''
        Test support of utf8 in database names
        '''
        # Each pair is (unicode spelling, utf8 spelling) of the same name.
        name_pairs = (
            # Simple accents: 'notamérican'
            (u'notam\xe9rican', 'notam\xc3\xa9rican'),
            # Using more complex unicode characters: '標準語'
            (u'\u6a19\u6e96\u8a9e', '\xe6\xa8\x99\xe6\xba\x96\xe8\xaa\x9e'),
        )
        # FIXME: MySQLdb problems on conn strings containing
        # utf-8 on user name of db name prevent conn test
        for db_name_unicode, db_name_utf8 in name_pairs:
            # First the utf8 spelling, then check that the unicode entry
            # will also return the utf8 name.
            for db_name in (db_name_utf8, db_name_unicode):
                self._db_creation_loop(db_name=db_name,
                                       returning_name=db_name_utf8,
                                       test_conn=False,
                                       connection_user=self.user,
                                       connection_pass=self.password,
                                       connection_charset='utf8',
                                       saltenv={"LC_ALL": "en_US.utf8"}
                                       )

    @destructiveTest
    def test_database_maintenance(self):
        '''
        Test maintenance operations on a created database
        '''
        conn = {
            'connection_user': self.user,
            'connection_pass': self.password,
        }
        dbname = u"foo%'-- `\"'"
        # create database
        # but first silently try to remove it
        # in case of previous tests failures (result deliberately ignored)
        self.run_function(
            'mysql.db_remove',
            name=dbname,
            **conn
        )
        ret = self.run_function(
            'mysql.db_create',
            name=dbname,
            character_set='utf8',
            collate='utf8_general_ci',
            **conn
        )
        self.assertEqual(True, ret)
        # test db exists
        ret = self.run_function(
            'mysql.db_exists',
            name=dbname,
            **conn
        )
        self.assertEqual(True, ret)
        # Create 3 tables
        tablenames = {'A%table "`1': 'MYISAM',
                      'B%table \'`2': 'InnoDB',
                      'Ctable --`3': 'MEMORY'
                      }
        for tablename, engine in sorted(six.iteritems(tablenames)):
            # prepare queries
            quoted_table = mysqlmod.quote_identifier(tablename)
            create_query = ('CREATE TABLE {tblname} ('
                            ' id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,'
                            ' data VARCHAR(100)) ENGINE={engine};'.format(
                                tblname=quoted_table,
                                engine=engine,
                            ))
            # 100 'fooN' rows followed by one 'bar' row: 101 rows in total
            insert_query = ('INSERT INTO {tblname} (data)'
                            ' VALUES '.format(
                                tblname=quoted_table
                            ))
            insert_query += ''.join(
                "('foo" + six.text_type(x) + "'),"
                for x in range(100)
            )
            insert_query += "('bar');"
            delete_query = ('DELETE from {tblname}'
                            ' order by rand() limit 50;'.format(
                                tblname=quoted_table
                            ))

            # populate database
            log.info('Adding table \'%s\'', tablename)
            ret = self.run_function(
                'mysql.query',
                database=dbname,
                query=create_query,
                **conn
            )
            if not isinstance(ret, dict) or 'rows affected' not in ret:
                raise AssertionError(
                    ('Unexpected query result while populating test table'
                     ' \'{0}\' : \'{1}\'').format(
                         tablename,
                         ret,
                     )
                )
            self.assertEqual(ret['rows affected'], 0)
            log.info('Populating table \'%s\'', tablename)
            ret = self.run_function(
                'mysql.query',
                database=dbname,
                query=insert_query,
                **conn
            )
            if not isinstance(ret, dict) or 'rows affected' not in ret:
                raise AssertionError(
                    ('Unexpected query result while populating test table'
                     ' \'{0}\' : \'{1}\'').format(
                         tablename,
                         ret,
                     )
                )
            self.assertEqual(ret['rows affected'], 101)
            log.info('Removing some rows on table\'%s\'', tablename)
            ret = self.run_function(
                'mysql.query',
                database=dbname,
                query=delete_query,
                **conn
            )
            if not isinstance(ret, dict) or 'rows affected' not in ret:
                raise AssertionError(
                    ('Unexpected query result while removing rows on test table'
                     ' \'{0}\' : \'{1}\'').format(
                         tablename,
                         ret,
                     )
                )
            self.assertEqual(ret['rows affected'], 50)

        # test check/repair/optimize on 1 table
        tablename = 'A%table "`1'
        for operation in ('check', 'repair', 'optimize'):
            ret = self.run_function(
                'mysql.db_' + operation,
                name=dbname,
                table=tablename,
                **conn
            )
            # Note that returned result does not quote_identifier of table and db
            self.assertEqual(
                ret,
                [self._maintenance_row(dbname, tablename, operation)]
            )

        # test check/repair/optimize on all tables
        # Engine names are compared by value (==): identity comparison of
        # string literals is an implementation detail and must not be used.
        ret = self.run_function(
            'mysql.db_check',
            name=dbname,
            **conn
        )
        expected = []
        for tablename, engine in sorted(six.iteritems(tablenames)):
            if engine == 'MEMORY':
                expected.append([self._maintenance_row(
                    dbname, tablename, 'check',
                    msg_text=("The storage engine for the table doesn't"
                              " support check"),
                    msg_type='note'
                )])
            else:
                expected.append([self._maintenance_row(
                    dbname, tablename, 'check'
                )])
        self.assertEqual(ret, expected)

        ret = self.run_function(
            'mysql.db_repair',
            name=dbname,
            **conn
        )
        expected = []
        for tablename, engine in sorted(six.iteritems(tablenames)):
            if engine == 'MYISAM':
                expected.append([self._maintenance_row(
                    dbname, tablename, 'repair'
                )])
            else:
                expected.append([self._maintenance_row(
                    dbname, tablename, 'repair',
                    msg_text=("The storage engine for the table doesn't"
                              " support repair"),
                    msg_type='note'
                )])
        self.assertEqual(ret, expected)

        ret = self.run_function(
            'mysql.db_optimize',
            name=dbname,
            **conn
        )

        expected = []
        for tablename, engine in sorted(six.iteritems(tablenames)):
            if engine == 'MYISAM':
                expected.append([self._maintenance_row(
                    dbname, tablename, 'optimize'
                )])
            elif engine == 'InnoDB':
                # Two rows are expected here: a note, then the OK status.
                expected.append([
                    self._maintenance_row(
                        dbname, tablename, 'optimize',
                        msg_text=("Table does not support optimize, "
                                  "doing recreate + analyze instead"),
                        msg_type='note'
                    ),
                    self._maintenance_row(dbname, tablename, 'optimize'),
                ])
            elif engine == 'MEMORY':
                expected.append([self._maintenance_row(
                    dbname, tablename, 'optimize',
                    msg_text=("The storage engine for the table doesn't"
                              " support optimize"),
                    msg_type='note'
                )])
        self.assertEqual(ret, expected)
        # Teardown, remove database
        ret = self.run_function(
            'mysql.db_remove',
            name=dbname,
            **conn
        )
        self.assertEqual(True, ret)
|
|
|
|
|
|
@skipIf(
|
|
NO_MYSQL,
|
|
'Please install MySQL bindings and a MySQL Server before running'
|
|
'MySQL integration tests.'
|
|
)
|
|
class MysqlModuleUserTest(ModuleCase, SaltReturnAssertsMixin):
|
|
'''
|
|
User Creation and connection tests
|
|
'''
|
|
|
|
user = 'root'
|
|
password = 'poney'
|
|
|
|
@destructiveTest
def setUp(self):
    '''
    Test presence of MySQL server, enforce a root password
    '''
    super(MysqlModuleUserTest, self).setUp()
    # Now ensure we know the mysql root password.
    # One of these two commands at least should work: the first one does
    # not send any password, the second one sends the expected password.
    # Only the outcome of the second command is inspected.
    self.run_state(
        'cmd.run',
        name='mysqladmin --host="localhost" -u {0}'
             ' flush-privileges password "{1}"'.format(
                 self.user,
                 self.password
             )
    )
    ret2 = self.run_state(
        'cmd.run',
        name='mysqladmin --host="localhost" -u {0} --password="{1}"'
             ' flush-privileges password "{1}"'.format(
                 self.user,
                 self.password
             )
    )
    # The state return is a dict; take one entry and read its 'result'.
    _, value = ret2.popitem()
    if not value['result']:
        self.skipTest('No MySQL Server running, or no root access on it.')
|
|
|
|
def _userCreationLoop(self,
                      uname,
                      host,
                      password=None,
                      new_password=None,
                      new_password_hash=None,
                      **kwargs):
    '''
    Perform some tests around creation of the given user

    The user is removed first (result ignored), created once (must return
    True), created a second time (must return False) and, when either
    ``new_password`` or ``new_password_hash`` is given, its password is
    changed through mysql.user_chpass (must return True).

    ``kwargs`` are forwarded to the user_remove and user_create calls; the
    user_chpass call always connects with this class' own credentials.
    '''
    # First silently remove it, in case a previous run left it behind
    self.run_function(
        'mysql.user_remove',
        user=uname,
        host=host,
        **kwargs
    )

    # creation
    first_attempt = self.run_function(
        'mysql.user_create',
        user=uname,
        host=host,
        password=password,
        **kwargs
    )
    first_msg = ('Calling user_create on'
                 ' user \'{0}\' did not return True: {1}').format(
                     uname,
                     repr(first_attempt)
                 )
    self.assertEqual(True, first_attempt, first_msg)

    # double creation failure
    second_attempt = self.run_function(
        'mysql.user_create',
        user=uname,
        host=host,
        password=password,
        **kwargs
    )
    second_msg = ('Calling user_create a second time on'
                  ' user \'{0}\' did not return False: {1}').format(
                      uname,
                      repr(second_attempt)
                  )
    self.assertEqual(False, second_attempt, second_msg)

    # Nothing more to do unless a password change was requested
    if new_password is None and new_password_hash is None:
        return

    # Alter password
    changed = self.run_function(
        'mysql.user_chpass',
        user=uname,
        host=host,
        password=new_password,
        password_hash=new_password_hash,
        connection_user=self.user,
        connection_pass=self.password,
        connection_charset='utf8',
        saltenv={"LC_ALL": "en_US.utf8"}
    )
    chpass_msg = ('Calling user_chpass on'
                  ' user \'{0}\' did not return True: {1}').format(
                      uname,
                      repr(changed)
                  )
    self.assertEqual(True, changed, chpass_msg)
|
|
|
|
def _chck_userinfo(self, user, host, check_user, check_hash):
    '''
    Internal routine to check user_info returned results

    Calls mysql.user_info for ``user``@``host`` and verifies that the
    returned dict carries the expected 'Host', 'Password' and 'User' values.
    '''
    info = self.run_function(
        'mysql.user_info',
        user=user,
        host=host,
        connection_user=self.user,
        connection_pass=self.password,
        connection_charset='utf8',
        saltenv={"LC_ALL": "en_US.utf8"}
    )
    if not isinstance(info, dict):
        raise AssertionError(
            'Unexpected result while retrieving user_info for '
            '\'{0}\''.format(user)
        )
    # Checked in this order: Host, then Password, then User
    expectations = (
        ('Host', host),
        ('Password', check_hash),
        ('User', check_user),
    )
    for field, wanted in expectations:
        self.assertEqual(info[field], wanted)
|
|
|
|
def _chk_remove_user(self, user, host, **kwargs):
    '''
    Internal routine to check user_remove

    Removes ``user``@``host`` through mysql.user_remove, forwarding
    ``kwargs`` to the call, and asserts that the call returned True.
    '''
    removed = self.run_function(
        'mysql.user_remove',
        user=user,
        host=host,
        **kwargs
    )
    failure_msg = ('Assertion failed while removing user'
                   ' \'{0}\' on host \'{1}\': {2}').format(
                       user,
                       host,
                       repr(removed)
                   )
    self.assertEqual(True, removed, failure_msg)
|
|
|
|
@destructiveTest
|
|
def test_user_management(self):
|
|
'''
|
|
Test various users creation settings
|
|
'''
|
|
# Create users with rights on this database
|
|
# and rights on other databases
|
|
user1 = "user '1"
|
|
user1_pwd = 'pwd`\'"1b'
|
|
user1_pwd_hash = '*4DF33B3B12E43384677050A818327877FAB2F4BA'
|
|
# this is : user "2'標
|
|
user2 = 'user "2\'\xe6\xa8\x99'
|
|
user2_pwd = 'user "2\'\xe6\xa8\x99b'
|
|
user2_pwd_hash = '*3A38A7B94B024B983687BB9B44FB60B7AA38FE61'
|
|
user3 = 'user "3;,?:@=&/'
|
|
user3_pwd = 'user "3;,?:@=&/'
|
|
user3_pwd_hash = '*AA3B1D4105A45D381C23A5C221C47EA349E1FD7D'
|
|
# this is : user ":=;4標 in unicode instead of utf-8
|
|
# if unicode char is counted as 1 char we hit the max user
|
|
# size (16)
|
|
user4 = u'user":;,?:@=&/4\u6a19'
|
|
user4_utf8 = 'user":;,?:@=&/4\xe6\xa8\x99'
|
|
user4_pwd = 'user "4;,?:@=&/'
|
|
user4_pwd_hash = '*FC8EF8DBF27628E4E113359F8E7478D5CF3DD57C'
|
|
user5 = u'user ``"5'
|
|
user5_utf8 = 'user ``"5'
|
|
# this is 標標標\
|
|
user5_pwd = '\xe6\xa8\x99\xe6\xa8\x99\\'
|
|
# this is password('標標\\')
|
|
user5_pwd_hash = '*3752E65CDD8751AF8D889C62CFFC6C998B12C376'
|
|
user6 = u'user %--"6'
|
|
user6_utf8 = 'user %--"6'
|
|
# this is : --'"% SIX標b
|
|
user6_pwd_u = u' --\'"% SIX\u6a19b'
|
|
user6_pwd_utf8 = ' --\'"% SIX\xe6\xa8\x99b'
|
|
# this is password(' --\'"% SIX標b')
|
|
user6_pwd_hash = '*90AE800593E2D407CD9E28CCAFBE42D17EEA5369'
|
|
self._userCreationLoop(
|
|
uname=user1,
|
|
host='localhost',
|
|
password='pwd`\'"1',
|
|
new_password='pwd`\'"1b',
|
|
connection_user=self.user,
|
|
connection_pass=self.password
|
|
)
|
|
# Now check for results
|
|
ret = self.run_function(
|
|
'mysql.user_exists',
|
|
user=user1,
|
|
host='localhost',
|
|
password=user1_pwd,
|
|
password_hash=None,
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
self.assertEqual(True, ret, ('Testing final user \'{0}\' on host \'{1}\''
|
|
' existence failed').format(user1, 'localhost')
|
|
)
|
|
|
|
self._userCreationLoop(
|
|
uname=user2,
|
|
host='localhost',
|
|
password=None,
|
|
# this is his name hash : user "2'標
|
|
password_hash='*EEF6F854748ACF841226BB1C2422BEC70AE7F1FF',
|
|
# and this is the same with a 'b' added
|
|
new_password_hash=user2_pwd_hash,
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
# user2 can connect from other places with other password
|
|
self._userCreationLoop(
|
|
uname=user2,
|
|
host='10.0.0.1',
|
|
allow_passwordless=True,
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
self._userCreationLoop(
|
|
uname=user2,
|
|
host='10.0.0.2',
|
|
allow_passwordless=True,
|
|
unix_socket=True,
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
# Now check for results
|
|
ret = self.run_function(
|
|
'mysql.user_exists',
|
|
user=user2,
|
|
host='localhost',
|
|
password=None,
|
|
password_hash=user2_pwd_hash,
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
self.assertEqual(True, ret, ('Testing final user \'{0}\' on host \'{1}\''
|
|
' failed').format(user2, 'localhost')
|
|
)
|
|
ret = self.run_function(
|
|
'mysql.user_exists',
|
|
user=user2,
|
|
host='10.0.0.1',
|
|
allow_passwordless=True,
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
self.assertEqual(True, ret, ('Testing final user \'{0}\' on host \'{1}\''
|
|
' without password failed').format(user2, '10.0.0.1')
|
|
)
|
|
ret = self.run_function(
|
|
'mysql.user_exists',
|
|
user=user2,
|
|
host='10.0.0.2',
|
|
allow_passwordless=True,
|
|
unix_socket=True,
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
self.assertEqual(True, ret, ('Testing final user \'{0}\' on host \'{1}\''
|
|
' without password failed').format(user2, '10.0.0.2')
|
|
)
|
|
|
|
# Empty password is not passwordless (or is it a bug?)
|
|
self._userCreationLoop(
|
|
uname=user3,
|
|
host='localhost',
|
|
password='',
|
|
connection_user=self.user,
|
|
connection_pass=self.password
|
|
)
|
|
# user 3 on another host with a password
|
|
self._userCreationLoop(
|
|
uname=user3,
|
|
host='%',
|
|
password='foo',
|
|
new_password=user3_pwd,
|
|
connection_user=self.user,
|
|
connection_pass=self.password
|
|
)
|
|
# Now check for results
|
|
ret = self.run_function(
|
|
'mysql.user_exists',
|
|
user=user3,
|
|
host='localhost',
|
|
password='',
|
|
connection_user=self.user,
|
|
connection_pass=self.password
|
|
)
|
|
self.assertEqual(True, ret, ('Testing final user \'{0}\' on host \'{1}\''
|
|
' without empty password failed').format(user3, 'localhost')
|
|
)
|
|
ret = self.run_function(
|
|
'mysql.user_exists',
|
|
user=user3,
|
|
host='%',
|
|
password=user3_pwd,
|
|
connection_user=self.user,
|
|
connection_pass=self.password
|
|
)
|
|
self.assertEqual(True, ret, ('Testing final user \'{0}\' on host \'{1}\''
|
|
' with password failed').format(user3, '%')
|
|
)
|
|
|
|
# check unicode name, and password > password_hash
|
|
self._userCreationLoop(
|
|
uname=user4,
|
|
host='%',
|
|
password=user4_pwd,
|
|
# this is password('foo')
|
|
password_hash='*F3A2A51A9B0F2BE2468926B4132313728C250DBF',
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
# Now check for results
|
|
ret = self.run_function(
|
|
'mysql.user_exists',
|
|
user=user4_utf8,
|
|
host='%',
|
|
password=user4_pwd,
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
self.assertEqual(True, ret, ('Testing final user \'{0}\' on host \'{1}\''
|
|
' with password take from password and not password_hash'
|
|
' failed').format(user4_utf8, '%')
|
|
)
|
|
self._userCreationLoop(
|
|
uname=user5,
|
|
host='localhost',
|
|
password='\xe6\xa8\x99\xe6\xa8\x99',
|
|
new_password=user5_pwd,
|
|
unix_socket=True,
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
ret = self.run_function(
|
|
'mysql.user_exists',
|
|
user=user5_utf8,
|
|
host='localhost',
|
|
password=user5_pwd,
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
self.assertEqual(True, ret, ('Testing final user \'{0}\' on host \'{1}\''
|
|
' with utf8 password failed').format(user5_utf8, 'localhost')
|
|
)
|
|
# for this one we give password in unicode and check it in utf-8
|
|
self._userCreationLoop(
|
|
uname=user6,
|
|
host='10.0.0.1',
|
|
password=' foobar',
|
|
new_password=user6_pwd_u,
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
# Now check for results
|
|
ret = self.run_function(
|
|
'mysql.user_exists',
|
|
user=user6_utf8,
|
|
host='10.0.0.1',
|
|
password=user6_pwd_utf8,
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
self.assertEqual(True, ret, ('Testing final user \'{0}\' on host \'{1}\''
|
|
' with unicode password failed').format(user6_utf8, '10.0.0.1')
|
|
)
|
|
# Final result should be:
|
|
# mysql> select Host, User, Password from user where user like 'user%';
|
|
# +--------------------+-----------+-------------------------------+
|
|
# | User | Host | Password |
|
|
# +--------------------+-----------+-------------------------------+
|
|
# | user "2'標 | 10.0.0.1 | |
|
|
# | user "2'標 | 10.0.0.2 | |
|
|
# | user "2'標 | localhost | *3A38A7B94B0(...)60B7AA38FE61 |
|
|
# | user "3;,?:@=&/ | % | *AA3B1D4105(...)47EA349E1FD7D |
|
|
# | user "3;,?:@=&/ | localhost | |
|
|
# | user %--"6 | 10.0.0.1 | *90AE800593(...)E42D17EEA5369 |
|
|
# | user '1 | localhost | *4DF33B3B1(...)327877FAB2F4BA |
|
|
# | user ``"5 | localhost | *3752E65CD(...)FC6C998B12C376 |
|
|
# | user":;,?:@=&/4標 | % | *FC8EF8DBF(...)7478D5CF3DD57C |
|
|
# +--------------------+-----------+-------------------------------+
|
|
self._chck_userinfo(user=user2,
|
|
host='10.0.0.1',
|
|
check_user=user2,
|
|
check_hash=''
|
|
)
|
|
self._chck_userinfo(user=user2,
|
|
host='10.0.0.2',
|
|
check_user=user2,
|
|
check_hash=''
|
|
)
|
|
self._chck_userinfo(user=user2,
|
|
host='localhost',
|
|
check_user=user2,
|
|
check_hash=user2_pwd_hash
|
|
)
|
|
self._chck_userinfo(user=user3,
|
|
host='%',
|
|
check_user=user3,
|
|
check_hash=user3_pwd_hash
|
|
)
|
|
self._chck_userinfo(user=user3,
|
|
host='localhost',
|
|
check_user=user3,
|
|
check_hash=''
|
|
)
|
|
self._chck_userinfo(user=user4,
|
|
host='%',
|
|
check_user=user4_utf8,
|
|
check_hash=user4_pwd_hash
|
|
)
|
|
self._chck_userinfo(user=user6,
|
|
host='10.0.0.1',
|
|
check_user=user6_utf8,
|
|
check_hash=user6_pwd_hash
|
|
)
|
|
self._chck_userinfo(user=user1,
|
|
host='localhost',
|
|
check_user=user1,
|
|
check_hash=user1_pwd_hash
|
|
)
|
|
self._chck_userinfo(user=user5,
|
|
host='localhost',
|
|
check_user=user5_utf8,
|
|
check_hash=user5_pwd_hash
|
|
)
|
|
# check user_list function
|
|
ret = self.run_function(
|
|
'mysql.user_list',
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
self.assertIn({'Host': 'localhost', 'User': user1}, ret)
|
|
self.assertIn({'Host': 'localhost', 'User': user2}, ret)
|
|
self.assertIn({'Host': '10.0.0.1', 'User': user2}, ret)
|
|
self.assertIn({'Host': '10.0.0.2', 'User': user2}, ret)
|
|
self.assertIn({'Host': '%', 'User': user3}, ret)
|
|
self.assertIn({'Host': 'localhost', 'User': user3}, ret)
|
|
self.assertIn({'Host': '%', 'User': user4_utf8}, ret)
|
|
self.assertIn({'Host': 'localhost', 'User': user5_utf8}, ret)
|
|
self.assertIn({'Host': '10.0.0.1', 'User': user6_utf8}, ret)
|
|
|
|
# And finally, test connections on MySQL with theses users
|
|
ret = self.run_function(
|
|
'mysql.query',
|
|
database='information_schema',
|
|
query='SELECT 1',
|
|
connection_user=user1,
|
|
connection_pass='pwd`\'"1b',
|
|
connection_host='localhost'
|
|
)
|
|
if not isinstance(ret, dict) or 'results' not in ret:
|
|
raise AssertionError(
|
|
('Unexpected result while testing connection'
|
|
' with user \'{0}\': {1}').format(
|
|
user1,
|
|
repr(ret)
|
|
)
|
|
)
|
|
self.assertEqual([['1']], ret['results'])
|
|
|
|
# FIXME: still failing, but works by hand...
|
|
# mysql --user="user \"2'標" --password="user \"2'標b" information_schema
|
|
# Seems to be a python-mysql library problem with user names containing
|
|
# utf8 characters
|
|
# @see https://github.com/farcepest/MySQLdb1/issues/40
|
|
#import urllib
|
|
#ret = self.run_function(
|
|
# 'mysql.query',
|
|
# database='information_schema',
|
|
# query='SELECT 1',
|
|
# connection_user=urllib.quote_plus(user2),
|
|
# connection_pass=urllib.quote_plus(user2_pwd),
|
|
# connection_host='localhost',
|
|
# connection_charset='utf8',
|
|
# saltenv={"LC_ALL": "en_US.utf8"}
|
|
#)
|
|
#if not isinstance(ret, dict) or 'results' not in ret:
|
|
# raise AssertionError(
|
|
# ('Unexpected result while testing connection'
|
|
# ' with user \'{0}\': {1}').format(
|
|
# user2,
|
|
# repr(ret)
|
|
# )
|
|
# )
|
|
#self.assertEqual([['1']], ret['results'])
|
|
ret = self.run_function(
|
|
'mysql.query',
|
|
database='information_schema',
|
|
query='SELECT 1',
|
|
connection_user=user3,
|
|
connection_pass='',
|
|
connection_host='localhost',
|
|
)
|
|
if not isinstance(ret, dict) or 'results' not in ret:
|
|
raise AssertionError(
|
|
('Unexpected result while testing connection'
|
|
' with user \'{0}\': {1}').format(
|
|
user3,
|
|
repr(ret)
|
|
)
|
|
)
|
|
self.assertEqual([['1']], ret['results'])
|
|
# FIXME: Failing
|
|
#ret = self.run_function(
|
|
# 'mysql.query',
|
|
# database='information_schema',
|
|
# query='SELECT 1',
|
|
# connection_user=user4_utf8,
|
|
# connection_pass=user4_pwd,
|
|
# connection_host='localhost',
|
|
# connection_charset='utf8',
|
|
# saltenv={"LC_ALL": "en_US.utf8"}
|
|
#)
|
|
#if not isinstance(ret, dict) or 'results' not in ret:
|
|
# raise AssertionError(
|
|
# ('Unexpected result while testing connection'
|
|
# ' with user \'{0}\': {1}').format(
|
|
# user4_utf8,
|
|
# repr(ret)
|
|
# )
|
|
# )
|
|
#self.assertEqual([['1']], ret['results'])
|
|
ret = self.run_function(
|
|
'mysql.query',
|
|
database='information_schema',
|
|
query='SELECT 1',
|
|
connection_user=user5_utf8,
|
|
connection_pass=user5_pwd,
|
|
connection_host='localhost',
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
if not isinstance(ret, dict) or 'results' not in ret:
|
|
raise AssertionError(
|
|
('Unexpected result while testing connection'
|
|
' with user \'{0}\': {1}').format(
|
|
user5_utf8,
|
|
repr(ret)
|
|
)
|
|
)
|
|
self.assertEqual([['1']], ret['results'])
|
|
|
|
# Teardown by deleting with user_remove
|
|
self._chk_remove_user(user=user2,
|
|
host='10.0.0.1',
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
self._chk_remove_user(user=user2,
|
|
host='10.0.0.2',
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
self._chk_remove_user(user=user2,
|
|
host='localhost',
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
self._chk_remove_user(user=user3,
|
|
host='%',
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
)
|
|
self._chk_remove_user(user=user3,
|
|
host='localhost',
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
)
|
|
self._chk_remove_user(user=user4,
|
|
host='%',
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
self._chk_remove_user(user=user6,
|
|
host='10.0.0.1',
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
)
|
|
self._chk_remove_user(user=user1,
|
|
host='localhost',
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
)
|
|
self._chk_remove_user(user=user5,
|
|
host='localhost',
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
)
|
|
# Final verification of the cleanup
|
|
ret = self.run_function(
|
|
'mysql.user_list',
|
|
connection_user=self.user,
|
|
connection_pass=self.password,
|
|
connection_charset='utf8',
|
|
saltenv={"LC_ALL": "en_US.utf8"}
|
|
)
|
|
self.assertNotIn({'Host': 'localhost', 'User': user1}, ret)
|
|
self.assertNotIn({'Host': 'localhost', 'User': user2}, ret)
|
|
self.assertNotIn({'Host': '10.0.0.1', 'User': user2}, ret)
|
|
self.assertNotIn({'Host': '10.0.0.2', 'User': user2}, ret)
|
|
self.assertNotIn({'Host': '%', 'User': user3}, ret)
|
|
self.assertNotIn({'Host': 'localhost', 'User': user3}, ret)
|
|
self.assertNotIn({'Host': '%', 'User': user4_utf8}, ret)
|
|
self.assertNotIn({'Host': 'localhost', 'User': user5_utf8}, ret)
|
|
self.assertNotIn({'Host': '10.0.0.1', 'User': user6_utf8}, ret)
|
|
|
|
|
|
@skipIf(
    NO_MYSQL,
    'Please install MySQL bindings and a MySQL Server before running '
    'MySQL integration tests.'
)
class MysqlModuleUserGrantTest(ModuleCase, SaltReturnAssertsMixin):
    '''
    Grant tests run against a real MySQL Server.

    Exercises ``mysql.grant_add``, ``mysql.grant_exists`` and
    ``mysql.user_grants`` with database, table and user names that
    contain quotes, backticks, wildcards and non-ASCII characters.
    '''

    # Credentials of the administrative account used for every connection
    user = 'root'
    password = 'poney'
    # yep, these are valid MySQL db names
    # very special chars are _ % and .
    testdb1 = 'tes.t\'"saltdb'
    testdb2 = 't_st `(:=salt%b)'
    testdb3 = 'test `(:=salteeb)'
    test_file_query_db = 'test_query'
    table1 = 'foo'
    table2 = "foo `\'%_bar"
    # Test accounts created in setUp and removed in tearDown
    users = {
        'user1': {
            'name': 'foo',
            'pwd': 'bar',
        },
        'user2': {
            'name': 'user ";--,?:&/\\',
            'pwd': '";--(),?:@=&/\\',
        },
        # this is : passwd 標標
        'user3': {
            'name': 'user( @ )=foobar',
            'pwd': '\xe6\xa8\x99\xe6\xa8\x99',
        },
        # this is : user/password containing 標標
        'user4': {
            'name': 'user \xe6\xa8\x99',
            'pwd': '\xe6\xa8\x99\xe6\xa8\x99',
        },
    }

    @destructiveTest
    def setUp(self):
        '''
        Enforce the root password, then create the test users, the two
        test databases and the two test tables.

        The test is skipped when the password-protected ``mysqladmin``
        call does not report success.
        '''
        super(MysqlModuleUserGrantTest, self).setUp()
        # now ensure we know the mysql root password
        # one of these two at least should work
        # First attempt is issued without --password; its result is
        # deliberately ignored because the second attempt is the one
        # that is checked.
        self.run_state(
            'cmd.run',
            name='mysqladmin --host="localhost" -u '
                 + self.user
                 + ' flush-privileges password "'
                 + self.password
                 + '"'
        )
        ret = self.run_state(
            'cmd.run',
            name='mysqladmin --host="localhost" -u '
                 + self.user
                 + ' --password="'
                 + self.password
                 + '" flush-privileges password "'
                 + self.password
                 + '"'
        )
        _, value = ret.popitem()
        if not value['result']:
            self.skipTest('No MySQL Server running, or no root access on it.')

        # Create some users and a test db
        for userdef in six.itervalues(self.users):
            self._userCreation(uname=userdef['name'], password=userdef['pwd'])
        for dbname in (self.testdb1, self.testdb2):
            self.run_function(
                'mysql.db_create',
                name=dbname,
                connection_user=self.user,
                connection_pass=self.password,
            )
        # Both tables live in testdb2 and share the same definition
        for tblname in (self.table1, self.table2):
            create_query = ('CREATE TABLE {tblname} ('
                            ' id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,'
                            ' data VARCHAR(100)) ENGINE={engine};'.format(
                                tblname=mysqlmod.quote_identifier(tblname),
                                engine='MYISAM',
                            ))
            log.info('Adding table \'%s\'', tblname)
            self.run_function(
                'mysql.query',
                database=self.testdb2,
                query=create_query,
                connection_user=self.user,
                connection_pass=self.password
            )

    @destructiveTest
    def tearDown(self):
        '''
        Remove the users created in setUp and drop the test databases.
        '''
        for userdef in six.itervalues(self.users):
            self._userRemoval(uname=userdef['name'], password=userdef['pwd'])
        for dbname in (self.testdb1, self.testdb2, self.test_file_query_db):
            self.run_function(
                'mysql.db_remove',
                name=dbname,
                connection_user=self.user,
                connection_pass=self.password,
            )

    def _userCreation(self,
                      uname,
                      password=None):
        '''
        Create the test user ``uname`` on host ``localhost`` with the
        given ``password``.
        '''
        self.run_function(
            'mysql.user_create',
            user=uname,
            host='localhost',
            password=password,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )

    def _userRemoval(self,
                     uname,
                     password=None):
        '''
        Remove the test user ``uname`` on host ``localhost``.

        ``password`` is accepted for symmetry with ``_userCreation`` but
        is not used.
        '''
        self.run_function(
            'mysql.user_remove',
            user=uname,
            host='localhost',
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )

    def _addGrantRoutine(self,
                         grant,
                         user,
                         db,
                         grant_option=False,
                         escape=True,
                         **kwargs):
        '''
        Add a grant, then check that ``mysql.grant_exists`` reports it.

        Both ``mysql.grant_add`` and ``mysql.grant_exists`` are called
        with the same arguments and each must return ``True``. Extra
        keyword arguments (connection settings) are forwarded to both.
        '''
        ret = self.run_function(
            'mysql.grant_add',
            grant=grant,
            database=db,
            user=user,
            grant_option=grant_option,
            escape=escape,
            **kwargs
        )
        self.assertEqual(True, ret, ('Calling grant_add on'
            ' user \'{0}\' and grants \'{1}\' did not return True: {2}').format(
            user,
            grant,
            repr(ret)
        ))
        ret = self.run_function(
            'mysql.grant_exists',
            grant=grant,
            database=db,
            user=user,
            grant_option=grant_option,
            escape=escape,
            **kwargs
        )
        self.assertEqual(True, ret, ('Calling grant_exists on'
            ' user \'{0}\' and grants \'{1}\' did not return True: {2}').format(
            user,
            grant,
            repr(ret)
        ))

    @destructiveTest
    def testGrants(self):
        '''
        Test user grant methods
        '''
        self._addGrantRoutine(
            grant='SELECT, INSERT,UPDATE, CREATE',
            user=self.users['user1']['name'],
            db=self.testdb1 + '.*',
            grant_option=True,
            escape=True,
            connection_user=self.user,
            connection_pass=self.password
        )
        self._addGrantRoutine(
            grant='INSERT, SELECT',
            user=self.users['user1']['name'],
            db=self.testdb2 + '.' + self.table1,
            grant_option=True,
            escape=True,
            connection_user=self.user,
            connection_pass=self.password
        )
        self._addGrantRoutine(
            grant=' SELECT, UPDATE,DELETE, CREATE TEMPORARY TABLES',
            user=self.users['user2']['name'],
            db=self.testdb1 + '.*',
            grant_option=True,
            escape=True,
            connection_user=self.user,
            connection_pass=self.password
        )
        self._addGrantRoutine(
            grant='select, ALTER,CREATE TEMPORARY TABLES, EXECUTE ',
            user=self.users['user3']['name'],
            db=self.testdb1 + '.*',
            grant_option=True,
            escape=True,
            connection_user=self.user,
            connection_pass=self.password
        )
        self._addGrantRoutine(
            grant='SELECT, INSERT',
            user=self.users['user4']['name'],
            db=self.testdb2 + '.' + self.table2,
            grant_option=False,
            escape=True,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8'
        )
        self._addGrantRoutine(
            grant='CREATE',
            user=self.users['user4']['name'],
            db=self.testdb2 + '.*',
            grant_option=False,
            escape=True,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8'
        )
        self._addGrantRoutine(
            grant='SELECT, INSERT',
            user=self.users['user4']['name'],
            db=self.testdb2 + '.' + self.table1,
            grant_option=False,
            escape=True,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8'
        )
        # '' is valid for anonymous users
        # NOTE(review): tearDown only removes the accounts listed in
        # self.users, so this anonymous-user grant is not cleaned up
        # here -- confirm whether that is intended.
        self._addGrantRoutine(
            grant='DELETE',
            user='',
            db=self.testdb3 + '.*',
            grant_option=False,
            escape=True,
            connection_user=self.user,
            connection_pass=self.password
        )
        # Check result for users
        ret = self.run_function(
            'mysql.user_grants',
            user=self.users['user1']['name'],
            host='localhost',
            connection_user=self.user,
            connection_pass=self.password
        )
        self.assertEqual(ret, [
            "GRANT USAGE ON *.* TO 'foo'@'localhost'",
            ('GRANT SELECT, INSERT, UPDATE, CREATE ON '
             '`tes.t\'"saltdb`.* TO \'foo\'@\'localhost\' WITH GRANT OPTION'),
            ("GRANT SELECT, INSERT ON `t_st ``(:=salt%b)`.`foo`"
             " TO 'foo'@'localhost' WITH GRANT OPTION")
        ])

        ret = self.run_function(
            'mysql.user_grants',
            user=self.users['user2']['name'],
            host='localhost',
            connection_user=self.user,
            connection_pass=self.password
        )
        self.assertEqual(ret, [
            'GRANT USAGE ON *.* TO \'user ";--,?:&/\\\'@\'localhost\'',
            ('GRANT SELECT, UPDATE, DELETE, CREATE TEMPORARY TABLES ON `tes.t\''
             '"saltdb`.* TO \'user ";--,?:&/\\\'@\'localhost\''
             ' WITH GRANT OPTION')
        ])

        ret = self.run_function(
            'mysql.user_grants',
            user=self.users['user3']['name'],
            host='localhost',
            connection_user=self.user,
            connection_pass=self.password
        )
        self.assertEqual(ret, [
            "GRANT USAGE ON *.* TO 'user( @ )=foobar'@'localhost'",
            ('GRANT SELECT, ALTER, CREATE TEMPORARY TABLES, EXECUTE ON '
             '`tes.t\'"saltdb`.* TO \'user( @ )=foobar\'@\'localhost\' '
             'WITH GRANT OPTION')
        ])

        ret = self.run_function(
            'mysql.user_grants',
            user=self.users['user4']['name'],
            host='localhost',
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8'
        )
        self.assertEqual(ret, [
            "GRANT USAGE ON *.* TO 'user \xe6\xa8\x99'@'localhost'",
            (r"GRANT CREATE ON `t\_st ``(:=salt\%b)`.* TO "
             "'user \xe6\xa8\x99'@'localhost'"),
            ("GRANT SELECT, INSERT ON `t_st ``(:=salt%b)`.`foo ``'%_bar` TO "
             "'user \xe6\xa8\x99'@'localhost'"),
            ("GRANT SELECT, INSERT ON `t_st ``(:=salt%b)`.`foo` TO "
             "'user \xe6\xa8\x99'@'localhost'"),
        ])

        ret = self.run_function(
            'mysql.user_grants',
            user='',
            host='localhost',
            connection_user=self.user,
            connection_pass=self.password
        )
        self.assertEqual(ret, [
            "GRANT USAGE ON *.* TO ''@'localhost'",
            "GRANT DELETE ON `test ``(:=salteeb)`.* TO ''@'localhost'"
        ])
|
|
|
|
|
|
@skipIf(
    NO_MYSQL,
    'Please install MySQL bindings and a MySQL Server before running '
    'MySQL integration tests.'
)
class MysqlModuleFileQueryTest(ModuleCase, SaltReturnAssertsMixin):
    '''
    Test ``mysql.file_query`` against a real MySQL Server.
    '''

    # Credentials of the administrative account used for every connection
    user = 'root'
    password = 'poney'
    # Database created in setUp and dropped in tearDown
    testdb = 'test_file_query'

    @destructiveTest
    def setUp(self):
        '''
        Enforce the root password, then create the test database.

        The test is skipped when the password-protected ``mysqladmin``
        call does not report success.
        '''
        super(MysqlModuleFileQueryTest, self).setUp()
        # now ensure we know the mysql root password
        # one of these two at least should work
        # First attempt is issued without --password; its result is
        # deliberately ignored because the second attempt is the one
        # that is checked.
        self.run_state(
            'cmd.run',
            name='mysqladmin --host="localhost" -u '
                 + self.user
                 + ' flush-privileges password "'
                 + self.password
                 + '"'
        )
        ret = self.run_state(
            'cmd.run',
            name='mysqladmin --host="localhost" -u '
                 + self.user
                 + ' --password="'
                 + self.password
                 + '" flush-privileges password "'
                 + self.password
                 + '"'
        )
        _, value = ret.popitem()
        if not value['result']:
            self.skipTest('No MySQL Server running, or no root access on it.')

        # Create the test db
        self.run_function(
            'mysql.db_create',
            name=self.testdb,
            connection_user=self.user,
            connection_pass=self.password,
            connection_db='mysql',
        )

    @destructiveTest
    def tearDown(self):
        '''
        Drop the test database created in setUp.
        '''
        self.run_function(
            'mysql.db_remove',
            name=self.testdb,
            connection_user=self.user,
            connection_pass=self.password,
            connection_db='mysql',
        )

    @destructiveTest
    def test_update_file_query(self):
        '''
        Test query without any output
        '''
        ret = self.run_function(
            'mysql.file_query',
            database=self.testdb,
            file_name='salt://mysql/update_query.sql',
            character_set='utf8',
            collate='utf8_general_ci',
            connection_user=self.user,
            connection_pass=self.password
        )
        # The timing value varies between runs, so only check that it is
        # present and drop it before comparing the rest of the result.
        self.assertIn('query time', ret)
        ret.pop('query time')
        self.assertEqual(ret, {'rows affected': 2})

    @destructiveTest
    def test_select_file_query(self):
        '''
        Test query with table output
        '''
        ret = self.run_function(
            'mysql.file_query',
            database=self.testdb,
            file_name='salt://mysql/select_query.sql',
            character_set='utf8',
            collate='utf8_general_ci',
            connection_user=self.user,
            connection_pass=self.password
        )
        expected = {
            'rows affected': 5,
            'rows returned': 4,
            'results': [
                [
                    ['2'],
                    ['3'],
                    ['4'],
                    ['5']
                ]
            ],
            'columns': [
                ['a']
            ],
        }
        # The timing value varies between runs, so only check that it is
        # present and drop it before comparing the rest of the result.
        self.assertIn('query time', ret)
        ret.pop('query time')
        self.assertEqual(ret, expected)
|