Thrift: standardize coding style

Summary: Standardize indentation, spacing, #defines etc.


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664784 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Slee 2006-09-05 21:05:31 +00:00
parent c9676569ad
commit f5f2be46a3
30 changed files with 737 additions and 929 deletions

View File

@ -1,5 +1,5 @@
#ifndef T_PROCESSOR_H #ifndef _THRIFT_TPROCESSOR_H_
#define T_PROCESSOR_H #define _THRIFT_TPROCESSOR_H_ 1
#include <string> #include <string>
#include <transport/TTransport.h> #include <transport/TTransport.h>
@ -30,4 +30,4 @@ class TProcessor {
}} // facebook::thrift }} // facebook::thrift
#endif #endif // #ifndef _THRIFT_PROCESSOR_H_

View File

@ -1,5 +1,5 @@
#ifndef THRIFT_H #ifndef _THRIFT_THRIFT_H_
#define THRIFT_H #define _THRIFT_THRIFT_H_ 1
#include <netinet/in.h> #include <netinet/in.h>
#include <inttypes.h> #include <inttypes.h>
@ -24,4 +24,4 @@ public:
}} // facebook::thrift }} // facebook::thrift
#endif #endif // #ifndef _THRIFT_THRIFT_H_

View File

@ -1,5 +1,5 @@
#if !defined(_concurrency_Exception_h_) #ifndef _THRIFT_CONCURRENCY_EXCEPTION_H_
#define _concurrency_Exception_h_ 1 #define _THRIFT_CONCURRENCY_EXCEPTION_H_ 1
#include <exception> #include <exception>
@ -17,4 +17,4 @@ class TimedOutException : public std::exception {};
}}} // facebook::thrift::concurrency }}} // facebook::thrift::concurrency
#endif // !defined(_concurrency_Exception_h_) #endif // #ifndef _THRIFT_CONCURRENCY_EXCEPTION_H_

View File

@ -9,14 +9,14 @@
#include <pthread.h> #include <pthread.h>
namespace facebook { namespace thrift { namespace concurrency { namespace facebook { namespace thrift { namespace concurrency {
/** Monitor implementation using the POSIX pthread library /**
* Monitor implementation using the POSIX pthread library
@author marc *
@version $Id:$ */ * @author marc
* @version $Id:$
*/
class Monitor::Impl { class Monitor::Impl {
public: public:
@ -26,108 +26,80 @@ class Monitor::Impl {
condInitialized(false) { condInitialized(false) {
try { try {
assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0); assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0);
mutexInitialized = true; mutexInitialized = true;
assert(pthread_cond_init(&_pthread_cond, NULL) == 0); assert(pthread_cond_init(&_pthread_cond, NULL) == 0);
condInitialized = true; condInitialized = true;
} catch(...) { } catch(...) {
cleanup(); cleanup();
} }
} }
~Impl() {cleanup();} ~Impl() { cleanup(); }
void lock() const {pthread_mutex_lock(&_pthread_mutex);} void lock() const { pthread_mutex_lock(&_pthread_mutex); }
void unlock() const {pthread_mutex_unlock(&_pthread_mutex);} void unlock() const { pthread_mutex_unlock(&_pthread_mutex); }
void wait(long long timeout) const { void wait(long long timeout) const {
// XXX Need to assert that caller owns mutex // XXX Need to assert that caller owns mutex
assert(timeout >= 0LL); assert(timeout >= 0LL);
if (timeout == 0LL) {
if(timeout == 0LL) {
assert(pthread_cond_wait(&_pthread_cond, &_pthread_mutex) == 0); assert(pthread_cond_wait(&_pthread_cond, &_pthread_mutex) == 0);
} else { } else {
struct timespec abstime; struct timespec abstime;
long long now = Util::currentTime(); long long now = Util::currentTime();
Util::toTimespec(abstime, now + timeout); Util::toTimespec(abstime, now + timeout);
int result = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime); int result = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime);
if (result == ETIMEDOUT) {
if(result == ETIMEDOUT) {
assert(Util::currentTime() >= (now + timeout)); assert(Util::currentTime() >= (now + timeout));
} }
} }
} }
void notify() { void notify() {
// XXX Need to assert that caller owns mutex // XXX Need to assert that caller owns mutex
assert(pthread_cond_signal(&_pthread_cond) == 0); assert(pthread_cond_signal(&_pthread_cond) == 0);
} }
void notifyAll() { void notifyAll() {
// XXX Need to assert that caller owns mutex // XXX Need to assert that caller owns mutex
assert(pthread_cond_broadcast(&_pthread_cond) == 0); assert(pthread_cond_broadcast(&_pthread_cond) == 0);
} }
private: private:
void cleanup() { void cleanup() {
if (mutexInitialized) {
if(mutexInitialized) {
mutexInitialized = false; mutexInitialized = false;
assert(pthread_mutex_destroy(&_pthread_mutex) == 0); assert(pthread_mutex_destroy(&_pthread_mutex) == 0);
} }
if(condInitialized) { if (condInitialized) {
condInitialized = false; condInitialized = false;
assert(pthread_cond_destroy(&_pthread_cond) == 0); assert(pthread_cond_destroy(&_pthread_cond) == 0);
} }
} }
mutable pthread_mutex_t _pthread_mutex; mutable pthread_mutex_t _pthread_mutex;
mutable bool mutexInitialized; mutable bool mutexInitialized;
mutable pthread_cond_t _pthread_cond; mutable pthread_cond_t _pthread_cond;
mutable bool condInitialized; mutable bool condInitialized;
}; };
Monitor::Monitor() : _impl(new Monitor::Impl()) {} Monitor::Monitor() : _impl(new Monitor::Impl()) {}
Monitor::~Monitor() { delete _impl;} Monitor::~Monitor() { delete _impl; }
void Monitor::lock() const {_impl->lock();} void Monitor::lock() const { _impl->lock(); }
void Monitor::unlock() const {_impl->unlock();} void Monitor::unlock() const { _impl->unlock(); }
void Monitor::wait(long long timeout) const {_impl->wait(timeout);} void Monitor::wait(long long timeout) const { _impl->wait(timeout); }
void Monitor::notify() const {_impl->notify();} void Monitor::notify() const { _impl->notify(); }
void Monitor::notifyAll() const {_impl->notifyAll();} void Monitor::notifyAll() const { _impl->notifyAll(); }
}}} // facebook::thrift::concurrency }}} // facebook::thrift::concurrency

View File

