Modernize boto_sqs execution and state modules

Headline changes:
- Move to boto3
- Use result-style error handling
- Optimize number of API calls:
  - Set attributes at creation time
  - Set all attributes in one API call
- Show diff when attribute changes detected
This commit is contained in:
Aneesh Agrawal 2017-07-28 20:47:56 +00:00
parent a2a7c475c1
commit c4f5888c85
3 changed files with 355 additions and 189 deletions

View File

@ -7,7 +7,7 @@ Connection module for Amazon SQS
:configuration: This module accepts explicit sqs credentials but can also utilize
IAM roles assigned to the instance through Instance Profiles. Dynamic
credentials are then automatically obtained from AWS API and no further
configuration is necessary. More Information available at:
configuration is necessary. More information available at:
.. code-block:: text
@ -39,44 +39,69 @@ Connection module for Amazon SQS
key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
region: us-east-1
:depends: boto
:depends: boto3
'''
# keep lint from choking on _get_conn and _cache_id
#pylint: disable=E0602
# pylint: disable=E0602
from __future__ import absolute_import
# Import Python libs
import logging
import json
# Import Salt libs
import salt.ext.six as six
from salt.ext.six import string_types
from salt.ext.six.moves.urllib.parse import urlparse as _urlparse # pylint: disable=import-error,no-name-in-module
log = logging.getLogger(__name__)
__func_alias__ = {
'list_': 'list',
}
# Import third party libs
try:
# pylint: disable=unused-import
import boto
import boto.sqs
import boto3
import botocore
# pylint: enable=unused-import
logging.getLogger('boto').setLevel(logging.CRITICAL)
HAS_BOTO = True
logging.getLogger('boto3').setLevel(logging.CRITICAL)
HAS_BOTO3 = True
except ImportError:
HAS_BOTO = False
from salt.ext.six import string_types
HAS_BOTO3 = False
def __virtual__():
'''
Only load if boto libraries exist.
Only load if boto3 libraries exist.
'''
if not HAS_BOTO:
return (False, 'The boto_sqs module could not be loaded: boto libraries not found')
__utils__['boto.assign_funcs'](__name__, 'sqs', pack=__salt__)
if not HAS_BOTO3:
return (False, 'The boto_sqs module could not be loaded: boto3 libraries not found')
__utils__['boto3.assign_funcs'](__name__, 'sqs', pack=__salt__)
return True
def _preprocess_attributes(attributes):
'''
Pre-process incoming queue attributes before setting them
'''
if isinstance(attributes, string_types):
attributes = json.loads(attributes)
def stringified(val):
# Some attributes take full json policy documents, but they take them
# as json strings. Convert the value back into a json string.
if isinstance(val, dict):
return json.dumps(val)
return val
return dict(
(attr, stringified(val)) for attr, val in six.iteritems(attributes)
)
def exists(name, region=None, key=None, keyid=None, profile=None):
'''
Check to see if a queue exists.
@ -89,13 +114,24 @@ def exists(name, region=None, key=None, keyid=None, profile=None):
'''
conn = _get_conn(region=region, key=key, keyid=keyid, profile=profile)
if conn.get_queue(name):
return True
else:
return False
try:
conn.get_queue_url(QueueName=name)
except botocore.exceptions.ClientError as e:
missing_code = 'AWS.SimpleQueueService.NonExistentQueue'
if e.response.get('Error', {}).get('Code') == missing_code:
return {'result': False}
return {'error': __utils__['boto3.get_error'](e)}
return {'result': True}
def create(name, region=None, key=None, keyid=None, profile=None):
def create(
name,
attributes=None,
region=None,
key=None,
keyid=None,
profile=None,
):
'''
Create an SQS queue.
@ -107,15 +143,15 @@ def create(name, region=None, key=None, keyid=None, profile=None):
'''
conn = _get_conn(region=region, key=key, keyid=keyid, profile=profile)
if not conn.get_queue(name):
try:
conn.create_queue(name)
except boto.exception.SQSError:
msg = 'Failed to create queue {0}'.format(name)
log.error(msg)
return False
log.info('Created queue {0}'.format(name))
return True
if attributes is None:
attributes = {}
attributes = _preprocess_attributes(attributes)
try:
conn.create_queue(QueueName=name, Attributes=attributes)
except botocore.exceptions.ClientError as e:
return {'error': __utils__['boto3.get_error'](e)}
return {'result': True}
def delete(name, region=None, key=None, keyid=None, profile=None):
@ -130,37 +166,15 @@ def delete(name, region=None, key=None, keyid=None, profile=None):
'''
conn = _get_conn(region=region, key=key, keyid=keyid, profile=profile)
queue_obj = conn.get_queue(name)
if queue_obj:
deleted_queue = conn.delete_queue(queue_obj)
if not deleted_queue:
msg = 'Failed to delete queue {0}'.format(name)
log.error(msg)
return False
return True
def get_all_queues(prefix=None, region=None, key=None, keyid=None, profile=None):
'''
Return a list of Queue() objects describing all visible queues.
.. versionadded:: 2016.11.0
CLI Example:
.. code-block:: bash
salt myminion boto_sqs.get_all_queues region=us-east-1 --output yaml
'''
conn = _get_conn(region=region, key=key, keyid=keyid, profile=profile)
try:
return conn.get_all_queues(prefix=prefix)
except boto.exception.SQSError:
log.error('Error listing queues')
return []
url = conn.get_queue_url(QueueName=name)['QueueUrl']
conn.delete_queue(QueueUrl=url)
except botocore.exceptions.ClientError as e:
return {'error': __utils__['boto3.get_error'](e)}
return {'result': True}
def list(prefix=None, region=None, key=None, keyid=None, profile=None):
def list_(prefix='', region=None, key=None, keyid=None, profile=None):
'''
Return a list of the names of all visible queues.
@ -172,8 +186,19 @@ def list(prefix=None, region=None, key=None, keyid=None, profile=None):
salt myminion boto_sqs.list region=us-east-1
'''
return [r.name for r in get_all_queues(prefix=prefix, region=region, key=key,
keyid=keyid, profile=profile)]
conn = _get_conn(region=region, key=key, keyid=keyid, profile=profile)
def extract_name(queue_url):
# Note: this logic taken from boto, so should be safe
return _urlparse(queue_url).path.split('/')[2]
try:
r = conn.list_queues(QueueNamePrefix=prefix)
# The 'QueueUrls' attribute is missing if there are no queues
urls = r.get('QueueUrls', [])
return {'result': [extract_name(url) for url in urls]}
except botocore.exceptions.ClientError as e:
return {'error': __utils__['boto3.get_error'](e)}
def get_attributes(name, region=None, key=None, keyid=None, profile=None):
@ -187,17 +212,23 @@ def get_attributes(name, region=None, key=None, keyid=None, profile=None):
salt myminion boto_sqs.get_attributes myqueue
'''
conn = _get_conn(region=region, key=key, keyid=keyid, profile=profile)
if not conn:
return {}
queue_obj = conn.get_queue(name)
if not queue_obj:
log.error('Queue {0} does not exist.'.format(name))
return {}
return conn.get_queue_attributes(queue_obj)
try:
url = conn.get_queue_url(QueueName=name)['QueueUrl']
r = conn.get_queue_attributes(QueueUrl=url, AttributeNames=['All'])
return {'result': r['Attributes']}
except botocore.exceptions.ClientError as e:
return {'error': __utils__['boto3.get_error'](e)}
def set_attributes(name, attributes, region=None, key=None, keyid=None,
profile=None):
def set_attributes(
name,
attributes,
region=None,
key=None,
keyid=None,
profile=None,
):
'''
Set attributes on an SQS queue.
@ -207,22 +238,13 @@ def set_attributes(name, attributes, region=None, key=None, keyid=None,
salt myminion boto_sqs.set_attributes myqueue '{ReceiveMessageWaitTimeSeconds: 20}' region=us-east-1
'''
ret = True
conn = _get_conn(region=region, key=key, keyid=keyid, profile=profile)
queue_obj = conn.get_queue(name)
if not queue_obj:
log.error('Queue {0} does not exist.'.format(name))
ret = False
if isinstance(attributes, string_types):
attributes = json.loads(attributes)
for attr, val in six.iteritems(attributes):
attr_set = queue_obj.set_attribute(attr, val)
if not attr_set:
msg = 'Failed to set attribute {0} = {1} on queue {2}'
log.error(msg.format(attr, val, name))
ret = False
else:
msg = 'Set attribute {0} = {1} on queue {2}'
log.info(msg.format(attr, val, name))
return ret
attributes = _preprocess_attributes(attributes)
try:
url = conn.get_queue_url(QueueName=name)['QueueUrl']
conn.set_queue_attributes(QueueUrl=url, Attributes=attributes)
except botocore.exceptions.ClientError as e:
return {'error': __utils__['boto3.get_error'](e)}
return {'result': True}

View File

@ -57,10 +57,16 @@ passed in as a dict, or as a string to pull from pillars or minion config:
keyid: GKTADJGHEIQSXMKKRBJ08H
key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
'''
from __future__ import absolute_import
import salt.ext.six as six
import logging
import difflib
import json
import logging
import yaml
import salt.ext.six as six
log = logging.getLogger(__name__)
@ -78,7 +84,8 @@ def present(
region=None,
key=None,
keyid=None,
profile=None):
profile=None,
):
'''
Ensure the SQS queue exists.
@ -101,68 +108,143 @@ def present(
A dict with region, key and keyid, or a pillar key (string)
that contains a dict with region, key and keyid.
'''
ret = {'name': name, 'result': True, 'comment': '', 'changes': {}}
comments = []
ret = {'name': name, 'result': True, 'changes': {}}
is_present = __salt__['boto_sqs.exists'](name, region, key, keyid, profile)
r = __salt__['boto_sqs.exists'](
name,
region=region,
key=key,
keyid=keyid,
profile=profile,
)
if 'error' in r:
ret['result'] = False
ret['comment'] = '\n'.join(comments + [str(r['error'])])
return ret
if not is_present:
if r['result']:
comments.append('SQS queue {0} present.'.format(name))
else:
if __opts__['test']:
msg = 'AWS SQS queue {0} is set to be created.'.format(name)
ret['comment'] = msg
ret['result'] = None
comments.append('SQS queue {0} is set to be created.'.format(name))
ret['pchanges'] = {'old': None, 'new': name}
ret['comment'] = '\n'.join(comments)
return ret
created = __salt__['boto_sqs.create'](name, region, key, keyid,
profile)
if created:
ret['changes']['old'] = None
ret['changes']['new'] = {'queue': name}
else:
r = __salt__['boto_sqs.create'](
name,
attributes=attributes,
region=region,
key=key,
keyid=keyid,
profile=profile,
)
if 'error' in r:
ret['result'] = False
ret['comment'] = 'Failed to create {0} AWS queue'.format(name)
comments.append('Failed to create SQS queue {0}: {1}'.format(
name,
str(r['error']),
))
ret['comment'] = '\n'.join(comments)
return ret
else:
ret['comment'] = '{0} present.'.format(name)
comments.append('SQS queue {0} created.'.format(name))
ret['changes']['old'] = None
ret['changes']['new'] = name
# Return immediately, as the create call also set all attributes
ret['comment'] = '\n'.join(comments)
return ret
if not attributes:
ret['comment'] = '\n'.join(comments)
return ret
r = __salt__['boto_sqs.get_attributes'](
name,
region=region,
key=key,
keyid=keyid,
profile=profile,
)
if 'error' in r:
ret['result'] = False
comments.append('Failed to get queue attributes: {0}'.format(
str(r['error']),
))
ret['comment'] = '\n'.join(comments)
return ret
current_attributes = r['result']
attrs_to_set = {}
_attributes = __salt__['boto_sqs.get_attributes'](name, region, key, keyid,
profile)
if attributes:
for attr, val in six.iteritems(attributes):
_val = _attributes.get(attr, None)
if attr == 'Policy':
# Normalize these guys by brute force..
if isinstance(_val, six.string_types):
_val = json.loads(_val)
if isinstance(val, six.string_types):
val = json.loads(val)
if _val != val:
log.debug('Policies differ:\n{0}\n{1}'.format(_val, val))
attrs_to_set[attr] = json.dumps(val, sort_keys=True)
elif str(_val) != str(val):
log.debug('Attributes differ:\n{0}\n{1}'.format(_val, val))
attrs_to_set[attr] = val
attr_names = ','.join(attrs_to_set)
if attrs_to_set:
if __opts__['test']:
ret['comment'] = 'Attribute(s) {0} to be set on {1}.'.format(
attr_names, name)
ret['result'] = None
return ret
msg = (' Setting {0} attribute(s).'.format(attr_names))
ret['comment'] = ret['comment'] + msg
if 'new' in ret['changes']:
ret['changes']['new']['attributes_set'] = []
else:
ret['changes']['new'] = {'attributes_set': []}
for attr, val in six.iteritems(attrs_to_set):
set_attr = __salt__['boto_sqs.set_attributes'](name, {attr: val},
region, key, keyid,
profile)
if not set_attr:
ret['result'] = False
msg = 'Set attribute {0}.'.format(attr)
ret['changes']['new']['attributes_set'].append(attr)
else:
ret['comment'] = ret['comment'] + ' Attributes set.'
for attr, val in six.iteritems(attributes):
_val = current_attributes.get(attr, None)
if attr == 'Policy':
# Normalize by brute force
if isinstance(_val, six.string_types):
_val = json.loads(_val)
if isinstance(val, six.string_types):
val = json.loads(val)
if _val != val:
log.debug('Policies differ:\n{0}\n{1}'.format(_val, val))
attrs_to_set[attr] = json.dumps(val, sort_keys=True)
elif str(_val) != str(val):
log.debug('Attributes differ:\n{0}\n{1}'.format(_val, val))
attrs_to_set[attr] = val
attr_names = ', '.join(attrs_to_set)
if not attrs_to_set:
comments.append('Queue attributes already set correctly.')
ret['comment'] = '\n'.join(comments)
return ret
final_attributes = current_attributes.copy()
final_attributes.update(attrs_to_set)
def _yaml_safe_dump(attrs):
'''Safely dump YAML using a readable flow style'''
dumper_name = 'IndentedSafeOrderedDumper'
dumper = __utils__['yamldumper.get_dumper'](dumper_name)
return yaml.dump(
attrs,
default_flow_style=False,
Dumper=dumper,
)
attributes_diff = ''.join(difflib.unified_diff(
_yaml_safe_dump(current_attributes).splitlines(True),
_yaml_safe_dump(final_attributes).splitlines(True),
))
if __opts__['test']:
ret['result'] = None
comments.append('Attribute(s) {0} set to be updated:'.format(
attr_names,
))
comments.append(attributes_diff)
ret['pchanges'] = {'attributes': {'diff': attributes_diff}}
ret['comment'] = '\n'.join(comments)
return ret
r = __salt__['boto_sqs.set_attributes'](
name,
attrs_to_set,
region=region,
key=key,
keyid=keyid,
profile=profile,
)
if 'error' in r:
ret['result'] = False
comments.append('Failed to set queue attributes: {0}'.format(
str(r['error']),
))
ret['comment'] = '\n'.join(comments)
return ret
comments.append('Updated SQS queue attribute(s) {0}.'.format(attr_names))
ret['changes']['attributes'] = {'diff': attributes_diff}
ret['comment'] = '\n'.join(comments)
return ret
@ -171,7 +253,8 @@ def absent(
region=None,
key=None,
keyid=None,
profile=None):
profile=None,
):
'''
Ensure the named sqs queue is deleted.
@ -193,23 +276,44 @@ def absent(
'''
ret = {'name': name, 'result': True, 'comment': '', 'changes': {}}
is_present = __salt__['boto_sqs.exists'](name, region, key, keyid, profile)
r = __salt__['boto_sqs.exists'](
name,
region=region,
key=key,
keyid=keyid,
profile=profile,
)
if 'error' in r:
ret['result'] = False
ret['comment'] = str(r['error'])
return ret
if is_present:
if __opts__['test']:
ret['comment'] = 'AWS SQS queue {0} is set to be removed.'.format(
name)
ret['result'] = None
return ret
deleted = __salt__['boto_sqs.delete'](name, region, key, keyid,
profile)
if deleted:
ret['changes']['old'] = name
ret['changes']['new'] = None
else:
ret['result'] = False
ret['comment'] = 'Failed to delete {0} sqs queue.'.format(name)
else:
ret['comment'] = '{0} does not exist in {1}.'.format(name, region)
if not r['result']:
ret['comment'] = 'SQS queue {0} does not exist in {1}.'.format(
name,
region
)
return ret
if __opts__['test']:
ret['result'] = None
ret['comment'] = 'SQS queue {0} is set to be removed.'.format(name)
ret['pchanges'] = {'old': name, 'new': None}
return ret
r = __salt__['boto_sqs.delete'](
name,
region=region,
key=key,
keyid=keyid,
profile=profile,
)
if 'error' in r:
ret['result'] = False
ret['comment'] = str(r['error'])
return ret
ret['comment'] = 'SQS queue {0} was deleted.'.format(name)
ret['changes']['old'] = name
ret['changes']['new'] = None
return ret

