THRIFT-2441 Cannot shutdown TThreadedServer when clients are still connected

Author: James E. King, III <Jim.King@simplivity.com>
This commit is contained in:
Ben Craig 2015-04-24 08:52:44 -05:00
parent 95717c92d8
commit 1684c42950
19 changed files with 857 additions and 139 deletions

2
.gitignore vendored
View File

@ -98,6 +98,8 @@ test-driver
/lib/cpp/test/TFileTransportTest
/lib/cpp/test/TNonblockingServerTest
/lib/cpp/test/TPipedTransportTest
/lib/cpp/test/TServerIntegrationTest
/lib/cpp/test/TSocketInterruptTest
/lib/cpp/test/TransportTest
/lib/cpp/test/UnitTests
/lib/cpp/test/ZlibTest

View File

@ -136,7 +136,10 @@ if test "$with_cpp" = "yes"; then
AX_BOOST_BASE([1.53.0])
if test "x$succeeded" = "xyes" ; then
AC_SUBST([BOOST_LIB_DIR], [$(echo "$BOOST_LDFLAGS" | sed -e 's/^\-L//')])
AC_SUBST([BOOST_CHRONO_LDADD], [$(echo "$BOOST_LIB_DIR/libboost_chrono.a")])
AC_SUBST([BOOST_SYSTEM_LDADD], [$(echo "$BOOST_LIB_DIR/libboost_system.a")])
AC_SUBST([BOOST_TEST_LDADD], [$(echo "$BOOST_LIB_DIR/libboost_unit_test_framework.a")])
AC_SUBST([BOOST_THREAD_LDADD], [$(echo "$BOOST_LIB_DIR/libboost_thread.a")])
have_cpp="yes"
fi

View File

@ -70,11 +70,11 @@ void TSimpleServer::serve() {
if (client) {
client->close();
}
if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
if (ttx.getType() != TTransportException::INTERRUPTED) {
string errStr = string("TServerTransport died on accept: ") + ttx.what();
GlobalOutput(errStr.c_str());
}
continue;
if (stop_) break; else continue;
} catch (TException& tx) {
if (inputTransport) {
inputTransport->close();
@ -88,7 +88,7 @@ void TSimpleServer::serve() {
string errStr = string("Some kind of accept exception: ") + tx.what();
GlobalOutput(errStr.c_str());
continue;
} catch (string s) {
} catch (const string& s) {
if (inputTransport) {
inputTransport->close();
}
@ -122,8 +122,12 @@ void TSimpleServer::serve() {
}
}
} catch (const TTransportException& ttx) {
string errStr = string("TSimpleServer client died: ") + ttx.what();
GlobalOutput(errStr.c_str());
if (ttx.getType() != TTransportException::END_OF_FILE &&
ttx.getType() != TTransportException::INTERRUPTED)
{
string errStr = string("TSimpleServer client died: ") + ttx.what();
GlobalOutput(errStr.c_str());
}
} catch (const std::exception& x) {
GlobalOutput.printf("TSimpleServer exception: %s: %s", typeid(x).name(), x.what());
} catch (...) {
@ -163,6 +167,15 @@ void TSimpleServer::serve() {
stop_ = false;
}
}
void TSimpleServer::stop() {
if (!stop_) {
stop_ = true;
serverTransport_->interrupt();
serverTransport_->interruptChildren();
}
}
}
}
} // apache::thrift::server

View File

