mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 18:58:51 +00:00
-- fix read timeout handling in TSocket
Summary: - turns out that EAGAIN can be returned both when there is a transmission timeout and when there is a lack of system resources. This diff has a hacky fix for respecting a user specified read timeout. Reviewed By: Steve Grimm, Marc, Slee Test Plan: - Tested by trying to crash an srp machine Revert Plan: No. Notes: - Also added functionality to allow users to specify the max number of recv retries (in the case when EAGAIN is returned due to a lack of system resources) git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665121 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3a3b53bc6e
commit
e04475b5aa
@ -20,9 +20,19 @@ class InvalidArgumentException : public facebook::thrift::TException {};
|
||||
|
||||
class IllegalStateException : public facebook::thrift::TException {};
|
||||
|
||||
class TimedOutException : public facebook::thrift::TException {};
|
||||
class TimedOutException : public facebook::thrift::TException {
|
||||
public:
|
||||
TimedOutException():TException("TimedOutException"){};
|
||||
TimedOutException(const std::string& message ) :
|
||||
TException(message) {}
|
||||
};
|
||||
|
||||
class TooManyPendingTasksException : public facebook::thrift::TException {};
|
||||
class TooManyPendingTasksException : public facebook::thrift::TException {
|
||||
public:
|
||||
TooManyPendingTasksException():TException("TooManyPendingTasksException"){};
|
||||
TooManyPendingTasksException(const std::string& message ) :
|
||||
TException(message) {}
|
||||
};
|
||||
|
||||
class SystemResourceException : public facebook::thrift::TException {
|
||||
public:
|
||||
|
@ -36,9 +36,6 @@ uint32_t g_socket_syscalls = 0;
|
||||
// Mutex to protect syscalls to netdb
|
||||
static Monitor s_netdb_monitor;
|
||||
|
||||
// TODO(mcslee): Make this an option to the socket class
|
||||
#define MAX_RECV_RETRIES 20
|
||||
|
||||
TSocket::TSocket(string host, int port) :
|
||||
host_(host),
|
||||
port_(port),
|
||||
@ -48,7 +45,8 @@ TSocket::TSocket(string host, int port) :
|
||||
recvTimeout_(0),
|
||||
lingerOn_(1),
|
||||
lingerVal_(0),
|
||||
noDelay_(1) {
|
||||
noDelay_(1),
|
||||
maxRecvRetries_(5) {
|
||||
recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
|
||||
recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
|
||||
}
|
||||
@ -62,7 +60,8 @@ TSocket::TSocket() :
|
||||
recvTimeout_(0),
|
||||
lingerOn_(1),
|
||||
lingerVal_(0),
|
||||
noDelay_(1) {
|
||||
noDelay_(1),
|
||||
maxRecvRetries_(5) {
|
||||
recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
|
||||
recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
|
||||
}
|
||||
@ -76,7 +75,8 @@ TSocket::TSocket(int socket) :
|
||||
recvTimeout_(0),
|
||||
lingerOn_(1),
|
||||
lingerVal_(0),
|
||||
noDelay_(1) {
|
||||
noDelay_(1),
|
||||
maxRecvRetries_(5) {
|
||||
recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
|
||||
recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
|
||||
}
|
||||
@ -235,23 +235,52 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
|
||||
throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket");
|
||||
}
|
||||
|
||||
uint32_t retries = 0;
|
||||
|
||||
try_again:
|
||||
int32_t retries = 0;
|
||||
|
||||
// EAGAIN can be signalled both when a timeout has occurred and when
|
||||
// the system is out of resources (an awesome undocumented feature).
|
||||
// The following is an approximation of the time interval under which
|
||||
// EAGAIN is taken to indicate an out of resources error.
|
||||
uint32_t eagainThresholdMicros = 0;
|
||||
if (recvTimeout_) {
|
||||
// if a readTimeout is specified along with a max number of recv retries, then
|
||||
// the threshold will ensure that the read timeout is not exceeded even in the
|
||||
// case of resource errors
|
||||
eagainThresholdMicros = (recvTimeout_*1000)/ ((maxRecvRetries_>0) ? maxRecvRetries_ : 2);
|
||||
}
|
||||
|
||||
try_again:
|
||||
// Read from the socket
|
||||
struct timeval begin;
|
||||
gettimeofday(&begin, NULL);
|
||||
int got = recv(socket_, buf, len, 0);
|
||||
struct timeval end;
|
||||
gettimeofday(&end, NULL);
|
||||
uint32_t readElapsedMicros = (((end.tv_sec - begin.tv_sec) * 1000 * 1000)
|
||||
+ (((uint64_t)(end.tv_usec - begin.tv_usec))));
|
||||
++g_socket_syscalls;
|
||||
|
||||
|
||||
// Check for error on read
|
||||
if (got < 0) {
|
||||
// If temporarily out of resources, sleep a bit and try again
|
||||
if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) {
|
||||
usleep(50);
|
||||
goto try_again;
|
||||
if (errno == EAGAIN) {
|
||||
// check if this is the lack of resources or timeout case
|
||||
if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) {
|
||||
if (retries++ < maxRecvRetries_) {
|
||||
usleep(50);
|
||||
goto try_again;
|
||||
} else {
|
||||
throw TTransportException(TTransportException::TIMED_OUT,
|
||||
"EAGAIN (unavailable resources)");
|
||||
}
|
||||
} else {
|
||||
// infer that timeout has been hit
|
||||
throw TTransportException(TTransportException::TIMED_OUT,
|
||||
"EAGAIN (timed out)");
|
||||
}
|
||||
}
|
||||
|
||||
// If interrupted, try again
|
||||
if (errno == EINTR && retries++ < MAX_RECV_RETRIES) {
|
||||
if (errno == EINTR && retries++ < maxRecvRetries_) {
|
||||
goto try_again;
|
||||
}
|
||||
|
||||
@ -407,4 +436,8 @@ void TSocket::setSendTimeout(int ms) {
|
||||
}
|
||||
}
|
||||
|
||||
void TSocket::setMaxRecvRetries(int maxRecvRetries) {
|
||||
maxRecvRetries_ = maxRecvRetries;
|
||||
}
|
||||
|
||||
}}} // facebook::thrift::transport
|
||||
|
@ -19,6 +19,7 @@ namespace facebook { namespace thrift { namespace transport {
|
||||
* TCP Socket implementation of the TTransport interface.
|
||||
*
|
||||
* @author Mark Slee <mcslee@facebook.com>
|
||||
* @author Aditya Agarwal <aditya@facebook.com>
|
||||
*/
|
||||
class TSocket : public TTransport {
|
||||
/**
|
||||
@ -130,6 +131,12 @@ class TSocket : public TTransport {
|
||||
*/
|
||||
void setSendTimeout(int ms);
|
||||
|
||||
/**
|
||||
* Set the max number of recv retries in case of an EAGAIN
|
||||
* error
|
||||
*/
|
||||
void setMaxRecvRetries(int maxRecvRetries);
|
||||
|
||||
protected:
|
||||
/**
|
||||
* Constructor to create socket from raw UNIX handle. Never called directly
|
||||
@ -164,6 +171,9 @@ class TSocket : public TTransport {
|
||||
/** Nodelay */
|
||||
bool noDelay_;
|
||||
|
||||
/** Recv EGAIN retries */
|
||||
int maxRecvRetries_;
|
||||
|
||||
/** Recv timeout timeval */
|
||||
struct timeval recvTimeval_;
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user