salt/tests/integration/states/test_mysql_grants.py

295 lines
9.0 KiB
Python

# -*- coding: utf-8 -*-
'''
Tests for the MySQL states
'''
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import logging
# Import Salt Testing libs
from tests.support.case import ModuleCase
from tests.support.unit import skipIf
from tests.support.helpers import destructiveTest
from tests.support.mixins import SaltReturnAssertsMixin
# Import salt libs
import salt.utils.path
from salt.ext import six
from salt.modules import mysql as mysqlmod
log = logging.getLogger(__name__)
NO_MYSQL = False
try:
import MySQLdb # pylint: disable=import-error,unused-import
except ImportError:
NO_MYSQL = True
if not salt.utils.path.which('mysqladmin'):
NO_MYSQL = True
@skipIf(
NO_MYSQL,
'Please install MySQL bindings and a MySQL Server before running'
'MySQL integration tests.'
)
class MysqlGrantsStateTest(ModuleCase, SaltReturnAssertsMixin):
'''
Validate the mysql_grants states
'''
user = 'root'
password = 'poney'
# yep, theses are valid MySQL db names
# very special chars are _ % and .
testdb1 = 'tes.t\'"saltdb'
testdb2 = 't_st `(:=salt%b)'
testdb3 = 'test `(:=salteeb)'
table1 = 'foo'
table2 = "foo `\'%_bar"
users = {
'user1': {
'name': 'foo',
'pwd': 'bar',
},
'user2': {
'name': 'user ";--,?:&/\\',
'pwd': '";--(),?:@=&/\\',
},
# this is : passwd 標標
'user3': {
'name': 'user( @ )=foobar',
'pwd': '\xe6\xa8\x99\xe6\xa8\x99',
},
# this is : user/password containing 標標
'user4': {
'name': 'user \xe6\xa8\x99',
'pwd': '\xe6\xa8\x99\xe6\xa8\x99',
},
}
@destructiveTest
def setUp(self):
'''
Test presence of MySQL server, enforce a root password
'''
super(MysqlGrantsStateTest, self).setUp()
NO_MYSQL_SERVER = True
# now ensure we know the mysql root password
# one of theses two at least should work
ret1 = self.run_state(
'cmd.run',
name='mysqladmin --host="localhost" -u '
+ self.user
+ ' flush-privileges password "'
+ self.password
+ '"'
)
ret2 = self.run_state(
'cmd.run',
name='mysqladmin --host="localhost" -u '
+ self.user
+ ' --password="'
+ self.password
+ '" flush-privileges password "'
+ self.password
+ '"'
)
key, value = ret2.popitem()
if value['result']:
NO_MYSQL_SERVER = False
else:
self.skipTest('No MySQL Server running, or no root access on it.')
# Create some users and a test db
for user, userdef in six.iteritems(self.users):
self._userCreation(uname=userdef['name'], password=userdef['pwd'])
self.run_state(
'mysql_database.present',
name=self.testdb1,
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password,
)
self.run_state(
'mysql_database.present',
name=self.testdb2,
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password,
)
create_query = ('CREATE TABLE {tblname} ('
' id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,'
' data VARCHAR(100)) ENGINE={engine};'.format(
tblname=mysqlmod.quote_identifier(self.table1),
engine='MYISAM',
))
log.info('Adding table \'%s\'', self.table1)
self.run_function(
'mysql.query',
database=self.testdb2,
query=create_query,
connection_user=self.user,
connection_pass=self.password
)
create_query = ('CREATE TABLE {tblname} ('
' id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,'
' data VARCHAR(100)) ENGINE={engine};'.format(
tblname=mysqlmod.quote_identifier(self.table2),
engine='MYISAM',
))
log.info('Adding table \'%s\'', self.table2)
self.run_function(
'mysql.query',
database=self.testdb2,
query=create_query,
connection_user=self.user,
connection_pass=self.password
)
@destructiveTest
def tearDown(self):
'''
Removes created users and db
'''
for user, userdef in six.iteritems(self.users):
self._userRemoval(uname=userdef['name'], password=userdef['pwd'])
self.run_state(
'mysql_database.absent',
name=self.testdb1,
connection_user=self.user,
connection_pass=self.password,
)
self.run_function(
'mysql_database.absent',
name=self.testdb2,
connection_user=self.user,
connection_pass=self.password,
)
def _userCreation(self,
uname,
password=None):
'''
Create a test user
'''
self.run_state(
'mysql_user.present',
name=uname,
host='localhost',
password=password,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
def _userRemoval(self,
uname,
password=None):
'''
Removes a test user
'''
self.run_state(
'mysql_user.absent',
name=uname,
host='localhost',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
saltenv={"LC_ALL": "en_US.utf8"}
)
@destructiveTest
def test_grant_present_absent(self):
'''
mysql_database.present
'''
ret = self.run_state(
'mysql_grants.present',
name='grant test 1',
grant='SELECT, INSERT',
database=self.testdb1 + '.*',
user=self.users['user1']['name'],
host='localhost',
grant_option=True,
revoke_first=True,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8'
)
self.assertSaltTrueReturn(ret)
ret = self.run_state(
'mysql_grants.present',
name='grant test 2',
grant='SELECT, ALTER,CREATE TEMPORARY tables, execute',
database=self.testdb1 + '.*',
user=self.users['user1']['name'],
host='localhost',
grant_option=True,
revoke_first=True,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8'
)
self.assertSaltTrueReturn(ret)
ret = self.run_state(
'mysql_grants.present',
name='grant test 3',
grant='SELECT, INSERT',
database=self.testdb2 + '.' + self.table2,
user=self.users['user2']['name'],
host='localhost',
grant_option=True,
revoke_first=True,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8'
)
self.assertSaltTrueReturn(ret)
ret = self.run_state(
'mysql_grants.present',
name='grant test 4',
grant='SELECT, INSERT',
database=self.testdb2 + '.' + self.table2,
user=self.users['user2']['name'],
host='localhost',
grant_option=True,
revoke_first=True,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8'
)
self.assertSaltTrueReturn(ret)
ret = self.run_state(
'mysql_grants.present',
name='grant test 5',
grant='SELECT, UPDATE',
database=self.testdb2 + '.*',
user=self.users['user1']['name'],
host='localhost',
grant_option=True,
revoke_first=False,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8'
)
self.assertSaltTrueReturn(ret)
ret = self.run_state(
'mysql_grants.absent',
name='grant test 6',
grant='SELECT,update',
database=self.testdb2 + '.*',
user=self.users['user1']['name'],
host='localhost',
grant_option=True,
revoke_first=False,
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8'
)
self.assertSaltTrueReturn(ret)