Merge pull request #36459 from cachedout/pr-36426

Pr 36426
This commit is contained in:
Mike Place 2016-09-21 15:34:28 +09:00 committed by GitHub
commit 3d23371ca2
3 changed files with 112 additions and 21 deletions

View File

@ -55,6 +55,15 @@ It's also possible to specify key, keyid and region via a profile:
key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
region: us-east-1
Additionally you can define cross account sqs:
.. code-block:: yaml
engines:
- sqs_events:
queue: prod
owner_acct_id: 111111111111
'''
# Import python libs
@ -114,9 +123,21 @@ def _get_sqs_conn(profile, region=None, key=None, keyid=None):
return conn
def start(queue, profile=None, tag='salt/engine/sqs'):
def _process_queue(q, q_name, fire_master, tag='salt/engine/sqs', owner_acct_id=None):
if not q:
log.warning('failure connecting to queue: {0}, '
'waiting 10 seconds.'.format(':'.join([_f for _f in (str(owner_acct_id), q_name) if _f])))
time.sleep(10)
else:
msgs = q.get_messages(wait_time_seconds=20)
for msg in msgs:
fire_master(tag=tag, data={'message': msg.get_body()})
msg.delete()
def start(queue, profile=None, tag='salt/engine/sqs', owner_acct_id=None):
'''
Listen to events and write them to a log file
Listen to sqs and fire message on event bus
'''
if __opts__.get('__role') == 'master':
fire_master = salt.utils.event.get_master_event(
@ -124,26 +145,11 @@ def start(queue, profile=None, tag='salt/engine/sqs'):
__opts__['sock_dir'],
listen=False).fire_event
else:
fire_master = None
def fire(tag, msg):
if fire_master:
fire_master(msg, tag)
else:
__salt__['event.send'](tag, msg)
fire_master = __salt__['event.send']
sqs = _get_sqs_conn(profile)
q = sqs.get_queue(queue)
q = None
while True:
if not q:
log.warning('failure connecting to queue: {0}, '
'waiting 10 seconds.'.format(queue))
time.sleep(10)
q = sqs.get_queue(queue)
if not q:
continue
msgs = q.get_messages(wait_time_seconds=20)
for msg in msgs:
fire(tag, {'message': msg.get_body()})
msg.delete()
q = sqs.get_queue(queue, owner_acct_id=owner_acct_id)
_process_queue(q, queue, fire_master, tag, owner_acct_id)

View File

@ -0,0 +1 @@
# -*- coding: utf-8 -*-

View File

@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
'''
unit tests for the sqs_events engine
'''
# Import Python libs
from __future__ import absolute_import
# Import Salt Testing Libs
from salttesting import skipIf, TestCase
from salttesting.mock import (
NO_MOCK,
NO_MOCK_REASON,
MagicMock,
patch)
from salttesting.helpers import ensure_in_syspath
ensure_in_syspath('../../')
# Import Salt Libs
from salt.engines import sqs_events
sqs_events.__salt__ = {}
sqs_events.__opts__ = {}
@skipIf(NO_MOCK, NO_MOCK_REASON)
@patch('salt.engines.sqs_events.boto.sqs')
class EngineSqsEventTestCase(TestCase):
'''
Test cases for salt.engine.sqs_events
'''
def sample_msg(self):
fake_msg = MagicMock()
fake_msg.get_body.return_value = "This is a test message"
fake_msg.delete.return_value = True
return fake_msg
# 'present' function tests: 1
@patch('salt.engines.sqs_events.log')
@patch('time.sleep', return_value=None)
def test_no_queue_present(self, mock_sleep, mock_logging, mock_sqs):
'''
Test to ensure the SQS engine logs a warning when queue not present
'''
q = None
q_name = 'mysqs'
mock_fire = MagicMock(return_value=True)
sqs_events._process_queue(q, q_name, mock_fire)
self.assertTrue(mock_logging.warning.called)
self.assertFalse(mock_sqs.queue.Queue().get_messages.called)
def test_minion_message_fires(self, mock_sqs):
'''
Test SQS engine correctly gets and fires messages on minion
'''
msgs = [self.sample_msg(), self.sample_msg()]
mock_sqs.queue.Queue().get_messages.return_value = msgs
q = mock_sqs.queue.Queue()
q_name = 'mysqs'
mock_event = MagicMock(return_value=True)
mock_fire = MagicMock(return_value=True)
with patch.dict(sqs_events.__salt__, {'event.send': mock_event}):
sqs_events._process_queue(q, q_name, mock_fire)
self.assertTrue(mock_sqs.queue.Queue().get_messages.called)
self.assertTrue(all(x.delete.called for x in msgs))
def test_master_message_fires(self, mock_sqs):
'''
Test SQS engine correctly gets and fires messages on master
'''
msgs = [self.sample_msg(), self.sample_msg()]
mock_sqs.queue.Queue().get_messages.return_value = msgs
q = mock_sqs.queue.Queue()
q_name = 'mysqs'
mock_fire = MagicMock(return_value=True)
sqs_events._process_queue(q, q_name, mock_fire)
self.assertTrue(mock_sqs.queue.Queue().get_messages.called, len(msgs))
self.assertTrue(mock_fire.called, len(msgs))
if __name__ == '__main__':
from integration import run_tests
run_tests(EngineSqsEventTestCase, needs_daemon=False)