From bfdbd0344bd3ddf348985b51e02212e8092859d4 Mon Sep 17 00:00:00 2001 From: cyy Date: Sat, 12 Jan 2019 14:38:28 +0800 Subject: [PATCH] use chrono --- lib/cpp/CMakeLists.txt | 1 - lib/cpp/Makefile.am | 4 +- lib/cpp/src/thrift/concurrency/Monitor.cpp | 54 ++----- lib/cpp/src/thrift/concurrency/Monitor.h | 25 ++- .../src/thrift/concurrency/ThreadManager.cpp | 28 ++-- .../src/thrift/concurrency/TimerManager.cpp | 97 +++++------ lib/cpp/src/thrift/concurrency/TimerManager.h | 20 +-- lib/cpp/src/thrift/concurrency/Util.cpp | 44 ----- lib/cpp/src/thrift/concurrency/Util.h | 151 ------------------ .../src/thrift/transport/TFileTransport.cpp | 26 +-- lib/cpp/src/thrift/transport/TFileTransport.h | 4 +- lib/cpp/test/concurrency/Tests.cpp | 8 +- lib/cpp/test/concurrency/ThreadFactoryTests.h | 5 +- lib/cpp/test/concurrency/ThreadManagerTests.h | 13 +- lib/cpp/test/concurrency/TimerManagerTests.h | 5 +- test/cpp/src/StressTest.cpp | 9 +- test/cpp/src/StressTestNonBlocking.cpp | 9 +- 17 files changed, 115 insertions(+), 388 deletions(-) delete mode 100644 lib/cpp/src/thrift/concurrency/Util.cpp delete mode 100644 lib/cpp/src/thrift/concurrency/Util.h diff --git a/lib/cpp/CMakeLists.txt b/lib/cpp/CMakeLists.txt index 90a6c2885..f4e810461 100755 --- a/lib/cpp/CMakeLists.txt +++ b/lib/cpp/CMakeLists.txt @@ -36,7 +36,6 @@ set( thriftcpp_SOURCES src/thrift/async/TConcurrentClientSyncInfo.cpp src/thrift/concurrency/ThreadManager.cpp src/thrift/concurrency/TimerManager.cpp - src/thrift/concurrency/Util.cpp src/thrift/processor/PeekProcessor.cpp src/thrift/protocol/TBase64Utils.cpp src/thrift/protocol/TDebugProtocol.cpp diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index de0c058c1..db9d751e4 100755 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -67,7 +67,6 @@ libthrift_la_SOURCES = src/thrift/TApplicationException.cpp \ src/thrift/async/TConcurrentClientSyncInfo.cpp \ src/thrift/concurrency/ThreadManager.cpp \ src/thrift/concurrency/TimerManager.cpp \ - src/thrift/concurrency/Util.cpp \ src/thrift/processor/PeekProcessor.cpp \ src/thrift/protocol/TDebugProtocol.cpp \ src/thrift/protocol/TJSONProtocol.cpp \ @@ -155,8 +154,7 @@ include_concurrency_HEADERS = \ src/thrift/concurrency/Thread.h \ src/thrift/concurrency/ThreadManager.h \ src/thrift/concurrency/TimerManager.h \ - src/thrift/concurrency/FunctionRunner.h \ - src/thrift/concurrency/Util.h + src/thrift/concurrency/FunctionRunner.h include_protocoldir = $(include_thriftdir)/protocol include_protocol_HEADERS = \ diff --git a/lib/cpp/src/thrift/concurrency/Monitor.cpp b/lib/cpp/src/thrift/concurrency/Monitor.cpp index 7b3b209a7..99d52b3e3 100644 --- a/lib/cpp/src/thrift/concurrency/Monitor.cpp +++ b/lib/cpp/src/thrift/concurrency/Monitor.cpp @@ -21,7 +21,6 @@ #include #include -#include #include #include @@ -61,8 +60,8 @@ public: * If the condition occurs, this function returns cleanly; on timeout or * error an exception is thrown. */ - void wait(int64_t timeout_ms) { - int result = waitForTimeRelative(timeout_ms); + void wait(const std::chrono::milliseconds &timeout) { + int result = waitForTimeRelative(timeout); if (result == THRIFT_ETIMEDOUT) { throw TimedOutException(); } else if (result != 0) { @@ -72,12 +71,12 @@ public: /** * Waits until the specified timeout in milliseconds for the condition to - * occur, or waits forever if timeout_ms == 0. + * occur, or waits forever if timeout is zero. * * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. */ - int waitForTimeRelative(int64_t timeout_ms) { - if (timeout_ms == 0LL) { + int waitForTimeRelative(const std::chrono::milliseconds &timeout) { + if (timeout.count() == 0) { return waitForever(); } @@ -86,46 +85,23 @@ public: assert(mutexImpl); std::unique_lock lock(*mutexImpl, std::adopt_lock); - bool timedout = (conditionVariable_.wait_for(lock, std::chrono::milliseconds(timeout_ms)) + bool timedout = (conditionVariable_.wait_for(lock, timeout) == std::cv_status::timeout); lock.release(); return (timedout ? THRIFT_ETIMEDOUT : 0); } /** - * Waits until the absolute time specified using struct THRIFT_TIMESPEC. + * Waits until the absolute time specified by abstime. * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. */ - int waitForTime(const THRIFT_TIMESPEC* abstime) { - struct timeval temp; - temp.tv_sec = static_cast(abstime->tv_sec); - temp.tv_usec = static_cast(abstime->tv_nsec) / 1000; - return waitForTime(&temp); - } - - /** - * Waits until the absolute time specified using struct timeval. - * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. - */ - int waitForTime(const struct timeval* abstime) { + int waitForTime(const std::chrono::time_point& abstime) { assert(mutex_); std::timed_mutex* mutexImpl = static_cast(mutex_->getUnderlyingImpl()); assert(mutexImpl); - struct timeval currenttime; - Util::toTimeval(currenttime, Util::currentTime()); - - long tv_sec = static_cast(abstime->tv_sec - currenttime.tv_sec); - long tv_usec = static_cast(abstime->tv_usec - currenttime.tv_usec); - if (tv_sec < 0) - tv_sec = 0; - if (tv_usec < 0) - tv_usec = 0; - std::unique_lock lock(*mutexImpl, std::adopt_lock); - bool timedout = (conditionVariable_.wait_for(lock, - std::chrono::seconds(tv_sec) - + std::chrono::microseconds(tv_usec)) + bool timedout = (conditionVariable_.wait_until(lock, abstime) == std::cv_status::timeout); lock.release(); return (timedout ? THRIFT_ETIMEDOUT : 0); @@ -181,20 +157,16 @@ void Monitor::unlock() const { const_cast(impl_)->unlock(); } -void Monitor::wait(int64_t timeout) const { +void Monitor::wait(const std::chrono::milliseconds &timeout) const { const_cast(impl_)->wait(timeout); } -int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const { +int Monitor::waitForTime(const std::chrono::time_point& abstime) const { return const_cast(impl_)->waitForTime(abstime); } -int Monitor::waitForTime(const timeval* abstime) const { - return const_cast(impl_)->waitForTime(abstime); -} - -int Monitor::waitForTimeRelative(int64_t timeout_ms) const { - return const_cast(impl_)->waitForTimeRelative(timeout_ms); +int Monitor::waitForTimeRelative(const std::chrono::milliseconds &timeout) const { + return const_cast(impl_)->waitForTimeRelative(timeout); } int Monitor::waitForever() const { diff --git a/lib/cpp/src/thrift/concurrency/Monitor.h b/lib/cpp/src/thrift/concurrency/Monitor.h index 11a145d55..c70fc8616 100644 --- a/lib/cpp/src/thrift/concurrency/Monitor.h +++ b/lib/cpp/src/thrift/concurrency/Monitor.h @@ -20,6 +20,7 @@ #ifndef _THRIFT_CONCURRENCY_MONITOR_H_ #define _THRIFT_CONCURRENCY_MONITOR_H_ 1 +#include #include #include @@ -67,23 +68,19 @@ public: /** * Waits a maximum of the specified timeout in milliseconds for the condition - * to occur, or waits forever if timeout_ms == 0. + * to occur, or waits forever if timeout is zero. * * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. */ - int waitForTimeRelative(int64_t timeout_ms) const; + int waitForTimeRelative(const std::chrono::milliseconds &timeout) const; + + int waitForTimeRelative(int64_t timeout_ms) const { return waitForTimeRelative(std::chrono::milliseconds(timeout_ms)); } /** - * Waits until the absolute time specified using struct THRIFT_TIMESPEC. + * Waits until the absolute time specified by abstime. * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. */ - int waitForTime(const THRIFT_TIMESPEC* abstime) const; - - /** - * Waits until the absolute time specified using struct timeval. - * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. - */ - int waitForTime(const struct timeval* abstime) const; + int waitForTime(const std::chrono::time_point& abstime) const; /** * Waits forever until the condition occurs. @@ -93,12 +90,14 @@ public: /** * Exception-throwing version of waitForTimeRelative(), called simply - * wait(int64) for historical reasons. Timeout is in milliseconds. + * wait(std::chrono::milliseconds) for historical reasons. Timeout is in milliseconds. * - * If the condition occurs, this function returns cleanly; on timeout or + * If the condition occurs, this function returns cleanly; on timeout or * error an exception is thrown. */ - void wait(int64_t timeout_ms = 0LL) const; + void wait(const std::chrono::milliseconds &timeout) const; + + void wait(int64_t timeout_ms = 0LL) const { this->wait(std::chrono::milliseconds(timeout_ms)); } /** Wakes up one thread waiting on this monitor. */ virtual void notify() const; diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp index 5e883270b..4c7c372af 100644 --- a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp +++ b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp @@ -22,7 +22,6 @@ #include #include #include -#include #include @@ -35,6 +34,7 @@ namespace thrift { namespace concurrency { using std::shared_ptr; +using std::unique_ptr; using std::dynamic_pointer_cast; /** @@ -180,10 +180,13 @@ class ThreadManager::Task : public Runnable { public: enum STATE { WAITING, EXECUTING, TIMEDOUT, COMPLETE }; - Task(shared_ptr runnable, int64_t expiration = 0LL) + Task(shared_ptr runnable, uint64_t expiration = 0ULL) : runnable_(runnable), - state_(WAITING), - expireTime_(expiration != 0LL ? Util::currentTime() + expiration : 0LL) {} + state_(WAITING) { + if (expiration != 0ULL) { + expireTime_.reset(new std::chrono::steady_clock::time_point(std::chrono::steady_clock::now() + std::chrono::milliseconds(expiration))); + } + } ~Task() {} @@ -196,13 +199,13 @@ public: shared_ptr getRunnable() { return runnable_; } - int64_t getExpireTime() const { return expireTime_; } + const unique_ptr & getExpireTime() const { return expireTime_; } private: shared_ptr runnable_; friend class ThreadManager::Worker; STATE state_; - int64_t expireTime_; + unique_ptr expireTime_; }; class ThreadManager::Worker : public Runnable { @@ -280,7 +283,7 @@ public: // If the state is changed to anything other than EXECUTING or TIMEDOUT here // then the execution loop needs to be changed below. task->state_ = - (task->getExpireTime() && task->getExpireTime() < Util::currentTime()) ? + (task->getExpireTime() && *(task->getExpireTime()) < std::chrono::steady_clock::now()) ? ThreadManager::Task::TIMEDOUT : ThreadManager::Task::EXECUTING; } @@ -524,15 +527,14 @@ std::shared_ptr ThreadManager::Impl::removeNextPending() { void ThreadManager::Impl::removeExpired(bool justOne) { // this is always called under a lock - int64_t now = 0LL; + if (tasks_.empty()) { + return; + } + auto now = std::chrono::steady_clock::now(); for (TaskQueue::iterator it = tasks_.begin(); it != tasks_.end(); ) { - if (now == 0LL) { - now = Util::currentTime(); - } - - if ((*it)->getExpireTime() > 0LL && (*it)->getExpireTime() < now) { + if ((*it)->getExpireTime() && *((*it)->getExpireTime()) < now) { if (expireCallback_) { expireCallback_((*it)->getRunnable()); } diff --git a/lib/cpp/src/thrift/concurrency/TimerManager.cpp b/lib/cpp/src/thrift/concurrency/TimerManager.cpp index 61a34ff69..edd336be0 100644 --- a/lib/cpp/src/thrift/concurrency/TimerManager.cpp +++ b/lib/cpp/src/thrift/concurrency/TimerManager.cpp @@ -19,7 +19,6 @@ #include #include -#include #include #include @@ -90,21 +89,22 @@ public: { Synchronized s(manager_->monitor_); task_iterator expiredTaskEnd; - int64_t now = Util::currentTime(); + auto now = std::chrono::steady_clock::now(); while (manager_->state_ == TimerManager::STARTED && (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) { - int64_t timeout = 0LL; + std::chrono::milliseconds timeout(0); if (!manager_->taskMap_.empty()) { - timeout = manager_->taskMap_.begin()->first - now; + timeout = std::chrono::duration_cast(manager_->taskMap_.begin()->first - now); + //because the unit of steady_clock is smaller than millisecond,timeout may be 0. + if (timeout.count() == 0) { + timeout = std::chrono::milliseconds(1); + } + manager_->monitor_.waitForTimeRelative(timeout); + } else { + manager_->monitor_.waitForTimeRelative(0); } - assert((timeout != 0 && manager_->taskCount_ > 0) - || (timeout == 0 && manager_->taskCount_ == 0)); - try { - manager_->monitor_.wait(timeout); - } catch (TimedOutException&) { - } - now = Util::currentTime(); + now = std::chrono::steady_clock::now(); } if (manager_->state_ == TimerManager::STARTED) { @@ -239,64 +239,39 @@ size_t TimerManager::taskCount() const { return taskCount_; } -TimerManager::Timer TimerManager::add(shared_ptr task, int64_t timeout) { - int64_t now = Util::currentTime(); - timeout += now; - - { - Synchronized s(monitor_); - if (state_ != TimerManager::STARTED) { - throw IllegalStateException(); - } - - // If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him - // if the expiration time is shorter than the current value. Need to test before we insert, - // because the new task might insert at the front. - bool notifyRequired = (taskCount_ == 0) ? true : timeout < taskMap_.begin()->first; - - shared_ptr timer(new Task(task)); - taskCount_++; - timer->it_ = taskMap_.insert(std::pair >(timeout, timer)); - - // If the task map was empty, or if we have an expiration that is earlier - // than any previously seen, kick the dispatcher so it can update its - // timeout - if (notifyRequired) { - monitor_.notify(); - } - - return timer; - } +TimerManager::Timer TimerManager::add(shared_ptr task, const std::chrono::milliseconds &timeout) { + return add(task, std::chrono::steady_clock::now() + timeout); } TimerManager::Timer TimerManager::add(shared_ptr task, - const struct THRIFT_TIMESPEC& value) { + const std::chrono::time_point& abstime) { + auto now = std::chrono::steady_clock::now(); - int64_t expiration; - Util::toMilliseconds(expiration, value); - - int64_t now = Util::currentTime(); - - if (expiration < now) { + if (abstime < now) { throw InvalidArgumentException(); } - - return add(task, expiration - now); -} - -TimerManager::Timer TimerManager::add(shared_ptr task, - const struct timeval& value) { - - int64_t expiration; - Util::toMilliseconds(expiration, value); - - int64_t now = Util::currentTime(); - - if (expiration < now) { - throw InvalidArgumentException(); + Synchronized s(monitor_); + if (state_ != TimerManager::STARTED) { + throw IllegalStateException(); } - return add(task, expiration - now); + // If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him + // if the expiration time is shorter than the current value. Need to test before we insert, + // because the new task might insert at the front. + bool notifyRequired = (taskCount_ == 0) ? true : abstime < taskMap_.begin()->first; + + shared_ptr timer(new Task(task)); + taskCount_++; + timer->it_ = taskMap_.emplace(abstime, timer); + + // If the task map was empty, or if we have an expiration that is earlier + // than any previously seen, kick the dispatcher so it can update its + // timeout + if (notifyRequired) { + monitor_.notify(); + } + + return timer; } void TimerManager::remove(shared_ptr task) { diff --git a/lib/cpp/src/thrift/concurrency/TimerManager.h b/lib/cpp/src/thrift/concurrency/TimerManager.h index 4e291e3cb..44d4738d5 100644 --- a/lib/cpp/src/thrift/concurrency/TimerManager.h +++ b/lib/cpp/src/thrift/concurrency/TimerManager.h @@ -72,25 +72,17 @@ public: * @param timeout Time in milliseconds to delay before executing task * @return Handle of the timer, which can be used to remove the timer. */ - virtual Timer add(std::shared_ptr task, int64_t timeout); + virtual Timer add(std::shared_ptr task, const std::chrono::milliseconds &timeout); + Timer add(std::shared_ptr task, uint64_t timeout) { return add(task,std::chrono::milliseconds(timeout)); } /** * Adds a task to be executed at some time in the future by a worker thread. * * @param task The task to execute - * @param timeout Absolute time in the future to execute task. + * @param abstime Absolute time in the future to execute task. * @return Handle of the timer, which can be used to remove the timer. */ - virtual Timer add(std::shared_ptr task, const struct THRIFT_TIMESPEC& timeout); - - /** - * Adds a task to be executed at some time in the future by a worker thread. - * - * @param task The task to execute - * @param timeout Absolute time in the future to execute task. - * @return Handle of the timer, which can be used to remove the timer. - */ - virtual Timer add(std::shared_ptr task, const struct timeval& timeout); + virtual Timer add(std::shared_ptr task, const std::chrono::time_point& abstime); /** * Removes a pending task @@ -127,7 +119,7 @@ public: private: std::shared_ptr threadFactory_; friend class Task; - std::multimap > taskMap_; + std::multimap, std::shared_ptr > taskMap_; size_t taskCount_; Monitor monitor_; STATE state_; @@ -135,7 +127,7 @@ private: friend class Dispatcher; std::shared_ptr dispatcher_; std::shared_ptr dispatcherThread_; - typedef std::multimap >::iterator task_iterator; + using task_iterator = decltype(taskMap_)::iterator; typedef std::pair task_range; }; } diff --git a/lib/cpp/src/thrift/concurrency/Util.cpp b/lib/cpp/src/thrift/concurrency/Util.cpp deleted file mode 100644 index dd6d19f97..000000000 --- a/lib/cpp/src/thrift/concurrency/Util.cpp +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include - -#include -#include - -#if defined(HAVE_SYS_TIME_H) -#include -#endif - -namespace apache { -namespace thrift { -namespace concurrency { - -int64_t Util::currentTimeTicks(int64_t ticksPerSec) { - int64_t result; - struct timeval now; - int ret = THRIFT_GETTIMEOFDAY(&now, NULL); - assert(ret == 0); - THRIFT_UNUSED_VARIABLE(ret); // squelching "unused variable" warning - toTicks(result, now, ticksPerSec); - return result; -} -} -} -} // apache::thrift::concurrency diff --git a/lib/cpp/src/thrift/concurrency/Util.h b/lib/cpp/src/thrift/concurrency/Util.h deleted file mode 100644 index 1a915993f..000000000 --- a/lib/cpp/src/thrift/concurrency/Util.h +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_CONCURRENCY_UTIL_H_ -#define _THRIFT_CONCURRENCY_UTIL_H_ 1 - -#include -#include -#include -#include - -#ifdef HAVE_SYS_TIME_H -#include -#endif - -#include - -namespace apache { -namespace thrift { -namespace concurrency { - -/** - * Utility methods - * - * This class contains basic utility methods for converting time formats, - * and other common platform-dependent concurrency operations. - * It should not be included in API headers for other concurrency library - * headers, since it will, by definition, pull in all sorts of horrid - * platform dependent stuff. Rather it should be inluded directly in - * concurrency library implementation source. - * - * @version $Id:$ - */ -class Util { - - static const int64_t NS_PER_S = 1000000000LL; - static const int64_t US_PER_S = 1000000LL; - static const int64_t MS_PER_S = 1000LL; - - static const int64_t NS_PER_MS = NS_PER_S / MS_PER_S; - static const int64_t NS_PER_US = NS_PER_S / US_PER_S; - static const int64_t US_PER_MS = US_PER_S / MS_PER_S; - -public: - /** - * Converts millisecond timestamp into a THRIFT_TIMESPEC struct - * - * @param struct THRIFT_TIMESPEC& result - * @param time or duration in milliseconds - */ - static void toTimespec(struct THRIFT_TIMESPEC& result, int64_t value) { - result.tv_sec = value / MS_PER_S; // ms to s - result.tv_nsec = (value % MS_PER_S) * NS_PER_MS; // ms to ns - } - - static void toTimeval(struct timeval& result, int64_t value) { - result.tv_sec = static_cast(value / MS_PER_S); // ms to s - result.tv_usec = static_cast((value % MS_PER_S) * US_PER_MS); // ms to us - } - - static void toTicks(int64_t& result, - int64_t secs, - int64_t oldTicks, - int64_t oldTicksPerSec, - int64_t newTicksPerSec) { - result = secs * newTicksPerSec; - result += oldTicks * newTicksPerSec / oldTicksPerSec; - - int64_t oldPerNew = oldTicksPerSec / newTicksPerSec; - if (oldPerNew && ((oldTicks % oldPerNew) >= (oldPerNew / 2))) { - ++result; - } - } - /** - * Converts struct THRIFT_TIMESPEC to arbitrary-sized ticks since epoch - */ - static void toTicks(int64_t& result, const struct THRIFT_TIMESPEC& value, int64_t ticksPerSec) { - return toTicks(result, value.tv_sec, value.tv_nsec, NS_PER_S, ticksPerSec); - } - - /** - * Converts struct timeval to arbitrary-sized ticks since epoch - */ - static void toTicks(int64_t& result, const struct timeval& value, int64_t ticksPerSec) { - return toTicks(result, (unsigned long)value.tv_sec, (unsigned long)value.tv_usec, US_PER_S, ticksPerSec); - } - - /** - * Converts struct THRIFT_TIMESPEC to milliseconds - */ - static void toMilliseconds(int64_t& result, const struct THRIFT_TIMESPEC& value) { - return toTicks(result, value, MS_PER_S); - } - - /** - * Converts struct timeval to milliseconds - */ - static void toMilliseconds(int64_t& result, const struct timeval& value) { - return toTicks(result, value, MS_PER_S); - } - - /** - * Converts struct THRIFT_TIMESPEC to microseconds - */ - static void toUsec(int64_t& result, const struct THRIFT_TIMESPEC& value) { - return toTicks(result, value, US_PER_S); - } - - /** - * Converts struct timeval to microseconds - */ - static void toUsec(int64_t& result, const struct timeval& value) { - return toTicks(result, value, US_PER_S); - } - - /** - * Get current time as a number of arbitrary-size ticks from epoch - */ - static int64_t currentTimeTicks(int64_t ticksPerSec); - - /** - * Get current time as milliseconds from epoch - */ - static int64_t currentTime() { return currentTimeTicks(MS_PER_S); } - - /** - * Get current time as micros from epoch - */ - static int64_t currentTimeUsec() { return currentTimeTicks(US_PER_S); } -}; -} -} -} // apache::thrift::concurrency - -#endif // #ifndef _THRIFT_CONCURRENCY_UTIL_H_ diff --git a/lib/cpp/src/thrift/transport/TFileTransport.cpp b/lib/cpp/src/thrift/transport/TFileTransport.cpp index afb441198..19058094c 100644 --- a/lib/cpp/src/thrift/transport/TFileTransport.cpp +++ b/lib/cpp/src/thrift/transport/TFileTransport.cpp @@ -264,7 +264,7 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) { // it is probably a non-factor for the time being } -bool TFileTransport::swapEventBuffers(struct timeval* deadline) { +bool TFileTransport::swapEventBuffers(const std::chrono::time_point *deadline) { bool swap; Guard g(mutex_); @@ -277,7 +277,7 @@ bool TFileTransport::swapEventBuffers(struct timeval* deadline) { } else { if (deadline != NULL) { // if we were handed a deadline time struct, do a timed wait - notEmpty_.waitForTime(deadline); + notEmpty_.waitForTime(*deadline); } else { // just wait until the buffer gets an item notEmpty_.wait(); @@ -336,8 +336,7 @@ void TFileTransport::writerThread() { } // Figure out the next time by which a flush must take place - struct timeval ts_next_flush; - getNextFlushTime(&ts_next_flush); + auto ts_next_flush = getNextFlushTime(); uint32_t unflushed = 0; while (1) { @@ -492,15 +491,13 @@ void TFileTransport::writerThread() { } else { struct timeval current_time; THRIFT_GETTIMEOFDAY(¤t_time, NULL); - if (current_time.tv_sec > ts_next_flush.tv_sec - || (current_time.tv_sec == ts_next_flush.tv_sec - && current_time.tv_usec > ts_next_flush.tv_usec)) { + if (std::chrono::steady_clock::now() > ts_next_flush) { if (unflushed > 0) { flush = true; } else { // If there is no new data since the last fsync, // don't perform the fsync, but do reset the timer. - getNextFlushTime(&ts_next_flush); + ts_next_flush = getNextFlushTime(); } } } @@ -509,7 +506,7 @@ void TFileTransport::writerThread() { // sync (force flush) file to disk THRIFT_FSYNC(fd_); unflushed = 0; - getNextFlushTime(&ts_next_flush); + ts_next_flush = getNextFlushTime(); // notify anybody waiting for flush completion if (forced_flush) { @@ -908,15 +905,8 @@ void TFileTransport::openLogFile() { } } -void TFileTransport::getNextFlushTime(struct timeval* ts_next_flush) { - THRIFT_GETTIMEOFDAY(ts_next_flush, NULL); - - ts_next_flush->tv_usec += flushMaxUs_; - if (ts_next_flush->tv_usec > 1000000) { - long extra_secs = ts_next_flush->tv_usec / 1000000; - ts_next_flush->tv_usec %= 1000000; - ts_next_flush->tv_sec += extra_secs; - } +std::chrono::time_point TFileTransport::getNextFlushTime() { + return std::chrono::steady_clock::now() + std::chrono::microseconds(flushMaxUs_); } TFileTransportBuffer::TFileTransportBuffer(uint32_t size) diff --git a/lib/cpp/src/thrift/transport/TFileTransport.h b/lib/cpp/src/thrift/transport/TFileTransport.h index ece271aae..4290eaa66 100644 --- a/lib/cpp/src/thrift/transport/TFileTransport.h +++ b/lib/cpp/src/thrift/transport/TFileTransport.h @@ -267,7 +267,7 @@ public: private: // helper functions for writing to a file void enqueueEvent(const uint8_t* buf, uint32_t eventLen); - bool swapEventBuffers(struct timeval* deadline); + bool swapEventBuffers(const std::chrono::time_point *deadline); bool initBufferAndWriteThread(); // control for writer thread @@ -286,7 +286,7 @@ private: // Utility functions void openLogFile(); - void getNextFlushTime(struct timeval* ts_next_flush); + std::chrono::time_point getNextFlushTime(); // Class variables readState readState_; diff --git a/lib/cpp/test/concurrency/Tests.cpp b/lib/cpp/test/concurrency/Tests.cpp index fc0ba7f15..019ae67f2 100644 --- a/lib/cpp/test/concurrency/Tests.cpp +++ b/lib/cpp/test/concurrency/Tests.cpp @@ -94,18 +94,18 @@ int main(int argc, char** argv) { std::cout << "\t\tUtil minimum time" << std::endl; - int64_t time00 = Util::currentTime(); - int64_t time01 = Util::currentTime(); + int64_t time00 = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + int64_t time01 = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); std::cout << "\t\t\tMinimum time: " << time01 - time00 << "ms" << std::endl; - time00 = Util::currentTime(); + time00 = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); time01 = time00; size_t count = 0; while (time01 < time00 + 10) { count++; - time01 = Util::currentTime(); + time01 = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); } std::cout << "\t\t\tscall per ms: " << count / (time01 - time00) << std::endl; diff --git a/lib/cpp/test/concurrency/ThreadFactoryTests.h b/lib/cpp/test/concurrency/ThreadFactoryTests.h index 8ab754c89..ad1613ba9 100644 --- a/lib/cpp/test/concurrency/ThreadFactoryTests.h +++ b/lib/cpp/test/concurrency/ThreadFactoryTests.h @@ -22,7 +22,6 @@ #include #include #include -#include #include #include @@ -221,7 +220,7 @@ public: Monitor monitor; - int64_t startTime = Util::currentTime(); + int64_t startTime = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); for (int64_t ix = 0; ix < count; ix++) { { @@ -233,7 +232,7 @@ public: } } - int64_t endTime = Util::currentTime(); + int64_t endTime = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); bool success = (endTime - startTime) >= (count * timeout); diff --git a/lib/cpp/test/concurrency/ThreadManagerTests.h b/lib/cpp/test/concurrency/ThreadManagerTests.h index b3a319a57..e9ed75653 100644 --- a/lib/cpp/test/concurrency/ThreadManagerTests.h +++ b/lib/cpp/test/concurrency/ThreadManagerTests.h @@ -21,7 +21,6 @@ #include #include #include -#include #include #include @@ -66,11 +65,11 @@ public: void run() { - _startTime = Util::currentTime(); + _startTime = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); sleep_(_timeout); - _endTime = Util::currentTime(); + _endTime = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); _done = true; @@ -123,7 +122,7 @@ public: new ThreadManagerTests::Task(monitor, activeCount, timeout))); } - int64_t time00 = Util::currentTime(); + int64_t time00 = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); for (std::set >::iterator ix = tasks.begin(); ix != tasks.end(); @@ -143,7 +142,7 @@ public: } } - int64_t time01 = Util::currentTime(); + int64_t time01 = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); int64_t firstTime = 9223372036854775807LL; int64_t lastTime = 0; @@ -387,9 +386,9 @@ public: bool apiTest() { // prove currentTime has milliseconds granularity since many other things depend on it - int64_t a = Util::currentTime(); + int64_t a = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); sleep_(100); - int64_t b = Util::currentTime(); + int64_t b = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); if (b - a < 50 || b - a > 150) { std::cerr << "\t\t\texpected 100ms gap, found " << (b-a) << "ms gap instead." << std::endl; return false; diff --git a/lib/cpp/test/concurrency/TimerManagerTests.h b/lib/cpp/test/concurrency/TimerManagerTests.h index c15b14b80..24d829acf 100644 --- a/lib/cpp/test/concurrency/TimerManagerTests.h +++ b/lib/cpp/test/concurrency/TimerManagerTests.h @@ -20,7 +20,6 @@ #include #include #include -#include #include #include @@ -39,7 +38,7 @@ public: public: Task(Monitor& monitor, int64_t timeout) : _timeout(timeout), - _startTime(Util::currentTime()), + _startTime(std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count()), _endTime(0), _monitor(monitor), _success(false), @@ -49,7 +48,7 @@ public: void run() { - _endTime = Util::currentTime(); + _endTime = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); _success = (_endTime - _startTime) >= _timeout; { diff --git a/test/cpp/src/StressTest.cpp b/test/cpp/src/StressTest.cpp index f4c281c7f..930261e46 100644 --- a/test/cpp/src/StressTest.cpp +++ b/test/cpp/src/StressTest.cpp @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -133,7 +132,7 @@ public: } } - _startTime = Util::currentTime(); + _startTime = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); if(_behavior == OpenAndCloseTransportInThread) { _transport->open(); } @@ -159,7 +158,7 @@ public: break; } - _endTime = Util::currentTime(); + _endTime = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); if(_behavior == OpenAndCloseTransportInThread) { _transport->close(); @@ -540,7 +539,7 @@ int main(int argc, char** argv) { cerr << "Launch " << clientCount << " " << clientType << " client threads" << endl; - time00 = Util::currentTime(); + time00 = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); monitor.notifyAll(); @@ -548,7 +547,7 @@ int main(int argc, char** argv) { monitor.wait(); } - time01 = Util::currentTime(); + time01 = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); } int64_t firstTime = 9223372036854775807LL; diff --git a/test/cpp/src/StressTestNonBlocking.cpp b/test/cpp/src/StressTestNonBlocking.cpp index 5f0b293b9..ead2df570 100644 --- a/test/cpp/src/StressTestNonBlocking.cpp +++ b/test/cpp/src/StressTestNonBlocking.cpp @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -132,7 +131,7 @@ public: } } - _startTime = Util::currentTime(); + _startTime = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); _transport->open(); @@ -157,7 +156,7 @@ public: break; } - _endTime = Util::currentTime(); + _endTime = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); _transport->close(); @@ -478,7 +477,7 @@ int main(int argc, char** argv) { cerr << "Launch " << clientCount << " client threads" << endl; - time00 = Util::currentTime(); + time00 = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); monitor.notifyAll(); @@ -486,7 +485,7 @@ int main(int argc, char** argv) { monitor.wait(); } - time01 = Util::currentTime(); + time01 = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); } int64_t firstTime = 9223372036854775807LL;