mirror of
https://github.com/valitydev/osquery-1.git
synced 2024-11-07 18:08:53 +00:00
151 lines
5.0 KiB
Python
Executable File
151 lines
5.0 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
# Copyright (c) 2014-present, Facebook, Inc.
|
|
# All rights reserved.
|
|
#
|
|
# This source code is licensed under the BSD-style license found in the
|
|
# LICENSE file in the root directory of this source tree. An additional grant
|
|
# of patent rights can be found in the PATENTS file in the same directory.
|
|
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import print_function
|
|
from __future__ import unicode_literals
|
|
|
|
import sys
|
|
import shutil
|
|
import time
|
|
import argparse
|
|
import subprocess
|
|
import tempfile
|
|
from threading import Thread
|
|
|
|
try:
|
|
from utils import *
|
|
except ImportError:
|
|
print("Cannot import osquery testing utils from ./tools/tests")
|
|
exit(1)
|
|
|
|
|
|
def run_daemon(proc, output):
|
|
output[proc.pid] = profile_cmd("", proc=proc)
|
|
|
|
|
|
def audit(args):
|
|
def _run_procs(start):
|
|
procs = []
|
|
for i in range(3):
|
|
for j in range(100):
|
|
procs.append(subprocess.Popen("sleep %d" % 1,
|
|
shell=True,
|
|
stderr=subprocess.PIPE,
|
|
stdout=subprocess.PIPE))
|
|
if not args["stat"]:
|
|
print("Finished launching processes: duration %6.4fs" % (
|
|
time.time() - start))
|
|
for p in procs:
|
|
p.communicate()
|
|
|
|
proc = None
|
|
thread = None
|
|
results = {}
|
|
if not args["baseline"]:
|
|
# Start a daemon, which will modify audit rules
|
|
test = args["run"]
|
|
if "args" in args:
|
|
test += " %s" % (args["args"])
|
|
dbpath = tempfile.mkdtemp()
|
|
test += " --database_path=%s" % (dbpath)
|
|
proc = subprocess.Popen(test,
|
|
shell=True,
|
|
stderr=subprocess.PIPE,
|
|
stdout=subprocess.PIPE)
|
|
if not args["stat"]:
|
|
thread = Thread(target=run_daemon, args=(proc, results,))
|
|
thread.start()
|
|
time.sleep(1)
|
|
|
|
# Run test applications to stress the audting (a fork bomb)
|
|
start_time = time.time()
|
|
_run_procs(start_time)
|
|
end_time = time.time()
|
|
|
|
# Clean up
|
|
if not args["baseline"]:
|
|
proc.kill()
|
|
shutil.rmtree(dbpath)
|
|
if not args["stat"]:
|
|
thread.join()
|
|
if proc.pid in results:
|
|
print("cpu: %6.2f, memory: %d, util: %6.2f" % (
|
|
results[proc.pid]["cpu_time"],
|
|
results[proc.pid]["memory"],
|
|
results[proc.pid]["utilization"]))
|
|
pass
|
|
return end_time - start_time
|
|
|
|
|
|
def single(args):
|
|
start_time = time.time()
|
|
if ARGS.verbose:
|
|
proc = subprocess.Popen(args, shell=True)
|
|
else:
|
|
proc = subprocess.Popen(args,
|
|
shell=True,
|
|
stderr=subprocess.PIPE,
|
|
stdout=subprocess.PIPE)
|
|
if ARGS.verbose:
|
|
print("PID: %d" % (proc.pid))
|
|
stdout, stderr = proc.communicate()
|
|
end_time = time.time() - start_time
|
|
if proc.returncode is not 0:
|
|
if not ARGS.verbose:
|
|
print(stdout)
|
|
print(stderr)
|
|
print("%s Test failed. (total %6.4fs)" % (
|
|
red("FAILED"), end_time))
|
|
sys.exit(proc.returncode)
|
|
return end_time
|
|
|
|
|
|
def stress(args):
|
|
"""Small utility to run unittests several times."""
|
|
times = []
|
|
test = args["run"] if args["run"] is not None else ["make", "test"]
|
|
for i in xrange(args["num"]):
|
|
if args["audit"]:
|
|
times.append(audit(args))
|
|
else:
|
|
times.append(single(test))
|
|
if args["stat"]:
|
|
print("%6.4f" % (times[-1]))
|
|
else:
|
|
print("%s Tests passed (%d/%d) rounds. (average %6.4fs) " % (
|
|
green("PASSED"), i + 1, args["num"], sum(times) / len(times)))
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="Run tests many times")
|
|
parser.add_argument("-n", "--num", type=int, default=50,
|
|
help="Number of times to run tests")
|
|
parser.add_argument("-A", "--audit", action="store_true", default=False,
|
|
help="Perform exec/process auditing stress tests")
|
|
parser.add_argument("--baseline", action="store_true", default=False,
|
|
help="Run baselines when stressing auditing")
|
|
parser.add_argument("--args", default="",
|
|
help="Arguments to pass to test binary")
|
|
parser.add_argument("--stat", action="store_true", default=False,
|
|
help="Only print numerical values")
|
|
parser.add_argument("--verbose", action="store_true", default=False,
|
|
help="Do not consume stderr/stdout")
|
|
parser.add_argument("run", nargs="?", help="Run specific test binary")
|
|
ARGS = parser.parse_args()
|
|
|
|
# A baseline was requested, first run baselines then normal.
|
|
if ARGS.baseline:
|
|
print("Running baseline tests...")
|
|
stress(vars(ARGS))
|
|
ARGS.baseline = False
|
|
print("Finished. Running tests...")
|
|
|
|
stress(vars(ARGS))
|