salt/tests/unit/modules/boto_vpc_test.py

215 lines
7.1 KiB
Python
Raw Normal View History

2014-07-30 07:29:45 +00:00
# -*- coding: utf-8 -*-
# Import Python third-party libs.
# Every optional test dependency is imported inside a guard so that a missing
# package leaves this module importable; the skipIf decorators on the test
# case can then skip the tests instead of the import blowing up first.
try:
    import boto
    from boto.exception import BotoServerError
    HAS_BOTO = True
except ImportError:
    HAS_BOTO = False

try:
    from mock import patch
except ImportError:
    # ``patch`` is only referenced inside test bodies. The test case is
    # decorated with skipIf(NO_MOCK, ...); presumably that flag covers a
    # missing mock package -- confirm against salttesting.mock.
    patch = None
try:
    from moto import mock_ec2
except ImportError:
    HAS_MOTO = False

    def mock_ec2(self):
        '''
        Fallback for moto's ``mock_ec2`` decorator, defined only when moto
        cannot be imported.

        Despite its name, the ``self`` argument is the function being
        decorated. It is discarded and a do-nothing ``stub_function`` is
        returned in its place, so boto_vpc unit tests can still be written
        with ``@mock_ec2`` without hitting
        "NameError: name 'mock_ec2' is not defined".
        '''
        def stub_function(self):
            # Intentionally empty: stands in for the decorated test.
            return None

        return stub_function
else:
    HAS_MOTO = True
# Import Python libs
from distutils.version import LooseVersion
2014-07-30 07:29:45 +00:00
# Import Salt Libs
from salt.modules import boto_vpc
# Import Salt Testing Libs
from salttesting import skipIf, TestCase
from salttesting.mock import NO_MOCK, NO_MOCK_REASON
from salttesting.helpers import ensure_in_syspath
ensure_in_syspath('../../')
2014-07-30 07:29:45 +00:00
# the boto_vpc module relies on the connect_to_region() method
# which was added in boto 2.8.0
# https://github.com/boto/boto/commit/33ac26b416fbb48a60602542b4ce15dcc7029f12
required_boto_version = '2.8.0'

# Dummy AWS settings shared by every test in this module.
region = 'us-east-1'
access_key = 'GKTADJGHEIQSXMKKRBJ08H'
secret_key = 'askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs'
# Keyword arguments passed to each boto_vpc call. In Salt's boto modules
# ``keyid`` is the AWS access key ID and ``key`` is the secret access key.
conn_parameters = {'region': region, 'key': secret_key, 'keyid': access_key, 'profile': {}}
# CIDR block used when creating test VPCs.
cidr_block = '10.0.0.0/24'
2014-07-30 07:29:45 +00:00
def _has_required_boto():
    '''
    Report whether a usable boto is available.

    Returns False when boto could not be imported, or when the installed
    version is older than ``required_boto_version``; returns True otherwise.
    '''
    if not HAS_BOTO:
        return False
    installed = LooseVersion(boto.__version__)
    minimum = LooseVersion(required_boto_version)
    # Always hand back a real bool: callers compare the result with ``is``.
    return not (installed < minimum)
2014-08-19 16:19:41 +00:00
@skipIf(NO_MOCK, NO_MOCK_REASON)
@skipIf(HAS_BOTO is False, 'The boto module must be installed.')
@skipIf(HAS_MOTO is False, 'The moto module must be installed.')
@skipIf(_has_required_boto() is False,
        'The boto module must be greater than or equal to version {0}'
        .format(required_boto_version))
