Merge pull request #728 from theopolis/pubs_as_runnables

[Fix #704] Events sleep with dispatcher's interruptableSleep
This commit is contained in:
Teddy Reed 2015-02-10 13:06:16 -08:00
commit 7f7b2acd37
8 changed files with 22 additions and 12 deletions

View File

@ -57,8 +57,6 @@ class InternalRunnable : public apache::thrift::concurrency::Runnable {
/// Check if the thread's entrypoint (run) executed, meaning thread context
/// was allocated.
bool hasRun() { return run_; }
/// Sleep in a boost::thread interruptable state.
void interruptableSleep(size_t milli);
protected:
/// Require the runnable thread define an entrypoint.
@ -121,6 +119,7 @@ class Dispatcher {
*/
Status add(std::shared_ptr<InternalRunnable> task);
/// See `add`, but services are not limited to a thread poll size.
Status addService(std::shared_ptr<InternalRunnable> service);
/**
@ -152,8 +151,10 @@ class Dispatcher {
*/
void join();
/// See `join`, but applied to osquery services.
void joinServices();
/// Destroy and stop all osquery service threads and service objects.
void removeServices();
/**
@ -253,7 +254,12 @@ class Dispatcher {
* @see getThreadManager
*/
InternalThreadManagerRef thread_manager_;
/// The set of shared osquery service threads.
std::vector<std::shared_ptr<boost::thread> > service_threads_;
/// THe set of shared osquery services.
std::vector<std::shared_ptr<InternalRunnable> > services_;
};
/// Sleep in a boost::thread interruptable state.
void interruptableSleep(size_t milli);
}

View File

@ -21,6 +21,7 @@
#include <boost/thread/mutex.hpp>
#include <osquery/database.h>
#include <osquery/dispatcher.h>
#include <osquery/registry.h>
#include <osquery/status.h>
#include <osquery/tables.h>

View File

@ -26,7 +26,7 @@ DEFINE_osquery_flag(int32,
4,
"Number of work dispatch threads");
void InternalRunnable::interruptableSleep(size_t milli) {
void interruptableSleep(size_t milli) {
boost::this_thread::sleep(boost::posix_time::milliseconds(milli));
}

View File

@ -170,7 +170,7 @@ Status IOKitHIDEventPublisher::run() {
CFRunLoopRun();
// Add artificial latency to run loop.
::sleep(1);
osquery::interruptableSleep(1000);
return Status(0, "OK");
}

View File

@ -150,7 +150,7 @@ Status SCNetworkEventPublisher::run() {
CFRunLoopRun();
// Do not expect the run loop to exit often, if so, add artificial latency.
::sleep(1);
osquery::interruptableSleep(1000);
return Status(0, "OK");
}
};

View File

@ -15,7 +15,6 @@
#include <boost/lexical_cast.hpp>
#include <osquery/core.h>
#include <osquery/dispatcher.h>
#include <osquery/events.h>
#include <osquery/flags.h>
#include <osquery/logger.h>
@ -24,6 +23,9 @@
namespace osquery {
/// Helper cooloff (ms) macro to prevent thread failure thrashing.
#define EVENTS_COOLOFF 20
DEFINE_osquery_flag(bool,
disable_events,
false,
@ -417,7 +419,7 @@ Status EventFactory::run(EventPublisherID& type_id) {
while (!publisher->isEnding() && status.ok()) {
// Can optionally implement a global cooloff latency here.
status = publisher->run();
::usleep(20);
osquery::interruptableSleep(EVENTS_COOLOFF);
}
// The runloop status is not reflective of the event type's.

View File

@ -20,7 +20,8 @@
namespace osquery {
int kINotifyULatency = 200;
int kINotifyMLatency = 200;
static const uint32_t BUFFER_SIZE =
(10 * ((sizeof(struct inotify_event)) + NAME_MAX + 1));
@ -70,7 +71,7 @@ Status INotifyEventPublisher::run() {
FD_ZERO(&set);
FD_SET(getHandle(), &set);
struct timeval timeout = {0, kINotifyULatency};
struct timeval timeout = {0, kINotifyMLatency};
int selector = ::select(getHandle() + 1, &set, nullptr, nullptr, &timeout);
if (selector == -1) {
LOG(ERROR) << "Could not read inotify handle";
@ -111,7 +112,7 @@ Status INotifyEventPublisher::run() {
p += (sizeof(struct inotify_event)) + event->len;
}
::usleep(kINotifyULatency);
osquery::interruptableSleep(kINotifyMLatency);
return Status(0, "Continue");
}

View File

@ -16,7 +16,7 @@
namespace osquery {
int kUdevULatency = 200;
int kUdevMLatency = 200;
REGISTER(UdevEventPublisher, "event_publisher", "udev");
@ -76,7 +76,7 @@ Status UdevEventPublisher::run() {
udev_device_unref(device);
::usleep(kUdevULatency);
osquery::interruptableSleep(kUdevMLatency);
return Status(0, "Continue");
}