InterruptableRunnable RunnerInterruptPoint redesign (#4545)

* InterruptableRunnable RunnerInterruptPoint redesign

There were several inefficiencies in the old version of RunnerInterruptPoint and InterruptableRunnable.

1) RunnerInterruptPoint was throwing the exception when interrupted, however, the exception was always ignored.

2) InterruptableRunnable used the read-write lock, however only write lock was used.

3) InterruptableRunnable InterruptableRunnable, stored almost similar variable stop_, interrupted_.

4) std::atomic<bool> interrupted_ was used with locks, even though it was accessed by default safest access mode memory_order_seq_cst. So no additional cache invalidation was needed.

5) InterruptableRunnable contained code(in method interrupted() and variables bypass_check_, checked) just for testing. Which was slowing down method interrupted().

6) Some more confusing things. notify_all was not needed, as only one thread could be waiting for the conditional variable. RunnerInterruptPoint:: pause(void) looks ambiguous and that's why was not used anywhere.

I resolved all these problems by merging InterruptableRunnable and RunnerInterruptPoint into the InterruptableRunnable.

1) No use of the exception.
2) 4) Simple mutex, which is only used for pauseMilli. InterruptableRunnable::interrupted and InterruptableRunnable::interrupt function lock-free.
3) Single variable interrupted_.
5) Made InterruptableRunnable::interrupt virtual. Tests override interrupt to make things testable.
6) change to notify_one and removed pause without the specific time.
This commit is contained in:
Giorgi Guliashvili 2018-06-15 16:15:43 +01:00 committed by GitHub
parent 9259da6cc9
commit ff1747347e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 57 additions and 100 deletions

View File

