osquery-1/osquery/dispatcher/dispatcher.cpp

146 lines
3.7 KiB
C++
Raw Normal View History

/*
* Copyright (c) 2014, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#include <boost/date_time/posix_time/posix_time.hpp>
#include <osquery/dispatcher.h>
#include <osquery/flags.h>
#include <osquery/logger.h>
#include "osquery/core/conversions.h"
using namespace apache::thrift::concurrency;
namespace osquery {
/// The worker_threads define the default thread pool size.
2015-02-17 08:36:20 +00:00
FLAG(int32, worker_threads, 4, "Number of work dispatch threads");
void interruptableSleep(size_t milli) {
boost::this_thread::sleep(boost::posix_time::milliseconds(milli));
}
2014-09-30 02:06:33 +00:00
Dispatcher& Dispatcher::getInstance() {
static Dispatcher d;
return d;
}
Dispatcher::Dispatcher() {
#ifdef FBOSQUERY
thread_manager_ = ThreadManager::newSimpleThreadManager(
(size_t)FLAGS_worker_threads, 0);
auto threadFactory =
std::shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
#else
2015-02-04 03:55:16 +00:00
thread_manager_ =
boost_to_std_shared_ptr(InternalThreadManager::newSimpleThreadManager(
(size_t)FLAGS_worker_threads, 0));
auto threadFactory =
boost::shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
#endif
2014-09-15 18:09:33 +00:00
thread_manager_->threadFactory(threadFactory);
thread_manager_->start();
}
2015-02-04 03:55:16 +00:00
Status Dispatcher::add(std::shared_ptr<InternalRunnable> task) {
try {
#ifdef FBOSQUERY
thread_manager_->add(task, 0, 0);
#else
2014-09-15 18:09:33 +00:00
thread_manager_->add(std_to_boost_shared_ptr(task), 0, 0);
#endif
2014-09-21 21:29:28 +00:00
} catch (std::exception& e) {
return Status(1, e.what());
}
return Status(0, "OK");
}
2015-02-04 03:55:16 +00:00
Status Dispatcher::addService(std::shared_ptr<InternalRunnable> service) {
if (service->hasRun()) {
return Status(1, "Cannot schedule a service twice");
}
auto thread = std::make_shared<boost::thread>(
boost::bind(&InternalRunnable::run, &*service));
service_threads_.push_back(thread);
services_.push_back(std::move(service));
return Status(0, "OK");
}
InternalThreadManagerRef Dispatcher::getThreadManager() {
2014-09-15 18:09:33 +00:00
return thread_manager_;
}
2014-09-15 18:09:33 +00:00
void Dispatcher::join() { thread_manager_->join(); }
2015-02-04 03:55:16 +00:00
void Dispatcher::joinServices() {
2015-02-19 01:19:45 +00:00
for (auto& thread : getInstance().service_threads_) {
2015-02-04 03:55:16 +00:00
thread->join();
}
}
void Dispatcher::removeServices() {
for (const auto& service : services_) {
while (true) {
// Wait for each thread's entry point (enter) meaning the thread context
// was allocated and (run) was called by boost::thread started.
if (service->hasRun()) {
break;
}
// We only need to check if std::terminate is call very quickly after
// the boost::thread is created.
::usleep(200);
}
}
for (auto& thread : service_threads_) {
thread->interrupt();
}
// Deallocate services.
service_threads_.clear();
services_.clear();
}
InternalThreadManager::STATE Dispatcher::state() const {
2014-09-15 18:09:33 +00:00
return thread_manager_->state();
}
2014-09-15 18:09:33 +00:00
void Dispatcher::addWorker(size_t value) { thread_manager_->addWorker(value); }
void Dispatcher::removeWorker(size_t value) {
2014-09-15 18:09:33 +00:00
thread_manager_->removeWorker(value);
}
size_t Dispatcher::idleWorkerCount() const {
2014-09-15 18:09:33 +00:00
return thread_manager_->idleWorkerCount();
}
2014-09-15 18:09:33 +00:00
size_t Dispatcher::workerCount() const {
return thread_manager_->workerCount();
}
size_t Dispatcher::pendingTaskCount() const {
2014-09-15 18:09:33 +00:00
return thread_manager_->pendingTaskCount();
}
size_t Dispatcher::totalTaskCount() const {
2014-09-15 18:09:33 +00:00
return thread_manager_->totalTaskCount();
}
size_t Dispatcher::pendingTaskCountMax() const {
2014-09-15 18:09:33 +00:00
return thread_manager_->pendingTaskCountMax();
}
size_t Dispatcher::expiredTaskCount() const {
2014-09-15 18:09:33 +00:00
return thread_manager_->expiredTaskCount();
}
}