@ -1,18 +1,22 @@
#if !defined(_concurrency_Monitor_h_) #ifndef _THRIFT_CONCURRENCY_MONITOR_H_
#define _concurrency_Monitor_h_ 1 #define _THRIFT_CONCURRENCY_MONITOR_H_ 1
namespace facebook { namespace thrift { namespace concurrency { namespace facebook { namespace thrift { namespace concurrency {
/** A monitor is a combination mutex and condition-event. Waiting and notifying condition events requires that the caller own the mutex. Mutex /**
lock and unlock operations can be performed independently of condition events. This is more or less analogous to java.lang.Object multi-thread * A monitor is a combination mutex and condition-event. Waiting and
operations * notifying condition events requires that the caller own the mutex. Mutex
* lock and unlock operations can be performed independently of condition
Note that all methods are const. Monitors implement logical constness, not bit constness. This allows const methods to call monitor * events. This is more or less analogous to java.lang.Object multi-thread
methods without needing to cast away constness or change to non-const signatures. * operations
*
@author marc * Note that all methods are const. Monitors implement logical constness, not
@version $Id:$ */ * bit constness. This allows const methods to call monitor methods without
* needing to cast away constness or change to non-const signatures.
*
* @author marc
* @version $Id:$
*/
class Monitor { class Monitor {
public: public:
@ -56,4 +60,4 @@ class Synchronized {
}}} // facebook::thrift::concurrency }}} // facebook::thrift::concurrency
#endif // !defined(_concurrency_Monitor_h_) #endif // #ifndef _THRIFT_CONCURRENCY_MONITOR_H_

View File

@ -3,41 +3,42 @@
#include <assert.h> #include <assert.h>
#include <pthread.h> #include <pthread.h>
/** Implementation of Mutex class using POSIX mutex
@author marc
@version $Id:$ */
namespace facebook { namespace thrift { namespace concurrency { namespace facebook { namespace thrift { namespace concurrency {
/**
* Implementation of Mutex class using POSIX mutex
*
* @author marc
* @version $Id:$
*/
class Mutex::impl { class Mutex::impl {
public: public:
impl() : initialized(false) { impl() : initialized(false) {
assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0); assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0);
initialized = true; initialized = true;
} }
~impl() { ~impl() {
if(initialized) { if (initialized) {
initialized = false; initialized = false;
assert(pthread_mutex_destroy(&_pthread_mutex) == 0); assert(pthread_mutex_destroy(&_pthread_mutex) == 0);
} }
} }
void lock() const {pthread_mutex_lock(&_pthread_mutex);} void lock() const { pthread_mutex_lock(&_pthread_mutex); }
void unlock() const {pthread_mutex_unlock(&_pthread_mutex);} void unlock() const { pthread_mutex_unlock(&_pthread_mutex); }
private: private:
mutable pthread_mutex_t _pthread_mutex; mutable pthread_mutex_t _pthread_mutex;
mutable bool initialized; mutable bool initialized;
}; };
Mutex::Mutex() : _impl(new Mutex::impl()) {} Mutex::Mutex() : _impl(new Mutex::impl()) {}
void Mutex::lock() const {_impl->lock();} void Mutex::lock() const { _impl->lock(); }
void Mutex::unlock() const {_impl->unlock();} void Mutex::unlock() const { _impl->unlock(); }
}}} // facebook::thrift::concurrency }}} // facebook::thrift::concurrency

View File

@ -1,39 +1,31 @@
#if !defined(_concurrency_mutex_h_) #ifndef _THRIFT_CONCURRENCY_MUTEX_H_
#define _concurrency_mutex_h_ 1 #define _THRIFT_CONCURRENCY_MUTEX_H_ 1
namespace facebook { namespace thrift { namespace concurrency { namespace facebook { namespace thrift { namespace concurrency {
/** A simple mutex class /**
* A simple mutex class
@author marc *
@version $Id:$ */ * @author marc
* @version $Id:$
*/
class Mutex { class Mutex {
public: public:
Mutex(); Mutex();
virtual ~Mutex() {} virtual ~Mutex() {}
virtual void lock() const; virtual void lock() const;
virtual void unlock() const; virtual void unlock() const;
private: private:
class impl; class impl;
impl* _impl; impl* _impl;
}; };
class MutexMonitor { class MutexMonitor {
public: public:
MutexMonitor(const Mutex& value) : _mutex(value) { MutexMonitor(const Mutex& value) : _mutex(value) {
_mutex.lock(); _mutex.lock();
} }
~MutexMonitor() { ~MutexMonitor() {
_mutex.unlock(); _mutex.unlock();
} }
@ -45,4 +37,4 @@ class MutexMonitor {
}}} // facebook::thrift::concurrency }}} // facebook::thrift::concurrency
#endif // !defined(_concurrency_mutex_h_) #endif // #ifndef _THRIFT_CONCURRENCY_MUTEX_H_

View File

@ -11,15 +11,17 @@ namespace facebook { namespace thrift { namespace concurrency {
using namespace boost; using namespace boost;
/** The POSIX thread class. /**
* The POSIX thread class.
@author marc *
@version $Id:$ */ * @author marc
* @version $Id:$
*/
class PthreadThread: public Thread { class PthreadThread: public Thread {
public:
public: enum STATE {
enum STATE {uninitialized, uninitialized,
starting, starting,
started, started,
stopping, stopping,
@ -30,21 +32,15 @@ public:
static void* threadMain(void* arg); static void* threadMain(void* arg);
private: private:
pthread_t _pthread; pthread_t _pthread;
STATE _state; STATE _state;
int _policy; int _policy;
int _priority; int _priority;
int _stackSize; int _stackSize;
weak_ptr<PthreadThread> _self; weak_ptr<PthreadThread> _self;
public: public:
PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) : PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
_pthread(0), _pthread(0),
@ -56,58 +52,47 @@ public:
this->Thread::runnable(runnable); this->Thread::runnable(runnable);
} }
~PthreadThread() { ~PthreadThread() {}
}
void start() { void start() {
if (_state != uninitialized) {
if(_state != uninitialized) {
return; return;
} }
_state = starting; _state = starting;
pthread_attr_t thread_attr; pthread_attr_t thread_attr;
assert(pthread_attr_init(&thread_attr) == 0); assert(pthread_attr_init(&thread_attr) == 0);
assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0); assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0);
// Set thread stack size // Set thread stack size
assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0); assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0);
// Set thread policy // Set thread policy
assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0); assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0);
struct sched_param sched_param; struct sched_param sched_param;
sched_param.sched_priority = _priority; sched_param.sched_priority = _priority;
// Set thread priority // Set thread priority
assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0); assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
// Create reference
shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>(); shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
*selfRef = _self.lock(); *selfRef = _self.lock();
assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)selfRef) == 0); assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)selfRef) == 0);
} }
void join() { void join() {
if (_state != stopped) {
if(_state != stopped) {
void* ignore; void* ignore;
pthread_join(_pthread, &ignore); pthread_join(_pthread, &ignore);
} }
} }
shared_ptr<Runnable> runnable() const {return Thread::runnable();} shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
void runnable(shared_ptr<Runnable> value) {Thread::runnable(value);} void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
void weakRef(shared_ptr<PthreadThread> self) { void weakRef(shared_ptr<PthreadThread> self) {
assert(self.get() == this); assert(self.get() == this);
@ -117,46 +102,41 @@ public:
void* PthreadThread::threadMain(void* arg) { void* PthreadThread::threadMain(void* arg) {
// XXX need a lock here when testing thread state // XXX need a lock here when testing thread state
shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg; shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg); delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
if(thread == NULL) { if (thread == NULL) {
return (void*)0; return (void*)0;
} }
if(thread->_state != starting) { if (thread->_state != starting) {
return (void*)0; return (void*)0;
} }
thread->_state = starting; thread->_state = starting;
thread->runnable()->run(); thread->runnable()->run();
if (thread->_state != stopping && thread->_state != stopped) {
if(thread->_state != stopping && thread->_state != stopped) {
thread->_state = stopping; thread->_state = stopping;
} }
return (void*)0; return (void*)0;
} }
/** POSIX Thread factory implementation */ /**
* POSIX Thread factory implementation
*/
class PosixThreadFactory::Impl { class PosixThreadFactory::Impl {
private: private:
POLICY _policy; POLICY _policy;
PRIORITY _priority; PRIORITY _priority;
int _stackSize; int _stackSize;
bool _detached; bool _detached;
/** Converts generic posix thread schedule policy enums into pthread API values. */ /**
* Converts generic posix thread schedule policy enums into pthread
* API values.
*/
static int toPthreadPolicy(POLICY policy) { static int toPthreadPolicy(POLICY policy) {
switch(policy) { switch(policy) {
case OTHER: return SCHED_OTHER; break; case OTHER: return SCHED_OTHER; break;
@ -166,83 +146,76 @@ private:
} }
} }
/** Converts relative thread priorities to absolute value based on posix thread scheduler policy /**
* Converts relative thread priorities to absolute value based on posix
The idea is simply to divide up the priority range for the given policy into the correpsonding relative * thread scheduler policy
priority level (lowest..highest) and then pro-rate accordingly. */ *
* The idea is simply to divide up the priority range for the given policy
* into the correpsonding relative priority level (lowest..highest) and
* then pro-rate accordingly.
*/
static int toPthreadPriority(POLICY policy, PRIORITY priority) { static int toPthreadPriority(POLICY policy, PRIORITY priority) {
int pthread_policy = toPthreadPolicy(policy); int pthread_policy = toPthreadPolicy(policy);
int min_priority = sched_get_priority_min(pthread_policy); int min_priority = sched_get_priority_min(pthread_policy);
int max_priority = sched_get_priority_max(pthread_policy); int max_priority = sched_get_priority_max(pthread_policy);
int quanta = (HIGHEST - LOWEST) + 1; int quanta = (HIGHEST - LOWEST) + 1;
float stepsperquanta = (max_priority - min_priority) / quanta; float stepsperquanta = (max_priority - min_priority) / quanta;
if(priority <= HIGHEST) { if(priority <= HIGHEST) {
return (int)(min_priority + stepsperquanta * priority); return (int)(min_priority + stepsperquanta * priority);
} else { } else {
// should never get here for priority increments. // should never get here for priority increments.
assert(false); assert(false);
return (int)(min_priority + stepsperquanta * NORMAL); return (int)(min_priority + stepsperquanta * NORMAL);
} }
} }
public: public:
Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) : Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
_policy(policy), _policy(policy),
_priority(priority), _priority(priority),
_stackSize(stackSize), _stackSize(stackSize),
_detached(detached) { _detached(detached) {}
}
/** Creates a new POSIX thread to run the runnable object
@param runnable A runnable object */
/**
* Creates a new POSIX thread to run the runnable object
*
* @param runnable A runnable object
*/
shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const { shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable)); shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable));
result->weakRef(result); result->weakRef(result);
runnable->thread(result); runnable->thread(result);
return result; return result;
} }
int stackSize() const { return _stackSize;} int stackSize() const { return _stackSize; }
void stackSize(int value) { _stackSize = value;} void stackSize(int value) { _stackSize = value; }
PRIORITY priority() const { return _priority;} PRIORITY priority() const { return _priority; }
/** Sets priority.
XXX
Need to handle incremental priorities properly. */
void priority(PRIORITY value) { _priority = value;}
/**
* Sets priority.
*
* XXX
* Need to handle incremental priorities properly.
*/
void priority(PRIORITY value) { _priority = value; }
}; };
PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) : PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
_impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {} _impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const {return _impl->newThread(runnable);} shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return _impl->newThread(runnable); }
int PosixThreadFactory::stackSize() const {return _impl->stackSize();} int PosixThreadFactory::stackSize() const { return _impl->stackSize(); }
void PosixThreadFactory::stackSize(int value) {_impl->stackSize(value);} void PosixThreadFactory::stackSize(int value) { _impl->stackSize(value); }
PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const {return _impl->priority();} PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return _impl->priority(); }
void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) {_impl->priority(value);} void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { _impl->priority(value); }
}}} // facebook::thrift::concurrency }}} // facebook::thrift::concurrency

View File

@ -1,5 +1,5 @@
#if !defined(_concurrency_PosixThreadFactory_h_) #ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_
#define _concurrency_PosixThreadFactory_h_ 1 #define _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_ 1
#include "Thread.h" #include "Thread.h"
@ -9,28 +9,33 @@ namespace facebook { namespace thrift { namespace concurrency {
using namespace boost; using namespace boost;
/** A thread factory to create posix threads /**
* A thread factory to create posix threads
@author marc *
@version $Id:$ */ * @author marc
* @version $Id:$
*/
class PosixThreadFactory : public ThreadFactory { class PosixThreadFactory : public ThreadFactory {
public: public:
/** POSIX Thread scheduler policies */ /**
* POSIX Thread scheduler policies
*/
enum POLICY { enum POLICY {
OTHER, OTHER,
FIFO, FIFO,
ROUND_ROBIN ROUND_ROBIN
}; };
/** POSIX Thread scheduler relative priorities, /**
* POSIX Thread scheduler relative priorities,
Absolute priority is determined by scheduler policy and OS. This enumeration specifies relative priorities such that one can *
specify a priority withing a giving scheduler policy without knowing the absolute value of the priority. */ * Absolute priority is determined by scheduler policy and OS. This
* enumeration specifies relative priorities such that one can specify a
* priority withing a giving scheduler policy without knowing the absolute
* value of the priority.
*/
enum PRIORITY { enum PRIORITY {
LOWEST = 0, LOWEST = 0,
LOWER = 1, LOWER = 1,
@ -46,36 +51,37 @@ class PosixThreadFactory : public ThreadFactory {
PosixThreadFactory(POLICY policy=ROUND_ROBIN, PRIORITY priority=NORMAL, int stackSize=1, bool detached=false); PosixThreadFactory(POLICY policy=ROUND_ROBIN, PRIORITY priority=NORMAL, int stackSize=1, bool detached=false);
// From ThreadFactory; // From ThreadFactory;
shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const; shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const;
/** Sets stack size for created threads /**
* Sets stack size for created threads
@param value size in megabytes */ *
* @param value size in megabytes
*/
virtual void stackSize(int value); virtual void stackSize(int value);
/** Gets stack size for created threads /**
* Gets stack size for created threads
@return int size in megabytes */ *
* @return int size in megabytes
*/
virtual int stackSize() const; virtual int stackSize() const;
/** Sets priority relative to current policy */ /**
* Sets priority relative to current policy
*/
virtual void priority(PRIORITY priority); virtual void priority(PRIORITY priority);
/** Gets priority relative to current policy */ /**
* Gets priority relative to current policy
*/
virtual PRIORITY priority() const; virtual PRIORITY priority() const;
private: private:
class Impl; class Impl;
shared_ptr<Impl> _impl; shared_ptr<Impl> _impl;
}; };
}}} // facebook::thrift::concurrency }}} // facebook::thrift::concurrency
#endif // !defined(_concurrency_PosixThreadFactory_h_) #endif // #ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_

View File

@ -1,5 +1,5 @@
#if !defined(_concurrency_Thread_h_) #ifndef _THRIFT_CONCURRENCY_THREAD_H_
#define _concurrency_Thread_h_ 1 #define _THRIFT_CONCURRENCY_THREAD_H_ 1
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp> #include <boost/weak_ptr.hpp>
@ -10,77 +10,84 @@ using namespace boost;
class Thread; class Thread;
/** Minimal runnable class. More or less analogous to java.lang.Runnable. /**
* Minimal runnable class. More or less analogous to java.lang.Runnable.
@author marc *
@version $Id:$ */ * @author marc
* @version $Id:$
*/
class Runnable { class Runnable {
public: public:
virtual ~Runnable() {}; virtual ~Runnable() {};
virtual void run() = 0; virtual void run() = 0;
/** Gets the thread object that is hosting this runnable object - can return an empty shared pointer if no references remain on thet thread object */ /**
* Gets the thread object that is hosting this runnable object - can return
* an empty shared pointer if no references remain on thet thread object
*/
virtual shared_ptr<Thread> thread() { return _thread.lock(); }
virtual shared_ptr<Thread> thread() {return _thread.lock();} /**
* Sets the thread that is executing this object. This is only meant for
/** Sets the thread that is executing this object. This is only meant for use by concrete implementations of Thread. */ * use by concrete implementations of Thread.
*/
virtual void thread(shared_ptr<Thread> value) {_thread = value;} virtual void thread(shared_ptr<Thread> value) { _thread = value; }
private: private:
weak_ptr<Thread> _thread; weak_ptr<Thread> _thread;
}; };
/** Minimal thread class. Returned by thread factory bound to a Runnable object and ready to start execution. More or less analogous to java.lang.Thread /**
(minus all the thread group, priority, mode and other baggage, since that is difficult to abstract across platforms and is left for platform-specific * Minimal thread class. Returned by thread factory bound to a Runnable object
ThreadFactory implemtations to deal with - @see facebook::thrift::concurrency::ThreadFactory) */ * and ready to start execution. More or less analogous to java.lang.Thread
* (minus all the thread group, priority, mode and other baggage, since that
* is difficult to abstract across platforms and is left for platform-specific
* ThreadFactory implemtations to deal with
*
* @see facebook::thrift::concurrency::ThreadFactory)
*/
class Thread { class Thread {
public: public:
virtual ~Thread() {}; virtual ~Thread() {};
/** Starts the thread. Does platform specific thread creation and configuration then invokes the run method of the Runnable object bound to this /**
thread. */ * Starts the thread. Does platform specific thread creation and
* configuration then invokes the run method of the Runnable object bound
* to this thread.
*/
virtual void start() = 0; virtual void start() = 0;
/** Join this thread /**
* Join this thread. Current thread blocks until this target thread
Current thread blocks until this target thread completes. */ * completes.
*/
virtual void join() = 0; virtual void join() = 0;
/** Gets the runnable object this thread is hosting */ /**
* Gets the runnable object this thread is hosting
virtual shared_ptr<Runnable> runnable() const {return _runnable;} */
virtual shared_ptr<Runnable> runnable() const { return _runnable; }
protected: protected:
virtual void runnable(shared_ptr<Runnable> value) { _runnable = value; }
virtual void runnable(shared_ptr<Runnable> value) {_runnable = value;}
private: private:
shared_ptr<Runnable> _runnable; shared_ptr<Runnable> _runnable;
}; };
/** Factory to create platform-specific thread object and bind them to Runnable object for execution */ /**
* Factory to create platform-specific thread object and bind them to Runnable
* object for execution
*/
class ThreadFactory { class ThreadFactory {
public: public:
virtual ~ThreadFactory() {} virtual ~ThreadFactory() {}
virtual shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const = 0; virtual shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const = 0;
}; };
}}} // facebook::thrift::concurrency }}} // facebook::thrift::concurrency
#endif // !defined(_concurrency_Thread_h_) #endif // #ifndef _THRIFT_CONCURRENCY_THREAD_H_

