Fix shell's --profile switch

This commit is contained in:
Teddy Reed 2016-03-20 16:05:13 -07:00
parent 0ab1a156cd
commit ebb0ab30ce
10 changed files with 97 additions and 31 deletions

View File

@ -354,7 +354,7 @@ class RegistryHelperCore : private boost::noncopyable {
protected: protected:
/// A map of registered plugin instances to their registered identifier. /// A map of registered plugin instances to their registered identifier.
std::map<std::string, std::shared_ptr<Plugin> > items_; std::map<std::string, std::shared_ptr<Plugin>> items_;
/// If aliases are used, a map of alias to item name. /// If aliases are used, a map of alias to item name.
std::map<std::string, std::string> aliases_; std::map<std::string, std::string> aliases_;
@ -782,6 +782,9 @@ class RegistryFactory : private boost::noncopyable {
/// This will cause extension-internal events to forward to osquery core. /// This will cause extension-internal events to forward to osquery core.
bool external_{false}; bool external_{false};
/// Protector for broadcast lookups and external registry mutations.
mutable Mutex mutex_;
private: private:
friend class RegistryHelperCore; friend class RegistryHelperCore;
friend class RegistryModuleLoader; friend class RegistryModuleLoader;

View File

@ -105,7 +105,6 @@ volatile std::sig_atomic_t kHandledSignal{0};
static inline bool isWatcher() { return (osquery::Watcher::getWorker() > 0); } static inline bool isWatcher() { return (osquery::Watcher::getWorker() > 0); }
void signalHandler(int num) { void signalHandler(int num) {
// Inform exit status of main threads blocked by service joins. // Inform exit status of main threads blocked by service joins.
if (kHandledSignal == 0) { if (kHandledSignal == 0) {
kHandledSignal = num; kHandledSignal = num;

View File

@ -179,9 +179,7 @@ static char continuePrompt[20]; // Continuation prompt. default: " ...> "
// since the shell is built around the callback paradigm it would be a lot // since the shell is built around the callback paradigm it would be a lot
// of work. Instead just use this hack, which is quite harmless. // of work. Instead just use this hack, which is quite harmless.
static const char *zShellStatic = 0; static const char *zShellStatic = 0;
static void shellstaticFunc(sqlite3_context *context, void shellstaticFunc(sqlite3_context *context, int argc, sqlite3_value **argv) {
int argc,
sqlite3_value **argv) {
assert(0 == argc); assert(0 == argc);
assert(zShellStatic); assert(zShellStatic);
UNUSED_PARAMETER(argc); UNUSED_PARAMETER(argc);
@ -670,9 +668,8 @@ static void output_csv(struct callback_data *p, const char *z, int bSep) {
*/ */
static void interrupt_handler(int signal) { static void interrupt_handler(int signal) {
if (signal == SIGINT) { if (signal == SIGINT) {
osquery::Initializer::requestShutdown(130); seenInterrupt = 1;
} }
seenInterrupt = 1;
} }
#endif #endif
@ -1686,14 +1683,9 @@ int launchIntoShell(int argc, char **argv) {
sqlite3WhereTrace = 0xffffffff; sqlite3WhereTrace = 0xffffffff;
#endif #endif
{ // Move the attach function method into the osquery SQL implementation.
// Hold the manager connection instance again in callbacks. // This allow simple/straightforward control of concurrent DB access.
auto dbc = SQLiteDBManager::get(); osquery::attachFunctionInternal("shellstatic", shellstaticFunc);
// Add some shell-specific functions to the instance.
sqlite3_create_function(
dbc->db(), "shellstatic", 0, SQLITE_UTF8, 0, shellstaticFunc, 0, 0);
}
stdin_is_interactive = isatty(0); stdin_is_interactive = isatty(0);
// SQLite: Make sure we have a valid signal handler early // SQLite: Make sure we have a valid signal handler early

View File

@ -16,6 +16,13 @@
#include "osquery/core/conversions.h" #include "osquery/core/conversions.h"
#include "osquery/dispatcher/dispatcher.h" #include "osquery/dispatcher/dispatcher.h"
#if 0
#ifdef DLOG
#undef DLOG
#define DLOG(v) LOG(v)
#endif
#endif
namespace osquery { namespace osquery {
/// The worker_threads define the default thread pool size. /// The worker_threads define the default thread pool size.
@ -56,6 +63,8 @@ Status Dispatcher::addService(InternalRunnableRef service) {
auto thread = std::make_shared<std::thread>( auto thread = std::make_shared<std::thread>(
std::bind(&InternalRunnable::run, &*service)); std::bind(&InternalRunnable::run, &*service));
WriteLock lock(self.mutex_); WriteLock lock(self.mutex_);
DLOG(INFO) << "Adding new service: " << &*service
<< " to thread: " << &*thread;
self.service_threads_.push_back(thread); self.service_threads_.push_back(thread);
self.services_.push_back(std::move(service)); self.services_.push_back(std::move(service));
return Status(0, "OK"); return Status(0, "OK");
@ -63,21 +72,27 @@ Status Dispatcher::addService(InternalRunnableRef service) {
void Dispatcher::joinServices() { void Dispatcher::joinServices() {
auto& self = instance(); auto& self = instance();
DLOG(INFO) << "Thread: " << std::this_thread::get_id()
<< " requesting a join";
WriteLock join_lock(self.join_mutex_); WriteLock join_lock(self.join_mutex_);
for (auto& thread : self.service_threads_) { for (auto& thread : self.service_threads_) {
// Boost threads would have been interrupted, and joined using the // Boost threads would have been interrupted, and joined using the
// provided thread instance. // provided thread instance.
thread->join(); thread->join();
DLOG(INFO) << "Service thread: " << &*thread << " has joined";
} }
WriteLock lock(self.mutex_); WriteLock lock(self.mutex_);
self.services_.clear(); self.services_.clear();
self.service_threads_.clear(); self.service_threads_.clear();
DLOG(INFO) << "Services and threads have been cleared";
} }
void Dispatcher::stopServices() { void Dispatcher::stopServices() {
auto& self = instance(); auto& self = instance();
WriteLock lock(self.mutex_); WriteLock lock(self.mutex_);
DLOG(INFO) << "Thread: " << std::this_thread::get_id()
<< " requesting a stop";
for (const auto& service : self.services_) { for (const auto& service : self.services_) {
while (true) { while (true) {
// Wait for each thread's entry point (start) meaning the thread context // Wait for each thread's entry point (start) meaning the thread context
@ -90,6 +105,7 @@ void Dispatcher::stopServices() {
::usleep(20); ::usleep(20);
} }
service->interrupt(); service->interrupt();
DLOG(INFO) << "Service: " << &*service << " has been interrupted";
} }
} }
} }

View File

@ -131,12 +131,19 @@ void ExtensionManagerWatcher::watch() {
ExtensionStatus status; ExtensionStatus status;
for (const auto& uuid : uuids) { for (const auto& uuid : uuids) {
try { auto path = getExtensionSocket(uuid);
auto client = EXClient(getExtensionSocket(uuid)); if (isWritable(path)) {
// Ping the extension until it goes down. try {
client.get()->ping(status); auto client = EXClient(path);
} catch (const std::exception& e) { // Ping the extension until it goes down.
failures_[uuid] += 1; client.get()->ping(status);
} catch (const std::exception& e) {
failures_[uuid] += 1;
continue;
}
} else {
// Immediate fail non-writable paths.
failures_[uuid] = 3;
continue; continue;
} }

View File

@ -13,6 +13,7 @@
#include <iostream> #include <iostream>
#include <osquery/core.h> #include <osquery/core.h>
#include <osquery/database.h>
#include <osquery/extensions.h> #include <osquery/extensions.h>
#include <osquery/flags.h> #include <osquery/flags.h>
@ -30,6 +31,8 @@ HIDDEN_FLAG(int32,
profile_delay, profile_delay,
0, 0,
"Sleep a number of seconds before and after the profiling"); "Sleep a number of seconds before and after the profiling");
DECLARE_bool(disable_caching);
} }
int profile(int argc, char *argv[]) { int profile(int argc, char *argv[]) {
@ -48,6 +51,10 @@ int profile(int argc, char *argv[]) {
::sleep(osquery::FLAGS_profile_delay); ::sleep(osquery::FLAGS_profile_delay);
} }
// Perform some duplication from Initializer with respect to database setup.
osquery::DatabasePlugin::setAllowOpen(true);
osquery::Registry::setActive("database", "ephemeral");
auto dbc = osquery::SQLiteDBManager::get(); auto dbc = osquery::SQLiteDBManager::get();
for (size_t i = 0; i < static_cast<size_t>(osquery::FLAGS_profile); ++i) { for (size_t i = 0; i < static_cast<size_t>(osquery::FLAGS_profile); ++i) {
osquery::QueryData results; osquery::QueryData results;
@ -82,6 +89,7 @@ int main(int argc, char *argv[]) {
osquery::FLAGS_L || osquery::FLAGS_profile > 0) { osquery::FLAGS_L || osquery::FLAGS_profile > 0) {
// A query was set as a positional argument, via stdin, or profiling is on. // A query was set as a positional argument, via stdin, or profiling is on.
osquery::FLAGS_disable_events = true; osquery::FLAGS_disable_events = true;
osquery::FLAGS_disable_caching = true;
// The shell may have loaded table extensions, if not, disable the manager. // The shell may have loaded table extensions, if not, disable the manager.
if (!osquery::Watcher::hasManagedExtensions()) { if (!osquery::Watcher::hasManagedExtensions()) {
osquery::FLAGS_disable_extensions = true; osquery::FLAGS_disable_extensions = true;
@ -94,11 +102,11 @@ int main(int argc, char *argv[]) {
// Virtual tables will be attached to the shell's in-memory SQLite DB. // Virtual tables will be attached to the shell's in-memory SQLite DB.
retcode = osquery::launchIntoShell(argc, argv); retcode = osquery::launchIntoShell(argc, argv);
// Finally shutdown.
runner.requestShutdown();
} else { } else {
retcode = profile(argc, argv); retcode = profile(argc, argv);
} }
// Finally shutdown.
runner.requestShutdown();
return retcode; return retcode;
} }

View File

@ -27,9 +27,6 @@ namespace osquery {
HIDDEN_FLAG(bool, registry_exceptions, false, "Allow plugin exceptions"); HIDDEN_FLAG(bool, registry_exceptions, false, "Allow plugin exceptions");
/// Mutex to control access to registry extensions and their UUIDs.
Mutex gRegistryExtensionsMutex;
void RegistryHelperCore::remove(const std::string& item_name) { void RegistryHelperCore::remove(const std::string& item_name) {
if (items_.count(item_name) > 0) { if (items_.count(item_name) > 0) {
items_[item_name]->tearDown(); items_[item_name]->tearDown();
@ -292,7 +289,9 @@ RegistryBroadcast RegistryFactory::getBroadcast() {
Status RegistryFactory::addBroadcast(const RouteUUID& uuid, Status RegistryFactory::addBroadcast(const RouteUUID& uuid,
const RegistryBroadcast& broadcast) { const RegistryBroadcast& broadcast) {
if (instance().extensions_.count(uuid) > 0) { auto& self = instance();
WriteLock lock(self.mutex_);
if (self.extensions_.count(uuid) > 0) {
return Status(1, "Duplicate extension UUID: " + std::to_string(uuid)); return Status(1, "Duplicate extension UUID: " + std::to_string(uuid));
} }
@ -332,12 +331,13 @@ Status RegistryFactory::addBroadcast(const RouteUUID& uuid,
Registry::registry(registry.first)->removeExternal(uuid); Registry::registry(registry.first)->removeExternal(uuid);
} }
} }
instance().extensions_.insert(uuid); self.extensions_.insert(uuid);
return status; return status;
} }
Status RegistryFactory::removeBroadcast(const RouteUUID& uuid) { Status RegistryFactory::removeBroadcast(const RouteUUID& uuid) {
WriteLock lock(gRegistryExtensionsMutex); auto& self = instance();
WriteLock lock(self.mutex_);
if (instance().extensions_.count(uuid) == 0) { if (instance().extensions_.count(uuid) == 0) {
return Status(1, "Unknown extension UUID: " + std::to_string(uuid)); return Status(1, "Unknown extension UUID: " + std::to_string(uuid));
} }
@ -467,9 +467,10 @@ std::vector<std::string> RegistryFactory::names(
} }
std::vector<RouteUUID> RegistryFactory::routeUUIDs() { std::vector<RouteUUID> RegistryFactory::routeUUIDs() {
WriteLock lock(gRegistryExtensionsMutex); auto& self = instance();
WriteLock lock(self.mutex_);
std::vector<RouteUUID> uuids; std::vector<RouteUUID> uuids;
for (const auto& extension : instance().extensions_) { for (const auto& extension : self.extensions_) {
uuids.push_back(extension); uuids.push_back(extension);
} }
return uuids; return uuids;

View File

@ -124,6 +124,9 @@ Status SQLiteSQLPlugin::attach(const std::string& name) {
} }
auto statement = columnDefinition(response); auto statement = columnDefinition(response);
// Attach requests occurring via the plugin/registry APIs must act on the
// primary database. To allow this, getConnection can explicitly request the
// primary instance and avoid the contention decisions.
auto dbc = SQLiteDBManager::getConnection(true); auto dbc = SQLiteDBManager::getConnection(true);
return attachTableInternal(name, statement, dbc); return attachTableInternal(name, statement, dbc);
} }

View File

@ -19,6 +19,16 @@ namespace osquery {
SHELL_FLAG(bool, planner, false, "Enable osquery runtime planner output"); SHELL_FLAG(bool, planner, false, "Enable osquery runtime planner output");
/**
* @brief A protection around concurrent table attach requests.
*
* Table attaching is not concurrent. Attaching is the only unprotected SQLite
* operation from osquery's usage perspective. The extensions API allows for
* concurrent access of non-thread-safe database resources for attaching table
* schema and filter routing instructions.
*/
Mutex kAttachMutex;
namespace tables { namespace tables {
namespace sqlite { namespace sqlite {
@ -396,6 +406,7 @@ Status attachTableInternal(const std::string &name,
// Note, if the clientData API is used then this will save a registry call // Note, if the clientData API is used then this will save a registry call
// within xCreate. // within xCreate.
WriteLock lock(kAttachMutex);
int rc = sqlite3_create_module( int rc = sqlite3_create_module(
instance->db(), name.c_str(), &module, (void *)&(*instance)); instance->db(), name.c_str(), &module, (void *)&(*instance));
if (rc == SQLITE_OK || rc == SQLITE_MISUSE) { if (rc == SQLITE_OK || rc == SQLITE_MISUSE) {
@ -409,6 +420,7 @@ Status attachTableInternal(const std::string &name,
} }
Status detachTableInternal(const std::string &name, sqlite3 *db) { Status detachTableInternal(const std::string &name, sqlite3 *db) {
WriteLock lock(kAttachMutex);
auto format = "DROP TABLE IF EXISTS temp." + name; auto format = "DROP TABLE IF EXISTS temp." + name;
int rc = sqlite3_exec(db, format.c_str(), nullptr, nullptr, 0); int rc = sqlite3_exec(db, format.c_str(), nullptr, nullptr, 0);
if (rc != SQLITE_OK) { if (rc != SQLITE_OK) {
@ -418,6 +430,26 @@ Status detachTableInternal(const std::string &name, sqlite3 *db) {
return Status(rc, getStringForSQLiteReturnCode(rc)); return Status(rc, getStringForSQLiteReturnCode(rc));
} }
Status attachFunctionInternal(
const std::string &name,
std::function<
void(sqlite3_context *context, int argc, sqlite3_value **argv)> func) {
// Hold the manager connection instance again in callbacks.
auto dbc = SQLiteDBManager::get();
// Add some shell-specific functions to the instance.
WriteLock lock(kAttachMutex);
int rc = sqlite3_create_function(
dbc->db(),
name.c_str(),
0,
SQLITE_UTF8,
nullptr,
*func.target<void (*)(sqlite3_context *, int, sqlite3_value **)>(),
nullptr,
nullptr);
return Status(rc);
}
void attachVirtualTables(const SQLiteDBInstanceRef &instance) { void attachVirtualTables(const SQLiteDBInstanceRef &instance) {
PluginResponse response; PluginResponse response;
for (const auto &name : Registry::names("table")) { for (const auto &name : Registry::names("table")) {

View File

@ -62,6 +62,11 @@ Status attachTableInternal(const std::string &name,
/// Detach (drop) a table. /// Detach (drop) a table.
Status detachTableInternal(const std::string &name, sqlite3 *db); Status detachTableInternal(const std::string &name, sqlite3 *db);
Status attachFunctionInternal(
const std::string &name,
std::function<
void(sqlite3_context *context, int argc, sqlite3_value **argv)> func);
/// Attach all table plugins to an in-memory SQLite database. /// Attach all table plugins to an in-memory SQLite database.
void attachVirtualTables(const SQLiteDBInstanceRef &instance); void attachVirtualTables(const SQLiteDBInstanceRef &instance);
} }