mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 02:45:22 +00:00
use chrono
This commit is contained in:
parent
d12dbed670
commit
bfdbd0344b
@ -36,7 +36,6 @@ set( thriftcpp_SOURCES
|
|||||||
src/thrift/async/TConcurrentClientSyncInfo.cpp
|
src/thrift/async/TConcurrentClientSyncInfo.cpp
|
||||||
src/thrift/concurrency/ThreadManager.cpp
|
src/thrift/concurrency/ThreadManager.cpp
|
||||||
src/thrift/concurrency/TimerManager.cpp
|
src/thrift/concurrency/TimerManager.cpp
|
||||||
src/thrift/concurrency/Util.cpp
|
|
||||||
src/thrift/processor/PeekProcessor.cpp
|
src/thrift/processor/PeekProcessor.cpp
|
||||||
src/thrift/protocol/TBase64Utils.cpp
|
src/thrift/protocol/TBase64Utils.cpp
|
||||||
src/thrift/protocol/TDebugProtocol.cpp
|
src/thrift/protocol/TDebugProtocol.cpp
|
||||||
|
@ -67,7 +67,6 @@ libthrift_la_SOURCES = src/thrift/TApplicationException.cpp \
|
|||||||
src/thrift/async/TConcurrentClientSyncInfo.cpp \
|
src/thrift/async/TConcurrentClientSyncInfo.cpp \
|
||||||
src/thrift/concurrency/ThreadManager.cpp \
|
src/thrift/concurrency/ThreadManager.cpp \
|
||||||
src/thrift/concurrency/TimerManager.cpp \
|
src/thrift/concurrency/TimerManager.cpp \
|
||||||
src/thrift/concurrency/Util.cpp \
|
|
||||||
src/thrift/processor/PeekProcessor.cpp \
|
src/thrift/processor/PeekProcessor.cpp \
|
||||||
src/thrift/protocol/TDebugProtocol.cpp \
|
src/thrift/protocol/TDebugProtocol.cpp \
|
||||||
src/thrift/protocol/TJSONProtocol.cpp \
|
src/thrift/protocol/TJSONProtocol.cpp \
|
||||||
@ -155,8 +154,7 @@ include_concurrency_HEADERS = \
|
|||||||
src/thrift/concurrency/Thread.h \
|
src/thrift/concurrency/Thread.h \
|
||||||
src/thrift/concurrency/ThreadManager.h \
|
src/thrift/concurrency/ThreadManager.h \
|
||||||
src/thrift/concurrency/TimerManager.h \
|
src/thrift/concurrency/TimerManager.h \
|
||||||
src/thrift/concurrency/FunctionRunner.h \
|
src/thrift/concurrency/FunctionRunner.h
|
||||||
src/thrift/concurrency/Util.h
|
|
||||||
|
|
||||||
include_protocoldir = $(include_thriftdir)/protocol
|
include_protocoldir = $(include_thriftdir)/protocol
|
||||||
include_protocol_HEADERS = \
|
include_protocol_HEADERS = \
|
||||||
|
@ -21,7 +21,6 @@
|
|||||||
|
|
||||||
#include <thrift/concurrency/Monitor.h>
|
#include <thrift/concurrency/Monitor.h>
|
||||||
#include <thrift/concurrency/Exception.h>
|
#include <thrift/concurrency/Exception.h>
|
||||||
#include <thrift/concurrency/Util.h>
|
|
||||||
#include <thrift/transport/PlatformSocket.h>
|
#include <thrift/transport/PlatformSocket.h>
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
|
|
||||||
@ -61,8 +60,8 @@ public:
|
|||||||
* If the condition occurs, this function returns cleanly; on timeout or
|
* If the condition occurs, this function returns cleanly; on timeout or
|
||||||
* error an exception is thrown.
|
* error an exception is thrown.
|
||||||
*/
|
*/
|
||||||
void wait(int64_t timeout_ms) {
|
void wait(const std::chrono::milliseconds &timeout) {
|
||||||
int result = waitForTimeRelative(timeout_ms);
|
int result = waitForTimeRelative(timeout);
|
||||||
if (result == THRIFT_ETIMEDOUT) {
|
if (result == THRIFT_ETIMEDOUT) {
|
||||||
throw TimedOutException();
|
throw TimedOutException();
|
||||||
} else if (result != 0) {
|
} else if (result != 0) {
|
||||||
@ -72,12 +71,12 @@ public:
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Waits until the specified timeout in milliseconds for the condition to
|
* Waits until the specified timeout in milliseconds for the condition to
|
||||||
* occur, or waits forever if timeout_ms == 0.
|
* occur, or waits forever if timeout is zero.
|
||||||
*
|
*
|
||||||
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
|
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
|
||||||
*/
|
*/
|
||||||
int waitForTimeRelative(int64_t timeout_ms) {
|
int waitForTimeRelative(const std::chrono::milliseconds &timeout) {
|
||||||
if (timeout_ms == 0LL) {
|
if (timeout.count() == 0) {
|
||||||
return waitForever();
|
return waitForever();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,46 +85,23 @@ public:
|
|||||||
assert(mutexImpl);
|
assert(mutexImpl);
|
||||||
|
|
||||||
std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
|
std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
|
||||||
bool timedout = (conditionVariable_.wait_for(lock, std::chrono::milliseconds(timeout_ms))
|
bool timedout = (conditionVariable_.wait_for(lock, timeout)
|
||||||
== std::cv_status::timeout);
|
== std::cv_status::timeout);
|
||||||
lock.release();
|
lock.release();
|
||||||
return (timedout ? THRIFT_ETIMEDOUT : 0);
|
return (timedout ? THRIFT_ETIMEDOUT : 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Waits until the absolute time specified using struct THRIFT_TIMESPEC.
|
* Waits until the absolute time specified by abstime.
|
||||||
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
|
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
|
||||||
*/
|
*/
|
||||||
int waitForTime(const THRIFT_TIMESPEC* abstime) {
|
int waitForTime(const std::chrono::time_point<std::chrono::steady_clock>& abstime) {
|
||||||
struct timeval temp;
|
|
||||||
temp.tv_sec = static_cast<long>(abstime->tv_sec);
|
|
||||||
temp.tv_usec = static_cast<long>(abstime->tv_nsec) / 1000;
|
|
||||||
return waitForTime(&temp);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Waits until the absolute time specified using struct timeval.
|
|
||||||
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
|
|
||||||
*/
|
|
||||||
int waitForTime(const struct timeval* abstime) {
|
|
||||||
assert(mutex_);
|
assert(mutex_);
|
||||||
std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
|
std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
|
||||||
assert(mutexImpl);
|
assert(mutexImpl);
|
||||||
|
|
||||||
struct timeval currenttime;
|
|
||||||
Util::toTimeval(currenttime, Util::currentTime());
|
|
||||||
|
|
||||||
long tv_sec = static_cast<long>(abstime->tv_sec - currenttime.tv_sec);
|
|
||||||
long tv_usec = static_cast<long>(abstime->tv_usec - currenttime.tv_usec);
|
|
||||||
if (tv_sec < 0)
|
|
||||||
tv_sec = 0;
|
|
||||||
if (tv_usec < 0)
|
|
||||||
tv_usec = 0;
|
|
||||||
|
|
||||||
std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
|
std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
|
||||||
bool timedout = (conditionVariable_.wait_for(lock,
|
bool timedout = (conditionVariable_.wait_until(lock, abstime)
|
||||||
std::chrono::seconds(tv_sec)
|
|
||||||
+ std::chrono::microseconds(tv_usec))
|
|
||||||
== std::cv_status::timeout);
|
== std::cv_status::timeout);
|
||||||
lock.release();
|
lock.release();
|
||||||
return (timedout ? THRIFT_ETIMEDOUT : 0);
|
return (timedout ? THRIFT_ETIMEDOUT : 0);
|
||||||
@ -181,20 +157,16 @@ void Monitor::unlock() const {
|
|||||||
const_cast<Monitor::Impl*>(impl_)->unlock();
|
const_cast<Monitor::Impl*>(impl_)->unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
void Monitor::wait(int64_t timeout) const {
|
void Monitor::wait(const std::chrono::milliseconds &timeout) const {
|
||||||
const_cast<Monitor::Impl*>(impl_)->wait(timeout);
|
const_cast<Monitor::Impl*>(impl_)->wait(timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const {
|
int Monitor::waitForTime(const std::chrono::time_point<std::chrono::steady_clock>& abstime) const {
|
||||||
return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
|
return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
|
||||||
}
|
}
|
||||||
|
|
||||||
int Monitor::waitForTime(const timeval* abstime) const {
|
int Monitor::waitForTimeRelative(const std::chrono::milliseconds &timeout) const {
|
||||||
return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
|
return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout);
|
||||||
}
|
|
||||||
|
|
||||||
int Monitor::waitForTimeRelative(int64_t timeout_ms) const {
|
|
||||||
return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int Monitor::waitForever() const {
|
int Monitor::waitForever() const {
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
#ifndef _THRIFT_CONCURRENCY_MONITOR_H_
|
#ifndef _THRIFT_CONCURRENCY_MONITOR_H_
|
||||||
#define _THRIFT_CONCURRENCY_MONITOR_H_ 1
|
#define _THRIFT_CONCURRENCY_MONITOR_H_ 1
|
||||||
|
|
||||||
|
#include <chrono>
|
||||||
#include <thrift/concurrency/Exception.h>
|
#include <thrift/concurrency/Exception.h>
|
||||||
#include <thrift/concurrency/Mutex.h>
|
#include <thrift/concurrency/Mutex.h>
|
||||||
|
|
||||||
@ -67,23 +68,19 @@ public:
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Waits a maximum of the specified timeout in milliseconds for the condition
|
* Waits a maximum of the specified timeout in milliseconds for the condition
|
||||||
* to occur, or waits forever if timeout_ms == 0.
|
* to occur, or waits forever if timeout is zero.
|
||||||
*
|
*
|
||||||
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
|
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
|
||||||
*/
|
*/
|
||||||
int waitForTimeRelative(int64_t timeout_ms) const;
|
int waitForTimeRelative(const std::chrono::milliseconds &timeout) const;
|
||||||
|
|
||||||
|
int waitForTimeRelative(int64_t timeout_ms) const { return waitForTimeRelative(std::chrono::milliseconds(timeout_ms)); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Waits until the absolute time specified using struct THRIFT_TIMESPEC.
|
* Waits until the absolute time specified by abstime.
|
||||||
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
|
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
|
||||||
*/
|
*/
|
||||||
int waitForTime(const THRIFT_TIMESPEC* abstime) const;
|
int waitForTime(const std::chrono::time_point<std::chrono::steady_clock>& abstime) const;
|
||||||
|
|
||||||
/**
|
|
||||||
* Waits until the absolute time specified using struct timeval.
|
|
||||||
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
|
|
||||||
*/
|
|
||||||
int waitForTime(const struct timeval* abstime) const;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Waits forever until the condition occurs.
|
* Waits forever until the condition occurs.
|
||||||
@ -93,12 +90,14 @@ public:
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Exception-throwing version of waitForTimeRelative(), called simply
|
* Exception-throwing version of waitForTimeRelative(), called simply
|
||||||
* wait(int64) for historical reasons. Timeout is in milliseconds.
|
* wait(std::chrono::milliseconds) for historical reasons. Timeout is in milliseconds.
|
||||||
*
|
*
|
||||||
* If the condition occurs, this function returns cleanly; on timeout or
|
* If the condition occurs, this function returns cleanly; on timeout or
|
||||||
* error an exception is thrown.
|
* error an exception is thrown.
|
||||||
*/
|
*/
|
||||||
void wait(int64_t timeout_ms = 0LL) const;
|
void wait(const std::chrono::milliseconds &timeout) const;
|
||||||
|
|
||||||
|
void wait(int64_t timeout_ms = 0LL) const { this->wait(std::chrono::milliseconds(timeout_ms)); }
|
||||||
|
|
||||||
/** Wakes up one thread waiting on this monitor. */
|
/** Wakes up one thread waiting on this monitor. */
|
||||||
virtual void notify() const;
|
virtual void notify() const;
|
||||||
|
@ -22,7 +22,6 @@
|
|||||||
#include <thrift/concurrency/ThreadManager.h>
|
#include <thrift/concurrency/ThreadManager.h>
|
||||||
#include <thrift/concurrency/Exception.h>
|
#include <thrift/concurrency/Exception.h>
|
||||||
#include <thrift/concurrency/Monitor.h>
|
#include <thrift/concurrency/Monitor.h>
|
||||||
#include <thrift/concurrency/Util.h>
|
|
||||||
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
|
||||||
@ -35,6 +34,7 @@ namespace thrift {
|
|||||||
namespace concurrency {
|
namespace concurrency {
|
||||||
|
|
||||||
using std::shared_ptr;
|
using std::shared_ptr;
|
||||||
|
using std::unique_ptr;
|
||||||
using std::dynamic_pointer_cast;
|
using std::dynamic_pointer_cast;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -180,10 +180,13 @@ class ThreadManager::Task : public Runnable {
|
|||||||
public:
|
public:
|
||||||
enum STATE { WAITING, EXECUTING, TIMEDOUT, COMPLETE };
|
enum STATE { WAITING, EXECUTING, TIMEDOUT, COMPLETE };
|
||||||
|
|
||||||
Task(shared_ptr<Runnable> runnable, int64_t expiration = 0LL)
|
Task(shared_ptr<Runnable> runnable, uint64_t expiration = 0ULL)
|
||||||
: runnable_(runnable),
|
: runnable_(runnable),
|
||||||
state_(WAITING),
|
state_(WAITING) {
|
||||||
expireTime_(expiration != 0LL ? Util::currentTime() + expiration : 0LL) {}
|
if (expiration != 0ULL) {
|
||||||
|
expireTime_.reset(new std::chrono::steady_clock::time_point(std::chrono::steady_clock::now() + std::chrono::milliseconds(expiration)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
~Task() {}
|
~Task() {}
|
||||||
|
|
||||||
@ -196,13 +199,13 @@ public:
|
|||||||
|
|
||||||
shared_ptr<Runnable> getRunnable() { return runnable_; }
|
shared_ptr<Runnable> getRunnable() { return runnable_; }
|
||||||
|
|
||||||
int64_t getExpireTime() const { return expireTime_; }
|
const unique_ptr<std::chrono::steady_clock::time_point> & getExpireTime() const { return expireTime_; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
shared_ptr<Runnable> runnable_;
|
shared_ptr<Runnable> runnable_;
|
||||||
friend class ThreadManager::Worker;
|
friend class ThreadManager::Worker;
|
||||||
STATE state_;
|
STATE state_;
|
||||||
int64_t expireTime_;
|
unique_ptr<std::chrono::steady_clock::time_point> expireTime_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class ThreadManager::Worker : public Runnable {
|
class ThreadManager::Worker : public Runnable {
|
||||||
@ -280,7 +283,7 @@ public:
|
|||||||
// If the state is changed to anything other than EXECUTING or TIMEDOUT here
|
// If the state is changed to anything other than EXECUTING or TIMEDOUT here
|
||||||
// then the execution loop needs to be changed below.
|
// then the execution loop needs to be changed below.
|
||||||
task->state_ =
|
task->state_ =
|
||||||
(task->getExpireTime() && task->getExpireTime() < Util::currentTime()) ?
|
(task->getExpireTime() && *(task->getExpireTime()) < std::chrono::steady_clock::now()) ?
|
||||||
ThreadManager::Task::TIMEDOUT :
|
ThreadManager::Task::TIMEDOUT :
|
||||||
ThreadManager::Task::EXECUTING;
|
ThreadManager::Task::EXECUTING;
|
||||||
}
|
}
|
||||||
@ -524,15 +527,14 @@ std::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
|
|||||||
|
|
||||||
void ThreadManager::Impl::removeExpired(bool justOne) {
|
void ThreadManager::Impl::removeExpired(bool justOne) {
|
||||||
// this is always called under a lock
|
// this is always called under a lock
|
||||||
int64_t now = 0LL;
|
if (tasks_.empty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
auto now = std::chrono::steady_clock::now();
|
||||||
|
|
||||||
for (TaskQueue::iterator it = tasks_.begin(); it != tasks_.end(); )
|
for (TaskQueue::iterator it = tasks_.begin(); it != tasks_.end(); )
|
||||||
{
|
{
|
||||||
if (now == 0LL) {
|
if ((*it)->getExpireTime() && *((*it)->getExpireTime()) < now) {
|
||||||
now = Util::currentTime();
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((*it)->getExpireTime() > 0LL && (*it)->getExpireTime() < now) {
|
|
||||||
if (expireCallback_) {
|
if (expireCallback_) {
|
||||||
expireCallback_((*it)->getRunnable());
|
expireCallback_((*it)->getRunnable());
|
||||||
}
|
}
|
||||||
|
@ -19,7 +19,6 @@
|
|||||||
|
|
||||||
#include <thrift/concurrency/TimerManager.h>
|
#include <thrift/concurrency/TimerManager.h>
|
||||||
#include <thrift/concurrency/Exception.h>
|
#include <thrift/concurrency/Exception.h>
|
||||||
#include <thrift/concurrency/Util.h>
|
|
||||||
|
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
@ -90,21 +89,22 @@ public:
|
|||||||
{
|
{
|
||||||
Synchronized s(manager_->monitor_);
|
Synchronized s(manager_->monitor_);
|
||||||
task_iterator expiredTaskEnd;
|
task_iterator expiredTaskEnd;
|
||||||
int64_t now = Util::currentTime();
|
auto now = std::chrono::steady_clock::now();
|
||||||
while (manager_->state_ == TimerManager::STARTED
|
while (manager_->state_ == TimerManager::STARTED
|
||||||
&& (expiredTaskEnd = manager_->taskMap_.upper_bound(now))
|
&& (expiredTaskEnd = manager_->taskMap_.upper_bound(now))
|
||||||
== manager_->taskMap_.begin()) {
|
== manager_->taskMap_.begin()) {
|
||||||
int64_t timeout = 0LL;
|
std::chrono::milliseconds timeout(0);
|
||||||
if (!manager_->taskMap_.empty()) {
|
if (!manager_->taskMap_.empty()) {
|
||||||
timeout = manager_->taskMap_.begin()->first - now;
|
timeout = std::chrono::duration_cast<std::chrono::milliseconds>(manager_->taskMap_.begin()->first - now);
|
||||||
|
//because the unit of steady_clock is smaller than millisecond,timeout may be 0.
|
||||||
|
if (timeout.count() == 0) {
|
||||||
|
timeout = std::chrono::milliseconds(1);
|
||||||
|
}
|
||||||
|
manager_->monitor_.waitForTimeRelative(timeout);
|
||||||
|
} else {
|
||||||
|
manager_->monitor_.waitForTimeRelative(0);
|
||||||
}
|
}
|
||||||
assert((timeout != 0 && manager_->taskCount_ > 0)
|
now = std::chrono::steady_clock::now();
|
||||||
|| (timeout == 0 && manager_->taskCount_ == 0));
|
|
||||||
try {
|
|
||||||
manager_->monitor_.wait(timeout);
|
|
||||||
} catch (TimedOutException&) {
|
|
||||||
}
|
|
||||||
now = Util::currentTime();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (manager_->state_ == TimerManager::STARTED) {
|
if (manager_->state_ == TimerManager::STARTED) {
|
||||||
@ -239,64 +239,39 @@ size_t TimerManager::taskCount() const {
|
|||||||
return taskCount_;
|
return taskCount_;
|
||||||
}
|
}
|
||||||
|
|
||||||
TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
|
TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task, const std::chrono::milliseconds &timeout) {
|
||||||
int64_t now = Util::currentTime();
|
return add(task, std::chrono::steady_clock::now() + timeout);
|
||||||
timeout += now;
|
|
||||||
|
|
||||||
{
|
|
||||||
Synchronized s(monitor_);
|
|
||||||
if (state_ != TimerManager::STARTED) {
|
|
||||||
throw IllegalStateException();
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him
|
|
||||||
// if the expiration time is shorter than the current value. Need to test before we insert,
|
|
||||||
// because the new task might insert at the front.
|
|
||||||
bool notifyRequired = (taskCount_ == 0) ? true : timeout < taskMap_.begin()->first;
|
|
||||||
|
|
||||||
shared_ptr<Task> timer(new Task(task));
|
|
||||||
taskCount_++;
|
|
||||||
timer->it_ = taskMap_.insert(std::pair<int64_t, shared_ptr<Task> >(timeout, timer));
|
|
||||||
|
|
||||||
// If the task map was empty, or if we have an expiration that is earlier
|
|
||||||
// than any previously seen, kick the dispatcher so it can update its
|
|
||||||
// timeout
|
|
||||||
if (notifyRequired) {
|
|
||||||
monitor_.notify();
|
|
||||||
}
|
|
||||||
|
|
||||||
return timer;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task,
|
TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task,
|
||||||
const struct THRIFT_TIMESPEC& value) {
|
const std::chrono::time_point<std::chrono::steady_clock>& abstime) {
|
||||||
|
auto now = std::chrono::steady_clock::now();
|
||||||
|
|
||||||
int64_t expiration;
|
if (abstime < now) {
|
||||||
Util::toMilliseconds(expiration, value);
|
|
||||||
|
|
||||||
int64_t now = Util::currentTime();
|
|
||||||
|
|
||||||
if (expiration < now) {
|
|
||||||
throw InvalidArgumentException();
|
throw InvalidArgumentException();
|
||||||
}
|
}
|
||||||
|
Synchronized s(monitor_);
|
||||||
return add(task, expiration - now);
|
if (state_ != TimerManager::STARTED) {
|
||||||
}
|
throw IllegalStateException();
|
||||||
|
|
||||||
TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task,
|
|
||||||
const struct timeval& value) {
|
|
||||||
|
|
||||||
int64_t expiration;
|
|
||||||
Util::toMilliseconds(expiration, value);
|
|
||||||
|
|
||||||
int64_t now = Util::currentTime();
|
|
||||||
|
|
||||||
if (expiration < now) {
|
|
||||||
throw InvalidArgumentException();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return add(task, expiration - now);
|
// If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him
|
||||||
|
// if the expiration time is shorter than the current value. Need to test before we insert,
|
||||||
|
// because the new task might insert at the front.
|
||||||
|
bool notifyRequired = (taskCount_ == 0) ? true : abstime < taskMap_.begin()->first;
|
||||||
|
|
||||||
|
shared_ptr<Task> timer(new Task(task));
|
||||||
|
taskCount_++;
|
||||||
|
timer->it_ = taskMap_.emplace(abstime, timer);
|
||||||
|
|
||||||
|
// If the task map was empty, or if we have an expiration that is earlier
|
||||||
|
// than any previously seen, kick the dispatcher so it can update its
|
||||||
|
// timeout
|
||||||
|
if (notifyRequired) {
|
||||||
|
monitor_.notify();
|
||||||
|
}
|
||||||
|
|
||||||
|
return timer;
|
||||||
}
|
}
|
||||||
|
|
||||||
void TimerManager::remove(shared_ptr<Runnable> task) {
|
void TimerManager::remove(shared_ptr<Runnable> task) {
|
||||||
|
@ -72,25 +72,17 @@ public:
|
|||||||
* @param timeout Time in milliseconds to delay before executing task
|
* @param timeout Time in milliseconds to delay before executing task
|
||||||
* @return Handle of the timer, which can be used to remove the timer.
|
* @return Handle of the timer, which can be used to remove the timer.
|
||||||
*/
|
*/
|
||||||
virtual Timer add(std::shared_ptr<Runnable> task, int64_t timeout);
|
virtual Timer add(std::shared_ptr<Runnable> task, const std::chrono::milliseconds &timeout);
|
||||||
|
Timer add(std::shared_ptr<Runnable> task, uint64_t timeout) { return add(task,std::chrono::milliseconds(timeout)); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds a task to be executed at some time in the future by a worker thread.
|
* Adds a task to be executed at some time in the future by a worker thread.
|
||||||
*
|
*
|
||||||
* @param task The task to execute
|
* @param task The task to execute
|
||||||
* @param timeout Absolute time in the future to execute task.
|
* @param abstime Absolute time in the future to execute task.
|
||||||
* @return Handle of the timer, which can be used to remove the timer.
|
* @return Handle of the timer, which can be used to remove the timer.
|
||||||
*/
|
*/
|
||||||
virtual Timer add(std::shared_ptr<Runnable> task, const struct THRIFT_TIMESPEC& timeout);
|
virtual Timer add(std::shared_ptr<Runnable> task, const std::chrono::time_point<std::chrono::steady_clock>& abstime);
|
||||||
|
|
||||||
/**
|
|
||||||
* Adds a task to be executed at some time in the future by a worker thread.
|
|
||||||
*
|
|
||||||
* @param task The task to execute
|
|
||||||
* @param timeout Absolute time in the future to execute task.
|
|
||||||
* @return Handle of the timer, which can be used to remove the timer.
|
|
||||||
*/
|
|
||||||
virtual Timer add(std::shared_ptr<Runnable> task, const struct timeval& timeout);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes a pending task
|
* Removes a pending task
|
||||||
@ -127,7 +119,7 @@ public:
|
|||||||
private:
|
private:
|
||||||
std::shared_ptr<const ThreadFactory> threadFactory_;
|
std::shared_ptr<const ThreadFactory> threadFactory_;
|
||||||
friend class Task;
|
friend class Task;
|
||||||
std::multimap<int64_t, std::shared_ptr<Task> > taskMap_;
|
std::multimap<std::chrono::time_point<std::chrono::steady_clock>, std::shared_ptr<Task> > taskMap_;
|
||||||
size_t taskCount_;
|
size_t taskCount_;
|
||||||
Monitor monitor_;
|
Monitor monitor_;
|
||||||
STATE state_;
|
STATE state_;
|
||||||
@ -135,7 +127,7 @@ private:
|
|||||||
friend class Dispatcher;
|
friend class Dispatcher;
|
||||||
std::shared_ptr<Dispatcher> dispatcher_;
|
std::shared_ptr<Dispatcher> dispatcher_;
|
||||||
std::shared_ptr<Thread> dispatcherThread_;
|
std::shared_ptr<Thread> dispatcherThread_;
|
||||||
typedef std::multimap<int64_t, std::shared_ptr<TimerManager::Task> >::iterator task_iterator;
|
using task_iterator = decltype(taskMap_)::iterator;
|
||||||
typedef std::pair<task_iterator, task_iterator> task_range;
|
typedef std::pair<task_iterator, task_iterator> task_range;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -1,44 +0,0 @@
|
|||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include <thrift/thrift-config.h>
|
|
||||||
|
|
||||||
#include <thrift/Thrift.h>
|
|
||||||
#include <thrift/concurrency/Util.h>
|
|
||||||
|
|
||||||
#if defined(HAVE_SYS_TIME_H)
|
|
||||||
#include <sys/time.h>
|
|
||||||
#endif
|
|
||||||
|
|
||||||
namespace apache {
|
|
||||||
namespace thrift {
|
|
||||||
namespace concurrency {
|
|
||||||
|
|
||||||
int64_t Util::currentTimeTicks(int64_t ticksPerSec) {
|
|
||||||
int64_t result;
|
|
||||||
struct timeval now;
|
|
||||||
int ret = THRIFT_GETTIMEOFDAY(&now, NULL);
|
|
||||||
assert(ret == 0);
|
|
||||||
THRIFT_UNUSED_VARIABLE(ret); // squelching "unused variable" warning
|
|
||||||
toTicks(result, now, ticksPerSec);
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} // apache::thrift::concurrency
|
|
@ -1,151 +0,0 @@
|
|||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _THRIFT_CONCURRENCY_UTIL_H_
|
|
||||||
#define _THRIFT_CONCURRENCY_UTIL_H_ 1
|
|
||||||
|
|
||||||
#include <assert.h>
|
|
||||||
#include <stddef.h>
|
|
||||||
#include <stdint.h>
|
|
||||||
#include <time.h>
|
|
||||||
|
|
||||||
#ifdef HAVE_SYS_TIME_H
|
|
||||||
#include <sys/time.h>
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#include <thrift/transport/PlatformSocket.h>
|
|
||||||
|
|
||||||
namespace apache {
|
|
||||||
namespace thrift {
|
|
||||||
namespace concurrency {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Utility methods
|
|
||||||
*
|
|
||||||
* This class contains basic utility methods for converting time formats,
|
|
||||||
* and other common platform-dependent concurrency operations.
|
|
||||||
* It should not be included in API headers for other concurrency library
|
|
||||||
* headers, since it will, by definition, pull in all sorts of horrid
|
|
||||||
* platform dependent stuff. Rather it should be inluded directly in
|
|
||||||
* concurrency library implementation source.
|
|
||||||
*
|
|
||||||
* @version $Id:$
|
|
||||||
*/
|
|
||||||
class Util {
|
|
||||||
|
|
||||||
static const int64_t NS_PER_S = 1000000000LL;
|
|
||||||
static const int64_t US_PER_S = 1000000LL;
|
|
||||||
static const int64_t MS_PER_S = 1000LL;
|
|
||||||
|
|
||||||
static const int64_t NS_PER_MS = NS_PER_S / MS_PER_S;
|
|
||||||
static const int64_t NS_PER_US = NS_PER_S / US_PER_S;
|
|
||||||
static const int64_t US_PER_MS = US_PER_S / MS_PER_S;
|
|
||||||
|
|
||||||
public:
|
|
||||||
/**
|
|
||||||
* Converts millisecond timestamp into a THRIFT_TIMESPEC struct
|
|
||||||
*
|
|
||||||
* @param struct THRIFT_TIMESPEC& result
|
|
||||||
* @param time or duration in milliseconds
|
|
||||||
*/
|
|
||||||
static void toTimespec(struct THRIFT_TIMESPEC& result, int64_t value) {
|
|
||||||
result.tv_sec = value / MS_PER_S; // ms to s
|
|
||||||
result.tv_nsec = (value % MS_PER_S) * NS_PER_MS; // ms to ns
|
|
||||||
}
|
|
||||||
|
|
||||||
static void toTimeval(struct timeval& result, int64_t value) {
|
|
||||||
result.tv_sec = static_cast<uint32_t>(value / MS_PER_S); // ms to s
|
|
||||||
result.tv_usec = static_cast<uint32_t>((value % MS_PER_S) * US_PER_MS); // ms to us
|
|
||||||
}
|
|
||||||
|
|
||||||
static void toTicks(int64_t& result,
|
|
||||||
int64_t secs,
|
|
||||||
int64_t oldTicks,
|
|
||||||
int64_t oldTicksPerSec,
|
|
||||||
int64_t newTicksPerSec) {
|
|
||||||
result = secs * newTicksPerSec;
|
|
||||||
result += oldTicks * newTicksPerSec / oldTicksPerSec;
|
|
||||||
|
|
||||||
int64_t oldPerNew = oldTicksPerSec / newTicksPerSec;
|
|
||||||
if (oldPerNew && ((oldTicks % oldPerNew) >= (oldPerNew / 2))) {
|
|
||||||
++result;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* Converts struct THRIFT_TIMESPEC to arbitrary-sized ticks since epoch
|
|
||||||
*/
|
|
||||||
static void toTicks(int64_t& result, const struct THRIFT_TIMESPEC& value, int64_t ticksPerSec) {
|
|
||||||
return toTicks(result, value.tv_sec, value.tv_nsec, NS_PER_S, ticksPerSec);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Converts struct timeval to arbitrary-sized ticks since epoch
|
|
||||||
*/
|
|
||||||
static void toTicks(int64_t& result, const struct timeval& value, int64_t ticksPerSec) {
|
|
||||||
return toTicks(result, (unsigned long)value.tv_sec, (unsigned long)value.tv_usec, US_PER_S, ticksPerSec);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Converts struct THRIFT_TIMESPEC to milliseconds
|
|
||||||
*/
|
|
||||||
static void toMilliseconds(int64_t& result, const struct THRIFT_TIMESPEC& value) {
|
|
||||||
return toTicks(result, value, MS_PER_S);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Converts struct timeval to milliseconds
|
|
||||||
*/
|
|
||||||
static void toMilliseconds(int64_t& result, const struct timeval& value) {
|
|
||||||
return toTicks(result, value, MS_PER_S);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Converts struct THRIFT_TIMESPEC to microseconds
|
|
||||||
*/
|
|
||||||
static void toUsec(int64_t& result, const struct THRIFT_TIMESPEC& value) {
|
|
||||||
return toTicks(result, value, US_PER_S);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Converts struct timeval to microseconds
|
|
||||||
*/
|
|
||||||
static void toUsec(int64_t& result, const struct timeval& value) {
|
|
||||||
return toTicks(result, value, US_PER_S);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get current time as a number of arbitrary-size ticks from epoch
|
|
||||||
*/
|
|
||||||
static int64_t currentTimeTicks(int64_t ticksPerSec);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get current time as milliseconds from epoch
|
|
||||||
*/
|
|
||||||
static int64_t currentTime() { return currentTimeTicks(MS_PER_S); }
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get current time as micros from epoch
|
|
||||||
*/
|
|
||||||
static int64_t currentTimeUsec() { return currentTimeTicks(US_PER_S); }
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} // apache::thrift::concurrency
|
|
||||||
|
|
||||||
#endif // #ifndef _THRIFT_CONCURRENCY_UTIL_H_
|
|
@ -264,7 +264,7 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
|
|||||||
// it is probably a non-factor for the time being
|
// it is probably a non-factor for the time being
|
||||||
}
|
}
|
||||||
|
|
||||||
bool TFileTransport::swapEventBuffers(struct timeval* deadline) {
|
bool TFileTransport::swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> *deadline) {
|
||||||
bool swap;
|
bool swap;
|
||||||
Guard g(mutex_);
|
Guard g(mutex_);
|
||||||
|
|
||||||
@ -277,7 +277,7 @@ bool TFileTransport::swapEventBuffers(struct timeval* deadline) {
|
|||||||
} else {
|
} else {
|
||||||
if (deadline != NULL) {
|
if (deadline != NULL) {
|
||||||
// if we were handed a deadline time struct, do a timed wait
|
// if we were handed a deadline time struct, do a timed wait
|
||||||
notEmpty_.waitForTime(deadline);
|
notEmpty_.waitForTime(*deadline);
|
||||||
} else {
|
} else {
|
||||||
// just wait until the buffer gets an item
|
// just wait until the buffer gets an item
|
||||||
notEmpty_.wait();
|
notEmpty_.wait();
|
||||||
@ -336,8 +336,7 @@ void TFileTransport::writerThread() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Figure out the next time by which a flush must take place
|
// Figure out the next time by which a flush must take place
|
||||||
struct timeval ts_next_flush;
|
auto ts_next_flush = getNextFlushTime();
|
||||||
getNextFlushTime(&ts_next_flush);
|
|
||||||
uint32_t unflushed = 0;
|
uint32_t unflushed = 0;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
@ -492,15 +491,13 @@ void TFileTransport::writerThread() {
|
|||||||
} else {
|
} else {
|
||||||
struct timeval current_time;
|
struct timeval current_time;
|
||||||
THRIFT_GETTIMEOFDAY(¤t_time, NULL);
|
THRIFT_GETTIMEOFDAY(¤t_time, NULL);
|
||||||
if (current_time.tv_sec > ts_next_flush.tv_sec
|
if (std::chrono::steady_clock::now() > ts_next_flush) {
|
||||||
|| (current_time.tv_sec == ts_next_flush.tv_sec
|
|
||||||
&& current_time.tv_usec > ts_next_flush.tv_usec)) {
|
|
||||||
if (unflushed > 0) {
|
if (unflushed > 0) {
|
||||||
flush = true;
|
flush = true;
|
||||||
} else {
|
} else {
|
||||||
// If there is no new data since the last fsync,
|
// If there is no new data since the last fsync,
|
||||||
// don't perform the fsync, but do reset the timer.
|
// don't perform the fsync, but do reset the timer.
|
||||||
getNextFlushTime(&ts_next_flush);
|
ts_next_flush = getNextFlushTime();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -509,7 +506,7 @@ void TFileTransport::writerThread() {
|
|||||||
// sync (force flush) file to disk
|
// sync (force flush) file to disk
|
||||||
THRIFT_FSYNC(fd_);
|
THRIFT_FSYNC(fd_);
|
||||||
unflushed = 0;
|
unflushed = 0;
|
||||||
getNextFlushTime(&ts_next_flush);
|
ts_next_flush = getNextFlushTime();
|
||||||
|
|
||||||
// notify anybody waiting for flush completion
|
// notify anybody waiting for flush completion
|
||||||
if (forced_flush) {
|
if (forced_flush) {
|
||||||
@ -908,15 +905,8 @@ void TFileTransport::openLogFile() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void TFileTransport::getNextFlushTime(struct timeval* ts_next_flush) {
|
std::chrono::time_point<std::chrono::steady_clock> TFileTransport::getNextFlushTime() {
|
||||||
THRIFT_GETTIMEOFDAY(ts_next_flush, NULL);
|
return std::chrono::steady_clock::now() + std::chrono::microseconds(flushMaxUs_);
|
||||||
|
|
||||||
ts_next_flush->tv_usec += flushMaxUs_;
|
|
||||||
if (ts_next_flush->tv_usec > 1000000) {
|
|
||||||
long extra_secs = ts_next_flush->tv_usec / 1000000;
|
|
||||||
ts_next_flush->tv_usec %= 1000000;
|
|
||||||
ts_next_flush->tv_sec += extra_secs;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
|
TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
|
||||||
|
@ -267,7 +267,7 @@ public:
|
|||||||
private:
|
private:
|
||||||
// helper functions for writing to a file
|
// helper functions for writing to a file
|
||||||
void enqueueEvent(const uint8_t* buf, uint32_t eventLen);
|
void enqueueEvent(const uint8_t* buf, uint32_t eventLen);
|
||||||
bool swapEventBuffers(struct timeval* deadline);
|
bool swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> *deadline);
|
||||||
bool initBufferAndWriteThread();
|
bool initBufferAndWriteThread();
|
||||||
|
|
||||||
// control for writer thread
|
// control for writer thread
|
||||||
@ -286,7 +286,7 @@ private:
|
|||||||
|
|
||||||
// Utility functions
|
// Utility functions
|
||||||
void openLogFile();
|
void openLogFile();
|
||||||
void getNextFlushTime(struct timeval* ts_next_flush);
|
std::chrono::time_point<std::chrono::steady_clock> getNextFlushTime();
|
||||||
|
|
||||||
// Class variables
|
// Class variables
|
||||||
readState readState_;
|
readState readState_;
|
||||||
|
@ -94,18 +94,18 @@ int main(int argc, char** argv) {
|
|||||||
|
|
||||||
std::cout << "\t\tUtil minimum time" << std::endl;
|
std::cout << "\t\tUtil minimum time" << std::endl;
|
||||||
|
|
||||||
int64_t time00 = Util::currentTime();
|
int64_t time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
int64_t time01 = Util::currentTime();
|
int64_t time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
|
|
||||||
std::cout << "\t\t\tMinimum time: " << time01 - time00 << "ms" << std::endl;
|
std::cout << "\t\t\tMinimum time: " << time01 - time00 << "ms" << std::endl;
|
||||||
|
|
||||||
time00 = Util::currentTime();
|
time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
time01 = time00;
|
time01 = time00;
|
||||||
size_t count = 0;
|
size_t count = 0;
|
||||||
|
|
||||||
while (time01 < time00 + 10) {
|
while (time01 < time00 + 10) {
|
||||||
count++;
|
count++;
|
||||||
time01 = Util::currentTime();
|
time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::cout << "\t\t\tscall per ms: " << count / (time01 - time00) << std::endl;
|
std::cout << "\t\t\tscall per ms: " << count / (time01 - time00) << std::endl;
|
||||||
|
@ -22,7 +22,6 @@
|
|||||||
#include <thrift/concurrency/ThreadFactory.h>
|
#include <thrift/concurrency/ThreadFactory.h>
|
||||||
#include <thrift/concurrency/Monitor.h>
|
#include <thrift/concurrency/Monitor.h>
|
||||||
#include <thrift/concurrency/Mutex.h>
|
#include <thrift/concurrency/Mutex.h>
|
||||||
#include <thrift/concurrency/Util.h>
|
|
||||||
|
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
@ -221,7 +220,7 @@ public:
|
|||||||
|
|
||||||
Monitor monitor;
|
Monitor monitor;
|
||||||
|
|
||||||
int64_t startTime = Util::currentTime();
|
int64_t startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
|
|
||||||
for (int64_t ix = 0; ix < count; ix++) {
|
for (int64_t ix = 0; ix < count; ix++) {
|
||||||
{
|
{
|
||||||
@ -233,7 +232,7 @@ public:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t endTime = Util::currentTime();
|
int64_t endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
|
|
||||||
bool success = (endTime - startTime) >= (count * timeout);
|
bool success = (endTime - startTime) >= (count * timeout);
|
||||||
|
|
||||||
|
@ -21,7 +21,6 @@
|
|||||||
#include <thrift/concurrency/ThreadManager.h>
|
#include <thrift/concurrency/ThreadManager.h>
|
||||||
#include <thrift/concurrency/ThreadFactory.h>
|
#include <thrift/concurrency/ThreadFactory.h>
|
||||||
#include <thrift/concurrency/Monitor.h>
|
#include <thrift/concurrency/Monitor.h>
|
||||||
#include <thrift/concurrency/Util.h>
|
|
||||||
|
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
@ -66,11 +65,11 @@ public:
|
|||||||
|
|
||||||
void run() {
|
void run() {
|
||||||
|
|
||||||
_startTime = Util::currentTime();
|
_startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
|
|
||||||
sleep_(_timeout);
|
sleep_(_timeout);
|
||||||
|
|
||||||
_endTime = Util::currentTime();
|
_endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
|
|
||||||
_done = true;
|
_done = true;
|
||||||
|
|
||||||
@ -123,7 +122,7 @@ public:
|
|||||||
new ThreadManagerTests::Task(monitor, activeCount, timeout)));
|
new ThreadManagerTests::Task(monitor, activeCount, timeout)));
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t time00 = Util::currentTime();
|
int64_t time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
|
|
||||||
for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin();
|
for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin();
|
||||||
ix != tasks.end();
|
ix != tasks.end();
|
||||||
@ -143,7 +142,7 @@ public:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t time01 = Util::currentTime();
|
int64_t time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
|
|
||||||
int64_t firstTime = 9223372036854775807LL;
|
int64_t firstTime = 9223372036854775807LL;
|
||||||
int64_t lastTime = 0;
|
int64_t lastTime = 0;
|
||||||
@ -387,9 +386,9 @@ public:
|
|||||||
bool apiTest() {
|
bool apiTest() {
|
||||||
|
|
||||||
// prove currentTime has milliseconds granularity since many other things depend on it
|
// prove currentTime has milliseconds granularity since many other things depend on it
|
||||||
int64_t a = Util::currentTime();
|
int64_t a = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
sleep_(100);
|
sleep_(100);
|
||||||
int64_t b = Util::currentTime();
|
int64_t b = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
if (b - a < 50 || b - a > 150) {
|
if (b - a < 50 || b - a > 150) {
|
||||||
std::cerr << "\t\t\texpected 100ms gap, found " << (b-a) << "ms gap instead." << std::endl;
|
std::cerr << "\t\t\texpected 100ms gap, found " << (b-a) << "ms gap instead." << std::endl;
|
||||||
return false;
|
return false;
|
||||||
|
@ -20,7 +20,6 @@
|
|||||||
#include <thrift/concurrency/TimerManager.h>
|
#include <thrift/concurrency/TimerManager.h>
|
||||||
#include <thrift/concurrency/ThreadFactory.h>
|
#include <thrift/concurrency/ThreadFactory.h>
|
||||||
#include <thrift/concurrency/Monitor.h>
|
#include <thrift/concurrency/Monitor.h>
|
||||||
#include <thrift/concurrency/Util.h>
|
|
||||||
|
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
@ -39,7 +38,7 @@ public:
|
|||||||
public:
|
public:
|
||||||
Task(Monitor& monitor, int64_t timeout)
|
Task(Monitor& monitor, int64_t timeout)
|
||||||
: _timeout(timeout),
|
: _timeout(timeout),
|
||||||
_startTime(Util::currentTime()),
|
_startTime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count()),
|
||||||
_endTime(0),
|
_endTime(0),
|
||||||
_monitor(monitor),
|
_monitor(monitor),
|
||||||
_success(false),
|
_success(false),
|
||||||
@ -49,7 +48,7 @@ public:
|
|||||||
|
|
||||||
void run() {
|
void run() {
|
||||||
|
|
||||||
_endTime = Util::currentTime();
|
_endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
_success = (_endTime - _startTime) >= _timeout;
|
_success = (_endTime - _startTime) >= _timeout;
|
||||||
|
|
||||||
{
|
{
|
||||||
|
@ -20,7 +20,6 @@
|
|||||||
#include <thrift/concurrency/ThreadManager.h>
|
#include <thrift/concurrency/ThreadManager.h>
|
||||||
#include <thrift/concurrency/ThreadFactory.h>
|
#include <thrift/concurrency/ThreadFactory.h>
|
||||||
#include <thrift/concurrency/Monitor.h>
|
#include <thrift/concurrency/Monitor.h>
|
||||||
#include <thrift/concurrency/Util.h>
|
|
||||||
#include <thrift/concurrency/Mutex.h>
|
#include <thrift/concurrency/Mutex.h>
|
||||||
#include <thrift/protocol/TBinaryProtocol.h>
|
#include <thrift/protocol/TBinaryProtocol.h>
|
||||||
#include <thrift/server/TSimpleServer.h>
|
#include <thrift/server/TSimpleServer.h>
|
||||||
@ -133,7 +132,7 @@ public:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_startTime = Util::currentTime();
|
_startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
if(_behavior == OpenAndCloseTransportInThread) {
|
if(_behavior == OpenAndCloseTransportInThread) {
|
||||||
_transport->open();
|
_transport->open();
|
||||||
}
|
}
|
||||||
@ -159,7 +158,7 @@ public:
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
_endTime = Util::currentTime();
|
_endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
|
|
||||||
if(_behavior == OpenAndCloseTransportInThread) {
|
if(_behavior == OpenAndCloseTransportInThread) {
|
||||||
_transport->close();
|
_transport->close();
|
||||||
@ -540,7 +539,7 @@ int main(int argc, char** argv) {
|
|||||||
|
|
||||||
cerr << "Launch " << clientCount << " " << clientType << " client threads" << endl;
|
cerr << "Launch " << clientCount << " " << clientType << " client threads" << endl;
|
||||||
|
|
||||||
time00 = Util::currentTime();
|
time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
|
|
||||||
monitor.notifyAll();
|
monitor.notifyAll();
|
||||||
|
|
||||||
@ -548,7 +547,7 @@ int main(int argc, char** argv) {
|
|||||||
monitor.wait();
|
monitor.wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
time01 = Util::currentTime();
|
time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t firstTime = 9223372036854775807LL;
|
int64_t firstTime = 9223372036854775807LL;
|
||||||
|
@ -20,7 +20,6 @@
|
|||||||
#include <thrift/concurrency/ThreadManager.h>
|
#include <thrift/concurrency/ThreadManager.h>
|
||||||
#include <thrift/concurrency/ThreadFactory.h>
|
#include <thrift/concurrency/ThreadFactory.h>
|
||||||
#include <thrift/concurrency/Monitor.h>
|
#include <thrift/concurrency/Monitor.h>
|
||||||
#include <thrift/concurrency/Util.h>
|
|
||||||
#include <thrift/concurrency/Mutex.h>
|
#include <thrift/concurrency/Mutex.h>
|
||||||
#include <thrift/protocol/TBinaryProtocol.h>
|
#include <thrift/protocol/TBinaryProtocol.h>
|
||||||
#include <thrift/server/TSimpleServer.h>
|
#include <thrift/server/TSimpleServer.h>
|
||||||
@ -132,7 +131,7 @@ public:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_startTime = Util::currentTime();
|
_startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
|
|
||||||
_transport->open();
|
_transport->open();
|
||||||
|
|
||||||
@ -157,7 +156,7 @@ public:
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
_endTime = Util::currentTime();
|
_endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
|
|
||||||
_transport->close();
|
_transport->close();
|
||||||
|
|
||||||
@ -478,7 +477,7 @@ int main(int argc, char** argv) {
|
|||||||
|
|
||||||
cerr << "Launch " << clientCount << " client threads" << endl;
|
cerr << "Launch " << clientCount << " client threads" << endl;
|
||||||
|
|
||||||
time00 = Util::currentTime();
|
time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
|
|
||||||
monitor.notifyAll();
|
monitor.notifyAll();
|
||||||
|
|
||||||
@ -486,7 +485,7 @@ int main(int argc, char** argv) {
|
|||||||
monitor.wait();
|
monitor.wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
time01 = Util::currentTime();
|
time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t firstTime = 9223372036854775807LL;
|
int64_t firstTime = 9223372036854775807LL;
|
||||||
|
Loading…
Reference in New Issue
Block a user