View File

@ -17,48 +17,41 @@ namespace facebook { namespace thrift { namespace concurrency {
using namespace boost; using namespace boost;
/** ThreadManager class /**
* ThreadManager class
This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather *
it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times. * This class manages a pool of threads. It uses a ThreadFactory to create
* threads. It never actually creates or destroys worker threads, rather
@author marc * it maintains statistics on number of idle threads, number of active threads,
@version $Id:$ */ * task backlog, and average wait and service times.
*
* @author marc
* @version $Id:$
*/
class ThreadManager::Impl : public ThreadManager { class ThreadManager::Impl : public ThreadManager {
public: public:
Impl() : Impl() :
_workerCount(0), _workerCount(0),
_workerMaxCount(0), _workerMaxCount(0),
_idleCount(0), _idleCount(0),
_state(ThreadManager::UNINITIALIZED) _state(ThreadManager::UNINITIALIZED) {}
{}
~Impl() { ~Impl() { stop(); }
stop();
}
void start(); void start();
void stop(); void stop();
const ThreadManager::STATE state() const { const ThreadManager::STATE state() const { return _state; }
return _state;
};
shared_ptr<ThreadFactory> threadFactory() const { shared_ptr<ThreadFactory> threadFactory() const {
Synchronized s(_monitor); Synchronized s(_monitor);
return _threadFactory; return _threadFactory;
} }
void threadFactory(shared_ptr<ThreadFactory> value) { void threadFactory(shared_ptr<ThreadFactory> value) {
Synchronized s(_monitor); Synchronized s(_monitor);
_threadFactory = value; _threadFactory = value;
} }
@ -66,26 +59,20 @@ class ThreadManager::Impl : public ThreadManager {
void removeWorker(size_t value); void removeWorker(size_t value);
size_t idleWorkerCount() const {return _idleCount;} size_t idleWorkerCount() const { return _idleCount; }
size_t workerCount() const { size_t workerCount() const {
Synchronized s(_monitor); Synchronized s(_monitor);
return _workerCount; return _workerCount;
} }
size_t pendingTaskCount() const { size_t pendingTaskCount() const {
Synchronized s(_monitor); Synchronized s(_monitor);
return _tasks.size(); return _tasks.size();
} }
size_t totalTaskCount() const { size_t totalTaskCount() const {
Synchronized s(_monitor); Synchronized s(_monitor);
return _tasks.size() + _workerCount - _idleCount; return _tasks.size() + _workerCount - _idleCount;
} }
@ -94,35 +81,26 @@ class ThreadManager::Impl : public ThreadManager {
void remove(shared_ptr<Runnable> task); void remove(shared_ptr<Runnable> task);
private: private:
size_t _workerCount; size_t _workerCount;
size_t _workerMaxCount; size_t _workerMaxCount;
size_t _idleCount; size_t _idleCount;
ThreadManager::STATE _state; ThreadManager::STATE _state;
shared_ptr<ThreadFactory> _threadFactory; shared_ptr<ThreadFactory> _threadFactory;
friend class ThreadManager::Task; friend class ThreadManager::Task;
std::queue<shared_ptr<Task> > _tasks; std::queue<shared_ptr<Task> > _tasks;
Monitor _monitor; Monitor _monitor;
Monitor _workerMonitor; Monitor _workerMonitor;
friend class ThreadManager::Worker; friend class ThreadManager::Worker;
std::set<shared_ptr<Thread> > _workers; std::set<shared_ptr<Thread> > _workers;
std::set<shared_ptr<Thread> > _deadWorkers; std::set<shared_ptr<Thread> > _deadWorkers;
}; };
class ThreadManager::Task : public Runnable { class ThreadManager::Task : public Runnable {
public: public:
enum STATE { enum STATE {
WAITING, WAITING,
EXECUTING, EXECUTING,
@ -132,29 +110,24 @@ public:
Task(shared_ptr<Runnable> runnable) : Task(shared_ptr<Runnable> runnable) :
_runnable(runnable), _runnable(runnable),
_state(WAITING) _state(WAITING) {}
{}
~Task() {}; ~Task() {}
void run() { void run() {
if(_state == EXECUTING) { if (_state == EXECUTING) {
_runnable->run(); _runnable->run();
_state = COMPLETE; _state = COMPLETE;
} }
} }
private: private:
shared_ptr<Runnable> _runnable; shared_ptr<Runnable> _runnable;
friend class ThreadManager::Worker; friend class ThreadManager::Worker;
STATE _state; STATE _state;
}; };
class ThreadManager::Worker: public Runnable { class ThreadManager::Worker: public Runnable {
enum STATE { enum STATE {
UNINITIALIZED, UNINITIALIZED,
STARTING, STARTING,
@ -164,127 +137,101 @@ class ThreadManager::Worker: public Runnable {
}; };
public: public:
Worker(ThreadManager::Impl* manager) : Worker(ThreadManager::Impl* manager) :
_manager(manager), _manager(manager),
_state(UNINITIALIZED), _state(UNINITIALIZED),
_idle(false) _idle(false) {}
{}
~Worker() {} ~Worker() {}
bool isActive() const { return _manager->_workerCount <= _manager->_workerMaxCount;} bool isActive() const { return _manager->_workerCount <= _manager->_workerMaxCount; }
/** Worker entry point
As long as worker thread is running, pull tasks off the task queue and execute. */
/**
* Worker entry point
*
* As long as worker thread is running, pull tasks off the task queue and
* execute.
*/
void run() { void run() {
bool active = false; bool active = false;
bool notifyManager = false; bool notifyManager = false;
/** Increment worker semaphore and notify manager if worker count reached desired max /**
* Increment worker semaphore and notify manager if worker count reached
Note * desired max
We have to release the monitor and acquire the workerMonitor since that is what the manager *
blocks on for worker add/remove */ * Note: We have to release the monitor and acquire the workerMonitor
* since that is what the manager blocks on for worker add/remove
{Synchronized s(_manager->_monitor); */
{
Synchronized s(_manager->_monitor);
active = _manager->_workerCount < _manager->_workerMaxCount; active = _manager->_workerCount < _manager->_workerMaxCount;
if (active) {
if(active) {
_manager->_workerCount++; _manager->_workerCount++;
notifyManager = _manager->_workerCount == _manager->_workerMaxCount; notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
} }
} }
if(notifyManager) { if (notifyManager) {
Synchronized s(_manager->_workerMonitor); Synchronized s(_manager->_workerMonitor);
_manager->_workerMonitor.notify(); _manager->_workerMonitor.notify();
notifyManager = false; notifyManager = false;
} }
while(active) { while (active) {
shared_ptr<ThreadManager::Task> task; shared_ptr<ThreadManager::Task> task;
/* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop). /**
* While holding manager monitor block for non-empty task queue (Also
Once the queue is non-empty, dequeue a task, release monitor, and execute. If the worker max count has been decremented * check that the thread hasn't been requested to stop). Once the queue
such that we exceed it, mark ourself inactive, decrement the worker count and notify the manager (technically we're notifying * is non-empty, dequeue a task, release monitor, and execute. If the
the next blocked thread but eventually the manager will see it. */ * worker max count has been decremented such that we exceed it, mark
* ourself inactive, decrement the worker count and notify the manager
{Synchronized s(_manager->_monitor); * (technically we're notifying the next blocked thread but eventually
* the manager will see it.
*/
{
Synchronized s(_manager->_monitor);
active = isActive(); active = isActive();
while (active && _manager->_tasks.empty()) {
while(active && _manager->_tasks.empty()) {
_manager->_idleCount++; _manager->_idleCount++;
_idle = true; _idle = true;
_manager->_monitor.wait(); _manager->_monitor.wait();
active = isActive(); active = isActive();
_idle = false; _idle = false;
_manager->_idleCount--; _manager->_idleCount--;
} }
if(active) { if (active) {
if (!_manager->_tasks.empty()) {
if(!_manager->_tasks.empty()) {
task = _manager->_tasks.front(); task = _manager->_tasks.front();
_manager->_tasks.pop(); _manager->_tasks.pop();
if (task->_state == ThreadManager::Task::WAITING) {
if(task->_state == ThreadManager::Task::WAITING) {
task->_state = ThreadManager::Task::EXECUTING; task->_state = ThreadManager::Task::EXECUTING;
} }
} }
} else { } else {
_idle = true; _idle = true;
_manager->_workerCount--; _manager->_workerCount--;
notifyManager = _manager->_workerCount == _manager->_workerMaxCount; notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
} }
} }
if(task != NULL) { if (task != NULL) {
if (task->_state == ThreadManager::Task::EXECUTING) {
if(task->_state == ThreadManager::Task::EXECUTING) {
try { try {
task->run(); task->run();
} catch(...) { } catch(...) {
// XXX need to log this // XXX need to log this
} }
} }
} }
} }
{Synchronized s(_manager->_workerMonitor); {
Synchronized s(_manager->_workerMonitor);
_manager->_deadWorkers.insert(this->thread()); _manager->_deadWorkers.insert(this->thread());
if (notifyManager) {
if(notifyManager) {
_manager->_workerMonitor.notify(); _manager->_workerMonitor.notify();
} }
} }
@ -293,48 +240,36 @@ class ThreadManager::Worker: public Runnable {
} }
private: private:
ThreadManager::Impl* _manager; ThreadManager::Impl* _manager;
friend class ThreadManager::Impl; friend class ThreadManager::Impl;
STATE _state; STATE _state;
bool _idle; bool _idle;
}; };
void ThreadManager::Impl::addWorker(size_t value) {
void ThreadManager::Impl::addWorker(size_t value) {
std::set<shared_ptr<Thread> > newThreads; std::set<shared_ptr<Thread> > newThreads;
for (size_t ix = 0; ix < value; ix++) {
for(size_t ix = 0; ix < value; ix++) {
class ThreadManager::Worker; class ThreadManager::Worker;
shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this)); shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
newThreads.insert(_threadFactory->newThread(worker)); newThreads.insert(_threadFactory->newThread(worker));
} }
{Synchronized s(_monitor); {
Synchronized s(_monitor);
_workerMaxCount+= value; _workerMaxCount+= value;
_workers.insert(newThreads.begin(), newThreads.end()); _workers.insert(newThreads.begin(), newThreads.end());
} }
for(std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) { for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable()); shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
worker->_state = ThreadManager::Worker::STARTING; worker->_state = ThreadManager::Worker::STARTING;
(*ix)->start(); (*ix)->start();
} }
{Synchronized s(_workerMonitor); {
Synchronized s(_workerMonitor);
while(_workerCount != _workerMaxCount) { while (_workerCount != _workerMaxCount) {
_workerMonitor.wait(); _workerMonitor.wait();
} }
} }
@ -342,93 +277,78 @@ void ThreadManager::Impl::addWorker(size_t value) {
void ThreadManager::Impl::start() { void ThreadManager::Impl::start() {
if(_state == ThreadManager::STOPPED) { if (_state == ThreadManager::STOPPED) {
return; return;
} }
{Synchronized s(_monitor); {
Synchronized s(_monitor);
if(_state == ThreadManager::UNINITIALIZED) { if (_state == ThreadManager::UNINITIALIZED) {
if (_threadFactory == NULL) {
if(_threadFactory == NULL) {throw InvalidArgumentException();} throw InvalidArgumentException();
}
_state = ThreadManager::STARTED; _state = ThreadManager::STARTED;
_monitor.notifyAll(); _monitor.notifyAll();
} }
while(_state == STARTING) { while (_state == STARTING) {
_monitor.wait(); _monitor.wait();
} }
} }
} }
void ThreadManager::Impl::stop() { void ThreadManager::Impl::stop() {
bool doStop = false; bool doStop = false;
if (_state == ThreadManager::STOPPED) {
if(_state == ThreadManager::STOPPED) {
return; return;
} }
{Synchronized s(_monitor); {
Synchronized s(_monitor);
if(!_state != ThreadManager::STOPPING && _state != ThreadManager::STOPPED) { if (!_state != ThreadManager::STOPPING && _state != ThreadManager::STOPPED) {
doStop = true; doStop = true;
_state = ThreadManager::STOPPING; _state = ThreadManager::STOPPING;
} }
} }
if(doStop) { if (doStop) {
removeWorker(_workerCount); removeWorker(_workerCount);
_state = ThreadManager::STOPPING; _state = ThreadManager::STOPPING;
} }
// XXX // XXX
// should be able to block here for transition to STOPPED since we're now using shared_ptrs // should be able to block here for transition to STOPPED since we're no
// using shared_ptrs
} }
void ThreadManager::Impl::removeWorker(size_t value) { void ThreadManager::Impl::removeWorker(size_t value) {
std::set<shared_ptr<Thread> > removedThreads; std::set<shared_ptr<Thread> > removedThreads;
{
{Synchronized s(_monitor); Synchronized s(_monitor);
if (value > _workerMaxCount) {
if(value > _workerMaxCount) {
throw InvalidArgumentException(); throw InvalidArgumentException();
} }
_workerMaxCount-= value; _workerMaxCount-= value;
if(_idleCount < value) { if (_idleCount < value) {
for (size_t ix = 0; ix < _idleCount; ix++) {
for(size_t ix = 0; ix < _idleCount; ix++) {
_monitor.notify(); _monitor.notify();
} }
} else { } else {
_monitor.notifyAll(); _monitor.notifyAll();
} }
} }
{Synchronized s(_workerMonitor); {
Synchronized s(_workerMonitor);
while(_workerCount != _workerMaxCount) { while (_workerCount != _workerMaxCount) {
_workerMonitor.wait(); _workerMonitor.wait();
} }
for(std::set<shared_ptr<Thread> >::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) { for (std::set<shared_ptr<Thread> >::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
_workers.erase(*ix); _workers.erase(*ix);
} }
_deadWorkers.clear(); _deadWorkers.clear();
@ -436,31 +356,24 @@ void ThreadManager::Impl::removeWorker(size_t value) {
} }
void ThreadManager::Impl::add(shared_ptr<Runnable> value) { void ThreadManager::Impl::add(shared_ptr<Runnable> value) {
Synchronized s(_monitor); Synchronized s(_monitor);
if(_state != ThreadManager::STARTED) { if (_state != ThreadManager::STARTED) {
throw IllegalStateException(); throw IllegalStateException();
} }
_tasks.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value))); _tasks.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
/* If idle thread is available notify it, otherwise all worker threads are running and will get around to this // If idle thread is available notify it, otherwise all worker threads are
task in time. */ // running and will get around to this task in time.
if (_idleCount > 0) {
if(_idleCount > 0) {
_monitor.notify(); _monitor.notify();
} }
} }
void ThreadManager::Impl::remove(shared_ptr<Runnable> task) { void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
Synchronized s(_monitor); Synchronized s(_monitor);
if (_state != ThreadManager::STARTED) {
if(_state != ThreadManager::STARTED) {
throw IllegalStateException(); throw IllegalStateException();
} }
} }
@ -468,7 +381,6 @@ void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
class SimpleThreadManager : public ThreadManager::Impl { class SimpleThreadManager : public ThreadManager::Impl {
public: public:
SimpleThreadManager(size_t workerCount=4) : SimpleThreadManager(size_t workerCount=4) :
_workerCount(workerCount), _workerCount(workerCount),
_firstTime(true) { _firstTime(true) {
@ -476,12 +388,10 @@ public:
void start() { void start() {
ThreadManager::Impl::start(); ThreadManager::Impl::start();
addWorker(_workerCount); addWorker(_workerCount);
} }
private: private:
const size_t _workerCount; const size_t _workerCount;
bool _firstTime; bool _firstTime;
Monitor _monitor; Monitor _monitor;
@ -497,4 +407,3 @@ shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count) {
} }
}}} // facebook::thrift::concurrency }}} // facebook::thrift::concurrency

View File

@ -1,49 +1,59 @@
#if !defined(_concurrency_ThreadManager_h_) #ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_
#define _concurrency_ThreadManager_h_ 1 #define _THRIFT_CONCURRENCY_THREADMANAGER_H_ 1
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
#include <sys/types.h> #include <sys/types.h>
#include "Thread.h" #include "Thread.h"
namespace facebook { namespace thrift { namespace concurrency { namespace facebook { namespace thrift { namespace concurrency {
using namespace boost; using namespace boost;
/** Thread Pool Manager and related classes /**
* Thread Pool Manager and related classes
@author marc *
@version $Id:$ */ * @author marc
* @version $Id:$
*/
class ThreadManager; class ThreadManager;
/** ThreadManager class /**
* ThreadManager class
This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather *
it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times and informs the * This class manages a pool of threads. It uses a ThreadFactory to create
PoolPolicy object bound to instances of this manager of interesting transitions. It is then up the PoolPolicy object to decide if the thread pool * threads. It never actually creates or destroys worker threads, rather
size needs to be adjusted and call this object addWorker and removeWorker methods to make changes. * It maintains statistics on number of idle threads, number of active threads,
* task backlog, and average wait and service times and informs the PoolPolicy
This design allows different policy implementations to used this code to handle basic worker thread management and worker task execution and focus on * object bound to instances of this manager of interesting transitions. It is
policy issues. The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads. */ * then up the PoolPolicy object to decide if the thread pool size needs to be
* adjusted and call this object addWorker and removeWorker methods to make
* changes.
*
* This design allows different policy implementations to used this code to
* handle basic worker thread management and worker task execution and focus on
* policy issues. The simplest policy, StaticPolicy, does nothing other than
* create a fixed number of threads.
*/
class ThreadManager { class ThreadManager {
public: public:
ThreadManager() {} ThreadManager() {}
virtual ~ThreadManager() {} virtual ~ThreadManager() {}
/** Starts the thread manager. Verifies all attributes have been properly initialized, then allocates necessary resources to begin operation */ /**
* Starts the thread manager. Verifies all attributes have been properly
* initialized, then allocates necessary resources to begin operation
*/
virtual void start() = 0; virtual void start() = 0;
/** Stops the thread manager. Aborts all remaining unprocessed task, shuts down all created worker threads, and realeases all allocated resources. /**
This method blocks for all worker threads to complete, thus it can potentially block forever if a worker thread is running a task that * Stops the thread manager. Aborts all remaining unprocessed task, shuts
won't terminate. */ * down all created worker threads, and realeases all allocated resources.
* This method blocks for all worker threads to complete, thus it can
* potentially block forever if a worker thread is running a task that
* won't terminate.
*/
virtual void stop() = 0; virtual void stop() = 0;
enum STATE { enum STATE {
@ -64,37 +74,43 @@ class ThreadManager {
virtual void removeWorker(size_t value=1) = 0; virtual void removeWorker(size_t value=1) = 0;
/** Gets the current number of idle worker threads */ /**
* Gets the current number of idle worker threads
*/
virtual size_t idleWorkerCount() const = 0; virtual size_t idleWorkerCount() const = 0;
/** Gets the current number of total worker threads */ /**
* Gets the current number of total worker threads
*/
virtual size_t workerCount() const = 0; virtual size_t workerCount() const = 0;
/** Gets the current number of pending tasks */ /**
* Gets the current number of pending tasks
*/
virtual size_t pendingTaskCount() const = 0; virtual size_t pendingTaskCount() const = 0;
/** Gets the current number of pending and executing tasks */ /**
* Gets the current number of pending and executing tasks
*/
virtual size_t totalTaskCount() const = 0; virtual size_t totalTaskCount() const = 0;
/** Adds a task to be execued at some time in the future by a worker thread. /**
* Adds a task to be execued at some time in the future by a worker thread.
@param value The task to run */ *
* @param value The task to run
*/
virtual void add(shared_ptr<Runnable>value) = 0; virtual void add(shared_ptr<Runnable>value) = 0;
/** Removes a pending task */ /**
* Removes a pending task
*/
virtual void remove(shared_ptr<Runnable> task) = 0; virtual void remove(shared_ptr<Runnable> task) = 0;
static shared_ptr<ThreadManager> newThreadManager(); static shared_ptr<ThreadManager> newThreadManager();
/** Creates a simple thread manager the uses count number of worker threads */ /**
* Creates a simple thread manager the uses count number of worker threads
*/
static shared_ptr<ThreadManager> newSimpleThreadManager(size_t count=4); static shared_ptr<ThreadManager> newSimpleThreadManager(size_t count=4);
class Task; class Task;
@ -106,4 +122,4 @@ class ThreadManager {
}}} // facebook::thrift::concurrency }}} // facebook::thrift::concurrency
#endif // !defined(_concurrency_ThreadManager_h_) #endif // #ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_

View File

@ -8,17 +8,18 @@
namespace facebook { namespace thrift { namespace concurrency { namespace facebook { namespace thrift { namespace concurrency {
/** TimerManager class
@author marc
@version $Id:$ */
typedef std::multimap<long long, shared_ptr<TimerManager::Task> >::iterator task_iterator; typedef std::multimap<long long, shared_ptr<TimerManager::Task> >::iterator task_iterator;
typedef std::pair<task_iterator, task_iterator> task_range; typedef std::pair<task_iterator, task_iterator> task_range;
/**
* TimerManager class
*
* @author marc
* @version $Id:$
*/
class TimerManager::Task : public Runnable { class TimerManager::Task : public Runnable {
public: public:
enum STATE { enum STATE {
WAITING, WAITING,
EXECUTING, EXECUTING,
@ -28,130 +29,101 @@ public:
Task(shared_ptr<Runnable> runnable) : Task(shared_ptr<Runnable> runnable) :
_runnable(runnable), _runnable(runnable),
_state(WAITING) _state(WAITING) {}
{}
~Task() { ~Task() {
std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl; //debug //debug
}; std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl;
}
void run() { void run() {
if(_state == EXECUTING) { if (_state == EXECUTING) {
_runnable->run(); _runnable->run();
_state = COMPLETE; _state = COMPLETE;
} }
} }
private: private:
shared_ptr<Runnable> _runnable; shared_ptr<Runnable> _runnable;
class TimerManager::Dispatcher; class TimerManager::Dispatcher;
friend class TimerManager::Dispatcher; friend class TimerManager::Dispatcher;
STATE _state; STATE _state;
}; };
class TimerManager::Dispatcher: public Runnable { class TimerManager::Dispatcher: public Runnable {
public: public:
Dispatcher(TimerManager* manager) : Dispatcher(TimerManager* manager) :
_manager(manager) { _manager(manager) {}
}
~Dispatcher() { ~Dispatcher() {
std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl; //debug // debug
std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl;
} }
/** Dispatcher entry point /**
* Dispatcher entry point
As long as dispatcher thread is running, pull tasks off the task _taskMap and execute. */ *
* As long as dispatcher thread is running, pull tasks off the task _taskMap
* and execute.
*/
void run() { void run() {
{
{Synchronized s(_manager->_monitor); Synchronized s(_manager->_monitor);
if (_manager->_state == TimerManager::STARTING) {
if(_manager->_state == TimerManager::STARTING) {
_manager->_state = TimerManager::STARTED; _manager->_state = TimerManager::STARTED;
_manager->_monitor.notifyAll(); _manager->_monitor.notifyAll();
} }
} }
do { do {
std::set<shared_ptr<TimerManager::Task> > expiredTasks; std::set<shared_ptr<TimerManager::Task> > expiredTasks;
{
{Synchronized s(_manager->_monitor); Synchronized s(_manager->_monitor);
task_iterator expiredTaskEnd; task_iterator expiredTaskEnd;
long long now = Util::currentTime(); long long now = Util::currentTime();
while (_manager->_state == TimerManager::STARTED &&
while(_manager->_state == TimerManager::STARTED &&
(expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.begin()) { (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.begin()) {
long long timeout = 0LL; long long timeout = 0LL;
if (!_manager->_taskMap.empty()) {
if(!_manager->_taskMap.empty()) {
timeout = _manager->_taskMap.begin()->first - now; timeout = _manager->_taskMap.begin()->first - now;
} }
assert((timeout != 0 && _manager->_taskCount > 0) || (timeout == 0 && _manager->_taskCount == 0)); assert((timeout != 0 && _manager->_taskCount > 0) || (timeout == 0 && _manager->_taskCount == 0));
_manager->_monitor.wait(timeout); _manager->_monitor.wait(timeout);
now = Util::currentTime(); now = Util::currentTime();
} }
if(_manager->_state == TimerManager::STARTED) { if (_manager->_state == TimerManager::STARTED) {
for (task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) {
for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) {
shared_ptr<TimerManager::Task> task = ix->second; shared_ptr<TimerManager::Task> task = ix->second;
expiredTasks.insert(task); expiredTasks.insert(task);
if (task->_state == TimerManager::Task::WAITING) {
if(task->_state == TimerManager::Task::WAITING) {
task->_state = TimerManager::Task::EXECUTING; task->_state = TimerManager::Task::EXECUTING;
} }
_manager->_taskCount--; _manager->_taskCount--;
} }
_manager->_taskMap.erase(_manager->_taskMap.begin(), expiredTaskEnd); _manager->_taskMap.erase(_manager->_taskMap.begin(), expiredTaskEnd);
} }
} }
for(std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) { for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
(*ix)->run(); (*ix)->run();
} }
} while(_manager->_state == TimerManager::STARTED); } while (_manager->_state == TimerManager::STARTED);
{Synchronized s(_manager->_monitor);
if(_manager->_state == TimerManager::STOPPING) {
{
Synchronized s(_manager->_monitor);
if (_manager->_state == TimerManager::STOPPING) {
_manager->_state = TimerManager::STOPPED; _manager->_state = TimerManager::STOPPED;
_manager->_monitor.notify(); _manager->_monitor.notify();
} }
} }
return; return;
} }
private: private:
TimerManager* _manager; TimerManager* _manager;
friend class TimerManager; friend class TimerManager;
}; };
@ -164,141 +136,106 @@ TimerManager::TimerManager() :
TimerManager::~TimerManager() { TimerManager::~TimerManager() {
/* If we haven't been explicitly stopped, do so now. We don't need to grab the monitor here, since // If we haven't been explicitly stopped, do so now. We don't need to grab
stop already takes care of reentrancy. */ // the monitor here, since stop already takes care of reentrancy.
std::cerr << "TimerManager::dtor[" << this << "]" << std::endl; std::cerr << "TimerManager::dtor[" << this << "]" << std::endl;
if(_state != STOPPED) { if (_state != STOPPED) {
try { try {
stop(); stop();
} catch(...) { } catch(...) {
std::cerr << "TimerManager::dtor[" << this << "] uhoh " << std::endl; std::cerr << "TimerManager::dtor[" << this << "] uhoh " << std::endl;
throw; throw;
// uhoh // uhoh
} }
} }
} }
void TimerManager::start() { void TimerManager::start() {
bool doStart = false; bool doStart = false;
{
{Synchronized s(_monitor); Synchronized s(_monitor);
if (_threadFactory == NULL) {
if(_threadFactory == NULL) {throw InvalidArgumentException();} throw InvalidArgumentException();
}
if(_state == TimerManager::UNINITIALIZED) { if (_state == TimerManager::UNINITIALIZED) {
_state = TimerManager::STARTING; _state = TimerManager::STARTING;
doStart = true; doStart = true;
} }
} }
if(doStart) { if (doStart) {
_dispatcherThread = _threadFactory->newThread(_dispatcher); _dispatcherThread = _threadFactory->newThread(_dispatcher);
_dispatcherThread->start(); _dispatcherThread->start();
} }
{Synchronized s(_monitor); {
Synchronized s(_monitor);
while(_state == TimerManager::STARTING) { while (_state == TimerManager::STARTING) {
_monitor.wait(); _monitor.wait();
} }
assert(_state != TimerManager::STARTING); assert(_state != TimerManager::STARTING);
} }
} }
void TimerManager::stop() { void TimerManager::stop() {
bool doStop = false; bool doStop = false;
{
{Synchronized s(_monitor); Synchronized s(_monitor);
if (_state == TimerManager::UNINITIALIZED) {
if(_state == TimerManager::UNINITIALIZED) {
_state = TimerManager::STOPPED; _state = TimerManager::STOPPED;
} else if (_state != STOPPING && _state != STOPPED) {
} else if(_state != STOPPING && _state != STOPPED) {
doStop = true; doStop = true;
_state = STOPPING; _state = STOPPING;
_monitor.notifyAll(); _monitor.notifyAll();
} }
while (_state != STOPPED) {
while(_state != STOPPED) {
_monitor.wait(); _monitor.wait();
} }
} }
if(doStop) { if (doStop) {
// Clean up any outstanding tasks // Clean up any outstanding tasks
for (task_iterator ix = _taskMap.begin(); ix != _taskMap.end(); ix++) {
for(task_iterator ix = _taskMap.begin(); ix != _taskMap.end(); ix++) {
_taskMap.erase(ix); _taskMap.erase(ix);
} }
// Remove dispatcher's reference to us. // Remove dispatcher's reference to us.
_dispatcher->_manager = NULL; _dispatcher->_manager = NULL;
} }
} }
shared_ptr<const ThreadFactory> TimerManager::threadFactory() const { shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
Synchronized s(_monitor); Synchronized s(_monitor);
return _threadFactory; return _threadFactory;
} }
void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) { void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
Synchronized s(_monitor); Synchronized s(_monitor);
_threadFactory = value; _threadFactory = value;
} }
size_t TimerManager::taskCount() const { size_t TimerManager::taskCount() const {
return _taskCount; return _taskCount;
} }
void TimerManager::add(shared_ptr<Runnable> task, long long timeout) { void TimerManager::add(shared_ptr<Runnable> task, long long timeout) {
long long now = Util::currentTime(); long long now = Util::currentTime();
timeout += now; timeout += now;
{Synchronized s(_monitor); {
Synchronized s(_monitor);
if(_state != TimerManager::STARTED) { if (_state != TimerManager::STARTED) {
throw IllegalStateException(); throw IllegalStateException();
} }
_taskCount++; _taskCount++;
_taskMap.insert(std::pair<long long, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task)))); _taskMap.insert(std::pair<long long, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
/* If the task map was empty, or if we have an expiration that is earlier than any previously seen, // If the task map was empty, or if we have an expiration that is earlier
kick the dispatcher so it can update its timeout */ // than any previously seen, kick the dispatcher so it can update its
// timeout
if(_taskCount == 1 || timeout < _taskMap.begin()->first) { if (_taskCount == 1 || timeout < _taskMap.begin()->first) {
_monitor.notify(); _monitor.notify();
} }
} }
@ -307,12 +244,11 @@ void TimerManager::add(shared_ptr<Runnable> task, long long timeout) {
void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) { void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
long long expiration; long long expiration;
Util::toMilliseconds(expiration, value); Util::toMilliseconds(expiration, value);
long long now = Util::currentTime(); long long now = Util::currentTime();
if(expiration < now) { if (expiration < now) {
throw InvalidArgumentException(); throw InvalidArgumentException();
} }
@ -321,15 +257,13 @@ void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value)
void TimerManager::remove(shared_ptr<Runnable> task) { void TimerManager::remove(shared_ptr<Runnable> task) {
{Synchronized s(_monitor); Synchronized s(_monitor);
if (_state != TimerManager::STARTED) {
if(_state != TimerManager::STARTED) {
throw IllegalStateException(); throw IllegalStateException();
} }
}
} }
const TimerManager::STATE TimerManager::state() const { return _state;} const TimerManager::STATE TimerManager::state() const { return _state; }
}}} // facebook::thrift::concurrency }}} // facebook::thrift::concurrency

View File

@ -1,27 +1,26 @@
#if !defined(_concurrency_TimerManager_h_) #ifndef _THRIFT_CONCURRENCY_TIMERMANAGER_H_
#define _concurrency_TimerManager_h_ 1 #define _THRIFT_CONCURRENCY_TIMERMANAGER_H_ 1
#include "Exception.h" #include "Exception.h"
#include "Monitor.h" #include "Monitor.h"
#include "Thread.h" #include "Thread.h"
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
#include <map> #include <map>
#include <time.h> #include <time.h>
namespace facebook { namespace thrift { namespace concurrency { namespace facebook { namespace thrift { namespace concurrency {
using namespace boost; using namespace boost;
/** Timer Manager /**
* Timer Manager
This class dispatches timer tasks when they fall due. *
* This class dispatches timer tasks when they fall due.
@author marc *
@version $Id:$ */ * @author marc
* @version $Id:$
*/
class TimerManager { class TimerManager {
public: public:
@ -34,39 +33,46 @@ class TimerManager {
virtual void threadFactory(shared_ptr<const ThreadFactory> value); virtual void threadFactory(shared_ptr<const ThreadFactory> value);
/** Starts the timer manager service /**
* Starts the timer manager service
@throws IllegalArgumentException Missing thread factory attribute */ *
* @throws IllegalArgumentException Missing thread factory attribute
*/
virtual void start(); virtual void start();
/** Stops the timer manager service */ /**
* Stops the timer manager service
*/
virtual void stop(); virtual void stop();
virtual size_t taskCount() const ; virtual size_t taskCount() const ;
/** Adds a task to be executed at some time in the future by a worker thread. /**
* Adds a task to be executed at some time in the future by a worker thread.
@param task The task to execute *
@param timeout Time in milliseconds to delay before executing task */ * @param task The task to execute
* @param timeout Time in milliseconds to delay before executing task
*/
virtual void add(shared_ptr<Runnable> task, long long timeout); virtual void add(shared_ptr<Runnable> task, long long timeout);
/** Adds a task to be executed at some time in the future by a worker thread. /**
* Adds a task to be executed at some time in the future by a worker thread.
@param task The task to execute *
@param timeout Absolute time in the future to execute task. */ * @param task The task to execute
* @param timeout Absolute time in the future to execute task.
*/
virtual void add(shared_ptr<Runnable> task, const struct timespec& timeout); virtual void add(shared_ptr<Runnable> task, const struct timespec& timeout);
/** Removes a pending task /**
* Removes a pending task
@throws NoSuchTaskException Specified task doesn't exist. It was either processed already or this call was made for a task that *
was never added to this timer * @throws NoSuchTaskException Specified task doesn't exist. It was either
* processed already or this call was made for a
@throws UncancellableTaskException Specified task is already being executed or has completed execution. */ * task that was never added to this timer
*
* @throws UncancellableTaskException Specified task is already being
* executed or has completed execution.
*/
virtual void remove(shared_ptr<Runnable> task); virtual void remove(shared_ptr<Runnable> task);
enum STATE { enum STATE {
@ -80,31 +86,19 @@ class TimerManager {
virtual const STATE state() const; virtual const STATE state() const;
private: private:
shared_ptr<const ThreadFactory> _threadFactory; shared_ptr<const ThreadFactory> _threadFactory;
class Task; class Task;
friend class Task; friend class Task;
std::multimap<long long, shared_ptr<Task> > _taskMap; std::multimap<long long, shared_ptr<Task> > _taskMap;
size_t _taskCount; size_t _taskCount;
Monitor _monitor; Monitor _monitor;
STATE _state; STATE _state;
class Dispatcher; class Dispatcher;
friend class Dispatcher; friend class Dispatcher;
shared_ptr<Dispatcher> _dispatcher; shared_ptr<Dispatcher> _dispatcher;
shared_ptr<Thread> _dispatcherThread; shared_ptr<Thread> _dispatcherThread;
}; };
}}} // facebook::thrift::concurrency }}} // facebook::thrift::concurrency
#endif // !defined(_concurrency_TimerManager_h_) #endif // #ifndef _THRIFT_CONCURRENCY_TIMERMANAGER_H_

View File

@ -1,5 +1,5 @@
#if !defined(_concurrency_Util_h_) #ifndef _THRIFT_CONCURRENCY_UTIL_H_
#define _concurrency_Util_h_ 1 #define _THRIFT_CONCURRENCY_UTIL_H_ 1
#include <config.h> #include <config.h>
@ -13,68 +13,71 @@
namespace facebook { namespace thrift { namespace concurrency { namespace facebook { namespace thrift { namespace concurrency {
/** Utility methods /**
* Utility methods
This class contains basic utility methods for converting time formats, and other common platform-dependent concurrency operations. *
It should not be included in API headers for other concurrency library headers, since it will, by definition, pull in all sorts of * This class contains basic utility methods for converting time formats,
horrid platform dependent crap. Rather it should be inluded directly in concurrency library implementation source. * and other common platform-dependent concurrency operations.
* It should not be included in API headers for other concurrency library
@author marc * headers, since it will, by definition, pull in all sorts of horrid
@version $Id:$ */ * platform dependent crap. Rather it should be inluded directly in
* concurrency library implementation source.
*
* @author marc
* @version $Id:$
*/
class Util { class Util {
static const long long NS_PER_S = 1000000000LL; static const long long NS_PER_S = 1000000000LL;
static const long long MS_PER_S = 1000LL; static const long long MS_PER_S = 1000LL;
static const long long NS_PER_MS = 1000000LL; static const long long NS_PER_MS = 1000000LL;
public: public:
/** Converts timespec to milliseconds /**
* Converts timespec to milliseconds
@param struct timespec& result *
@param time or duration in milliseconds */ * @param struct timespec& result
* @param time or duration in milliseconds
*/
static void toTimespec(struct timespec& result, long long value) { static void toTimespec(struct timespec& result, long long value) {
result.tv_sec = value / MS_PER_S; // ms to s result.tv_sec = value / MS_PER_S; // ms to s
result.tv_nsec = (value % MS_PER_S) * NS_PER_MS; // ms to ns result.tv_nsec = (value % MS_PER_S) * NS_PER_MS; // ms to ns
} }
/** Converts timespec to milliseconds */ /**
* Converts timespec to milliseconds
*/
static const void toMilliseconds(long long& result, const struct timespec& value) { static const void toMilliseconds(long long& result, const struct timespec& value) {
result =
result = (value.tv_sec * MS_PER_S) + (value.tv_nsec / NS_PER_MS) + (value.tv_nsec % NS_PER_MS >= 500000 ? 1 : 0) ; (value.tv_sec * MS_PER_S) +
(value.tv_nsec / NS_PER_MS) +
(value.tv_nsec % NS_PER_MS >= 500000 ? 1 : 0);
} }
/** Get current time as milliseconds from epoch */ /**
* Get current time as milliseconds from epoch
*/
static const long long currentTime() { static const long long currentTime() {
#if defined(HAVE_CLOCK_GETTIME) #if defined(HAVE_CLOCK_GETTIME)
struct timespec now; struct timespec now;
assert(clock_gettime(CLOCK_REALTIME, &now) == 0); assert(clock_gettime(CLOCK_REALTIME, &now) == 0);
return
return (now.tv_sec * MS_PER_S) + (now.tv_nsec / NS_PER_MS) + (now.tv_nsec % NS_PER_MS >= 500000 ? 1 : 0) ; (now.tv_sec * MS_PER_S) +
(now.tv_nsec / NS_PER_MS) +
(now.tv_nsec % NS_PER_MS >= 500000 ? 1 : 0) ;
#elif defined(HAVE_GETTIMEOFDAY) #elif defined(HAVE_GETTIMEOFDAY)
struct timeval now; struct timeval now;
assert(gettimeofday(&now, NULL) == 0); assert(gettimeofday(&now, NULL) == 0);
return
return (((long long)now.tv_sec) * MS_PER_S) + (now.tv_usec / MS_PER_S) + (now.tv_usec % MS_PER_S >= 500 ? 1 : 0); (((long long)now.tv_sec) * MS_PER_S) +
(now.tv_usec / MS_PER_S) +
(now.tv_usec % MS_PER_S >= 500 ? 1 : 0);
#endif // defined(HAVE_GETTIMEDAY) #endif // defined(HAVE_GETTIMEDAY)
} }
}; };
}}} // facebook::thrift::concurrency }}} // facebook::thrift::concurrency
#endif // !defined(_concurrency_Util_h_) #endif // #ifndef _THRIFT_CONCURRENCY_UTIL_H_

View File

@ -14,13 +14,13 @@ int main(int argc, char** argv) {
args[0] = "all"; args[0] = "all";
for(int ix = 1; ix < argc; ix++) { for (int ix = 1; ix < argc; ix++) {
args[ix - 1] = std::string(argv[ix]); args[ix - 1] = std::string(argv[ix]);
} }
bool runAll = args[0].compare("all") == 0; bool runAll = args[0].compare("all") == 0;
if(runAll || args[0].compare("thread-factory") == 0) { if (runAll || args[0].compare("thread-factory") == 0) {
ThreadFactoryTests threadFactoryTests; ThreadFactoryTests threadFactoryTests;
@ -41,7 +41,7 @@ int main(int argc, char** argv) {
assert(threadFactoryTests.monitorTimeoutTest()); assert(threadFactoryTests.monitorTimeoutTest());
} }
if(runAll || args[0].compare("util") == 0) { if (runAll || args[0].compare("util") == 0) {
std::cout << "Util tests..." << std::endl; std::cout << "Util tests..." << std::endl;
@ -56,7 +56,7 @@ int main(int argc, char** argv) {
time01 = time00; time01 = time00;
size_t count = 0; size_t count = 0;
while(time01 < time00 + 10) { while (time01 < time00 + 10) {
count++; count++;
time01 = Util::currentTime(); time01 = Util::currentTime();
} }
@ -65,7 +65,7 @@ int main(int argc, char** argv) {
} }
if(runAll || args[0].compare("timer-manager") == 0) { if (runAll || args[0].compare("timer-manager") == 0) {
std::cout << "TimerManager tests..." << std::endl; std::cout << "TimerManager tests..." << std::endl;
@ -76,7 +76,7 @@ int main(int argc, char** argv) {
assert(timerManagerTests.test00()); assert(timerManagerTests.test00());
} }
if(runAll || args[0].compare("thread-manager") == 0) { if (runAll || args[0].compare("thread-manager") == 0) {
std::cout << "ThreadManager tests..." << std::endl; std::cout << "ThreadManager tests..." << std::endl;
@ -96,7 +96,7 @@ int main(int argc, char** argv) {
} }
} }
if(runAll || args[0].compare("thread-manager-benchmark") == 0) { if (runAll || args[0].compare("thread-manager-benchmark") == 0) {
std::cout << "ThreadManager benchmark tests..." << std::endl; std::cout << "ThreadManager benchmark tests..." << std::endl;
@ -110,7 +110,7 @@ int main(int argc, char** argv) {
long long delay = 10LL; long long delay = 10LL;
for(size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount*= 2) { for (size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount*= 2) {
size_t taskCount = workerCount * tasksPerWorker; size_t taskCount = workerCount * tasksPerWorker;

View File

@ -11,11 +11,12 @@ namespace facebook { namespace thrift { namespace concurrency { namespace test {
using namespace facebook::thrift::concurrency; using namespace facebook::thrift::concurrency;
/** ThreadManagerTests class /**
* ThreadManagerTests class
@author marc *
@version $Id:$ */ * @author marc
* @version $Id:$
*/
class ThreadFactoryTests { class ThreadFactoryTests {
public: public:
@ -33,8 +34,9 @@ public:
} }
}; };
/** Hello world test */ /**
* Hello world test
*/
bool helloWorldTest() { bool helloWorldTest() {
PosixThreadFactory threadFactory = PosixThreadFactory(); PosixThreadFactory threadFactory = PosixThreadFactory();
@ -52,30 +54,28 @@ public:
return true; return true;
} }
/** Reap N threads */ /**
* Reap N threads
*/
class ReapNTask: public Runnable { class ReapNTask: public Runnable {
public: public:
ReapNTask(Monitor& monitor, int& activeCount) : ReapNTask(Monitor& monitor, int& activeCount) :
_monitor(monitor), _monitor(monitor),
_count(activeCount) { _count(activeCount) {}
}
void run() { void run() {
Synchronized s(_monitor);
{Synchronized s(_monitor);
_count--; _count--;
//std::cout << "\t\t\tthread count: " << _count << std::endl; //std::cout << "\t\t\tthread count: " << _count << std::endl;
if(_count == 0) { if (_count == 0) {
_monitor.notify(); _monitor.notify();
} }
} }
}
Monitor& _monitor; Monitor& _monitor;
@ -92,25 +92,24 @@ public:
std::set<shared_ptr<Thread> > threads; std::set<shared_ptr<Thread> > threads;
for(int ix = 0; ix < count; ix++) { for (int ix = 0; ix < count; ix++) {
threads.insert(threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, *activeCount)))); threads.insert(threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, *activeCount))));
} }
for(std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) { for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
(*thread)->start(); (*thread)->start();
} }
{Synchronized s(*monitor); {
Synchronized s(*monitor);
while(*activeCount > 0) { while (*activeCount > 0) {
monitor->wait(1000); monitor->wait(1000);
} }
} }
for(std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) { for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
threads.erase(*thread); threads.erase(*thread);
} }
@ -131,32 +130,27 @@ public:
STOPPED STOPPED
}; };
SynchStartTask(Monitor& monitor, SynchStartTask(Monitor& monitor, volatile STATE& state) :
volatile STATE& state) :
_monitor(monitor), _monitor(monitor),
_state(state) { _state(state) {}
}
void run() { void run() {
{
{Synchronized s(_monitor); Synchronized s(_monitor);
if (_state == SynchStartTask::STARTING) {
if(_state == SynchStartTask::STARTING) {
_state = SynchStartTask::STARTED; _state = SynchStartTask::STARTED;
_monitor.notify(); _monitor.notify();
} }
} }
{Synchronized s(_monitor); {
Synchronized s(_monitor);
while(_state == SynchStartTask::STARTED) { while (_state == SynchStartTask::STARTED) {
_monitor.wait(); _monitor.wait();
} }
if(_state == SynchStartTask::STOPPING) { if (_state == SynchStartTask::STOPPING) {
_state = SynchStartTask::STOPPED; _state = SynchStartTask::STOPPED;
_monitor.notifyAll(); _monitor.notifyAll();
} }
} }
@ -179,34 +173,35 @@ public:
shared_ptr<Thread> thread = threadFactory.newThread(task); shared_ptr<Thread> thread = threadFactory.newThread(task);
if(state == SynchStartTask::UNINITIALIZED) { if (state == SynchStartTask::UNINITIALIZED) {
state = SynchStartTask::STARTING; state = SynchStartTask::STARTING;
thread->start(); thread->start();
} }
{Synchronized s(monitor); {
Synchronized s(monitor);
while(state == SynchStartTask::STARTING) { while (state == SynchStartTask::STARTING) {
monitor.wait(); monitor.wait();
} }
} }
assert(state != SynchStartTask::STARTING); assert(state != SynchStartTask::STARTING);
{Synchronized s(monitor); {
Synchronized s(monitor);
monitor.wait(100); monitor.wait(100);
if(state == SynchStartTask::STARTED) { if (state == SynchStartTask::STARTED) {
state = SynchStartTask::STOPPING; state = SynchStartTask::STOPPING;
monitor.notify(); monitor.notify();
} }
while(state == SynchStartTask::STOPPING) { while (state == SynchStartTask::STOPPING) {
monitor.wait(); monitor.wait();
} }
} }
@ -228,8 +223,9 @@ public:
long long startTime = Util::currentTime(); long long startTime = Util::currentTime();
for(size_t ix = 0; ix < count; ix++) { for (size_t ix = 0; ix < count; ix++) {
{Synchronized s(monitor); {
Synchronized s(monitor);
monitor.wait(timeout); monitor.wait(timeout);
} }
} }
@ -238,7 +234,7 @@ public:
double error = ((endTime - startTime) - (count * timeout)) / (double)(count * timeout); double error = ((endTime - startTime) - (count * timeout)) / (double)(count * timeout);
if(error < 0.0) { if (error < 0.0) {
error *= 1.0; error *= 1.0;
} }

View File

@ -14,11 +14,12 @@ namespace facebook { namespace thrift { namespace concurrency { namespace test {
using namespace facebook::thrift::concurrency; using namespace facebook::thrift::concurrency;
/** ThreadManagerTests class /**
* ThreadManagerTests class
@author marc *
@version $Id:$ */ * @author marc
* @version $Id:$
*/
class ThreadManagerTests { class ThreadManagerTests {
public: public:
@ -39,8 +40,8 @@ public:
_startTime = Util::currentTime(); _startTime = Util::currentTime();
{Synchronized s(_sleep); {
Synchronized s(_sleep);
_sleep.wait(_timeout); _sleep.wait(_timeout);
} }
@ -49,13 +50,14 @@ public:
_done = true; _done = true;
{Synchronized s(_monitor); {
Synchronized s(_monitor);
// std::cout << "Thread " << _count << " completed " << std::endl; // std::cout << "Thread " << _count << " completed " << std::endl;
_count--; _count--;
if(_count == 0) { if (_count == 0) {
_monitor.notify(); _monitor.notify();
} }
@ -71,9 +73,11 @@ public:
Monitor _sleep; Monitor _sleep;
}; };
/** Dispatch count tasks, each of which blocks for timeout milliseconds then completes. /**
Verify that all tasks completed and that thread manager cleans up properly on delete. */ * Dispatch count tasks, each of which blocks for timeout milliseconds then
* completes. Verify that all tasks completed and that thread manager cleans
* up properly on delete.
*/
bool loadTest(size_t count=100, long long timeout=100LL, size_t workerCount=4) { bool loadTest(size_t count=100, long long timeout=100LL, size_t workerCount=4) {
Monitor monitor; Monitor monitor;
@ -92,19 +96,20 @@ public:
std::set<shared_ptr<ThreadManagerTests::Task> > tasks; std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
for(size_t ix = 0; ix < count; ix++) { for (size_t ix = 0; ix < count; ix++) {
tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout))); tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));
} }
long long time00 = Util::currentTime(); long long time00 = Util::currentTime();
for(std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) { for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
threadManager->add(*ix); threadManager->add(*ix);
} }
{Synchronized s(monitor); {
Synchronized s(monitor);
while(activeCount > 0) { while(activeCount > 0) {
@ -121,7 +126,7 @@ public:
long long minTime = 9223372036854775807LL; long long minTime = 9223372036854775807LL;
long long maxTime = 0; long long maxTime = 0;
for(std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) { for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
shared_ptr<ThreadManagerTests::Task> task = *ix; shared_ptr<ThreadManagerTests::Task> task = *ix;
@ -129,19 +134,19 @@ public:
assert(delta > 0); assert(delta > 0);
if(task->_startTime < firstTime) { if (task->_startTime < firstTime) {
firstTime = task->_startTime; firstTime = task->_startTime;
} }
if(task->_endTime > lastTime) { if (task->_endTime > lastTime) {
lastTime = task->_endTime; lastTime = task->_endTime;
} }
if(delta < minTime) { if (delta < minTime) {
minTime = delta; minTime = delta;
} }
if(delta > maxTime) { if (delta > maxTime) {
maxTime = delta; maxTime = delta;
} }
@ -156,7 +161,7 @@ public:
double error = ((time01 - time00) - expectedTime) / expectedTime; double error = ((time01 - time00) - expectedTime) / expectedTime;
if(error < 0) { if (error < 0) {
error*= -1.0; error*= -1.0;
} }

View File

@ -10,11 +10,12 @@ namespace facebook { namespace thrift { namespace concurrency { namespace test {
using namespace facebook::thrift::concurrency; using namespace facebook::thrift::concurrency;
/** ThreadManagerTests class /**
* ThreadManagerTests class
@author marc *
@version $Id:$ */ * @author marc
* @version $Id:$
*/
class TimerManagerTests { class TimerManagerTests {
public: public:
@ -22,7 +23,6 @@ class TimerManagerTests {
static const double ERROR; static const double ERROR;
class Task: public Runnable { class Task: public Runnable {
public: public:
Task(Monitor& monitor, long long timeout) : Task(Monitor& monitor, long long timeout) :
@ -32,7 +32,7 @@ class TimerManagerTests {
_success(false), _success(false),
_done(false) {} _done(false) {}
~Task() {std::cerr << this << std::endl;} ~Task() { std::cerr << this << std::endl; }
void run() { void run() {
@ -60,8 +60,6 @@ class TimerManagerTests {
} }
} }
long long _timeout; long long _timeout;
long long _startTime; long long _startTime;
long long _endTime; long long _endTime;
@ -70,10 +68,12 @@ class TimerManagerTests {
bool _done; bool _done;
}; };
/** This test creates two tasks and waits for the first to expire within 10% of the expected expiration time. It then verifies that /**
the timer manager properly clean up itself and the remaining orphaned timeout task when the manager goes out of scope and its * This test creates two tasks and waits for the first to expire within 10%
destructor is called. */ * of the expected expiration time. It then verifies that the timer manager
* properly clean up itself and the remaining orphaned timeout task when the
* manager goes out of scope and its destructor is called.
*/
bool test00(long long timeout=1000LL) { bool test00(long long timeout=1000LL) {
shared_ptr<TimerManagerTests::Task> orphanTask = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, 10 * timeout)); shared_ptr<TimerManagerTests::Task> orphanTask = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, 10 * timeout));
@ -90,7 +90,8 @@ class TimerManagerTests {
shared_ptr<TimerManagerTests::Task> task = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, timeout)); shared_ptr<TimerManagerTests::Task> task = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, timeout));
{Synchronized s(_monitor); {
Synchronized s(_monitor);
timerManager.add(orphanTask, 10 * timeout); timerManager.add(orphanTask, 10 * timeout);

View File

@ -1,5 +1,5 @@
#ifndef T_BINARY_PROTOCOL_H #ifndef _THRIFT_PROTOCOL_TBINARYPROTOCOL_H_
#define T_BINARY_PROTOCOL_H #define _THRIFT_PROTOCOL_TBINARYPROTOCOL_H_ 1
#include "TProtocol.h" #include "TProtocol.h"
@ -147,5 +147,6 @@ using namespace boost;
}}} // facebook::thrift::protocol }}} // facebook::thrift::protocol
#endif #endif // #ifndef _THRIFT_PROTOCOL_TBINARYPROTOCOL_H_

View File

@ -1,5 +1,5 @@
#ifndef T_PROTOCOL_H #ifndef _THRIFT_PROTOCOL_TPROTOCOL_H_
#define T_PROTOCOL_H #define _THRIFT_PROTOCOL_TPROTOCOL_H_ 1
#include <transport/TTransport.h> #include <transport/TTransport.h>
@ -20,7 +20,7 @@ using namespace facebook::thrift::transport;
#define htonll(x) ntohll(x) #define htonll(x) ntohll(x)
/** Forward declaration for TProtocol */ // Forward declaration for TProtocol
struct TBuf; struct TBuf;
/** /**
@ -49,7 +49,8 @@ enum TType {
}; };
/** /**
* Enumerated definition of the message types that the Thrift protocol supports. * Enumerated definition of the message types that the Thrift protocol
* supports.
*/ */
enum TMessageType { enum TMessageType {
T_CALL = 1, T_CALL = 1,
@ -297,5 +298,4 @@ class TProtocol {
}}} // facebook::thrift::protocol }}} // facebook::thrift::protocol
#endif #endif // #define _THRIFT_PROTOCOL_TPROTOCOL_H_ 1

View File

@ -1,5 +1,5 @@
#ifndef T_BUFFERED_TRANSPORT_H #ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_
#define T_BUFFERED_TRANSPORT_H #define _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_ 1
#include "TTransport.h" #include "TTransport.h"
#include <string> #include <string>
@ -84,4 +84,4 @@ class TBufferedTransport : public TTransport {
}}} // facebook::thrift::transport }}} // facebook::thrift::transport
#endif #endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_

View File

@ -1,5 +1,5 @@
#ifndef T_CHUNKED_TRANSPORT_H #ifndef _THRIFT_TRANSPORT_TCHUNKEDTRANSPORT_H_
#define T_CHUNKED_TRANSPORT_H #define _THRIFT_TRANSPORT_TCHUNKEDTRANSPORT_H_ 1
#include "TTransport.h" #include "TTransport.h"
#include <string> #include <string>
@ -80,4 +80,4 @@ class TChunkedTransport : public TTransport {
}}} // facebook::thrift::transport }}} // facebook::thrift::transport
#endif #endif // #ifndef _THRIFT_TRANSPORT_TCHUNKEDTRANSPORT_H_

View File

@ -1,5 +1,5 @@
#ifndef T_NULL_TRANSPORT #ifndef _THRIFT_TRANSPORT_TNULLTRANSPORT_H_
#define T_NULL_TRANSPORT #define _THRIFT_TRANSPORT_TNULLTRANSPORT_H_ 1
#include "TTransport.h" #include "TTransport.h"
@ -25,4 +25,4 @@ class TNullTransport : public TTransport {
}}} // facebook::thrift::transport }}} // facebook::thrift::transport
#endif #endif // #ifndef _THRIFT_TRANSPORT_TNULLTRANSPORT_H_

View File

@ -1,5 +1,5 @@
#ifndef T_SERVER_SOCKET_H #ifndef _THRIFT_TRANSPORT_TSERVERSOCKET_H_
#define T_SERVER_SOCKET_H #define _THRIFT_TRANSPORT_TSERVERSOCKET_H_ 1
#include "TServerTransport.h" #include "TServerTransport.h"
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
@ -34,4 +34,4 @@ class TServerSocket : public TServerTransport {
}}} // facebook::thrift::transport }}} // facebook::thrift::transport
#endif #endif // #ifndef _THRIFT_TRANSPORT_TSERVERSOCKET_H_

View File

@ -1,5 +1,5 @@
#ifndef T_SERVER_TRANSPORT_H #ifndef _THRIFT_TRANSPORT_TSERVERTRANSPORT_H_
#define T_SERVER_TRANSPORT_H #define _THRIFT_TRANSPORT_TSERVERTRANSPORT_H_ 1
#include "TTransport.h" #include "TTransport.h"
#include "TTransportException.h" #include "TTransportException.h"
@ -65,4 +65,4 @@ class TServerTransport {
}}} // facebook::thrift::transport }}} // facebook::thrift::transport
#endif #endif // #ifndef _THRIFT_TRANSPORT_TSERVERTRANSPORT_H_

View File

@ -1,5 +1,5 @@
#ifndef T_SOCKET_H #ifndef _THRIFT_TRANSPORT_TSOCKET_H_
#define T_SOCKET_H #define _THRIFT_TRANSPORT_TSOCKET_H_ 1
#include <string> #include <string>
@ -100,4 +100,6 @@ class TSocket : public TTransport {
}; };
}}} // facebook::thrift::transport }}} // facebook::thrift::transport
#endif
#endif // #ifndef _THRIFT_TRANSPORT_TSOCKET_H_

View File

@ -1,5 +1,5 @@
#ifndef T_TRANSPORT_H #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_
#define T_TRANSPORT_H #define _THRIFT_TRANSPORT_TTRANSPORT_H_ 1
#include "TTransportException.h" #include "TTransportException.h"
#include <string> #include <string>
@ -98,4 +98,4 @@ class TTransport {
}}} // facebook::thrift::transport }}} // facebook::thrift::transport
#endif #endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_

View File

@ -1,5 +1,5 @@
#ifndef T_TRANSPORT_EXCEPTION_H #ifndef _THRIFT_TRANSPORT_TTRANSPORTEXCEPTION_H_
#define T_TRANSPORT_EXCEPTION_H #define _THRIFT_TRANSPORT_TTRANSPORTEXCEPTION_H_ 1
#include <string> #include <string>
@ -64,4 +64,4 @@ class TTransportException {
}}} // facebook::thrift::transport }}} // facebook::thrift::transport
#endif #endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTEXCEPTION_H_

View File

@ -248,13 +248,12 @@ class TestServer : public ThriftTestServer {
printf("testMultiException(%s, %s)\n", arg0.c_str(), arg1.c_str()); printf("testMultiException(%s, %s)\n", arg0.c_str(), arg1.c_str());
if(arg0.compare("Xception") == 0) { if (arg0.compare("Xception") == 0) {
Xception e; Xception e;
e.errorCode = 1001; e.errorCode = 1001;
e.message = "This is an Xception"; e.message = "This is an Xception";
throw e; throw e;
} else if (arg0.compare("Xception2") == 0) {
} else if(arg0.compare("Xception2") == 0) {
Xception2 e; Xception2 e;
e.errorCode = 2002; e.errorCode = 2002;
e.struct_thing.string_thing = "This is an Xception2"; e.struct_thing.string_thing = "This is an Xception2";
@ -287,15 +286,11 @@ int main(int argc, char **argv) {
map<string, string> args; map<string, string> args;
for(int ix = 1; ix < argc; ix++) { for (int ix = 1; ix < argc; ix++) {
string arg(argv[ix]); string arg(argv[ix]);
if (arg.compare(0,2, "--") == 0) {
if(arg.compare(0,2, "--") == 0) {
size_t end = arg.find_first_of("=", 2); size_t end = arg.find_first_of("=", 2);
if (end != string::npos) {
if(end != string::npos) {
args[string(arg, 2, end - 2)] = string(arg, end + 1); args[string(arg, 2, end - 2)] = string(arg, end + 1);
} else { } else {
args[string(arg, 2, end - 2)] = "true"; args[string(arg, 2, end - 2)] = "true";
@ -308,40 +303,35 @@ int main(int argc, char **argv) {
try { try {
if(!args["port"].empty()) { if (!args["port"].empty()) {
port = atoi(args["port"].c_str()); port = atoi(args["port"].c_str());
} }
if(!args["server-type"].empty()) { if (!args["server-type"].empty()) {
serverType = args["server-type"]; serverType = args["server-type"];
if (serverType == "simple") {
if(serverType == "simple") { } else if (serverType == "thread-pool") {
} else if(serverType == "thread-pool") {
} else { } else {
throw invalid_argument("Unknown server type "+serverType); throw invalid_argument("Unknown server type "+serverType);
} }
} }
if(!args["protocol-type"].empty()) { if (!args["protocol-type"].empty()) {
protocolType = args["protocol-type"]; protocolType = args["protocol-type"];
if (protocolType == "binary") {
if(protocolType == "binary") { } else if (protocolType == "ascii") {
} else if(protocolType == "ascii") {
throw invalid_argument("ASCII protocol not supported"); throw invalid_argument("ASCII protocol not supported");
} else if(protocolType == "xml") { } else if (protocolType == "xml") {
throw invalid_argument("XML protocol not supported"); throw invalid_argument("XML protocol not supported");
} else { } else {
throw invalid_argument("Unknown protocol type "+protocolType); throw invalid_argument("Unknown protocol type "+protocolType);
} }
} }
if(!args["workers"].empty()) { if (!args["workers"].empty()) {
workerCount = atoi(args["workers"].c_str()); workerCount = atoi(args["workers"].c_str());
} }
} catch(exception& e) { } catch (exception& e) {
cerr << e.what() << endl; cerr << e.what() << endl;
cerr << usage; cerr << usage;
} }
@ -369,9 +359,11 @@ int main(int argc, char **argv) {
} else if (serverType == "thread-pool") { } else if (serverType == "thread-pool") {
shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount); shared_ptr<ThreadManager> threadManager =
ThreadManager::newSimpleThreadManager(workerCount);
shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory()); shared_ptr<PosixThreadFactory> threadFactory =
shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
threadManager->threadFactory(threadFactory); threadManager->threadFactory(threadFactory);