Process Operations - osquery/core Integration (#2087)

This integrates the process abstraction operations into osquery core.
This commit is contained in:
yying 2016-05-13 22:47:45 -04:00 committed by Teddy Reed
parent 484cf9c919
commit 15d1455957
9 changed files with 190 additions and 133 deletions

View File

@ -14,9 +14,12 @@
#include <thread> #include <thread>
#include <stdio.h> #include <stdio.h>
#include <syslog.h>
#include <time.h> #include <time.h>
#ifndef WIN32
#include <syslog.h>
#include <unistd.h> #include <unistd.h>
#endif
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
@ -31,6 +34,7 @@
#include <osquery/registry.h> #include <osquery/registry.h>
#include "osquery/core/watcher.h" #include "osquery/core/watcher.h"
#include "osquery/core/process.h"
#if defined(__linux__) || defined(__FreeBSD__) #if defined(__linux__) || defined(__FreeBSD__)
#include <sys/resource.h> #include <sys/resource.h>
@ -97,13 +101,16 @@ enum {
namespace { namespace {
extern "C" { extern "C" {
static inline bool hasWorkerVariable() { static inline bool hasWorkerVariable() {
return (getenv("OSQUERY_WORKER") != nullptr); return ::osquery::getEnvVar("OSQUERY_WORKER").is_initialized();
} }
volatile std::sig_atomic_t kHandledSignal{0}; volatile std::sig_atomic_t kHandledSignal{0};
static inline bool isWatcher() { return (osquery::Watcher::getWorker() > 0); } static inline bool isWatcher() {
return (osquery::Watcher::getWorker().isValid());
}
#ifndef WIN32
void signalHandler(int num) { void signalHandler(int num) {
// Inform exit status of main threads blocked by service joins. // Inform exit status of main threads blocked by service joins.
if (kHandledSignal == 0) { if (kHandledSignal == 0) {
@ -157,6 +164,7 @@ void signalHandler(int num) {
// managed extension processes. // managed extension processes.
} }
} }
#endif
} }
} }
@ -164,7 +172,7 @@ namespace osquery {
using chrono_clock = std::chrono::high_resolution_clock; using chrono_clock = std::chrono::high_resolution_clock;
#ifndef __APPLE__ #if !defined(__APPLE__) && !defined(WIN32)
CLI_FLAG(bool, daemonize, false, "Run as daemon (osqueryd only)"); CLI_FLAG(bool, daemonize, false, "Run as daemon (osqueryd only)");
#endif #endif
@ -226,7 +234,11 @@ Initializer::Initializer(int& argc, char**& argv, ToolType tool)
try { try {
boost::filesystem::path::codecvt(); boost::filesystem::path::codecvt();
} catch (const std::runtime_error& e) { } catch (const std::runtime_error& e) {
#ifdef WIN32
setlocale(LC_ALL, "C");
#else
setenv("LC_ALL", "C", 1); setenv("LC_ALL", "C", 1);
#endif
} }
// osquery implements a custom help/usage output. // osquery implements a custom help/usage output.
@ -285,6 +297,7 @@ Initializer::Initializer(int& argc, char**& argv, ToolType tool)
} }
} }
#ifndef WIN32
// All tools handle the same set of signals. // All tools handle the same set of signals.
// If a daemon process is a watchdog the signal is passed to the worker, // If a daemon process is a watchdog the signal is passed to the worker,
// unless the worker has not yet started. // unless the worker has not yet started.
@ -294,6 +307,7 @@ Initializer::Initializer(int& argc, char**& argv, ToolType tool)
std::signal(SIGHUP, signalHandler); std::signal(SIGHUP, signalHandler);
std::signal(SIGALRM, signalHandler); std::signal(SIGALRM, signalHandler);
std::signal(SIGUSR1, signalHandler); std::signal(SIGUSR1, signalHandler);
#endif
// If the caller is checking configuration, disable the watchdog/worker. // If the caller is checking configuration, disable the watchdog/worker.
if (FLAGS_config_check) { if (FLAGS_config_check) {
@ -304,7 +318,8 @@ Initializer::Initializer(int& argc, char**& argv, ToolType tool)
initStatusLogger(binary_); initStatusLogger(binary_);
if (tool != OSQUERY_EXTENSION) { if (tool != OSQUERY_EXTENSION) {
if (isWorker()) { if (isWorker()) {
VLOG(1) << "osquery worker initialized [watcher=" << getppid() << "]"; VLOG(1) << "osquery worker initialized [watcher="
<< PlatformProcess::getLauncherProcess()->pid() << "]";
} else { } else {
VLOG(1) << "osquery initialized [version=" << kVersion << "]"; VLOG(1) << "osquery initialized [version=" << kVersion << "]";
} }
@ -319,7 +334,7 @@ void Initializer::initDaemon() const {
return; return;
} }
#ifndef __APPLE__ #if !defined(__APPLE__) && !defined(WIN32)
// OS X uses launchd to daemonize. // OS X uses launchd to daemonize.
if (osquery::FLAGS_daemonize) { if (osquery::FLAGS_daemonize) {
if (daemon(0, 0) == -1) { if (daemon(0, 0) == -1) {
@ -328,9 +343,11 @@ void Initializer::initDaemon() const {
} }
#endif #endif
#ifndef WIN32
// Print the version to SYSLOG. // Print the version to SYSLOG.
syslog( syslog(
LOG_NOTICE, "%s started [version=%s]", binary_.c_str(), kVersion.c_str()); LOG_NOTICE, "%s started [version=%s]", binary_.c_str(), kVersion.c_str());
#endif
// Check if /var/osquery exists // Check if /var/osquery exists
if ((Flag::isDefault("pidfile") || Flag::isDefault("database_path")) && if ((Flag::isDefault("pidfile") || Flag::isDefault("database_path")) &&
@ -350,7 +367,8 @@ void Initializer::initDaemon() const {
FLAGS_watchdog_level >= WATCHDOG_LEVEL_DEFAULT && FLAGS_watchdog_level >= WATCHDOG_LEVEL_DEFAULT &&
FLAGS_watchdog_level != WATCHDOG_LEVEL_DEBUG) { FLAGS_watchdog_level != WATCHDOG_LEVEL_DEBUG) {
// Set CPU scheduling I/O limits. // Set CPU scheduling I/O limits.
setpriority(PRIO_PGRP, 0, 10); setToBackgroundPriority();
#ifdef __linux__ #ifdef __linux__
// Using: ioprio_set(IOPRIO_WHO_PGRP, 0, IOPRIO_CLASS_IDLE); // Using: ioprio_set(IOPRIO_WHO_PGRP, 0, IOPRIO_CLASS_IDLE);
syscall(SYS_ioprio_set, IOPRIO_WHO_PGRP, 0, IOPRIO_CLASS_IDLE); syscall(SYS_ioprio_set, IOPRIO_WHO_PGRP, 0, IOPRIO_CLASS_IDLE);
@ -413,7 +431,8 @@ void Initializer::initWorker(const std::string& name) const {
// Start a 'watcher watcher' thread to exit the process if the watcher exits. // Start a 'watcher watcher' thread to exit the process if the watcher exits.
// In this case the parent process is called the 'watcher' process. // In this case the parent process is called the 'watcher' process.
Dispatcher::addService(std::make_shared<WatcherWatcherRunner>(getppid())); Dispatcher::addService(std::make_shared<WatcherWatcherRunner>(
PlatformProcess::getLauncherProcess()));
} }
void Initializer::initWorkerWatcher(const std::string& name) const { void Initializer::initWorkerWatcher(const std::string& name) const {
@ -452,7 +471,7 @@ void Initializer::initActivePlugin(const std::string& type,
} }
// The plugin is not local and is not active, wait and retry. // The plugin is not local and is not active, wait and retry.
delay += kExtensionInitializeLatencyUS; delay += kExtensionInitializeLatencyUS;
::usleep(kExtensionInitializeLatencyUS); sleepFor(kExtensionInitializeLatencyUS);
} while (delay < timeout); } while (delay < timeout);
LOG(ERROR) << "Cannot activate " << name << " " << type LOG(ERROR) << "Cannot activate " << name << " " << type
@ -531,12 +550,14 @@ void Initializer::start() const {
} }
initLogger(binary_); initLogger(binary_);
#ifndef WIN32
// Initialize the distributed plugin, if necessary // Initialize the distributed plugin, if necessary
if (!FLAGS_disable_distributed) { if (!FLAGS_disable_distributed) {
if (Registry::exists("distributed", FLAGS_distributed_plugin)) { if (Registry::exists("distributed", FLAGS_distributed_plugin)) {
initActivePlugin("distributed", FLAGS_distributed_plugin); initActivePlugin("distributed", FLAGS_distributed_plugin);
} }
} }
#endif
// Start event threads. // Start event threads.
osquery::attachEvents(); osquery::attachEvents();
@ -559,7 +580,9 @@ void Initializer::requestShutdown(int retcode) {
// Stop thrift services/clients/and their thread pools. // Stop thrift services/clients/and their thread pools.
kExitCode = retcode; kExitCode = retcode;
if (std::this_thread::get_id() != kMainThreadId) { if (std::this_thread::get_id() != kMainThreadId) {
#ifndef WIN32
raise(SIGUSR1); raise(SIGUSR1);
#endif
} else { } else {
// The main thread is requesting a shutdown, meaning in almost every case // The main thread is requesting a shutdown, meaning in almost every case
// it is NOT waiting for a shutdown. // it is NOT waiting for a shutdown.
@ -572,3 +595,4 @@ void Initializer::requestShutdown(int retcode) {
void Initializer::shutdown(int retcode) { ::exit(retcode); } void Initializer::shutdown(int retcode) { ::exit(retcode); }
} }

View File

@ -56,13 +56,13 @@ std::shared_ptr<PlatformProcess> PlatformProcess::getLauncherProcess() {
} }
std::shared_ptr<PlatformProcess> PlatformProcess::launchWorker( std::shared_ptr<PlatformProcess> PlatformProcess::launchWorker(
const std::string& exec_path, const std::string& name) { const std::string& exec_path, int argc, char** argv) {
auto worker_pid = ::fork(); auto worker_pid = ::fork();
if (worker_pid < 0) { if (worker_pid < 0) {
return std::shared_ptr<PlatformProcess>(); return std::shared_ptr<PlatformProcess>();
} else if (worker_pid == 0) { } else if (worker_pid == 0) {
setEnvVar("OSQUERY_WORKER", std::to_string(::getpid()).c_str()); setEnvVar("OSQUERY_WORKER", std::to_string(::getpid()).c_str());
::execle(exec_path.c_str(), name.c_str(), nullptr, ::environ); ::execve(exec_path.c_str(), argv, ::environ);
// Code should never reach this point // Code should never reach this point
LOG(ERROR) << "osqueryd could not start worker process"; LOG(ERROR) << "osqueryd could not start worker process";

View File

@ -27,6 +27,7 @@
namespace osquery { namespace osquery {
#ifdef WIN32 #ifdef WIN32
/// Unfortunately, pid_t is not defined in Windows, however, DWORD is the /// Unfortunately, pid_t is not defined in Windows, however, DWORD is the
/// most appropriate alternative since process IDs on Windows are stored in /// most appropriate alternative since process IDs on Windows are stored in
/// a DWORD. /// a DWORD.
@ -57,8 +58,7 @@ enum ProcessState {
* @brief Platform-agnostic process object. * @brief Platform-agnostic process object.
* *
* PlatformProcess is a specialized, platform-agnostic class that handles the * PlatformProcess is a specialized, platform-agnostic class that handles the
* process operation needs * process operation needs of osquery.
* of osquery.
*/ */
class PlatformProcess : private boost::noncopyable { class PlatformProcess : private boost::noncopyable {
public: public:
@ -101,11 +101,12 @@ class PlatformProcess : private boost::noncopyable {
/** /**
* @brief Creates a new worker process. * @brief Creates a new worker process.
* *
* Launches a worker process given a worker executable path and a worker name. * Launches a worker process given a worker executable path, number of
* Any double quotes in the worker name will be stripped out. * arguments, and an array of arguments. All double quotes within each entry
* in the array of arguments will be escaped with a preceding backslash.
*/ */
static std::shared_ptr<PlatformProcess> launchWorker( static std::shared_ptr<PlatformProcess> launchWorker(
const std::string& exec_path, const std::string& name); const std::string& exec_path, int argc, char** argv);
/** /**
* @brief Creates a new extension process. * @brief Creates a new extension process.

View File

@ -34,8 +34,10 @@ extern const char *kOsqueryTestModuleName;
/// These are the expected arguments for our test worker process. /// These are the expected arguments for our test worker process.
extern const char *kExpectedWorkerArgs[]; extern const char *kExpectedWorkerArgs[];
extern const size_t kExpectedWorkerArgsCount;
/// These are the expected arguments for our test extensions process. /// These are the expected arguments for our test extensions process.
extern const char *kExpectedExtensionArgs[]; extern const char *kExpectedExtensionArgs[];
extern const size_t kExpectedExtensionArgsCount;
} }

View File

@ -105,16 +105,20 @@ TEST_F(ProcessTests, test_getpid) {
TEST_F(ProcessTests, test_envVar) { TEST_F(ProcessTests, test_envVar) {
auto val = getEnvVar("GTEST_OSQUERY"); auto val = getEnvVar("GTEST_OSQUERY");
EXPECT_FALSE(val); EXPECT_FALSE(val);
EXPECT_FALSE(val.is_initialized());
EXPECT_TRUE(setEnvVar("GTEST_OSQUERY", "true")); EXPECT_TRUE(setEnvVar("GTEST_OSQUERY", "true"));
val = getEnvVar("GTEST_OSQUERY"); val = getEnvVar("GTEST_OSQUERY");
EXPECT_FALSE(!val);
EXPECT_TRUE(val.is_initialized());
EXPECT_EQ(*val, "true"); EXPECT_EQ(*val, "true");
EXPECT_TRUE(unsetEnvVar("GTEST_OSQUERY")); EXPECT_TRUE(unsetEnvVar("GTEST_OSQUERY"));
val = getEnvVar("GTEST_OSQUERY"); val = getEnvVar("GTEST_OSQUERY");
EXPECT_FALSE(val); EXPECT_FALSE(val);
EXPECT_FALSE(val.is_initialized());
} }
TEST_F(ProcessTests, test_launchExtension) { TEST_F(ProcessTests, test_launchExtension) {
@ -136,9 +140,23 @@ TEST_F(ProcessTests, test_launchExtension) {
TEST_F(ProcessTests, test_launchWorker) { TEST_F(ProcessTests, test_launchWorker) {
{ {
std::vector<char *> argv;
for (size_t i = 0; i < kExpectedWorkerArgsCount; i++) {
char *entry = new char[strlen(kExpectedWorkerArgs[i]) + 1];
EXPECT_NE(entry, nullptr);
memset(entry, '\0', strlen(kExpectedWorkerArgs[i]) + 1);
memcpy(entry, kExpectedWorkerArgs[i], strlen(kExpectedWorkerArgs[i]));
argv.push_back(entry);
}
argv.push_back(nullptr);
std::shared_ptr<osquery::PlatformProcess> process = std::shared_ptr<osquery::PlatformProcess> process =
osquery::PlatformProcess::launchWorker(kProcessTestExecPath.c_str(), osquery::PlatformProcess::launchWorker(
kExpectedWorkerArgs[0]); kProcessTestExecPath.c_str(), kExpectedWorkerArgsCount, &argv[0]);
for (size_t i = 0; i < argv.size(); i++) {
delete argv[i];
}
EXPECT_TRUE(process.get()); EXPECT_TRUE(process.get());
int code = 0; int code = 0;
@ -164,19 +182,6 @@ TEST_F(ProcessTests, test_launchExtensionQuotes) {
EXPECT_EQ(code, EXTENSION_SUCCESS_CODE); EXPECT_EQ(code, EXTENSION_SUCCESS_CODE);
} }
} }
TEST_F(ProcessTests, test_launchWorkerQuotes) {
{
std::shared_ptr<osquery::PlatformProcess> process =
osquery::PlatformProcess::launchWorker(kProcessTestExecPath.c_str(),
"worker\"-test");
EXPECT_TRUE(process.get());
int code = 0;
EXPECT_TRUE(getProcessExitCode(*process, code));
EXPECT_EQ(code, WORKER_SUCCESS_CODE);
}
}
#endif #endif
} }

View File

@ -12,7 +12,10 @@
#include <math.h> #include <math.h>
#include <signal.h> #include <signal.h>
#ifndef WIN32
#include <sys/wait.h> #include <sys/wait.h>
#endif
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
@ -21,6 +24,7 @@
#include <osquery/sql.h> #include <osquery/sql.h>
#include "osquery/core/watcher.h" #include "osquery/core/watcher.h"
#include "osquery/core/process.h"
extern char** environ; extern char** environ;
@ -70,9 +74,9 @@ void Watcher::resetExtensionCounters(const std::string& extension,
state.last_respawn_time = respawn_time; state.last_respawn_time = respawn_time;
} }
std::string Watcher::getExtensionPath(pid_t child) { std::string Watcher::getExtensionPath(const PlatformProcess& child) {
for (const auto& extension : extensions()) { for (const auto& extension : extensions()) {
if (extension.second == child) { if (*extension.second == child) {
return extension.first; return extension.first;
} }
} }
@ -85,8 +89,8 @@ void Watcher::removeExtensionPath(const std::string& extension) {
instance().extension_states_.erase(extension); instance().extension_states_.erase(extension);
} }
PerformanceState& Watcher::getState(pid_t child) { PerformanceState& Watcher::getState(const PlatformProcess& child) {
if (child == instance().worker_) { if (child == *instance().worker_) {
return instance().state_; return instance().state_;
} else { } else {
return instance().extension_states_[getExtensionPath(child)]; return instance().extension_states_[getExtensionPath(child)];
@ -97,13 +101,14 @@ PerformanceState& Watcher::getState(const std::string& extension) {
return instance().extension_states_[extension]; return instance().extension_states_[extension];
} }
void Watcher::setExtension(const std::string& extension, pid_t child) { void Watcher::setExtension(const std::string& extension,
const std::shared_ptr<PlatformProcess>& child) {
WatcherLocker locker; WatcherLocker locker;
instance().extensions_[extension] = child; instance().extensions_[extension] = child;
} }
void Watcher::reset(pid_t child) { void Watcher::reset(const PlatformProcess& child) {
if (child == instance().worker_) { if (child == *instance().worker_) {
instance().worker_ = 0; instance().worker_ = 0;
resetWorkerCounters(0); resetWorkerCounters(0);
return; return;
@ -111,15 +116,15 @@ void Watcher::reset(pid_t child) {
// If it was not the worker pid then find the extension name to reset. // If it was not the worker pid then find the extension name to reset.
for (const auto& extension : extensions()) { for (const auto& extension : extensions()) {
if (extension.second == child) { if (*extension.second == child) {
setExtension(extension.first, 0); setExtension(extension.first, std::make_shared<PlatformProcess>());
resetExtensionCounters(extension.first, 0); resetExtensionCounters(extension.first, 0);
} }
} }
} }
void Watcher::addExtensionPath(const std::string& path) { void Watcher::addExtensionPath(const std::string& path) {
setExtension(path, 0); setExtension(path, std::make_shared<PlatformProcess>());
resetExtensionCounters(path, 0); resetExtensionCounters(path, 0);
} }
@ -132,7 +137,7 @@ bool Watcher::hasManagedExtensions() {
// Setting this counter to 0 will prevent the worker from waiting for missing // Setting this counter to 0 will prevent the worker from waiting for missing
// dependent config plugins. Otherwise, its existence will cause a worker to // dependent config plugins. Otherwise, its existence will cause a worker to
// wait for missing plugins to broadcast from managed extensions. // wait for missing plugins to broadcast from managed extensions.
return (getenv("OSQUERY_EXTENSIONS") != nullptr); return getEnvVar("OSQUERY_EXTENSIONS").is_initialized();
} }
bool WatcherRunner::ok() { bool WatcherRunner::ok() {
@ -142,7 +147,7 @@ bool WatcherRunner::ok() {
return false; return false;
} }
// Watcher is OK to run if a worker or at least one extension exists. // Watcher is OK to run if a worker or at least one extension exists.
return (Watcher::getWorker() >= 0 || Watcher::hasManagedExtensions()); return (Watcher::getWorker().isValid() || Watcher::hasManagedExtensions());
} }
void WatcherRunner::start() { void WatcherRunner::start() {
@ -163,7 +168,7 @@ void WatcherRunner::start() {
// Loop over every managed extension and check sanity. // Loop over every managed extension and check sanity.
std::vector<std::string> failing_extensions; std::vector<std::string> failing_extensions;
for (const auto& extension : Watcher::extensions()) { for (const auto& extension : Watcher::extensions()) {
if (!watch(extension.second)) { if (!watch(*extension.second)) {
if (!createExtension(extension.first)) { if (!createExtension(extension.first)) {
failing_extensions.push_back(extension.first); failing_extensions.push_back(extension.first);
} }
@ -177,18 +182,19 @@ void WatcherRunner::start() {
} while (!interrupted() && ok()); } while (!interrupted() && ok());
} }
bool WatcherRunner::watch(pid_t child) { bool WatcherRunner::watch(const PlatformProcess& child) const {
int status = 0; int status = 0;
pid_t result = waitpid(child, &status, WNOHANG);
ProcessState result = checkChildProcessStatus(child, status);
if (Watcher::fatesBound()) { if (Watcher::fatesBound()) {
// A signal was handled while the watcher was watching. // A signal was handled while the watcher was watching.
return false; return false;
} }
if (child == 0 || result < 0) { if (!child.isValid() || result == PROCESS_ERROR) {
// Worker does not exist or never existed. // Worker does not exist or never existed.
return false; return false;
} else if (result == 0) { } else if (result == PROCESS_STILL_ALIVE) {
// If the inspect finds problems it will stop/restart the worker. // If the inspect finds problems it will stop/restart the worker.
if (!isChildSane(child)) { if (!isChildSane(child)) {
stopChild(child); stopChild(child);
@ -197,22 +203,24 @@ bool WatcherRunner::watch(pid_t child) {
return true; return true;
} }
if (WIFEXITED(status)) { if (result == PROCESS_EXITED) {
// If the worker process exited, store the exit code. // If the worker process exited, store the exit code.
Watcher::instance().worker_status_ = WEXITSTATUS(status); Watcher::instance().worker_status_ = status;
} }
return true; return true;
} }
void WatcherRunner::stopChild(pid_t child) { void WatcherRunner::stopChild(const PlatformProcess& child) const {
kill(child, SIGKILL); child.kill();
// Clean up the defunct (zombie) process. // Clean up the defunct (zombie) process.
waitpid(-1, 0, WNOHANG); cleanupDefunctProcesses();
} }
bool WatcherRunner::isChildSane(pid_t child) { bool WatcherRunner::isChildSane(const PlatformProcess& child) const {
auto rows = SQL::selectAllFrom("processes", "pid", EQUALS, INTEGER(child)); auto rows =
SQL::selectAllFrom("processes", "pid", EQUALS, INTEGER(child.pid()));
if (rows.size() == 0) { if (rows.size() == 0) {
// Could not find worker process? // Could not find worker process?
return false; return false;
@ -270,7 +278,7 @@ bool WatcherRunner::isChildSane(pid_t child) {
// Only make a decision about the child sanity if it is still the watcher's // Only make a decision about the child sanity if it is still the watcher's
// child. It's possible for the child to die, and its pid reused. // child. It's possible for the child to die, and its pid reused.
if (parent != getpid()) { if (parent != PlatformProcess::getCurrentProcess()->pid()) {
// The child's parent is not the watcher. // The child's parent is not the watcher.
Watcher::reset(child); Watcher::reset(child);
// Do not stop or call the child insane, since it is not our child. // Do not stop or call the child insane, since it is not our child.
@ -279,13 +287,13 @@ bool WatcherRunner::isChildSane(pid_t child) {
if (sustained_latency > 0 && if (sustained_latency > 0 &&
sustained_latency * iv >= getWorkerLimit(LATENCY_LIMIT)) { sustained_latency * iv >= getWorkerLimit(LATENCY_LIMIT)) {
LOG(WARNING) << "osqueryd worker (" << child LOG(WARNING) << "osqueryd worker (" << child.pid()
<< ") system performance limits exceeded"; << ") system performance limits exceeded";
return false; return false;
} }
// Check if the private memory exceeds a memory limit. // Check if the private memory exceeds a memory limit.
if (footprint > 0 && footprint > getWorkerLimit(MEMORY_LIMIT) * 1024 * 1024) { if (footprint > 0 && footprint > getWorkerLimit(MEMORY_LIMIT) * 1024 * 1024) {
LOG(WARNING) << "osqueryd worker (" << child LOG(WARNING) << "osqueryd worker (" << child.pid()
<< ") memory limits exceeded: " << footprint; << ") memory limits exceeded: " << footprint;
return false; return false;
} }
@ -295,6 +303,7 @@ bool WatcherRunner::isChildSane(pid_t child) {
if (use_worker_) { if (use_worker_) {
relayStatusLogs(); relayStatusLogs();
} }
return true; return true;
} }
@ -315,7 +324,11 @@ void WatcherRunner::createWorker() {
} }
// Get the path of the current process. // Get the path of the current process.
auto qd = SQL::selectAllFrom("processes", "pid", EQUALS, INTEGER(getpid())); auto qd =
SQL::selectAllFrom("processes",
"pid",
EQUALS,
INTEGER(PlatformProcess::getCurrentProcess()->pid()));
if (qd.size() != 1 || qd[0].count("path") == 0 || qd[0]["path"].size() == 0) { if (qd.size() != 1 || qd[0].count("path") == 0 || qd[0]["path"].size() == 0) {
LOG(ERROR) << "osquery watcher cannot determine process path for worker"; LOG(ERROR) << "osquery watcher cannot determine process path for worker";
Initializer::requestShutdown(EXIT_FAILURE); Initializer::requestShutdown(EXIT_FAILURE);
@ -325,7 +338,7 @@ void WatcherRunner::createWorker() {
// Set an environment signaling to potential plugin-dependent workers to wait // Set an environment signaling to potential plugin-dependent workers to wait
// for extensions to broadcast. // for extensions to broadcast.
if (Watcher::hasManagedExtensions()) { if (Watcher::hasManagedExtensions()) {
setenv("OSQUERY_EXTENSIONS", "true", 1); setEnvVar("OSQUERY_EXTENSIONS", "true");
} }
// Get the complete path of the osquery process binary. // Get the complete path of the osquery process binary.
@ -339,26 +352,18 @@ void WatcherRunner::createWorker() {
return; return;
} }
auto worker_pid = fork(); auto worker = PlatformProcess::launchWorker(exec_path.string(), argc_, argv_);
if (worker_pid < 0) { if (worker == nullptr) {
// Unrecoverable error, cannot create a worker process. // Unrecoverable error, cannot create a worker process.
LOG(ERROR) << "osqueryd could not create a worker process"; LOG(ERROR) << "osqueryd could not create a worker process";
Initializer::shutdown(EXIT_FAILURE); Initializer::shutdown(EXIT_FAILURE);
return; return;
} else if (worker_pid == 0) {
// This is the new worker process, no watching needed.
setenv("OSQUERY_WORKER", std::to_string(getpid()).c_str(), 1);
execve(exec_path.string().c_str(), argv_, environ);
// Code should never reach this point.
LOG(ERROR) << "osqueryd could not start worker process";
Initializer::shutdown(EXIT_CATASTROPHIC);
return;
} }
Watcher::setWorker(worker_pid); Watcher::setWorker(worker);
Watcher::resetWorkerCounters(getUnixTime()); Watcher::resetWorkerCounters(getUnixTime());
VLOG(1) << "osqueryd watcher (" << getpid() << ") executing worker (" VLOG(1) << "osqueryd watcher (" << PlatformProcess::getCurrentProcess()->pid()
<< worker_pid << ")"; << ") executing worker (" << worker->pid() << ")";
} }
bool WatcherRunner::createExtension(const std::string& extension) { bool WatcherRunner::createExtension(const std::string& extension) {
@ -382,44 +387,34 @@ bool WatcherRunner::createExtension(const std::string& extension) {
return false; return false;
} }
auto ext_pid = fork(); auto ext_process =
if (ext_pid < 0) { PlatformProcess::launchExtension(exec_path.string(),
extension,
Flag::getValue("extensions_socket"),
Flag::getValue("extensions_timeout"),
Flag::getValue("extensions_interval"),
Flag::getValue("verbose"));
if (ext_process == nullptr) {
// Unrecoverable error, cannot create an extension process. // Unrecoverable error, cannot create an extension process.
LOG(ERROR) << "Cannot create extension process: " << extension; LOG(ERROR) << "Cannot create extension process: " << extension;
Initializer::shutdown(EXIT_FAILURE); Initializer::shutdown(EXIT_FAILURE);
} else if (ext_pid == 0) {
// Pass the current extension socket and a set timeout to the extension.
setenv("OSQUERY_EXTENSION", std::to_string(getpid()).c_str(), 1);
// Execute extension with very specific arguments.
execle(exec_path.string().c_str(),
("osquery extension: " + extension).c_str(),
"--socket",
Flag::getValue("extensions_socket").c_str(),
"--timeout",
Flag::getValue("extensions_timeout").c_str(),
"--interval",
Flag::getValue("extensions_interval").c_str(),
(Flag::getValue("verbose") == "true") ? "--verbose" : (char*)nullptr,
(char*)nullptr,
environ);
// Code should never reach this point.
VLOG(1) << "Could not start extension process: " << extension;
Initializer::shutdown(EXIT_FAILURE);
} }
Watcher::setExtension(extension, ext_pid); Watcher::setExtension(extension, ext_process);
Watcher::resetExtensionCounters(extension, getUnixTime()); Watcher::resetExtensionCounters(extension, getUnixTime());
VLOG(1) << "Created and monitoring extension child (" << ext_pid VLOG(1) << "Created and monitoring extension child (" << ext_process->pid()
<< "): " << extension; << "): " << extension;
return true; return true;
} }
void WatcherWatcherRunner::start() { void WatcherWatcherRunner::start() {
while (!interrupted()) { while (!interrupted()) {
if (getppid() != watcher_) { if (isLauncherProcessDead(*watcher_)) {
// Watcher died, the worker must follow. // Watcher died, the worker must follow.
VLOG(1) << "osqueryd worker (" << getpid() VLOG(1) << "osqueryd worker ("
<< ") detected killed watcher (" << watcher_ << ")"; << PlatformProcess::getCurrentProcess()->pid()
<< ") detected killed watcher (" << watcher_->pid() << ")";
// The watcher watcher is a thread. Do not join services after removing. // The watcher watcher is a thread. Do not join services after removing.
Initializer::requestShutdown(); Initializer::requestShutdown();
break; break;
@ -443,3 +438,4 @@ size_t getWorkerLimit(WatchdogLimitType name, int level) {
return kWatchdogLimits.at(name).at(level); return kWatchdogLimits.at(name).at(level);
} }
} }

View File

@ -13,13 +13,17 @@
#include <atomic> #include <atomic>
#include <string> #include <string>
#ifndef WIN32
#include <unistd.h> #include <unistd.h>
#endif
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <osquery/dispatcher.h> #include <osquery/dispatcher.h>
#include <osquery/flags.h> #include <osquery/flags.h>
#include "osquery/core/process.h"
/// Define a special debug/testing watchdog level. /// Define a special debug/testing watchdog level.
#define WATCHDOG_LEVEL_DEBUG 3 #define WATCHDOG_LEVEL_DEBUG 3
/// Define the default watchdog level, levels below are considered permissive. /// Define the default watchdog level, levels below are considered permissive.
@ -27,6 +31,8 @@
namespace osquery { namespace osquery {
using ExtensionMap = std::map<std::string, std::shared_ptr<PlatformProcess>>;
DECLARE_bool(disable_watchdog); DECLARE_bool(disable_watchdog);
DECLARE_int32(watchdog_level); DECLARE_int32(watchdog_level);
@ -113,12 +119,10 @@ class Watcher : private boost::noncopyable {
static void unlock() { instance().lock_.unlock(); } static void unlock() { instance().lock_.unlock(); }
/// Accessor for autoloadable extension paths. /// Accessor for autoloadable extension paths.
static const std::map<std::string, pid_t>& extensions() { static const ExtensionMap& extensions() { return instance().extensions_; }
return instance().extensions_;
}
/// Lookup extension path from pid. /// Lookup extension path from pid.
static std::string getExtensionPath(pid_t child); static std::string getExtensionPath(const PlatformProcess& child);
/// Remove an autoloadable extension path. /// Remove an autoloadable extension path.
static void removeExtensionPath(const std::string& extension); static void removeExtensionPath(const std::string& extension);
@ -127,20 +131,23 @@ class Watcher : private boost::noncopyable {
static void addExtensionPath(const std::string& path); static void addExtensionPath(const std::string& path);
/// Get state information for a worker or extension child. /// Get state information for a worker or extension child.
static PerformanceState& getState(pid_t child); static PerformanceState& getState(const PlatformProcess& child);
static PerformanceState& getState(const std::string& extension); static PerformanceState& getState(const std::string& extension);
/// Accessor for the worker process. /// Accessor for the worker process.
static pid_t getWorker() { return instance().worker_; } static PlatformProcess& getWorker() { return *instance().worker_; }
/// Setter for worker process. /// Setter for worker process.
static void setWorker(pid_t child) { instance().worker_ = child; } static void setWorker(const std::shared_ptr<PlatformProcess>& child) {
instance().worker_ = child;
}
/// Setter for an extension process. /// Setter for an extension process.
static void setExtension(const std::string& extension, pid_t child); static void setExtension(const std::string& extension,
const std::shared_ptr<PlatformProcess>& child);
/// Reset pid and performance counters for a worker or extension process. /// Reset pid and performance counters for a worker or extension process.
static void reset(pid_t child); static void reset(const PlatformProcess& child);
/// Count the number of worker restarts. /// Count the number of worker restarts.
static size_t workerRestartCount() { return instance().worker_restarts_; } static size_t workerRestartCount() { return instance().worker_restarts_; }
@ -166,7 +173,9 @@ class Watcher : private boost::noncopyable {
private: private:
/// Do not request the lock until extensions are used. /// Do not request the lock until extensions are used.
Watcher() Watcher()
: worker_(-1), worker_restarts_(0), lock_(mutex_, std::defer_lock) {} : worker_(std::make_shared<PlatformProcess>()),
worker_restarts_(0),
lock_(mutex_, std::defer_lock) {}
Watcher(Watcher const&); Watcher(Watcher const&);
void operator=(Watcher const&); void operator=(Watcher const&);
@ -185,13 +194,13 @@ class Watcher : private boost::noncopyable {
private: private:
/// Keep the single worker process/thread ID for inspection. /// Keep the single worker process/thread ID for inspection.
std::atomic<int> worker_{-1}; std::shared_ptr<PlatformProcess> worker_;
/// Number of worker restarts NOT induced by a watchdog process. /// Number of worker restarts NOT induced by a watchdog process.
size_t worker_restarts_{0}; size_t worker_restarts_{0};
/// Keep a list of resolved extension paths and their managed pids. /// Keep a list of resolved extension paths and their managed pids.
std::map<std::string, pid_t> extensions_; ExtensionMap extensions_;
/// Paths to autoload extensions. /// Paths to autoload extensions.
std::vector<std::string> extensions_paths_; std::vector<std::string> extensions_paths_;
@ -255,9 +264,9 @@ class WatcherRunner : public InternalRunnable {
/// Boilerplate function to sleep for some configured latency /// Boilerplate function to sleep for some configured latency
bool ok(); bool ok();
/// Begin the worker-watcher process. /// Begin the worker-watcher process.
bool watch(pid_t child); bool watch(const PlatformProcess& child) const;
/// Inspect into the memory, CPU, and other worker/extension process states. /// Inspect into the memory, CPU, and other worker/extension process states.
bool isChildSane(pid_t child); bool isChildSane(const PlatformProcess& child) const;
private: private:
/// Fork and execute a worker process. /// Fork and execute a worker process.
@ -265,7 +274,7 @@ class WatcherRunner : public InternalRunnable {
/// Fork an extension process. /// Fork an extension process.
bool createExtension(const std::string& extension); bool createExtension(const std::string& extension);
/// If a worker/extension has otherwise gone insane, stop it. /// If a worker/extension has otherwise gone insane, stop it.
void stopChild(pid_t child); void stopChild(const PlatformProcess& child) const;
private: private:
/// Keep the invocation daemon's argc to iterate through argv. /// Keep the invocation daemon's argc to iterate through argv.
@ -279,16 +288,18 @@ class WatcherRunner : public InternalRunnable {
/// The WatcherWatcher is spawned within the worker and watches the watcher. /// The WatcherWatcher is spawned within the worker and watches the watcher.
class WatcherWatcherRunner : public InternalRunnable { class WatcherWatcherRunner : public InternalRunnable {
public: public:
explicit WatcherWatcherRunner(pid_t watcher) : watcher_(watcher) {} explicit WatcherWatcherRunner(const std::shared_ptr<PlatformProcess>& watcher)
: watcher_(watcher) {}
/// Runnable thread's entry point. /// Runnable thread's entry point.
void start(); void start();
private: private:
/// Parent, or watchdog, process ID. /// Parent, or watchdog, process ID.
pid_t watcher_{-1}; std::shared_ptr<PlatformProcess> watcher_;
}; };
/// Get a performance limit by name and optional level. /// Get a performance limit by name and optional level.
size_t getWorkerLimit(WatchdogLimitType limit, int level = -1); size_t getWorkerLimit(WatchdogLimitType limit, int level = -1);
} }

View File

@ -79,7 +79,7 @@ std::shared_ptr<PlatformProcess> PlatformProcess::getCurrentProcess() {
HANDLE handle = HANDLE handle =
::OpenProcess(PROCESS_ALL_ACCESS, FALSE, ::GetCurrentProcessId()); ::OpenProcess(PROCESS_ALL_ACCESS, FALSE, ::GetCurrentProcessId());
if (handle == NULL) { if (handle == NULL) {
return std::shared_ptr<PlatformProcess>(); return std::make_shared<PlatformProcess>();
} }
return std::make_shared<PlatformProcess>(handle); return std::make_shared<PlatformProcess>(handle);
@ -88,7 +88,7 @@ std::shared_ptr<PlatformProcess> PlatformProcess::getCurrentProcess() {
std::shared_ptr<PlatformProcess> PlatformProcess::getLauncherProcess() { std::shared_ptr<PlatformProcess> PlatformProcess::getLauncherProcess() {
auto launcher_handle = getEnvVar("OSQUERY_LAUNCHER"); auto launcher_handle = getEnvVar("OSQUERY_LAUNCHER");
if (!launcher_handle) { if (!launcher_handle) {
return std::shared_ptr<PlatformProcess>(); return std::make_shared<PlatformProcess>();
} }
// Convert the environment variable into a HANDLE (the value from environment // Convert the environment variable into a HANDLE (the value from environment
@ -101,26 +101,27 @@ std::shared_ptr<PlatformProcess> PlatformProcess::getLauncherProcess() {
std::stoull(*launcher_handle, nullptr, 16))); std::stoull(*launcher_handle, nullptr, 16)));
} }
catch (std::invalid_argument e) { catch (std::invalid_argument e) {
return std::shared_ptr<PlatformProcess>(); return std::make_shared<PlatformProcess>();
} }
catch (std::out_of_range e) { catch (std::out_of_range e) {
return std::shared_ptr<PlatformProcess>(); return std::make_shared<PlatformProcess>();
} }
if (handle == NULL || handle == INVALID_HANDLE_VALUE) { if (handle == NULL || handle == INVALID_HANDLE_VALUE) {
return std::shared_ptr<PlatformProcess>(); return std::make_shared<PlatformProcess>();
} }
return std::make_shared<PlatformProcess>(handle); return std::make_shared<PlatformProcess>(handle);
} }
std::shared_ptr<PlatformProcess> PlatformProcess::launchWorker( std::shared_ptr<PlatformProcess> PlatformProcess::launchWorker(
const std::string &exec_path, const std::string &name) { const std::string &exec_path, int argc, char **argv) {
::STARTUPINFOA si = {0}; ::STARTUPINFOA si = {0};
::PROCESS_INFORMATION pi = {0}; ::PROCESS_INFORMATION pi = {0};
si.cb = sizeof(si); si.cb = sizeof(si);
std::stringstream argv_stream;
std::stringstream handle_stream; std::stringstream handle_stream;
// The HANDLE exposed to the child process is currently limited to only having // The HANDLE exposed to the child process is currently limited to only having
@ -158,15 +159,29 @@ std::shared_ptr<PlatformProcess> PlatformProcess::launchWorker(
// Since Windows does not accept a char * array for arguments, we have to // Since Windows does not accept a char * array for arguments, we have to
// build one as a string. Therefore, we need to make sure that special // build one as a string. Therefore, we need to make sure that special
// characters are not present that would obstruct the parsing of arguments. // characters are not present that would obstruct the parsing of arguments.
// For now, we strip out all double quotes. // For now, we strip out all double quotes. If an entry in argv has
// spaces, we will put double-quotes around the entry.
//
// NOTE: This is extremely naive and will break the moment complexities are
// involved... Windows command line argument parsing is extremely
// nitpicky and is different in behavior than POSIX argv parsing.
// //
// We don't directly use argv.c_str() as the value for lpCommandLine in // We don't directly use argv.c_str() as the value for lpCommandLine in
// CreateProcess since that argument requires a modifiable buffer. So, // CreateProcess since that argument requires a modifiable buffer. So,
// instead, we off-load the contents of argv into a vector which will have its // instead, we off-load the contents of argv into a vector which will have its
// backing memory as modifiable. // backing memory as modifiable.
auto argv = for (size_t i = 0; i < argc; i++) {
std::string("\"") + boost::replace_all_copy(name, "\" ", " ") + "\""; std::string component(argv[i]);
std::vector<char> mutable_argv(argv.begin(), argv.end()); if (component.find(" ") != std::string::npos) {
boost::replace_all(component, "\"", "\\\"");
argv_stream << "\"" << component << "\" ";
} else {
argv_stream << component << " ";
}
}
std::string cmdline = argv_stream.str();
std::vector<char> mutable_argv(cmdline.begin(), cmdline.end());
mutable_argv.push_back('\0'); mutable_argv.push_back('\0');
BOOL status = ::CreateProcessA(exec_path.c_str(), BOOL status = ::CreateProcessA(exec_path.c_str(),

View File

@ -35,13 +35,18 @@ std::string kProcessTestExecPath;
const char *kOsqueryTestModuleName = "osquery_tests.exe"; const char *kOsqueryTestModuleName = "osquery_tests.exe";
/// These are the expected arguments for our test worker process. /// These are the expected arguments for our test worker process.
const char *kExpectedWorkerArgs[] = {"worker-test"}; const char *kExpectedWorkerArgs[] = {"worker-test", "--socket",
"fake-socket", nullptr};
const size_t kExpectedWorkerArgsCount =
(sizeof(osquery::kExpectedWorkerArgs) / sizeof(char *)) - 1;
/// These are the expected arguments for our test extensions process. /// These are the expected arguments for our test extensions process.
const char *kExpectedExtensionArgs[] = { const char *kExpectedExtensionArgs[] = {
"osquery extension: extension-test", "--socket", "socket-name", "osquery extension: extension-test", "--socket", "socket-name",
"--timeout", "100", "--interval", "--timeout", "100", "--interval",
"5", "--verbose"}; "5", "--verbose", nullptr};
const size_t kExpectedExtensionArgsCount =
(sizeof(osquery::kExpectedExtensionArgs) / sizeof(char *)) - 1;
static bool compareArguments(char *result[], static bool compareArguments(char *result[],
unsigned int result_nelms, unsigned int result_nelms,
@ -69,8 +74,7 @@ int workerMain(int argc, char *argv[]) {
if (!osquery::compareArguments(argv, if (!osquery::compareArguments(argv,
argc, argc,
osquery::kExpectedWorkerArgs, osquery::kExpectedWorkerArgs,
sizeof(osquery::kExpectedWorkerArgs) / osquery::kExpectedWorkerArgsCount)) {
sizeof(const char *))) {
return ERROR_COMPARE_ARGUMENT; return ERROR_COMPARE_ARGUMENT;
} }
@ -106,8 +110,7 @@ int extensionMain(int argc, char *argv[]) {
if (!osquery::compareArguments(argv, if (!osquery::compareArguments(argv,
argc, argc,
osquery::kExpectedExtensionArgs, osquery::kExpectedExtensionArgs,
sizeof(osquery::kExpectedExtensionArgs) / osquery::kExpectedExtensionArgsCount)) {
sizeof(const char *))) {
return ERROR_COMPARE_ARGUMENT; return ERROR_COMPARE_ARGUMENT;
} }
return EXTENSION_SUCCESS_CODE; return EXTENSION_SUCCESS_CODE;