Fixed some tests, updated a doc.

This commit is contained in:
Dmitry Kuzmenko 2017-12-14 03:01:24 +03:00
parent 31c6a9f187
commit a67fe51a5c
No known key found for this signature in database
GPG Key ID: 4C7CAD30C95651DA
3 changed files with 85 additions and 9 deletions

View File

@ -146,6 +146,7 @@ Optional Dependencies
.. _`ZeroMQ`: http://zeromq.org/
.. _`pyzmq`: https://github.com/zeromq/pyzmq
.. _`msgpack-python`: https://pypi.python.org/pypi/msgpack-python/
.. _`M2Crypto`: https://gitlab.com/m2crypto/m2crypto
.. _`PyCrypto`: https://www.dlitz.net/software/pycrypto/
.. _`YAML`: http://pyyaml.org/
.. _`Jinja2`: http://jinja.pocoo.org/

View File

@ -124,10 +124,10 @@ def gen_keys(keydir, keyname, keysize, user=None, passphrase=None):
priv = '{0}.pem'.format(base)
pub = '{0}.pub'.format(base)
salt.utils.crypt.reinit_crypto()
if HAS_M2:
gen = RSA.gen_key(keysize, 65537)
else:
salt.utils.crypt.reinit_crypto()
gen = RSA.generate(bits=keysize, e=65537)
if os.path.isfile(priv):
# Between first checking and the generation another process has made
@ -139,11 +139,21 @@ def gen_keys(keydir, keyname, keysize, user=None, passphrase=None):
raise IOError('Write access denied to "{0}" for user "{1}".'.format(os.path.abspath(keydir), getpass.getuser()))
cumask = os.umask(0o277)
with salt.utils.files.fopen(priv, 'wb+') as f:
f.write(gen.exportKey('PEM', passphrase))
if HAS_M2:
# if passphrase is empty or None use no cipher
if not passphrase:
gen.save_pem(priv, cipher=None)
else:
gen.save_pem(priv, cipher='des_ede3_cbc', callback=lambda x: six.b(passphrase))
else:
with salt.utils.files.fopen(priv, 'wb+') as f:
f.write(gen.exportKey('PEM', passphrase))
os.umask(cumask)
with salt.utils.files.fopen(pub, 'wb+') as f:
f.write(gen.publickey().exportKey('PEM'))
if HAS_M2:
gen.save_pub_key(pub)
else:
with salt.utils.files.fopen(pub, 'wb+') as f:
f.write(gen.publickey().exportKey('PEM'))
os.chmod(priv, 0o400)
if user:
try:
@ -229,7 +239,7 @@ def verify_signature(pubkey_path, message, signature):
pubkey = get_rsa_pub_key(pubkey_path)
log.debug('salt.crypt.verify_signature: Verifying signature')
if HAS_M2:
md = EVP.MesageDigest('sha1')
md = EVP.MessageDigest('sha1')
md.update(six.b(message))
digest = md.final()
return pubkey.verify(digest, six.b(signature))

View File

