diff --git a/tests/unit/modules/keystone_test.py b/tests/unit/modules/keystone_test.py new file mode 100644 index 0000000000..62d95d2107 --- /dev/null +++ b/tests/unit/modules/keystone_test.py @@ -0,0 +1,886 @@ +# -*- coding: utf-8 -*- +''' + :codeauthor: :email:`Jayesh Kariya ` +''' + +# Import Python Libs +from __future__ import absolute_import + +# Import Salt Testing Libs +from salttesting import TestCase, skipIf +from salttesting.mock import ( + MagicMock, + patch, + NO_MOCK, + NO_MOCK_REASON +) + +from salttesting.helpers import ensure_in_syspath + +ensure_in_syspath('../../') + +# Import Salt Libs +from salt.modules import keystone + +# Globals +keystone.__salt__ = {} + + +class MockEC2(object): + """ + Mock of EC2 class + """ + def __init__(self): + self.access = '' + self.secret = '' + self.tenant_id = '' + self.user_id = '' + self.connection_args = '' + self.profile = '' + + @staticmethod + def create(userid, tenantid): + """ + Mock of create method + """ + cr_ec2 = MockEC2() + cr_ec2.tenant_id = tenantid + cr_ec2.user_id = userid + return cr_ec2 + + def delete(self, userid, accesskey): + """ + Mock of delete method + """ + self.access = accesskey + self.user_id = userid + return True + + @staticmethod + def get(user_id, access, profile, **connection_args): + """ + Mock of get method + """ + cr_ec2 = MockEC2() + cr_ec2.profile = profile + cr_ec2.access = access + cr_ec2.user_id = user_id + cr_ec2.connection_args = connection_args + return cr_ec2 + + @staticmethod + def list(user_id): + """ + Mock of list method + """ + cr_ec2 = MockEC2() + cr_ec2.user_id = user_id + return [cr_ec2] + + +class MockEndpoints(object): + """ + Mock of Endpoints class + """ + def __init__(self): + self.id = '007' + self.region = 'region' + self.adminurl = 'adminurl' + self.internalurl = 'internalurl' + self.publicurl = 'publicurl' + self.service_id = '117' + + @staticmethod + def list(): + """ + Mock of list method + """ + return [MockEndpoints()] + + @staticmethod + def create(region, service_id, publicurl, adminurl, internalurl): + """ + Mock of create method + """ + return (region, service_id, publicurl, adminurl, internalurl) + + @staticmethod + def delete(id): + """ + Mock of delete method + """ + return id + + +class MockServices(object): + """ + Mock of Services class + """ + flag = None + + def __init__(self): + self.id = '117' + self.name = 'iptables' + self.description = 'description' + self.type = 'type' + + @staticmethod + def create(name, service_type, description): + """ + Mock of create method + """ + service = MockServices() + service.id = '005' + service.name = name + service.description = description + service.type = service_type + return service + + def get(self, service_id): + """ + Mock of get method + """ + service = MockServices() + if self.flag == 1: + service.id = 'asd' + return [service] + elif self.flag == 2: + service.id = service_id + return service + return [service] + + def list(self): + """ + Mock of list method + """ + service = MockServices() + if self.flag == 1: + service.id = 'asd' + return [service] + return [service] + + @staticmethod + def delete(service_id): + """ + Mock of delete method + """ + return service_id + + +class MockRoles(object): + """ + Mock of Roles class + """ + flag = None + + def __init__(self): + self.id = '113' + self.name = 'nova' + self.user_id = '446' + self.tenant_id = 'a1a1' + + @staticmethod + def create(name): + """ + Mock of create method + """ + return name + + def get(self, role_id): + """ + Mock of get method + """ + role = MockRoles() + if self.flag == 1: + role.id = None + return role + role.id = role_id + return role + + @staticmethod + def list(): + """ + Mock of list method + """ + return [MockRoles()] + + @staticmethod + def delete(role): + """ + Mock of delete method + """ + return role + + @staticmethod + def add_user_role(user_id, role_id, tenant_id): + """ + Mock of add_user_role method + """ + return (user_id, role_id, tenant_id) + + @staticmethod + def remove_user_role(user_id, role_id, tenant_id): + """ + Mock of remove_user_role method + """ + return (user_id, role_id, tenant_id) + + @staticmethod + def roles_for_user(user, tenant): + """ + Mock of roles_for_user method + """ + role = MockRoles() + role.user_id = user + role.tenant_id = tenant + return [role] + + +class MockTenants(object): + """ + Mock of Tenants class + """ + flag = None + + def __init__(self): + self.id = '446' + self.name = 'nova' + self.description = 'description' + self.enabled = 'True' + + @staticmethod + def create(name, description, enabled): + """ + Mock of create method + """ + tenant = MockTenants() + tenant.name = name + tenant.description = description + tenant.enabled = enabled + return tenant + + def get(self, tenant_id): + """ + Mock of get method + """ + tenant = MockTenants() + if self.flag == 1: + tenant.id = None + return tenant + tenant.id = tenant_id + return tenant + + @staticmethod + def list(): + """ + Mock of list method + """ + return [MockTenants()] + + @staticmethod + def delete(tenant_id): + """ + Mock of delete method + """ + return tenant_id + + +class MockServiceCatalog(object): + """ + Mock of ServiceCatalog class + """ + def __init__(self): + self.id = '446' + self.expires = 'No' + self.user_id = 'admin' + self.tenant_id = 'ae04' + + def get_token(self): + """ + Mock of get_token method + """ + return {'id': self.id, 'expires': self.expires, 'user_id': self.user_id, + 'tenant_id': self.tenant_id} + + +class MockUsers(object): + """ + Mock of Users class + """ + flag = None + + def __init__(self): + self.id = '446' + self.name = 'nova' + self.email = 'salt@saltstack.com' + self.enabled = 'True' + self.tenant_id = 'a1a1' + self.password = 'salt' + + def create(self, name, password, email, tenant_id, enabled): + """ + Mock of create method + """ + user = MockUsers() + user.name = name + user.password = password + user.email = email + user.enabled = enabled + self.tenant_id = tenant_id + return user + + def get(self, user_id): + """ + Mock of get method + """ + user = MockUsers() + if self.flag == 1: + user.id = None + return user + user.id = user_id + return user + + @staticmethod + def list(): + """ + Mock of list method + """ + return [MockUsers()] + + @staticmethod + def delete(user_id): + """ + Mock of delete method + """ + return user_id + + @staticmethod + def update(user, name, email, enabled): + """ + Mock of update method + """ + return (user, name, email, enabled) + + @staticmethod + def update_password(user, password): + """ + Mock of update_password method + """ + return (user, password) + + +class Unauthorized(Exception): + """ + The base exception class for all exceptions. + """ + def __init__(self, message='Test'): + super(Unauthorized, self).__init__(message) + self.msg = message + + +class MockExceptions(object): + """ + Mock of exceptions class + """ + def __init__(self): + self.Unauthorized = Unauthorized + + +class MockKeystoneClient(object): + """ + Mock of keystoneclient module + """ + def __init__(self): + self.exceptions = MockExceptions() + + +class MockClient(object): + """ + Mock of Client class + """ + flag = None + + def __init__(self): + self.ec2 = MockEC2() + self.endpoints = MockEndpoints() + self.services = MockServices() + self.roles = MockRoles() + self.tenants = MockTenants() + self.service_catalog = MockServiceCatalog() + self.users = MockUsers() + + def Client(self, **kwargs): + """ + Mock of Client method + """ + if self.flag == 1: + raise Unauthorized + return True + +keystone.client = MockClient() +keystone.keystoneclient = MockKeystoneClient() + + +@skipIf(NO_MOCK, NO_MOCK_REASON) +@patch('salt.modules.keystone.auth', return_value=MockClient()) +class KeystoneTestCase(TestCase): + ''' + Test cases for salt.modules.keystone + ''' + # 'ec2_credentials_create' function tests: 1 + + def test_ec2_credentials_create(self, mock): + ''' + Test if it create EC2-compatible credentials for user per tenant + ''' + self.assertDictEqual(keystone.ec2_credentials_create(), + {'Error': 'Could not resolve User ID'}) + + self.assertDictEqual(keystone.ec2_credentials_create(user_id='salt'), + {'Error': 'Could not resolve Tenant ID'}) + + self.assertDictEqual(keystone.ec2_credentials_create(user_id='salt', + tenant_id='72278'), + {'access': '', 'tenant_id': '72278', 'secret': '', + 'user_id': 'salt'}) + + # 'ec2_credentials_delete' function tests: 1 + + def test_ec2_credentials_delete(self, mock): + ''' + Test if it delete EC2-compatible credentials + ''' + self.assertDictEqual(keystone.ec2_credentials_delete(), + {'Error': 'Could not resolve User ID'}) + + self.assertEqual(keystone.ec2_credentials_delete(user_id='salt', + access_key='72278'), + 'ec2 key "72278" deleted under user id "salt"') + + # 'ec2_credentials_get' function tests: 1 + + def test_ec2_credentials_get(self, mock): + ''' + Test if it return ec2_credentials for a user + (keystone ec2-credentials-get) + ''' + self.assertDictEqual(keystone.ec2_credentials_get(), + {'Error': 'Unable to resolve user id'}) + + self.assertDictEqual(keystone.ec2_credentials_get(user_id='salt'), + {'Error': 'Access key is required'}) + + self.assertDictEqual(keystone.ec2_credentials_get(user_id='salt', + access='72278', + profile='openstack1'), + {'salt': {'access': '72278', 'secret': '', + 'tenant': '', 'user_id': 'salt'}}) + + # 'ec2_credentials_list' function tests: 1 + + def test_ec2_credentials_list(self, mock): + ''' + Test if it return a list of ec2_credentials + for a specific user (keystone ec2-credentials-list) + ''' + self.assertDictEqual(keystone.ec2_credentials_list(), + {'Error': 'Unable to resolve user id'}) + + self.assertDictEqual(keystone.ec2_credentials_list + (user_id='salt', profile='openstack1'), + {'salt': {'access': '', 'secret': '', + 'tenant_id': '', 'user_id': 'salt'}}) + + # 'endpoint_get' function tests: 1 + + def test_endpoint_get(self, mock): + ''' + Test if it return a specific endpoint (keystone endpoint-get) + ''' + self.assertDictEqual(keystone.endpoint_get('nova', profile='openstack'), + {'Error': 'Could not find the specified service'}) + + ret = {'Error': 'Could not find endpoint for the specified service'} + MockServices.flag = 1 + self.assertDictEqual(keystone.endpoint_get('iptables', + profile='openstack'), ret) + + MockServices.flag = 0 + self.assertDictEqual(keystone.endpoint_get('iptables', + profile='openstack'), + {'adminurl': 'adminurl', 'id': '007', + 'internalurl': 'internalurl', + 'publicurl': 'publicurl', 'region': 'region', + 'service_id': '117'}) + + # 'endpoint_list' function tests: 1 + + def test_endpoint_list(self, mock): + ''' + Test if it return a list of available endpoints + (keystone endpoints-list) + ''' + self.assertDictEqual(keystone.endpoint_list(profile='openstack1'), + {'007': {'adminurl': 'adminurl', 'id': '007', + 'internalurl': 'internalurl', + 'publicurl': 'publicurl', + 'region': 'region', 'service_id': '117'}}) + + # 'endpoint_create' function tests: 1 + + def test_endpoint_create(self, mock): + ''' + Test if it create an endpoint for an Openstack service + ''' + self.assertDictEqual(keystone.endpoint_create('nova'), + {'Error': 'Could not find the specified service'}) + + MockServices.flag = 2 + self.assertDictEqual(keystone.endpoint_create('iptables', + 'http://public/url', + 'http://internal/url', + 'http://adminurl/url'), + {'adminurl': 'adminurl', 'id': '007', + 'internalurl': 'internalurl', + 'publicurl': 'publicurl', 'region': 'region', + 'service_id': '117'}) + + # 'endpoint_delete' function tests: 1 + + def test_endpoint_delete(self, mock): + ''' + Test if it delete an endpoint for an Openstack service + ''' + ret = {'Error': 'Could not find any endpoints for the service'} + self.assertDictEqual(keystone.endpoint_delete('nova'), ret) + + with patch.object(keystone, 'endpoint_get', + MagicMock(side_effect=[{'id': '117'}, None])): + self.assertTrue(keystone.endpoint_delete('iptables')) + + # 'role_create' function tests: 1 + + def test_role_create(self, mock): + ''' + Test if it create named role + ''' + self.assertDictEqual(keystone.role_create('nova'), + {'Error': 'Role "nova" already exists'}) + + self.assertDictEqual(keystone.role_create('iptables'), + {'Error': 'Unable to resolve role id'}) + + # 'role_delete' function tests: 1 + + def test_role_delete(self, mock): + ''' + Test if it delete a role (keystone role-delete) + ''' + self.assertDictEqual(keystone.role_delete(), + {'Error': 'Unable to resolve role id'}) + + self.assertEqual(keystone.role_delete('iptables'), + 'Role ID iptables deleted') + + # 'role_get' function tests: 1 + + def test_role_get(self, mock): + ''' + Test if it return a specific roles (keystone role-get) + ''' + self.assertDictEqual(keystone.role_get(), + {'Error': 'Unable to resolve role id'}) + + self.assertDictEqual(keystone.role_get(name='nova'), + {'nova': {'id': '113', 'name': 'nova'}}) + + # 'role_list' function tests: 1 + + def test_role_list(self, mock): + ''' + Test if it return a list of available roles (keystone role-list) + ''' + self.assertDictEqual(keystone.role_list(), + {'nova': {'id': '113', 'name': 'nova'}}) + + # 'service_create' function tests: 1 + + def test_service_create(self, mock): + ''' + Test if it add service to Keystone service catalog + ''' + MockServices.flag = 2 + self.assertDictEqual(keystone.service_create('nova', 'compute', + 'OpenStack Service'), + {'iptables': {'description': 'description', + 'id': '005', + 'name': 'iptables', + 'type': 'type'}}) + + # 'service_delete' function tests: 1 + + def test_service_delete(self, mock): + ''' + Test if it delete a service from Keystone service catalog + ''' + self.assertEqual(keystone.service_delete('iptables'), + 'Keystone service ID "iptables" deleted') + + # 'service_get' function tests: 1 + + def test_service_get(self, mock): + ''' + Test if it return a list of available services (keystone services-list) + ''' + MockServices.flag = 0 + self.assertDictEqual(keystone.service_get(), + {'Error': 'Unable to resolve service id'}) + + MockServices.flag = 2 + self.assertDictEqual(keystone.service_get(service_id='c965'), + {'iptables': {'description': 'description', + 'id': 'c965', + 'name': 'iptables', + 'type': 'type'}}) + + # 'service_list' function tests: 1 + + def test_service_list(self, mock): + ''' + Test if it return a list of available services (keystone services-list) + ''' + MockServices.flag = 0 + self.assertDictEqual(keystone.service_list(profile='openstack1'), + {'iptables': {'description': 'description', + 'id': '117', 'name': 'iptables', + 'type': 'type'}}) + + # 'tenant_create' function tests: 1 + + def test_tenant_create(self, mock): + ''' + Test if it create a keystone tenant + ''' + self.assertDictEqual(keystone.tenant_create('nova'), + {'nova': {'description': 'description', + 'id': '446', 'name': 'nova', + 'enabled': 'True'}}) + + # 'tenant_delete' function tests: 1 + + def test_tenant_delete(self, mock): + ''' + Test if it delete a tenant (keystone tenant-delete) + ''' + self.assertDictEqual(keystone.tenant_delete(), + {'Error': 'Unable to resolve tenant id'}) + + self.assertEqual(keystone.tenant_delete('nova'), + 'Tenant ID nova deleted') + + # 'tenant_get' function tests: 1 + + def test_tenant_get(self, mock): + ''' + Test if it return a specific tenants (keystone tenant-get) + ''' + self.assertDictEqual(keystone.tenant_get(), + {'Error': 'Unable to resolve tenant id'}) + + self.assertDictEqual(keystone.tenant_get(tenant_id='446'), + {'nova': {'description': 'description', + 'id': '446', 'name': 'nova', + 'enabled': 'True'}}) + + # 'tenant_list' function tests: 1 + + def test_tenant_list(self, mock): + ''' + Test if it return a list of available tenants (keystone tenants-list) + ''' + self.assertDictEqual(keystone.tenant_list(), + {'nova': {'description': 'description', + 'id': '446', 'name': 'nova', + 'enabled': 'True'}}) + + # 'tenant_update' function tests: 1 + + def test_tenant_update(self, mock): + ''' + Test if it update a tenant's information (keystone tenant-update) + ''' + self.assertDictEqual(keystone.tenant_update(), + {'Error': 'Unable to resolve tenant id'}) + + # 'token_get' function tests: 1 + + def test_token_get(self, mock): + ''' + Test if it return the configured tokens (keystone token-get) + ''' + self.assertDictEqual(keystone.token_get(), {'expires': 'No', + 'id': '446', + 'tenant_id': 'ae04', + 'user_id': 'admin'}) + + # 'user_list' function tests: 1 + + def test_user_list(self, mock): + ''' + Test if it return a list of available users (keystone user-list) + ''' + self.assertDictEqual(keystone.user_list(), + {'nova': {'email': 'salt@saltstack.com', + 'id': '446', 'name': 'nova', + 'enabled': 'True'}}) + + # 'user_get' function tests: 1 + + def test_user_get(self, mock): + ''' + Test if it return a specific users (keystone user-get) + ''' + self.assertDictEqual(keystone.user_get(), + {'Error': 'Unable to resolve user id'}) + + self.assertDictEqual(keystone.user_get(user_id='446'), + {'nova': {'email': 'salt@saltstack.com', + 'id': '446', 'name': 'nova', + 'enabled': 'True'}}) + # 'user_create' function tests: 1 + + def test_user_create(self, mock): + ''' + Test if it create a user (keystone user-create) + ''' + self.assertDictEqual(keystone.user_create(name='nova', password='salt', + email='salt@saltstack.com', + tenant_id='a1a1'), + {'nova': {'email': 'salt@saltstack.com', + 'id': '446', 'name': 'nova', + 'enabled': 'True'}}) + + # 'user_delete' function tests: 1 + + def test_user_delete(self, mock): + ''' + Test if it delete a user (keystone user-delete) + ''' + self.assertDictEqual(keystone.user_delete(), + {'Error': 'Unable to resolve user id'}) + + self.assertEqual(keystone.user_delete('nova'), + 'User ID nova deleted') + + # 'user_update' function tests: 1 + + def test_user_update(self, mock): + ''' + Test if it update a user's information (keystone user-update) + ''' + self.assertDictEqual(keystone.user_update(), + {'Error': 'Unable to resolve user id'}) + + self.assertEqual(keystone.user_update('nova'), + 'Info updated for user ID nova') + + # 'user_verify_password' function tests: 1 + + def test_user_verify_password(self, mock): + ''' + Test if it verify a user's password + ''' + mock = MagicMock(return_value='http://127.0.0.1:35357/v2.0') + with patch.dict(keystone.__salt__, {'config.option': mock}): + self.assertDictEqual(keystone.user_verify_password(), + {'Error': 'Unable to resolve user name'}) + + self.assertTrue(keystone.user_verify_password(user_id='446', + name='nova')) + + MockClient.flag = 1 + self.assertFalse(keystone.user_verify_password(user_id='446', + name='nova')) + + # 'user_password_update' function tests: 1 + + def test_user_password_update(self, mock): + ''' + Test if it update a user's password (keystone user-password-update) + ''' + self.assertDictEqual(keystone.user_password_update(), + {'Error': 'Unable to resolve user id'}) + + self.assertEqual(keystone.user_password_update('nova'), + 'Password updated for user ID nova') + + # 'user_role_add' function tests: 1 + + def test_user_role_add(self, mock): + ''' + Test if it add role for user in tenant (keystone user-role-add) + ''' + self.assertEqual(keystone.user_role_add(user='nova', tenant='nova', + role='nova'), + '"nova" role added for user "nova" for "nova" tenant') + + MockRoles.flag = 1 + self.assertDictEqual(keystone.user_role_add(user='nova', tenant='nova', + role='nova'), + {'Error': 'Unable to resolve role id'}) + + MockTenants.flag = 1 + self.assertDictEqual(keystone.user_role_add(user='nova', tenant='nova'), + {'Error': 'Unable to resolve tenant id'}) + + MockUsers.flag = 1 + self.assertDictEqual(keystone.user_role_add(user='nova'), + {'Error': 'Unable to resolve user id'}) + + # 'user_role_remove' function tests: 1 + + def test_user_role_remove(self, mock): + ''' + Test if it add role for user in tenant (keystone user-role-add) + ''' + MockUsers.flag = 1 + self.assertDictEqual(keystone.user_role_remove(user='nova'), + {'Error': 'Unable to resolve user id'}) + + MockUsers.flag = 0 + MockTenants.flag = 1 + self.assertDictEqual(keystone.user_role_remove(user='nova', + tenant='nova'), + {'Error': 'Unable to resolve tenant id'}) + + MockTenants.flag = 0 + MockRoles.flag = 1 + self.assertDictEqual(keystone.user_role_remove(user='nova', + tenant='nova', + role='nova'), + {'Error': 'Unable to resolve role id'}) + + ret = '"nova" role removed for user "nova" under "nova" tenant' + MockRoles.flag = 0 + self.assertEqual(keystone.user_role_remove(user='nova', tenant='nova', + role='nova'), ret) + + # 'user_role_list' function tests: 1 + + def test_user_role_list(self, mock): + ''' + Test if it return a list of available user_roles + (keystone user-roles-list) + ''' + self.assertDictEqual(keystone.user_role_list(user='nova'), + {'Error': 'Unable to resolve user or tenant id'}) + + self.assertDictEqual(keystone.user_role_list(user_name='nova', + tenant_name='nova'), + {'nova': {'id': '113', 'name': 'nova', + 'tenant_id': '446', 'user_id': '446'}}) + + +if __name__ == '__main__': + from integration import run_tests + run_tests(KeystoneTestCase, needs_daemon=False)