cpp: TNonblockingServer overload handling and optimizations

- Establish a mechanism for TNonblockingServer to handle overloads by
  limiting the number of connections accepted or in process (see the
  configuration sketch after this list).

- Provide a framework for further work in handling server overloads.

- Limit memory consumption of connection object pool.

- Drop connections when overloaded.

- Add overload-handling behavior that allows pending tasks to be dropped
  from the front of the task queue (short of terminating running tasks,
  these are the oldest tasks in the system and thus the most likely to
  be past their freshness date).  This reduces the chance of spending
  valuable CPU time processing a request on which the client has
  already timed out.

- Use a single persistent pipe() to communicate task completion instead
  of constructing and monitoring a new socketpair() for every task in
  the system.
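
The new knobs appear in TNonblockingServer.h below.  As a hedged usage
sketch (not part of this commit), assuming `server` is a
TNonblockingServer already constructed with a processor, protocol
factory, port, and thread manager:

    server.setMaxConnections(10000);        // overload above 10000 open sockets
    server.setMaxActiveProcessors(1000);    // ...or 1000 requests in handlers/tasks
    server.setOverloadHysteresis(0.8);      // clear once both fall to 80% of the limits
    server.setOverloadAction(T_OVERLOAD_DRAIN_TASK_QUEUE);  // shed oldest pending tasks
    server.serve();

With these example numbers, serverOverloaded() flags an overload once
active processors exceed 1000 or open connections exceed 10000, and
clears it only once they fall back to 800 and 8000 respectively.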

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@920664 13f79535-47bb-0310-9956-ffa450edef68
Author: David Reiss
Date: 2010-03-09 05:19:25 +00:00
Parent: e574a065a9
Commit: 01fe15322f
4 changed files with 550 additions and 116 deletions

ThreadManager.cpp

@ -117,6 +117,8 @@ class ThreadManager::Impl : public ThreadManager {
void remove(shared_ptr<Runnable> task);
shared_ptr<Runnable> removeNextPending();
private:
void stopImpl(bool join);
@ -163,6 +165,10 @@ class ThreadManager::Task : public Runnable {
}
}
shared_ptr<Runnable> getRunnable() {
return runnable_;
}
private:
shared_ptr<Runnable> runnable_;
friend class ThreadManager::Worker;
@ -458,6 +464,22 @@ void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
}
}
boost::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
Guard g(mutex_);
if (state_ != ThreadManager::STARTED) {
throw IllegalStateException();
}
if (tasks_.empty()) {
return boost::shared_ptr<Runnable>();
}
shared_ptr<ThreadManager::Task> task = tasks_.front();
tasks_.pop();
return task->getRunnable();
}
class SimpleThreadManager : public ThreadManager::Impl {
public:

ThreadManager.h

@ -148,6 +148,13 @@ class ThreadManager {
*/
virtual void remove(boost::shared_ptr<Runnable> task) = 0;
/**
* Remove the next pending task which would be run.
*
* @return the task removed.
*/
virtual boost::shared_ptr<Runnable> removeNextPending() = 0;
static boost::shared_ptr<ThreadManager> newThreadManager();
/**

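As a usage illustration (hedged, not code from this diff; the names
threadManager and victim are illustrative), a caller holding a
ThreadManager could shed its oldest queued work like this, which is
essentially what TNonblockingServer::drainPendingTask() does:

    // Pop the oldest pending task (the front of the FIFO queue), if any.
    boost::shared_ptr<Runnable> victim = threadManager->removeNextPending();
    if (victim) {
      // The server downcasts the Runnable to its TConnection::Task and
      // force-closes the owning connection before the work ever starts.
    }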
TNonblockingServer.cpp

@ -45,11 +45,11 @@ class TConnection::Task: public Runnable {
Task(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocol> input,
boost::shared_ptr<TProtocol> output,
int taskHandle) :
TConnection* connection) :
processor_(processor),
input_(input),
output_(output),
taskHandle_(taskHandle) {}
connection_(connection) {}
void run() {
try {
@ -66,21 +66,21 @@ class TConnection::Task: public Runnable {
cerr << "TNonblockingServer uncaught exception." << endl;
}
// Signal completion back to the libevent thread via a socketpair
int8_t b = 0;
if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
GlobalOutput.perror("TNonblockingServer::Task: send ", errno);
}
if (-1 == ::close(taskHandle_)) {
GlobalOutput.perror("TNonblockingServer::Task: close, possible resource leak ", errno);
// Signal completion back to the libevent thread via a pipe
if (!connection_->notifyServer()) {
throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
}
}
TConnection* getTConnection() {
return connection_;
}
private:
boost::shared_ptr<TProcessor> processor_;
boost::shared_ptr<TProtocol> input_;
boost::shared_ptr<TProtocol> output_;
int taskHandle_;
TConnection* connection_;
};
void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
@ -99,8 +99,6 @@ void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
socketState_ = SOCKET_RECV;
appState_ = APP_INIT;
taskHandle_ = -1;
// Set flags, which also registers the event
setFlags(eventFlags);
@ -256,37 +254,20 @@ void TConnection::transition() {
outputTransport_->getWritePtr(4);
outputTransport_->wroteBytes(4);
server_->incrementActiveProcessors();
if (server_->isThreadPoolProcessing()) {
// We are setting up a Task to do this work and we will wait on it
int sv[2];
if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
GlobalOutput.perror("TConnection::socketpair() failed ", errno);
// Now we will fall through to the APP_WAIT_TASK block with no response
} else {
// Create task and dispatch to the thread manager
boost::shared_ptr<Runnable> task =
boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
inputProtocol_,
outputProtocol_,
sv[1]));
// The application is now waiting on the task to finish
appState_ = APP_WAIT_TASK;
// Create an event to be notified when the task finishes
event_set(&taskEvent_,
taskHandle_ = sv[0],
EV_READ,
TConnection::taskHandler,
this);
// Create task and dispatch to the thread manager
boost::shared_ptr<Runnable> task =
boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
inputProtocol_,
outputProtocol_,
this));
// The application is now waiting on the task to finish
appState_ = APP_WAIT_TASK;
// Attach to the base
event_base_set(server_->getEventBase(), &taskEvent_);
// Add the event and start up the server
if (-1 == event_add(&taskEvent_, 0)) {
GlobalOutput("TNonblockingServer::serve(): coult not event_add");
return;
}
try {
server_->addTask(task);
} catch (IllegalStateException & ise) {
@ -295,26 +276,28 @@ void TConnection::transition() {
close();
}
// Set this connection idle so that libevent doesn't process more
// data on it while we're still waiting for the threadmanager to
// finish this task
setIdle();
return;
}
// Set this connection idle so that libevent doesn't process more
// data on it while we're still waiting for the threadmanager to
// finish this task
setIdle();
return;
} else {
try {
// Invoke the processor
server_->getProcessor()->process(inputProtocol_, outputProtocol_);
} catch (TTransportException &ttx) {
GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
server_->decrementActiveProcessors();
close();
return;
} catch (TException &x) {
GlobalOutput.printf("TException: Server::process() %s", x.what());
server_->decrementActiveProcessors();
close();
return;
} catch (...) {
GlobalOutput.printf("Server::process() unknown exception");
server_->decrementActiveProcessors();
close();
return;
}
@ -328,6 +311,7 @@ void TConnection::transition() {
// into the outputTransport_, so we grab its contents and place them into
// the writeBuffer_ for actual writing by the libevent thread
server_->decrementActiveProcessors();
// Get the result of the operation
outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
@ -425,6 +409,11 @@ void TConnection::transition() {
return;
case APP_CLOSE_CONNECTION:
server_->decrementActiveProcessors();
close();
return;
default:
GlobalOutput.printf("Unexpected Application State %d", appState_);
assert(0);
@ -453,7 +442,7 @@ void TConnection::setFlags(short eventFlags) {
return;
}
/**
/*
* event_set:
*
* Prepares the event structure &event to be used in future calls to
@ -499,10 +488,10 @@ void TConnection::close() {
}
// Close the socket
if (socket_ > 0) {
if (socket_ >= 0) {
::close(socket_);
}
socket_ = 0;
socket_ = -1;
// close any factory produced transports
factoryInputTransport_->close();
@ -512,7 +501,7 @@ void TConnection::close() {
server_->returnConnection(this);
}
void TConnection::checkIdleBufferMemLimit(uint32_t limit) {
void TConnection::checkIdleBufferMemLimit(size_t limit) {
if (readBufferSize_ > limit) {
readBufferSize_ = limit;
readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
@ -572,7 +561,21 @@ void TNonblockingServer::handleEvent(int fd, short which) {
// one, this helps us to avoid having to go back into the libevent engine so
// many times
while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
// If we're overloaded, take action here
if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
nConnectionsDropped_++;
nTotalConnectionsDropped_++;
if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
close(clientSocket);
continue;
} else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
if (!drainPendingTask()) {
// Nothing left to discard, so we drop connection instead.
close(clientSocket);
continue;
}
}
}
// Explicitly set this socket to NONBLOCK mode
int flags;
if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
@ -707,6 +710,13 @@ void TNonblockingServer::listenSocket(int s) {
serverSocket_ = s;
}
void TNonblockingServer::createNotificationPipe() {
if (pipe(notificationPipeFDs_) != 0) {
GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
throw TException("can't create notification pipe");
}
}
/**
* Register the core libevent events onto the proper base.
*/
@ -732,6 +742,59 @@ void TNonblockingServer::registerEvents(event_base* base) {
if (-1 == event_add(&serverEvent_, 0)) {
throw TException("TNonblockingServer::serve(): could not event_add");
}
if (threadPoolProcessing_) {
// Create an event to be notified when a task finishes
event_set(&notificationEvent_,
getNotificationRecvFD(),
EV_READ | EV_PERSIST,
TConnection::taskHandler,
this);
// Attach to the base
event_base_set(eventBase_, &notificationEvent_);
// Add the event and start up the server
if (-1 == event_add(&notificationEvent_, 0)) {
throw TException("TNonblockingServer::serve(): notification event_add fail");
}
}
}
bool TNonblockingServer::serverOverloaded() {
size_t activeConnections = numTConnections_ - connectionStack_.size();
if (numActiveProcessors_ > maxActiveProcessors_ ||
activeConnections > maxConnections_) {
if (!overloaded_) {
GlobalOutput.printf("thrift non-blocking server overload condition");
overloaded_ = true;
}
} else {
if (overloaded_ &&
(numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
(activeConnections <= overloadHysteresis_ * maxConnections_)) {
GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
nConnectionsDropped_, nTotalConnectionsDropped_);
nConnectionsDropped_ = 0;
overloaded_ = false;
}
}
return overloaded_;
}
bool TNonblockingServer::drainPendingTask() {
if (threadManager_) {
boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
if (task) {
TConnection* connection =
static_cast<TConnection::Task*>(task.get())->getTConnection();
assert(connection && connection->getServer()
&& connection->getState() == APP_WAIT_TASK);
connection->forceClose();
return true;
}
}
return false;
}
/**
@ -742,6 +805,11 @@ void TNonblockingServer::serve() {
// Init socket
listenSocket();
if (threadPoolProcessing_) {
// Init task completion notification pipe
createNotificationPipe();
}
// Initialize libevent core
registerEvents(static_cast<event_base*>(event_init()));

TNonblockingServer.h

@ -24,6 +24,7 @@
#include <server/TServer.h>
#include <transport/TBufferTransports.h>
#include <concurrency/ThreadManager.h>
#include <climits>
#include <stack>
#include <string>
#include <errno.h>
@ -50,48 +51,96 @@ class TConnection;
* operations hardcoded for use with select.
*
*/
/// Overload condition actions.
enum TOverloadAction {
T_OVERLOAD_NO_ACTION, ///< Don't handle overload
T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately
T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue
};
class TNonblockingServer : public TServer {
private:
// Listen backlog
/// Listen backlog
static const int LISTEN_BACKLOG = 1024;
// Default limit on size of idle connection pool
/// Default limit on size of idle connection pool
static const size_t CONNECTION_STACK_LIMIT = 1024;
// Maximum size of buffer allocated to idle connection
/// Maximum size of buffer allocated to idle connection
static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
// Server socket file descriptor
/// Default limit on total number of connected sockets
static const int MAX_CONNECTIONS = INT_MAX;
/// Default limit on connections in handler/task processing
static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
/// Server socket file descriptor
int serverSocket_;
// Port server runs on
/// Port server runs on
int port_;
// For processing via thread pool, may be NULL
/// For processing via thread pool, may be NULL
boost::shared_ptr<ThreadManager> threadManager_;
// Is thread pool processing?
/// Is thread pool processing?
bool threadPoolProcessing_;
// The event base for libevent
/// The event base for libevent
event_base* eventBase_;
// Event struct, for use with eventBase_
/// Event struct, used with eventBase_ for connection events
struct event serverEvent_;
// Number of TConnection object we've created
/// Event struct, used with eventBase_ for task completion notification
struct event notificationEvent_;
/// Number of TConnection objects we've created
size_t numTConnections_;
// Limit for how many TConnection objects to cache
/// Number of Connections processing or waiting to process
size_t numActiveProcessors_;
/// Limit for how many TConnection objects to cache
size_t connectionStackLimit_;
/// Limit for number of connections processing or waiting to process
size_t maxActiveProcessors_;
/// Limit for number of open connections
size_t maxConnections_;
/**
* Hysteresis for overload state. This is the fraction of the overload
* value that needs to be reached before the overload state is cleared;
* must be <= 1.0.
*/
double overloadHysteresis_;
/// Action to take when we're overloaded.
TOverloadAction overloadAction_;
/**
* Max read buffer size for an idle connection. When we place an idle
* TConnection into connectionStack_, we ensure that its read buffer is
* reduced to this size to ensure that idle connections don't hog memory.
*/
uint32_t idleBufferMemLimit_;
size_t idleBufferMemLimit_;
/// Set if we are currently in an overloaded state.
bool overloaded_;
/// Count of connections dropped since overload started
uint32_t nConnectionsDropped_;
/// Count of connections dropped on overload since server started
uint64_t nTotalConnectionsDropped_;
/// File descriptors for pipe used for task completion notification.
int notificationPipeFDs_[2];
/**
* This is a stack of all the objects that have been created but that
@ -101,6 +150,14 @@ class TNonblockingServer : public TServer {
*/
std::stack<TConnection*> connectionStack_;
/**
* Called when server socket had something happen. We accept all waiting
* client connections on listen socket fd and assign TConnection objects
* to handle those requests.
*
* @param fd the listen socket.
* @param which the event flag that triggered the handler.
*/
void handleEvent(int fd, short which);
public:
@ -112,8 +169,16 @@ class TNonblockingServer : public TServer {
threadPoolProcessing_(false),
eventBase_(NULL),
numTConnections_(0),
numActiveProcessors_(0),
connectionStackLimit_(CONNECTION_STACK_LIMIT),
idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {}
maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
maxConnections_(MAX_CONNECTIONS),
overloadHysteresis_(0.8),
overloadAction_(T_OVERLOAD_NO_ACTION),
idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
overloaded_(false),
nConnectionsDropped_(0),
nTotalConnectionsDropped_(0) {}
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocolFactory> protocolFactory,
@ -125,8 +190,16 @@ class TNonblockingServer : public TServer {
threadManager_(threadManager),
eventBase_(NULL),
numTConnections_(0),
numActiveProcessors_(0),
connectionStackLimit_(CONNECTION_STACK_LIMIT),
idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
maxConnections_(MAX_CONNECTIONS),
overloadHysteresis_(0.8),
overloadAction_(T_OVERLOAD_NO_ACTION),
idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
overloaded_(false),
nConnectionsDropped_(0),
nTotalConnectionsDropped_(0) {
setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(protocolFactory);
@ -142,13 +215,21 @@ class TNonblockingServer : public TServer {
int port,
boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
TServer(processor),
serverSocket_(0),
serverSocket_(-1),
port_(port),
threadManager_(threadManager),
eventBase_(NULL),
numTConnections_(0),
numActiveProcessors_(0),
connectionStackLimit_(CONNECTION_STACK_LIMIT),
idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
maxConnections_(MAX_CONNECTIONS),
overloadHysteresis_(0.8),
overloadAction_(T_OVERLOAD_NO_ACTION),
idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
overloaded_(false),
nConnectionsDropped_(0),
nTotalConnectionsDropped_(0) {
setInputTransportFactory(inputTransportFactory);
setOutputTransportFactory(outputTransportFactory);
setInputProtocolFactory(inputProtocolFactory);
@ -197,22 +278,150 @@ class TNonblockingServer : public TServer {
return eventBase_;
}
/// Increment our count of the number of connected sockets.
void incrementNumConnections() {
++numTConnections_;
}
/// Decrement our count of the number of connected sockets.
void decrementNumConnections() {
--numTConnections_;
}
size_t getNumConnections() {
/**
* Return the count of currently connected sockets.
*
* @return count of connected sockets.
*/
size_t getNumConnections() const {
return numTConnections_;
}
size_t getNumIdleConnections() {
/**
* Return the count of connection objects allocated but not in use.
*
* @return count of idle connection objects.
*/
size_t getNumIdleConnections() const {
return connectionStack_.size();
}
/**
* Return the number of connections currently processing.
* This is defined as a connection where all data has been received and
* either assigned a task (when threading) or passed to a handler (when
* not threading), and where the handler has not yet returned.
*
* @return # of connections currently processing.
*/
size_t getNumActiveProcessors() const {
return numActiveProcessors_;
}
/// Increment the count of connections currently processing.
void incrementActiveProcessors() {
++numActiveProcessors_;
}
/// Decrement the count of connections currently processing.
void decrementActiveProcessors() {
if (numActiveProcessors_ > 0) {
--numActiveProcessors_;
}
}
/**
* Get the maximum # of connections allowed before overload.
*
* @return current setting.
*/
size_t getMaxConnections() const {
return maxConnections_;
}
/**
* Set the maximum # of connections allowed before overload.
*
* @param maxConnections new setting for maximum # of connections.
*/
void setMaxConnections(size_t maxConnections) {
maxConnections_ = maxConnections;
}
/**
* Get the maximum # of connections waiting in handler/task before overload.
*
* @return current setting.
*/
size_t getMaxActiveProcessors() const {
return maxActiveProcessors_;
}
/**
* Set the maximum # of connections waiting in handler/task before overload.
*
* @param maxActiveProcessors new setting for maximum # of active processors.
*/
void setMaxActiveProcessors(size_t maxActiveProcessors) {
maxActiveProcessors_ = maxActiveProcessors;
}
/**
* Get fraction of maximum limits before an overload condition is cleared.
*
* @return hysteresis fraction
*/
double getOverloadHysteresis() const {
return overloadHysteresis_;
}
/**
* Set fraction of maximum limits before an overload condition is cleared.
* A good value would probably be between 0.5 and 0.9.
*
* @param hysteresisFraction fraction <= 1.0.
*/
void setOverloadHysteresis(double hysteresisFraction) {
if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
overloadHysteresis_ = hysteresisFraction;
}
}
/**
* Get the action the server will take on overload.
*
* @return a TOverloadAction enum value for the currently set action.
*/
TOverloadAction getOverloadAction() const {
return overloadAction_;
}
/**
* Set the action the server is to take on overload.
*
* @param overloadAction a TOverloadAction enum value for the action.
*/
void setOverloadAction(TOverloadAction overloadAction) {
overloadAction_ = overloadAction;
}
/**
* Determine if the server is currently overloaded.
* This function checks the maximums for open connections and connections
* currently in processing, and sets an overload condition if they are
* exceeded. The overload will persist until both values are below the
* current hysteresis fraction of their maximums.
*
* @return true if an overload condition exists, false if not.
*/
bool serverOverloaded();
/** Pop and discard next task on threadpool wait queue.
*
* @return true if a task was discarded, false if the wait queue was empty.
*/
bool drainPendingTask();
/**
* Get the maximum limit of memory allocated to idle TConnection objects.
*
@ -233,44 +442,105 @@ class TNonblockingServer : public TServer {
idleBufferMemLimit_ = limit;
}
/**
* Return an initialized connection object. Creates a new TConnection or
* recovers one from the pool and initializes it with the provided socket FD
* and flags.
*
* @param socket FD of socket associated with this connection.
* @param flags initial lib_event flags for this connection.
* @return pointer to initialized TConnection object.
*/
TConnection* createConnection(int socket, short flags);
/**
* Returns a connection to the pool, or deletes it. If the connection pool
* (a stack) isn't full, the connection object is placed on it; otherwise
* it is deleted.
*
* @param connection the TConnection being returned.
*/
void returnConnection(TConnection* connection);
/**
* C-callable event handler for listener events. Provides a callback
* that libevent can understand which invokes server->handleEvent().
*
* @param fd the descriptor the event occurred on.
* @param which the flags associated with the event.
* @param v void* callback arg where we placed TNonblockingServer's "this".
*/
static void eventHandler(int fd, short which, void* v) {
((TNonblockingServer*)v)->handleEvent(fd, which);
}
/// Creates a socket to listen on and binds it to the local port.
void listenSocket();
/**
* Takes a socket created by listenSocket() and sets various options on it
* to prepare for use in the server.
*
* @param fd descriptor of socket to be initialized.
*/
void listenSocket(int fd);
/// Create the pipe used to notify I/O process of task completion.
void createNotificationPipe();
/**
* Get notification pipe send descriptor.
*
* @return write fd for pipe.
*/
int getNotificationSendFD() const {
return notificationPipeFDs_[1];
}
/**
* Get notification pipe receive descriptor.
*
* @return read fd of pipe.
*/
int getNotificationRecvFD() const {
return notificationPipeFDs_[0];
}
/**
* Register the core libevent events onto the proper base.
*
* @param base pointer to the event base to be initialized.
*/
void registerEvents(event_base* base);
/**
* Main workhorse function, starts up the server listening on a port and
* loops over the libevent handler.
*/
void serve();
};
/**
* Two states for sockets, recv and send mode
*/
/// Two states for sockets, recv and send mode
enum TSocketState {
SOCKET_RECV,
SOCKET_SEND
};
/**
* Four states for the nonblocking servr:
* Five states for the nonblocking server:
* 1) initialize
* 2) read 4 byte frame size
* 3) read frame of data
* 4) send back data (if any)
* 5) force immediate connection close
*/
enum TAppState {
APP_INIT,
APP_READ_FRAME_SIZE,
APP_READ_REQUEST,
APP_WAIT_TASK,
APP_SEND_RESULT
APP_SEND_RESULT,
APP_CLOSE_CONNECTION
};
/**
@ -280,108 +550,120 @@ enum TAppState {
class TConnection {
private:
class Task;
/// Starting size for new connection buffer
static const int STARTING_CONNECTION_BUFFER_SIZE = 1024;
// Server handle
/// Server handle
TNonblockingServer* server_;
// Socket handle
/// Socket handle
int socket_;
// Libevent object
/// Libevent object
struct event event_;
// Libevent flags
/// Libevent flags
short eventFlags_;
// Socket mode
/// Socket mode
TSocketState socketState_;
// Application state
/// Application state
TAppState appState_;
// How much data needed to read
/// How much data needed to read
uint32_t readWant_;
// Where in the read buffer are we
/// Where in the read buffer are we
uint32_t readBufferPos_;
// Read buffer
/// Read buffer
uint8_t* readBuffer_;
// Read buffer size
/// Read buffer size
uint32_t readBufferSize_;
// Write buffer
/// Write buffer
uint8_t* writeBuffer_;
// Write buffer size
/// Write buffer size
uint32_t writeBufferSize_;
// How far through writing are we?
/// How far through writing are we?
uint32_t writeBufferPos_;
// How many times have we read since our last buffer reset?
/// How many times have we read since our last buffer reset?
uint32_t numReadsSinceReset_;
// How many times have we written since our last buffer reset?
/// How many times have we written since our last buffer reset?
uint32_t numWritesSinceReset_;
// Task handle
/// Task handle
int taskHandle_;
// Task event
/// Task event
struct event taskEvent_;
// Transport to read from
/// Transport to read from
boost::shared_ptr<TMemoryBuffer> inputTransport_;
// Transport that processor writes to
/// Transport that processor writes to
boost::shared_ptr<TMemoryBuffer> outputTransport_;
// extra transport generated by transport factory (e.g. BufferedRouterTransport)
/// extra transport generated by transport factory (e.g. BufferedRouterTransport)
boost::shared_ptr<TTransport> factoryInputTransport_;
boost::shared_ptr<TTransport> factoryOutputTransport_;
// Protocol decoder
/// Protocol decoder
boost::shared_ptr<TProtocol> inputProtocol_;
// Protocol encoder
/// Protocol encoder
boost::shared_ptr<TProtocol> outputProtocol_;
// Go into read mode
/// Go into read mode
void setRead() {
setFlags(EV_READ | EV_PERSIST);
}
// Go into write mode
/// Go into write mode
void setWrite() {
setFlags(EV_WRITE | EV_PERSIST);
}
// Set socket idle
/// Set socket idle
void setIdle() {
setFlags(0);
}
// Set event flags
/**
* Set event flags for this connection.
*
* @param eventFlags flags we pass to libevent for the connection.
*/
void setFlags(short eventFlags);
// Libevent handlers
/**
* Libevent handler called (via our static wrapper) when the connection
* socket had something happen. Rather than use the flags libevent passed,
* we use the connection state to determine whether we need to read or
* write the socket.
*/
void workSocket();
// Close this client and reset
/// Close this connection and free or reset its resources.
void close();
public:
// Constructor
class Task;
/// Constructor
TConnection(int socket, short eventFlags, TNonblockingServer *s) {
readBuffer_ = (uint8_t*)std::malloc(1024);
readBuffer_ = (uint8_t*)std::malloc(STARTING_CONNECTION_BUFFER_SIZE);
if (readBuffer_ == NULL) {
throw new apache::thrift::TException("Out of memory.");
}
readBufferSize_ = 1024;
readBufferSize_ = STARTING_CONNECTION_BUFFER_SIZE;
numReadsSinceReset_ = 0;
numWritesSinceReset_ = 0;
@ -405,29 +687,84 @@ class TConnection {
*
* @param limit we limit buffer size to.
*/
void checkIdleBufferMemLimit(uint32_t limit);
void checkIdleBufferMemLimit(size_t limit);
// Initialize
/// Initialize
void init(int socket, short eventFlags, TNonblockingServer *s);
// Transition into a new state
/**
* This is called when the application transitions from one state into
* another. This means that it has finished writing the data that it needed
* to, or finished receiving the data that it needed to.
*/
void transition();
// Handler wrapper
/**
* C-callable event handler for connection events. Provides a callback
* that libevent can understand which invokes connection_->workSocket().
*
* @param fd the descriptor the event occurred on.
* @param which the flags associated with the event.
* @param v void* callback arg where we placed TConnection's "this".
*/
static void eventHandler(int fd, short /* which */, void* v) {
assert(fd == ((TConnection*)v)->socket_);
((TConnection*)v)->workSocket();
}
// Handler wrapper for task block
static void taskHandler(int fd, short /* which */, void* v) {
assert(fd == ((TConnection*)v)->taskHandle_);
if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
GlobalOutput.perror("TConnection::taskHandler close handle failed, resource leak ", errno);
/**
* C-callable event handler for signaling task completion. Provides a
* callback that libevent can understand that will read a connection
* object's address from a pipe and call connection->transition() for
* that object.
*
* @param fd the descriptor the event occurred on.
*/
static void taskHandler(int fd, short /* which */, void* /* v */) {
TConnection* connection;
if (read(fd, (void*)&connection, sizeof(TConnection*))
!= sizeof(TConnection*)) {
GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
return;
}
((TConnection*)v)->transition();
connection->transition();
}
/**
* Notification to server that processing has ended on this request.
* Can be called either when processing is completed or when a waiting
* task has been preemptively terminated (on overload).
*
* @return true if successful, false if unable to notify (check errno).
*/
bool notifyServer() {
TConnection* connection = this;
if (write(server_->getNotificationSendFD(), (const void*)&connection,
sizeof(TConnection*)) != sizeof(TConnection*)) {
return false;
}
return true;
}
/// Force a shutdown of this connection.
void forceClose() {
appState_ = APP_CLOSE_CONNECTION;
if (!notifyServer()) {
throw TException("TConnection::forceClose: failed write on notify pipe");
}
}
/// Return the server this connection was initialized for.
TNonblockingServer* getServer() {
return server_;
}
/// Get the state of this connection.
TAppState getState() {
return appState_;
}
};
}}} // apache::thrift::server
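
A self-contained sketch of the pipe-based notification pattern described
above (hedged; this is not code from the commit, libevent 1.x is assumed,
and Job/onNotify are made-up names).  A worker writes the address of a
finished object into the write end of a single persistent pipe(); libevent
watches the read end and dispatches the handler, which reads the pointer
back and resumes work on that object, much as taskHandler() reads a
TConnection* and calls transition():

    #include <event.h>     // libevent 1.x, as used by TNonblockingServer
    #include <unistd.h>
    #include <cstdio>

    struct Job { int id; };

    // Read one pointer per notification and resume work on that object.
    static void onNotify(int fd, short /* which */, void* /* arg */) {
      Job* job;
      if (read(fd, &job, sizeof(job)) != (ssize_t)sizeof(job)) {
        std::perror("notification read");
        return;
      }
      std::printf("job %d finished\n", job->id);  // the server would call transition() here
    }

    int main() {
      int fds[2];
      if (pipe(fds) != 0) { std::perror("pipe"); return 1; }

      event_base* base = event_init();
      struct event notificationEvent;
      event_set(&notificationEvent, fds[0], EV_READ | EV_PERSIST, onNotify, NULL);
      event_base_set(base, &notificationEvent);
      event_add(&notificationEvent, NULL);

      // A worker thread would do this when its task completes.
      Job job = { 42 };
      Job* p = &job;
      if (write(fds[1], &p, sizeof(p)) != (ssize_t)sizeof(p)) { std::perror("notify write"); }

      event_base_loop(base, EVLOOP_ONCE);  // run the handler once and return
      return 0;
    }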