Use the default shutdown flow within extensions

This commit is contained in:
Teddy Reed 2016-03-18 19:37:11 -07:00
parent 4609486f74
commit 15a998e54f
20 changed files with 203 additions and 132 deletions

View File

@ -162,7 +162,10 @@ elseif(DEFINED ENV{SANITIZE})
if(DEFINED ENV{SANITIZE_THREAD})
add_compile_options(-fsanitize=thread)
else()
add_compile_options(-fsanitize=leak -fsanitize=address)
add_compile_options(-fsanitize=address)
if (NOT APPLE)
add_compile_options(-fsanitize=leak)
endif()
endif()
set(SANITIZE_BLACKLIST "${CMAKE_SOURCE_DIR}/tools/tests/sanitize_blacklist.txt")
add_definitions(-fsanitize-blacklist=${SANITIZE_BLACKLIST})

View File

@ -10,6 +10,7 @@
#pragma once
#include <csignal>
#include <memory>
#include <mutex>
#include <string>
@ -82,6 +83,16 @@ using WriteLock = std::lock_guard<Mutex>;
/// The osquery tool type for runtime decisions.
extern ToolType kToolType;
/**
* @brief The requested exit code.
*
* Use Initializer::requestShutdown to request shutdown in most cases.
* This will raise a signal to the main thread requesting the dispatcher to
* interrupt all services. There is a thread requesting a join of all services
* that will continue the shutdown process.
*/
extern volatile std::sig_atomic_t kExitCode;
class Initializer : private boost::noncopyable {
public:
/**
@ -126,9 +137,31 @@ class Initializer : private boost::noncopyable {
/// Assume initialization finished, start work.
void start() const;
/// Turns off various aspects of osquery such as event loops.
void shutdown() const;
/**
* @brief Forcefully request the application stop.
*
* Since all osquery tools may implement various 'dispatched' services in the
* form of event handler threads or thrift service and client pools, a stop
* request should behave nicely and request these services stop.
*
* Use requestShutdown whenever you would normally call ::exit.
*
* @param retcode the requested return code for the process.
*/
static void requestShutdown(int retcode = EXIT_SUCCESS);
/// Exit immediately without requesting the dispatcher to stop.
static void shutdown(int retcode = EXIT_SUCCESS);
/**
* @brief Cleanly wait for all services and components to shutdown.
*
* Enter a join of all services followed by a sync wait for event loops.
* If the main thread is out of actions it can call ::waitForShutdown.
*/
static void waitForShutdown();
public:
/**
* @brief Check if a process is an osquery worker.
*
@ -151,9 +184,16 @@ class Initializer : private boost::noncopyable {
void initActivePlugin(const std::string& type, const std::string& name) const;
private:
/// A saved, mutable, reference to the process's argc.
int* argc_{nullptr};
/// A saved, mutable, reference to the process's argv.
char*** argv_{nullptr};
/// The deduced tool type, determined by initializer construction.
ToolType tool_;
/// The deduced program name determined by executing path.
std::string binary_;
};
@ -206,17 +246,6 @@ size_t getUnixTime();
*/
Status createPidFile();
/**
* @brief Forcefully request the application stop.
*
* Since all osquery tools may implement various 'dispatched' services in the
* form of event handler threads or thrift service and client pools, a stop
* request should behave nicely and request these services stop.
*
* Use shutdown whenever you would normally call ::exit.
*/
void shutdown(int recode, bool wait = false);
class DropPrivileges;
typedef std::shared_ptr<DropPrivileges> DropPrivilegesRef;

View File

@ -309,7 +309,9 @@ Status Config::load() {
content.first.c_str(),
content.second.c_str());
}
osquery::shutdown(EXIT_SUCCESS);
// Instead of forcing the shutdown, request one since the config plugin
// may have started services.
Initializer::requestShutdown();
}
status = update(response[0]);
}

View File