@ -13,6 +13,11 @@ import salt.utils.files
from salt import crypt
# third-party libs
try:
import M2Crypto
HAS_M2 = True
except ImportError:
HAS_M2 = False
try:
from Cryptodome.PublicKey import RSA # pylint: disable=unused-import
HAS_PYCRYPTO_RSA = True
@ -85,8 +90,8 @@ SIG = (
@skipIf(NO_MOCK, NO_MOCK_REASON)
@skipIf(not HAS_PYCRYPTO_RSA, 'pycrypto >= 2.6 is not available')
@skipIf(HAS_M2, 'm2crypto is used by salt.crypt if installed')
class CryptTestCase(TestCase):
def test_gen_keys(self):
with patch.multiple(os, umask=MagicMock(), chmod=MagicMock(),
access=MagicMock(return_value=True)):
@ -122,14 +127,74 @@ class CryptTestCase(TestCase):
def test_sign_message(self):
key = RSA.importKey(PRIVKEY_DATA)
with patch('salt.crypt._get_rsa_key', return_value=key):
with patch('salt.crypt.get_rsa_key', return_value=key):
self.assertEqual(SIG, salt.crypt.sign_message('/keydir/keyname.pem', MSG))
def test_sign_message_with_passphrase(self):
key = RSA.importKey(PRIVKEY_DATA)
with patch('salt.crypt._get_rsa_key', return_value=key):
with patch('salt.crypt.get_rsa_key', return_value=key):
self.assertEqual(SIG, crypt.sign_message('/keydir/keyname.pem', MSG, passphrase='password'))
def test_verify_signature(self):
with patch('salt.utils.files.fopen', mock_open(read_data=PUBKEY_DATA)):
self.assertTrue(crypt.verify_signature('/keydir/keyname.pub', MSG, SIG))
@skipIf(NO_MOCK, NO_MOCK_REASON)
@skipIf(not HAS_M2, 'm2crypto is not available')
class M2CryptTestCase(TestCase):
@patch('os.umask', MagicMock())
@patch('os.chmod', MagicMock())
@patch('os.access', MagicMock(return_value=True))
def test_gen_keys(self):
with patch('M2Crypto.RSA.RSA.save_pem', MagicMock()) as save_pem:
with patch('M2Crypto.RSA.RSA.save_pub_key', MagicMock()) as save_pub:
with patch('os.path.isfile', return_value=True):
self.assertEqual(crypt.gen_keys('/keydir', 'keyname', 2048),
'/keydir{0}keyname.pem'.format(os.sep))
save_pem.assert_not_called()
save_pub.assert_not_called()
with patch('os.path.isfile', return_value=False):
self.assertEqual(crypt.gen_keys('/keydir', 'keyname', 2048),
'/keydir{0}keyname.pem'.format(os.sep))
save_pem.assert_called_once_with('/keydir{0}keyname.pem'.format(os.sep), cipher=None)
save_pub.assert_called_once_with('/keydir{0}keyname.pub'.format(os.sep))
@patch('os.umask', MagicMock())
@patch('os.chmod', MagicMock())
@patch('os.chown', MagicMock())
@patch('os.access', MagicMock(return_value=True))
def test_gen_keys_with_passphrase(self):
with patch('M2Crypto.RSA.RSA.save_pem', MagicMock()) as save_pem:
with patch('M2Crypto.RSA.RSA.save_pub_key', MagicMock()) as save_pub:
with patch('os.path.isfile', return_value=True):
self.assertEqual(crypt.gen_keys('/keydir', 'keyname', 2048, passphrase='password'),
'/keydir{0}keyname.pem'.format(os.sep))
save_pem.assert_not_called()
save_pub.assert_not_called()
with patch('os.path.isfile', return_value=False):
self.assertEqual(crypt.gen_keys('/keydir', 'keyname', 2048, passphrase='password'),
'/keydir{0}keyname.pem'.format(os.sep))
callback = save_pem.call_args[1]['callback']
save_pem.assert_called_once_with('/keydir{0}keyname.pem'.format(os.sep),
cipher='des_ede3_cbc',
callback=callback)
self.assertEqual(callback(None), b'password')
save_pub.assert_called_once_with('/keydir{0}keyname.pub'.format(os.sep))
def test_sign_message(self):
key = M2Crypto.RSA.load_key_string(PRIVKEY_DATA)
with patch('salt.crypt.get_rsa_key', return_value=key):
self.assertEqual(SIG, salt.crypt.sign_message('/keydir/keyname.pem', MSG))
def test_sign_message_with_passphrase(self):
key = M2Crypto.RSA.load_key_string(PRIVKEY_DATA)
with patch('salt.crypt.get_rsa_key', return_value=key):
self.assertEqual(SIG, crypt.sign_message('/keydir/keyname.pem', MSG, passphrase='password'))
def test_verify_signature(self):
key = M2Crypto.RSA.load_pub_key_bio(M2Crypto.BIO.MemoryBuffer(PUBKEY_DATA.encode()))
with patch('M2Crypto.RSA.load_pub_key', return_value=key):
self.assertTrue(crypt.verify_signature('/keydir/keyname.pub', MSG, SIG))