Merge pull request #2449 from BioDataAnalysis/bda_unify_nonblockingserversocket

Updated TNonblockingServerSocket to better match TServerSocket
This commit is contained in:
Mario Emmenlauer 2021-09-02 08:51:06 +02:00 committed by GitHub
commit b8069cbe9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 104 additions and 50 deletions

View File

@ -43,6 +43,9 @@
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif
#include <thrift/transport/PlatformSocket.h>
#include <thrift/transport/TNonblockingServerSocket.h>
@ -72,13 +75,13 @@ inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
return reinterpret_cast<SOCKOPT_CAST_T*>(v);
}
using std::shared_ptr;
using std::string;
namespace apache {
namespace thrift {
namespace transport {
using std::shared_ptr;
using std::string;
TNonblockingServerSocket::TNonblockingServerSocket(int port)
: port_(port),
listenPort_(port),
@ -145,6 +148,35 @@ TNonblockingServerSocket::~TNonblockingServerSocket() {
close();
}
bool TNonblockingServerSocket::isOpen() const {
if (serverSocket_ == THRIFT_INVALID_SOCKET)
return false;
if (!listening_)
return false;
if (isUnixDomainSocket() && (path_[0] != '\0')) {
// On some platforms the domain socket file may not be instantly
// available yet, i.e. the Windows file system can be slow. Therefore
// we should check that the domain socket file actually exists.
#ifdef _MSC_VER
// Currently there is a bug in ClangCl on Windows so the stat() call
// does not work. Workaround is a Windows-specific call if file exists:
DWORD const f_attrib = GetFileAttributesA(path_.c_str());
if (f_attrib == INVALID_FILE_ATTRIBUTES) {
#else
struct THRIFT_STAT path_info;
if (::THRIFT_STAT(path_.c_str(), &path_info) < 0) {
#endif
const std::string vError = "TNonblockingServerSocket::isOpen(): The domain socket path '" + path_ + "' does not exist (yet).";
GlobalOutput.perror(vError.c_str(), THRIFT_GET_SOCKET_ERROR);
return false;
}
}
return true;
}
void TNonblockingServerSocket::setSendTimeout(int sendTimeout) {
sendTimeout_ = sendTimeout;
}
@ -174,26 +206,28 @@ void TNonblockingServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) {
}
void TNonblockingServerSocket::_setup_sockopts() {
// Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept
int one = 1;
if (-1 == setsockopt(serverSocket_,
SOL_SOCKET,
THRIFT_NO_SOCKET_CACHING,
cast_sockopt(&one),
sizeof(one))) {
// ignore errors coming out of this setsockopt on Windows. This is because
// SO_EXCLUSIVEADDRUSE requires admin privileges on WinXP, but we don't
// want to force servers to be an admin.
#ifndef _WIN32
int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() THRIFT_NO_SOCKET_CACHING ",
errno_copy);
close();
throw TTransportException(TTransportException::NOT_OPEN,
"Could not set THRIFT_NO_SOCKET_CACHING",
errno_copy);
#endif
if (!isUnixDomainSocket()) {
// Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept.
// This does not work with Domain sockets on most platforms. And
// on Windows it completely breaks the socket. Therefore do not
// use this on Domain sockets.
if (-1 == setsockopt(serverSocket_,
SOL_SOCKET,
THRIFT_NO_SOCKET_CACHING,
cast_sockopt(&one),
sizeof(one))) {
// NOTE: SO_EXCLUSIVEADDRUSE socket option can only be used by members
// of the Administrators security group on Windows XP and earlier. But
// we do not target WinXP anymore so no special checks required.
int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() THRIFT_NO_SOCKET_CACHING ",
errno_copy);
close();
throw TTransportException(TTransportException::NOT_OPEN,
"Could not set THRIFT_NO_SOCKET_CACHING",
errno_copy);
}
}
// Set TCP buffer sizes
@ -265,8 +299,10 @@ void TNonblockingServerSocket::_setup_sockopts() {
"THRIFT_FCNTL() THRIFT_F_SETFL THRIFT_O_NONBLOCK failed",
errno_copy);
}
}
} // _setup_sockopts()
void TNonblockingServerSocket::_setup_unixdomain_sockopts() {
}
void TNonblockingServerSocket::_setup_tcp_sockopts() {
int one = 1;
@ -302,13 +338,10 @@ void TNonblockingServerSocket::_setup_tcp_sockopts() {
} // _setup_tcp_sockopts()
void TNonblockingServerSocket::listen() {
listening_ = true;
#ifdef _WIN32
TWinsockSingleton::create();
#endif // _WIN32
// tcp == false means Unix Domain socket
bool tcp = (path_.empty());
// Validate port number
if (port_ < 0 || port_ > 0xFFFF) {
@ -317,12 +350,16 @@ void TNonblockingServerSocket::listen() {
// Resolve host:port strings into an iterable of struct addrinfo*
AddressResolutionHelper resolved_addresses;
if (tcp) {
if (!isUnixDomainSocket()) {
try {
resolved_addresses.resolve(address_, std::to_string(port_), SOCK_STREAM,
#ifdef ANDROID
AI_PASSIVE | AI_ADDRCONFIG);
#else
AI_PASSIVE | AI_V4MAPPED);
#endif
} catch (const std::system_error& e) {
GlobalOutput.printf("getaddrinfo() -> %d. %s", e.code().value(), e.what());
GlobalOutput.printf("getaddrinfo() -> %d; %s", e.code().value(), e.what());
close();
throw TTransportException(TTransportException::NOT_OPEN,
"Could not resolve host for server socket.");
@ -334,14 +371,14 @@ void TNonblockingServerSocket::listen() {
int retries = 0;
int errno_copy = 0;
if (!tcp) {
if (isUnixDomainSocket()) {
// -- Unix Domain Socket -- //
serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
if (serverSocket_ == THRIFT_INVALID_SOCKET) {
int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
GlobalOutput.perror("TNonblockingServerSocket::listen() socket() ", errno_copy);
close();
throw TTransportException(TTransportException::NOT_OPEN,
"Could not create server socket.",
@ -349,14 +386,11 @@ void TNonblockingServerSocket::listen() {
}
_setup_sockopts();
//_setup_unixdomain_sockopts();
/*
* TODO: seems that windows now support unix sockets,
* see: https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/
*/
#ifndef _WIN32
_setup_unixdomain_sockopts();
// Windows supports Unix domain sockets since it ships the header
// HAVE_AF_UNIX_H (see https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/)
#if (!defined(_WIN32) || defined(HAVE_AF_UNIX_H))
struct sockaddr_un address;
socklen_t structlen = fillUnixSocketAddr(address, path_);
@ -368,12 +402,11 @@ void TNonblockingServerSocket::listen() {
// use short circuit evaluation here to only sleep if we need to
} while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));
#else
GlobalOutput.perror("TSocket::open() Unix Domain socket path not supported on windows", -99);
GlobalOutput.perror("TNonblockingServerSocket::open() Unix Domain socket path not supported on this version of Windows", -99);
throw TTransportException(TTransportException::NOT_OPEN,
" Unix Domain socket path not supported");
#endif
} else {
// -- TCP socket -- //
auto addr_iter = AddressResolutionHelper::Iter{};
@ -405,7 +438,7 @@ void TNonblockingServerSocket::listen() {
IPV6_V6ONLY,
cast_sockopt(&zero),
sizeof(zero))) {
GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR);
GlobalOutput.perror("TNonblockingServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR);
}
}
#endif // #ifdef IPV6_V6ONLY
@ -440,7 +473,7 @@ void TNonblockingServerSocket::listen() {
// throw error if socket still wasn't created successfully
if (serverSocket_ == THRIFT_INVALID_SOCKET) {
GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
GlobalOutput.perror("TNonblockingServerSocket::listen() socket() ", errno_copy);
close();
throw TTransportException(TTransportException::NOT_OPEN,
"Could not create server socket.",
@ -450,10 +483,15 @@ void TNonblockingServerSocket::listen() {
// throw an error if we failed to bind properly
if (retries > retryLimit_) {
char errbuf[1024];
if (!tcp) {
THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() PATH %s", path_.c_str());
if (isUnixDomainSocket()) {
#ifdef _WIN32
THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() Could not bind to domain socket path %s, error %d", path_.c_str(), WSAGetLastError());
#else
// Fixme: This does not currently handle abstract domain sockets:
THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() Could not bind to domain socket path %s", path_.c_str());
#endif
} else {
THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() BIND %d", port_);
THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() Could not bind to port %d", port_);
}
GlobalOutput(errbuf);
close();
@ -474,6 +512,7 @@ void TNonblockingServerSocket::listen() {
}
// The socket is now listening!
listening_ = true;
}
int TNonblockingServerSocket::getPort() {
@ -484,6 +523,14 @@ int TNonblockingServerSocket::getListenPort() {
return listenPort_;
}
std::string TNonblockingServerSocket::getPath() const {
return path_;
}
bool TNonblockingServerSocket::isUnixDomainSocket() const {
return !path_.empty();
}
shared_ptr<TSocket> TNonblockingServerSocket::acceptImpl() {
if (serverSocket_ == THRIFT_INVALID_SOCKET) {
throw TTransportException(TTransportException::NOT_OPEN,
@ -524,6 +571,7 @@ shared_ptr<TSocket> TNonblockingServerSocket::acceptImpl() {
}
shared_ptr<TSocket> client = createSocket(clientSocket);
client->setPath(path_);
if (sendTimeout_ > 0) {
client->setSendTimeout(sendTimeout_);
}

View File

@ -73,6 +73,8 @@ public:
~TNonblockingServerSocket() override;
bool isOpen() const;
void setSendTimeout(int sendTimeout);
void setRecvTimeout(int recvTimeout);
@ -103,6 +105,10 @@ public:
int getListenPort() override;
std::string getPath() const;
bool isUnixDomainSocket() const;
void listen() override;
void close() override;
@ -111,6 +117,10 @@ protected:
virtual std::shared_ptr<TSocket> createSocket(THRIFT_SOCKET client);
private:
void _setup_sockopts();
void _setup_unixdomain_sockopts();
void _setup_tcp_sockopts();
int port_;
int listenPort_;
std::string address_;
@ -128,9 +138,6 @@ private:
socket_func_t listenCallback_;
socket_func_t acceptCallback_;
void _setup_sockopts();
void _setup_tcp_sockopts();
};
}
}

View File

@ -90,13 +90,13 @@ void destroyer_of_fine_sockets(THRIFT_SOCKET* ssock) {
delete ssock;
}
using std::shared_ptr;
using std::string;
namespace apache {
namespace thrift {
namespace transport {
using std::shared_ptr;
TServerSocket::TServerSocket(int port)
: interruptableChildren_(true),
@ -249,12 +249,12 @@ void TServerSocket::setInterruptableChildren(bool enable) {
}
void TServerSocket::_setup_sockopts() {
int one = 1;
if (!isUnixDomainSocket()) {
// Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept.
// This does not work with Domain sockets on most platforms. And
// on Windows it completely breaks the socket. Therefore do not
// use this on Domain sockets.
int one = 1;
if (-1 == setsockopt(serverSocket_,
SOL_SOCKET,
THRIFT_NO_SOCKET_CACHING,
@ -458,7 +458,6 @@ void TServerSocket::listen() {
" Unix Domain socket path not supported");
#endif
} else {
// -- TCP socket -- //
auto addr_iter = AddressResolutionHelper::Iter{};