diff --git a/salt/modules/boto_sqs.py b/salt/modules/boto_sqs.py index bc02e66d68..575e65c83a 100644 --- a/salt/modules/boto_sqs.py +++ b/salt/modules/boto_sqs.py @@ -7,7 +7,7 @@ Connection module for Amazon SQS :configuration: This module accepts explicit sqs credentials but can also utilize IAM roles assigned to the instance through Instance Profiles. Dynamic credentials are then automatically obtained from AWS API and no further - configuration is necessary. More Information available at: + configuration is necessary. More information available at: .. code-block:: text @@ -39,44 +39,69 @@ Connection module for Amazon SQS key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs region: us-east-1 -:depends: boto +:depends: boto3 ''' # keep lint from choking on _get_conn and _cache_id -#pylint: disable=E0602 +# pylint: disable=E0602 from __future__ import absolute_import # Import Python libs import logging import json + +# Import Salt libs import salt.ext.six as six +from salt.ext.six import string_types +from salt.ext.six.moves.urllib.parse import urlparse as _urlparse # pylint: disable=import-error,no-name-in-module log = logging.getLogger(__name__) +__func_alias__ = { + 'list_': 'list', +} + # Import third party libs try: # pylint: disable=unused-import - import boto - import boto.sqs + import boto3 + import botocore # pylint: enable=unused-import - logging.getLogger('boto').setLevel(logging.CRITICAL) - HAS_BOTO = True + logging.getLogger('boto3').setLevel(logging.CRITICAL) + HAS_BOTO3 = True except ImportError: - HAS_BOTO = False - -from salt.ext.six import string_types + HAS_BOTO3 = False def __virtual__(): ''' - Only load if boto libraries exist. + Only load if boto3 libraries exist. ''' - if not HAS_BOTO: - return (False, 'The boto_sqs module could not be loaded: boto libraries not found') - __utils__['boto.assign_funcs'](__name__, 'sqs', pack=__salt__) + if not HAS_BOTO3: + return (False, 'The boto_sqs module could not be loaded: boto3 libraries not found') + __utils__['boto3.assign_funcs'](__name__, 'sqs', pack=__salt__) return True +def _preprocess_attributes(attributes): + ''' + Pre-process incoming queue attributes before setting them + ''' + if isinstance(attributes, string_types): + attributes = json.loads(attributes) + + def stringified(val): + # Some attributes take full json policy documents, but they take them + # as json strings. Convert the value back into a json string. + if isinstance(val, dict): + return json.dumps(val) + return val + + return dict( + (attr, stringified(val)) for attr, val in six.iteritems(attributes) + ) + + def exists(name, region=None, key=None, keyid=None, profile=None): ''' Check to see if a queue exists. @@ -89,13 +114,24 @@ def exists(name, region=None, key=None, keyid=None, profile=None): ''' conn = _get_conn(region=region, key=key, keyid=keyid, profile=profile) - if conn.get_queue(name): - return True - else: - return False + try: + conn.get_queue_url(QueueName=name) + except botocore.exceptions.ClientError as e: + missing_code = 'AWS.SimpleQueueService.NonExistentQueue' + if e.response.get('Error', {}).get('Code') == missing_code: + return {'result': False} + return {'error': __utils__['boto3.get_error'](e)} + return {'result': True} -def create(name, region=None, key=None, keyid=None, profile=None): +def create( + name, + attributes=None, + region=None, + key=None, + keyid=None, + profile=None, +): ''' Create an SQS queue. @@ -107,15 +143,15 @@ def create(name, region=None, key=None, keyid=None, profile=None): ''' conn = _get_conn(region=region, key=key, keyid=keyid, profile=profile) - if not conn.get_queue(name): - try: - conn.create_queue(name) - except boto.exception.SQSError: - msg = 'Failed to create queue {0}'.format(name) - log.error(msg) - return False - log.info('Created queue {0}'.format(name)) - return True + if attributes is None: + attributes = {} + attributes = _preprocess_attributes(attributes) + + try: + conn.create_queue(QueueName=name, Attributes=attributes) + except botocore.exceptions.ClientError as e: + return {'error': __utils__['boto3.get_error'](e)} + return {'result': True} def delete(name, region=None, key=None, keyid=None, profile=None): @@ -130,37 +166,15 @@ def delete(name, region=None, key=None, keyid=None, profile=None): ''' conn = _get_conn(region=region, key=key, keyid=keyid, profile=profile) - queue_obj = conn.get_queue(name) - if queue_obj: - deleted_queue = conn.delete_queue(queue_obj) - if not deleted_queue: - msg = 'Failed to delete queue {0}'.format(name) - log.error(msg) - return False - return True - - -def get_all_queues(prefix=None, region=None, key=None, keyid=None, profile=None): - ''' - Return a list of Queue() objects describing all visible queues. - - .. versionadded:: 2016.11.0 - - CLI Example: - - .. code-block:: bash - - salt myminion boto_sqs.get_all_queues region=us-east-1 --output yaml - ''' - conn = _get_conn(region=region, key=key, keyid=keyid, profile=profile) try: - return conn.get_all_queues(prefix=prefix) - except boto.exception.SQSError: - log.error('Error listing queues') - return [] + url = conn.get_queue_url(QueueName=name)['QueueUrl'] + conn.delete_queue(QueueUrl=url) + except botocore.exceptions.ClientError as e: + return {'error': __utils__['boto3.get_error'](e)} + return {'result': True} -def list(prefix=None, region=None, key=None, keyid=None, profile=None): +def list_(prefix='', region=None, key=None, keyid=None, profile=None): ''' Return a list of the names of all visible queues. @@ -172,8 +186,19 @@ def list(prefix=None, region=None, key=None, keyid=None, profile=None): salt myminion boto_sqs.list region=us-east-1 ''' - return [r.name for r in get_all_queues(prefix=prefix, region=region, key=key, - keyid=keyid, profile=profile)] + conn = _get_conn(region=region, key=key, keyid=keyid, profile=profile) + + def extract_name(queue_url): + # Note: this logic taken from boto, so should be safe + return _urlparse(queue_url).path.split('/')[2] + + try: + r = conn.list_queues(QueueNamePrefix=prefix) + # The 'QueueUrls' attribute is missing if there are no queues + urls = r.get('QueueUrls', []) + return {'result': [extract_name(url) for url in urls]} + except botocore.exceptions.ClientError as e: + return {'error': __utils__['boto3.get_error'](e)} def get_attributes(name, region=None, key=None, keyid=None, profile=None): @@ -187,17 +212,23 @@ def get_attributes(name, region=None, key=None, keyid=None, profile=None): salt myminion boto_sqs.get_attributes myqueue ''' conn = _get_conn(region=region, key=key, keyid=keyid, profile=profile) - if not conn: - return {} - queue_obj = conn.get_queue(name) - if not queue_obj: - log.error('Queue {0} does not exist.'.format(name)) - return {} - return conn.get_queue_attributes(queue_obj) + + try: + url = conn.get_queue_url(QueueName=name)['QueueUrl'] + r = conn.get_queue_attributes(QueueUrl=url, AttributeNames=['All']) + return {'result': r['Attributes']} + except botocore.exceptions.ClientError as e: + return {'error': __utils__['boto3.get_error'](e)} -def set_attributes(name, attributes, region=None, key=None, keyid=None, - profile=None): +def set_attributes( + name, + attributes, + region=None, + key=None, + keyid=None, + profile=None, +): ''' Set attributes on an SQS queue. @@ -207,22 +238,13 @@ def set_attributes(name, attributes, region=None, key=None, keyid=None, salt myminion boto_sqs.set_attributes myqueue '{ReceiveMessageWaitTimeSeconds: 20}' region=us-east-1 ''' - ret = True conn = _get_conn(region=region, key=key, keyid=keyid, profile=profile) - queue_obj = conn.get_queue(name) - if not queue_obj: - log.error('Queue {0} does not exist.'.format(name)) - ret = False - if isinstance(attributes, string_types): - attributes = json.loads(attributes) - for attr, val in six.iteritems(attributes): - attr_set = queue_obj.set_attribute(attr, val) - if not attr_set: - msg = 'Failed to set attribute {0} = {1} on queue {2}' - log.error(msg.format(attr, val, name)) - ret = False - else: - msg = 'Set attribute {0} = {1} on queue {2}' - log.info(msg.format(attr, val, name)) - return ret + attributes = _preprocess_attributes(attributes) + + try: + url = conn.get_queue_url(QueueName=name)['QueueUrl'] + conn.set_queue_attributes(QueueUrl=url, Attributes=attributes) + except botocore.exceptions.ClientError as e: + return {'error': __utils__['boto3.get_error'](e)} + return {'result': True} diff --git a/salt/states/boto_sqs.py b/salt/states/boto_sqs.py index a97b37ba5d..0857e50379 100644 --- a/salt/states/boto_sqs.py +++ b/salt/states/boto_sqs.py @@ -57,10 +57,16 @@ passed in as a dict, or as a string to pull from pillars or minion config: keyid: GKTADJGHEIQSXMKKRBJ08H key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs ''' + from __future__ import absolute_import -import salt.ext.six as six -import logging + +import difflib import json +import logging + +import yaml + +import salt.ext.six as six log = logging.getLogger(__name__) @@ -78,7 +84,8 @@ def present( region=None, key=None, keyid=None, - profile=None): + profile=None, +): ''' Ensure the SQS queue exists. @@ -101,68 +108,143 @@ def present( A dict with region, key and keyid, or a pillar key (string) that contains a dict with region, key and keyid. ''' - ret = {'name': name, 'result': True, 'comment': '', 'changes': {}} + comments = [] + ret = {'name': name, 'result': True, 'changes': {}} - is_present = __salt__['boto_sqs.exists'](name, region, key, keyid, profile) + r = __salt__['boto_sqs.exists']( + name, + region=region, + key=key, + keyid=keyid, + profile=profile, + ) + if 'error' in r: + ret['result'] = False + ret['comment'] = '\n'.join(comments + [str(r['error'])]) + return ret - if not is_present: + if r['result']: + comments.append('SQS queue {0} present.'.format(name)) + else: if __opts__['test']: - msg = 'AWS SQS queue {0} is set to be created.'.format(name) - ret['comment'] = msg ret['result'] = None + comments.append('SQS queue {0} is set to be created.'.format(name)) + ret['pchanges'] = {'old': None, 'new': name} + ret['comment'] = '\n'.join(comments) return ret - created = __salt__['boto_sqs.create'](name, region, key, keyid, - profile) - if created: - ret['changes']['old'] = None - ret['changes']['new'] = {'queue': name} - else: + + r = __salt__['boto_sqs.create']( + name, + attributes=attributes, + region=region, + key=key, + keyid=keyid, + profile=profile, + ) + if 'error' in r: ret['result'] = False - ret['comment'] = 'Failed to create {0} AWS queue'.format(name) + comments.append('Failed to create SQS queue {0}: {1}'.format( + name, + str(r['error']), + )) + ret['comment'] = '\n'.join(comments) return ret - else: - ret['comment'] = '{0} present.'.format(name) + + comments.append('SQS queue {0} created.'.format(name)) + ret['changes']['old'] = None + ret['changes']['new'] = name + # Return immediately, as the create call also set all attributes + ret['comment'] = '\n'.join(comments) + return ret + + if not attributes: + ret['comment'] = '\n'.join(comments) + return ret + + r = __salt__['boto_sqs.get_attributes']( + name, + region=region, + key=key, + keyid=keyid, + profile=profile, + ) + if 'error' in r: + ret['result'] = False + comments.append('Failed to get queue attributes: {0}'.format( + str(r['error']), + )) + ret['comment'] = '\n'.join(comments) + return ret + current_attributes = r['result'] + attrs_to_set = {} - _attributes = __salt__['boto_sqs.get_attributes'](name, region, key, keyid, - profile) - if attributes: - for attr, val in six.iteritems(attributes): - _val = _attributes.get(attr, None) - if attr == 'Policy': - # Normalize these guys by brute force.. - if isinstance(_val, six.string_types): - _val = json.loads(_val) - if isinstance(val, six.string_types): - val = json.loads(val) - if _val != val: - log.debug('Policies differ:\n{0}\n{1}'.format(_val, val)) - attrs_to_set[attr] = json.dumps(val, sort_keys=True) - elif str(_val) != str(val): - log.debug('Attributes differ:\n{0}\n{1}'.format(_val, val)) - attrs_to_set[attr] = val - attr_names = ','.join(attrs_to_set) - if attrs_to_set: - if __opts__['test']: - ret['comment'] = 'Attribute(s) {0} to be set on {1}.'.format( - attr_names, name) - ret['result'] = None - return ret - msg = (' Setting {0} attribute(s).'.format(attr_names)) - ret['comment'] = ret['comment'] + msg - if 'new' in ret['changes']: - ret['changes']['new']['attributes_set'] = [] - else: - ret['changes']['new'] = {'attributes_set': []} - for attr, val in six.iteritems(attrs_to_set): - set_attr = __salt__['boto_sqs.set_attributes'](name, {attr: val}, - region, key, keyid, - profile) - if not set_attr: - ret['result'] = False - msg = 'Set attribute {0}.'.format(attr) - ret['changes']['new']['attributes_set'].append(attr) - else: - ret['comment'] = ret['comment'] + ' Attributes set.' + for attr, val in six.iteritems(attributes): + _val = current_attributes.get(attr, None) + if attr == 'Policy': + # Normalize by brute force + if isinstance(_val, six.string_types): + _val = json.loads(_val) + if isinstance(val, six.string_types): + val = json.loads(val) + if _val != val: + log.debug('Policies differ:\n{0}\n{1}'.format(_val, val)) + attrs_to_set[attr] = json.dumps(val, sort_keys=True) + elif str(_val) != str(val): + log.debug('Attributes differ:\n{0}\n{1}'.format(_val, val)) + attrs_to_set[attr] = val + attr_names = ', '.join(attrs_to_set) + + if not attrs_to_set: + comments.append('Queue attributes already set correctly.') + ret['comment'] = '\n'.join(comments) + return ret + + final_attributes = current_attributes.copy() + final_attributes.update(attrs_to_set) + + def _yaml_safe_dump(attrs): + '''Safely dump YAML using a readable flow style''' + dumper_name = 'IndentedSafeOrderedDumper' + dumper = __utils__['yamldumper.get_dumper'](dumper_name) + return yaml.dump( + attrs, + default_flow_style=False, + Dumper=dumper, + ) + attributes_diff = ''.join(difflib.unified_diff( + _yaml_safe_dump(current_attributes).splitlines(True), + _yaml_safe_dump(final_attributes).splitlines(True), + )) + + if __opts__['test']: + ret['result'] = None + comments.append('Attribute(s) {0} set to be updated:'.format( + attr_names, + )) + comments.append(attributes_diff) + ret['pchanges'] = {'attributes': {'diff': attributes_diff}} + ret['comment'] = '\n'.join(comments) + return ret + + r = __salt__['boto_sqs.set_attributes']( + name, + attrs_to_set, + region=region, + key=key, + keyid=keyid, + profile=profile, + ) + if 'error' in r: + ret['result'] = False + comments.append('Failed to set queue attributes: {0}'.format( + str(r['error']), + )) + ret['comment'] = '\n'.join(comments) + return ret + + comments.append('Updated SQS queue attribute(s) {0}.'.format(attr_names)) + ret['changes']['attributes'] = {'diff': attributes_diff} + ret['comment'] = '\n'.join(comments) return ret @@ -171,7 +253,8 @@ def absent( region=None, key=None, keyid=None, - profile=None): + profile=None, +): ''' Ensure the named sqs queue is deleted. @@ -193,23 +276,44 @@ def absent( ''' ret = {'name': name, 'result': True, 'comment': '', 'changes': {}} - is_present = __salt__['boto_sqs.exists'](name, region, key, keyid, profile) + r = __salt__['boto_sqs.exists']( + name, + region=region, + key=key, + keyid=keyid, + profile=profile, + ) + if 'error' in r: + ret['result'] = False + ret['comment'] = str(r['error']) + return ret - if is_present: - if __opts__['test']: - ret['comment'] = 'AWS SQS queue {0} is set to be removed.'.format( - name) - ret['result'] = None - return ret - deleted = __salt__['boto_sqs.delete'](name, region, key, keyid, - profile) - if deleted: - ret['changes']['old'] = name - ret['changes']['new'] = None - else: - ret['result'] = False - ret['comment'] = 'Failed to delete {0} sqs queue.'.format(name) - else: - ret['comment'] = '{0} does not exist in {1}.'.format(name, region) + if not r['result']: + ret['comment'] = 'SQS queue {0} does not exist in {1}.'.format( + name, + region + ) + return ret + if __opts__['test']: + ret['result'] = None + ret['comment'] = 'SQS queue {0} is set to be removed.'.format(name) + ret['pchanges'] = {'old': name, 'new': None} + return ret + + r = __salt__['boto_sqs.delete']( + name, + region=region, + key=key, + keyid=keyid, + profile=profile, + ) + if 'error' in r: + ret['result'] = False + ret['comment'] = str(r['error']) + return ret + + ret['comment'] = 'SQS queue {0} was deleted.'.format(name) + ret['changes']['old'] = name + ret['changes']['new'] = None return ret diff --git a/tests/unit/states/test_boto_sqs.py b/tests/unit/states/test_boto_sqs.py index 1ee4c9820e..4c50a5449f 100644 --- a/tests/unit/states/test_boto_sqs.py +++ b/tests/unit/states/test_boto_sqs.py @@ -4,6 +4,7 @@ ''' # Import Python libs from __future__ import absolute_import +import textwrap # Import Salt Testing Libs from tests.support.mixins import LoaderModuleMockMixin @@ -11,6 +12,8 @@ from tests.support.unit import skipIf, TestCase from tests.support.mock import NO_MOCK, NO_MOCK_REASON, MagicMock, patch # Import Salt Libs +import salt.config +import salt.loader import salt.states.boto_sqs as boto_sqs @@ -20,7 +23,24 @@ class BotoSqsTestCase(TestCase, LoaderModuleMockMixin): Test cases for salt.states.boto_sqs ''' def setup_loader_modules(self): - return {boto_sqs: {}} + utils = salt.loader.utils( + self.opts, + whitelist=['boto3', 'yamldumper'], + context={} + ) + return { + boto_sqs: { + '__utils__': utils, + } + } + + @classmethod + def setUpClass(cls): + cls.opts = salt.config.DEFAULT_MINION_OPTS + + @classmethod + def tearDownClass(cls): + del cls.opts # 'present' function tests: 1 @@ -29,37 +49,55 @@ class BotoSqsTestCase(TestCase, LoaderModuleMockMixin): Test to ensure the SQS queue exists. ''' name = 'mysqs' - attributes = {'ReceiveMessageWaitTimeSeconds': 20} + attributes = {'DelaySeconds': 20} + base_ret = {'name': name, 'changes': {}} - ret = {'name': name, - 'result': False, - 'changes': {}, - 'comment': ''} - - mock = MagicMock(side_effect=[False, False, True, True]) - mock_bool = MagicMock(return_value=False) - mock_attr = MagicMock(return_value={}) + mock = MagicMock( + side_effect=[{'result': b} for b in [False, False, True, True]], + ) + mock_bool = MagicMock(return_value={'error': 'create error'}) + mock_attr = MagicMock(return_value={'result': {}}) with patch.dict(boto_sqs.__salt__, {'boto_sqs.exists': mock, 'boto_sqs.create': mock_bool, 'boto_sqs.get_attributes': mock_attr}): with patch.dict(boto_sqs.__opts__, {'test': False}): - comt = ('Failed to create {0} AWS queue'.format(name)) - ret.update({'comment': comt}) + comt = 'Failed to create SQS queue {0}: create error'.format( + name, + ) + ret = base_ret.copy() + ret.update({'result': False, 'comment': comt}) self.assertDictEqual(boto_sqs.present(name), ret) with patch.dict(boto_sqs.__opts__, {'test': True}): - comt = ('AWS SQS queue {0} is set to be created.'.format(name)) - ret.update({'comment': comt, 'result': None}) + comt = 'SQS queue {0} is set to be created.'.format(name) + ret = base_ret.copy() + ret.update({ + 'result': None, + 'comment': comt, + 'pchanges': {'old': None, 'new': 'mysqs'}, + }) self.assertDictEqual(boto_sqs.present(name), ret) - - comt = ('Attribute(s) ReceiveMessageWaitTimeSeconds' - ' to be set on mysqs.') - ret.update({'comment': comt}) + diff = textwrap.dedent('''\ + --- + +++ + @@ -1 +1 @@ + -{} + +DelaySeconds: 20 + ''') + comt = textwrap.dedent('''\ + SQS queue mysqs present. + Attribute(s) DelaySeconds set to be updated: + ''') + diff + ret.update({ + 'comment': comt, + 'pchanges': {'attributes': {'diff': diff}}, + }) self.assertDictEqual(boto_sqs.present(name, attributes), ret) - comt = ('mysqs present. Attributes set.') - ret.update({'comment': comt, 'result': True}) + comt = ('SQS queue mysqs present.') + ret = base_ret.copy() + ret.update({'result': True, 'comment': comt}) self.assertDictEqual(boto_sqs.present(name), ret) # 'absent' function tests: 1 @@ -69,20 +107,22 @@ class BotoSqsTestCase(TestCase, LoaderModuleMockMixin): Test to ensure the named sqs queue is deleted. ''' name = 'test.example.com.' + base_ret = {'name': name, 'changes': {}} - ret = {'name': name, - 'result': True, - 'changes': {}, - 'comment': ''} - - mock = MagicMock(side_effect=[False, True]) + mock = MagicMock(side_effect=[{'result': False}, {'result': True}]) with patch.dict(boto_sqs.__salt__, {'boto_sqs.exists': mock}): - comt = ('{0} does not exist in None.'.format(name)) - ret.update({'comment': comt}) + comt = ('SQS queue {0} does not exist in None.'.format(name)) + ret = base_ret.copy() + ret.update({'result': True, 'comment': comt}) self.assertDictEqual(boto_sqs.absent(name), ret) with patch.dict(boto_sqs.__opts__, {'test': True}): - comt = ('AWS SQS queue {0} is set to be removed.'.format(name)) - ret.update({'comment': comt, 'result': None}) + comt = ('SQS queue {0} is set to be removed.'.format(name)) + ret = base_ret.copy() + ret.update({ + 'result': None, + 'comment': comt, + 'pchanges': {'old': name, 'new': None}, + }) self.assertDictEqual(boto_sqs.absent(name), ret)