osquery-1/osquery/dispatcher/dispatcher.cpp

159 lines
4.3 KiB
C++
Raw Normal View History

/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed in accordance with the terms specified in
* the LICENSE file found in the root directory of this source tree.
*/
2016-03-21 22:27:51 +00:00
#include <osquery/dispatcher.h>
#include <osquery/flags.h>
#include <osquery/logger.h>
#include <osquery/process/process.h>
namespace osquery {
/// The worker_threads flag defines the default thread pool size.
2015-02-17 08:36:20 +00:00
// Declares `worker_threads` through the FLAG macro with type int32, a default
// of 4 and the help text below. NOTE(review): the macro's expansion is not
// visible in this file (presumably provided by <osquery/flags.h>) — confirm
// how the value is exposed before relying on it.
FLAG(int32, worker_threads, 4, "Number of work dispatch threads");
2016-03-21 22:27:51 +00:00
void InterruptableRunnable::interrupt() {
// Set the service as interrupted.
InterruptableRunnable RunnerInterruptPoint redesign (#4545) * InterruptableRunnable RunnerInterruptPoint redesign There were several inefficiencies in the old version of RunnerInterruptPoint and InterruptableRunnable. 1) RunnerInterruptPoint was throwing the exception when interrupted, however, the exception was always ignored. 2) InterruptableRunnable used the read-write lock, however only write lock was used. 3) InterruptableRunnable InterruptableRunnable, stored almost similar variable stop_, interrupted_. 4) std::atomic<bool> interrupted_ was used with locks, even though it was accessed by default safest access mode memory_order_seq_cst. So no additional cache invalidation was needed. 5) InterruptableRunnable contained code(in method interrupted() and variables bypass_check_, checked) just for testing. Which was slowing down method interrupted(). 6) Some more confusing things. notify_all was not needed, as only one thread could be waiting for the conditional variable. RunnerInterruptPoint:: pause(void) looks ambiguous and that's why was not used anywhere. I resolved all these problems by merging InterruptableRunnable and RunnerInterruptPoint into the InterruptableRunnable. 1) No use of the exception. 2) 4) Simple mutex, which is only used for pauseMilli. InterruptableRunnable::interrupted and InterruptableRunnable::interrupt function lock-free. 3) Single variable interrupted_. 5) Made InterruptableRunnable::interrupt virtual. Tests override interrupt to make things testable. 6) change to notify_one and removed pause without the specific time.
2018-06-15 15:15:43 +00:00
if (!interrupted_.exchange(true)) {
// Tear down the service's resources such that exiting the expected run
// loop within ::start does not need to.
stop();
std::lock_guard<std::mutex> lock(condition_lock);
// Cancel the run loop's pause request.
condition_.notify_one();
}
2016-03-21 22:27:51 +00:00
}
/// Report whether interrupt() has been requested for this runnable.
///
/// @return the current value of the interrupted_ flag.
bool InterruptableRunnable::interrupted() {
  return interrupted_.load();
}
2018-08-02 15:57:02 +00:00
void InterruptableRunnable::pause(std::chrono::milliseconds milli) {
InterruptableRunnable RunnerInterruptPoint redesign (#4545) * InterruptableRunnable RunnerInterruptPoint redesign There were several inefficiencies in the old version of RunnerInterruptPoint and InterruptableRunnable. 1) RunnerInterruptPoint was throwing the exception when interrupted, however, the exception was always ignored. 2) InterruptableRunnable used the read-write lock, however only write lock was used. 3) InterruptableRunnable InterruptableRunnable, stored almost similar variable stop_, interrupted_. 4) std::atomic<bool> interrupted_ was used with locks, even though it was accessed by default safest access mode memory_order_seq_cst. So no additional cache invalidation was needed. 5) InterruptableRunnable contained code(in method interrupted() and variables bypass_check_, checked) just for testing. Which was slowing down method interrupted(). 6) Some more confusing things. notify_all was not needed, as only one thread could be waiting for the conditional variable. RunnerInterruptPoint:: pause(void) looks ambiguous and that's why was not used anywhere. I resolved all these problems by merging InterruptableRunnable and RunnerInterruptPoint into the InterruptableRunnable. 1) No use of the exception. 2) 4) Simple mutex, which is only used for pauseMilli. InterruptableRunnable::interrupted and InterruptableRunnable::interrupt function lock-free. 3) Single variable interrupted_. 5) Made InterruptableRunnable::interrupt virtual. Tests override interrupt to make things testable. 6) change to notify_one and removed pause without the specific time.
2018-06-15 15:15:43 +00:00
std::unique_lock<std::mutex> lock(condition_lock);
if (!interrupted_) {
condition_.wait_for(lock, milli);
}
}
/// Thread entry point for a service (bound to a std::thread by
/// Dispatcher::addService).
///
/// Marks the runnable as started, names the thread after the service, runs
/// the service body via start(), and finally unregisters the service from
/// the dispatcher.
void InternalRunnable::run() {
// Record that the thread entry point was reached; presumably this is what
// hasRun() reports — confirm against the class declaration.
run_ = true;
setThreadName(name());
// Runs the service; control only continues below once start() returns.
start();
// The service is complete.
Dispatcher::removeService(this);
}
/// Access the process-wide Dispatcher.
///
/// @return a reference to a function-local static Dispatcher, constructed on
/// the first call.
Dispatcher& Dispatcher::instance() {
  static Dispatcher dispatcher;
  return dispatcher;
}
/// Count the services currently registered with this dispatcher.
///
/// @return the size of services_, read while holding a read lock on mutex_.
size_t Dispatcher::serviceCount() const {
  ReadLock guard(mutex_);
  const auto count = services_.size();
  return count;
}
/// Start `service` on a new thread and register it with the dispatcher.
///
/// @param service the runnable to schedule; it must not have been run before.
/// @return Status::success() when the service thread was created and
/// registered; a status with code 1 when the service has already run or the
/// dispatcher is stopping.
///
/// The dispatcher's write lock is held from the stopping_ check until both
/// the thread and the service are registered. This closes two races:
///  - stopServices() cannot run to completion between the stopping_ check
///    and the registration, which would leave a service that is never
///    interrupted.
///  - A short-lived service cannot reach removeService() (which takes the
///    same lock) before it has been added to services_, which would leave a
///    stale entry behind.
Status Dispatcher::addService(InternalRunnableRef service) {
  if (service->hasRun()) {
    return Status(1, "Cannot schedule a service twice");
  }

  auto& self = instance();
  WriteLock lock(self.mutex_);
  if (self.stopping_) {
    // Cannot add a service while the dispatcher is stopping and no joins
    // have been requested.
    return Status(1, "Cannot add service, dispatcher is stopping");
  }

  // The thread only borrows the service; ownership is moved into services_
  // below.
  auto thread = std::make_unique<std::thread>(
      std::bind(&InternalRunnable::run, &*service));

  DLOG(INFO) << "Adding new service: " << service->name() << " ("
             << service.get() << ") to thread: " << thread->get_id() << " ("
             << thread.get() << ") in process " << platformGetPid();

  self.service_threads_.push_back(std::move(thread));
  self.services_.push_back(std::move(service));
  return Status::success();
}
void Dispatcher::removeService(const InternalRunnable* service) {
auto& self = Dispatcher::instance();
WriteLock lock(self.mutex_);
// Remove the service.
self.services_.erase(
std::remove_if(self.services_.begin(),
self.services_.end(),
[service](const InternalRunnableRef& target) {
return (target.get() == service);
}),
self.services_.end());
}
/// Block until `service` reports that its thread entry point has been
/// reached.
///
/// @param service the runnable to wait on.
///
/// hasRun() is polled, with sleepFor(20) between checks (presumably 20
/// milliseconds — confirm against sleepFor's declaration). The wait only
/// matters when this is called very quickly after the std::thread was
/// created.
inline static void assureRun(const InternalRunnableRef& service) {
  while (!service->hasRun()) {
    sleepFor(20);
  }
}
2015-02-04 03:55:16 +00:00
void Dispatcher::joinServices() {
auto& self = instance();
2016-03-20 23:05:13 +00:00
DLOG(INFO) << "Thread: " << std::this_thread::get_id()
<< " requesting a join";
// Stops when service_threads_ is empty. Before stopping and releasing of the
// lock, empties services_ .
while (1) {
InternalThreadRef thread = nullptr;
{
WriteLock lock(self.mutex_);
if (!self.service_threads_.empty()) {
thread = std::move(self.service_threads_.back());
self.service_threads_.pop_back();
} else {
self.services_.clear();
break;
}
}
if (thread != nullptr) {
thread->join();
DLOG(INFO) << "Service thread: " << thread.get() << " has joined";
}
2015-02-04 03:55:16 +00:00
}
self.stopping_ = false;
2016-03-20 23:05:13 +00:00
DLOG(INFO) << "Services and threads have been cleared";
2015-02-04 03:55:16 +00:00
}
2015-05-06 00:09:07 +00:00
void Dispatcher::stopServices() {
2015-05-04 03:02:01 +00:00
auto& self = instance();
self.stopping_ = true;
WriteLock lock(self.mutex_);
2016-03-20 23:05:13 +00:00
DLOG(INFO) << "Thread: " << std::this_thread::get_id()
<< " requesting a stop";
2015-05-04 03:02:01 +00:00
for (const auto& service : self.services_) {
assureRun(service);
service->interrupt();
DLOG(INFO) << "Service: " << service.get() << " has been interrupted";
2015-02-04 03:55:16 +00:00
}
}
} // namespace osquery