View File

@ -4,6 +4,7 @@
'''
# Import Python libs
from __future__ import absolute_import
import textwrap
# Import Salt Testing Libs
from tests.support.mixins import LoaderModuleMockMixin
@ -11,6 +12,8 @@ from tests.support.unit import skipIf, TestCase
from tests.support.mock import NO_MOCK, NO_MOCK_REASON, MagicMock, patch
# Import Salt Libs
import salt.config
import salt.loader
import salt.states.boto_sqs as boto_sqs
@ -20,7 +23,24 @@ class BotoSqsTestCase(TestCase, LoaderModuleMockMixin):
Test cases for salt.states.boto_sqs
'''
def setup_loader_modules(self):
return {boto_sqs: {}}
utils = salt.loader.utils(
self.opts,
whitelist=['boto3', 'yamldumper'],
context={}
)
return {
boto_sqs: {
'__utils__': utils,
}
}
@classmethod
def setUpClass(cls):
cls.opts = salt.config.DEFAULT_MINION_OPTS
@classmethod
def tearDownClass(cls):
del cls.opts
# 'present' function tests: 1
@ -29,37 +49,55 @@ class BotoSqsTestCase(TestCase, LoaderModuleMockMixin):
Test to ensure the SQS queue exists.
'''
name = 'mysqs'
attributes = {'ReceiveMessageWaitTimeSeconds': 20}
attributes = {'DelaySeconds': 20}
base_ret = {'name': name, 'changes': {}}
ret = {'name': name,
'result': False,
'changes': {},
'comment': ''}
mock = MagicMock(side_effect=[False, False, True, True])
mock_bool = MagicMock(return_value=False)
mock_attr = MagicMock(return_value={})
mock = MagicMock(
side_effect=[{'result': b} for b in [False, False, True, True]],
)
mock_bool = MagicMock(return_value={'error': 'create error'})
mock_attr = MagicMock(return_value={'result': {}})
with patch.dict(boto_sqs.__salt__,
{'boto_sqs.exists': mock,
'boto_sqs.create': mock_bool,
'boto_sqs.get_attributes': mock_attr}):
with patch.dict(boto_sqs.__opts__, {'test': False}):
comt = ('Failed to create {0} AWS queue'.format(name))
ret.update({'comment': comt})
comt = 'Failed to create SQS queue {0}: create error'.format(
name,
)
ret = base_ret.copy()
ret.update({'result': False, 'comment': comt})
self.assertDictEqual(boto_sqs.present(name), ret)
with patch.dict(boto_sqs.__opts__, {'test': True}):
comt = ('AWS SQS queue {0} is set to be created.'.format(name))
ret.update({'comment': comt, 'result': None})
comt = 'SQS queue {0} is set to be created.'.format(name)
ret = base_ret.copy()
ret.update({
'result': None,
'comment': comt,
'pchanges': {'old': None, 'new': 'mysqs'},
})
self.assertDictEqual(boto_sqs.present(name), ret)
comt = ('Attribute(s) ReceiveMessageWaitTimeSeconds'
' to be set on mysqs.')
ret.update({'comment': comt})
diff = textwrap.dedent('''\
---
+++
@@ -1 +1 @@
-{}
+DelaySeconds: 20
''')
comt = textwrap.dedent('''\
SQS queue mysqs present.
Attribute(s) DelaySeconds set to be updated:
''') + diff
ret.update({
'comment': comt,
'pchanges': {'attributes': {'diff': diff}},
})
self.assertDictEqual(boto_sqs.present(name, attributes), ret)
comt = ('mysqs present. Attributes set.')
ret.update({'comment': comt, 'result': True})
comt = ('SQS queue mysqs present.')
ret = base_ret.copy()
ret.update({'result': True, 'comment': comt})
self.assertDictEqual(boto_sqs.present(name), ret)
# 'absent' function tests: 1
@ -69,20 +107,22 @@ class BotoSqsTestCase(TestCase, LoaderModuleMockMixin):
Test to ensure the named sqs queue is deleted.
'''
name = 'test.example.com.'
base_ret = {'name': name, 'changes': {}}
ret = {'name': name,
'result': True,
'changes': {},
'comment': ''}
mock = MagicMock(side_effect=[False, True])
mock = MagicMock(side_effect=[{'result': False}, {'result': True}])
with patch.dict(boto_sqs.__salt__,
{'boto_sqs.exists': mock}):
comt = ('{0} does not exist in None.'.format(name))
ret.update({'comment': comt})
comt = ('SQS queue {0} does not exist in None.'.format(name))
ret = base_ret.copy()
ret.update({'result': True, 'comment': comt})
self.assertDictEqual(boto_sqs.absent(name), ret)
with patch.dict(boto_sqs.__opts__, {'test': True}):
comt = ('AWS SQS queue {0} is set to be removed.'.format(name))
ret.update({'comment': comt, 'result': None})
comt = ('SQS queue {0} is set to be removed.'.format(name))
ret = base_ret.copy()
ret.update({
'result': None,
'comment': comt,
'pchanges': {'old': name, 'new': None},
})
self.assertDictEqual(boto_sqs.absent(name), ret)