class BotoVpcTestCase(TestCase):
    '''
    TestCase for salt.modules.boto_vpc module
    '''

    # boto VPC connection, opened lazily by _get_conn().
    conn = None

    def _get_conn(self):
        '''
        Return the boto VPC connection for this test case instance, opening
        it with ``boto.vpc.connect_to_region(region)`` on first use.
        '''
        if not self.conn:
            self.conn = boto.vpc.connect_to_region(region)
        return self.conn

    def _create_vpc(self):
        '''
        Create a VPC using the module-level ``cidr_block`` and return the
        object boto hands back.
        '''
        return self._get_conn().create_vpc(cidr_block)

    def _create_subnet(self, vpc_id, cidr_block='10.0.0.0/25'):
        '''
        Create a subnet with the given CIDR block inside the VPC identified
        by ``vpc_id`` and return the object boto hands back.
        '''
        return self._get_conn().create_subnet(vpc_id, cidr_block)

    @mock_ec2
    def test_get_subnet_association_single_subnet(self):
        '''
        tests that given a single subnet id the VPC ID is returned. The test
        is valuable because it uses a string as an argument to subnets as
        opposed to a list.
        '''
        vpc = self._create_vpc()
        subnet = self._create_subnet(vpc.id)
        subnet_association = boto_vpc.get_subnet_association(subnets=subnet.id,
                                                             **conn_parameters)
        self.assertEqual(vpc.id, subnet_association)

    @mock_ec2
    def test_get_subnet_association_multiple_subnets_same_vpc(self):
        '''
        tests that given multiple subnet ids in the same VPC that the VPC ID is
        returned.
        '''
        vpc = self._create_vpc()
        subnet_a = self._create_subnet(vpc.id, '10.0.0.0/25')
        subnet_b = self._create_subnet(vpc.id, '10.0.0.128/25')
        subnet_association = boto_vpc.get_subnet_association([subnet_a.id, subnet_b.id],
                                                             **conn_parameters)
        self.assertEqual(vpc.id, subnet_association)

    @mock_ec2
    def test_get_subnet_association_multiple_subnets_different_vpc(self):
        '''
        tests that given multiple subnet ids in different VPCs that False is
        returned.
        '''
        vpc_a = self._create_vpc()
        # Go through the helper rather than touching self.conn directly, so
        # this line does not depend on the previous call having opened it.
        vpc_b = self._create_vpc()
        subnet_a = self._create_subnet(vpc_a.id, '10.0.0.0/24')
        subnet_b = self._create_subnet(vpc_b.id, '10.0.0.0/24')
        subnet_association = boto_vpc.get_subnet_association([subnet_a.id, subnet_b.id],
                                                             **conn_parameters)
        self.assertFalse(subnet_association)

    @mock_ec2
    def test_exists_true(self):
        '''
        tests True existence of a VPC.
        '''
        vpc = self._create_vpc()
        vpc_exists = boto_vpc.exists(vpc.id, **conn_parameters)
        self.assertTrue(vpc_exists)

    @mock_ec2
    def test_that_when_creating_a_vpc_succeeds_the_create_vpc_method_returns_true(self):
        '''
        tests True VPC created.
        '''
        vpc_creation_result = boto_vpc.create(cidr_block, **conn_parameters)
        self.assertTrue(vpc_creation_result)

    @mock_ec2
    def test_that_when_creating_a_vpc_fails_the_create_vpc_method_returns_false(self):
        '''
        tests False VPC not created.
        '''
        # Make the mocked backend raise so that the create call fails.
        with patch('moto.ec2.models.VPCBackend.create_vpc', side_effect=BotoServerError(400, 'Mocked error')):
            vpc_creation_result = boto_vpc.create(cidr_block, **conn_parameters)
        self.assertFalse(vpc_creation_result)

    @mock_ec2
    def test_that_when_deleting_an_existing_vpc_the_delete_vpc_method_returns_true(self):
        '''
        tests True when deleting a VPC that was just created.
        '''
        vpc = self._create_vpc()
        vpc_deletion_result = boto_vpc.delete(vpc.id, **conn_parameters)
        self.assertTrue(vpc_deletion_result)

    @mock_ec2
    def test_that_when_deleting_a_non_existent_vpc_the_delete_vpc_method_returns_false(self):
        '''
        tests False when deleting a VPC id ('1234') that was never created.
        '''
        vpc_deletion_result = boto_vpc.delete('1234', **conn_parameters)
        self.assertFalse(vpc_deletion_result)

    @mock_ec2
    def test_that_when_creating_a_subnet_succeeds_the_create_subnet_method_returns_true(self):
        '''
        tests True subnet created inside an existing VPC.
        '''
        vpc = self._create_vpc()
        subnet_creation_result = boto_vpc.create_subnet(vpc.id, '10.0.0.0/24', **conn_parameters)
        self.assertTrue(subnet_creation_result)

    @mock_ec2
    def test_that_when_creating_a_subnet_fails_the_create_subnet_method_returns_false(self):
        '''
        tests False subnet not created.
        '''
        vpc = self._create_vpc()
        # Make the mocked backend raise so that the create call fails.
        with patch('moto.ec2.models.SubnetBackend.create_subnet', side_effect=BotoServerError(400, 'Mocked error')):
            subnet_creation_result = boto_vpc.create_subnet(vpc.id, '10.0.0.0/24', **conn_parameters)
        self.assertFalse(subnet_creation_result)

    @mock_ec2
    def test_that_when_deleting_an_existing_subnet_the_delete_subnet_method_returns_true(self):
        '''
        tests True when deleting a subnet that was just created.
        '''
        vpc = self._create_vpc()
        subnet = self._create_subnet(vpc.id)
        subnet_deletion_result = boto_vpc.delete_subnet(subnet.id, **conn_parameters)
        self.assertTrue(subnet_deletion_result)

    @mock_ec2
    def test_that_when_deleting_a_non_existent_subnet_the_delete_vpc_method_returns_false(self):
        '''
        tests False when deleting a subnet id ('1234') that was never created.

        NOTE: the method name says "delete_vpc" but the call under test is
        boto_vpc.delete_subnet; the name is kept so existing test ids still
        resolve.
        '''
        subnet_deletion_result = boto_vpc.delete_subnet('1234', **conn_parameters)
        self.assertFalse(subnet_deletion_result)
2014-09-28 16:09:34 +00:00
2014-07-30 07:29:45 +00:00
if __name__ == '__main__':
    # Executed directly: pass the test case to run_tests() from the
    # ``integration`` package with needs_daemon=False.
    import integration
    integration.run_tests(BotoVpcTestCase, needs_daemon=False)