From 54bec5dbd8f91305d2cd1d1f1125cf8b54fdd6bc Mon Sep 17 00:00:00 2001 From: David Reiss Date: Wed, 6 Oct 2010 17:10:45 +0000 Subject: [PATCH] THRIFT-926. cpp: Add configurable buffer recycling for TNonblockingServer Add methods to TNonblockingServer to set the maximum size of idle read and write buffers and the check interval (in calls). When checked, if the buffers are larger than the configured maximum, they will be resized down the to maximum size. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005164 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/src/server/TNonblockingServer.cpp | 27 +++- lib/cpp/src/server/TNonblockingServer.h | 148 ++++++++++++++++++---- 2 files changed, 149 insertions(+), 26 deletions(-) diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index 73edd9385..1bf4e68e5 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -112,9 +112,11 @@ void TConnection::init(int socket, short eventFlags, TNonblockingServer* s, writeBuffer_ = NULL; writeBufferSize_ = 0; writeBufferPos_ = 0; + largestWriteBufferSize_ = 0; socketState_ = SOCKET_RECV; appState_ = APP_INIT; + callsForResize_ = 0; // Set flags, which also registers the event setFlags(eventFlags); @@ -342,6 +344,16 @@ void TConnection::transition() { goto LABEL_APP_INIT; case APP_SEND_RESULT: + // it's now safe to perform buffer size housekeeping. + if (writeBufferSize_ > largestWriteBufferSize_) { + largestWriteBufferSize_ = writeBufferSize_; + } + if (server_->getResizeBufferEveryN() > 0 + && ++callsForResize_ >= server_->getResizeBufferEveryN()) { + checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(), + server_->getIdleWriteBufferLimit()); + callsForResize_ = 0; + } // N.B.: We also intentionally fall through here into the INIT state! @@ -486,15 +498,22 @@ void TConnection::close() { server_->returnConnection(this); } -void TConnection::checkIdleBufferMemLimit(size_t limit) { - if (readBufferSize_ > limit) { - readBufferSize_ = limit; +void TConnection::checkIdleBufferMemLimit(size_t readLimit, + size_t writeLimit) { + if (readLimit > 0 && readBufferSize_ > readLimit) { + readBufferSize_ = readLimit; readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_); if (readBuffer_ == NULL) { GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc"); close(); } } + + if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) { + // just start over + outputTransport_->resetBuffer(NULL, 0, TMemoryBuffer::TAKE_OWNERSHIP); + largestWriteBufferSize_ = 0; + } } TNonblockingServer::~TNonblockingServer() { @@ -546,7 +565,7 @@ void TNonblockingServer::returnConnection(TConnection* connection) { (connectionStack_.size() >= connectionStackLimit_)) { delete connection; } else { - connection->checkIdleBufferMemLimit(idleBufferMemLimit_); + connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_); connectionStack_.push(connection); } } diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h index 3ad49c1c8..501433cb0 100644 --- a/lib/cpp/src/server/TNonblockingServer.h +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -70,15 +70,21 @@ class TNonblockingServer : public TServer { /// Default limit on size of idle connection pool static const size_t CONNECTION_STACK_LIMIT = 1024; - /// Maximum size of buffer allocated to idle connection - static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192; - /// Default limit on total number of connected sockets static const int MAX_CONNECTIONS = INT_MAX; /// Default limit on connections in handler/task processing static const int MAX_ACTIVE_PROCESSORS = INT_MAX; + /// Maximum size of read buffer allocated to idle connection (0 = unlimited) + static const int IDLE_READ_BUFFER_LIMIT = 1024; + + /// Maximum size of write buffer allocated to idle connection (0 = unlimited) + static const int IDLE_WRITE_BUFFER_LIMIT = 1024; + + /// # of calls before resizing oversized buffers (0 = check only on close) + static const int RESIZE_BUFFER_EVERY_N = 512; + /// Server socket file descriptor int serverSocket_; @@ -129,11 +135,27 @@ class TNonblockingServer : public TServer { TOverloadAction overloadAction_; /** - * Max read buffer size for an idle connection. When we place an idle - * TConnection into connectionStack_, we insure that its read buffer is - * reduced to this size to insure that idle connections don't hog memory. + * Max read buffer size for an idle TConnection. When we place an idle + * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls, + * we insure that its read buffer is reduced to this size to insure that + * idle connections don't hog memory. 0 disables this check. */ - size_t idleBufferMemLimit_; + size_t idleReadBufferLimit_; + + /** + * Max write buffer size for an idle connection. When we place an idle + * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls, + * we insure that its write buffer is <= to this size; otherwise we + * replace it with a new one to insure that idle connections don't hog + * memory. 0 disables this check. + */ + size_t idleWriteBufferLimit_; + + /** + * Every N calls we check the buffer size limits on a connected TConnection. + * 0 disables (i.e. the checks are only done when a connection closes). + */ + int32_t resizeBufferEveryN_; /// Set if we are currently in an overloaded state. bool overloaded_; @@ -181,7 +203,9 @@ class TNonblockingServer : public TServer { taskExpireTime_(0), overloadHysteresis_(0.8), overloadAction_(T_OVERLOAD_NO_ACTION), - idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT), + idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT), + idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT), + resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N), overloaded_(false), nConnectionsDropped_(0), nTotalConnectionsDropped_(0) {} @@ -203,7 +227,9 @@ class TNonblockingServer : public TServer { taskExpireTime_(0), overloadHysteresis_(0.8), overloadAction_(T_OVERLOAD_NO_ACTION), - idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT), + idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT), + idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT), + resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N), overloaded_(false), nConnectionsDropped_(0), nTotalConnectionsDropped_(0) { @@ -234,7 +260,9 @@ class TNonblockingServer : public TServer { taskExpireTime_(0), overloadHysteresis_(0.8), overloadAction_(T_OVERLOAD_NO_ACTION), - idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT), + idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT), + idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT), + resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N), overloaded_(false), nConnectionsDropped_(0), nTotalConnectionsDropped_(0) { @@ -446,25 +474,94 @@ class TNonblockingServer : public TServer { bool drainPendingTask(); /** - * Get the maximum limit of memory allocated to idle TConnection objects. + * Get the maximum size of read buffer allocated to idle TConnection objects. + * + * @return # bytes beyond which we will shrink buffers when idle. + */ + size_t getIdleReadBufferLimit() const { + return idleReadBufferLimit_; + } + + /** + * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().] + * Get the maximum size of read buffer allocated to idle TConnection objects. * * @return # bytes beyond which we will shrink buffers when idle. */ size_t getIdleBufferMemLimit() const { - return idleBufferMemLimit_; + return idleReadBufferLimit_; } /** - * Set the maximum limit of memory allocated to idle TConnection objects. - * If a TConnection object goes idle with more than this much memory - * allocated to its buffer, we shrink it to this value. + * Set the maximum size read buffer allocated to idle TConnection objects. + * If a TConnection object is found (either on connection close or between + * calls when resizeBufferEveryN_ is set) with more than this much memory + * allocated to its read buffer, we shrink it to this value. + * + * @param limit of bytes beyond which we will shrink buffers when checked. + */ + void setIdleReadBufferLimit(size_t limit) { + idleReadBufferLimit_ = limit; + } + + /** + * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().] + * Set the maximum size read buffer allocated to idle TConnection objects. + * If a TConnection object is found (either on connection close or between + * calls when resizeBufferEveryN_ is set) with more than this much memory + * allocated to its read buffer, we shrink it to this value. + * + * @param limit of bytes beyond which we will shrink buffers when checked. + */ + void setIdleBufferMemLimit(size_t limit) { + idleReadBufferLimit_ = limit; + } + + + + /** + * Get the maximum size of write buffer allocated to idle TConnection objects. + * + * @return # bytes beyond which we will reallocate buffers when checked. + */ + size_t getIdleWriteBufferLimit() const { + return idleWriteBufferLimit_; + } + + /** + * Set the maximum size write buffer allocated to idle TConnection objects. + * If a TConnection object is found (either on connection close or between + * calls when resizeBufferEveryN_ is set) with more than this much memory + * allocated to its write buffer, we destroy and construct that buffer. * * @param limit of bytes beyond which we will shrink buffers when idle. */ - void setIdleBufferMemLimit(size_t limit) { - idleBufferMemLimit_ = limit; + void setIdleWriteBufferLimit(size_t limit) { + idleWriteBufferLimit_ = limit; } + /** + * Get # of calls made between buffer size checks. 0 means disabled. + * + * @return # of calls between buffer size checks. + */ + int32_t getResizeBufferEveryN() const { + return resizeBufferEveryN_; + } + + /** + * Check buffer sizes every "count" calls. This allows buffer limits + * to be enforced for persistant connections with a controllable degree + * of overhead. 0 disables checks except at connection close. + * + * @param count the number of calls between checks, or 0 to disable + */ + void setResizeBufferEveryN(int32_t count) { + resizeBufferEveryN_ = count; + } + + + /** * Return an initialized connection object. Creates or recovers from * pool a TConnection and initializes it with the provided socket FD @@ -581,7 +678,7 @@ enum TAppState { * Represents a connection that is handled via libevent. This connection * essentially encapsulates a socket that has some associated libevent state. */ - class TConnection { +class TConnection { private: /// Starting size for new connection buffer @@ -626,6 +723,12 @@ enum TAppState { /// How far through writing are we? uint32_t writeBufferPos_; + /// Largest size of write buffer seen since buffer was constructed + size_t largestWriteBufferSize_; + + /// Count of the number of calls for use with getResizeBufferEveryN(). + int32_t callsForResize_; + /// Task handle int taskHandle_; @@ -716,12 +819,13 @@ enum TAppState { server_->decrementNumConnections(); } - /** - * Check read buffer against a given limit and shrink it if exceeded. + /** + * Check buffers against any size limits and shrink it if exceeded. * - * @param limit we limit buffer size to. + * @param readLimit we reduce read buffer size to this (if nonzero). + * @param writeLimit if nonzero and write buffer is larger, replace it. */ - void checkIdleBufferMemLimit(size_t limit); + void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit); /// Initialize void init(int socket, short eventFlags, TNonblockingServer *s,