Merge pull request #8939 from s0undt3ch/develop

Hand merge @religero #8930 with Lint Fixes
This commit is contained in:
Pedro Algarvio 2013-12-02 02:38:15 -08:00
commit c3cb1b2006
3 changed files with 320 additions and 52 deletions

View File

@ -69,7 +69,9 @@ def __check_table(name, table, **connection_args):
if dbc is None:
return {}
cur = dbc.cursor(MySQLdb.cursors.DictCursor)
qry = 'CHECK TABLE `{0}`.`{1}`'.format(name, table)
s_name = quoteIdentifier(name)
s_table = quoteIdentifier(table)
qry = 'CHECK TABLE %(dbname)s.%(dbtable)s' % dict(dbname=s_name, dbtable=s_table)
log.debug('Doing query: {0}'.format(qry))
cur.execute(qry)
results = cur.fetchall()
@ -82,7 +84,9 @@ def __repair_table(name, table, **connection_args):
if dbc is None:
return {}
cur = dbc.cursor(MySQLdb.cursors.DictCursor)
qry = 'REPAIR TABLE `{0}`.`{1}`'.format(name, table)
s_name = quoteIdentifier(name)
s_table = quoteIdentifier(table)
qry = 'REPAIR TABLE %(dbname)s.%(dbtable)s' % dict(dbname=s_name, dbtable=s_table)
log.debug('Doing query: {0}'.format(qry))
cur.execute(qry)
results = cur.fetchall()
@ -95,7 +99,9 @@ def __optimize_table(name, table, **connection_args):
if dbc is None:
return {}
cur = dbc.cursor(MySQLdb.cursors.DictCursor)
qry = 'OPTIMIZE TABLE `{0}`.`{1}`'.format(name, table)
s_name = quoteIdentifier(name)
s_table = quoteIdentifier(table)
qry = 'OPTIMIZE TABLE %(dbname)s.%(dbtable)s' % dict(dbname=s_name, dbtable=s_table)
log.debug('Doing query: {0}'.format(qry))
cur.execute(qry)
results = cur.fetchall()
@ -218,11 +224,17 @@ def _grant_to_tokens(grant):
database=database)
def _quoteIdentifier(identifier):
def quoteIdentifier(identifier):
'''
Return an identifier name (column, table, database, etc) escaped accordingly for MySQL
This means surrounded by "`" charecter and escaping this charater inside.
This means surrounded by "`" character and escaping this charater inside.
CLI Example:
.. code-block:: bash
salt '*' mysql.quoteIdentifier 'foo`bar'
'''
return '`' + identifier.replace('`', '``') + '`'
@ -543,7 +555,7 @@ def db_tables(name, **connection_args):
if dbc is None:
return []
cur = dbc.cursor()
s_name = _quoteIdentifier(name)
s_name = quoteIdentifier(name)
qry = 'SHOW TABLES IN %(dbname)s' % dict(dbname=s_name)
log.debug('Doing query: {0}'.format(qry))
try:
@ -622,7 +634,7 @@ def db_create(name, character_set=None, collate=None, **connection_args):
if dbc is None:
return False
cur = dbc.cursor()
s_name = _quoteIdentifier(name)
s_name = quoteIdentifier(name)
qry = 'CREATE DATABASE %(dbname)s' % dict(dbname=s_name)
args = {}
if character_set is not None:
@ -669,7 +681,7 @@ def db_remove(name, **connection_args):
if dbc is None:
return False
cur = dbc.cursor()
s_name = _quoteIdentifier(name)
s_name = quoteIdentifier(name)
qry = 'DROP DATABASE %(dbname)s;' % dict(dbname=s_name)
log.debug('Doing query: {0}'.format(qry))
try:
@ -1049,6 +1061,7 @@ def db_check(name,
.. code-block:: bash
salt '*' mysql.db_check dbname
salt '*' mysql.db_check dbname dbtable
'''
ret = []
if table is None:

View File

@ -2,6 +2,7 @@
# Import python libs
import os
import logging
from mock import patch, MagicMock
@ -19,6 +20,7 @@ import integration
import salt.utils
from salt.modules import mysql as mysqlmod
log = logging.getLogger(__name__)
NO_MYSQL = False
try:
@ -35,7 +37,6 @@ class MysqlModuleTest(integration.ModuleCase,
password = 'poney'
@destructiveTest
@skipIf(salt.utils.is_windows(), 'not tested on windows yet')
def setUp(self):
'''
Test presence of MySQL server, enforce a root password
@ -69,7 +70,6 @@ class MysqlModuleTest(integration.ModuleCase,
self.skipTest('No MySQL Server running, or no root access on it.')
@destructiveTest
@skipIf(salt.utils.is_windows(), 'not tested on windows yet')
def test_database_creation_level1(self):
'''
Create database, test it exists and remove it
@ -81,7 +81,7 @@ class MysqlModuleTest(integration.ModuleCase,
connection_user=self.user,
connection_pass=self.password
)
self.assertTrue(ret)
self.assertEqual(True, ret)
# test db exists
ret = self.run_function(
@ -90,7 +90,7 @@ class MysqlModuleTest(integration.ModuleCase,
connection_user=self.user,
connection_pass=self.password
)
self.assertTrue(ret)
self.assertEqual(True, ret)
# redoing the same should fail
ret = self.run_function(
@ -99,7 +99,7 @@ class MysqlModuleTest(integration.ModuleCase,
connection_user=self.user,
connection_pass=self.password
)
self.assertFalse(ret)
self.assertEqual(False, ret)
# Now remove database
ret = self.run_function(
@ -108,10 +108,9 @@ class MysqlModuleTest(integration.ModuleCase,
connection_user=self.user,
connection_pass=self.password
)
self.assertTrue(ret)
self.assertEqual(True, ret)
@destructiveTest
@skipIf(salt.utils.is_windows(), 'not tested on windows yet')
def test_database_creation_level2(self):
'''
Same as level1 with strange names and with character set and collate keywords
@ -127,7 +126,7 @@ class MysqlModuleTest(integration.ModuleCase,
connection_user=self.user,
connection_pass=self.password
)
self.assertTrue(ret)
self.assertEqual(True, ret)
# test db exists
ret = self.run_function(
'mysql.db_exists',
@ -135,7 +134,7 @@ class MysqlModuleTest(integration.ModuleCase,
connection_user=self.user,
connection_pass=self.password
)
self.assertTrue(ret)
self.assertEqual(True, ret)
# redoing the same should fail
# even with other character sets or collations
ret = self.run_function(
@ -146,7 +145,7 @@ class MysqlModuleTest(integration.ModuleCase,
connection_user=self.user,
connection_pass=self.password
)
self.assertFalse(ret)
self.assertEqual(False, ret)
# redoing the same should fail
ret = self.run_function(
'mysql.db_create',
@ -156,7 +155,7 @@ class MysqlModuleTest(integration.ModuleCase,
connection_user=self.user,
connection_pass=self.password
)
self.assertFalse(ret)
self.assertEqual(False, ret)
# Now remove database
ret = self.run_function(
'mysql.db_remove',
@ -164,7 +163,7 @@ class MysqlModuleTest(integration.ModuleCase,
connection_user=self.user,
connection_pass=self.password
)
self.assertTrue(ret)
self.assertEqual(True, ret)
# '''''''
# create
@ -176,7 +175,7 @@ class MysqlModuleTest(integration.ModuleCase,
connection_user=self.user,
connection_pass=self.password
)
self.assertTrue(ret)
self.assertEqual(True, ret)
# test db exists
ret = self.run_function(
'mysql.db_exists',
@ -184,7 +183,7 @@ class MysqlModuleTest(integration.ModuleCase,
connection_user=self.user,
connection_pass=self.password
)
self.assertTrue(ret)
self.assertEqual(True, ret)
# Now remove database
ret = self.run_function(
'mysql.db_remove',
@ -192,7 +191,7 @@ class MysqlModuleTest(integration.ModuleCase,
connection_user=self.user,
connection_pass=self.password
)
self.assertTrue(ret)
self.assertEqual(True, ret)
# """"""""
# also with collate only
@ -203,7 +202,7 @@ class MysqlModuleTest(integration.ModuleCase,
connection_user=self.user,
connection_pass=self.password
)
self.assertTrue(ret)
self.assertEqual(True, ret)
# test db exists
ret = self.run_function(
'mysql.db_exists',
@ -211,7 +210,7 @@ class MysqlModuleTest(integration.ModuleCase,
connection_user=self.user,
connection_pass=self.password
)
self.assertTrue(ret)
self.assertEqual(True, ret)
# Now remove database
ret = self.run_function(
'mysql.db_remove',
@ -219,32 +218,288 @@ class MysqlModuleTest(integration.ModuleCase,
connection_user=self.user,
connection_pass=self.password
)
self.assertTrue(ret)
self.assertEqual(True, ret)
# Unicode
# TODO: Simple accents :
#db_name=u'notamérican'
#ret = self.run_function(
# 'mysql.db_create',
# name=db_name,
# connection_user=self.user,
# connection_pass=self.password
#)
#self.assertEqual(True, ret)
# test db exists
#ret = self.run_function(
# 'mysql.db_exists',
# name=db_name,
# connection_user=self.user,
# connection_pass=self.password
#)
#self.assertEqual(True, ret)
# Now remove database
#ret = self.run_function(
# 'mysql.db_remove',
# name=db_name,
# connection_user=self.user,
# connection_pass=self.password
#)
#self.assertEqual(True, ret)
# TODO: Unicode, currently Failing on :
# UnicodeDecodeError: \'ascii\' codec can\'t decode byte 0xe6 in position 1: ordinal not in range(128)
# something like: '標準語'
#unicode_str=u'\u6a19\u6e96\u8a9e'
#db_name=unicode_str.encode('utf8')
#ret = self.run_function(
# 'mysql.db_create',
# name=db_name,
# connection_user=self.user,
# connection_pass=self.password
#)
#self.assertEqual(True, ret)
# test db exists
#ret = self.run_function(
# 'mysql.db_exists',
# name=db_name,
# connection_user=self.user,
# connection_pass=self.password
#)
#self.assertEqual(True, ret)
# Now remove database
#ret = self.run_function(
# 'mysql.db_remove',
# name=db_name,
# connection_user=self.user,
# connection_pass=self.password
#)
#self.assertEqual(True, ret)
@destructiveTest
def test_database_maintenance(self):
'''
Test maintenance operations on a created database
'''
dbname = u"foo'-- `\"'"
# create database
# but first silently try to remove it
# in case of previous tests failures
ret = self.run_function(
'mysql.db_create',
name=u'標準語',
connection_user=self.user,
connection_pass=self.password
'mysql.db_remove',
name=dbname,
connection_user=self.user,
connection_pass=self.password
)
self.assertTrue(ret)
ret = self.run_function(
'mysql.db_create',
name=dbname,
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password
)
self.assertEqual(True, ret)
# test db exists
ret = self.run_function(
'mysql.db_exists',
name=u'標準語',
connection_user=self.user,
connection_pass=self.password
'mysql.db_exists',
name=dbname,
connection_user=self.user,
connection_pass=self.password
)
self.assertTrue(ret)
# Now remove database
self.assertEqual(True, ret)
# Create 3 tables
tablenames = {'Atable "`1': 'MYISAM', 'Btable \'`2': 'InnoDB', 'Ctable --`3': 'MEMORY'}
for tablename, engine in iter(sorted(tablenames.iteritems())):
# prepare queries
create_query = ('CREATE TABLE %(tblname)s ('
' id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,'
' data VARCHAR(100)) ENGINE=%(engine)s;') % dict(
tblname=mysqlmod.quoteIdentifier(tablename),
engine=engine,
)
insert_query = ('INSERT INTO %(tblname)s (data)'
' VALUES ') % dict(tblname=mysqlmod.quoteIdentifier(tablename))
delete_query = ('DELETE from %(tblname)s'
' order by rand() limit 50;') % dict(tblname=mysqlmod.quoteIdentifier(tablename))
for x in range(100):
insert_query += "('foo"+str(x)+"'),"
insert_query += "('bar');"
# populate database
log.info('Adding table{0!r}'.format(tablename,))
ret = self.run_function(
'mysql.query',
database=dbname,
query=create_query,
connection_user=self.user,
connection_pass=self.password
)
if not isinstance(ret, dict) or 'rows affected' not in ret:
raise AssertionError(
'Unexpected query result while populating test table {0!r} : {1!r}'.format(
tablename,
ret,
)
)
self.assertEqual(ret['rows affected'], 0)
log.info('Populating table{0!r}'.format(tablename,))
ret = self.run_function(
'mysql.query',
database=dbname,
query=insert_query,
connection_user=self.user,
connection_pass=self.password
)
if not isinstance(ret, dict) or 'rows affected' not in ret:
raise AssertionError(
'Unexpected query result while populating test table {0!r} : {1!r}'.format(
tablename,
ret,
)
)
self.assertEqual(ret['rows affected'], 101)
log.info('Removing some rows on table{0!r}'.format(tablename,))
ret = self.run_function(
'mysql.query',
database=dbname,
query=delete_query,
connection_user=self.user,
connection_pass=self.password
)
if not isinstance(ret, dict) or 'rows affected' not in ret:
raise AssertionError(
('Unexpected query result while removing rows on test table'
' {0!r} : {1!r}').format(
tablename,
ret,
)
)
self.assertEqual(ret['rows affected'], 50)
# test check/repair/opimize on 1 table
tablename = 'Atable "`1'
ret = self.run_function(
'mysql.db_remove',
name=u'標準語',
connection_user=self.user,
connection_pass=self.password
'mysql.db_check',
name=dbname,
table=tablename,
connection_user=self.user,
connection_pass=self.password
)
self.assertTrue(ret)
# Note that returned result does not quoteIdentifier of table and db
self.assertEqual(ret, [{'Table': dbname+'.'+tablename, 'Msg_text': 'OK', 'Msg_type': 'status', 'Op': 'check'}])
ret = self.run_function(
'mysql.db_repair',
name=dbname,
table=tablename,
connection_user=self.user,
connection_pass=self.password
)
# Note that returned result does not quoteIdentifier of table and db
self.assertEqual(ret, [{'Table': dbname+'.'+tablename, 'Msg_text': 'OK', 'Msg_type': 'status', 'Op': 'repair'}])
ret = self.run_function(
'mysql.db_optimize',
name=dbname,
table=tablename,
connection_user=self.user,
connection_pass=self.password
)
# Note that returned result does not quoteIdentifier of table and db
self.assertEqual(ret, [{'Table': dbname+'.'+tablename, 'Msg_text': 'OK', 'Msg_type': 'status', 'Op': 'optimize'}])
# test check/repair/opimize on all tables
ret = self.run_function(
'mysql.db_check',
name=dbname,
connection_user=self.user,
connection_pass=self.password
)
expected = []
for tablename, engine in iter(sorted(tablenames.iteritems())):
if engine is 'MEMORY':
expected.append([{
'Table': dbname+'.'+tablename,
'Msg_text': "The storage engine for the table doesn't support check",
'Msg_type': 'note',
'Op': 'check'
}])
else:
expected.append([{
'Table': dbname+'.'+tablename,
'Msg_text': 'OK',
'Msg_type': 'status',
'Op': 'check'
}])
self.assertEqual(ret, expected)
ret = self.run_function(
'mysql.db_repair',
name=dbname,
connection_user=self.user,
connection_pass=self.password
)
expected = []
for tablename, engine in iter(sorted(tablenames.iteritems())):
if engine is 'MYISAM':
expected.append([{
'Table': dbname+'.'+tablename,
'Msg_text': 'OK',
'Msg_type': 'status',
'Op': 'repair'
}])
else:
expected.append([{
'Table': dbname+'.'+tablename,
'Msg_text': "The storage engine for the table doesn't support repair",
'Msg_type': 'note',
'Op': 'repair'
}])
self.assertEqual(ret, expected)
ret = self.run_function(
'mysql.db_optimize',
name=dbname,
connection_user=self.user,
connection_pass=self.password
)
expected = []
for tablename, engine in iter(sorted(tablenames.iteritems())):
if engine is 'MYISAM':
expected.append([{
'Table': dbname+'.'+tablename,
'Msg_text': 'OK',
'Msg_type': 'status',
'Op': 'optimize'
}])
elif engine is 'InnoDB':
expected.append([{
'Table': dbname+'.'+tablename,
'Msg_text': ("Table does not support optimize, "
"doing recreate + analyze instead"),
'Msg_type': 'note',
'Op': 'optimize'
},
{
'Table': dbname+'.'+tablename,
'Msg_text': 'OK',
'Msg_type': 'status',
'Op': 'optimize'
}])
elif engine is 'MEMORY':
expected.append([{
'Table': dbname+'.'+tablename,
'Msg_text': "The storage engine for the table doesn't support optimize",
'Msg_type': 'note',
'Op': 'optimize'
}])
self.assertEqual(ret, expected)
# Teardown, remove database
ret = self.run_function(
'mysql.db_remove',
name=dbname,
connection_user=self.user,
connection_pass=self.password
)
self.assertEqual(True, ret)
if __name__ == '__main__':
from integration import run_tests

View File

@ -79,32 +79,32 @@ class MySQLTestCase(TestCase):
'''
Test MySQL db check function in mysql exec module
'''
self._test_call(mysql.db_check, 'CHECK TABLE `testdb`.`mytable`', 'testdb', 'mytable')
self._test_call(mysql.db_check, 'CHECK TABLE `test``\'" db`.`my``\'" table`', 'test`\'" db', 'my`\'" table')
def test_db_repair(self):
'''
Test MySQL db repair function in mysql exec module
'''
self._test_call(mysql.db_repair, 'REPAIR TABLE `testdb`.`mytable`', 'testdb', 'mytable')
self._test_call(mysql.db_repair, 'REPAIR TABLE `test``\'" db`.`my``\'" table`', 'test`\'" db', 'my`\'" table')
def test_db_optimize(self):
'''
Test MySQL db optimize function in mysql exec module
'''
self._test_call(mysql.db_optimize, 'OPTIMIZE TABLE `testdb`.`mytable`', 'testdb', 'mytable')
self._test_call(mysql.db_optimize, 'OPTIMIZE TABLE `test``\'" db`.`my``\'" table`', 'test`\'" db', 'my`\'" table')
def test_db_remove(self):
'''
Test MySQL db remove function in mysql exec module
'''
mysql.db_exists = MagicMock(return_value=True)
self._test_call(mysql.db_remove, 'DROP DATABASE `testdb`;', 'testdb')
self._test_call(mysql.db_remove, 'DROP DATABASE `test``\'" db`;', 'test`\'" db')
def test_db_tables(self):
'''
Test MySQL db_tables function in mysql exec module
'''
self._test_call(mysql.db_tables, 'SHOW TABLES IN `testdb`', 'testdb')
self._test_call(mysql.db_tables, 'SHOW TABLES IN `test``\'" db`', 'test`\'" db')
def test_db_exists(self):
'''
@ -113,9 +113,9 @@ class MySQLTestCase(TestCase):
self._test_call(
mysql.db_exists,
{'sql': 'SHOW DATABASES LIKE %(dbname)s;',
'sql_args': {'dbname': 'testdb'}
'sql_args': {'dbname': 'test`\'" db'}
},
'testdb'
'test`\'" db'
)
def test_db_create(self):
@ -124,9 +124,9 @@ class MySQLTestCase(TestCase):
'''
self._test_call(
mysql.db_create,
{'sql': 'CREATE DATABASE `testdb`;',
{'sql': 'CREATE DATABASE `test``\'" db`;',
'sql_args': {}},
'testdb'
'test`\'" db'
)
def test_user_list(self):