@ -9,16 +9,15 @@
*/
#include <chrono>
#include <csignal>
#include <iostream>
#include <random>
#include <thread>
#include <stdio.h>
#include <syslog.h>
#include <time.h>
#include <unistd.h>
#include <boost/algorithm/string/trim.hpp>
#include <boost/filesystem.hpp>
#include <osquery/config.h>
@ -95,8 +94,6 @@ enum {
/// Seconds to alarm and quit for non-responsive event loops.
#define SIGNAL_ALARM_TIMEOUT 4
namespace fs = boost::filesystem;
namespace {
extern "C" {
static inline bool hasWorkerVariable() {
@ -107,41 +104,51 @@ volatile std::sig_atomic_t kHandledSignal{0};
static inline bool isWatcher() { return (osquery::Watcher::getWorker() > 0); }
void signalHandler(int signal) {
void signalHandler(int num) {
// Inform exit status of main threads blocked by service joins.
if (kHandledSignal == 0) {
kHandledSignal = signal;
kHandledSignal = num;
// If no part of osquery requested an interruption then the exit 'wanted'
// code becomes the signal number.
if (num != SIGUSR1 && osquery::kExitCode == 0) {
// The only exception is SIGUSR1 which is used to signal the main thread
// to interrupt dispatched services.
osquery::kExitCode = 128 + num;
}
// Handle signals based on a tri-state (worker, watcher, neither).
if (num == SIGHUP) {
if (!isWatcher() || hasWorkerVariable()) {
// Reload configuration.
}
} else if (num == SIGTERM || num == SIGINT || num == SIGABRT ||
num == SIGUSR1) {
// Time to stop, set an upper bound time constraint on how long threads
// have to terminate (join). Publishers may be in 20ms or similar sleeps.
alarm(SIGNAL_ALARM_TIMEOUT);
// Restore the default signal handler.
std::signal(num, SIG_DFL);
// The watcher waits for the worker to die.
if (isWatcher()) {
// Bind the fate of the worker to this watcher.
osquery::Watcher::bindFates();
} else {
// Otherwise the worker or non-watched process joins.
// Stop thrift services/clients/and their thread pools.
osquery::Dispatcher::stopServices();
}
}
}
// Handle signals based on a tri-state (worker, watcher, neither).
if (signal == SIGHUP) {
if (!isWatcher() || hasWorkerVariable()) {
// Reload configuration.
}
} else if (signal == SIGTERM || signal == SIGINT || signal == SIGABRT) {
// Time to stop, set an upper bound time constraint on how long threads
// have to terminate (join). Publishers may be in 20ms or similar sleeps.
alarm(SIGNAL_ALARM_TIMEOUT);
// Restore the default signal handler.
std::signal(signal, SIG_DFL);
// The watcher waits for the worker to die.
if (isWatcher()) {
// Bind the fate of the worker to this watcher.
osquery::Watcher::bindFates();
} else {
// Otherwise the worker or non-watched process joins.
osquery::EventFactory::end(true);
// Re-raise the handled signal, which has since been restored to default.
raise(signal);
}
} else if (signal == SIGALRM) {
if (num == SIGALRM) {
// Restore the default signal handler for SIGALRM.
std::signal(SIGALRM, SIG_DFL);
// Took too long to stop.
VLOG(1) << "Cannot stop event publisher threads";
VLOG(1) << "Cannot stop event publisher threads or services";
raise((kHandledSignal != 0) ? kHandledSignal : SIGALRM);
}
@ -172,6 +179,11 @@ DECLARE_string(database_path);
ToolType kToolType = OSQUERY_TOOL_UNKNOWN;
volatile std::sig_atomic_t kExitCode{0};
/// The saved thread ID for shutdown to short-circuit raising a signal.
static std::thread::id kMainThreadId;
void printUsage(const std::string& binary, int tool) {
// Parse help options before gflags. Only display osquery-related options.
fprintf(stdout, DESCRIPTION, kVersion.c_str());
@ -207,6 +219,8 @@ Initializer::Initializer(int& argc, char**& argv, ToolType tool)
tool_(tool),
binary_((tool == OSQUERY_TOOL_DAEMON) ? "osqueryd" : "osqueryi") {
std::srand(chrono_clock::now().time_since_epoch().count());
// The 'main' thread is that which executes the initializer.
kMainThreadId = std::this_thread::get_id();
// Handled boost filesystem locale problems fixes in 1.56.
// See issue #1559 for the discussion and upstream boost patch.
@ -223,7 +237,7 @@ Initializer::Initializer(int& argc, char**& argv, ToolType tool)
help == "-h") &&
tool != OSQUERY_TOOL_TEST) {
printUsage(binary_, tool_);
::exit(EXIT_SUCCESS);
shutdown();
}
}
@ -280,6 +294,7 @@ Initializer::Initializer(int& argc, char**& argv, ToolType tool)
std::signal(SIGINT, signalHandler);
std::signal(SIGHUP, signalHandler);
std::signal(SIGALRM, signalHandler);
std::signal(SIGUSR1, signalHandler);
// If the caller is checking configuration, disable the watchdog/worker.
if (FLAGS_config_check) {
@ -309,7 +324,7 @@ void Initializer::initDaemon() const {
// OS X uses launchd to daemonize.
if (osquery::FLAGS_daemonize) {
if (daemon(0, 0) == -1) {
::exit(EXIT_FAILURE);
shutdown(EXIT_FAILURE);
}
}
#endif
@ -328,7 +343,7 @@ void Initializer::initDaemon() const {
auto pid_status = createPidFile();
if (!pid_status.ok()) {
LOG(ERROR) << binary_ << " initialize failed: " << pid_status.toString();
::exit(EXIT_FAILURE);
shutdown(EXIT_FAILURE);
}
// Nice ourselves if using a watchdog and the level is not too permissive.
@ -374,7 +389,7 @@ void Initializer::initWatcher() const {
} else {
retcode = EXIT_FAILURE;
}
osquery::shutdown(retcode, true);
requestShutdown(retcode);
}
}
@ -443,7 +458,7 @@ void Initializer::initActivePlugin(const std::string& type,
LOG(ERROR) << "Cannot activate " << name << " " << type
<< " plugin: " << status.getMessage();
osquery::shutdown(EXIT_CATASTROPHIC);
requestShutdown(EXIT_CATASTROPHIC);
}
void Initializer::start() const {
@ -469,7 +484,7 @@ void Initializer::start() const {
LOG(ERROR) << RLOG(1629) << binary_
<< " initialize failed: Could not initialize database";
auto retcode = (isWorker()) ? EXIT_CATASTROPHIC : EXIT_FAILURE;
::exit(retcode);
requestShutdown(retcode);
}
}
@ -491,12 +506,13 @@ void Initializer::start() const {
std::cerr << "Error reading config: " << s.toString() << "\n";
}
// A configuration check exits the application.
osquery::shutdown(s.getCode());
// Make sure to request a shutdown as plugins may have created services.
requestShutdown(s.getCode());
}
if (FLAGS_database_dump) {
dumpDatabase();
osquery::shutdown(EXIT_SUCCESS);
requestShutdown();
}
// Load the osquery config using the default/active config plugin.
@ -528,18 +544,32 @@ void Initializer::start() const {
EventFactory::delay();
}
void Initializer::shutdown() const { osquery::shutdown(EXIT_SUCCESS, false); }
void shutdown(int retcode, bool wait) {
// End any event type run loops.
EventFactory::end(wait);
// Stop thrift services/clients/and their thread pools.
Dispatcher::stopServices();
void Initializer::waitForShutdown() {
// Attempt to be the only place in code where a join is attempted.
Dispatcher::joinServices();
// End any event type run loops.
EventFactory::end(true);
// Hopefully release memory used by global string constructors in gflags.
GFLAGS_NAMESPACE::ShutDownCommandLineFlags();
DatabasePlugin::shutdown();
::exit(retcode);
::exit((kExitCode != 0) ? kExitCode : EXIT_SUCCESS);
}
/**
 * @brief Record the wanted exit code and begin stopping the process.
 *
 * The exit code is stored in kExitCode first. A caller that is not on the
 * saved main thread raises SIGUSR1 and returns; a caller on the main thread
 * stops the dispatched services itself and then waits for shutdown.
 *
 * @param retcode the requested return code for the process.
 */
void Initializer::requestShutdown(int retcode) {
  // Save the requested exit code before anything reacts to the request.
  kExitCode = retcode;

  const bool on_main_thread = (std::this_thread::get_id() == kMainThreadId);
  if (!on_main_thread) {
    // Not the main thread: signal with SIGUSR1 instead of stopping services
    // from here.
    raise(SIGUSR1);
    return;
  }

  // The main thread is asking for the shutdown, so in almost every case it is
  // NOT already waiting for one. Exceptions include a tight request / wait in
  // an exception handler or custom signal handling.
  // Stop thrift services/clients/and their thread pools, then wait.
  Dispatcher::stopServices();
  waitForShutdown();
}
/// Exit the process immediately with retcode; no services are asked to stop.
void Initializer::shutdown(int retcode) {
  ::exit(retcode);
}
}

View File

@ -317,7 +317,8 @@ void WatcherRunner::createWorker() {
auto qd = SQL::selectAllFrom("processes", "pid", EQUALS, INTEGER(getpid()));
if (qd.size() != 1 || qd[0].count("path") == 0 || qd[0]["path"].size() == 0) {
LOG(ERROR) << "osquery watcher cannot determine process path for worker";
osquery::shutdown(EXIT_FAILURE);
Initializer::requestShutdown(EXIT_FAILURE);
return;
}
// Set an environment signaling to potential plugin-dependent workers to wait
@ -333,21 +334,24 @@ void WatcherRunner::createWorker() {
// osqueryd binary has become unsafe.
LOG(ERROR) << RLOG(1382)
<< "osqueryd has unsafe permissions: " << exec_path.string();
osquery::shutdown(EXIT_FAILURE);
Initializer::requestShutdown(EXIT_FAILURE);
return;
}
auto worker_pid = fork();
if (worker_pid < 0) {
// Unrecoverable error, cannot create a worker process.
LOG(ERROR) << "osqueryd could not create a worker process";
osquery::shutdown(EXIT_FAILURE);
Initializer::shutdown(EXIT_FAILURE);
return;
} else if (worker_pid == 0) {
// This is the new worker process, no watching needed.
setenv("OSQUERY_WORKER", std::to_string(getpid()).c_str(), 1);
execve(exec_path.string().c_str(), argv_, environ);
// Code should never reach this point.
LOG(ERROR) << "osqueryd could not start worker process";
osquery::shutdown(EXIT_CATASTROPHIC);
Initializer::shutdown(EXIT_CATASTROPHIC);
return;
}
Watcher::setWorker(worker_pid);
@ -381,7 +385,7 @@ bool WatcherRunner::createExtension(const std::string& extension) {
if (ext_pid < 0) {
// Unrecoverable error, cannot create an extension process.
LOG(ERROR) << "Cannot create extension process: " << extension;
osquery::shutdown(EXIT_FAILURE);
Initializer::shutdown(EXIT_FAILURE);
} else if (ext_pid == 0) {
// Pass the current extension socket and a set timeout to the extension.
setenv("OSQUERY_EXTENSION", std::to_string(getpid()).c_str(), 1);
@ -399,7 +403,7 @@ bool WatcherRunner::createExtension(const std::string& extension) {
environ);
// Code should never reach this point.
VLOG(1) << "Could not start extension process: " << extension;
osquery::shutdown(EXIT_FAILURE);
Initializer::shutdown(EXIT_FAILURE);
}
Watcher::setExtension(extension, ext_pid);
@ -416,7 +420,8 @@ void WatcherWatcherRunner::start() {
VLOG(1) << "osqueryd worker (" << getpid()
<< ") detected killed watcher (" << watcher_ << ")";
// The watcher watcher is a thread. Do not join services after removing.
raise(SIGKILL);
Initializer::requestShutdown();
break;
}
pauseMilli(getWorkerLimit(INTERVAL) * 1000);
}

View File

@ -670,7 +670,7 @@ static void output_csv(struct callback_data *p, const char *z, int bSep) {
*/
static void interrupt_handler(int signal) {
if (signal == SIGINT) {
osquery::shutdown(130);
osquery::Initializer::requestShutdown(130);
}
seenInterrupt = 1;
}

View File

@ -55,24 +55,29 @@ Status Dispatcher::addService(InternalRunnableRef service) {
auto& self = instance();
auto thread = std::make_shared<std::thread>(
std::bind(&InternalRunnable::run, &*service));
WriteLock lock(self.mutex_);
self.service_threads_.push_back(thread);
self.services_.push_back(std::move(service));
return Status(0, "OK");
}
void Dispatcher::joinServices() {
for (auto& thread : instance().service_threads_) {
auto& self = instance();
WriteLock join_lock(self.join_mutex_);
for (auto& thread : self.service_threads_) {
// Boost threads would have been interrupted, and joined using the
// provided thread instance.
thread->join();
}
instance().services_.clear();
instance().service_threads_.clear();
WriteLock lock(self.mutex_);
self.services_.clear();
self.service_threads_.clear();
}
void Dispatcher::stopServices() {
auto& self = instance();
WriteLock lock(self.mutex_);
for (const auto& service : self.services_) {
while (true) {
// Wait for each thread's entry point (start) meaning the thread context
@ -82,7 +87,7 @@ void Dispatcher::stopServices() {
}
// We only need to check if std::terminate is called very quickly after
// the std::thread is created.
::usleep(200);
::usleep(20);
}
service->interrupt();
}

View File

@ -179,6 +179,12 @@ class Dispatcher : private boost::noncopyable {
/// The set of shared osquery services.
std::vector<InternalRunnableRef> services_;
// Protection around service access.
mutable Mutex mutex_;
// Protection around double joins.
mutable Mutex join_mutex_;
private:
friend class ExtensionsTest;
};

View File

@ -136,16 +136,9 @@ void SchedulerRunner::start() {
}
}
Status startScheduler() {
if (startScheduler(FLAGS_schedule_timeout, 1).ok()) {
Dispatcher::joinServices();
return Status(0, "OK");
}
return Status(1, "Could not start scheduler");
}
void startScheduler() { startScheduler(FLAGS_schedule_timeout, 1); }
Status startScheduler(unsigned long int timeout, size_t interval) {
void startScheduler(unsigned long int timeout, size_t interval) {
Dispatcher::addService(std::make_shared<SchedulerRunner>(timeout, interval));
return Status(0, "OK");
}
}

View File

@ -41,8 +41,8 @@ class SchedulerRunner : public InternalRunnable {
};
/// Start querying according to the config's schedule
Status startScheduler();
void startScheduler();
/// Helper scheduler start with variable settings for testing.
Status startScheduler(unsigned long int timeout, size_t interval);
void startScheduler(unsigned long int timeout, size_t interval);
}

View File

@ -21,15 +21,10 @@ REGISTER(SCNetworkEventPublisher, "event_publisher", "scnetwork");
void SCNetworkEventPublisher::tearDown() {
stop();
for (auto target : targets_) {
CFRelease(target);
}
targets_.clear();
for (auto context : contexts_) {
CFRelease(context);
}
contexts_.clear();
WriteLock lock(mutex_);
clearAll();
run_loop_ = nullptr;
}
void SCNetworkEventPublisher::Callback(const SCNetworkReachabilityRef target,
@ -91,6 +86,21 @@ void SCNetworkEventPublisher::addAddress(
addTarget(sc, target);
}
/// Release every tracked target and context, then empty all bookkeeping lists.
void SCNetworkEventPublisher::clearAll() {
  // Targets are released through CFRelease before the list is emptied.
  for (auto& reachability : targets_) {
    CFRelease(reachability);
  }
  targets_.clear();

  // Contexts are freed with delete, unlike the targets above.
  for (auto& subscription_context : contexts_) {
    delete subscription_context;
  }
  contexts_.clear();

  // Forget the configured names and addresses as well.
  target_names_.clear();
  target_addresses_.clear();
}
void SCNetworkEventPublisher::configure() {
// Must stop before clearing contexts.
stop();
@ -98,10 +108,7 @@ void SCNetworkEventPublisher::configure() {
{
WriteLock lock(mutex_);
// Clear all targets.
targets_.clear();
contexts_.clear();
target_names_.clear();
target_addresses_.clear();
clearAll();
for (const auto& sub : subscriptions_) {
auto sc = getSubscriptionContext(sub->context);

View File

@ -61,6 +61,7 @@ class SCNetworkEventPublisher
public:
void configure() override;
Status setUp() override { return Status(1, "Publisher not used"); }
void tearDown() override;
// Entrypoint to the run loop
@ -89,6 +90,9 @@ class SCNetworkEventPublisher
void addTarget(const SCNetworkSubscriptionContextRef& sc,
const SCNetworkReachabilityRef& target);
/// Helper method to clear all targets.
void clearAll();
private:
/// Configured hostname targets.
std::vector<std::string> target_names_;

View File

@ -599,10 +599,12 @@ Status EventFactory::run(EventPublisherID& type_id) {
publisher->restart_count_++;
osquery::publisherSleep(EVENTS_COOLOFF);
}
// The runloop status is not reflective of the event type's.
VLOG(1) << "Event publisher " << publisher->type()
<< " run loop terminated for reason: " << status.getMessage();
// Publishers auto tear down when their run loop stops.
if (!status.ok()) {
// The runloop status is not reflective of the event type's.
VLOG(1) << "Event publisher " << publisher->type()
<< " run loop terminated for reason: " << status.getMessage();
// Publishers auto tear down when their run loop stops.
}
publisher->tearDown();
// Do not remove the publisher from the event factory.

View File

@ -92,9 +92,10 @@ int main(int argc, char* argv[]) {
auto status = startExtension("example", "0.0.1");
if (!status.ok()) {
LOG(ERROR) << status.getMessage();
runner.requestShutdown(status.getCode());
}
// Finally shutdown.
runner.shutdown();
// Finally wait for a signal / interrupt to shutdown.
runner.waitForShutdown();
return 0;
}

View File

@ -12,6 +12,7 @@
#include <boost/algorithm/string/trim.hpp>
#include <osquery/core.h>
#include <osquery/filesystem.h>
#include <osquery/logger.h>
#include <osquery/registry.h>
@ -89,7 +90,9 @@ void ExtensionWatcher::start() {
void ExtensionWatcher::exitFatal(int return_code) {
// Exit the extension.
::exit(return_code);
// We will save the wanted return code and raise an interrupt.
// This interrupt will be handled by the main thread then join the watchers.
Initializer::requestShutdown(return_code);
}
void ExtensionWatcher::watch() {
@ -316,18 +319,7 @@ Status startExtension(const std::string& name,
// If the extension failed to start then the EM is most likely unavailable.
return status;
}
try {
// The extension does nothing but serve the thrift API.
// Join on both the thrift and extension manager watcher services.
Dispatcher::joinServices();
} catch (const std::exception& e) {
// The extension manager may shutdown without notifying the extension.
return Status(0, e.what());
}
// An extension will only return on failure.
return Status(0, "Extension was shutdown");
return Status(0);
}
Status startExtension(const std::string& manager_path,

View File

@ -38,13 +38,9 @@ int main(int argc, char* argv[]) {
}
// Begin the schedule runloop.
s = osquery::startScheduler();
if (!s.ok()) {
LOG(ERROR) << "Error starting scheduler: " << s.toString();
}
// Finally shutdown.
runner.shutdown();
osquery::startScheduler();
// Finally wait for a signal / interrupt to shutdown.
runner.waitForShutdown();
return 0;
}

View File

@ -99,6 +99,6 @@ int main(int argc, char *argv[]) {
}
// Finally shutdown.
runner.shutdown();
runner.requestShutdown();
return retcode;
}

View File

@ -420,8 +420,7 @@ class TimeoutRunner(object):
def __init__(self, cmd=[], timeout_sec=1):
self.stdout = None
self.stderr = None
self.proc = subprocess.Popen(" ".join(cmd),
shell=True,
self.proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
kill_proc = lambda p: p.kill()

View File

@ -93,9 +93,7 @@ class DaemonTests(test_base.ProcessGenerator, unittest.TestCase):
# Send a SIGINT
os.kill(daemon.pid, signal.SIGINT)
self.assertTrue(daemon.isDead(daemon.pid, 10))
acceptable_retcodes = [-1, -2, -1 * signal.SIGINT]
self.assertTrue(daemon.retcode in acceptable_retcodes)
self.assertEqual(daemon.retcode, 128 + signal.SIGINT)
@test_base.flaky
def test_6_logger_mode(self):

View File

@ -21,7 +21,7 @@ import unittest
import test_base
SHELL_TIMEOUT = 10
EXIT_CATASTROPHIC = 78
class OsqueryiTest(unittest.TestCase):
def setUp(self):
@ -109,8 +109,7 @@ class OsqueryiTest(unittest.TestCase):
self.assertNotEqual(proc.stderr, "")
self.assertNotEqual(proc.proc.poll(), 0)
# Also do not accept a SIGSEGV
# It should exit EX_CONFIG = 78
self.assertEqual(proc.proc.poll(), 78)
self.assertEqual(proc.proc.poll(), EXIT_CATASTROPHIC)
@test_base.flaky
def test_config_check_example(self):