@ -27,34 +27,6 @@ namespace osquery {
class Status;
class Dispatcher;
/// A throw/catch relay between a pause request and cancel event.
struct RunnerInterruptError {};
class RunnerInterruptPoint : private boost::noncopyable {
public:
RunnerInterruptPoint() = default;
/// Cancel the pause request.
void cancel();
/// Pause until the requested millisecond delay has elapsed or a cancel.
void pause(size_t milli) {
pause(std::chrono::milliseconds(milli));
}
/// Pause until the requested millisecond delay has elapsed or a cancel.
void pause(std::chrono::milliseconds milli);
private:
/// Communicate between the pause and cancel event.
bool stop_{false};
/// Protection around pause and cancel calls.
std::mutex mutex_;
/// Wait for notification or a pause expiration.
std::condition_variable condition_;
};
class InterruptableRunnable {
public:
@ -67,52 +39,30 @@ class InterruptableRunnable {
protected:
/// Allow the runnable to check interruption.
bool interrupted();
virtual bool interrupted();
/// Require the runnable thread to define a stop/interrupt point.
virtual void stop() = 0;
/// Put the runnable into an interruptible sleep.
virtual void pause() {
pauseMilli(std::chrono::milliseconds(100));
}
/// Put the runnable into an interruptible sleep.
virtual void pauseMilli(size_t milli) {
inline void pauseMilli(size_t milli) {
pauseMilli(std::chrono::milliseconds(milli));
}
/// Put the runnable into an interruptible sleep.
virtual void pauseMilli(std::chrono::milliseconds milli);
private:
/// Testing only, the interruptible will bypass initial interruption check.
void mustRun() {
bypass_check_ = true;
}
void pauseMilli(std::chrono::milliseconds milli);
private:
/**
* @brief Protect interruption checking and resource tear down.
*
* A tearDown mutex protects the runnable service's resources.
* Interruption means resources have been stopped.
* Non-interruption means no attempt to affect resources has been started.
* @brief Used to wait for the interruption notification while sleeping
*/
Mutex stopping_;
std::mutex condition_lock;
/// If a service includes a run loop it should check for interrupted.
std::atomic<bool> interrupted_{false};
/// Use an interruption point to exit a pause if the thread was interrupted.
RunnerInterruptPoint point_;
private:
/// Testing only, track the interruptible check for interruption.
bool checked_{false};
/// Testing only, require that the interruptible bypass the first check.
std::atomic<bool> bypass_check_{false};
/// Wait for notification or a pause expiration.
std::condition_variable condition_;
private:
FRIEND_TEST(DispatcherTests, test_run);

View File

@ -20,48 +20,27 @@ namespace osquery {
/// The worker_threads define the default thread pool size.
FLAG(int32, worker_threads, 4, "Number of work dispatch threads");
/// Cancel the pause request.
void RunnerInterruptPoint::cancel() {
std::unique_lock<std::mutex> lock(mutex_);
stop_ = true;
condition_.notify_all();
}
/// Pause until the requested millisecond delay has elapsed or a cancel.
void RunnerInterruptPoint::pause(std::chrono::milliseconds milli) {
std::unique_lock<std::mutex> lock(mutex_);
if (stop_ || condition_.wait_for(lock, milli) == std::cv_status::no_timeout) {
stop_ = false;
throw RunnerInterruptError();
}
}
void InterruptableRunnable::interrupt() {
WriteLock lock(stopping_);
// Set the service as interrupted.
interrupted_ = true;
// Tear down the service's resources such that exiting the expected run
// loop within ::start does not need to.
stop();
// Cancel the run loop's pause request.
point_.cancel();
if (!interrupted_.exchange(true)) {
// Tear down the service's resources such that exiting the expected run
// loop within ::start does not need to.
stop();
std::lock_guard<std::mutex> lock(condition_lock);
// Cancel the run loop's pause request.
condition_.notify_one();
}
}
bool InterruptableRunnable::interrupted() {
WriteLock lock(stopping_);
// A small conditional to force-skip an interruption check, used in testing.
if (bypass_check_ && !checked_) {
checked_ = true;
return false;
}
return interrupted_;
}
void InterruptableRunnable::pauseMilli(std::chrono::milliseconds milli) {
try {
point_.pause(milli);
} catch (const RunnerInterruptError&) {
// The pause request was canceled.
std::unique_lock<std::mutex> lock(condition_lock);
if (!interrupted_) {
condition_.wait_for(lock, milli);
}
}

View File

@ -27,9 +27,28 @@ TEST_F(DispatcherTests, test_singleton) {
EXPECT_EQ(&one, &two);
}
class TestRunnable : public InternalRunnable {
class InternalTestableRunnable : public InternalRunnable {
public:
explicit TestRunnable() : InternalRunnable("TestRunnable") {}
InternalTestableRunnable(const std::string& name) : InternalRunnable(name) {}
bool interrupted() override {
// A small conditional to force-skip an interruption check, used in testing.
if (!checked_) {
checked_ = true;
return false;
} else {
return InternalRunnable::interrupted();
}
}
private:
/// Testing only, track the interruptible check for interruption.
bool checked_{false};
};
class TestRunnable : public InternalTestableRunnable {
public:
explicit TestRunnable() : InternalTestableRunnable("TestRunnable") {}
virtual void start() override {
++i;
@ -66,7 +85,6 @@ TEST_F(DispatcherTests, test_service_count) {
TEST_F(DispatcherTests, test_run) {
auto runnable = std::make_shared<TestRunnable>();
runnable->mustRun();
runnable->reset();
// The service exits after incrementing.
@ -87,8 +105,6 @@ TEST_F(DispatcherTests, test_independent_run) {
// Nothing stops two instances of the same service from running.
auto r1 = std::make_shared<TestRunnable>();
auto r2 = std::make_shared<TestRunnable>();
r1->mustRun();
r2->mustRun();
r1->reset();
Dispatcher::addService(r1);
@ -98,9 +114,10 @@ TEST_F(DispatcherTests, test_independent_run) {
EXPECT_EQ(2U, r1->count());
}
class BlockingTestRunnable : public InternalRunnable {
class BlockingTestRunnable : public InternalTestableRunnable {
public:
explicit BlockingTestRunnable() : InternalRunnable("BlockingTestRunnable") {}
explicit BlockingTestRunnable()
: InternalTestableRunnable("BlockingTestRunnable") {}
virtual void start() override {
// Wow that's a long sleep!
@ -110,7 +127,6 @@ class BlockingTestRunnable : public InternalRunnable {
TEST_F(DispatcherTests, test_interruption) {
auto r1 = std::make_shared<BlockingTestRunnable>();
r1->mustRun();
Dispatcher::addService(r1);
// This service would normally wait for 100 seconds.

View File

@ -63,7 +63,7 @@ void WindowsEventLogEventPublisher::configure() {
}
Status WindowsEventLogEventPublisher::run() {
pause();
pauseMilli(100);
return Status(0, "OK");
}

View File

@ -74,6 +74,16 @@ class MockBufferedLogForwarder : public BufferedLogForwarder {
: BufferedLogForwarder(
"MockBufferedLogForwarder", name, log_period, max_log_lines) {}
bool interrupted() {
// A small conditional to force-skip an interruption check, used in testing.
if (!checked_) {
checked_ = true;
return false;
} else {
return BufferedLogForwarder::interrupted();
}
}
MOCK_METHOD2(send,
Status(std::vector<std::string>& log_data,
const std::string& log_type));
@ -85,6 +95,9 @@ class MockBufferedLogForwarder : public BufferedLogForwarder {
FRIEND_TEST(BufferedLogForwarderTests, test_split);
FRIEND_TEST(BufferedLogForwarderTests, test_purge);
FRIEND_TEST(BufferedLogForwarderTests, test_purge_max);
private:
bool checked_{false};
};
TEST_F(BufferedLogForwarderTests, test_index) {
@ -223,7 +236,6 @@ TEST_F(BufferedLogForwarderTests, test_multiple) {
TEST_F(BufferedLogForwarderTests, test_async) {
auto runner = std::make_shared<StrictMock<MockBufferedLogForwarder>>(
"mock", kLogPeriod);
runner->mustRun();
EXPECT_CALL(*runner, send(ElementsAre("foo"), "result"))
.WillOnce(Return(Status(0)));

View File

@ -144,4 +144,4 @@ Status WindowsEventLoggerPlugin::emitLogRecord(
return Status();
}
}
} // namespace osquery