Fail and retry logic for TSocketPool

Summary: Replicating php logic: If opening fails enough times, mark server as down for some amount of time

Reviewed By: aditya

Test Plan: compiling thrift - any good test ideas?

Revert: OK

DiffCamp Revision: 8381


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665534 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
David Reiss 2008-02-28 21:20:12 +00:00
parent 9163073ca1
commit 6d0cccd640
2 changed files with 91 additions and 13 deletions

View File

@ -13,6 +13,26 @@ namespace facebook { namespace thrift { namespace transport {
using namespace std;
/**
* TSocketPoolServer implementation
*
* @author Akhil Wable <akhil@facebook.com>
*/
TSocketPoolServer::TSocketPoolServer()
: host_(""),
port_(0),
lastFailTime_(0),
consecutiveFailures_(0) {}
/**
* Constructor for TSocketPool server
*/
TSocketPoolServer::TSocketPoolServer(const std::string &host, int port)
: host_(host),
port_(port),
lastFailTime_(0),
consecutiveFailures_(0) {}
/**
* TSocketPool implementation.
*
@ -38,13 +58,15 @@ TSocketPool::TSocketPool(const vector<string> &hosts,
}
TSocketPool::TSocketPool(const vector<pair<string, int> > servers) : TSocket(),
servers_(servers),
numRetries_(1),
retryInterval_(60),
maxConsecutiveFailures_(1),
randomize_(true),
alwaysTryLast_(true)
{
for (unsigned i = 0; i < servers.size(); ++i) {
addServer(servers[i].first, servers[i].second);
}
}
TSocketPool::TSocketPool(const string& host, int port) : TSocket(),
@ -62,7 +84,7 @@ TSocketPool::~TSocketPool() {
}
void TSocketPool::addServer(const string& host, int port) {
servers_.push_back(pair<string, int>(host, port));
servers_.push_back(TSocketPoolServer(host, port));
}
void TSocketPool::setNumRetries(int numRetries) {
@ -92,14 +114,31 @@ void TSocketPool::open() {
std::random_shuffle(servers_.begin(), servers_.end());
}
for (unsigned int i = 0; i < servers_.size(); ++i) {
host_ = servers_[i].first;
port_ = servers_[i].second;
unsigned int numServers = servers_.size();
for (unsigned int i = 0; i < numServers; ++i) {
TSocketPoolServer &server = servers_[i];
bool retryIntervalPassed = (server.lastFailTime_ == 0);
bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false;
if (server.lastFailTime_ > 0) {
// The server was marked as down, so check if enough time has elapsed to retry
int elapsedTime = time(NULL) - server.lastFailTime_;
if (elapsedTime > retryInterval_) {
retryIntervalPassed = true;
}
}
if (retryIntervalPassed || isLastServer) {
for (int j = 0; j < numRetries_; ++j) {
try {
TSocket::open();
// reset lastFailTime_ is required
if (server.lastFailTime_) {
server.lastFailTime_ = 0;
}
// success
return;
} catch (TException e) {
@ -108,6 +147,14 @@ void TSocketPool::open() {
}
}
++server.consecutiveFailures_;
if (server.consecutiveFailures_ > maxConsecutiveFailures_) {
// Mark server as down
server.consecutiveFailures_ = 0;
server.lastFailTime_ = time(NULL);
}
}
GlobalOutput("TSocketPool::open: all connections failed");
throw TTransportException(TTransportException::NOT_OPEN);
}

View File

@ -12,6 +12,37 @@
namespace facebook { namespace thrift { namespace transport {
/**
* Class to hold server information for TSocketPool
*
* @author Akhil Wable <akhil@facebook.com>
*/
class TSocketPoolServer {
public:
/**
* Default constructor for server info
*/
TSocketPoolServer();
/**
* Constructor for TSocketPool server
*/
TSocketPoolServer(const std::string &host, int port);
// Host name
std::string host_;
// Port to connect on
int port_;
// Last time connecting to this server failed
int lastFailTime_;
// Number of consecutive times connecting to this server failed
int consecutiveFailures_;
};
/**
* TCP Socket implementation of the TTransport interface.
*
@ -87,7 +118,7 @@ class TSocketPool : public TSocket {
protected:
/** List of servers to connect to */
std::vector<std::pair<std::string, int> > servers_;
std::vector<TSocketPoolServer> servers_;
/** How many times to retry each host in connect */
int numRetries_;