diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h index ce2753934..4cbcd6501 100644 --- a/lib/cpp/src/TProcessor.h +++ b/lib/cpp/src/TProcessor.h @@ -1,5 +1,5 @@ -#ifndef T_PROCESSOR_H -#define T_PROCESSOR_H +#ifndef _THRIFT_TPROCESSOR_H_ +#define _THRIFT_TPROCESSOR_H_ 1 #include #include @@ -30,4 +30,4 @@ class TProcessor { }} // facebook::thrift -#endif +#endif // #ifndef _THRIFT_PROCESSOR_H_ diff --git a/lib/cpp/src/Thrift.h b/lib/cpp/src/Thrift.h index 410d20ebd..38226dcc7 100644 --- a/lib/cpp/src/Thrift.h +++ b/lib/cpp/src/Thrift.h @@ -1,5 +1,5 @@ -#ifndef THRIFT_H -#define THRIFT_H +#ifndef _THRIFT_THRIFT_H_ +#define _THRIFT_THRIFT_H_ 1 #include #include @@ -24,4 +24,4 @@ public: }} // facebook::thrift -#endif +#endif // #ifndef _THRIFT_THRIFT_H_ diff --git a/lib/cpp/src/concurrency/Exception.h b/lib/cpp/src/concurrency/Exception.h index 5907f0285..67eb1530f 100644 --- a/lib/cpp/src/concurrency/Exception.h +++ b/lib/cpp/src/concurrency/Exception.h @@ -1,5 +1,5 @@ -#if !defined(_concurrency_Exception_h_) -#define _concurrency_Exception_h_ 1 +#ifndef _THRIFT_CONCURRENCY_EXCEPTION_H_ +#define _THRIFT_CONCURRENCY_EXCEPTION_H_ 1 #include @@ -17,4 +17,4 @@ class TimedOutException : public std::exception {}; }}} // facebook::thrift::concurrency -#endif // !defined(_concurrency_Exception_h_) +#endif // #ifndef _THRIFT_CONCURRENCY_EXCEPTION_H_ diff --git a/lib/cpp/src/concurrency/Monitor.cc b/lib/cpp/src/concurrency/Monitor.cc index 7493ec398..57532a3c9 100644 --- a/lib/cpp/src/concurrency/Monitor.cc +++ b/lib/cpp/src/concurrency/Monitor.cc @@ -9,125 +9,97 @@ #include - namespace facebook { namespace thrift { namespace concurrency { -/** Monitor implementation using the POSIX pthread library - - @author marc - @version $Id:$ */ - +/** + * Monitor implementation using the POSIX pthread library + * + * @author marc + * @version $Id:$ + */ class Monitor::Impl { public: - Impl() : + Impl() : mutexInitialized(false), condInitialized(false) { try { - assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0); - mutexInitialized = true; - assert(pthread_cond_init(&_pthread_cond, NULL) == 0); - condInitialized = true; - } catch(...) { cleanup(); } } - ~Impl() {cleanup();} + ~Impl() { cleanup(); } - void lock() const {pthread_mutex_lock(&_pthread_mutex);} + void lock() const { pthread_mutex_lock(&_pthread_mutex); } - void unlock() const {pthread_mutex_unlock(&_pthread_mutex);} + void unlock() const { pthread_mutex_unlock(&_pthread_mutex); } void wait(long long timeout) const { // XXX Need to assert that caller owns mutex - assert(timeout >= 0LL); - - if(timeout == 0LL) { - + if (timeout == 0LL) { assert(pthread_cond_wait(&_pthread_cond, &_pthread_mutex) == 0); - } else { - struct timespec abstime; - long long now = Util::currentTime(); - Util::toTimespec(abstime, now + timeout); - - int result = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime); - - if(result == ETIMEDOUT) { - + int result = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime); + if (result == ETIMEDOUT) { assert(Util::currentTime() >= (now + timeout)); } } } void notify() { - // XXX Need to assert that caller owns mutex - assert(pthread_cond_signal(&_pthread_cond) == 0); } void notifyAll() { - // XXX Need to assert that caller owns mutex - assert(pthread_cond_broadcast(&_pthread_cond) == 0); } -private: + private: void cleanup() { - - if(mutexInitialized) { - + if (mutexInitialized) { mutexInitialized = false; - assert(pthread_mutex_destroy(&_pthread_mutex) == 0); } - if(condInitialized) { - + if (condInitialized) { condInitialized = false; - assert(pthread_cond_destroy(&_pthread_cond) == 0); } } mutable pthread_mutex_t _pthread_mutex; - mutable bool mutexInitialized; - mutable pthread_cond_t _pthread_cond; - mutable bool condInitialized; }; Monitor::Monitor() : _impl(new Monitor::Impl()) {} -Monitor::~Monitor() { delete _impl;} +Monitor::~Monitor() { delete _impl; } -void Monitor::lock() const {_impl->lock();} +void Monitor::lock() const { _impl->lock(); } -void Monitor::unlock() const {_impl->unlock();} +void Monitor::unlock() const { _impl->unlock(); } -void Monitor::wait(long long timeout) const {_impl->wait(timeout);} +void Monitor::wait(long long timeout) const { _impl->wait(timeout); } -void Monitor::notify() const {_impl->notify();} +void Monitor::notify() const { _impl->notify(); } -void Monitor::notifyAll() const {_impl->notifyAll();} +void Monitor::notifyAll() const { _impl->notifyAll(); } }}} // facebook::thrift::concurrency - diff --git a/lib/cpp/src/concurrency/Monitor.h b/lib/cpp/src/concurrency/Monitor.h index 13dec185e..62a834461 100644 --- a/lib/cpp/src/concurrency/Monitor.h +++ b/lib/cpp/src/concurrency/Monitor.h @@ -1,18 +1,22 @@ -#if !defined(_concurrency_Monitor_h_) -#define _concurrency_Monitor_h_ 1 +#ifndef _THRIFT_CONCURRENCY_MONITOR_H_ +#define _THRIFT_CONCURRENCY_MONITOR_H_ 1 namespace facebook { namespace thrift { namespace concurrency { -/** A monitor is a combination mutex and condition-event. Waiting and notifying condition events requires that the caller own the mutex. Mutex - lock and unlock operations can be performed independently of condition events. This is more or less analogous to java.lang.Object multi-thread - operations - - Note that all methods are const. Monitors implement logical constness, not bit constness. This allows const methods to call monitor - methods without needing to cast away constness or change to non-const signatures. - - @author marc - @version $Id:$ */ - +/** + * A monitor is a combination mutex and condition-event. Waiting and + * notifying condition events requires that the caller own the mutex. Mutex + * lock and unlock operations can be performed independently of condition + * events. This is more or less analogous to java.lang.Object multi-thread + * operations + * + * Note that all methods are const. Monitors implement logical constness, not + * bit constness. This allows const methods to call monitor methods without + * needing to cast away constness or change to non-const signatures. + * + * @author marc + * @version $Id:$ + */ class Monitor { public: @@ -56,4 +60,4 @@ class Synchronized { }}} // facebook::thrift::concurrency -#endif // !defined(_concurrency_Monitor_h_) +#endif // #ifndef _THRIFT_CONCURRENCY_MONITOR_H_ diff --git a/lib/cpp/src/concurrency/Mutex.cc b/lib/cpp/src/concurrency/Mutex.cc index 8282e73f8..416341e11 100644 --- a/lib/cpp/src/concurrency/Mutex.cc +++ b/lib/cpp/src/concurrency/Mutex.cc @@ -3,41 +3,42 @@ #include #include -/** Implementation of Mutex class using POSIX mutex - - @author marc - @version $Id:$ */ - namespace facebook { namespace thrift { namespace concurrency { +/** + * Implementation of Mutex class using POSIX mutex + * + * @author marc + * @version $Id:$ + */ class Mutex::impl { -public: + public: impl() : initialized(false) { assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0); initialized = true; } ~impl() { - if(initialized) { + if (initialized) { initialized = false; assert(pthread_mutex_destroy(&_pthread_mutex) == 0); } } - void lock() const {pthread_mutex_lock(&_pthread_mutex);} + void lock() const { pthread_mutex_lock(&_pthread_mutex); } - void unlock() const {pthread_mutex_unlock(&_pthread_mutex);} + void unlock() const { pthread_mutex_unlock(&_pthread_mutex); } -private: + private: mutable pthread_mutex_t _pthread_mutex; mutable bool initialized; }; Mutex::Mutex() : _impl(new Mutex::impl()) {} -void Mutex::lock() const {_impl->lock();} +void Mutex::lock() const { _impl->lock(); } -void Mutex::unlock() const {_impl->unlock();} +void Mutex::unlock() const { _impl->unlock(); } }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h index 20d4c0b0d..9eceb49d1 100644 --- a/lib/cpp/src/concurrency/Mutex.h +++ b/lib/cpp/src/concurrency/Mutex.h @@ -1,39 +1,31 @@ -#if !defined(_concurrency_mutex_h_) -#define _concurrency_mutex_h_ 1 +#ifndef _THRIFT_CONCURRENCY_MUTEX_H_ +#define _THRIFT_CONCURRENCY_MUTEX_H_ 1 namespace facebook { namespace thrift { namespace concurrency { -/** A simple mutex class - - @author marc - @version $Id:$ */ - +/** + * A simple mutex class + * + * @author marc + * @version $Id:$ + */ class Mutex { - public: - Mutex(); - virtual ~Mutex() {} - virtual void lock() const; - virtual void unlock() const; private: - class impl; - impl* _impl; }; class MutexMonitor { - public: - + public: MutexMonitor(const Mutex& value) : _mutex(value) { _mutex.lock(); } - ~MutexMonitor() { _mutex.unlock(); } @@ -45,4 +37,4 @@ class MutexMonitor { }}} // facebook::thrift::concurrency -#endif // !defined(_concurrency_mutex_h_) +#endif // #ifndef _THRIFT_CONCURRENCY_MUTEX_H_ diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc index f07db86a2..5df87ec2b 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.cc +++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc @@ -11,40 +11,36 @@ namespace facebook { namespace thrift { namespace concurrency { using namespace boost; -/** The POSIX thread class. - - @author marc - @version $Id:$ */ - +/** + * The POSIX thread class. + * + * @author marc + * @version $Id:$ + */ class PthreadThread: public Thread { + public: -public: - enum STATE {uninitialized, - starting, - started, - stopping, - stopped + enum STATE { + uninitialized, + starting, + started, + stopping, + stopped }; static const int MB = 1024 * 1024; static void* threadMain(void* arg); -private: - + private: pthread_t _pthread; - STATE _state; - int _policy; - int _priority; - int _stackSize; - weak_ptr _self; -public: + public: PthreadThread(int policy, int priority, int stackSize, shared_ptr runnable) : _pthread(0), @@ -56,58 +52,47 @@ public: this->Thread::runnable(runnable); } - ~PthreadThread() { - } + ~PthreadThread() {} void start() { - - if(_state != uninitialized) { + if (_state != uninitialized) { return; } _state = starting; pthread_attr_t thread_attr; - assert(pthread_attr_init(&thread_attr) == 0); - assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0); // Set thread stack size - assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0); // Set thread policy - assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0); struct sched_param sched_param; sched_param.sched_priority = _priority; // Set thread priority - assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0); + // Create reference shared_ptr* selfRef = new shared_ptr(); - *selfRef = _self.lock(); - assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)selfRef) == 0); } void join() { - - if(_state != stopped) { - + if (_state != stopped) { void* ignore; - pthread_join(_pthread, &ignore); } } - shared_ptr runnable() const {return Thread::runnable();} + shared_ptr runnable() const { return Thread::runnable(); } - void runnable(shared_ptr value) {Thread::runnable(value);} + void runnable(shared_ptr value) { Thread::runnable(value); } void weakRef(shared_ptr self) { assert(self.get() == this); @@ -117,46 +102,41 @@ public: void* PthreadThread::threadMain(void* arg) { // XXX need a lock here when testing thread state - shared_ptr thread = *(shared_ptr*)arg; - delete reinterpret_cast*>(arg); - if(thread == NULL) { + if (thread == NULL) { return (void*)0; } - if(thread->_state != starting) { + if (thread->_state != starting) { return (void*)0; } thread->_state = starting; - thread->runnable()->run(); - - if(thread->_state != stopping && thread->_state != stopped) { + if (thread->_state != stopping && thread->_state != stopped) { thread->_state = stopping; } return (void*)0; } -/** POSIX Thread factory implementation */ - +/** + * POSIX Thread factory implementation + */ class PosixThreadFactory::Impl { -private: - + private: POLICY _policy; - PRIORITY _priority; - int _stackSize; - bool _detached; - /** Converts generic posix thread schedule policy enums into pthread API values. */ - + /** + * Converts generic posix thread schedule policy enums into pthread + * API values. + */ static int toPthreadPolicy(POLICY policy) { switch(policy) { case OTHER: return SCHED_OTHER; break; @@ -166,83 +146,76 @@ private: } } - /** Converts relative thread priorities to absolute value based on posix thread scheduler policy - - The idea is simply to divide up the priority range for the given policy into the correpsonding relative - priority level (lowest..highest) and then pro-rate accordingly. */ - + /** + * Converts relative thread priorities to absolute value based on posix + * thread scheduler policy + * + * The idea is simply to divide up the priority range for the given policy + * into the correpsonding relative priority level (lowest..highest) and + * then pro-rate accordingly. + */ static int toPthreadPriority(POLICY policy, PRIORITY priority) { - int pthread_policy = toPthreadPolicy(policy); - int min_priority = sched_get_priority_min(pthread_policy); - int max_priority = sched_get_priority_max(pthread_policy); - int quanta = (HIGHEST - LOWEST) + 1; - float stepsperquanta = (max_priority - min_priority) / quanta; if(priority <= HIGHEST) { - return (int)(min_priority + stepsperquanta * priority); } else { - // should never get here for priority increments. - assert(false); - return (int)(min_priority + stepsperquanta * NORMAL); } } -public: + public: Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) : _policy(policy), _priority(priority), _stackSize(stackSize), - _detached(detached) { - } - - /** Creates a new POSIX thread to run the runnable object - - @param runnable A runnable object */ + _detached(detached) {} + /** + * Creates a new POSIX thread to run the runnable object + * + * @param runnable A runnable object + */ shared_ptr newThread(shared_ptr runnable) const { - shared_ptr result = shared_ptr(new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable)); result->weakRef(result); runnable->thread(result); return result; } - int stackSize() const { return _stackSize;} + int stackSize() const { return _stackSize; } - void stackSize(int value) { _stackSize = value;} + void stackSize(int value) { _stackSize = value; } - PRIORITY priority() const { return _priority;} - - /** Sets priority. - - XXX - Need to handle incremental priorities properly. */ - - void priority(PRIORITY value) { _priority = value;} + PRIORITY priority() const { return _priority; } + /** + * Sets priority. + * + * XXX + * Need to handle incremental priorities properly. + */ + void priority(PRIORITY value) { _priority = value; } }; PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) : _impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {} -shared_ptr PosixThreadFactory::newThread(shared_ptr runnable) const {return _impl->newThread(runnable);} +shared_ptr PosixThreadFactory::newThread(shared_ptr runnable) const { return _impl->newThread(runnable); } -int PosixThreadFactory::stackSize() const {return _impl->stackSize();} +int PosixThreadFactory::stackSize() const { return _impl->stackSize(); } -void PosixThreadFactory::stackSize(int value) {_impl->stackSize(value);} +void PosixThreadFactory::stackSize(int value) { _impl->stackSize(value); } -PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const {return _impl->priority();} +PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return _impl->priority(); } -void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) {_impl->priority(value);} +void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { _impl->priority(value); } }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h index 0095cf815..a56999c80 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.h +++ b/lib/cpp/src/concurrency/PosixThreadFactory.h @@ -1,5 +1,5 @@ -#if !defined(_concurrency_PosixThreadFactory_h_) -#define _concurrency_PosixThreadFactory_h_ 1 +#ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_ +#define _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_ 1 #include "Thread.h" @@ -9,28 +9,33 @@ namespace facebook { namespace thrift { namespace concurrency { using namespace boost; -/** A thread factory to create posix threads - - @author marc - @version $Id:$ */ - +/** + * A thread factory to create posix threads + * + * @author marc + * @version $Id:$ + */ class PosixThreadFactory : public ThreadFactory { public: - /** POSIX Thread scheduler policies */ - + /** + * POSIX Thread scheduler policies + */ enum POLICY { OTHER, FIFO, ROUND_ROBIN }; - /** POSIX Thread scheduler relative priorities, - - Absolute priority is determined by scheduler policy and OS. This enumeration specifies relative priorities such that one can - specify a priority withing a giving scheduler policy without knowing the absolute value of the priority. */ - + /** + * POSIX Thread scheduler relative priorities, + * + * Absolute priority is determined by scheduler policy and OS. This + * enumeration specifies relative priorities such that one can specify a + * priority withing a giving scheduler policy without knowing the absolute + * value of the priority. + */ enum PRIORITY { LOWEST = 0, LOWER = 1, @@ -46,36 +51,37 @@ class PosixThreadFactory : public ThreadFactory { PosixThreadFactory(POLICY policy=ROUND_ROBIN, PRIORITY priority=NORMAL, int stackSize=1, bool detached=false); // From ThreadFactory; - shared_ptr newThread(shared_ptr runnable) const; - /** Sets stack size for created threads - - @param value size in megabytes */ - + /** + * Sets stack size for created threads + * + * @param value size in megabytes + */ virtual void stackSize(int value); - /** Gets stack size for created threads - - @return int size in megabytes */ - + /** + * Gets stack size for created threads + * + * @return int size in megabytes + */ virtual int stackSize() const; - /** Sets priority relative to current policy */ - + /** + * Sets priority relative to current policy + */ virtual void priority(PRIORITY priority); - /** Gets priority relative to current policy */ - + /** + * Gets priority relative to current policy + */ virtual PRIORITY priority() const; private: - class Impl; - shared_ptr _impl; }; }}} // facebook::thrift::concurrency -#endif // !defined(_concurrency_PosixThreadFactory_h_) +#endif // #ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_ diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h index ea6b999f1..24d590824 100644 --- a/lib/cpp/src/concurrency/Thread.h +++ b/lib/cpp/src/concurrency/Thread.h @@ -1,5 +1,5 @@ -#if !defined(_concurrency_Thread_h_) -#define _concurrency_Thread_h_ 1 +#ifndef _THRIFT_CONCURRENCY_THREAD_H_ +#define _THRIFT_CONCURRENCY_THREAD_H_ 1 #include #include @@ -10,77 +10,84 @@ using namespace boost; class Thread; -/** Minimal runnable class. More or less analogous to java.lang.Runnable. - - @author marc - @version $Id:$ */ - +/** + * Minimal runnable class. More or less analogous to java.lang.Runnable. + * + * @author marc + * @version $Id:$ + */ class Runnable { public: - virtual ~Runnable() {}; - virtual void run() = 0; - /** Gets the thread object that is hosting this runnable object - can return an empty shared pointer if no references remain on thet thread object */ + /** + * Gets the thread object that is hosting this runnable object - can return + * an empty shared pointer if no references remain on thet thread object + */ + virtual shared_ptr thread() { return _thread.lock(); } - virtual shared_ptr thread() {return _thread.lock();} - - /** Sets the thread that is executing this object. This is only meant for use by concrete implementations of Thread. */ - - virtual void thread(shared_ptr value) {_thread = value;} + /** + * Sets the thread that is executing this object. This is only meant for + * use by concrete implementations of Thread. + */ + virtual void thread(shared_ptr value) { _thread = value; } private: - weak_ptr _thread; }; -/** Minimal thread class. Returned by thread factory bound to a Runnable object and ready to start execution. More or less analogous to java.lang.Thread - (minus all the thread group, priority, mode and other baggage, since that is difficult to abstract across platforms and is left for platform-specific - ThreadFactory implemtations to deal with - @see facebook::thrift::concurrency::ThreadFactory) */ - - +/** + * Minimal thread class. Returned by thread factory bound to a Runnable object + * and ready to start execution. More or less analogous to java.lang.Thread + * (minus all the thread group, priority, mode and other baggage, since that + * is difficult to abstract across platforms and is left for platform-specific + * ThreadFactory implemtations to deal with + * + * @see facebook::thrift::concurrency::ThreadFactory) + */ class Thread { public: - virtual ~Thread() {}; - /** Starts the thread. Does platform specific thread creation and configuration then invokes the run method of the Runnable object bound to this - thread. */ - + /** + * Starts the thread. Does platform specific thread creation and + * configuration then invokes the run method of the Runnable object bound + * to this thread. + */ virtual void start() = 0; - /** Join this thread - - Current thread blocks until this target thread completes. */ - + /** + * Join this thread. Current thread blocks until this target thread + * completes. + */ virtual void join() = 0; - /** Gets the runnable object this thread is hosting */ - - virtual shared_ptr runnable() const {return _runnable;} + /** + * Gets the runnable object this thread is hosting + */ + virtual shared_ptr runnable() const { return _runnable; } protected: - - virtual void runnable(shared_ptr value) {_runnable = value;} + virtual void runnable(shared_ptr value) { _runnable = value; } private: shared_ptr _runnable; }; -/** Factory to create platform-specific thread object and bind them to Runnable object for execution */ - +/** + * Factory to create platform-specific thread object and bind them to Runnable + * object for execution + */ class ThreadFactory { public: - virtual ~ThreadFactory() {} - virtual shared_ptr newThread(shared_ptr runnable) const = 0; }; }}} // facebook::thrift::concurrency -#endif // !defined(_concurrency_Thread_h_) +#endif // #ifndef _THRIFT_CONCURRENCY_THREAD_H_ diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc index a5b8f05b4..7d6fef75a 100644 --- a/lib/cpp/src/concurrency/ThreadManager.cc +++ b/lib/cpp/src/concurrency/ThreadManager.cc @@ -17,48 +17,41 @@ namespace facebook { namespace thrift { namespace concurrency { using namespace boost; -/** ThreadManager class - - This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather - it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times. - - @author marc - @version $Id:$ */ - +/** + * ThreadManager class + * + * This class manages a pool of threads. It uses a ThreadFactory to create + * threads. It never actually creates or destroys worker threads, rather + * it maintains statistics on number of idle threads, number of active threads, + * task backlog, and average wait and service times. + * + * @author marc + * @version $Id:$ + */ class ThreadManager::Impl : public ThreadManager { public: - Impl() : _workerCount(0), _workerMaxCount(0), _idleCount(0), - _state(ThreadManager::UNINITIALIZED) - {} + _state(ThreadManager::UNINITIALIZED) {} - ~Impl() { - stop(); - } + ~Impl() { stop(); } void start(); void stop(); - const ThreadManager::STATE state() const { - return _state; - }; + const ThreadManager::STATE state() const { return _state; } shared_ptr threadFactory() const { - Synchronized s(_monitor); - return _threadFactory; } - void threadFactory(shared_ptr value) { - - Synchronized s(_monitor); - + void threadFactory(shared_ptr value) { + Synchronized s(_monitor); _threadFactory = value; } @@ -66,26 +59,20 @@ class ThreadManager::Impl : public ThreadManager { void removeWorker(size_t value); - size_t idleWorkerCount() const {return _idleCount;} + size_t idleWorkerCount() const { return _idleCount; } size_t workerCount() const { - Synchronized s(_monitor); - return _workerCount; } size_t pendingTaskCount() const { - Synchronized s(_monitor); - return _tasks.size(); } size_t totalTaskCount() const { - - Synchronized s(_monitor); - + Synchronized s(_monitor); return _tasks.size() + _workerCount - _idleCount; } @@ -94,35 +81,26 @@ class ThreadManager::Impl : public ThreadManager { void remove(shared_ptr task); private: - size_t _workerCount; - size_t _workerMaxCount; - size_t _idleCount; - ThreadManager::STATE _state; - shared_ptr _threadFactory; + friend class ThreadManager::Task; - std::queue > _tasks; - Monitor _monitor; - Monitor _workerMonitor; friend class ThreadManager::Worker; - std::set > _workers; - std::set > _deadWorkers; }; class ThreadManager::Task : public Runnable { -public: + public: enum STATE { WAITING, EXECUTING, @@ -132,29 +110,24 @@ public: Task(shared_ptr runnable) : _runnable(runnable), - _state(WAITING) - {} + _state(WAITING) {} - ~Task() {}; + ~Task() {} void run() { - if(_state == EXECUTING) { + if (_state == EXECUTING) { _runnable->run(); _state = COMPLETE; } } private: - shared_ptr _runnable; - friend class ThreadManager::Worker; - STATE _state; }; class ThreadManager::Worker: public Runnable { - enum STATE { UNINITIALIZED, STARTING, @@ -164,177 +137,139 @@ class ThreadManager::Worker: public Runnable { }; public: - Worker(ThreadManager::Impl* manager) : _manager(manager), _state(UNINITIALIZED), - _idle(false) - {} + _idle(false) {} ~Worker() {} - bool isActive() const { return _manager->_workerCount <= _manager->_workerMaxCount;} - - /** Worker entry point - - As long as worker thread is running, pull tasks off the task queue and execute. */ + bool isActive() const { return _manager->_workerCount <= _manager->_workerMaxCount; } + /** + * Worker entry point + * + * As long as worker thread is running, pull tasks off the task queue and + * execute. + */ void run() { - - bool active = false; - + bool active = false; bool notifyManager = false; - /** Increment worker semaphore and notify manager if worker count reached desired max - - Note - We have to release the monitor and acquire the workerMonitor since that is what the manager - blocks on for worker add/remove */ - - {Synchronized s(_manager->_monitor); - + /** + * Increment worker semaphore and notify manager if worker count reached + * desired max + * + * Note: We have to release the monitor and acquire the workerMonitor + * since that is what the manager blocks on for worker add/remove + */ + { + Synchronized s(_manager->_monitor); active = _manager->_workerCount < _manager->_workerMaxCount; - - if(active) { - + if (active) { _manager->_workerCount++; - notifyManager = _manager->_workerCount == _manager->_workerMaxCount; } } - if(notifyManager) { - + if (notifyManager) { Synchronized s(_manager->_workerMonitor); - _manager->_workerMonitor.notify(); - notifyManager = false; } - while(active) { - + while (active) { shared_ptr task; - /* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop). - - Once the queue is non-empty, dequeue a task, release monitor, and execute. If the worker max count has been decremented - such that we exceed it, mark ourself inactive, decrement the worker count and notify the manager (technically we're notifying - the next blocked thread but eventually the manager will see it. */ - - {Synchronized s(_manager->_monitor); - + /** + * While holding manager monitor block for non-empty task queue (Also + * check that the thread hasn't been requested to stop). Once the queue + * is non-empty, dequeue a task, release monitor, and execute. If the + * worker max count has been decremented such that we exceed it, mark + * ourself inactive, decrement the worker count and notify the manager + * (technically we're notifying the next blocked thread but eventually + * the manager will see it. + */ + { + Synchronized s(_manager->_monitor); active = isActive(); - - while(active && _manager->_tasks.empty()) { - - _manager->_idleCount++; - + while (active && _manager->_tasks.empty()) { + _manager->_idleCount++; _idle = true; - - _manager->_monitor.wait(); - - active = isActive(); - + _manager->_monitor.wait(); + active = isActive(); _idle = false; - - _manager->_idleCount--; + _manager->_idleCount--; } - if(active) { - - if(!_manager->_tasks.empty()) { - - task = _manager->_tasks.front(); - - _manager->_tasks.pop(); - - if(task->_state == ThreadManager::Task::WAITING) { - - task->_state = ThreadManager::Task::EXECUTING; + if (active) { + if (!_manager->_tasks.empty()) { + task = _manager->_tasks.front(); + _manager->_tasks.pop(); + if (task->_state == ThreadManager::Task::WAITING) { + task->_state = ThreadManager::Task::EXECUTING; } } } else { - - _idle = true; - + _idle = true; _manager->_workerCount--; - - notifyManager = _manager->_workerCount == _manager->_workerMaxCount; + notifyManager = _manager->_workerCount == _manager->_workerMaxCount; } } - - if(task != NULL) { - - if(task->_state == ThreadManager::Task::EXECUTING) { + + if (task != NULL) { + if (task->_state == ThreadManager::Task::EXECUTING) { try { - - task->run(); - - } catch(...) { - - // XXX need to log this + task->run(); + } catch(...) { + // XXX need to log this } } } } - - {Synchronized s(_manager->_workerMonitor); - + + { + Synchronized s(_manager->_workerMonitor); _manager->_deadWorkers.insert(this->thread()); - - if(notifyManager) { - - _manager->_workerMonitor.notify(); + if (notifyManager) { + _manager->_workerMonitor.notify(); } } - + return; } - - private: - - ThreadManager::Impl* _manager; - - friend class ThreadManager::Impl; - - STATE _state; - - bool _idle; + + private: + ThreadManager::Impl* _manager; + friend class ThreadManager::Impl; + STATE _state; + bool _idle; }; -void ThreadManager::Impl::addWorker(size_t value) { - + + void ThreadManager::Impl::addWorker(size_t value) { std::set > newThreads; - - for(size_t ix = 0; ix < value; ix++) { - - class ThreadManager::Worker; - + for (size_t ix = 0; ix < value; ix++) { + class ThreadManager::Worker; shared_ptr worker = shared_ptr(new ThreadManager::Worker(this)); - newThreads.insert(_threadFactory->newThread(worker)); } - {Synchronized s(_monitor); - + { + Synchronized s(_monitor); _workerMaxCount+= value; - _workers.insert(newThreads.begin(), newThreads.end()); } - for(std::set >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) { - + for (std::set >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) { shared_ptr worker = dynamic_pointer_cast((*ix)->runnable()); - worker->_state = ThreadManager::Worker::STARTING; - (*ix)->start(); } - {Synchronized s(_workerMonitor); - - while(_workerCount != _workerMaxCount) { + { + Synchronized s(_workerMonitor); + while (_workerCount != _workerMaxCount) { _workerMonitor.wait(); } } @@ -342,133 +277,110 @@ void ThreadManager::Impl::addWorker(size_t value) { void ThreadManager::Impl::start() { - if(_state == ThreadManager::STOPPED) { + if (_state == ThreadManager::STOPPED) { return; } - {Synchronized s(_monitor); - - if(_state == ThreadManager::UNINITIALIZED) { - - if(_threadFactory == NULL) {throw InvalidArgumentException();} - + { + Synchronized s(_monitor); + if (_state == ThreadManager::UNINITIALIZED) { + if (_threadFactory == NULL) { + throw InvalidArgumentException(); + } _state = ThreadManager::STARTED; - _monitor.notifyAll(); } - while(_state == STARTING) { - + while (_state == STARTING) { _monitor.wait(); } } } void ThreadManager::Impl::stop() { - bool doStop = false; - - if(_state == ThreadManager::STOPPED) { + if (_state == ThreadManager::STOPPED) { return; } - {Synchronized s(_monitor); - - if(!_state != ThreadManager::STOPPING && _state != ThreadManager::STOPPED) { - + { + Synchronized s(_monitor); + if (!_state != ThreadManager::STOPPING && _state != ThreadManager::STOPPED) { doStop = true; - _state = ThreadManager::STOPPING; } } - if(doStop) { - + if (doStop) { removeWorker(_workerCount); - _state = ThreadManager::STOPPING; } // XXX - // should be able to block here for transition to STOPPED since we're now using shared_ptrs - + // should be able to block here for transition to STOPPED since we're no + // using shared_ptrs } void ThreadManager::Impl::removeWorker(size_t value) { - std::set > removedThreads; - - {Synchronized s(_monitor); - - if(value > _workerMaxCount) { - + { + Synchronized s(_monitor); + if (value > _workerMaxCount) { throw InvalidArgumentException(); } _workerMaxCount-= value; - if(_idleCount < value) { - - for(size_t ix = 0; ix < _idleCount; ix++) { - + if (_idleCount < value) { + for (size_t ix = 0; ix < _idleCount; ix++) { _monitor.notify(); } } else { - _monitor.notifyAll(); } } - {Synchronized s(_workerMonitor); + { + Synchronized s(_workerMonitor); - while(_workerCount != _workerMaxCount) { + while (_workerCount != _workerMaxCount) { _workerMonitor.wait(); } - for(std::set >::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) { - - _workers.erase(*ix); - + for (std::set >::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) { + _workers.erase(*ix); } - + _deadWorkers.clear(); } } void ThreadManager::Impl::add(shared_ptr value) { - Synchronized s(_monitor); - if(_state != ThreadManager::STARTED) { - + if (_state != ThreadManager::STARTED) { throw IllegalStateException(); } _tasks.push(shared_ptr(new ThreadManager::Task(value))); - /* If idle thread is available notify it, otherwise all worker threads are running and will get around to this - task in time. */ - - if(_idleCount > 0) { - + // If idle thread is available notify it, otherwise all worker threads are + // running and will get around to this task in time. + if (_idleCount > 0) { _monitor.notify(); } } void ThreadManager::Impl::remove(shared_ptr task) { - - Synchronized s(_monitor); - - if(_state != ThreadManager::STARTED) { - - throw IllegalStateException(); - } + Synchronized s(_monitor); + if (_state != ThreadManager::STARTED) { + throw IllegalStateException(); + } } class SimpleThreadManager : public ThreadManager::Impl { public: - SimpleThreadManager(size_t workerCount=4) : _workerCount(workerCount), _firstTime(true) { @@ -476,12 +388,10 @@ public: void start() { ThreadManager::Impl::start(); - addWorker(_workerCount); } private: - const size_t _workerCount; bool _firstTime; Monitor _monitor; @@ -497,4 +407,3 @@ shared_ptr ThreadManager::newSimpleThreadManager(size_t count) { } }}} // facebook::thrift::concurrency - diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h index f36564359..a90c5d281 100644 --- a/lib/cpp/src/concurrency/ThreadManager.h +++ b/lib/cpp/src/concurrency/ThreadManager.h @@ -1,49 +1,59 @@ -#if !defined(_concurrency_ThreadManager_h_) -#define _concurrency_ThreadManager_h_ 1 +#ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_ +#define _THRIFT_CONCURRENCY_THREADMANAGER_H_ 1 #include - #include - #include "Thread.h" namespace facebook { namespace thrift { namespace concurrency { using namespace boost; -/** Thread Pool Manager and related classes - - @author marc - @version $Id:$ */ - +/** + * Thread Pool Manager and related classes + * + * @author marc + * @version $Id:$ + */ class ThreadManager; -/** ThreadManager class - - This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather - it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times and informs the - PoolPolicy object bound to instances of this manager of interesting transitions. It is then up the PoolPolicy object to decide if the thread pool - size needs to be adjusted and call this object addWorker and removeWorker methods to make changes. - - This design allows different policy implementations to used this code to handle basic worker thread management and worker task execution and focus on - policy issues. The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads. */ - +/** + * ThreadManager class + * + * This class manages a pool of threads. It uses a ThreadFactory to create + * threads. It never actually creates or destroys worker threads, rather + * It maintains statistics on number of idle threads, number of active threads, + * task backlog, and average wait and service times and informs the PoolPolicy + * object bound to instances of this manager of interesting transitions. It is + * then up the PoolPolicy object to decide if the thread pool size needs to be + * adjusted and call this object addWorker and removeWorker methods to make + * changes. + * + * This design allows different policy implementations to used this code to + * handle basic worker thread management and worker task execution and focus on + * policy issues. The simplest policy, StaticPolicy, does nothing other than + * create a fixed number of threads. + */ class ThreadManager { public: - ThreadManager() {} virtual ~ThreadManager() {} - /** Starts the thread manager. Verifies all attributes have been properly initialized, then allocates necessary resources to begin operation */ - + /** + * Starts the thread manager. Verifies all attributes have been properly + * initialized, then allocates necessary resources to begin operation + */ virtual void start() = 0; - /** Stops the thread manager. Aborts all remaining unprocessed task, shuts down all created worker threads, and realeases all allocated resources. - This method blocks for all worker threads to complete, thus it can potentially block forever if a worker thread is running a task that - won't terminate. */ - + /** + * Stops the thread manager. Aborts all remaining unprocessed task, shuts + * down all created worker threads, and realeases all allocated resources. + * This method blocks for all worker threads to complete, thus it can + * potentially block forever if a worker thread is running a task that + * won't terminate. + */ virtual void stop() = 0; enum STATE { @@ -64,37 +74,43 @@ class ThreadManager { virtual void removeWorker(size_t value=1) = 0; - /** Gets the current number of idle worker threads */ - + /** + * Gets the current number of idle worker threads + */ virtual size_t idleWorkerCount() const = 0; - /** Gets the current number of total worker threads */ - + /** + * Gets the current number of total worker threads + */ virtual size_t workerCount() const = 0; - /** Gets the current number of pending tasks */ - + /** + * Gets the current number of pending tasks + */ virtual size_t pendingTaskCount() const = 0; - /** Gets the current number of pending and executing tasks */ - + /** + * Gets the current number of pending and executing tasks + */ virtual size_t totalTaskCount() const = 0; - /** Adds a task to be execued at some time in the future by a worker thread. - - @param value The task to run */ - - + /** + * Adds a task to be execued at some time in the future by a worker thread. + * + * @param value The task to run + */ virtual void add(shared_ptrvalue) = 0; - /** Removes a pending task */ - + /** + * Removes a pending task + */ virtual void remove(shared_ptr task) = 0; static shared_ptr newThreadManager(); - /** Creates a simple thread manager the uses count number of worker threads */ - + /** + * Creates a simple thread manager the uses count number of worker threads + */ static shared_ptr newSimpleThreadManager(size_t count=4); class Task; @@ -106,4 +122,4 @@ class ThreadManager { }}} // facebook::thrift::concurrency -#endif // !defined(_concurrency_ThreadManager_h_) +#endif // #ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_ diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc index a223a77a9..f48df4ebe 100644 --- a/lib/cpp/src/concurrency/TimerManager.cc +++ b/lib/cpp/src/concurrency/TimerManager.cc @@ -8,17 +8,18 @@ namespace facebook { namespace thrift { namespace concurrency { -/** TimerManager class - - @author marc - @version $Id:$ */ - typedef std::multimap >::iterator task_iterator; typedef std::pair task_range; +/** + * TimerManager class + * + * @author marc + * @version $Id:$ + */ class TimerManager::Task : public Runnable { -public: + public: enum STATE { WAITING, EXECUTING, @@ -28,130 +29,101 @@ public: Task(shared_ptr runnable) : _runnable(runnable), - _state(WAITING) - {} + _state(WAITING) {} ~Task() { - std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl; //debug -}; + //debug + std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl; + } void run() { - if(_state == EXECUTING) { + if (_state == EXECUTING) { _runnable->run(); _state = COMPLETE; } } private: - shared_ptr _runnable; - class TimerManager::Dispatcher; - friend class TimerManager::Dispatcher; - STATE _state; }; class TimerManager::Dispatcher: public Runnable { -public: + public: Dispatcher(TimerManager* manager) : - _manager(manager) { -} + _manager(manager) {} ~Dispatcher() { - std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl; //debug + // debug + std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl; } - /** Dispatcher entry point - - As long as dispatcher thread is running, pull tasks off the task _taskMap and execute. */ - + /** + * Dispatcher entry point + * + * As long as dispatcher thread is running, pull tasks off the task _taskMap + * and execute. + */ void run() { - - {Synchronized s(_manager->_monitor); - - if(_manager->_state == TimerManager::STARTING) { - + { + Synchronized s(_manager->_monitor); + if (_manager->_state == TimerManager::STARTING) { _manager->_state = TimerManager::STARTED; - _manager->_monitor.notifyAll(); } } do { - std::set > expiredTasks; - - {Synchronized s(_manager->_monitor); - + { + Synchronized s(_manager->_monitor); task_iterator expiredTaskEnd; - long long now = Util::currentTime(); - - while(_manager->_state == TimerManager::STARTED && - (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.begin()) { - + while (_manager->_state == TimerManager::STARTED && + (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.begin()) { long long timeout = 0LL; - - if(!_manager->_taskMap.empty()) { - - timeout = _manager->_taskMap.begin()->first - now; + if (!_manager->_taskMap.empty()) { + timeout = _manager->_taskMap.begin()->first - now; } - - assert((timeout != 0 && _manager->_taskCount > 0) || (timeout == 0 && _manager->_taskCount == 0)); - - _manager->_monitor.wait(timeout); - + assert((timeout != 0 && _manager->_taskCount > 0) || (timeout == 0 && _manager->_taskCount == 0)); + _manager->_monitor.wait(timeout); now = Util::currentTime(); } - if(_manager->_state == TimerManager::STARTED) { - - for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) { - + if (_manager->_state == TimerManager::STARTED) { + for (task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) { shared_ptr task = ix->second; - - expiredTasks.insert(task); - - if(task->_state == TimerManager::Task::WAITING) { - + expiredTasks.insert(task); + if (task->_state == TimerManager::Task::WAITING) { task->_state = TimerManager::Task::EXECUTING; } - - _manager->_taskCount--; + _manager->_taskCount--; } - - _manager->_taskMap.erase(_manager->_taskMap.begin(), expiredTaskEnd); + _manager->_taskMap.erase(_manager->_taskMap.begin(), expiredTaskEnd); } } - for(std::set >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) { - - (*ix)->run(); + for (std::set >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) { + (*ix)->run(); } - } while(_manager->_state == TimerManager::STARTED); - - {Synchronized s(_manager->_monitor); - - if(_manager->_state == TimerManager::STOPPING) { + } while (_manager->_state == TimerManager::STARTED); + { + Synchronized s(_manager->_monitor); + if (_manager->_state == TimerManager::STOPPING) { _manager->_state = TimerManager::STOPPED; - _manager->_monitor.notify(); - } } - return; } private: - TimerManager* _manager; - friend class TimerManager; }; @@ -164,141 +136,106 @@ TimerManager::TimerManager() : TimerManager::~TimerManager() { - /* If we haven't been explicitly stopped, do so now. We don't need to grab the monitor here, since - stop already takes care of reentrancy. */ - + // If we haven't been explicitly stopped, do so now. We don't need to grab + // the monitor here, since stop already takes care of reentrancy. std::cerr << "TimerManager::dtor[" << this << "]" << std::endl; - if(_state != STOPPED) { - + if (_state != STOPPED) { try { - stop(); - } catch(...) { std::cerr << "TimerManager::dtor[" << this << "] uhoh " << std::endl; throw; - // uhoh - } } } void TimerManager::start() { - bool doStart = false; - - {Synchronized s(_monitor); - - if(_threadFactory == NULL) {throw InvalidArgumentException();} - - if(_state == TimerManager::UNINITIALIZED) { - + { + Synchronized s(_monitor); + if (_threadFactory == NULL) { + throw InvalidArgumentException(); + } + if (_state == TimerManager::UNINITIALIZED) { _state = TimerManager::STARTING; - doStart = true; } } - if(doStart) { - + if (doStart) { _dispatcherThread = _threadFactory->newThread(_dispatcher); - _dispatcherThread->start(); } - {Synchronized s(_monitor); - - while(_state == TimerManager::STARTING) { - + { + Synchronized s(_monitor); + while (_state == TimerManager::STARTING) { _monitor.wait(); } - assert(_state != TimerManager::STARTING); } } void TimerManager::stop() { - bool doStop = false; - - {Synchronized s(_monitor); - - if(_state == TimerManager::UNINITIALIZED) { - + { + Synchronized s(_monitor); + if (_state == TimerManager::UNINITIALIZED) { _state = TimerManager::STOPPED; - - } else if(_state != STOPPING && _state != STOPPED) { - + } else if (_state != STOPPING && _state != STOPPED) { doStop = true; - _state = STOPPING; - _monitor.notifyAll(); } - - while(_state != STOPPED) { - + while (_state != STOPPED) { _monitor.wait(); } } - if(doStop) { - + if (doStop) { // Clean up any outstanding tasks - - for(task_iterator ix = _taskMap.begin(); ix != _taskMap.end(); ix++) { - + for (task_iterator ix = _taskMap.begin(); ix != _taskMap.end(); ix++) { _taskMap.erase(ix); } // Remove dispatcher's reference to us. - _dispatcher->_manager = NULL; } } shared_ptr TimerManager::threadFactory() const { - Synchronized s(_monitor); - return _threadFactory; } void TimerManager::threadFactory(shared_ptr value) { - Synchronized s(_monitor); - _threadFactory = value; } size_t TimerManager::taskCount() const { - return _taskCount; } void TimerManager::add(shared_ptr task, long long timeout) { - long long now = Util::currentTime(); - timeout += now; - {Synchronized s(_monitor); - - if(_state != TimerManager::STARTED) { + { + Synchronized s(_monitor); + if (_state != TimerManager::STARTED) { throw IllegalStateException(); } _taskCount++; - _taskMap.insert(std::pair >(timeout, shared_ptr(new Task(task)))); - /* If the task map was empty, or if we have an expiration that is earlier than any previously seen, - kick the dispatcher so it can update its timeout */ - - if(_taskCount == 1 || timeout < _taskMap.begin()->first) { - + // If the task map was empty, or if we have an expiration that is earlier + // than any previously seen, kick the dispatcher so it can update its + // timeout + if (_taskCount == 1 || timeout < _taskMap.begin()->first) { _monitor.notify(); } } @@ -306,13 +243,12 @@ void TimerManager::add(shared_ptr task, long long timeout) { void TimerManager::add(shared_ptr task, const struct timespec& value) { - long long expiration; - + long long expiration; Util::toMilliseconds(expiration, value); long long now = Util::currentTime(); - if(expiration < now) { + if (expiration < now) { throw InvalidArgumentException(); } @@ -321,15 +257,13 @@ void TimerManager::add(shared_ptr task, const struct timespec& value) void TimerManager::remove(shared_ptr task) { - {Synchronized s(_monitor); - - if(_state != TimerManager::STARTED) { - throw IllegalStateException(); - } + Synchronized s(_monitor); + if (_state != TimerManager::STARTED) { + throw IllegalStateException(); } } -const TimerManager::STATE TimerManager::state() const { return _state;} +const TimerManager::STATE TimerManager::state() const { return _state; } }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/TimerManager.h b/lib/cpp/src/concurrency/TimerManager.h index c0e63408c..50a0c13fe 100644 --- a/lib/cpp/src/concurrency/TimerManager.h +++ b/lib/cpp/src/concurrency/TimerManager.h @@ -1,27 +1,26 @@ -#if !defined(_concurrency_TimerManager_h_) -#define _concurrency_TimerManager_h_ 1 +#ifndef _THRIFT_CONCURRENCY_TIMERMANAGER_H_ +#define _THRIFT_CONCURRENCY_TIMERMANAGER_H_ 1 #include "Exception.h" #include "Monitor.h" #include "Thread.h" #include - #include - #include namespace facebook { namespace thrift { namespace concurrency { using namespace boost; -/** Timer Manager - - This class dispatches timer tasks when they fall due. - - @author marc - @version $Id:$ */ - +/** + * Timer Manager + * + * This class dispatches timer tasks when they fall due. + * + * @author marc + * @version $Id:$ + */ class TimerManager { public: @@ -34,39 +33,46 @@ class TimerManager { virtual void threadFactory(shared_ptr value); - /** Starts the timer manager service - - @throws IllegalArgumentException Missing thread factory attribute */ - + /** + * Starts the timer manager service + * + * @throws IllegalArgumentException Missing thread factory attribute + */ virtual void start(); - /** Stops the timer manager service */ - + /** + * Stops the timer manager service + */ virtual void stop(); virtual size_t taskCount() const ; - /** Adds a task to be executed at some time in the future by a worker thread. - - @param task The task to execute - @param timeout Time in milliseconds to delay before executing task */ - + /** + * Adds a task to be executed at some time in the future by a worker thread. + * + * @param task The task to execute + * @param timeout Time in milliseconds to delay before executing task + */ virtual void add(shared_ptr task, long long timeout); - /** Adds a task to be executed at some time in the future by a worker thread. - - @param task The task to execute - @param timeout Absolute time in the future to execute task. */ - + /** + * Adds a task to be executed at some time in the future by a worker thread. + * + * @param task The task to execute + * @param timeout Absolute time in the future to execute task. + */ virtual void add(shared_ptr task, const struct timespec& timeout); - /** Removes a pending task - - @throws NoSuchTaskException Specified task doesn't exist. It was either processed already or this call was made for a task that - was never added to this timer - - @throws UncancellableTaskException Specified task is already being executed or has completed execution. */ - + /** + * Removes a pending task + * + * @throws NoSuchTaskException Specified task doesn't exist. It was either + * processed already or this call was made for a + * task that was never added to this timer + * + * @throws UncancellableTaskException Specified task is already being + * executed or has completed execution. + */ virtual void remove(shared_ptr task); enum STATE { @@ -80,31 +86,19 @@ class TimerManager { virtual const STATE state() const; private: - shared_ptr _threadFactory; - class Task; - friend class Task; - std::multimap > _taskMap; - size_t _taskCount; - Monitor _monitor; - STATE _state; - class Dispatcher; - friend class Dispatcher; - shared_ptr _dispatcher; - shared_ptr _dispatcherThread; - }; }}} // facebook::thrift::concurrency -#endif // !defined(_concurrency_TimerManager_h_) +#endif // #ifndef _THRIFT_CONCURRENCY_TIMERMANAGER_H_ diff --git a/lib/cpp/src/concurrency/Util.h b/lib/cpp/src/concurrency/Util.h index df37471bb..724b69a8c 100644 --- a/lib/cpp/src/concurrency/Util.h +++ b/lib/cpp/src/concurrency/Util.h @@ -1,5 +1,5 @@ -#if !defined(_concurrency_Util_h_) -#define _concurrency_Util_h_ 1 +#ifndef _THRIFT_CONCURRENCY_UTIL_H_ +#define _THRIFT_CONCURRENCY_UTIL_H_ 1 #include @@ -13,68 +13,71 @@ namespace facebook { namespace thrift { namespace concurrency { -/** Utility methods - - This class contains basic utility methods for converting time formats, and other common platform-dependent concurrency operations. - It should not be included in API headers for other concurrency library headers, since it will, by definition, pull in all sorts of - horrid platform dependent crap. Rather it should be inluded directly in concurrency library implementation source. - - @author marc - @version $Id:$ */ - +/** + * Utility methods + * + * This class contains basic utility methods for converting time formats, + * and other common platform-dependent concurrency operations. + * It should not be included in API headers for other concurrency library + * headers, since it will, by definition, pull in all sorts of horrid + * platform dependent crap. Rather it should be inluded directly in + * concurrency library implementation source. + * + * @author marc + * @version $Id:$ + */ class Util { static const long long NS_PER_S = 1000000000LL; - static const long long MS_PER_S = 1000LL; - static const long long NS_PER_MS = 1000000LL; public: - /** Converts timespec to milliseconds - - @param struct timespec& result - @param time or duration in milliseconds */ - + /** + * Converts timespec to milliseconds + * + * @param struct timespec& result + * @param time or duration in milliseconds + */ static void toTimespec(struct timespec& result, long long value) { - - result.tv_sec = value / MS_PER_S; // ms to s - + result.tv_sec = value / MS_PER_S; // ms to s result.tv_nsec = (value % MS_PER_S) * NS_PER_MS; // ms to ns } - /** Converts timespec to milliseconds */ - + /** + * Converts timespec to milliseconds + */ static const void toMilliseconds(long long& result, const struct timespec& value) { - - result = (value.tv_sec * MS_PER_S) + (value.tv_nsec / NS_PER_MS) + (value.tv_nsec % NS_PER_MS >= 500000 ? 1 : 0) ; + result = + (value.tv_sec * MS_PER_S) + + (value.tv_nsec / NS_PER_MS) + + (value.tv_nsec % NS_PER_MS >= 500000 ? 1 : 0); } - /** Get current time as milliseconds from epoch */ - + /** + * Get current time as milliseconds from epoch + */ static const long long currentTime() { - #if defined(HAVE_CLOCK_GETTIME) - struct timespec now; - assert(clock_gettime(CLOCK_REALTIME, &now) == 0); - - return (now.tv_sec * MS_PER_S) + (now.tv_nsec / NS_PER_MS) + (now.tv_nsec % NS_PER_MS >= 500000 ? 1 : 0) ; - + return + (now.tv_sec * MS_PER_S) + + (now.tv_nsec / NS_PER_MS) + + (now.tv_nsec % NS_PER_MS >= 500000 ? 1 : 0) ; #elif defined(HAVE_GETTIMEOFDAY) - struct timeval now; - assert(gettimeofday(&now, NULL) == 0); - - return (((long long)now.tv_sec) * MS_PER_S) + (now.tv_usec / MS_PER_S) + (now.tv_usec % MS_PER_S >= 500 ? 1 : 0); - + return + (((long long)now.tv_sec) * MS_PER_S) + + (now.tv_usec / MS_PER_S) + + (now.tv_usec % MS_PER_S >= 500 ? 1 : 0); #endif // defined(HAVE_GETTIMEDAY) } + }; }}} // facebook::thrift::concurrency -#endif // !defined(_concurrency_Util_h_) +#endif // #ifndef _THRIFT_CONCURRENCY_UTIL_H_ diff --git a/lib/cpp/src/concurrency/test/Tests.cc b/lib/cpp/src/concurrency/test/Tests.cc index 5c4dd2428..77e5551f4 100644 --- a/lib/cpp/src/concurrency/test/Tests.cc +++ b/lib/cpp/src/concurrency/test/Tests.cc @@ -14,13 +14,13 @@ int main(int argc, char** argv) { args[0] = "all"; - for(int ix = 1; ix < argc; ix++) { + for (int ix = 1; ix < argc; ix++) { args[ix - 1] = std::string(argv[ix]); } bool runAll = args[0].compare("all") == 0; - if(runAll || args[0].compare("thread-factory") == 0) { + if (runAll || args[0].compare("thread-factory") == 0) { ThreadFactoryTests threadFactoryTests; @@ -41,7 +41,7 @@ int main(int argc, char** argv) { assert(threadFactoryTests.monitorTimeoutTest()); } - if(runAll || args[0].compare("util") == 0) { + if (runAll || args[0].compare("util") == 0) { std::cout << "Util tests..." << std::endl; @@ -56,7 +56,7 @@ int main(int argc, char** argv) { time01 = time00; size_t count = 0; - while(time01 < time00 + 10) { + while (time01 < time00 + 10) { count++; time01 = Util::currentTime(); } @@ -65,7 +65,7 @@ int main(int argc, char** argv) { } - if(runAll || args[0].compare("timer-manager") == 0) { + if (runAll || args[0].compare("timer-manager") == 0) { std::cout << "TimerManager tests..." << std::endl; @@ -76,7 +76,7 @@ int main(int argc, char** argv) { assert(timerManagerTests.test00()); } - if(runAll || args[0].compare("thread-manager") == 0) { + if (runAll || args[0].compare("thread-manager") == 0) { std::cout << "ThreadManager tests..." << std::endl; @@ -96,7 +96,7 @@ int main(int argc, char** argv) { } } - if(runAll || args[0].compare("thread-manager-benchmark") == 0) { + if (runAll || args[0].compare("thread-manager-benchmark") == 0) { std::cout << "ThreadManager benchmark tests..." << std::endl; @@ -110,7 +110,7 @@ int main(int argc, char** argv) { long long delay = 10LL; - for(size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount*= 2) { + for (size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount*= 2) { size_t taskCount = workerCount * tasksPerWorker; diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h index 26e4c28f8..34b03d930 100644 --- a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h +++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h @@ -11,11 +11,12 @@ namespace facebook { namespace thrift { namespace concurrency { namespace test { using namespace facebook::thrift::concurrency; -/** ThreadManagerTests class - - @author marc - @version $Id:$ */ - +/** + * ThreadManagerTests class + * + * @author marc + * @version $Id:$ + */ class ThreadFactoryTests { public: @@ -33,8 +34,9 @@ public: } }; - /** Hello world test */ - + /** + * Hello world test + */ bool helloWorldTest() { PosixThreadFactory threadFactory = PosixThreadFactory(); @@ -52,28 +54,26 @@ public: return true; } - /** Reap N threads */ - - class ReapNTask: public Runnable { - - public: - - ReapNTask(Monitor& monitor, int& activeCount) : - _monitor(monitor), - _count(activeCount) { - } + /** + * Reap N threads + */ + class ReapNTask: public Runnable { + public: + + ReapNTask(Monitor& monitor, int& activeCount) : + _monitor(monitor), + _count(activeCount) {} + void run() { - - {Synchronized s(_monitor); - - _count--; - - //std::cout << "\t\t\tthread count: " << _count << std::endl; - - if(_count == 0) { - _monitor.notify(); - } + Synchronized s(_monitor); + + _count--; + + //std::cout << "\t\t\tthread count: " << _count << std::endl; + + if (_count == 0) { + _monitor.notify(); } } @@ -92,25 +92,24 @@ public: std::set > threads; - for(int ix = 0; ix < count; ix++) { + for (int ix = 0; ix < count; ix++) { threads.insert(threadFactory.newThread(shared_ptr(new ReapNTask(*monitor, *activeCount)))); } - for(std::set >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) { + for (std::set >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) { (*thread)->start(); } - {Synchronized s(*monitor); - - while(*activeCount > 0) { + { + Synchronized s(*monitor); + while (*activeCount > 0) { monitor->wait(1000); } } - for(std::set >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) { - + for (std::set >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) { threads.erase(*thread); } @@ -119,10 +118,10 @@ public: return true; } - class SynchStartTask: public Runnable { - - public: + class SynchStartTask: public Runnable { + public: + enum STATE { UNINITIALIZED, STARTING, @@ -131,38 +130,33 @@ public: STOPPED }; - SynchStartTask(Monitor& monitor, - volatile STATE& state) : - _monitor(monitor), - _state(state) { - } + SynchStartTask(Monitor& monitor, volatile STATE& state) : + _monitor(monitor), + _state(state) {} void run() { - - {Synchronized s(_monitor); - - if(_state == SynchStartTask::STARTING) { + { + Synchronized s(_monitor); + if (_state == SynchStartTask::STARTING) { _state = SynchStartTask::STARTED; _monitor.notify(); } } - {Synchronized s(_monitor); - - while(_state == SynchStartTask::STARTED) { + { + Synchronized s(_monitor); + while (_state == SynchStartTask::STARTED) { _monitor.wait(); } - if(_state == SynchStartTask::STOPPING) { - - _state = SynchStartTask::STOPPED; - - _monitor.notifyAll(); + if (_state == SynchStartTask::STOPPING) { + _state = SynchStartTask::STOPPED; + _monitor.notifyAll(); } } } - private: + private: Monitor& _monitor; volatile STATE& _state; }; @@ -179,34 +173,35 @@ public: shared_ptr thread = threadFactory.newThread(task); - if(state == SynchStartTask::UNINITIALIZED) { + if (state == SynchStartTask::UNINITIALIZED) { state = SynchStartTask::STARTING; thread->start(); } - {Synchronized s(monitor); - - while(state == SynchStartTask::STARTING) { + { + Synchronized s(monitor); + while (state == SynchStartTask::STARTING) { monitor.wait(); } } assert(state != SynchStartTask::STARTING); - {Synchronized s(monitor); + { + Synchronized s(monitor); monitor.wait(100); - if(state == SynchStartTask::STARTED) { + if (state == SynchStartTask::STARTED) { state = SynchStartTask::STOPPING; monitor.notify(); } - while(state == SynchStartTask::STOPPING) { + while (state == SynchStartTask::STOPPING) { monitor.wait(); } } @@ -228,8 +223,9 @@ public: long long startTime = Util::currentTime(); - for(size_t ix = 0; ix < count; ix++) { - {Synchronized s(monitor); + for (size_t ix = 0; ix < count; ix++) { + { + Synchronized s(monitor); monitor.wait(timeout); } } @@ -238,7 +234,7 @@ public: double error = ((endTime - startTime) - (count * timeout)) / (double)(count * timeout); - if(error < 0.0) { + if (error < 0.0) { error *= 1.0; } diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h index 7e74aac87..e17434353 100644 --- a/lib/cpp/src/concurrency/test/ThreadManagerTests.h +++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h @@ -14,11 +14,12 @@ namespace facebook { namespace thrift { namespace concurrency { namespace test { using namespace facebook::thrift::concurrency; -/** ThreadManagerTests class - - @author marc - @version $Id:$ */ - +/** + * ThreadManagerTests class + * + * @author marc + * @version $Id:$ + */ class ThreadManagerTests { public: @@ -39,8 +40,8 @@ public: _startTime = Util::currentTime(); - {Synchronized s(_sleep); - + { + Synchronized s(_sleep); _sleep.wait(_timeout); } @@ -49,13 +50,14 @@ public: _done = true; - {Synchronized s(_monitor); + { + Synchronized s(_monitor); // std::cout << "Thread " << _count << " completed " << std::endl; _count--; - if(_count == 0) { + if (_count == 0) { _monitor.notify(); } @@ -71,9 +73,11 @@ public: Monitor _sleep; }; - /** Dispatch count tasks, each of which blocks for timeout milliseconds then completes. - Verify that all tasks completed and that thread manager cleans up properly on delete. */ - + /** + * Dispatch count tasks, each of which blocks for timeout milliseconds then + * completes. Verify that all tasks completed and that thread manager cleans + * up properly on delete. + */ bool loadTest(size_t count=100, long long timeout=100LL, size_t workerCount=4) { Monitor monitor; @@ -92,20 +96,21 @@ public: std::set > tasks; - for(size_t ix = 0; ix < count; ix++) { + for (size_t ix = 0; ix < count; ix++) { tasks.insert(shared_ptr(new ThreadManagerTests::Task(monitor, activeCount, timeout))); } long long time00 = Util::currentTime(); - for(std::set >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) { + for (std::set >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) { threadManager->add(*ix); } - {Synchronized s(monitor); - + { + Synchronized s(monitor); + while(activeCount > 0) { monitor.wait(); @@ -121,7 +126,7 @@ public: long long minTime = 9223372036854775807LL; long long maxTime = 0; - for(std::set >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) { + for (std::set >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) { shared_ptr task = *ix; @@ -129,19 +134,19 @@ public: assert(delta > 0); - if(task->_startTime < firstTime) { + if (task->_startTime < firstTime) { firstTime = task->_startTime; } - if(task->_endTime > lastTime) { + if (task->_endTime > lastTime) { lastTime = task->_endTime; } - if(delta < minTime) { + if (delta < minTime) { minTime = delta; } - if(delta > maxTime) { + if (delta > maxTime) { maxTime = delta; } @@ -156,7 +161,7 @@ public: double error = ((time01 - time00) - expectedTime) / expectedTime; - if(error < 0) { + if (error < 0) { error*= -1.0; } diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.h b/lib/cpp/src/concurrency/test/TimerManagerTests.h index fe56d3124..faab7336c 100644 --- a/lib/cpp/src/concurrency/test/TimerManagerTests.h +++ b/lib/cpp/src/concurrency/test/TimerManagerTests.h @@ -10,29 +10,29 @@ namespace facebook { namespace thrift { namespace concurrency { namespace test { using namespace facebook::thrift::concurrency; -/** ThreadManagerTests class - - @author marc - @version $Id:$ */ - +/** + * ThreadManagerTests class + * + * @author marc + * @version $Id:$ + */ class TimerManagerTests { public: static const double ERROR; - class Task: public Runnable { - - public: + class Task: public Runnable { + public: Task(Monitor& monitor, long long timeout) : _timeout(timeout), _startTime(Util::currentTime()), _monitor(monitor), _success(false), - _done(false) {} + _done(false) {} - ~Task() {std::cerr << this << std::endl;} + ~Task() { std::cerr << this << std::endl; } void run() { @@ -58,9 +58,7 @@ class TimerManagerTests { {Synchronized s(_monitor); _monitor.notifyAll(); } - } - - + } long long _timeout; long long _startTime; @@ -70,10 +68,12 @@ class TimerManagerTests { bool _done; }; - /** This test creates two tasks and waits for the first to expire within 10% of the expected expiration time. It then verifies that - the timer manager properly clean up itself and the remaining orphaned timeout task when the manager goes out of scope and its - destructor is called. */ - + /** + * This test creates two tasks and waits for the first to expire within 10% + * of the expected expiration time. It then verifies that the timer manager + * properly clean up itself and the remaining orphaned timeout task when the + * manager goes out of scope and its destructor is called. + */ bool test00(long long timeout=1000LL) { shared_ptr orphanTask = shared_ptr(new TimerManagerTests::Task(_monitor, 10 * timeout)); @@ -90,7 +90,8 @@ class TimerManagerTests { shared_ptr task = shared_ptr(new TimerManagerTests::Task(_monitor, timeout)); - {Synchronized s(_monitor); + { + Synchronized s(_monitor); timerManager.add(orphanTask, 10 * timeout); diff --git a/lib/cpp/src/protocol/TBinaryProtocol.h b/lib/cpp/src/protocol/TBinaryProtocol.h index d66fa2fe1..5bca6dd04 100644 --- a/lib/cpp/src/protocol/TBinaryProtocol.h +++ b/lib/cpp/src/protocol/TBinaryProtocol.h @@ -1,5 +1,5 @@ -#ifndef T_BINARY_PROTOCOL_H -#define T_BINARY_PROTOCOL_H +#ifndef _THRIFT_PROTOCOL_TBINARYPROTOCOL_H_ +#define _THRIFT_PROTOCOL_TBINARYPROTOCOL_H_ 1 #include "TProtocol.h" @@ -147,5 +147,6 @@ using namespace boost; }}} // facebook::thrift::protocol -#endif +#endif // #ifndef _THRIFT_PROTOCOL_TBINARYPROTOCOL_H_ + diff --git a/lib/cpp/src/protocol/TProtocol.h b/lib/cpp/src/protocol/TProtocol.h index e9d560c33..65d6f3c23 100644 --- a/lib/cpp/src/protocol/TProtocol.h +++ b/lib/cpp/src/protocol/TProtocol.h @@ -1,5 +1,5 @@ -#ifndef T_PROTOCOL_H -#define T_PROTOCOL_H +#ifndef _THRIFT_PROTOCOL_TPROTOCOL_H_ +#define _THRIFT_PROTOCOL_TPROTOCOL_H_ 1 #include @@ -20,7 +20,7 @@ using namespace facebook::thrift::transport; #define htonll(x) ntohll(x) -/** Forward declaration for TProtocol */ +// Forward declaration for TProtocol struct TBuf; /** @@ -49,7 +49,8 @@ enum TType { }; /** - * Enumerated definition of the message types that the Thrift protocol supports. + * Enumerated definition of the message types that the Thrift protocol + * supports. */ enum TMessageType { T_CALL = 1, @@ -297,5 +298,4 @@ class TProtocol { }}} // facebook::thrift::protocol -#endif - +#endif // #define _THRIFT_PROTOCOL_TPROTOCOL_H_ 1 diff --git a/lib/cpp/src/transport/TBufferedTransport.h b/lib/cpp/src/transport/TBufferedTransport.h index c94d926a7..0783ca416 100644 --- a/lib/cpp/src/transport/TBufferedTransport.h +++ b/lib/cpp/src/transport/TBufferedTransport.h @@ -1,5 +1,5 @@ -#ifndef T_BUFFERED_TRANSPORT_H -#define T_BUFFERED_TRANSPORT_H +#ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_ +#define _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_ 1 #include "TTransport.h" #include @@ -84,4 +84,4 @@ class TBufferedTransport : public TTransport { }}} // facebook::thrift::transport -#endif +#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_ diff --git a/lib/cpp/src/transport/TChunkedTransport.h b/lib/cpp/src/transport/TChunkedTransport.h index c6312d051..ebc27801f 100644 --- a/lib/cpp/src/transport/TChunkedTransport.h +++ b/lib/cpp/src/transport/TChunkedTransport.h @@ -1,5 +1,5 @@ -#ifndef T_CHUNKED_TRANSPORT_H -#define T_CHUNKED_TRANSPORT_H +#ifndef _THRIFT_TRANSPORT_TCHUNKEDTRANSPORT_H_ +#define _THRIFT_TRANSPORT_TCHUNKEDTRANSPORT_H_ 1 #include "TTransport.h" #include @@ -80,4 +80,4 @@ class TChunkedTransport : public TTransport { }}} // facebook::thrift::transport -#endif +#endif // #ifndef _THRIFT_TRANSPORT_TCHUNKEDTRANSPORT_H_ diff --git a/lib/cpp/src/transport/TNullTransport.h b/lib/cpp/src/transport/TNullTransport.h index 8bb5bd29a..dd2999f59 100644 --- a/lib/cpp/src/transport/TNullTransport.h +++ b/lib/cpp/src/transport/TNullTransport.h @@ -1,5 +1,5 @@ -#ifndef T_NULL_TRANSPORT -#define T_NULL_TRANSPORT +#ifndef _THRIFT_TRANSPORT_TNULLTRANSPORT_H_ +#define _THRIFT_TRANSPORT_TNULLTRANSPORT_H_ 1 #include "TTransport.h" @@ -25,4 +25,4 @@ class TNullTransport : public TTransport { }}} // facebook::thrift::transport -#endif +#endif // #ifndef _THRIFT_TRANSPORT_TNULLTRANSPORT_H_ diff --git a/lib/cpp/src/transport/TServerSocket.h b/lib/cpp/src/transport/TServerSocket.h index a7af44b1a..29cfdfefa 100644 --- a/lib/cpp/src/transport/TServerSocket.h +++ b/lib/cpp/src/transport/TServerSocket.h @@ -1,5 +1,5 @@ -#ifndef T_SERVER_SOCKET_H -#define T_SERVER_SOCKET_H +#ifndef _THRIFT_TRANSPORT_TSERVERSOCKET_H_ +#define _THRIFT_TRANSPORT_TSERVERSOCKET_H_ 1 #include "TServerTransport.h" #include @@ -34,4 +34,4 @@ class TServerSocket : public TServerTransport { }}} // facebook::thrift::transport -#endif +#endif // #ifndef _THRIFT_TRANSPORT_TSERVERSOCKET_H_ diff --git a/lib/cpp/src/transport/TServerTransport.h b/lib/cpp/src/transport/TServerTransport.h index 390fa704b..0abab475e 100644 --- a/lib/cpp/src/transport/TServerTransport.h +++ b/lib/cpp/src/transport/TServerTransport.h @@ -1,5 +1,5 @@ -#ifndef T_SERVER_TRANSPORT_H -#define T_SERVER_TRANSPORT_H +#ifndef _THRIFT_TRANSPORT_TSERVERTRANSPORT_H_ +#define _THRIFT_TRANSPORT_TSERVERTRANSPORT_H_ 1 #include "TTransport.h" #include "TTransportException.h" @@ -65,4 +65,4 @@ class TServerTransport { }}} // facebook::thrift::transport -#endif +#endif // #ifndef _THRIFT_TRANSPORT_TSERVERTRANSPORT_H_ diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h index bf90b63a4..f1a065fbf 100644 --- a/lib/cpp/src/transport/TSocket.h +++ b/lib/cpp/src/transport/TSocket.h @@ -1,5 +1,5 @@ -#ifndef T_SOCKET_H -#define T_SOCKET_H +#ifndef _THRIFT_TRANSPORT_TSOCKET_H_ +#define _THRIFT_TRANSPORT_TSOCKET_H_ 1 #include @@ -100,4 +100,6 @@ class TSocket : public TTransport { }; }}} // facebook::thrift::transport -#endif + +#endif // #ifndef _THRIFT_TRANSPORT_TSOCKET_H_ + diff --git a/lib/cpp/src/transport/TTransport.h b/lib/cpp/src/transport/TTransport.h index e5e40e48b..b2e2a5ba2 100644 --- a/lib/cpp/src/transport/TTransport.h +++ b/lib/cpp/src/transport/TTransport.h @@ -1,5 +1,5 @@ -#ifndef T_TRANSPORT_H -#define T_TRANSPORT_H +#ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_ +#define _THRIFT_TRANSPORT_TTRANSPORT_H_ 1 #include "TTransportException.h" #include @@ -98,4 +98,4 @@ class TTransport { }}} // facebook::thrift::transport -#endif +#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_ diff --git a/lib/cpp/src/transport/TTransportException.h b/lib/cpp/src/transport/TTransportException.h index 4b2be8745..c54084d3c 100644 --- a/lib/cpp/src/transport/TTransportException.h +++ b/lib/cpp/src/transport/TTransportException.h @@ -1,5 +1,5 @@ -#ifndef T_TRANSPORT_EXCEPTION_H -#define T_TRANSPORT_EXCEPTION_H +#ifndef _THRIFT_TRANSPORT_TTRANSPORTEXCEPTION_H_ +#define _THRIFT_TRANSPORT_TTRANSPORTEXCEPTION_H_ 1 #include @@ -64,4 +64,4 @@ class TTransportException { }}} // facebook::thrift::transport -#endif +#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTEXCEPTION_H_ diff --git a/test/cpp/src/TestServer.cc b/test/cpp/src/TestServer.cc index 2f2d97d28..75b88d43f 100644 --- a/test/cpp/src/TestServer.cc +++ b/test/cpp/src/TestServer.cc @@ -248,13 +248,12 @@ class TestServer : public ThriftTestServer { printf("testMultiException(%s, %s)\n", arg0.c_str(), arg1.c_str()); - if(arg0.compare("Xception") == 0) { + if (arg0.compare("Xception") == 0) { Xception e; e.errorCode = 1001; e.message = "This is an Xception"; throw e; - - } else if(arg0.compare("Xception2") == 0) { + } else if (arg0.compare("Xception2") == 0) { Xception2 e; e.errorCode = 2002; e.struct_thing.string_thing = "This is an Xception2"; @@ -287,15 +286,11 @@ int main(int argc, char **argv) { map args; - for(int ix = 1; ix < argc; ix++) { - + for (int ix = 1; ix < argc; ix++) { string arg(argv[ix]); - - if(arg.compare(0,2, "--") == 0) { - + if (arg.compare(0,2, "--") == 0) { size_t end = arg.find_first_of("=", 2); - - if(end != string::npos) { + if (end != string::npos) { args[string(arg, 2, end - 2)] = string(arg, end + 1); } else { args[string(arg, 2, end - 2)] = "true"; @@ -308,40 +303,35 @@ int main(int argc, char **argv) { try { - if(!args["port"].empty()) { + if (!args["port"].empty()) { port = atoi(args["port"].c_str()); } - if(!args["server-type"].empty()) { - serverType = args["server-type"]; - - if(serverType == "simple") { - - } else if(serverType == "thread-pool") { - + if (!args["server-type"].empty()) { + serverType = args["server-type"]; + if (serverType == "simple") { + } else if (serverType == "thread-pool") { } else { - throw invalid_argument("Unknown server type "+serverType); } } - if(!args["protocol-type"].empty()) { + if (!args["protocol-type"].empty()) { protocolType = args["protocol-type"]; - - if(protocolType == "binary") { - } else if(protocolType == "ascii") { + if (protocolType == "binary") { + } else if (protocolType == "ascii") { throw invalid_argument("ASCII protocol not supported"); - } else if(protocolType == "xml") { + } else if (protocolType == "xml") { throw invalid_argument("XML protocol not supported"); } else { throw invalid_argument("Unknown protocol type "+protocolType); } } - if(!args["workers"].empty()) { + if (!args["workers"].empty()) { workerCount = atoi(args["workers"].c_str()); } - } catch(exception& e) { + } catch (exception& e) { cerr << e.what() << endl; cerr << usage; } @@ -369,9 +359,11 @@ int main(int argc, char **argv) { } else if (serverType == "thread-pool") { - shared_ptr threadManager = ThreadManager::newSimpleThreadManager(workerCount); + shared_ptr threadManager = + ThreadManager::newSimpleThreadManager(workerCount); - shared_ptr threadFactory = shared_ptr(new PosixThreadFactory()); + shared_ptr threadFactory = + shared_ptr(new PosixThreadFactory()); threadManager->threadFactory(threadFactory);