windows: Adding osqueryi and osqueryd integration tests (#3479)

This commit is contained in:
Nick Anderson 2017-07-21 11:20:56 -07:00 committed by GitHub
parent 891a6fb17a
commit dfbcd50737
8 changed files with 333 additions and 119 deletions

View File

@ -67,7 +67,7 @@ macro(ADD_DEFAULT_LINKS TARGET ADDITIONAL)
endmacro()
macro(ADD_OSQUERY_PYTHON_TEST TEST_NAME SOURCE)
if(NOT DEFINED ENV{SKIP_INTEGRATION_TESTS} AND NOT WINDOWS)
if(NOT DEFINED ENV{SKIP_INTEGRATION_TESTS})
add_test(NAME python_${TEST_NAME}
COMMAND ${PYTHON_EXECUTABLE} "${CMAKE_SOURCE_DIR}/tools/tests/${SOURCE}"
--build "${CMAKE_BINARY_DIR}"

View File

@ -609,8 +609,12 @@ void Initializer::start() const {
if (!s.ok()) {
auto severity = (Watcher::get().hasManagedExtensions()) ? google::GLOG_ERROR
: google::GLOG_INFO;
google::LogMessage(__FILE__, __LINE__, severity).stream()
<< "Cannot start extension manager: " + s.getMessage();
if (severity == google::GLOG_INFO) {
VLOG(1) << "Cannot start extension manager: " + s.getMessage();
} else {
google::LogMessage(__FILE__, __LINE__, severity).stream()
<< "Cannot start extension manager: " + s.getMessage();
}
}
// Then set the config plugin, which uses a single/active plugin.

View File

@ -20,5 +20,5 @@ for %%t in (osquery_tests,osquery_additional_tests,osquery_tables_tests) do (
cmake --build . --target %%t --config Release -- /verbosity:minimal /maxcpucount
if errorlevel 1 goto end
)
ctest --output-on-failure
ctest -C Release --output-on-failure
:end

View File

@ -1,9 +1,11 @@
# List of python modules containing unittests
ADD_OSQUERY_PYTHON_TEST(test_osqueryi test_osqueryi.py)
ADD_OSQUERY_PYTHON_TEST(test_osqueryd test_osqueryd.py)
ADD_OSQUERY_PYTHON_TEST(test_extensions test_extensions.py)
ADD_OSQUERY_PYTHON_TEST(test_additional test_additional.py)
ADD_OSQUERY_PYTHON_TEST(test_example_queries test_example_queries.py)
if(NOT WINDOWS)
ADD_OSQUERY_PYTHON_TEST(test_extensions test_extensions.py)
ADD_OSQUERY_PYTHON_TEST(test_additional test_additional.py)
ADD_OSQUERY_PYTHON_TEST(test_example_queries test_example_queries.py)
endif()
# If the build hosts are building release packages
if(DEFINED ENV{RUN_RELEASE_TESTS})

View File

@ -14,6 +14,7 @@ from __future__ import print_function
# from __future__ import unicode_literals
import copy
import getpass
import os
import psutil
import random
@ -21,25 +22,43 @@ import re
import signal
import subprocess
import sys
import tempfile
import time
import threading
import unittest
import utils
import pexpect
import timeout_decorator
# TODO: Find an implementation that will work for Windows, for now, disable.
# https://goo.gl/T4AgV5
if os.name == "nt":
# We redefine timeout_decorator on windows
class timeout_decorator:
@staticmethod
def timeout(*args, **kwargs):
# return a no-op decorator
return lambda f: f
else:
import timeout_decorator
# We use a generic 'expect' style subprocess manager on Windows
if os.name == "nt":
from winexpect import REPLWrapper, WinExpectSpawn
else:
import pexpect
# While this path can be variable, in practice is lives statically.
OSQUERY_DEPENDENCIES = os.getenv('OSQUERY_DEPS', "/usr/local/osquery")
OSQUERY_DEPENDENCIES = os.getenv("OSQUERY_DEPS", "/usr/local/osquery")
sys.path = [OSQUERY_DEPENDENCIES + "/lib/python2.7/site-packages"] + sys.path
try:
from pexpect.replwrap import REPLWrapper
except ImportError as e:
print("Could not import pexpect.replwrap: %s" % (str(e)))
print(" Need pexpect version 3.3, installed version: %s" % (
str(pexpect.__version__)))
print(" pexpect location: %s" % (str(pexpect.__file__)))
exit(1)
if os.name != "nt":
try:
from pexpect.replwrap import REPLWrapper
except ImportError as e:
print("Could not import pexpect.replwrap: %s" % (str(e)))
print(" Need pexpect version 3.3, installed version: %s" %
(str(pexpect.__version__)))
print(" pexpect location: %s" % (str(pexpect.__file__)))
exit(1)
try:
import argparse
@ -57,24 +76,45 @@ except ImportError as e:
print(str(e))
exit(1)
def getUserId():
if os.name == "nt":
return getpass.getuser()
return "%d" % os.getuid()
'''Defaults that should be used in integration tests.'''
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_DIR = "/tmp/osquery-tests-python%d/" % (os.getuid())
CONFIG_NAME = CONFIG_DIR + "tests"
CONFIG_DIR = os.path.join(tempfile.gettempdir(),
"osquery-tests-python-%s" % (getUserId()))
CONFIG_NAME = os.path.join(CONFIG_DIR, "tests")
DEFAULT_CONFIG = {
"options": {
"flagfile": "/dev/null",
"database_path": "%s.db" % CONFIG_NAME,
"pidfile": "%s.pid" % CONFIG_NAME,
"config_path": "%s.conf" % CONFIG_NAME,
"extensions_autoload": "/dev/null",
"extensions_socket": "%s.em" % CONFIG_NAME,
"extensions_interval": "1",
"extensions_timeout": "0",
"watchdog_level": "3",
"disable_logging": "true",
"disable_events": "true",
"force": "true",
"flagfile":
"/dev/null" if os.name == "posix" else "",
"database_path":
"%s.db" % CONFIG_NAME,
"pidfile":
"%s.pid" % CONFIG_NAME,
"config_path":
"%s.conf" % CONFIG_NAME,
"extensions_autoload":
"/dev/null" if os.name == "posix" else "",
"extensions_socket":
"%s.em" % (CONFIG_NAME
if os.name == "posix" else "\\\\.\\pipe\\tests"),
"extensions_interval":
"1",
"extensions_timeout":
"0",
"watchdog_level":
"3",
"disable_logging":
"true",
"disable_events":
"true",
"force":
"true",
},
"schedule": {},
}
@ -99,7 +139,7 @@ class OsqueryWrapper(REPLWrapper):
'''A pexpect wrapper intended for interacting with the osqueryi REPL'''
PROMPT = u'osquery> '
CONTINUATION_PROMPT = u' ...> '
ERROR_PREFIX = 'Error:'
ERROR_PREFIX = u'Error:'
def __init__(self, command='../osqueryi', args={}, env={}):
global CONFIG_NAME, CONFIG
@ -107,9 +147,13 @@ class OsqueryWrapper(REPLWrapper):
for option in args.keys():
options[option] = args[option]
options["database_path"] += str(random.randint(1000, 9999))
command = command + " " + " ".join(["--%s=%s" % (k, v) for
k, v in options.iteritems()])
proc = pexpect.spawn(command, env=env)
command = command + " " + " ".join(
["--%s=%s" % (k, v) for k, v in options.iteritems()])
if os.name == "nt":
proc = WinExpectSpawn(command, env=env)
else:
proc = pexpect.spawn(command, env=env)
super(OsqueryWrapper, self).__init__(
proc,
self.PROMPT,
@ -241,15 +285,16 @@ class ProcRunner(object):
def kill(self, children=False):
self.requireStarted()
sig = signal.SIGINT if os.name == "nt" else signal.SIGKILL
if children:
for child in self.getChildren():
try:
os.kill(child, 9)
os.kill(child, sig)
except:
pass
if self.proc:
try:
os.kill(self.pid, 9)
os.kill(self.pid, sig)
except:
pass
self.proc = None
@ -288,6 +333,29 @@ class ProcRunner(object):
return False
def getLatestOsqueryBinary(binary):
if os.name == "posix":
return os.path.join(ARGS.build, "osquery", binary)
release_path = os.path.abspath(
os.path.join(ARGS.build, "osquery", "Release", "{}.exe".format(binary)))
relwithdebinfo_path = os.path.abspath(
os.path.join(ARGS.build, "osquery", "RelWithDebInfo", "{}.exe".format(binary)))
if os.path.exists(release_path) and os.path.exists(relwithdebinfo_path):
if os.stat(release_path).st_mtime > os.stat(
relwithdebinfo_path).st_mtime:
return release_path
else:
return relwithdebinfo_path
elif os.path.exists(release_path):
return release_path
elif os.path.exists(relwithdebinfo_path):
return relwithdebinfo_path
else:
return None
class ProcessGenerator(object):
'''Helper methods to patch into a unittest'''
generators = []
@ -295,14 +363,17 @@ class ProcessGenerator(object):
def setUp(self):
utils.reset_dir(CONFIG_DIR)
def _run_daemon(self, options={}, silent=False, options_only={},
def _run_daemon(self,
options={},
silent=False,
options_only={},
overwrite={}):
'''Spawn an osquery daemon process'''
global ARGS, CONFIG_NAME, CONFIG
config = copy.deepcopy(CONFIG)
config["options"]["database_path"] += str(random.randint(1000, 9999))
config["options"][
"extensions_socket"] += str(random.randint(1000, 9999))
config["options"]["extensions_socket"] += str(
random.randint(1000, 9999))
for option in options.keys():
config["options"][option] = options[option]
flags = ["--%s=%s" % (k, v) for k, v in config["options"].items()]
@ -311,7 +382,7 @@ class ProcessGenerator(object):
for key in overwrite:
config[key] = overwrite[key]
utils.write_config(config)
binary = os.path.join(ARGS.build, "osquery", "osqueryd")
binary = getLatestOsqueryBinary('osqueryd')
daemon = ProcRunner("daemon", binary, flags, silent=silent)
daemon.options = config["options"]
@ -322,21 +393,20 @@ class ProcessGenerator(object):
'''Spawn an osquery extension (example_extension)'''
global ARGS, CONFIG
config = copy.deepcopy(CONFIG)
config["options"][
"extensions_socket"] += str(random.randint(1000, 9999))
config["options"]["extensions_socket"] += str(
random.randint(1000, 9999))
binary = os.path.join(ARGS.build, "osquery", "example_extension.ext")
if path is not None:
config["options"]["extensions_socket"] = path
extension = ProcRunner("extension",
binary,
[
"--socket=%s" % config["options"][
"extensions_socket"],
"--verbose" if not silent else "",
"--timeout=%d" % timeout,
"--interval=%d" % 0,
],
silent=silent)
extension = ProcRunner(
"extension",
binary, [
"--socket=%s" % config["options"]["extensions_socket"],
"--verbose" if not silent else "",
"--timeout=%d" % timeout,
"--interval=%d" % 0,
],
silent=silent)
self.generators.append(extension)
extension.options = config["options"]
return extension
@ -348,10 +418,11 @@ class ProcessGenerator(object):
Unittest should stop processes they generate, but on failure the
tearDown method will cleanup.
'''
sig = signal.SIGINT if os.name == "nt" else signal.SIGKILL
for generator in self.generators:
if generator.pid is not None:
try:
os.kill(generator.pid, signal.SIGKILL)
os.kill(generator.pid, sig)
except Exception as e:
pass
@ -411,13 +482,13 @@ class EXClient(object):
def getEM(self):
'''Return an extension manager (osquery core) client.'''
if self._manager is None:
raise(Exception, "The EXClient must be 'setUp' with a manager")
raise (Exception, "The EXClient must be 'setUp' with a manager")
return self._manager.Client(self.protocol)
def getEX(self):
'''Return an extension (osquery extension) client.'''
if self._client is None:
raise(Exception, "The EXClient must be 'setUp' with a client")
raise (Exception, "The EXClient must be 'setUp' with a client")
return self._client.Client(self.protocol)
@ -426,7 +497,8 @@ class Autoloader(object):
def __init__(self, autoloads=[]):
global CONFIG_DIR
self.path = CONFIG_DIR + "ext.load" + str(random.randint(1000, 9999))
self.path = os.path.join(CONFIG_DIR,
"ext.load" + str(random.randint(1000, 9999)))
with open(self.path, "w") as fh:
fh.write("\n".join(autoloads))
@ -438,13 +510,11 @@ class Autoloader(object):
class TimeoutRunner(object):
def __init__(self, cmd=[], timeout_sec=1):
self.stdout = None
self.stderr = None
self.proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
kill_proc = lambda p: p.kill()
timer = threading.Timer(timeout_sec, kill_proc, [self.proc])
timer.start()
@ -462,46 +532,47 @@ def flaky(gen):
import traceback
exc_type, exc_obj, tb = sys.exc_info()
exceptions.append(e)
print (traceback.format_tb(tb)[1])
print(traceback.format_tb(tb)[1])
return False
def wrapper(this):
for i in range(3):
if attempt(this):
return True
i = 1
for exc in exceptions:
print("Test (attempt %d) %s::%s failed: %s" % (
i,
this.__class__.__name__,
gen.__name__, str(exc[0])))
print("Test (attempt %d) %s::%s failed: %s" %
(i, this.__class__.__name__, gen.__name__, str(exc[0])))
i += 1
if len(exceptions) > 0:
raise exceptions[0]
return False
return wrapper
class Tester(object):
def __init__(self):
global ARGS, CONFIG, CONFIG_DIR
parser = argparse.ArgumentParser(description=(
"osquery python integration testing."
))
parser = argparse.ArgumentParser(
description=("osquery python integration testing."))
parser.add_argument(
"--config", metavar="FILE", default=None,
help="Use special options from a config."
)
"--config",
metavar="FILE",
default=None,
help="Use special options from a config.")
parser.add_argument(
"--verbose", default=False, action="store_true",
help="Run daemons and extensions with --verbose"
)
"--verbose",
default=False,
action="store_true",
help="Run daemons and extensions with --verbose")
# Directory structure options
parser.add_argument(
"--build", metavar="PATH", default=".",
help="Path to osquery build (./build/<sys>/)."
)
"--build",
metavar="PATH",
default=".",
help="Path to osquery build (./build/<sys>/).")
ARGS = parser.parse_args()
if not os.path.exists(ARGS.build):
@ -517,7 +588,8 @@ class Tester(object):
@timeout_decorator.timeout(20 * 60)
def run(self):
os.setpgrp()
if os.name == "posix":
os.setpgrp()
unittest_args = [sys.argv[0]]
if ARGS.verbose:
unittest_args += ["-v"]
@ -534,8 +606,8 @@ def expect(functional, expected, interval=0.01, timeout=4):
if len(result) == expected:
break
except Exception as e:
print("Expect exception (%s): %s not %s" % (
str(e), str(functional), expected))
print("Expect exception (%s): %s not %s" %
(str(e), str(functional), expected))
return None
if delay >= timeout:
return None
@ -545,7 +617,6 @@ def expect(functional, expected, interval=0.01, timeout=4):
class QueryTester(ProcessGenerator, unittest.TestCase):
def setUp(self):
self.binary = os.path.join(ARGS.build, "osquery", "osqueryi")
self.daemon = self._run_daemon({
@ -573,8 +644,8 @@ class QueryTester(ProcessGenerator, unittest.TestCase):
self.assertEqual(result.status.code, 0)
return result.response
except Exception as e:
print("General exception executing query: %s (%s)" % (
utils.lightred(query), str(e)))
print("General exception executing query: %s (%s)" %
(utils.lightred(query), str(e)))
raise e
def _execute_set(self, queries):
@ -618,6 +689,19 @@ def getTestDirectory(base):
return path
# Grab the latest info log
def getLatestInfoLog(base):
info_path = os.path.join(base, "osqueryd.INFO")
if os.name != "nt":
return info_path
query = "select path from file where path like '{}' ORDER BY mtime DESC LIMIT 1;".format(info_path+'%')
osqueryi = OsqueryWrapper(getLatestOsqueryBinary('osqueryi'))
results = osqueryi.run_query(query)
if len(results) > 0:
return results[0]["path"]
return ""
def loadThriftFromBuild(build_dir):
'''Find and import the thrift-generated python interface.'''
thrift_path = build_dir + "/generated/gen-py"

View File

@ -35,20 +35,23 @@ class DaemonTests(test_base.ProcessGenerator, unittest.TestCase):
@test_base.flaky
def test_2_daemon_with_option(self):
logger_path = test_base.getTestDirectory(test_base.CONFIG_DIR)
daemon = self._run_daemon({
"disable_watchdog": True,
"disable_extensions": True,
"disable_logging": False,
},
daemon = self._run_daemon(
{
"disable_watchdog": True,
"disable_extensions": True,
"disable_logging": False,
},
options_only={
"logger_path": logger_path,
"verbose": True,
})
info_path = os.path.join(logger_path, "osqueryd.INFO")
"logger_path": logger_path,
"verbose": True,
})
info_path = test_base.getLatestInfoLog(logger_path)
self.assertTrue(daemon.isAlive())
def info_exists():
return os.path.exists(info_path)
# Wait for the daemon to flush to GLOG.
test_base.expectTrue(info_exists)
self.assertTrue(os.path.exists(info_path))
@ -60,6 +63,7 @@ class DaemonTests(test_base.ProcessGenerator, unittest.TestCase):
if os.environ.get('SANITIZE') is not None:
return
daemon = self._run_daemon({
"allow_unsafe": True,
"disable_watchdog": False,
"ephemeral": True,
"disable_database": True,
@ -115,8 +119,9 @@ class DaemonTests(test_base.ProcessGenerator, unittest.TestCase):
})
self.assertTrue(daemon.isAlive())
# Send a SIGHUP
os.kill(daemon.proc.pid, signal.SIGHUP)
# Send SIGHUP on posix. Windows does not have SIGHUP so we use SIGTERM
sig = signal.SIGHUP if os.name != "nt" else signal.SIGTERM
os.kill(daemon.proc.pid, sig)
self.assertTrue(daemon.isAlive())
@test_base.flaky
@ -133,28 +138,31 @@ class DaemonTests(test_base.ProcessGenerator, unittest.TestCase):
# Send a SIGINT
os.kill(daemon.pid, signal.SIGINT)
self.assertTrue(daemon.isDead(daemon.pid, 10))
self.assertTrue(daemon.retcode in [128 + signal.SIGINT, -2])
if os.name != "nt":
self.assertTrue(daemon.retcode in [128 + signal.SIGINT, -2])
@test_base.flaky
def test_6_logger_mode(self):
logger_path = test_base.getTestDirectory(test_base.CONFIG_DIR)
test_mode = 0754 # Strange mode that should never exist
daemon = self._run_daemon({
"disable_watchdog": True,
"disable_extensions": True,
"disable_logging": False,
},
options_only={
"logger_path": logger_path,
"logger_mode": test_mode,
"verbose": True,
})
info_path = os.path.join(logger_path, "osqueryd.INFO")
test_mode = 0754 # Strange mode that should never exist
daemon = self._run_daemon(
{
"disable_watchdog": True,
"disable_extensions": True,
"disable_logging": False,
},
options_only={
"logger_path": logger_path,
"logger_mode": test_mode,
"verbose": True,
})
info_path = test_base.getLatestInfoLog(logger_path)
results_path = os.path.join(logger_path, "osqueryd.results.log")
self.assertTrue(daemon.isAlive())
def info_exists():
return os.path.exists(info_path)
def results_exists():
return os.path.exists(results_path)
@ -167,7 +175,8 @@ class DaemonTests(test_base.ProcessGenerator, unittest.TestCase):
self.assertTrue(os.path.exists(pth))
# Only apply the mode checks to .log files.
if pth.find('.log') > 0:
# TODO: Add ACL checks for Windows logs
if pth.find('.log') > 0 and os.name != "nt":
rpath = os.path.realpath(pth)
mode = os.stat(rpath).st_mode & 0777
self.assertEqual(mode, test_mode)
@ -186,10 +195,12 @@ class DaemonTests(test_base.ProcessGenerator, unittest.TestCase):
})
info_path = os.path.join(logger_path, "osqueryd.INFO")
def pathDoesntExist():
if os.path.exists(info_path):
return False
return True
self.assertTrue(daemon.isAlive())
self.assertTrue(pathDoesntExist())
daemon.kill()

View File

@ -15,7 +15,9 @@ from __future__ import print_function
import os
import random
import sys
import unittest
import utils
# osquery-specific testing utils
import test_base
@ -25,12 +27,13 @@ EXIT_CATASTROPHIC = 78
class OsqueryiTest(unittest.TestCase):
def setUp(self):
self.binary = os.path.join(test_base.ARGS.build, "osquery", "osqueryi")
self.osqueryi = test_base.OsqueryWrapper(self.binary)
self.binary = test_base.getLatestOsqueryBinary('osqueryi')
self.osqueryi = test_base.OsqueryWrapper(command=self.binary)
self.dbpath = "%s%s" % (
test_base.CONFIG["options"]["database_path"],
str(random.randint(1000, 9999)))
@unittest.skipIf(os.name == "nt", "stderr tests not supported on Windows.")
def test_error(self):
'''Test that we throw an error on bad query'''
self.osqueryi.run_command(' ')
@ -55,7 +58,7 @@ class OsqueryiTest(unittest.TestCase):
@test_base.flaky
def test_config_dump(self):
'''Test that config raw output is dumped when requested'''
config = "%s/test_noninline_packs.conf" % test_base.SCRIPT_DIR
config = os.path.join(test_base.SCRIPT_DIR, "test_noninline_packs.conf")
proc = test_base.TimeoutRunner([
self.binary,
"--config_dump",
@ -63,8 +66,14 @@ class OsqueryiTest(unittest.TestCase):
],
SHELL_TIMEOUT)
content = ""
with open(config, 'r') as fh: content = fh.read()
self.assertEqual(proc.stdout, "{\"%s\": %s}\n" % (config, content))
with open(config, 'r') as fh:
content = fh.read()
actual = proc.stdout
if os.name == "nt":
actual = actual.replace('\r', '')
self.assertEqual(actual, '{"%s": %s}\n' % (config, content))
print (proc.stderr)
self.assertEqual(proc.proc.poll(), 0)
@ -90,7 +99,7 @@ class OsqueryiTest(unittest.TestCase):
self.binary,
"--config_check",
"--database_path=%s" % (self.dbpath),
"--config_path=%s/test.badconfig" % test_base.SCRIPT_DIR
"--config_path=%s" % os.path.join(test_base.SCRIPT_DIR, "test.badconfig")
],
SHELL_TIMEOUT)
self.assertEqual(proc.proc.poll(), 1)
@ -114,11 +123,11 @@ class OsqueryiTest(unittest.TestCase):
@test_base.flaky
def test_config_check_example(self):
'''Test that the example config passes'''
example_path = "deployment/osquery.example.conf"
example_path = os.path.join("deployment", "osquery.example.conf")
proc = test_base.TimeoutRunner([
self.binary,
"--config_check",
"--config_path=%s/../%s" % (test_base.SCRIPT_DIR, example_path)
"--config_path=%s" % os.path.join(test_base.SCRIPT_DIR, "..", example_path)
],
SHELL_TIMEOUT)
self.assertEqual(proc.stdout, "")
@ -183,7 +192,9 @@ class OsqueryiTest(unittest.TestCase):
self.assertTrue(0 <= int(row['minutes']) <= 60)
self.assertTrue(0 <= int(row['seconds']) <= 60)
# TODO: Running foreign table tests as non-priv user fails
@test_base.flaky
@unittest.skipIf(os.name == "nt", "foreign table tests not supported on Windows.")
def test_foreign_tables(self):
'''Requires the --enable_foreign flag to add at least one table.'''
self.osqueryi.run_command(' ')
@ -195,6 +206,7 @@ class OsqueryiTest(unittest.TestCase):
osqueryi2 = test_base.OsqueryWrapper(self.binary,
args={"enable_foreign": True})
osqueryi2.run_command(' ')
# This execution fails if the user is not Administrator on Windows
result = osqueryi2.run_query(query)
after = int(result[0]['c'])
self.assertGreater(after, before)

101
tools/tests/winexpect.py Normal file
View File

@ -0,0 +1,101 @@
#!/usr/bin/env python
# Copyright (c) 2014-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree. An additional grant
# of patent rights can be found in the PATENTS file in the same directory.
"""
A Windows specific implementation of REPLWrapper from pexpect.
As no good implementation exists, we roll our own generic class that
handles all of the necessary functionality for our integration tests to
run on Windows systems.
"""
import os
import shlex
import subprocess
import threading
import time
try:
from Queue import Queue, Empty
except ImportError:
# TODO: Get on all python3
from queue import Queue, Empty
class REPLWrapper(object):
def __init__(self,
proc,
orig_prompt,
prompt_change,
continuation_prompt='',
timeout=2):
self.child = proc
self.prompt = orig_prompt
self.prompt_change = prompt_change
self.continuation_prompt = continuation_prompt
self.timeout = timeout
# We currently only support 1 query at a time.
def run_command(self, command):
res = ''
command = command.strip()
if not command:
return res
try:
self.child.proc.stdin.write(command + '\r\n')
self.child.proc.stdin.flush()
# Wait for stderr/stdout to populate for at most timeout seconds
for i in xrange(self.timeout):
if not self.child.out_queue.empty():
break
time.sleep(1)
while not self.child.out_queue.empty():
l = self.child.out_queue.get_nowait()
res += l
except Exception as e:
print('[-] Failed to communicate with client: {}'.format(e))
return res
class WinExpectSpawn(object):
def __init__(self, command='', cwd=None, env=None):
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
kwargs = dict(
bufsize=1,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=cwd,
env=env)
kwargs['startupinfo'] = si
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
argv = shlex.split(command, posix=False)
self.proc = subprocess.Popen(argv, **kwargs)
# Spawn a new thread for "non-blocking" reads.
self.out_queue = Queue()
self.stdout_thread = threading.Thread(
target=self.read_pipe, args=(self.proc.stdout, self.out_queue))
self.stdout_thread.daemon = True
self.stdout_thread.start()
# TODO: Figure out how to get stderr as well as stdout
#self.stderr_thread = threading.Thread(target=self.read_pipe,
# args=(self.proc.stderr, self.out_queue))
#self.stderr_thread.daemon = True
#self.stderr_thread.start()
# Thread worker used to insert stderr/stdout into a thread-safe queue
def read_pipe(self, out, queue):
for l in iter(out.readline, b''):
queue.put(l)
out.close()