diff --git a/include/osquery/registry.h b/include/osquery/registry.h index 1b95e82b..0bff4996 100644 --- a/include/osquery/registry.h +++ b/include/osquery/registry.h @@ -354,7 +354,7 @@ class RegistryHelperCore : private boost::noncopyable { protected: /// A map of registered plugin instances to their registered identifier. - std::map > items_; + std::map> items_; /// If aliases are used, a map of alias to item name. std::map aliases_; @@ -782,6 +782,9 @@ class RegistryFactory : private boost::noncopyable { /// This will cause extension-internal events to forward to osquery core. bool external_{false}; + /// Protector for broadcast lookups and external registry mutations. + mutable Mutex mutex_; + private: friend class RegistryHelperCore; friend class RegistryModuleLoader; diff --git a/osquery/core/init.cpp b/osquery/core/init.cpp index c82b1aaa..43b35d23 100644 --- a/osquery/core/init.cpp +++ b/osquery/core/init.cpp @@ -105,7 +105,6 @@ volatile std::sig_atomic_t kHandledSignal{0}; static inline bool isWatcher() { return (osquery::Watcher::getWorker() > 0); } void signalHandler(int num) { - // Inform exit status of main threads blocked by service joins. if (kHandledSignal == 0) { kHandledSignal = num; diff --git a/osquery/devtools/shell.cpp b/osquery/devtools/shell.cpp index bef655b3..64f5f93c 100644 --- a/osquery/devtools/shell.cpp +++ b/osquery/devtools/shell.cpp @@ -179,9 +179,7 @@ static char continuePrompt[20]; // Continuation prompt. default: " ...> " // since the shell is built around the callback paradigm it would be a lot // of work. Instead just use this hack, which is quite harmless. static const char *zShellStatic = 0; -static void shellstaticFunc(sqlite3_context *context, - int argc, - sqlite3_value **argv) { +void shellstaticFunc(sqlite3_context *context, int argc, sqlite3_value **argv) { assert(0 == argc); assert(zShellStatic); UNUSED_PARAMETER(argc); @@ -670,9 +668,8 @@ static void output_csv(struct callback_data *p, const char *z, int bSep) { */ static void interrupt_handler(int signal) { if (signal == SIGINT) { - osquery::Initializer::requestShutdown(130); + seenInterrupt = 1; } - seenInterrupt = 1; } #endif @@ -1686,14 +1683,9 @@ int launchIntoShell(int argc, char **argv) { sqlite3WhereTrace = 0xffffffff; #endif - { - // Hold the manager connection instance again in callbacks. - auto dbc = SQLiteDBManager::get(); - // Add some shell-specific functions to the instance. - sqlite3_create_function( - dbc->db(), "shellstatic", 0, SQLITE_UTF8, 0, shellstaticFunc, 0, 0); - } - + // Move the attach function method into the osquery SQL implementation. + // This allow simple/straightforward control of concurrent DB access. + osquery::attachFunctionInternal("shellstatic", shellstaticFunc); stdin_is_interactive = isatty(0); // SQLite: Make sure we have a valid signal handler early diff --git a/osquery/dispatcher/dispatcher.cpp b/osquery/dispatcher/dispatcher.cpp index 4c87c158..388a40ef 100644 --- a/osquery/dispatcher/dispatcher.cpp +++ b/osquery/dispatcher/dispatcher.cpp @@ -16,6 +16,13 @@ #include "osquery/core/conversions.h" #include "osquery/dispatcher/dispatcher.h" +#if 0 +#ifdef DLOG +#undef DLOG +#define DLOG(v) LOG(v) +#endif +#endif + namespace osquery { /// The worker_threads define the default thread pool size. @@ -56,6 +63,8 @@ Status Dispatcher::addService(InternalRunnableRef service) { auto thread = std::make_shared( std::bind(&InternalRunnable::run, &*service)); WriteLock lock(self.mutex_); + DLOG(INFO) << "Adding new service: " << &*service + << " to thread: " << &*thread; self.service_threads_.push_back(thread); self.services_.push_back(std::move(service)); return Status(0, "OK"); @@ -63,21 +72,27 @@ Status Dispatcher::addService(InternalRunnableRef service) { void Dispatcher::joinServices() { auto& self = instance(); + DLOG(INFO) << "Thread: " << std::this_thread::get_id() + << " requesting a join"; WriteLock join_lock(self.join_mutex_); for (auto& thread : self.service_threads_) { // Boost threads would have been interrupted, and joined using the // provided thread instance. thread->join(); + DLOG(INFO) << "Service thread: " << &*thread << " has joined"; } WriteLock lock(self.mutex_); self.services_.clear(); self.service_threads_.clear(); + DLOG(INFO) << "Services and threads have been cleared"; } void Dispatcher::stopServices() { auto& self = instance(); WriteLock lock(self.mutex_); + DLOG(INFO) << "Thread: " << std::this_thread::get_id() + << " requesting a stop"; for (const auto& service : self.services_) { while (true) { // Wait for each thread's entry point (start) meaning the thread context @@ -90,6 +105,7 @@ void Dispatcher::stopServices() { ::usleep(20); } service->interrupt(); + DLOG(INFO) << "Service: " << &*service << " has been interrupted"; } } } diff --git a/osquery/extensions/extensions.cpp b/osquery/extensions/extensions.cpp index e4e18d05..b2ef1f29 100644 --- a/osquery/extensions/extensions.cpp +++ b/osquery/extensions/extensions.cpp @@ -131,12 +131,19 @@ void ExtensionManagerWatcher::watch() { ExtensionStatus status; for (const auto& uuid : uuids) { - try { - auto client = EXClient(getExtensionSocket(uuid)); - // Ping the extension until it goes down. - client.get()->ping(status); - } catch (const std::exception& e) { - failures_[uuid] += 1; + auto path = getExtensionSocket(uuid); + if (isWritable(path)) { + try { + auto client = EXClient(path); + // Ping the extension until it goes down. + client.get()->ping(status); + } catch (const std::exception& e) { + failures_[uuid] += 1; + continue; + } + } else { + // Immediate fail non-writable paths. + failures_[uuid] = 3; continue; } diff --git a/osquery/main/shell.cpp b/osquery/main/shell.cpp index 07f72403..ccc8928f 100644 --- a/osquery/main/shell.cpp +++ b/osquery/main/shell.cpp @@ -13,6 +13,7 @@ #include #include +#include #include #include @@ -30,6 +31,8 @@ HIDDEN_FLAG(int32, profile_delay, 0, "Sleep a number of seconds before and after the profiling"); + +DECLARE_bool(disable_caching); } int profile(int argc, char *argv[]) { @@ -48,6 +51,10 @@ int profile(int argc, char *argv[]) { ::sleep(osquery::FLAGS_profile_delay); } + // Perform some duplication from Initializer with respect to database setup. + osquery::DatabasePlugin::setAllowOpen(true); + osquery::Registry::setActive("database", "ephemeral"); + auto dbc = osquery::SQLiteDBManager::get(); for (size_t i = 0; i < static_cast(osquery::FLAGS_profile); ++i) { osquery::QueryData results; @@ -82,6 +89,7 @@ int main(int argc, char *argv[]) { osquery::FLAGS_L || osquery::FLAGS_profile > 0) { // A query was set as a positional argument, via stdin, or profiling is on. osquery::FLAGS_disable_events = true; + osquery::FLAGS_disable_caching = true; // The shell may have loaded table extensions, if not, disable the manager. if (!osquery::Watcher::hasManagedExtensions()) { osquery::FLAGS_disable_extensions = true; @@ -94,11 +102,11 @@ int main(int argc, char *argv[]) { // Virtual tables will be attached to the shell's in-memory SQLite DB. retcode = osquery::launchIntoShell(argc, argv); + // Finally shutdown. + runner.requestShutdown(); } else { retcode = profile(argc, argv); } - // Finally shutdown. - runner.requestShutdown(); return retcode; } diff --git a/osquery/registry/registry.cpp b/osquery/registry/registry.cpp index ad8b21fd..3336447e 100644 --- a/osquery/registry/registry.cpp +++ b/osquery/registry/registry.cpp @@ -27,9 +27,6 @@ namespace osquery { HIDDEN_FLAG(bool, registry_exceptions, false, "Allow plugin exceptions"); -/// Mutex to control access to registry extensions and their UUIDs. -Mutex gRegistryExtensionsMutex; - void RegistryHelperCore::remove(const std::string& item_name) { if (items_.count(item_name) > 0) { items_[item_name]->tearDown(); @@ -292,7 +289,9 @@ RegistryBroadcast RegistryFactory::getBroadcast() { Status RegistryFactory::addBroadcast(const RouteUUID& uuid, const RegistryBroadcast& broadcast) { - if (instance().extensions_.count(uuid) > 0) { + auto& self = instance(); + WriteLock lock(self.mutex_); + if (self.extensions_.count(uuid) > 0) { return Status(1, "Duplicate extension UUID: " + std::to_string(uuid)); } @@ -332,12 +331,13 @@ Status RegistryFactory::addBroadcast(const RouteUUID& uuid, Registry::registry(registry.first)->removeExternal(uuid); } } - instance().extensions_.insert(uuid); + self.extensions_.insert(uuid); return status; } Status RegistryFactory::removeBroadcast(const RouteUUID& uuid) { - WriteLock lock(gRegistryExtensionsMutex); + auto& self = instance(); + WriteLock lock(self.mutex_); if (instance().extensions_.count(uuid) == 0) { return Status(1, "Unknown extension UUID: " + std::to_string(uuid)); } @@ -467,9 +467,10 @@ std::vector RegistryFactory::names( } std::vector RegistryFactory::routeUUIDs() { - WriteLock lock(gRegistryExtensionsMutex); + auto& self = instance(); + WriteLock lock(self.mutex_); std::vector uuids; - for (const auto& extension : instance().extensions_) { + for (const auto& extension : self.extensions_) { uuids.push_back(extension); } return uuids; diff --git a/osquery/sql/sqlite_util.cpp b/osquery/sql/sqlite_util.cpp index 85ac0374..a9fa68b9 100644 --- a/osquery/sql/sqlite_util.cpp +++ b/osquery/sql/sqlite_util.cpp @@ -124,6 +124,9 @@ Status SQLiteSQLPlugin::attach(const std::string& name) { } auto statement = columnDefinition(response); + // Attach requests occurring via the plugin/registry APIs must act on the + // primary database. To allow this, getConnection can explicitly request the + // primary instance and avoid the contention decisions. auto dbc = SQLiteDBManager::getConnection(true); return attachTableInternal(name, statement, dbc); } diff --git a/osquery/sql/virtual_table.cpp b/osquery/sql/virtual_table.cpp index 7561e1ca..caf68fa4 100644 --- a/osquery/sql/virtual_table.cpp +++ b/osquery/sql/virtual_table.cpp @@ -19,6 +19,16 @@ namespace osquery { SHELL_FLAG(bool, planner, false, "Enable osquery runtime planner output"); +/** + * @brief A protection around concurrent table attach requests. + * + * Table attaching is not concurrent. Attaching is the only unprotected SQLite + * operation from osquery's usage perspective. The extensions API allows for + * concurrent access of non-thread-safe database resources for attaching table + * schema and filter routing instructions. + */ +Mutex kAttachMutex; + namespace tables { namespace sqlite { @@ -396,6 +406,7 @@ Status attachTableInternal(const std::string &name, // Note, if the clientData API is used then this will save a registry call // within xCreate. + WriteLock lock(kAttachMutex); int rc = sqlite3_create_module( instance->db(), name.c_str(), &module, (void *)&(*instance)); if (rc == SQLITE_OK || rc == SQLITE_MISUSE) { @@ -409,6 +420,7 @@ Status attachTableInternal(const std::string &name, } Status detachTableInternal(const std::string &name, sqlite3 *db) { + WriteLock lock(kAttachMutex); auto format = "DROP TABLE IF EXISTS temp." + name; int rc = sqlite3_exec(db, format.c_str(), nullptr, nullptr, 0); if (rc != SQLITE_OK) { @@ -418,6 +430,26 @@ Status detachTableInternal(const std::string &name, sqlite3 *db) { return Status(rc, getStringForSQLiteReturnCode(rc)); } +Status attachFunctionInternal( + const std::string &name, + std::function< + void(sqlite3_context *context, int argc, sqlite3_value **argv)> func) { + // Hold the manager connection instance again in callbacks. + auto dbc = SQLiteDBManager::get(); + // Add some shell-specific functions to the instance. + WriteLock lock(kAttachMutex); + int rc = sqlite3_create_function( + dbc->db(), + name.c_str(), + 0, + SQLITE_UTF8, + nullptr, + *func.target(), + nullptr, + nullptr); + return Status(rc); +} + void attachVirtualTables(const SQLiteDBInstanceRef &instance) { PluginResponse response; for (const auto &name : Registry::names("table")) { diff --git a/osquery/sql/virtual_table.h b/osquery/sql/virtual_table.h index 3846a8da..21f57fba 100644 --- a/osquery/sql/virtual_table.h +++ b/osquery/sql/virtual_table.h @@ -62,6 +62,11 @@ Status attachTableInternal(const std::string &name, /// Detach (drop) a table. Status detachTableInternal(const std::string &name, sqlite3 *db); +Status attachFunctionInternal( + const std::string &name, + std::function< + void(sqlite3_context *context, int argc, sqlite3_value **argv)> func); + /// Attach all table plugins to an in-memory SQLite database. void attachVirtualTables(const SQLiteDBInstanceRef &instance); }