@ -32,7 +32,6 @@ namespace server {
* continuous loop of accepting a single connection, processing requests on
* that connection until it closes, and then repeating. It is a good example
* of how to extend the TServer interface.
*
*/
class TSimpleServer : public TServer {
public:
@ -84,14 +83,20 @@ public:
outputProtocolFactory),
stop_(false) {}
~TSimpleServer() {}
/**
* Process one connection at a time using the caller's thread.
* Call stop() on another thread to interrupt processing and
* return control to the caller.
* Post-conditions (return guarantees):
* The serverTransport will be closed.
* There will be no connected client.
*/
void serve();
void stop() {
stop_ = true;
serverTransport_->interrupt();
}
/**
* Interrupt serve() so that it meets post-conditions.
*/
void stop();
protected:
bool stop_;

View File

@ -69,11 +69,12 @@ public:
break;
}
}
} catch (const TTransportException&) {
// This is reasonably expected, client didn't send a full request so just
// ignore him
// string errStr = string("TThreadPoolServer client died: ") + ttx.what();
// GlobalOutput(errStr.c_str());
} catch (const TTransportException& ttx) {
if (ttx.getType() != TTransportException::END_OF_FILE &&
ttx.getType() != TTransportException::INTERRUPTED) {
string errStr = string("TThreadPoolServer::Task client died: ") + ttx.what();
GlobalOutput(errStr.c_str());
}
} catch (const std::exception& x) {
GlobalOutput.printf("TThreadPoolServer exception %s: %s", typeid(x).name(), x.what());
} catch (...) {
@ -108,8 +109,7 @@ private:
shared_ptr<TTransport> transport_;
};
TThreadPoolServer::~TThreadPoolServer() {
}
TThreadPoolServer::~TThreadPoolServer() {}
void TThreadPoolServer::serve() {
shared_ptr<TTransport> client;
@ -160,11 +160,11 @@ void TThreadPoolServer::serve() {
if (client) {
client->close();
}
if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
if (ttx.getType() != TTransportException::INTERRUPTED) {
string errStr = string("TThreadPoolServer: TServerTransport died on accept: ") + ttx.what();
GlobalOutput(errStr.c_str());
}
continue;
if (stop_) break; else continue;
} catch (TException& tx) {
if (inputTransport) {
inputTransport->close();
@ -178,7 +178,7 @@ void TThreadPoolServer::serve() {
string errStr = string("TThreadPoolServer: Caught TException: ") + tx.what();
GlobalOutput(errStr.c_str());
continue;
} catch (string s) {
} catch (const string& s) {
if (inputTransport) {
inputTransport->close();
}
@ -207,6 +207,14 @@ void TThreadPoolServer::serve() {
}
}
void TThreadPoolServer::stop() {
if (!stop_) {
stop_ = true;
serverTransport_->interrupt();
serverTransport_->interruptChildren();
}
}
int64_t TThreadPoolServer::getTimeout() const {
return timeout_;
}

View File

@ -109,15 +109,12 @@ public:
virtual void serve();
virtual void stop();
virtual int64_t getTimeout() const;
virtual void setTimeout(int64_t value);
virtual void stop() {
stop_ = true;
serverTransport_->interrupt();
}
virtual int64_t getTaskExpiration() const;
virtual void setTaskExpiration(int64_t value);

View File

@ -55,10 +55,6 @@ public:
~Task() {}
void stop() {
input_->getTransport()->close();
}
void run() {
boost::shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler();
void* connectionContext = NULL;
@ -76,7 +72,8 @@ public:
}
}
} catch (const TTransportException& ttx) {
if (ttx.getType() != TTransportException::END_OF_FILE) {
if (ttx.getType() != TTransportException::END_OF_FILE &&
ttx.getType() != TTransportException::INTERRUPTED) {
string errStr = string("TThreadedServer client died: ") + ttx.what();
GlobalOutput(errStr.c_str());
}
@ -130,8 +127,7 @@ void TThreadedServer::init() {
}
}
TThreadedServer::~TThreadedServer() {
}
TThreadedServer::~TThreadedServer() {}
void TThreadedServer::serve() {
@ -196,11 +192,11 @@ void TThreadedServer::serve() {
if (client) {
client->close();
}
if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
if (ttx.getType() != TTransportException::INTERRUPTED) {
string errStr = string("TThreadedServer: TServerTransport died on accept: ") + ttx.what();
GlobalOutput(errStr.c_str());
}
continue;
if (stop_) break; else continue;
} catch (TException& tx) {
if (inputTransport) {
inputTransport->close();
@ -214,7 +210,7 @@ void TThreadedServer::serve() {
string errStr = string("TThreadedServer: Caught TException: ") + tx.what();
GlobalOutput(errStr.c_str());
continue;
} catch (string s) {
} catch (const string& s) {
if (inputTransport) {
inputTransport->close();
}
@ -240,8 +236,6 @@ void TThreadedServer::serve() {
}
try {
Synchronized s(tasksMonitor_);
for ( std::set<Task*>::iterator tIt = tasks_.begin(); tIt != tasks_.end(); ++tIt )
(*tIt)->stop();
while (!tasks_.empty()) {
tasksMonitor_.wait();
}
@ -252,6 +246,14 @@ void TThreadedServer::serve() {
stop_ = false;
}
}
void TThreadedServer::stop() {
if (!stop_) {
stop_ = true;
serverTransport_->interrupt();
serverTransport_->interruptChildren();
}
}
}
}
} // apache::thrift::server

View File

@ -75,11 +75,7 @@ public:
virtual ~TThreadedServer();
virtual void serve();
void stop() {
stop_ = true;
serverTransport_->interrupt();
}
void stop();
protected:
void init();

View File

@ -20,6 +20,7 @@
#include <thrift/thrift-config.h>
#include <cstring>
#include <stdexcept>
#include <sys/types.h>
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
@ -69,6 +70,12 @@ inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
return reinterpret_cast<SOCKOPT_CAST_T*>(v);
}
void destroyer_of_fine_sockets(THRIFT_SOCKET *ssock)
{
::THRIFT_CLOSESOCKET(*ssock);
delete ssock;
}
namespace apache {
namespace thrift {
namespace transport {
@ -88,9 +95,12 @@ TServerSocket::TServerSocket(int port)
tcpSendBuffer_(0),
tcpRecvBuffer_(0),
keepAlive_(false),
intSock1_(THRIFT_INVALID_SOCKET),
intSock2_(THRIFT_INVALID_SOCKET) {
}
interruptableChildren_(true),
listening_(false),
interruptSockWriter_(THRIFT_INVALID_SOCKET),
interruptSockReader_(THRIFT_INVALID_SOCKET),
childInterruptSockWriter_(THRIFT_INVALID_SOCKET)
{}
TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout)
: port_(port),
@ -104,9 +114,12 @@ TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout)
tcpSendBuffer_(0),
tcpRecvBuffer_(0),
keepAlive_(false),
intSock1_(THRIFT_INVALID_SOCKET),
intSock2_(THRIFT_INVALID_SOCKET) {
}
interruptableChildren_(true),
listening_(false),
interruptSockWriter_(THRIFT_INVALID_SOCKET),
interruptSockReader_(THRIFT_INVALID_SOCKET),
childInterruptSockWriter_(THRIFT_INVALID_SOCKET)
{}
TServerSocket::TServerSocket(const string& address, int port)
: port_(port),
@ -121,9 +134,12 @@ TServerSocket::TServerSocket(const string& address, int port)
tcpSendBuffer_(0),
tcpRecvBuffer_(0),
keepAlive_(false),
intSock1_(THRIFT_INVALID_SOCKET),
intSock2_(THRIFT_INVALID_SOCKET) {
}
interruptableChildren_(true),
listening_(false),
interruptSockWriter_(THRIFT_INVALID_SOCKET),
interruptSockReader_(THRIFT_INVALID_SOCKET),
childInterruptSockWriter_(THRIFT_INVALID_SOCKET)
{}
TServerSocket::TServerSocket(const string& path)
: port_(0),
@ -138,9 +154,12 @@ TServerSocket::TServerSocket(const string& path)
tcpSendBuffer_(0),
tcpRecvBuffer_(0),
keepAlive_(false),
intSock1_(THRIFT_INVALID_SOCKET),
intSock2_(THRIFT_INVALID_SOCKET) {
}
interruptableChildren_(true),
listening_(false),
interruptSockWriter_(THRIFT_INVALID_SOCKET),
interruptSockReader_(THRIFT_INVALID_SOCKET),
childInterruptSockWriter_(THRIFT_INVALID_SOCKET)
{}
TServerSocket::~TServerSocket() {
close();
@ -178,18 +197,41 @@ void TServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) {
tcpRecvBuffer_ = tcpRecvBuffer;
}
void TServerSocket::setInterruptableChildren(bool enable) {
if (listening_) {
throw std::logic_error("setInterruptableChildren cannot be called after listen()");
}
interruptableChildren_ = enable;
}
void TServerSocket::listen() {
listening_ = true;
#ifdef _WIN32
TWinsockSingleton::create();
#endif // _WIN32
THRIFT_SOCKET sv[2];
// Create the socket pair used to interrupt
if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) {
GlobalOutput.perror("TServerSocket::listen() socketpair() ", THRIFT_GET_SOCKET_ERROR);
intSock1_ = THRIFT_INVALID_SOCKET;
intSock2_ = THRIFT_INVALID_SOCKET;
GlobalOutput.perror("TServerSocket::listen() socketpair() interrupt",
THRIFT_GET_SOCKET_ERROR);
interruptSockWriter_ = THRIFT_INVALID_SOCKET;
interruptSockReader_ = THRIFT_INVALID_SOCKET;
} else {
intSock1_ = sv[1];
intSock2_ = sv[0];
interruptSockWriter_ = sv[1];
interruptSockReader_ = sv[0];
}
// Create the socket pair used to interrupt all clients
if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) {
GlobalOutput.perror("TServerSocket::listen() socketpair() childInterrupt",
THRIFT_GET_SOCKET_ERROR);
childInterruptSockWriter_ = THRIFT_INVALID_SOCKET;
pChildInterruptSockReader_.reset();
} else {
childInterruptSockWriter_ = sv[1];
pChildInterruptSockReader_ =
boost::shared_ptr<THRIFT_SOCKET>(new THRIFT_SOCKET(sv[0]),
destroyer_of_fine_sockets);
}
// Validate port number
@ -469,8 +511,8 @@ shared_ptr<TTransport> TServerSocket::acceptImpl() {
std::memset(fds, 0, sizeof(fds));
fds[0].fd = serverSocket_;
fds[0].events = THRIFT_POLLIN;
if (intSock2_ != THRIFT_INVALID_SOCKET) {
fds[1].fd = intSock2_;
if (interruptSockReader_ != THRIFT_INVALID_SOCKET) {
fds[1].fd = interruptSockReader_;
fds[1].events = THRIFT_POLLIN;
}
/*
@ -491,9 +533,9 @@ shared_ptr<TTransport> TServerSocket::acceptImpl() {
throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
} else if (ret > 0) {
// Check for an interrupt signal
if (intSock2_ != THRIFT_INVALID_SOCKET && (fds[1].revents & THRIFT_POLLIN)) {
if (interruptSockReader_ != THRIFT_INVALID_SOCKET && (fds[1].revents & THRIFT_POLLIN)) {
int8_t buf;
if (-1 == recv(intSock2_, cast_sockopt(&buf), sizeof(int8_t), 0)) {
if (-1 == recv(interruptSockReader_, cast_sockopt(&buf), sizeof(int8_t), 0)) {
GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ",
THRIFT_GET_SOCKET_ERROR);
}
@ -562,16 +604,28 @@ shared_ptr<TTransport> TServerSocket::acceptImpl() {
}
shared_ptr<TSocket> TServerSocket::createSocket(THRIFT_SOCKET clientSocket) {
return shared_ptr<TSocket>(new TSocket(clientSocket));
if (interruptableChildren_) {
return shared_ptr<TSocket>(new TSocket(clientSocket, pChildInterruptSockReader_));
} else {
return shared_ptr<TSocket>(new TSocket(clientSocket));
}
}
void TServerSocket::notify(THRIFT_SOCKET notifySocket) {
if (notifySocket != THRIFT_INVALID_SOCKET) {
int8_t byte = 0;
if (-1 == send(notifySocket, cast_sockopt(&byte), sizeof(int8_t), 0)) {
GlobalOutput.perror("TServerSocket::notify() send() ", THRIFT_GET_SOCKET_ERROR);
}
}
}
void TServerSocket::interrupt() {
if (intSock1_ != THRIFT_INVALID_SOCKET) {
int8_t byte = 0;
if (-1 == send(intSock1_, cast_sockopt(&byte), sizeof(int8_t), 0)) {
GlobalOutput.perror("TServerSocket::interrupt() send() ", THRIFT_GET_SOCKET_ERROR);
}
}
notify(interruptSockWriter_);
}
void TServerSocket::interruptChildren() {
notify(childInterruptSockWriter_);
}
void TServerSocket::close() {
@ -579,16 +633,23 @@ void TServerSocket::close() {
shutdown(serverSocket_, THRIFT_SHUT_RDWR);
::THRIFT_CLOSESOCKET(serverSocket_);
}
if (intSock1_ != THRIFT_INVALID_SOCKET) {
::THRIFT_CLOSESOCKET(intSock1_);
if (interruptSockWriter_ != THRIFT_INVALID_SOCKET) {
::THRIFT_CLOSESOCKET(interruptSockWriter_);
}
if (intSock2_ != THRIFT_INVALID_SOCKET) {
::THRIFT_CLOSESOCKET(intSock2_);
if (interruptSockReader_ != THRIFT_INVALID_SOCKET) {
::THRIFT_CLOSESOCKET(interruptSockReader_);
}
if (childInterruptSockWriter_ != THRIFT_INVALID_SOCKET) {
::THRIFT_CLOSESOCKET(childInterruptSockWriter_);
}
serverSocket_ = THRIFT_INVALID_SOCKET;
intSock1_ = THRIFT_INVALID_SOCKET;
intSock2_ = THRIFT_INVALID_SOCKET;
interruptSockWriter_ = THRIFT_INVALID_SOCKET;
interruptSockReader_ = THRIFT_INVALID_SOCKET;
childInterruptSockWriter_ = THRIFT_INVALID_SOCKET;
pChildInterruptSockReader_.reset();
listening_ = false;
}
}
}
} // apache::thrift::transport

View File

@ -100,17 +100,33 @@ public:
// socket, this is the place to do it.
void setAcceptCallback(const socket_func_t& acceptCallback) { acceptCallback_ = acceptCallback; }
void listen();
void close();
// When enabled (the default), new children TSockets will be constructed so
// they can be interrupted by TServerTransport::interruptChildren().
// This is more expensive in terms of system calls (poll + recv) however
// ensures a connected client cannot interfere with TServer::stop().
//
// When disabled, TSocket children do not incur an additional poll() call.
// Server-side reads are more efficient, however a client can interfere with
// the server's ability to shutdown properly by staying connected.
//
// Must be called before listen(); mode cannot be switched after that.
// \throws std::logic_error if listen() has been called
void setInterruptableChildren(bool enable);
void interrupt();
int getPort();
void listen();
void interrupt();
void interruptChildren();
void close();
protected:
boost::shared_ptr<TTransport> acceptImpl();
virtual boost::shared_ptr<TSocket> createSocket(THRIFT_SOCKET client);
private:
void notify(THRIFT_SOCKET notifySock);
int port_;
std::string address_;
std::string path_;
@ -124,9 +140,13 @@ private:
int tcpSendBuffer_;
int tcpRecvBuffer_;
bool keepAlive_;
bool interruptableChildren_;
bool listening_;
THRIFT_SOCKET intSock1_;
THRIFT_SOCKET intSock2_;
THRIFT_SOCKET interruptSockWriter_; // is notified on interrupt()
THRIFT_SOCKET interruptSockReader_; // is used in select/poll with serverSocket_ for interruptability
THRIFT_SOCKET childInterruptSockWriter_; // is notified on interruptChildren()
boost::shared_ptr<THRIFT_SOCKET> pChildInterruptSockReader_; // if interruptableChildren_ this is shared with child TSockets
socket_func_t listenCallback_;
socket_func_t acceptCallback_;

View File

@ -30,8 +30,9 @@ namespace transport {
/**
* Server transport framework. A server needs to have some facility for
* creating base transports to read/write from.
*
* creating base transports to read/write from. The server is expected
* to keep track of TTransport children that it creates for purposes of
* controlling their lifetime.
*/
class TServerTransport {
public:
@ -67,10 +68,20 @@ public:
* For "smart" TServerTransport implementations that work in a multi
* threaded context this can be used to break out of an accept() call.
* It is expected that the transport will throw a TTransportException
* with the interrupted error code.
* with the INTERRUPTED error code.
*
* This will not make an attempt to interrupt any TTransport children.
*/
virtual void interrupt() {}
/**
* This will interrupt the children created by the server transport.
* allowing them to break out of any blocking data reception call.
* It is expected that the children will throw a TTransportException
* with the INTERRUPTED error code.
*/
virtual void interruptChildren() {}
/**
* Closes this transport such that future calls to accept will do nothing.
*/

View File

@ -74,7 +74,7 @@ using namespace std;
*
*/
TSocket::TSocket(string host, int port)
TSocket::TSocket(const string& host, int port)
: host_(host),
port_(port),
path_(""),
@ -89,7 +89,7 @@ TSocket::TSocket(string host, int port)
maxRecvRetries_(5) {
}
TSocket::TSocket(string path)
TSocket::TSocket(const string& path)
: host_(""),
port_(0),
path_(path),
@ -143,6 +143,30 @@ TSocket::TSocket(THRIFT_SOCKET socket)
#endif
}
TSocket::TSocket(THRIFT_SOCKET socket,
boost::shared_ptr<THRIFT_SOCKET> interruptListener) :
host_(""),
port_(0),
path_(""),
socket_(socket),
interruptListener_(interruptListener),
connTimeout_(0),
sendTimeout_(0),
recvTimeout_(0),
keepAlive_(false),
lingerOn_(1),
lingerVal_(0),
noDelay_(1),
maxRecvRetries_(5) {
cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
#ifdef SO_NOSIGPIPE
{
int one = 1;
setsockopt(socket_, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
}
#endif
}
TSocket::~TSocket() {
close();
}
@ -153,8 +177,41 @@ bool TSocket::isOpen() {
bool TSocket::peek() {
if (!isOpen()) {
return false;
return false;
}
if (interruptListener_)
{
for (int retries = 0; ; ) {
struct THRIFT_POLLFD fds[2];
std::memset(fds, 0, sizeof(fds));
fds[0].fd = socket_;
fds[0].events = THRIFT_POLLIN;
fds[1].fd = *(interruptListener_.get());
fds[1].events = THRIFT_POLLIN;
int ret = THRIFT_POLL(fds, 2, (recvTimeout_ == 0) ? -1 : recvTimeout_);
int errno_copy = THRIFT_GET_SOCKET_ERROR;
if (ret < 0) {
// error cases
if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
continue;
}
GlobalOutput.perror("TSocket::peek() THRIFT_POLL() ", errno_copy);
throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
} else if (ret > 0) {
// Check the interruptListener
if (fds[1].revents & THRIFT_POLLIN) {
return false;
}
// There must be data or a disconnection, fall through to the PEEK
break;
} else {
// timeout
return false;
}
}
}
// Check to see if data is available or if the remote side closed
uint8_t buf;
int r = static_cast<int>(recv(socket_, cast_sockopt(&buf), 1, MSG_PEEK));
if (r == -1) {
@ -455,9 +512,44 @@ try_again:
// an THRIFT_EAGAIN is due to a timeout or an out-of-resource condition.
begin.tv_sec = begin.tv_usec = 0;
}
int got = static_cast<int>(recv(socket_, cast_sockopt(buf), len, 0));
int errno_copy = THRIFT_GET_SOCKET_ERROR; // THRIFT_GETTIMEOFDAY can change
// THRIFT_GET_SOCKET_ERROR
int got = 0;
if (interruptListener_)
{
struct THRIFT_POLLFD fds[2];
std::memset(fds, 0, sizeof(fds));
fds[0].fd = socket_;
fds[0].events = THRIFT_POLLIN;
fds[1].fd = *(interruptListener_.get());
fds[1].events = THRIFT_POLLIN;
int ret = THRIFT_POLL(fds, 2, (recvTimeout_ == 0) ? -1 : recvTimeout_);
int errno_copy = THRIFT_GET_SOCKET_ERROR;
if (ret < 0) {
// error cases
if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
goto try_again;
}
GlobalOutput.perror("TSocket::read() THRIFT_POLL() ", errno_copy);
throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
} else if (ret > 0) {
// Check the interruptListener
if (fds[1].revents & THRIFT_POLLIN) {
throw TTransportException(TTransportException::INTERRUPTED,
"Interrupted");
}
} else /* ret == 0 */ {
throw TTransportException(TTransportException::TIMED_OUT,
"THRIFT_EAGAIN (timed out)");
}
// falling through means there is something to recv and it cannot block
}
got = static_cast<int>(recv(socket_, cast_sockopt(buf), len, 0));
// THRIFT_GETTIMEOFDAY can change THRIFT_GET_SOCKET_ERROR
int errno_copy = THRIFT_GET_SOCKET_ERROR;
// Check for error on read
if (got < 0) {
@ -493,29 +585,9 @@ try_again:
goto try_again;
}
#if defined __FreeBSD__ || defined __MACH__
if (errno_copy == THRIFT_ECONNRESET) {
/* shigin: freebsd doesn't follow POSIX semantic of recv and fails with
* THRIFT_ECONNRESET if peer performed shutdown
* edhall: eliminated close() since we do that in the destructor.
*/
return 0;
}
#endif
#ifdef _WIN32
if (errno_copy == WSAECONNRESET) {
return 0; // EOF
}
#endif
// Now it's not a try again case, but a real probblez
GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);
// If we disconnect with no linger time
if (errno_copy == THRIFT_ECONNRESET) {
throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_ECONNRESET");
}
// This ish isn't open
if (errno_copy == THRIFT_ENOTCONN) {
@ -527,18 +599,13 @@ try_again:
throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_ETIMEDOUT");
}
// Now it's not a try again case, but a real probblez
GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);
// Some other error, whatevz
throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
}
// The remote host has closed the socket
if (got == 0) {
// edhall: we used to call close() here, but our caller may want to deal
// with the socket fd and we'll close() in our destructor in any case.
return 0;
}
// Pack data into string
return got;
}

View File

@ -61,7 +61,7 @@ public:
* @param host An IP address or hostname to connect to
* @param port The port to connect on
*/
TSocket(std::string host, int port);
TSocket(const std::string& host, int port);
/**
* Constructs a new Unix domain socket.
@ -69,7 +69,7 @@ public:
*
* @param path The Unix domain socket e.g. "/tmp/ThriftTest.binary.thrift"
*/
TSocket(std::string path);
TSocket(const std::string& path);
/**
* Destroyes the socket object, closing it if necessary.
@ -102,6 +102,13 @@ public:
/**
* Reads from the underlying socket.
* \returns the number of bytes read or 0 indicates EOF
* \throws TTransportException of types:
* INTERRUPTED means the socket was interrupted
* out of a blocking call
* NOT_OPEN means the socket has been closed
* TIMED_OUT means the receive timeout expired
* UNKNOWN means something unexpected happened
*/
virtual uint32_t read(uint8_t* buf, uint32_t len);
@ -242,10 +249,17 @@ public:
virtual const std::string getOrigin();
/**
* Constructor to create socket from raw UNIX handle.
* Constructor to create socket from file descriptor.
*/
TSocket(THRIFT_SOCKET socket);
/**
* Constructor to create socket from file descriptor that
* can be interrupted safely.
*/
TSocket(THRIFT_SOCKET socket,
boost::shared_ptr<THRIFT_SOCKET> interruptListener);
/**
* Set a cache of the peer address (used when trivially available: e.g.
* accept() or connect()). Only caches IPV4 and IPV6; unset for others.
@ -274,9 +288,15 @@ protected:
/** UNIX domain socket path */
std::string path_;
/** Underlying UNIX socket handle */
/** Underlying socket handle */
THRIFT_SOCKET socket_;
/**
* A shared socket pointer that will interrupt a blocking read if data
* becomes available on it
*/
boost::shared_ptr<THRIFT_SOCKET> interruptListener_;
/** Connect timeout in ms */
int connTimeout_;

View File

@ -20,7 +20,7 @@
# Find required packages
set(Boost_USE_STATIC_LIBS ON) # Force the use of static boost test framework
find_package(Boost 1.53.0 REQUIRED COMPONENTS unit_test_framework)
find_package(Boost 1.53.0 REQUIRED COMPONENTS chrono system thread unit_test_framework)
include_directories(SYSTEM "${Boost_INCLUDE_DIRS}")
#Make sure gen-cpp files can be included
@ -50,6 +50,8 @@ target_link_libraries(testgencpp thrift)
set(testgencpp_cob_SOURCES
gen-cpp/ChildService.cpp
gen-cpp/ChildService.h
gen-cpp/EmptyService.cpp
gen-cpp/EmptyService.h
gen-cpp/ParentService.cpp
gen-cpp/ParentService.h
gen-cpp/proc_types.cpp
@ -71,6 +73,7 @@ set(UnitTest_SOURCES
ToStringTest.cpp
TypedefTest.cpp
TServerSocketTest.cpp
TServerTransportTest.cpp
)
if(NOT WITH_BOOSTTHREADS AND NOT WITH_STDTHREADS)
@ -81,6 +84,26 @@ add_executable(UnitTests ${UnitTest_SOURCES})
target_link_libraries(UnitTests testgencpp thrift ${Boost_LIBRARIES})
add_test(NAME UnitTests COMMAND UnitTests)
add_executable(TSocketInterruptTest TSocketInterruptTest.cpp)
target_link_libraries(TSocketInterruptTest
testgencpp
${Boost_LIBRARIES}
#-lrt
)
if (NOT MSVC)
target_link_libraries(TSocketInterruptTest -lrt)
endif ()
add_test(NAME TSocketInterruptTest COMMAND TSocketInterruptTest)
add_executable(TServerIntegrationTest TServerIntegrationTest.cpp)
target_link_libraries(TServerIntegrationTest
testgencpp_cob
${Boost_LIBRARIES}
)
if (NOT MSVC)
target_link_libraries(TServerIntegrationTest -lrt)
endif ()
add_test(NAME TServerIntegrationTest COMMAND TServerIntegrationTest)
if(WITH_ZLIB)
add_executable(TransportTest TransportTest.cpp)
@ -255,7 +278,7 @@ endif()
#
add_custom_command(OUTPUT gen-cpp/DebugProtoTest_types.cpp gen-cpp/DebugProtoTest_types.h
add_custom_command(OUTPUT gen-cpp/DebugProtoTest_types.cpp gen-cpp/DebugProtoTest_types.h gen-cpp/EmptyService.cpp gen-cpp/EmptyService.h
COMMAND thrift-compiler --gen cpp:dense ${PROJECT_SOURCE_DIR}/test/DebugProtoTest.thrift
)

View File

@ -25,6 +25,7 @@ BUILT_SOURCES = gen-cpp/DebugProtoTest_types.h \
gen-cpp/ThriftTest_types.h \
gen-cpp/TypedefTest_types.h \
gen-cpp/ChildService.h \
gen-cpp/EmptyService.h \
gen-cpp/ParentService.h \
gen-cpp/proc_types.h
@ -50,6 +51,8 @@ nodist_libtestgencpp_la_SOURCES = \
nodist_libprocessortest_la_SOURCES = \
gen-cpp/ChildService.cpp \
gen-cpp/ChildService.h \
gen-cpp/EmptyService.cpp \
gen-cpp/EmptyService.h \
gen-cpp/ParentService.cpp \
gen-cpp/ParentService.h \
gen-cpp/proc_types.cpp \
@ -79,6 +82,8 @@ check_PROGRAMS = \
SpecializationTest \
AllProtocolsTest \
TransportTest \
TSocketInterruptTest \
TServerIntegrationTest \
ZlibTest \
TFileTransportTest \
link_test \
@ -107,17 +112,38 @@ UnitTests_SOURCES = \
Base64Test.cpp \
ToStringTest.cpp \
TypedefTest.cpp \
TServerSocketTest.cpp
TServerSocketTest.cpp \
TServerTransportTest.cpp
if !WITH_BOOSTTHREADS
UnitTests_SOURCES += \
RWMutexStarveTest.cpp
RWMutexStarveTest.cpp
endif
UnitTests_LDADD = \
libtestgencpp.la \
$(BOOST_TEST_LDADD)
TSocketInterruptTest_SOURCES = \
TSocketInterruptTest.cpp
TSocketInterruptTest_LDADD = \
libtestgencpp.la \
$(BOOST_TEST_LDADD) \
$(BOOST_CHRONO_LDADD) \
$(BOOST_SYSTEM_LDADD) \
$(BOOST_THREAD_LDADD)
TServerIntegrationTest_SOURCES = \
TServerIntegrationTest.cpp
TServerIntegrationTest_LDADD = \
libtestgencpp.la \
libprocessortest.la \
$(BOOST_TEST_LDADD) \
$(BOOST_SYSTEM_LDADD) \
$(BOOST_THREAD_LDADD)
TransportTest_SOURCES = \
TransportTest.cpp
@ -273,7 +299,7 @@ OpenSSLManualInitTest_LDADD = \
#
THRIFT = $(top_builddir)/compiler/cpp/thrift
gen-cpp/DebugProtoTest_types.cpp gen-cpp/DebugProtoTest_types.h: $(top_srcdir)/test/DebugProtoTest.thrift
gen-cpp/DebugProtoTest_types.cpp gen-cpp/DebugProtoTest_types.h gen-cpp/EmptyService.cpp gen-cpp/EmptyService.h: $(top_srcdir)/test/DebugProtoTest.thrift
$(THRIFT) --gen cpp:dense $<
gen-cpp/EnumTest_types.cpp gen-cpp/EnumTest_types.h: $(top_srcdir)/test/EnumTest.thrift

View File

@ -0,0 +1,253 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#define BOOST_TEST_MODULE TServerIntegrationTest
#include <boost/test/auto_unit_test.hpp>
#include <boost/bind.hpp>
#include <boost/format.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <thrift/server/TThreadedServer.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransport.h>
#include "gen-cpp/ParentService.h"
#include "TestPortFixture.h"
#include <vector>
using apache::thrift::concurrency::Guard;
using apache::thrift::concurrency::Monitor;
using apache::thrift::concurrency::Mutex;
using apache::thrift::concurrency::Synchronized;
using apache::thrift::protocol::TBinaryProtocol;
using apache::thrift::protocol::TBinaryProtocolFactory;
using apache::thrift::protocol::TProtocol;
using apache::thrift::protocol::TProtocolFactory;
using apache::thrift::transport::TServerSocket;
using apache::thrift::transport::TServerTransport;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransport;
using apache::thrift::transport::TTransportFactory;
using apache::thrift::server::TServerEventHandler;
using apache::thrift::server::TThreadedServer;
using apache::thrift::test::ParentServiceClient;
using apache::thrift::test::ParentServiceIf;
using apache::thrift::test::ParentServiceProcessor;
/**
* preServe runs after listen() is successful, when we can connect
*/
class TServerReadyEventHandler : public TServerEventHandler, public Monitor
{
public:
TServerReadyEventHandler() : isListening_(false), accepted_(0) {}
virtual ~TServerReadyEventHandler() {}
virtual void preServe() {
Synchronized sync(*this);
isListening_ = true;
notify();
}
virtual void* createContext(boost::shared_ptr<TProtocol> input,
boost::shared_ptr<TProtocol> output) {
Synchronized sync(*this);
++accepted_;
notify();
(void)input;
(void)output;
return NULL;
}
bool isListening() const { return isListening_; }
uint64_t acceptedCount() const { return accepted_; }
private:
bool isListening_;
uint64_t accepted_;
};
class ParentHandler : virtual public ParentServiceIf {
public:
ParentHandler() : generation_(0) {}
int32_t incrementGeneration() {
Guard g(mutex_);
return ++generation_;
}
int32_t getGeneration() {
Guard g(mutex_);
return generation_;
}
void addString(const std::string& s) {
Guard g(mutex_);
strings_.push_back(s);
}
void getStrings(std::vector<std::string>& _return) {
Guard g(mutex_);
_return = strings_;
}
void getDataWait(std::string& _return, int32_t length) {
}
void onewayWait() {
}
void exceptionWait(const std::string& message) {
}
void unexpectedExceptionWait(const std::string& message) {
}
protected:
Mutex mutex_;
int32_t generation_;
std::vector<std::string> strings_;
};
class TServerIntegrationTestFixture : public TestPortFixture
{
public:
TServerIntegrationTestFixture() :
pServer(new TThreadedServer(
boost::shared_ptr<ParentServiceProcessor>(new ParentServiceProcessor(
boost::shared_ptr<ParentServiceIf>(new ParentHandler))),
boost::shared_ptr<TServerTransport>(new TServerSocket("localhost", m_serverPort)),
boost::shared_ptr<TTransportFactory>(new TTransportFactory),
boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler))
{
pServer->setServerEventHandler(pEventHandler);
}
void startServer() {
pServerThread.reset(new boost::thread(boost::bind(&TThreadedServer::serve, pServer.get())));
// block until listen() completes so clients will be able to connect
Synchronized sync(*(pEventHandler.get()));
while (!pEventHandler->isListening()) {
pEventHandler->wait();
}
BOOST_MESSAGE("server is listening");
}
void blockUntilAccepted(uint64_t numAccepted) {
Synchronized sync(*(pEventHandler.get()));
while (pEventHandler->acceptedCount() < numAccepted) {
pEventHandler->wait();
}
BOOST_MESSAGE(boost::format("server has accepted %1%") % numAccepted);
}
void stopServer() {
pServer->stop();
BOOST_MESSAGE("server stop completed");
pServerThread->join();
BOOST_MESSAGE("server thread joined");
}
~TServerIntegrationTestFixture() {
stopServer();
}
void delayClose(boost::shared_ptr<TTransport> toClose) {
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
toClose->close();
}
boost::shared_ptr<TThreadedServer> pServer;
boost::shared_ptr<TServerReadyEventHandler> pEventHandler;
boost::shared_ptr<boost::thread> pServerThread;
};
BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture )
BOOST_AUTO_TEST_CASE(test_execute_one_request_and_close)
{
// this test establishes some basic sanity
startServer();
boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
boost::shared_ptr<TProtocol> pClientProtocol1(new TBinaryProtocol(pClientSock1));
ParentServiceClient client1(pClientProtocol1);
pClientSock1->open();
client1.incrementGeneration();
pClientSock1->close();
stopServer();
}
BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected)
{
// This tests THRIFT-2441 new behavior: stopping the server disconnects clients
startServer();
boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
pClientSock1->open();
boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
pClientSock2->open();
// Ensure they have been accepted
blockUntilAccepted(2);
// The test fixture destructor will force the sockets to disconnect
// Prior to THRIFT-2441, pServer->stop() would hang until clients disconnected
stopServer();
// extra proof the server end disconnected the clients
uint8_t buf[1];
BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1)); // 0 = disconnected
BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1)); // 0 = disconnected
pClientSock1->close();
pClientSock2->close();
}
BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected)
{
// This tests pre-THRIFT-2441 behavior: stopping the server blocks until clients
// disconnect.
boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())->
setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
startServer();
boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
pClientSock1->open();
boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
pClientSock2->open();
// Ensure they have been accepted
blockUntilAccepted(2);
boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1));
boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2));
// Once the clients disconnect the server will stop
stopServer();
pClientSock1->close();
pClientSock2->close();
}
BOOST_AUTO_TEST_SUITE_END()

