mirror of
https://github.com/valitydev/salt.git
synced 2024-11-08 01:18:58 +00:00
626 lines
22 KiB
Python
626 lines
22 KiB
Python
# -*- coding: utf-8 -*-
|
|
from __future__ import absolute_import, unicode_literals
|
|
import textwrap
|
|
import subprocess
|
|
import socket
|
|
import inspect
|
|
import io
|
|
|
|
# Service manager imports
|
|
import sys
|
|
import os
|
|
import logging
|
|
import threading
|
|
import traceback
|
|
import time
|
|
|
|
import yaml
|
|
from tests.support.case import ModuleCase
|
|
from tests.support.mock import Mock
|
|
from tests.support.paths import CODE_DIR
|
|
from tests.support.unit import skipIf
|
|
|
|
from tests.support.helpers import (
|
|
with_system_user,
|
|
)
|
|
import salt.utils.files
|
|
import salt.utils.win_runas
|
|
import salt.ext.six
|
|
|
|
try:
|
|
import win32service
|
|
import win32serviceutil
|
|
import win32event
|
|
import servicemanager
|
|
import win32api
|
|
CODE_DIR = win32api.GetLongPathName(CODE_DIR)
|
|
HAS_WIN32 = True
|
|
except ImportError:
|
|
# Mock win32serviceutil object to avoid
|
|
# a stacktrace in the _ServiceManager class
|
|
win32serviceutil = Mock()
|
|
HAS_WIN32 = False
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
PASSWORD = 'P@ssW0rd'
|
|
NOPRIV_STDERR = 'ERROR: Logged-on user does not have administrative privilege.\n'
|
|
PRIV_STDOUT = (
|
|
'\nINFO: The system global flag \'maintain objects list\' needs\n '
|
|
'to be enabled to see local opened files.\n See Openfiles '
|
|
'/? for more information.\n\n\nFiles opened remotely via local share '
|
|
'points:\n---------------------------------------------\n\n'
|
|
'INFO: No shared open files found.\n'
|
|
)
|
|
RUNAS_PATH = os.path.abspath(os.path.join(CODE_DIR, 'runas.py'))
|
|
RUNAS_OUT = os.path.abspath(os.path.join(CODE_DIR, 'runas.out'))
|
|
|
|
|
|
def default_target(service, *args, **kwargs):
|
|
while service.active:
|
|
time.sleep(service.timeout)
|
|
|
|
|
|
class _ServiceManager(win32serviceutil.ServiceFramework):
|
|
'''
|
|
A windows service manager
|
|
'''
|
|
|
|
_svc_name_ = "Service Manager"
|
|
_svc_display_name_ = "Service Manager"
|
|
_svc_description_ = "A Service Manager"
|
|
run_in_foreground = False
|
|
target = default_target
|
|
|
|
def __init__(self, args, target=None, timeout=60, active=True):
|
|
win32serviceutil.ServiceFramework.__init__(self, args)
|
|
self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
|
|
self.timeout = timeout
|
|
self.active = active
|
|
if target is not None:
|
|
self.target = target
|
|
|
|
@classmethod
|
|
def log_error(cls, msg):
|
|
if cls.run_in_foreground:
|
|
logger.error(msg)
|
|
servicemanager.LogErrorMsg(msg)
|
|
|
|
@classmethod
|
|
def log_info(cls, msg):
|
|
if cls.run_in_foreground:
|
|
logger.info(msg)
|
|
servicemanager.LogInfoMsg(msg)
|
|
|
|
@classmethod
|
|
def log_exception(cls, msg):
|
|
if cls.run_in_foreground:
|
|
logger.exception(msg)
|
|
exc_info = sys.exc_info()
|
|
tb = traceback.format_tb(exc_info[2])
|
|
servicemanager.LogErrorMsg("{} {} {}".format(msg, exc_info[1], tb))
|
|
|
|
@property
|
|
def timeout_ms(self):
|
|
return self.timeout * 1000
|
|
|
|
def SvcStop(self):
|
|
"""
|
|
Stop the service by; terminating any subprocess call, notify
|
|
windows internals of the stop event, set the instance's active
|
|
attribute to 'False' so the run loops stop.
|
|
"""
|
|
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
|
|
win32event.SetEvent(self.hWaitStop)
|
|
self.active = False
|
|
|
|
def SvcDoRun(self):
|
|
"""
|
|
Run the monitor in a separete thread so the main thread is
|
|
free to react to events sent to the windows service.
|
|
"""
|
|
servicemanager.LogMsg(
|
|
servicemanager.EVENTLOG_INFORMATION_TYPE,
|
|
servicemanager.PYS_SERVICE_STARTED,
|
|
(self._svc_name_, ''),
|
|
)
|
|
self.log_info("Starting Service {}".format(self._svc_name_))
|
|
monitor_thread = threading.Thread(target=self.target_thread)
|
|
monitor_thread.start()
|
|
while self.active:
|
|
rc = win32event.WaitForSingleObject(self.hWaitStop, self.timeout_ms)
|
|
if rc == win32event.WAIT_OBJECT_0:
|
|
# Stop signal encountered
|
|
self.log_info("Stopping Service")
|
|
break
|
|
if not monitor_thread.is_alive():
|
|
self.log_info("Update Thread Died, Stopping Service")
|
|
break
|
|
|
|
def target_thread(self, *args, **kwargs):
|
|
"""
|
|
Target Thread, handles any exception in the target method and
|
|
logs them.
|
|
"""
|
|
self.log_info("Monitor")
|
|
try:
|
|
self.target(self, *args, **kwargs)
|
|
except Exception as exc:
|
|
# TODO: Add traceback info to windows event log objects
|
|
self.log_exception("Exception In Target")
|
|
|
|
@classmethod
|
|
def install(cls, username=None, password=None, start_type=None):
|
|
if hasattr(cls, '_svc_reg_class_'):
|
|
svc_class = cls._svc_reg_class_
|
|
else:
|
|
svc_class = win32serviceutil.GetServiceClassString(cls)
|
|
win32serviceutil.InstallService(
|
|
svc_class,
|
|
cls._svc_name_,
|
|
cls._svc_display_name_,
|
|
description=cls._svc_description_,
|
|
userName=username,
|
|
password=password,
|
|
startType=start_type,
|
|
)
|
|
|
|
@classmethod
|
|
def remove(cls):
|
|
win32serviceutil.RemoveService(
|
|
cls._svc_name_
|
|
)
|
|
|
|
@classmethod
|
|
def start(cls):
|
|
win32serviceutil.StartService(
|
|
cls._svc_name_
|
|
)
|
|
|
|
@classmethod
|
|
def restart(cls):
|
|
win32serviceutil.RestartService(
|
|
cls._svc_name_
|
|
)
|
|
|
|
@classmethod
|
|
def stop(cls):
|
|
win32serviceutil.StopService(
|
|
cls._svc_name_
|
|
)
|
|
|
|
|
|
def service_class_factory(cls_name, name, target=default_target, display_name='', description='', run_in_foreground=False):
|
|
frm = inspect.stack()[1]
|
|
mod = inspect.getmodule(frm[0])
|
|
if salt.ext.six.PY2:
|
|
cls_name = cls_name.encode()
|
|
return type(
|
|
cls_name,
|
|
(_ServiceManager, object),
|
|
{
|
|
'__module__': mod.__name__,
|
|
'_svc_name_': name,
|
|
'_svc_display_name_': display_name or name,
|
|
'_svc_description_': description,
|
|
'run_in_foreground': run_in_foreground,
|
|
'target': target,
|
|
},
|
|
)
|
|
|
|
|
|
if HAS_WIN32:
|
|
test_service = service_class_factory('test_service', 'test service')
|
|
|
|
|
|
SERVICE_SOURCE = '''
|
|
from __future__ import absolute_import, unicode_literals
|
|
import logging
|
|
logger = logging.getLogger()
|
|
logging.basicConfig(level=logging.DEBUG, format="%(message)s")
|
|
|
|
from tests.integration.utils.test_win_runas import service_class_factory
|
|
import salt.utils.win_runas
|
|
import sys
|
|
import yaml
|
|
|
|
OUTPUT = {}
|
|
USERNAME = '{}'
|
|
PASSWORD = '{}'
|
|
|
|
|
|
def target(service, *args, **kwargs):
|
|
service.log_info("target start")
|
|
if PASSWORD:
|
|
ret = salt.utils.win_runas.runas(
|
|
'cmd.exe /C OPENFILES',
|
|
username=USERNAME,
|
|
password=PASSWORD,
|
|
)
|
|
else:
|
|
ret = salt.utils.win_runas.runas(
|
|
'cmd.exe /C OPENFILES',
|
|
username=USERNAME,
|
|
)
|
|
|
|
service.log_info("win_runas returned %s" % ret)
|
|
with open(OUTPUT, 'w') as fp:
|
|
yaml.dump(ret, fp)
|
|
service.log_info("target stop")
|
|
|
|
|
|
# This class will get imported and run as the service
|
|
test_service = service_class_factory('test_service', 'test service', target=target)
|
|
|
|
if __name__ == '__main__':
|
|
try:
|
|
test_service.stop()
|
|
except Exception as exc:
|
|
logger.debug("stop service failed, this is ok.")
|
|
try:
|
|
test_service.remove()
|
|
except Exception as exc:
|
|
logger.debug("remove service failed, this os ok.")
|
|
test_service.install()
|
|
sys.exit(0)
|
|
'''
|
|
|
|
|
|
def wait_for_service(name, timeout=200):
|
|
start = time.time()
|
|
while True:
|
|
status = win32serviceutil.QueryServiceStatus(name)
|
|
if status[1] == win32service.SERVICE_STOPPED:
|
|
break
|
|
if time.time() - start > timeout:
|
|
raise TimeoutError("Timeout waiting for service") # pylint: disable=undefined-variable
|
|
|
|
time.sleep(.3)
|
|
|
|
|
|
@skipIf(not HAS_WIN32, 'This test runs only on windows.')
|
|
class RunAsTest(ModuleCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(RunAsTest, cls).setUpClass()
|
|
cls.hostname = socket.gethostname()
|
|
|
|
@with_system_user('test-runas', on_existing='delete', delete=True,
|
|
password=PASSWORD)
|
|
def test_runas(self, username):
|
|
ret = salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username, PASSWORD)
|
|
self.assertEqual(ret['stdout'], '')
|
|
self.assertEqual(ret['stderr'], NOPRIV_STDERR)
|
|
self.assertEqual(ret['retcode'], 1)
|
|
|
|
@with_system_user('test-runas', on_existing='delete', delete=True,
|
|
password=PASSWORD)
|
|
def test_runas_no_pass(self, username):
|
|
ret = salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username)
|
|
self.assertEqual(ret['stdout'], '')
|
|
self.assertEqual(ret['stderr'], NOPRIV_STDERR)
|
|
self.assertEqual(ret['retcode'], 1)
|
|
|
|
@with_system_user('test-runas-admin', on_existing='delete', delete=True,
|
|
password=PASSWORD, groups=['Administrators'])
|
|
def test_runas_admin(self, username):
|
|
ret = salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username, PASSWORD)
|
|
self.assertEqual(ret['stdout'], PRIV_STDOUT)
|
|
self.assertEqual(ret['stderr'], '')
|
|
self.assertEqual(ret['retcode'], 0)
|
|
|
|
@with_system_user('test-runas-admin', on_existing='delete', delete=True,
|
|
password=PASSWORD, groups=['Administrators'])
|
|
def test_runas_admin_no_pass(self, username):
|
|
ret = salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username)
|
|
self.assertEqual(ret['stdout'], PRIV_STDOUT)
|
|
self.assertEqual(ret['stderr'], '')
|
|
self.assertEqual(ret['retcode'], 0)
|
|
|
|
def test_runas_system_user(self):
|
|
ret = salt.utils.win_runas.runas('cmd.exe /C OPENFILES', 'SYSTEM')
|
|
self.assertEqual(ret['stdout'], PRIV_STDOUT)
|
|
self.assertEqual(ret['stderr'], '')
|
|
self.assertEqual(ret['retcode'], 0)
|
|
|
|
def test_runas_network_service(self):
|
|
ret = salt.utils.win_runas.runas('cmd.exe /C OPENFILES', 'NETWORK SERVICE')
|
|
self.assertEqual(ret['stdout'], '')
|
|
self.assertEqual(ret['stderr'], NOPRIV_STDERR)
|
|
self.assertEqual(ret['retcode'], 1)
|
|
|
|
def test_runas_local_service(self):
|
|
ret = salt.utils.win_runas.runas('cmd.exe /C OPENFILES', 'LOCAL SERVICE')
|
|
self.assertEqual(ret['stdout'], '')
|
|
self.assertEqual(ret['stderr'], NOPRIV_STDERR)
|
|
self.assertEqual(ret['retcode'], 1)
|
|
|
|
@with_system_user('test-runas', on_existing='delete', delete=True,
|
|
password=PASSWORD)
|
|
def test_runas_winrs(self, username):
|
|
runaspy = textwrap.dedent('''
|
|
import sys
|
|
import salt.utils.win_runas
|
|
username = '{}'
|
|
password = '{}'
|
|
sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username, password)['retcode'])
|
|
'''.format(username, PASSWORD))
|
|
with salt.utils.files.fopen(RUNAS_PATH, 'w') as fp:
|
|
fp.write(runaspy)
|
|
ret = subprocess.call("cmd.exe /C winrs /r:{} python {}".format(
|
|
self.hostname, RUNAS_PATH), shell=True)
|
|
self.assertEqual(ret, 1)
|
|
|
|
@with_system_user('test-runas', on_existing='delete', delete=True,
|
|
password=PASSWORD)
|
|
def test_runas_winrs_no_pass(self, username):
|
|
runaspy = textwrap.dedent('''
|
|
import sys
|
|
import salt.utils.win_runas
|
|
username = '{}'
|
|
sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username)['retcode'])
|
|
'''.format(username))
|
|
with salt.utils.files.fopen(RUNAS_PATH, 'w') as fp:
|
|
fp.write(runaspy)
|
|
ret = subprocess.call("cmd.exe /C winrs /r:{} python {}".format(
|
|
self.hostname, RUNAS_PATH), shell=True)
|
|
self.assertEqual(ret, 1)
|
|
|
|
@with_system_user('test-runas-admin', on_existing='delete', delete=True,
|
|
password=PASSWORD, groups=['Administrators'])
|
|
def test_runas_winrs_admin(self, username):
|
|
runaspy = textwrap.dedent('''
|
|
import sys
|
|
import salt.utils.win_runas
|
|
username = '{}'
|
|
password = '{}'
|
|
sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username, password)['retcode'])
|
|
'''.format(username, PASSWORD))
|
|
with salt.utils.files.fopen(RUNAS_PATH, 'w') as fp:
|
|
fp.write(runaspy)
|
|
ret = subprocess.call("cmd.exe /C winrs /r:{} python {}".format(
|
|
self.hostname, RUNAS_PATH), shell=True)
|
|
self.assertEqual(ret, 0)
|
|
|
|
@with_system_user('test-runas-admin', on_existing='delete', delete=True,
|
|
password=PASSWORD, groups=['Administrators'])
|
|
def test_runas_winrs_admin_no_pass(self, username):
|
|
runaspy = textwrap.dedent('''
|
|
import sys
|
|
import salt.utils.win_runas
|
|
username = '{}'
|
|
sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username)['retcode'])
|
|
'''.format(username))
|
|
with salt.utils.files.fopen(RUNAS_PATH, 'w') as fp:
|
|
fp.write(runaspy)
|
|
ret = subprocess.call("cmd.exe /C winrs /r:{} python {}".format(
|
|
self.hostname, RUNAS_PATH), shell=True)
|
|
self.assertEqual(ret, 0)
|
|
|
|
def test_runas_winrs_system_user(self):
|
|
runaspy = textwrap.dedent('''
|
|
import sys
|
|
import salt.utils.win_runas
|
|
sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', 'SYSTEM')['retcode'])
|
|
''')
|
|
with salt.utils.files.fopen(RUNAS_PATH, 'w') as fp:
|
|
fp.write(runaspy)
|
|
ret = subprocess.call("cmd.exe /C winrs /r:{} python {}".format(
|
|
self.hostname, RUNAS_PATH), shell=True)
|
|
self.assertEqual(ret, 0)
|
|
|
|
def test_runas_winrs_network_service_user(self):
|
|
runaspy = textwrap.dedent('''
|
|
import sys
|
|
import salt.utils.win_runas
|
|
sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', 'NETWORK SERVICE')['retcode'])
|
|
''')
|
|
with salt.utils.files.fopen(RUNAS_PATH, 'w') as fp:
|
|
fp.write(runaspy)
|
|
ret = subprocess.call("cmd.exe /C winrs /r:{} python {}".format(
|
|
self.hostname, RUNAS_PATH), shell=True)
|
|
self.assertEqual(ret, 1)
|
|
|
|
def test_runas_winrs_local_service_user(self):
|
|
runaspy = textwrap.dedent('''
|
|
import sys
|
|
import salt.utils.win_runas
|
|
sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', 'LOCAL SERVICE')['retcode'])
|
|
''')
|
|
with salt.utils.files.fopen(RUNAS_PATH, 'w') as fp:
|
|
fp.write(runaspy)
|
|
ret = subprocess.call("cmd.exe /C winrs /r:{} python {}".format(
|
|
self.hostname, RUNAS_PATH), shell=True)
|
|
self.assertEqual(ret, 1)
|
|
|
|
@with_system_user('test-runas', on_existing='delete', delete=True,
|
|
password=PASSWORD)
|
|
def test_runas_powershell_remoting(self, username):
|
|
psrp_wrap = (
|
|
'powershell Invoke-Command -ComputerName {} -ScriptBlock {{ {} }}'
|
|
)
|
|
runaspy = textwrap.dedent('''
|
|
import sys
|
|
import salt.utils.win_runas
|
|
username = '{}'
|
|
password = '{}'
|
|
sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username, password)['retcode'])
|
|
'''.format(username, PASSWORD))
|
|
with salt.utils.files.fopen(RUNAS_PATH, 'w') as fp:
|
|
fp.write(runaspy)
|
|
cmd = 'python.exe {}'.format(RUNAS_PATH)
|
|
ret = subprocess.call(
|
|
psrp_wrap.format(self.hostname, cmd),
|
|
shell=True
|
|
)
|
|
self.assertEqual(ret, 1)
|
|
|
|
@with_system_user('test-runas', on_existing='delete', delete=True,
|
|
password=PASSWORD)
|
|
def test_runas_powershell_remoting_no_pass(self, username):
|
|
psrp_wrap = (
|
|
'powershell Invoke-Command -ComputerName {} -ScriptBlock {{ {} }}'
|
|
)
|
|
runaspy = textwrap.dedent('''
|
|
import sys
|
|
import salt.utils.win_runas
|
|
username = '{}'
|
|
sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username)['retcode'])
|
|
'''.format(username))
|
|
with salt.utils.files.fopen(RUNAS_PATH, 'w') as fp:
|
|
fp.write(runaspy)
|
|
cmd = 'python.exe {}'.format(RUNAS_PATH)
|
|
ret = subprocess.call(
|
|
psrp_wrap.format(self.hostname, cmd),
|
|
shell=True
|
|
)
|
|
self.assertEqual(ret, 1)
|
|
|
|
@with_system_user('test-runas-admin', on_existing='delete', delete=True,
|
|
password=PASSWORD, groups=['Administrators'])
|
|
def test_runas_powershell_remoting_admin(self, username):
|
|
psrp_wrap = (
|
|
'powershell Invoke-Command -ComputerName {} -ScriptBlock {{ {} }}; exit $LASTEXITCODE'
|
|
)
|
|
runaspy = textwrap.dedent('''
|
|
import sys
|
|
import salt.utils.win_runas
|
|
username = '{}'
|
|
password = '{}'
|
|
ret = salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username, password)
|
|
sys.exit(ret['retcode'])
|
|
'''.format(username, PASSWORD))
|
|
with salt.utils.files.fopen(RUNAS_PATH, 'w') as fp:
|
|
fp.write(runaspy)
|
|
cmd = 'python.exe {}; exit $LASTEXITCODE'.format(RUNAS_PATH)
|
|
ret = subprocess.call(
|
|
psrp_wrap.format(self.hostname, cmd),
|
|
shell=True
|
|
)
|
|
self.assertEqual(ret, 0)
|
|
|
|
@with_system_user('test-runas-admin', on_existing='delete', delete=True,
|
|
password=PASSWORD, groups=['Administrators'])
|
|
def test_runas_powershell_remoting_admin_no_pass(self, username):
|
|
psrp_wrap = (
|
|
'powershell Invoke-Command -ComputerName {} -ScriptBlock {{ {} }}; exit $LASTEXITCODE'
|
|
)
|
|
runaspy = textwrap.dedent('''
|
|
import sys
|
|
import salt.utils.win_runas
|
|
username = '{}'
|
|
sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username)['retcode'])
|
|
'''.format(username))
|
|
with salt.utils.files.fopen(RUNAS_PATH, 'w') as fp:
|
|
fp.write(runaspy)
|
|
cmd = 'python.exe {}; exit $LASTEXITCODE'.format(RUNAS_PATH)
|
|
ret = subprocess.call(
|
|
psrp_wrap.format(self.hostname, cmd),
|
|
shell=True
|
|
)
|
|
self.assertEqual(ret, 0)
|
|
|
|
@with_system_user('test-runas', on_existing='delete', delete=True,
|
|
password=PASSWORD)
|
|
def test_runas_service(self, username, timeout=200):
|
|
if os.path.exists(RUNAS_OUT):
|
|
os.remove(RUNAS_OUT)
|
|
assert not os.path.exists(RUNAS_OUT)
|
|
runaspy = SERVICE_SOURCE.format(repr(RUNAS_OUT), username, PASSWORD)
|
|
with io.open(RUNAS_PATH, 'w', encoding='utf-8') as fp:
|
|
fp.write(runaspy)
|
|
cmd = 'python.exe {}'.format(RUNAS_PATH)
|
|
ret = subprocess.call(
|
|
cmd,
|
|
shell=True
|
|
)
|
|
self.assertEqual(ret, 0)
|
|
win32serviceutil.StartService('test service')
|
|
wait_for_service('test service')
|
|
with salt.utils.files.fopen(RUNAS_OUT, 'r') as fp:
|
|
ret = yaml.load(fp)
|
|
assert ret['retcode'] == 1, ret
|
|
|
|
@with_system_user('test-runas', on_existing='delete', delete=True,
|
|
password=PASSWORD)
|
|
def test_runas_service_no_pass(self, username, timeout=200):
|
|
if os.path.exists(RUNAS_OUT):
|
|
os.remove(RUNAS_OUT)
|
|
assert not os.path.exists(RUNAS_OUT)
|
|
runaspy = SERVICE_SOURCE.format(repr(RUNAS_OUT), username, '')
|
|
with io.open(RUNAS_PATH, 'w', encoding='utf-8') as fp:
|
|
fp.write(runaspy)
|
|
cmd = 'python.exe {}'.format(RUNAS_PATH)
|
|
ret = subprocess.call(
|
|
cmd,
|
|
shell=True
|
|
)
|
|
self.assertEqual(ret, 0)
|
|
win32serviceutil.StartService('test service')
|
|
wait_for_service('test service')
|
|
with salt.utils.files.fopen(RUNAS_OUT, 'r') as fp:
|
|
ret = yaml.load(fp)
|
|
assert ret['retcode'] == 1, ret
|
|
|
|
@with_system_user('test-runas-admin', on_existing='delete', delete=True,
|
|
password=PASSWORD, groups=['Administrators'])
|
|
def test_runas_service_admin(self, username, timeout=200):
|
|
if os.path.exists(RUNAS_OUT):
|
|
os.remove(RUNAS_OUT)
|
|
assert not os.path.exists(RUNAS_OUT)
|
|
runaspy = SERVICE_SOURCE.format(repr(RUNAS_OUT), username, PASSWORD)
|
|
with io.open(RUNAS_PATH, 'w', encoding='utf-8') as fp:
|
|
fp.write(runaspy)
|
|
cmd = 'python.exe {}'.format(RUNAS_PATH)
|
|
ret = subprocess.call(
|
|
cmd,
|
|
shell=True
|
|
)
|
|
self.assertEqual(ret, 0)
|
|
win32serviceutil.StartService('test service')
|
|
wait_for_service('test service')
|
|
with salt.utils.files.fopen(RUNAS_OUT, 'r') as fp:
|
|
ret = yaml.load(fp)
|
|
assert ret['retcode'] == 0, ret
|
|
|
|
@with_system_user('test-runas-admin', on_existing='delete', delete=True,
|
|
password=PASSWORD, groups=['Administrators'])
|
|
def test_runas_service_admin_no_pass(self, username, timeout=200):
|
|
if os.path.exists(RUNAS_OUT):
|
|
os.remove(RUNAS_OUT)
|
|
assert not os.path.exists(RUNAS_OUT)
|
|
runaspy = SERVICE_SOURCE.format(repr(RUNAS_OUT), username, '')
|
|
with io.open(RUNAS_PATH, 'w', encoding='utf-8') as fp:
|
|
fp.write(runaspy)
|
|
cmd = 'python.exe {}'.format(RUNAS_PATH)
|
|
ret = subprocess.call(
|
|
cmd,
|
|
shell=True
|
|
)
|
|
self.assertEqual(ret, 0)
|
|
win32serviceutil.StartService('test service')
|
|
wait_for_service('test service')
|
|
with salt.utils.files.fopen(RUNAS_OUT, 'r') as fp:
|
|
ret = yaml.load(fp)
|
|
assert ret['retcode'] == 0, ret
|
|
|
|
def test_runas_service_system_user(self):
|
|
if os.path.exists(RUNAS_OUT):
|
|
os.remove(RUNAS_OUT)
|
|
assert not os.path.exists(RUNAS_OUT)
|
|
runaspy = SERVICE_SOURCE.format(repr(RUNAS_OUT), 'SYSTEM', '')
|
|
with io.open(RUNAS_PATH, 'w', encoding='utf-8') as fp:
|
|
fp.write(runaspy)
|
|
cmd = 'python.exe {}'.format(RUNAS_PATH)
|
|
ret = subprocess.call(
|
|
cmd,
|
|
shell=True
|
|
)
|
|
self.assertEqual(ret, 0)
|
|
win32serviceutil.StartService('test service')
|
|
wait_for_service('test service')
|
|
with salt.utils.files.fopen(RUNAS_OUT, 'r') as fp:
|
|
ret = yaml.load(fp)
|
|
assert ret['retcode'] == 0, ret
|