osquery-1/osquery/dispatcher/dispatcher.cpp

159 lines
4.4 KiB
C++
Raw Normal View History

/*
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
2015-05-12 06:31:13 +00:00
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
2016-03-11 08:30:20 +00:00
#include <chrono>
2016-03-21 22:27:51 +00:00
#include <osquery/dispatcher.h>
#include <osquery/flags.h>
#include <osquery/logger.h>
#include "osquery/core/conversions.h"
#include "osquery/core/process.h"
namespace osquery {
/// The worker_threads define the default thread pool size.
2015-02-17 08:36:20 +00:00
FLAG(int32, worker_threads, 4, "Number of work dispatch threads");
/// Cancel the pause request.
void RunnerInterruptPoint::cancel() {
std::unique_lock<std::mutex> lock(mutex_);
stop_ = true;
condition_.notify_all();
}
/// Pause until the requested millisecond delay has elapsed or a cancel.
void RunnerInterruptPoint::pause(std::chrono::milliseconds milli) {
std::unique_lock<std::mutex> lock(mutex_);
if (stop_ || condition_.wait_for(lock, milli) == std::cv_status::no_timeout) {
stop_ = false;
throw RunnerInterruptError();
}
}
2016-03-21 22:27:51 +00:00
void InterruptableRunnable::interrupt() {
WriteLock lock(stopping_);
// Set the service as interrupted.
interrupted_ = true;
// Tear down the service's resources such that exiting the expected run
// loop within ::start does not need to.
stop();
// Cancel the run loop's pause request.
point_.cancel();
}
bool InterruptableRunnable::interrupted() {
WriteLock lock(stopping_);
// A small conditional to force-skip an interruption check, used in testing.
if (bypass_check_ && !checked_) {
checked_ = true;
return false;
}
2016-03-21 22:27:51 +00:00
return interrupted_;
}
void InterruptableRunnable::pauseMilli(std::chrono::milliseconds milli) {
try {
point_.pause(milli);
} catch (const RunnerInterruptError&) {
// The pause request was canceled.
}
}
void InternalRunnable::run() {
run_ = true;
start();
// The service is complete.
Dispatcher::removeService(this);
}
Status Dispatcher::addService(InternalRunnableRef service) {
2015-02-04 03:55:16 +00:00
if (service->hasRun()) {
return Status(1, "Cannot schedule a service twice");
}
2015-05-04 03:02:01 +00:00
auto& self = instance();
if (self.stopping_) {
// Cannot add a service while the dispatcher is stopping and no joins
// have been requested.
return Status(1, "Cannot add service, dispatcher is stopping");
}
2016-03-11 08:30:20 +00:00
auto thread = std::make_shared<std::thread>(
std::bind(&InternalRunnable::run, &*service));
WriteLock lock(self.mutex_);
DLOG(INFO) << "Adding new service: " << service.get()
<< " to thread: " << thread.get();
2015-05-04 03:02:01 +00:00
self.service_threads_.push_back(thread);
self.services_.push_back(std::move(service));
2015-02-04 03:55:16 +00:00
return Status(0, "OK");
}
void Dispatcher::removeService(const InternalRunnable* service) {
auto& self = Dispatcher::instance();
WriteLock lock(self.mutex_);
// Remove the service.
self.services_.erase(
std::remove_if(self.services_.begin(),
self.services_.end(),
[service](const InternalRunnableRef& target) {
return (target.get() == service);
}),
self.services_.end());
}
inline static void assureRun(const InternalRunnableRef& service) {
while (true) {
// Wait for each thread's entry point (start) meaning the thread context
// was allocated and (run) was called by std::thread started.
if (service->hasRun()) {
break;
}
// We only need to check if std::terminate is called very quickly after
// the std::thread is created.
sleepFor(20);
}
}
2015-02-04 03:55:16 +00:00
void Dispatcher::joinServices() {
auto& self = instance();
2016-03-20 23:05:13 +00:00
DLOG(INFO) << "Thread: " << std::this_thread::get_id()
<< " requesting a join";
WriteLock join_lock(self.join_mutex_);
for (auto& thread : self.service_threads_) {
2015-02-04 03:55:16 +00:00
thread->join();
DLOG(INFO) << "Service thread: " << thread.get() << " has joined";
2015-02-04 03:55:16 +00:00
}
WriteLock lock(self.mutex_);
self.services_.clear();
self.service_threads_.clear();
self.stopping_ = false;
2016-03-20 23:05:13 +00:00
DLOG(INFO) << "Services and threads have been cleared";
2015-02-04 03:55:16 +00:00
}
2015-05-06 00:09:07 +00:00
void Dispatcher::stopServices() {
2015-05-04 03:02:01 +00:00
auto& self = instance();
self.stopping_ = true;
WriteLock lock(self.mutex_);
2016-03-20 23:05:13 +00:00
DLOG(INFO) << "Thread: " << std::this_thread::get_id()
<< " requesting a stop";
2015-05-04 03:02:01 +00:00
for (const auto& service : self.services_) {
assureRun(service);
service->interrupt();
DLOG(INFO) << "Service: " << service.get() << " has been interrupted";
2015-02-04 03:55:16 +00:00
}
}
}