View File

@ -22,6 +22,7 @@
#include <thrift/transport/TServerSocket.h>
#include "TestPortFixture.h"
#include "TTransportCheckThrow.h"
#include <iostream>
using apache::thrift::transport::TServerSocket;
using apache::thrift::transport::TSocket;
@ -30,24 +31,18 @@ using apache::thrift::transport::TTransportException;
BOOST_FIXTURE_TEST_SUITE ( TServerSocketTest, TestPortFixture )
class TestTServerSocket : public TServerSocket
{
public:
TestTServerSocket(const std::string& address, int port) : TServerSocket(address, port) { }
using TServerSocket::acceptImpl;
};
BOOST_AUTO_TEST_CASE( test_bind_to_address )
{
TestTServerSocket sock1("localhost", m_serverPort);
TServerSocket sock1("localhost", m_serverPort);
sock1.listen();
TSocket clientSock("localhost", m_serverPort);
clientSock.open();
boost::shared_ptr<TTransport> accepted = sock1.acceptImpl();
boost::shared_ptr<TTransport> accepted = sock1.accept();
accepted->close();
sock1.close();
TServerSocket sock2("this.is.truly.an.unrecognizable.address.", m_serverPort);
std::cout << "An error message from getaddrinfo on the console is expected:" << std::endl;
TServerSocket sock2("257.258.259.260", m_serverPort);
BOOST_CHECK_THROW(sock2.listen(), TTransportException);
sock2.close();
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <boost/test/auto_unit_test.hpp>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TServerTransport.h>
#include "TestPortFixture.h"
using apache::thrift::transport::TServerTransport;
using apache::thrift::transport::TTransport;
using apache::thrift::transport::TTransportException;
BOOST_AUTO_TEST_SUITE ( TServerTransportTest )
class TestTTransport : public TTransport
{
};
class TestTServerTransport : public TServerTransport
{
public:
TestTServerTransport() : valid_(true) {}
void close() {}
bool valid_;
protected:
boost::shared_ptr<TTransport> acceptImpl()
{
return valid_ ? boost::shared_ptr<TestTTransport>(new TestTTransport) : boost::shared_ptr<TestTTransport>();
}
};
BOOST_AUTO_TEST_CASE( test_positive_accept )
{
TestTServerTransport uut;
BOOST_CHECK(uut.accept());
}
BOOST_AUTO_TEST_CASE( test_negative_accept )
{
TestTServerTransport uut;
uut.valid_ = false;
BOOST_CHECK_THROW(uut.accept(), TTransportException);
}
BOOST_AUTO_TEST_SUITE_END()

View File

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#define BOOST_TEST_MODULE TSocketInterruptTest
#include <boost/test/auto_unit_test.hpp>
#include <boost/bind.hpp>
#include <boost/chrono/duration.hpp>
#include <boost/date_time/posix_time/posix_time_duration.hpp>
#include <boost/thread/thread.hpp>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TServerSocket.h>
#include "TestPortFixture.h"
using apache::thrift::transport::TServerSocket;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransport;
using apache::thrift::transport::TTransportException;
BOOST_FIXTURE_TEST_SUITE ( TSocketInterruptTest, TestPortFixture )
void readerWorker(boost::shared_ptr<TTransport> tt, uint32_t expectedResult)
{
uint8_t buf[4];
BOOST_CHECK_EQUAL(expectedResult, tt->read(buf, 4));
}
void readerWorkerMustThrow(boost::shared_ptr<TTransport> tt)
{
try
{
uint8_t buf[4];
tt->read(buf, 4);
BOOST_ERROR("should not have gotten here");
}
catch (const TTransportException& tx)
{
BOOST_CHECK_EQUAL(TTransportException::INTERRUPTED, tx.getType());
}
}
BOOST_AUTO_TEST_CASE( test_interruptable_child_read )
{
TServerSocket sock1("localhost", m_serverPort);
sock1.listen();
TSocket clientSock("localhost", m_serverPort);
clientSock.open();
boost::shared_ptr<TTransport> accepted = sock1.accept();
boost::thread readThread(boost::bind(readerWorkerMustThrow, accepted));
boost::this_thread::sleep(boost::posix_time::milliseconds(50));
// readThread is practically guaranteed to be blocking now
sock1.interruptChildren();
BOOST_CHECK_MESSAGE(readThread.try_join_for(boost::chrono::milliseconds(200)),
"server socket interruptChildren did not interrupt child read");
clientSock.close();
accepted->close();
sock1.close();
}
BOOST_AUTO_TEST_CASE( test_non_interruptable_child_read )
{
TServerSocket sock1("localhost", m_serverPort);
sock1.setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
sock1.listen();
TSocket clientSock("localhost", m_serverPort);
clientSock.open();
boost::shared_ptr<TTransport> accepted = sock1.accept();
boost::thread readThread(boost::bind(readerWorker, accepted, 0));
boost::this_thread::sleep(boost::posix_time::milliseconds(50));
// readThread is practically guaranteed to be blocking here
sock1.interruptChildren();
BOOST_CHECK_MESSAGE(!readThread.try_join_for(boost::chrono::milliseconds(200)),
"server socket interruptChildren interrupted child read");
// only way to proceed is to have the client disconnect
clientSock.close();
readThread.join();
accepted->close();
sock1.close();
}
BOOST_AUTO_TEST_CASE( test_cannot_change_after_listen )
{
TServerSocket sock1("localhost", m_serverPort);
sock1.listen();
BOOST_CHECK_THROW(sock1.setInterruptableChildren(false), std::logic_error);
sock1.close();
}
void peekerWorker(boost::shared_ptr<TTransport> tt, bool expectedResult)
{
BOOST_CHECK_EQUAL(expectedResult, tt->peek());
}
BOOST_AUTO_TEST_CASE( test_interruptable_child_peek )
{
TServerSocket sock1("localhost", m_serverPort);
sock1.listen();
TSocket clientSock("localhost", m_serverPort);
clientSock.open();
boost::shared_ptr<TTransport> accepted = sock1.accept();
// peek() will return false if child is interrupted
boost::thread peekThread(boost::bind(peekerWorker, accepted, false));
boost::this_thread::sleep(boost::posix_time::milliseconds(50));
// peekThread is practically guaranteed to be blocking now
sock1.interruptChildren();
BOOST_CHECK_MESSAGE(peekThread.try_join_for(boost::chrono::milliseconds(200)),
"server socket interruptChildren did not interrupt child peek");
clientSock.close();
accepted->close();
sock1.close();
}
BOOST_AUTO_TEST_CASE( test_non_interruptable_child_peek )
{
TServerSocket sock1("localhost", m_serverPort);
sock1.setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
sock1.listen();
TSocket clientSock("localhost", m_serverPort);
clientSock.open();
boost::shared_ptr<TTransport> accepted = sock1.accept();
// peek() will return false when remote side is closed
boost::thread peekThread(boost::bind(peekerWorker, accepted, false));
boost::this_thread::sleep(boost::posix_time::milliseconds(50));
// peekThread is practically guaranteed to be blocking now
sock1.interruptChildren();
BOOST_CHECK_MESSAGE(!peekThread.try_join_for(boost::chrono::milliseconds(200)),
"server socket interruptChildren interrupted child peek");
// only way to proceed is to have the client disconnect
clientSock.close();
peekThread.join();
accepted->close();
sock1.close();
}
BOOST_AUTO_TEST_SUITE_END()