Merge branch 'lyft-boto_secgroup-support-vpc' into develop

This commit is contained in:
Thomas S Hatch 2014-08-05 12:08:20 -06:00
commit 494a17dcb5
2 changed files with 350 additions and 128 deletions

View File

@ -61,7 +61,8 @@ def __virtual__():
return True
def exists(name, region=None, key=None, keyid=None, profile=None):
def exists(name=None, region=None, key=None, keyid=None, profile=None,
vpc_id=None, group_id=None):
'''
Check to see if an security group exists.
@ -72,11 +73,10 @@ def exists(name, region=None, key=None, keyid=None, profile=None):
conn = _get_conn(region, key, keyid, profile)
if not conn:
return False
try:
conn.get_all_security_groups([name])
group = _get_group(conn, name, vpc_id, group_id)
if group:
return True
except boto.exception.BotoServerError as e:
log.debug(e)
else:
return False
@ -105,6 +105,50 @@ def _split_rules(rules):
return split
def _get_group(conn, name=None, vpc_id=None, group_id=None, region=None):
'''
Get a group object given a name, name and vpc_id or group_id. Return a
boto.ec2.securitygroup.SecurityGroup object if the group is found, else
return None.
'''
if name:
if vpc_id is None:
logging.debug('getting group for {0}'.format(name))
group_filter = {'group-name': name}
filtered_groups = conn.get_all_security_groups(filters=group_filter)
# security groups can have the same name if groups exist in both
# EC2-Classic and EC2-VPC
# iterate through groups to ensure we return the EC2-Classic
# security group
for group in filtered_groups:
# a group in EC2-Classic will have vpc_id set to None
if group.vpc_id is None:
return group
return None
elif vpc_id:
logging.debug('getting group for {0} in vpc_id {1}'.format(name, vpc_id))
group_filter = {'group-name': name, 'vpc_id': vpc_id}
filtered_groups = conn.get_all_security_groups(filters=group_filter)
if len(filtered_groups) == 1:
return filtered_groups[0]
else:
return None
else:
return None
elif group_id:
try:
groups = conn.get_all_security_groups(group_ids=[group_id])
except boto.exception.BotoServerError as e:
log.debug(e)
return None
if len(groups) == 1:
return groups[0]
else:
return None
else:
return None
def get_group_id(name, vpc_id=None, region=None, key=None, keyid=None, profile=None):
'''
Get a Group ID given a Group Name or Group Name and VPC ID
@ -112,38 +156,19 @@ def get_group_id(name, vpc_id=None, region=None, key=None, keyid=None, profile=N
CLI example::
salt myminion boto_secgroup.get_group_id mysecgroup
'''
conn = _get_conn(region, key, keyid, profile)
if not conn:
return False
if vpc_id is None:
logging.debug('getting group_id for {0}'.format(name))
group_filter = {'group-name': name}
filtered_groups = conn.get_all_security_groups(filters=group_filter)
# security groups can have the same name if groups exist in both
# EC2-Classic and EC2-VPC
# iterate through groups to ensure we return the EC2-Classic
# security group
for group in filtered_groups:
# a group in EC2-Classic will have vpc_id set to None
if group.vpc_id is None:
return group.id
return False
elif vpc_id:
logging.debug('getting group_id for {0} in vpc_id {1}'.format(name, vpc_id))
group_filter = {'group-name': name, 'vpc_id': vpc_id}
filtered_groups = conn.get_all_security_groups(filters=group_filter)
if len(filtered_groups) == 1:
return filtered_groups[0].id
else:
return False
group = _get_group(conn, name, vpc_id)
if group:
return group.id
else:
return False
def get_config(name=None, group_id=None, region=None, key=None, keyid=None,
profile=None):
profile=None, vpc_id=None):
'''
Get the configuration for a security group.
@ -154,57 +179,50 @@ def get_config(name=None, group_id=None, region=None, key=None, keyid=None,
conn = _get_conn(region, key, keyid, profile)
if not conn:
return None
if not (name or group_id):
sg = _get_group(conn, name, vpc_id, group_id)
if sg:
ret = odict.OrderedDict()
ret['name'] = sg.name
# TODO: add support for vpc_id in return
# ret['vpc_id'] = sg.vpc_id
ret['group_id'] = sg.id
ret['owner_id'] = sg.owner_id
ret['description'] = sg.description
# TODO: add support for tags
_rules = []
for rule in sg.rules:
attrs = ['ip_protocol', 'from_port', 'to_port', 'grants']
_rule = odict.OrderedDict()
for attr in attrs:
val = getattr(rule, attr)
if not val:
continue
if attr == 'grants':
_grants = []
for grant in val:
g_attrs = {'name': 'source_group_name',
'owner_id': 'source_group_owner_id',
'group_id': 'source_group_group_id',
'cidr_ip': 'cidr_ip'}
_grant = odict.OrderedDict()
for g_attr, g_attr_map in g_attrs.iteritems():
g_val = getattr(grant, g_attr)
if not g_val:
continue
_grant[g_attr_map] = g_val
_grants.append(_grant)
_rule['grants'] = _grants
elif attr == 'from_port':
_rule[attr] = int(val)
elif attr == 'to_port':
_rule[attr] = int(val)
else:
_rule[attr] = val
_rules.append(_rule)
ret['rules'] = _split_rules(_rules)
return ret
else:
return None
try:
if name:
sg = conn.get_all_security_groups([name])
else:
sg = conn.get_all_security_groups(group_ids=[group_id])
except boto.exception.BotoServerError as e:
msg = 'Failed to get config for security group {0}.'.format(name)
log.error(msg)
log.debug(e)
return {}
sg = sg[0]
ret = odict.OrderedDict()
ret['name'] = name
ret['group_id'] = sg.id
ret['owner_id'] = sg.owner_id
ret['description'] = sg.description
# TODO: add support for tags
_rules = []
for rule in sg.rules:
attrs = ['ip_protocol', 'from_port', 'to_port', 'grants']
_rule = odict.OrderedDict()
for attr in attrs:
val = getattr(rule, attr)
if not val:
continue
if attr == 'grants':
_grants = []
for grant in val:
g_attrs = {'name': 'source_group_name',
'owner_id': 'source_group_owner_id',
'group_id': 'source_group_group_id',
'cidr_ip': 'cidr_ip'}
_grant = odict.OrderedDict()
for g_attr, g_attr_map in g_attrs.iteritems():
g_val = getattr(grant, g_attr)
if not g_val:
continue
_grant[g_attr_map] = g_val
_grants.append(_grant)
_rule['grants'] = _grants
elif attr == 'from_port':
_rule[attr] = int(val)
elif attr == 'to_port':
_rule[attr] = int(val)
else:
_rule[attr] = val
_rules.append(_rule)
ret['rules'] = _split_rules(_rules)
return ret
def create(name, description, vpc_id=None, region=None, key=None, keyid=None,
@ -229,8 +247,8 @@ def create(name, description, vpc_id=None, region=None, key=None, keyid=None,
return False
def delete(name, group_id=None, region=None, key=None, keyid=None,
profile=None):
def delete(name=None, group_id=None, region=None, key=None, keyid=None,
profile=None, vpc_id=None):
'''
Delete an autoscale group.
@ -241,85 +259,107 @@ def delete(name, group_id=None, region=None, key=None, keyid=None,
conn = _get_conn(region, key, keyid, profile)
if not conn:
return False
deleted = conn.delete_security_group(name, group_id)
if deleted:
log.info('Deleted security group {0}.'.format(name))
return True
group = _get_group(conn, name, vpc_id, group_id)
if group:
deleted = conn.delete_security_group(group_id=group.id)
if deleted:
log.info('Deleted security group {0} with id {1}.'.format(group.name,
group.id))
return True
else:
msg = 'Failed to delete security group {0}.'.format(name)
log.error(msg)
return False
else:
msg = 'Failed to delete security group {0}.'.format(name)
log.error(msg)
log.debug('Security group not found.')
return False
def authorize(name, source_group_name=None,
def authorize(name=None, source_group_name=None,
source_group_owner_id=None, ip_protocol=None,
from_port=None, to_port=None, cidr_ip=None, group_id=None,
source_group_group_id=None, region=None, key=None,
keyid=None, profile=None):
keyid=None, profile=None, vpc_id=None):
'''
Add a new rule to an existing security group.
CLI example::
salt myminion boto_secgroup.authorize mysecgroup ip_protocol=tcp from_port=80 to_port=80 cidr_ip='["10.0.0.0/0", "192.168.0.0/0"]'
salt myminion boto_secgroup.authorize mysecgroup ip_protocol=tcp from_port=80 to_port=80 cidr_ip='['10.0.0.0/8', '192.168.0.0/24']'
'''
conn = _get_conn(region, key, keyid, profile)
if not conn:
return False
try:
added = conn.authorize_security_group(
group_name=name, src_security_group_name=source_group_name,
src_security_group_owner_id=source_group_owner_id,
ip_protocol=ip_protocol, from_port=from_port, to_port=to_port,
cidr_ip=cidr_ip, group_id=group_id,
src_security_group_group_id=source_group_group_id)
if added:
log.info('Added rule to security group {0}.'.format(name))
return True
else:
msg = 'Failed to add rule to security group {0}.'.format(name)
group = _get_group(conn, name, vpc_id, group_id)
if group:
try:
added = conn.authorize_security_group(
src_security_group_name=source_group_name,
src_security_group_owner_id=source_group_owner_id,
ip_protocol=ip_protocol, from_port=from_port, to_port=to_port,
cidr_ip=cidr_ip, group_id=group.id,
src_security_group_group_id=source_group_group_id)
if added:
log.info('Added rule to security group {0} with id {1}'
.format(group.name, group.id))
return True
else:
msg = ('Failed to add rule to security group {0} with id {1}.'
.format(group.name))
log.error(msg)
return False
except boto.exception.EC2ResponseError as e:
log.debug(e)
msg = ('Failed to add rule to security group {0} with id {1}.'
.format(group.name, group.id))
log.error(msg)
return False
except boto.exception.EC2ResponseError as e:
log.debug(e)
msg = 'Failed to add rule to security group {0}.'.format(name)
log.error(msg)
else:
log.debug('Failed to add rule to security group.')
return False
def revoke(name, source_group_name=None,
def revoke(name=None, source_group_name=None,
source_group_owner_id=None, ip_protocol=None,
from_port=None, to_port=None, cidr_ip=None, group_id=None,
source_group_group_id=None, region=None, key=None,
keyid=None, profile=None):
keyid=None, profile=None, vpc_id=None):
'''
Remove a rule from an existing security group.
CLI example::
salt myminion boto_secgroup.revoke mysecgroup ip_protocol=tcp from_port=80 to_port=80 cidr_ip='["10.0.0.0/0", "192.168.0.0/0"]'
salt myminion boto_secgroup.revoke mysecgroup ip_protocol=tcp from_port=80 to_port=80 cidr_ip='10.0.0.0/8'
'''
conn = _get_conn(region, key, keyid, profile)
if not conn:
return False
try:
revoked = conn.revoke_security_group(
group_name=name, src_security_group_name=source_group_name,
src_security_group_owner_id=source_group_owner_id,
ip_protocol=ip_protocol, from_port=from_port, to_port=to_port,
cidr_ip=cidr_ip, group_id=group_id,
src_security_group_group_id=source_group_group_id)
if revoked:
log.info('Removed rule from security group {0}.'.format(name))
return True
else:
msg = 'Failed to remove rule from security group {0}.'.format(name)
group = _get_group(conn, name, vpc_id, group_id)
if group:
try:
revoked = conn.revoke_security_group(
src_security_group_name=source_group_name,
src_security_group_owner_id=source_group_owner_id,
ip_protocol=ip_protocol, from_port=from_port, to_port=to_port,
cidr_ip=cidr_ip, group_id=group.id,
src_security_group_group_id=source_group_group_id)
if revoked:
log.info('Removed rule from security group {0} with id {1}.'
.format(group.name, group.id))
return True
else:
msg = ('Failed to remove rule from security group {0} with id {1}.'
.format(group.name, group.id))
log.error(msg)
return False
except boto.exception.EC2ResponseError as e:
log.debug(e)
msg = ('Failed to remove rule from security group {0} with id {1}.'
.format(group.name, group.id))
log.error(msg)
return False
except boto.exception.EC2ResponseError as e:
log.debug(e)
msg = 'Failed to remove rule from security group {0}.'.format(name)
log.error(msg)
else:
log.debug('Failed to remove rule from security group.')
return False

View File

@ -4,6 +4,7 @@
import random
import string
from collections import OrderedDict
from copy import deepcopy
# import Python Third Party Libs
try:
@ -39,6 +40,11 @@ secret_key = 'askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs'
conn_parameters = {'region': region, 'key': access_key, 'keyid': secret_key, 'profile': {}}
def _random_group_id():
group_id = 'sg-{0:x}'.format(random.randrange(2 ** 32))
return group_id
def _random_group_name():
group_name = 'boto_secgroup-{0}'.format(''.join((random.choice(string.ascii_lowercase)) for char in range(12)))
return group_name
@ -59,10 +65,48 @@ class Boto_SecgroupTestCase(TestCase):
self.assertEqual(boto_secgroup._split_rules(rules), split_rules)
@skipIf(missing_requirements, missing_requirements_msg)
# test can be enabled if running version of moto contains commit id
# https://github.com/spulec/moto/commit/cc0166964371f7b5247a49d45637a8f936ccbe6f
@skipIf(True, 'test skipped because of'
' https://github.com/spulec/moto/issues/152')
@mock_ec2
def test_create_ec2_classic(self):
'''
test of creation of an EC2-Classic security group. The test ensures
that a group was created with the desired name and description
'''
group_name = _random_group_name()
group_description = 'test_create_ec2_classic'
boto_secgroup.create(group_name, group_description, **conn_parameters)
conn = boto.ec2.connect_to_region(region)
group_filter = {'group-name': group_name, 'vpc_id': None}
secgroup_created_group = conn.get_all_security_groups(filters=group_filter)
# when https://github.com/spulec/moto/commit/cc0166964371f7b5247a49d45637a8f936ccbe6f
# released, test should be updated as follows:
# expected_create_result = [group_name, group_description, None]
# secgroup_create_result = [secgroup_created_group[0].name, secgroup_created_group[0].description, secgroup_created_group[0].vpc_id]
expected_create_result = [group_name, group_description]
secgroup_create_result = [secgroup_created_group[0].name, secgroup_created_group[0].description]
self.assertEqual(expected_create_result, secgroup_create_result)
@skipIf(missing_requirements, missing_requirements_msg)
@mock_ec2
def test_create_ec2_vpc(self):
'''
test of creation of an EC2-VPC security group. The test ensures that a
group was created in a given vpc with the desired name and description
'''
group_name = _random_group_name()
group_description = 'test_create_ec2_vpc'
# create a group using boto_secgroup
boto_secgroup.create(group_name, group_description, vpc_id=vpc_id, **conn_parameters)
# confirm that the group actually exists
conn = boto.ec2.connect_to_region(region)
group_filter = {'group-name': group_name, 'vpc-id': vpc_id}
secgroup_created_group = conn.get_all_security_groups(filters=group_filter)
expected_create_result = [group_name, group_description, vpc_id]
secgroup_create_result = [secgroup_created_group[0].name, secgroup_created_group[0].description, secgroup_created_group[0].vpc_id]
self.assertEqual(expected_create_result, secgroup_create_result)
@skipIf(missing_requirements, missing_requirements_msg)
@skipIf(True, 'test skipped due to error in moto return - fixed in'
' https://github.com/spulec/moto/commit/cc0166964371f7b5247a49d45637a8f936ccbe6f')
@mock_ec2
def test_get_group_id_ec2_classic(self):
'''
@ -83,6 +127,7 @@ class Boto_SecgroupTestCase(TestCase):
**conn_parameters)
self.assertEqual(group_classic.id, retreived_group_id)
@skipIf(missing_requirements, missing_requirements_msg)
@skipIf(True, 'test skipped because moto does not yet support group'
' filters https://github.com/spulec/moto/issues/154')
@mock_ec2
@ -105,6 +150,143 @@ class Boto_SecgroupTestCase(TestCase):
**conn_parameters)
self.assertEqual(group_vpc.id, retreived_group_id)
@skipIf(missing_requirements, missing_requirements_msg)
@mock_ec2
def test_get_config_single_rule_group_name(self):
'''
tests return of 'config' when given group name. get_config returns an OrderedDict.
'''
group_name = _random_group_name()
ip_protocol = 'tcp'
from_port = 22
to_port = 22
cidr_ip = '0.0.0.0/0'
conn = boto.ec2.connect_to_region(region)
group = conn.create_security_group(name=group_name, description=group_name)
group.authorize(ip_protocol=ip_protocol, from_port=from_port, to_port=to_port, cidr_ip=cidr_ip)
# setup the expected get_config result
expected_get_config_result = OrderedDict([('name', group.name), ('group_id', group.id), ('owner_id', u'111122223333'),
('description', group.description),
('rules', [{'to_port': to_port, 'from_port': from_port,
'ip_protocol': ip_protocol, 'cidr_ip': cidr_ip}])])
secgroup_get_config_result = boto_secgroup.get_config(group_id=group.id, **conn_parameters)
self.assertEqual(expected_get_config_result, secgroup_get_config_result)
@skipIf(missing_requirements, missing_requirements_msg)
@skipIf(False, 'test skipped due to error in moto return - fixed in'
' https://github.com/spulec/moto/commit/cc0166964371f7b5247a49d45637a8f936ccbe6f')
@mock_ec2
def test_exists_true_name_classic(self):
'''
tests 'true' existence of a group in EC2-Classic when given name
'''
group_name = _random_group_name()
group_description = 'test_exists_true_ec2_classic'
conn = boto.ec2.connect_to_region(region)
group_classic = conn.create_security_group(group_name, group_description)
group_vpc = conn.create_security_group(group_name, group_description, vpc_id=vpc_id)
salt_exists_result = boto_secgroup.exists(name=group_name, **conn_parameters)
self.assertTrue(salt_exists_result)
@skipIf(missing_requirements, missing_requirements_msg)
@skipIf(True, 'test skipped because moto does not yet support group'
' filters https://github.com/spulec/moto/issues/154')
@mock_ec2
def test_exists_false_name_classic(self):
pass
@skipIf(missing_requirements, missing_requirements_msg)
@mock_ec2
def test_exists_true_name_vpc(self):
'''
tests 'true' existence of a group in EC2-VPC when given name and vpc_id
'''
group_name = _random_group_name()
group_description = 'test_exists_true_ec2_vpc'
conn = boto.ec2.connect_to_region(region)
conn.create_security_group(group_name, group_description, vpc_id=vpc_id)
salt_exists_result = boto_secgroup.exists(name=group_name, vpc_id=vpc_id, **conn_parameters)
self.assertTrue(salt_exists_result)
@skipIf(missing_requirements, missing_requirements_msg)
@mock_ec2
def test_exists_false_name_vpc(self):
'''
tests 'false' existence of a group in vpc when given name and vpc_id
'''
group_name = _random_group_name()
salt_exists_result = boto_secgroup.exists(group_name, vpc_id=vpc_id, **conn_parameters)
self.assertFalse(salt_exists_result)
@skipIf(missing_requirements, missing_requirements_msg)
@mock_ec2
def test_exists_true_group_id(self):
'''
tests 'true' existence of a group when given group_id
'''
group_name = _random_group_name()
group_description = 'test_exists_true_group_id'
conn = boto.ec2.connect_to_region(region)
group = conn.create_security_group(group_name, group_description)
salt_exists_result = boto_secgroup.exists(group_id=group.id, **conn_parameters)
self.assertTrue(salt_exists_result)
@skipIf(missing_requirements, missing_requirements_msg)
@mock_ec2
def test_exists_false_group_id(self):
'''
tests 'false' existence of a group when given group_id
'''
group_id = _random_group_id()
salt_exists_result = boto_secgroup.exists(group_id=group_id, **conn_parameters)
self.assertFalse(salt_exists_result)
@skipIf(missing_requirements, missing_requirements_msg)
@skipIf(True, 'test skipped due to error in moto return - fixed in'
' https://github.com/spulec/moto/commit/cc0166964371f7b5247a49d45637a8f936ccbe6f')
@mock_ec2
def test_delete_group_ec2_classic(self):
'''
test deletion of a group in EC2-Classic. Test does the following:
1. creates two groups, in EC2-Classic and one in EC2-VPC
2. saves the group_ids to group_ids_pre_delete
3. removes the group in EC2-VPC
4. saves the group ids of groups to group_ids_post_delete
5. compares the group_ids_pre_delete and group_ids_post_delete lists
to ensure that the correct group was deleted
'''
group_name = _random_group_name()
group_description = 'test_delete_group_ec2_classic'
# create two groups using boto, one in EC2-Classic and one in EC2-VPC
conn = boto.ec2.connect_to_region(region)
group_classic = conn.create_security_group(name=group_name, description=group_description)
group_vpc = conn.create_security_group(name=group_name, description=group_description, vpc_id=vpc_id)
# creates a list of all the existing all_group_ids in an AWS account
all_groups = [group.id for group in conn.get_all_security_groups()]
# removes the EC2-Classic Security Group
deleted = boto_secgroup.delete(name=group_name, **conn_parameters)
expected_groups = deepcopy(all_groups)
expected_groups.remove(group_classic.id)
actual_groups = [group.id for group in conn.get_all_security_groups()]
self.assertEqual(expected_groups, actual_groups)
@skipIf(missing_requirements, missing_requirements_msg)
@skipIf(True, 'test skipped because moto does not yet support group'
' filters https://github.com/spulec/moto/issues/154')
@mock_ec2
def test_delete_group_name_ec2_vpc(self):
pass
@skipIf(missing_requirements, missing_requirements_msg)
@mock_ec2
def test__get_conn_true(self):
'''
tests ensures that _get_conn returns an boto.ec2.connection.EC2Connection object.
'''
conn = boto.ec2.connect_to_region(region)
salt_conn = boto_secgroup._get_conn(**conn_parameters)
self.assertEqual(conn.__class__, salt_conn.__class__)
if __name__ == '__main__':
from integration import run_tests