2013-06-24 22:53:59 +00:00
|
|
|
# Import python libs
|
|
|
|
import os
|
2012-12-31 00:34:38 +00:00
|
|
|
import new
|
|
|
|
import sys
|
|
|
|
|
2013-06-24 22:53:59 +00:00
|
|
|
# Import salt libs
|
|
|
|
try:
|
|
|
|
import integration
|
|
|
|
except ImportError:
|
|
|
|
if __name__ == '__main__':
|
|
|
|
import sys
|
|
|
|
sys.path.insert(
|
|
|
|
0, os.path.abspath(
|
|
|
|
os.path.join(
|
|
|
|
os.path.dirname(__file__), '../../'
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
import integration
|
|
|
|
|
2012-12-31 00:34:38 +00:00
|
|
|
|
2013-06-24 22:53:59 +00:00
|
|
|
# Import Salt Testing libs
|
|
|
|
from salttesting import TestCase, skipIf
|
2012-12-31 00:34:38 +00:00
|
|
|
|
|
|
|
|
|
|
|
# wmi and pythoncom modules are platform specific, so empty stand-in
# modules are registered in sys.modules; any later ``import wmi`` or
# ``import pythoncom`` then resolves to these stubs.
# ``types.ModuleType`` is used instead of the deprecated ``new.module``.
import types

wmi = types.ModuleType('wmi')
sys.modules['wmi'] = wmi

pythoncom = types.ModuleType('pythoncom')
sys.modules['pythoncom'] = pythoncom

# has_mock:      a usable mock library was imported
# wrong_version: mock is installed but lacks ``call``/``ANY``
#                (the skip message below asks for mock >= 0.8.0)
# Both start out False so that a completely missing mock library is
# reported as "unavailable" rather than as an outdated version.
has_mock = False
wrong_version = False

try:
    from mock import Mock, patch
except ImportError:
    # mock is not installed at all; both flags keep their defaults and
    # the test classes are skipped.
    pass
else:
    try:
        from mock import call, ANY
    except ImportError:
        # mock is present but too old to be useful for these tests
        wrong_version = True
    else:
        has_mock = True
        # ``WMI`` is the connection object returned by ``wmi.WMI()``;
        # the tests attach their fake process list to it.
        WMI = Mock()
        wmi.WMI = Mock(return_value=WMI)
        pythoncom.CoInitialize = Mock()
        pythoncom.CoUninitialize = Mock()
|
|
|
|
|
2013-06-24 22:53:59 +00:00
|
|
|
# This is imported late so mock can do its job
|
2012-12-31 00:34:38 +00:00
|
|
|
import salt.modules.win_status as status
|
|
|
|
|
2013-01-04 17:43:37 +00:00
|
|
|
|
|
|
|
@skipIf(
    has_mock is False,
    'you need to upgrade your mock version to >= 0.8.0'
    if wrong_version else
    'mock python module is unavailable'
)
class TestProcsBase(TestCase):
    '''
    Common fixture for the ``status.procs()`` tests.

    Subclasses register fake WMI process objects with ``add_process`` and
    then invoke ``call_procs``, which stores the return value of
    ``status.procs()`` in ``self.result``.
    '''

    def __init__(self, *args, **kwargs):
        super(TestProcsBase, self).__init__(*args, **kwargs)
        # Fake process objects handed to the mocked WMI connection
        self.__processes = []

    def add_process(
            self,
            pid=100,
            cmd='cmd',
            name='name',
            user='user',
            user_domain='domain',
            get_owner_result=0):
        '''
        Register one fake process for the next ``call_procs`` invocation.

        pid
            Value exposed as the mock's ``ProcessId``
        cmd
            Value exposed as the mock's ``CommandLine``
        name
            Value exposed as the mock's ``Name``
        user, user_domain, get_owner_result
            Returned by the mock's ``GetOwner()`` as the tuple
            ``(user_domain, get_owner_result, user)``. The test classes in
            this file use 0 for the ordinary case and non-zero values for
            the access-denied and error-logging cases.
        '''
        process = Mock()
        process.GetOwner = Mock(
            return_value=(user_domain, get_owner_result, user)
        )
        process.ProcessId = pid
        process.CommandLine = cmd
        process.Name = name
        self.__processes.append(process)

    def call_procs(self):
        '''
        Make the mocked WMI connection return the registered processes and
        store the result of ``status.procs()`` in ``self.result``.
        '''
        WMI.win32_process = Mock(return_value=self.__processes)
        self.result = status.procs()
|
|
|
|
|
|
|
|
|
|
|
|
class TestProcsCount(TestProcsBase):
    '''
    With two fake processes registered, ``status.procs()`` must return two
    entries keyed by their pids.
    '''

    def setUp(self):
        # Two processes that differ only in their pid
        for pid in (100, 101):
            self.add_process(pid=pid)
        self.call_procs()

    def test_process_count(self):
        self.assertEqual(len(self.result), 2)

    def test_process_key_is_pid(self):
        expected_pids = set([100, 101])
        reported_pids = set(self.result.keys())
        self.assertSetEqual(reported_pids, expected_pids)
|
|
|
|
|
|
|
|
|
|
|
|
class TestProcsAttributes(TestProcsBase):
    '''
    Each attribute given to the fake process must show up under the
    matching key of the entry returned by ``status.procs()``.
    '''

    def setUp(self):
        self._expected_name = 'name'
        self._expected_cmd = 'cmd'
        self._expected_user = 'user'
        self._expected_domain = 'domain'
        pid = 100
        self.add_process(
            pid=pid,
            cmd=self._expected_cmd,
            # Passed explicitly so the name test does not depend on the
            # default value of add_process()
            name=self._expected_name,
            user=self._expected_user,
            user_domain=self._expected_domain,
            get_owner_result=0)
        self.call_procs()
        self.proc = self.result[pid]

    def test_process_cmd_is_set(self):
        self.assertEqual(self.proc['cmd'], self._expected_cmd)

    def test_process_name_is_set(self):
        self.assertEqual(self.proc['name'], self._expected_name)

    def test_process_user_is_set(self):
        self.assertEqual(self.proc['user'], self._expected_user)

    def test_process_user_domain_is_set(self):
        self.assertEqual(self.proc['user_domain'], self._expected_domain)
|
|
|
|
|
|
|
|
|
|
|
|
class TestProcsUnicodeAttributes(TestProcsBase):
    '''
    Unicode process attributes are expected back from ``status.procs()``
    as their UTF-8 encoded form.
    '''

    def setUp(self):
        accented = u'\xc1'
        self.utf8str = accented.encode('utf8')
        pid = 100
        # The same non-ASCII value is used for every textual attribute
        self.add_process(
            pid=pid,
            cmd=accented,
            name=accented,
            user=accented,
            user_domain=accented)
        self.call_procs()
        self.proc = self.result[pid]

    def test_process_cmd_is_utf8(self):
        self.assertEqual(self.proc['cmd'], self.utf8str)

    def test_process_name_is_utf8(self):
        self.assertEqual(self.proc['name'], self.utf8str)

    def test_process_user_is_utf8(self):
        self.assertEqual(self.proc['user'], self.utf8str)

    def test_process_user_domain_is_utf8(self):
        self.assertEqual(self.proc['user_domain'], self.utf8str)
|
|
|
|
|
|
|
|
|
|
|
|
class TestProcsWMIGetOwnerAccessDeniedWorkaround(TestProcsBase):
    '''
    For pids 0 and 4 with a ``GetOwner`` result code of 2, the reported
    owner is expected to be ``NT AUTHORITY\\SYSTEM``.
    '''

    def setUp(self):
        self.expected_user = 'SYSTEM'
        self.expected_domain = 'NT AUTHORITY'
        for pid in (0, 4):
            self.add_process(pid=pid, get_owner_result=2)
        self.call_procs()

    def test_user_is_set(self):
        for pid in (0, 4):
            self.assertEqual(self.result[pid]['user'], self.expected_user)

    def test_process_user_domain_is_set(self):
        for pid in (0, 4):
            self.assertEqual(
                self.result[pid]['user_domain'],
                self.expected_domain
            )
|
|
|
|
|
|
|
|
|
|
|
|
class TestProcsWMIGetOwnerErrorsAreLogged(TestProcsBase):
    '''
    A ``GetOwner`` result code of 8 must produce exactly one warning that
    mentions the code.
    '''

    def setUp(self):
        self.expected_error_code = 8
        self.add_process(get_owner_result=self.expected_error_code)

    def test_error_logged_if_process_get_owner_fails(self):
        # Replace the module logger so the warning call can be inspected
        with patch('salt.modules.win_status.log') as mocked_log:
            self.call_procs()
        warning = mocked_log.warning
        warning.assert_called_once_with(ANY)
        # First positional argument of the single warning call
        logged_message = warning.call_args[0][0]
        self.assertIn(str(self.expected_error_code), logged_message)
|
2012-12-31 00:34:38 +00:00
|
|
|
|
|
|
|
|
2012-12-31 10:16:50 +00:00
|
|
|
class TestEmptyCommandLine(TestProcsBase):
    '''
    A process whose ``CommandLine`` is ``None`` must be reported with an
    empty string as its ``cmd``.
    '''

    def setUp(self):
        pid = 100
        self.add_process(pid=pid, cmd=None)
        self.call_procs()
        self.proc = self.result[pid]

    def test_cmd_is_empty_string(self):
        self.assertEqual(self.proc['cmd'], '')
|
|
|
|
|
|
|
|
|
2013-02-17 12:05:11 +00:00
|
|
|
#class TestProcsComInitialization(TestProcsBase):
|
|
|
|
# def setUp(self):
|
|
|
|
# call_count = 5
|
|
|
|
# for _ in range(call_count):
|
|
|
|
# self.call_procs()
|
|
|
|
# self.expected_calls = [call()] * call_count
|
|
|
|
#
|
2013-05-01 23:06:17 +00:00
|
|
|
# def test_initialize_and_uninitialize_called(self):
|
2013-02-17 12:05:11 +00:00
|
|
|
# pythoncom.CoInitialize.assert_has_calls(self.expected_calls)
|
|
|
|
# pythoncom.CoUninitialize.assert_has_calls(self.expected_calls)
|
2012-12-31 10:29:08 +00:00
|
|
|
|
|
|
|
|
2013-06-24 22:53:59 +00:00
|
|
|
if __name__ == '__main__':
    # Allow running this file directly; no salt daemons are started.
    from integration import run_tests
    run_tests(
        [
            TestProcsCount,
            TestProcsAttributes,
            TestProcsUnicodeAttributes,
            TestProcsWMIGetOwnerErrorsAreLogged,
            TestProcsWMIGetOwnerAccessDeniedWorkaround,
            # Previously missing, so it never ran in direct execution
            TestEmptyCommandLine,
        ],
        needs_daemon=False
    )
|