THRIFT-1690 Sockets and Pipe Handles truncated on Win64

Patch: Ben Craig


git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1394182 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Roger Meier 2012-10-04 18:02:15 +00:00
parent bd5db3aa70
commit b69d24dbf7
35 changed files with 353 additions and 372 deletions

View File

@ -114,8 +114,8 @@ class Monitor::Impl : public boost::condition_variable_any {
struct timespec currenttime;
Util::toTimespec(currenttime, Util::currentTime());
long tv_sec = abstime->tv_sec - currenttime.tv_sec;
long tv_nsec = abstime->tv_nsec - currenttime.tv_nsec;
long tv_sec = static_cast<long>(abstime->tv_sec - currenttime.tv_sec);
long tv_nsec = static_cast<long>(abstime->tv_nsec - currenttime.tv_nsec);
if(tv_sec < 0)
tv_sec = 0;
if(tv_nsec < 0)

View File

@ -102,7 +102,7 @@ class TimerManager::Dispatcher: public Runnable {
assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0));
try {
manager_->monitor_.wait(timeout);
} catch (TimedOutException &e) {}
} catch (TimedOutException &) {}
now = Util::currentTime();
}
@ -140,12 +140,20 @@ class TimerManager::Dispatcher: public Runnable {
friend class TimerManager;
};
#if defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable: 4355) // 'this' used in base member initializer list
#endif
TimerManager::TimerManager() :
taskCount_(0),
state_(TimerManager::UNINITIALIZED),
dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) {
}
#if defined(_MSC_VER)
#pragma warning(pop)
#endif
TimerManager::~TimerManager() {

View File

@ -38,6 +38,7 @@ int64_t Util::currentTimeTicks(int64_t ticksPerSec) {
struct timespec now;
int ret = clock_gettime(CLOCK_REALTIME, &now);
assert(ret == 0);
ret = ret; //squelching "unused variable" warning
toTicks(result, now, ticksPerSec);
#elif defined(HAVE_GETTIMEOFDAY)
struct timeval now;

View File

@ -67,7 +67,7 @@ class Util {
}
static void toTimeval(struct timeval& result, int64_t value) {
result.tv_sec = value / MS_PER_S; // ms to s
result.tv_sec = (uint32_t)(value / MS_PER_S); // ms to s
result.tv_usec = (value % MS_PER_S) * US_PER_MS; // ms to us
}

View File

@ -178,7 +178,9 @@ uint32_t TBinaryProtocolT<Transport_>::writeDouble(const double dub) {
template <class Transport_>
template<typename StrType>
uint32_t TBinaryProtocolT<Transport_>::writeString(const StrType& str) {
uint32_t size = str.size();
if(str.size() > static_cast<size_t>((std::numeric_limits<int32_t>::max)()))
throw TProtocolException(TProtocolException::SIZE_LIMIT);
uint32_t size = static_cast<uint32_t>(str.size());
uint32_t result = writeI32((int32_t)size);
if (size > 0) {
this->trans_->write((uint8_t*)str.data(), size);

View File

@ -32,6 +32,7 @@ using std::string;
static string byte_to_hex(const uint8_t byte) {
char buf[3];
int ret = std::sprintf(buf, "%02x", (int)byte);
ret = ret; //squelching "unused variable" warning
assert(ret == 2);
assert(buf[2] == '\0');
return buf;
@ -74,14 +75,23 @@ void TDebugProtocol::indentDown() {
}
uint32_t TDebugProtocol::writePlain(const string& str) {
trans_->write((uint8_t*)str.data(), str.length());
return str.length();
if(str.length() > (std::numeric_limits<uint32_t>::max)())
throw TProtocolException(TProtocolException::SIZE_LIMIT);
trans_->write((uint8_t*)str.data(), static_cast<uint32_t>(str.length()));
return static_cast<uint32_t>(str.length());
}
uint32_t TDebugProtocol::writeIndented(const string& str) {
trans_->write((uint8_t*)indent_str_.data(), indent_str_.length());
trans_->write((uint8_t*)str.data(), str.length());
return indent_str_.length() + str.length();
if(str.length() > (std::numeric_limits<uint32_t>::max)())
throw TProtocolException(TProtocolException::SIZE_LIMIT);
if(indent_str_.length() > (std::numeric_limits<uint32_t>::max)())
throw TProtocolException(TProtocolException::SIZE_LIMIT);
uint64_t total_len = indent_str_.length() + str.length();
if(total_len > (std::numeric_limits<uint32_t>::max)())
throw TProtocolException(TProtocolException::SIZE_LIMIT);
trans_->write((uint8_t*)indent_str_.data(), static_cast<uint32_t>(indent_str_.length()));
trans_->write((uint8_t*)str.data(), static_cast<uint32_t>(str.length()));
return static_cast<uint32_t>(indent_str_.length() + str.length());
}
uint32_t TDebugProtocol::startItem() {

View File

@ -245,7 +245,7 @@ inline uint32_t TDenseProtocol::vlqWrite(uint64_t vlq) {
while (vlq > 0) {
assert(pos >= 0);
buf[pos] = (vlq | 0x80);
buf[pos] = static_cast<uint8_t>(vlq | 0x80);
vlq >>= 7;
pos--;
}
@ -463,7 +463,9 @@ inline uint32_t TDenseProtocol::subWriteI32(const int32_t i32) {
}
uint32_t TDenseProtocol::subWriteString(const std::string& str) {
uint32_t size = str.size();
if(str.size() > static_cast<size_t>((std::numeric_limits<int32_t>::max)()))
throw TProtocolException(TProtocolException::SIZE_LIMIT);
uint32_t size = static_cast<uint32_t>(str.size());
uint32_t xfer = subWriteI32((int32_t)size);
if (size > 0) {
trans_->write((uint8_t*)str.data(), size);

View File

@ -380,7 +380,7 @@ void TJSONProtocol::popContext() {
// Write the character ch as a JSON escape sequence ("\u00xx")
uint32_t TJSONProtocol::writeJSONEscapeChar(uint8_t ch) {
trans_->write((const uint8_t *)kJSONEscapePrefix.c_str(),
kJSONEscapePrefix.length());
static_cast<uint32_t>(kJSONEscapePrefix.length()));
uint8_t outCh = hexChar(ch >> 4);
trans_->write(&outCh, 1);
outCh = hexChar(ch);
@ -442,7 +442,9 @@ uint32_t TJSONProtocol::writeJSONBase64(const std::string &str) {
trans_->write(&kJSONStringDelimiter, 1);
uint8_t b[4];
const uint8_t *bytes = (const uint8_t *)str.c_str();
uint32_t len = str.length();
if(str.length() > (std::numeric_limits<uint32_t>::max)())
throw TProtocolException(TProtocolException::SIZE_LIMIT);
uint32_t len = static_cast<uint32_t>(str.length());
while (len >= 3) {
// Encode 3 bytes at a time
base64_encode(bytes, 3, b);
@ -471,8 +473,10 @@ uint32_t TJSONProtocol::writeJSONInteger(NumberType num) {
trans_->write(&kJSONStringDelimiter, 1);
result += 1;
}
trans_->write((const uint8_t *)val.c_str(), val.length());
result += val.length();
if(val.length() > (std::numeric_limits<uint32_t>::max)())
throw TProtocolException(TProtocolException::SIZE_LIMIT);
trans_->write((const uint8_t *)val.c_str(), static_cast<uint32_t>(val.length()));
result += static_cast<uint32_t>(val.length());
if (escapeNum) {
trans_->write(&kJSONStringDelimiter, 1);
result += 1;
@ -512,8 +516,10 @@ uint32_t TJSONProtocol::writeJSONDouble(double num) {
trans_->write(&kJSONStringDelimiter, 1);
result += 1;
}
trans_->write((const uint8_t *)val.c_str(), val.length());
result += val.length();
if(val.length() > (std::numeric_limits<uint32_t>::max)())
throw TProtocolException(TProtocolException::SIZE_LIMIT);
trans_->write((const uint8_t *)val.c_str(), static_cast<uint32_t>(val.length()));
result += static_cast<uint32_t>(val.length());
if (escapeNum) {
trans_->write(&kJSONStringDelimiter, 1);
result += 1;
@ -721,7 +727,9 @@ uint32_t TJSONProtocol::readJSONBase64(std::string &str) {
std::string tmp;
uint32_t result = readJSONString(tmp);
uint8_t *b = (uint8_t *)tmp.c_str();
uint32_t len = tmp.length();
if(tmp.length() > (std::numeric_limits<uint32_t>::max)())
throw TProtocolException(TProtocolException::SIZE_LIMIT);
uint32_t len = static_cast<uint32_t>(tmp.length());
str.clear();
while (len >= 4) {
base64_decode(b, 4);
@ -869,7 +877,9 @@ uint32_t TJSONProtocol::readMessageBegin(std::string& name,
result += readJSONInteger(tmpVal);
messageType = (TMessageType)tmpVal;
result += readJSONInteger(tmpVal);
seqid = tmpVal;
if(tmpVal > static_cast<uint64_t>((std::numeric_limits<int32_t>::max)()))
throw TProtocolException(TProtocolException::SIZE_LIMIT);
seqid = static_cast<int32_t>(tmpVal);
return result;
}
@ -900,7 +910,9 @@ uint32_t TJSONProtocol::readFieldBegin(std::string& name,
uint64_t tmpVal = 0;
std::string tmpStr;
result += readJSONInteger(tmpVal);
fieldId = tmpVal;
if(tmpVal > static_cast<uint32_t>((std::numeric_limits<int16_t>::max)()))
throw TProtocolException(TProtocolException::SIZE_LIMIT);
fieldId = static_cast<int16_t>(tmpVal);
result += readJSONObjectStart();
result += readJSONString(tmpStr);
fieldType = getTypeIDForTypeName(tmpStr);
@ -923,7 +935,9 @@ uint32_t TJSONProtocol::readMapBegin(TType& keyType,
result += readJSONString(tmpStr);
valType = getTypeIDForTypeName(tmpStr);
result += readJSONInteger(tmpVal);
size = tmpVal;
if(tmpVal > (std::numeric_limits<uint32_t>::max)())
throw TProtocolException(TProtocolException::SIZE_LIMIT);
size = static_cast<uint32_t>(tmpVal);
result += readJSONObjectStart();
return result;
}
@ -940,7 +954,9 @@ uint32_t TJSONProtocol::readListBegin(TType& elemType,
result += readJSONString(tmpStr);
elemType = getTypeIDForTypeName(tmpStr);
result += readJSONInteger(tmpVal);
size = tmpVal;
if(tmpVal > (std::numeric_limits<uint32_t>::max)())
throw TProtocolException(TProtocolException::SIZE_LIMIT);
size = static_cast<uint32_t>(tmpVal);
return result;
}
@ -956,7 +972,9 @@ uint32_t TJSONProtocol::readSetBegin(TType& elemType,
result += readJSONString(tmpStr);
elemType = getTypeIDForTypeName(tmpStr);
result += readJSONInteger(tmpVal);
size = tmpVal;
if(tmpVal > (std::numeric_limits<uint32_t>::max)())
throw TProtocolException(TProtocolException::SIZE_LIMIT);
size = static_cast<uint32_t>(tmpVal);
return result;
}

View File

@ -129,10 +129,13 @@ using apache::thrift::transport::TTransport;
# include <byteswap.h>
# define ntohll(n) bswap_64(n)
# define htonll(n) bswap_64(n)
# else /* GNUC & GLIBC */
# elif defined(_MSC_VER) /* Microsoft Visual C++ */
# define ntohll(n) ( _byteswap_uint64((uint64_t)n) )
# define htonll(n) ( _byteswap_uint64((uint64_t)n) )
# else /* Not GNUC/GLIBC or MSVC */
# define ntohll(n) ( (((uint64_t)ntohl(n)) << 32) + ntohl(n >> 32) )
# define htonll(n) ( (((uint64_t)htonl(n)) << 32) + htonl(n >> 32) )
# endif /* GNUC & GLIBC */
# endif /* GNUC/GLIBC or MSVC or something else */
#else /* __THRIFT_BYTE_ORDER */
# error "Can't define htonll or ntohll!"
#endif

View File

@ -72,7 +72,7 @@ public:
break;
}
}
} catch (const TTransportException& ttx) {
} catch (const TTransportException&) {
// This is reasonably expected, client didn't send a full request so just
// ignore him
// string errStr = string("TThreadPoolServer client died: ") + ttx.what();

View File

@ -28,7 +28,7 @@ namespace apache { namespace thrift { namespace transport {
uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) {
uint32_t have = rBound_ - rBase_;
uint32_t have = static_cast<uint32_t>(rBound_ - rBase_);
// We should only take the slow path if we can't satisfy the read
// with the data already in the buffer.
@ -52,7 +52,7 @@ uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) {
setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
// Hand over whatever we have.
uint32_t give = std::min(len, static_cast<uint32_t>(rBound_ - rBase_));
uint32_t give = (std::min)(len, static_cast<uint32_t>(rBound_ - rBase_));
memcpy(buf, rBase_, give);
rBase_ += give;
@ -60,8 +60,8 @@ uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) {
}
void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
uint32_t have_bytes = wBase_ - wBuf_.get();
uint32_t space = wBound_ - wBase_;
uint32_t have_bytes = static_cast<uint32_t>(wBase_ - wBuf_.get());
uint32_t space = static_cast<uint32_t>(wBound_ - wBase_);
// We should only take the slow path if we can't accomodate the write
// with the free space already in the buffer.
assert(wBound_ - wBase_ < static_cast<ptrdiff_t>(len));
@ -118,7 +118,7 @@ const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
void TBufferedTransport::flush() {
// Write out any data waiting in the write buffer.
uint32_t have_bytes = wBase_ - wBuf_.get();
uint32_t have_bytes = static_cast<uint32_t>(wBase_ - wBuf_.get());
if (have_bytes > 0) {
// Note that we reset wBase_ prior to the underlying write
// to ensure we're in a sane state (i.e. internal buffer cleaned)
@ -134,7 +134,7 @@ void TBufferedTransport::flush() {
uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len) {
uint32_t want = len;
uint32_t have = rBound_ - rBase_;
uint32_t have = static_cast<uint32_t>(rBound_ - rBase_);
// We should only take the slow path if we can't satisfy the read
// with the data already in the buffer.
@ -159,7 +159,7 @@ uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len) {
// TODO(dreiss): Should we warn when reads cross frames?
// Hand over whatever we have.
uint32_t give = std::min(want, static_cast<uint32_t>(rBound_ - rBase_));
uint32_t give = (std::min)(want, static_cast<uint32_t>(rBound_ - rBase_));
memcpy(buf, rBase_, give);
rBase_ += give;
want -= give;
@ -212,7 +212,7 @@ bool TFramedTransport::readFrame() {
void TFramedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
// Double buffer size until sufficient.
uint32_t have = wBase_ - wBuf_.get();
uint32_t have = static_cast<uint32_t>(wBase_ - wBuf_.get());
uint32_t new_size = wBufSize_;
if (len + have < have /* overflow */ || len + have > 0x7fffffff) {
throw TTransportException(TTransportException::BAD_ARGS,
@ -247,7 +247,7 @@ void TFramedTransport::flush() {
assert(wBufSize_ > sizeof(sz_nbo));
// Slip the frame size into the start of the buffer.
sz_hbo = wBase_ - (wBuf_.get() + sizeof(sz_nbo));
sz_hbo = static_cast<uint32_t>(wBase_ - (wBuf_.get() + sizeof(sz_nbo)));
sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo));
memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo));
@ -267,7 +267,7 @@ void TFramedTransport::flush() {
}
uint32_t TFramedTransport::writeEnd() {
return wBase_ - wBuf_.get();
return static_cast<uint32_t>(wBase_ - wBuf_.get());
}
const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
@ -281,7 +281,7 @@ const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
uint32_t TFramedTransport::readEnd() {
// include framing bytes
return rBound_ - rBuf_.get() + sizeof(uint32_t);
return static_cast<uint32_t>(rBound_ - rBuf_.get() + sizeof(uint32_t));
}
void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give) {
@ -289,7 +289,7 @@ void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out
rBound_ = wBase_;
// Decide how much to give.
uint32_t give = std::min(len, available_read());
uint32_t give = (std::min)(len, available_read());
*out_start = rBase_;
*out_give = give;

View File

@ -634,7 +634,7 @@ class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
// Move it into ourself.
this->swap(new_buffer);
// Our old self gets destroyed.
}
}
std::string readAsString(uint32_t len) {
std::string str;
@ -646,7 +646,8 @@ class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
// return number of bytes read
uint32_t readEnd() {
uint32_t bytes = rBase_ - buffer_;
//This cast should be safe, because buffer_'s size is a uint32_t
uint32_t bytes = static_cast<uint32_t>(rBase_ - buffer_);
if (rBase_ == wBase_) {
resetBuffer();
}
@ -655,7 +656,8 @@ class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
// Return number of bytes written
uint32_t writeEnd() {
return wBase_ - buffer_;
//This cast should be safe, because buffer_'s size is a uint32_t
return static_cast<uint32_t>(wBase_ - buffer_);
}
uint32_t available_read() const {

View File

@ -66,7 +66,9 @@ uint32_t TFDTransport::read(uint8_t* buf, uint32_t len) {
"TFDTransport::read()",
errno_copy);
}
return rv;
//this should be fine, since we already checked for negative values,
//and ::read should only return a 32-bit value since len is 32-bit.
return static_cast<uint32_t>(rv);
}
}
@ -85,7 +87,9 @@ void TFDTransport::write(const uint8_t* buf, uint32_t len) {
}
buf += rv;
len -= rv;
//this should be fine, as we've already checked for negative values, and
//::write shouldn't return more than a uint32_t since len is a uint32_t
len -= static_cast<uint32_t>(rv);
}
}

View File

@ -114,7 +114,7 @@ TFileTransport::TFileTransport(string path, bool readOnly)
openLogFile();
}
void TFileTransport::resetOutputFile(int fd, string filename, int64_t offset) {
void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) {
filename_ = filename;
offset_ = offset;
@ -827,7 +827,7 @@ void TFileTransport::performRecovery() {
char errorMsg[1024];
sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu",
(offset_ + readState_.lastDispatchPtr_));
GlobalOutput(errorMsg);
throw TTransportException(errorMsg);
}
@ -1079,7 +1079,7 @@ void TFileProcessor::process(uint32_t numEvents, bool tail) {
if ( (numEvents > 0) && (numProcessed == numEvents)) {
return;
}
} catch (TEOFException& teof) {
} catch (TEOFException&) {
if (!tail) {
break;
}
@ -1110,7 +1110,7 @@ void TFileProcessor::processChunk() {
if (curChunk != inputTransport_->getCurChunk()) {
break;
}
} catch (TEOFException& teof) {
} catch (TEOFException&) {
break;
} catch (TException &te) {
cerr << te.what() << endl;

View File

@ -201,7 +201,7 @@ class TFileTransport : public TFileReaderTransport,
uint32_t getCurChunk();
// for changing the output file
void resetOutputFile(int fd, std::string filename, int64_t offset);
void resetOutputFile(int fd, std::string filename, off_t offset);
// Setter/Getter functions for user-controllable options
void setReadBuffSize(uint32_t readBuffSize) {

View File

@ -49,7 +49,7 @@ void THttpClient::parseHeader(char* header) {
if (boost::iends_with(value, "chunked")) {
chunked_ = true;
}
} else if (boost::istarts_with(header, "Content-Length")) {
} else if (boost::istarts_with(header, "Content-Length")) {
chunked_ = false;
contentLength_ = atoi(value);
}
@ -101,8 +101,10 @@ void THttpClient::flush() {
CRLF;
string header = h.str();
if(header.size() > (std::numeric_limits<uint32_t>::max)())
throw TTransportException("Header too big");
// Write the header, then the data, then flush
transport_->write((const uint8_t*)header.c_str(), header.size());
transport_->write((const uint8_t*)header.c_str(), static_cast<uint32_t>(header.size()));
transport_->write(buf, len);
transport_->flush();

View File

@ -39,7 +39,7 @@ void THttpServer::parseHeader(char* header) {
if (colon == NULL) {
return;
}
uint32_t sz = colon - header;
size_t sz = colon - header;
char* value = colon+1;
if (strncmp(header, "Transfer-Encoding", sz) == 0) {
@ -96,7 +96,8 @@ void THttpServer::flush() {
string header = h.str();
// Write the header, then the data, then flush
transport_->write((const uint8_t*)header.c_str(), header.size());
// cast should be fine, because none of "header" is under attacker control
transport_->write((const uint8_t*)header.c_str(), static_cast<uint32_t>(header.size()));
transport_->write(buf, len);
transport_->flush();

View File

@ -171,7 +171,7 @@ char* THttpTransport::readLine() {
// Return pointer to next line
*eol = '\0';
char* line = httpBuf_+httpPos_;
httpPos_ = (eol-httpBuf_) + CRLF_LEN;
httpPos_ = static_cast<uint32_t>((eol-httpBuf_) + CRLF_LEN);
return line;
}
}

View File

@ -28,57 +28,41 @@ using namespace std;
* TPipe implementation.
*/
#ifdef _WIN32
//---- Constructors ----
TPipe::TPipe(int Pipe) :
pipename_(""),
TPipe::TPipe(HANDLE Pipe) :
Pipe_(Pipe),
TimeoutSeconds_(3),
isAnonymous(false)
{
#ifndef _WIN32
GlobalOutput.perror("TPipe: constructor using a pipe handle is not supported under *NIX", -99);
throw TTransportException(TTransportException::NOT_OPEN, " constructor using a pipe handle is not supported under *NIX");
#endif
}
{}
TPipe::TPipe(string pipename) :
pipename_(pipename),
Pipe_(-1),
TPipe::TPipe(const char *pipename) :
Pipe_(INVALID_HANDLE_VALUE),
TimeoutSeconds_(3),
isAnonymous(false)
{
#ifdef _WIN32
if(pipename_.find("\\\\") == -1) {
pipename_ = "\\\\.\\pipe\\" + pipename_;
}
#else
dsocket.reset(new TSocket(pipename));
#endif
setPipename(pipename);
}
TPipe::TPipe(int PipeRd, int PipeWrt) :
pipename_(""),
TPipe::TPipe(const std::string &pipename) :
Pipe_(INVALID_HANDLE_VALUE),
TimeoutSeconds_(3),
isAnonymous(false)
{
setPipename(pipename);
}
TPipe::TPipe(HANDLE PipeRd, HANDLE PipeWrt) :
Pipe_(PipeRd),
PipeWrt_(PipeWrt),
TimeoutSeconds_(3),
isAnonymous(true)
{
#ifndef _WIN32
GlobalOutput.perror("TPipe: Anonymous pipes not yet supported under *NIX", -99);
throw TTransportException(TTransportException::NOT_OPEN, " Anonymous pipes not yet supported under *NIX");
#endif
}
{}
TPipe::TPipe() :
pipename_(""),
Pipe_(-1),
TPipe::TPipe() :
Pipe_(INVALID_HANDLE_VALUE),
TimeoutSeconds_(3)
{
#ifndef _WIN32
GlobalOutput.perror("TPipe: Anonymous pipes not yet supported under *NIX", -99);
throw TTransportException(TTransportException::NOT_OPEN, " Anonymous pipes not yet supported under *NIX");
#endif
}
{}
//---- Destructor ----
TPipe::~TPipe() {
@ -86,15 +70,13 @@ TPipe::~TPipe() {
}
bool TPipe::isOpen() {
return (Pipe_ != -1);
}
//---------------------------------------------------------
// Transport callbacks
//---------------------------------------------------------
#ifdef _WIN32 //Windows callbacks
bool TPipe::isOpen() {
return (Pipe_ != INVALID_HANDLE_VALUE);
}
bool TPipe::peek() {
if (!isOpen()) {
@ -102,7 +84,7 @@ bool TPipe::peek() {
}
DWORD bytesavail = 0;
int PeekRet = 0;
PeekRet = PeekNamedPipe((HANDLE)Pipe_, NULL, 0, NULL, &bytesavail, NULL);
PeekRet = PeekNamedPipe(Pipe_, NULL, 0, NULL, &bytesavail, NULL);
return (PeekRet != 0 && bytesavail > 0);
}
@ -116,44 +98,44 @@ void TPipe::open() {
HANDLE hPipe_;
for(int i=0; i<retries; i++)
{
hPipe_ = CreateFile(
hPipe_ = CreateFile(
pipename_.c_str(),
GENERIC_READ | GENERIC_WRITE,
0, // no sharing
GENERIC_READ | GENERIC_WRITE,
0, // no sharing
NULL, // default security attributes
OPEN_EXISTING, // opens existing pipe
0, // default attributes
NULL); // no template file
OPEN_EXISTING, // opens existing pipe
0, // default attributes
NULL); // no template file
if ((int)hPipe_ == -1)
if (hPipe_ == INVALID_HANDLE_VALUE)
sleep(SleepInterval);
else
break;
}
if ((int)hPipe_ == -1)
if (hPipe_ == INVALID_HANDLE_VALUE)
throw TTransportException(TTransportException::NOT_OPEN, "Unable to open pipe");
// The pipe connected; change to message-read mode.
DWORD dwMode = PIPE_READMODE_MESSAGE;
int fSuccess = SetNamedPipeHandleState(
hPipe_, // pipe handle
&dwMode, // new pipe mode
NULL, // don't set maximum bytes
NULL); // don't set maximum time
// The pipe connected; change to message-read mode.
DWORD dwMode = PIPE_READMODE_MESSAGE;
int fSuccess = SetNamedPipeHandleState(
hPipe_, // pipe handle
&dwMode, // new pipe mode
NULL, // don't set maximum bytes
NULL); // don't set maximum time
if (fSuccess == 0)
{
throw TTransportException(TTransportException::NOT_OPEN, "SetNamedPipeHandleState failed");
close();
}
Pipe_ = (int)hPipe_;
Pipe_ = hPipe_;
}
void TPipe::close() {
if (isOpen())
{
CloseHandle((HANDLE)Pipe_);
Pipe_ = -1;
CloseHandle(Pipe_);
Pipe_ = INVALID_HANDLE_VALUE;
}
}
@ -161,13 +143,13 @@ uint32_t TPipe::read(uint8_t* buf, uint32_t len) {
if (!isOpen())
throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open pipe");
DWORD cbRead;
int fSuccess = ReadFile(
(HANDLE)Pipe_, // pipe handle
buf, // buffer to receive reply
len, // size of buffer
&cbRead, // number of bytes read
NULL); // not overlapped
DWORD cbRead;
int fSuccess = ReadFile(
Pipe_, // pipe handle
buf, // buffer to receive reply
len, // size of buffer
&cbRead, // number of bytes read
NULL); // not overlapped
if ( !fSuccess && GetLastError() != ERROR_MORE_DATA )
return 0; // No more data, possibly because client disconnected.
@ -179,42 +161,19 @@ void TPipe::write(const uint8_t* buf, uint32_t len) {
if (!isOpen())
throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open pipe");
int WritePipe = isAnonymous? PipeWrt_: Pipe_;
DWORD cbWritten;
int fSuccess = WriteFile(
(HANDLE)WritePipe, // pipe handle
buf, // message
len, // message length
&cbWritten, // bytes written
NULL); // not overlapped
HANDLE WritePipe = isAnonymous? PipeWrt_: Pipe_;
DWORD cbWritten;
int fSuccess = WriteFile(
WritePipe, // pipe handle
buf, // message
len, // message length
&cbWritten, // bytes written
NULL); // not overlapped
if ( !fSuccess)
if ( !fSuccess)
throw TTransportException(TTransportException::NOT_OPEN, "Write to pipe failed");
}
#else //*NIX callbacks implemented via Unix Domain Sockets.
bool TPipe::peek() {
return dsocket->peek();
}
void TPipe::open() {
dsocket->open();
}
void TPipe::close() {
dsocket->close();
}
uint32_t TPipe::read(uint8_t* buf, uint32_t len) {
return dsocket->read(buf, len);
}
void TPipe::write(const uint8_t* buf, uint32_t len) {
dsocket->write(buf, len);
}
#endif //callbacks
//---------------------------------------------------------
// Accessors
//---------------------------------------------------------
@ -223,23 +182,26 @@ string TPipe::getPipename() {
return pipename_;
}
void TPipe::setPipename(std::string pipename) {
pipename_ = pipename;
void TPipe::setPipename(const std::string &pipename) {
if(pipename.find("\\\\") == -1)
pipename_ = "\\\\.\\pipe\\" + pipename;
else
pipename_ = pipename;
}
int TPipe::getPipeHandle() {
HANDLE TPipe::getPipeHandle() {
return Pipe_;
}
void TPipe::setPipeHandle(int pipehandle) {
void TPipe::setPipeHandle(HANDLE pipehandle) {
Pipe_ = pipehandle;
}
int TPipe::getWrtPipeHandle() {
HANDLE TPipe::getWrtPipeHandle() {
return PipeWrt_;
}
void TPipe::setWrtPipeHandle(int pipehandle) {
void TPipe::setWrtPipeHandle(HANDLE pipehandle) {
PipeWrt_ = pipehandle;
}
@ -250,5 +212,6 @@ long TPipe::getConnectTimeout() {
void TPipe::setConnectTimeout(long seconds) {
TimeoutSeconds_ = seconds;
}
#endif //_WIN32
}}} // apache::thrift::transport

View File

@ -32,16 +32,19 @@ namespace apache { namespace thrift { namespace transport {
* Windows Pipes implementation of the TTransport interface.
*
*/
#ifdef _WIN32
class TPipe : public TVirtualTransport<TPipe> {
public:
// Constructs a new pipe object.
TPipe();
// Named pipe constructors -
TPipe(int Pipe);
TPipe(std::string pipename);
explicit TPipe(HANDLE Pipe); //HANDLE is a void*
//need a const char * overload so string literals don't go to the HANDLE overload
explicit TPipe(const char *pipename);
explicit TPipe(const std::string &pipename);
// Anonymous pipe -
TPipe(int PipeRd, int PipeWrt);
TPipe(HANDLE PipeRd, HANDLE PipeWrt);
// Destroys the pipe object, closing it if necessary.
virtual ~TPipe();
@ -67,26 +70,25 @@ class TPipe : public TVirtualTransport<TPipe> {
//Accessors
std::string getPipename();
void setPipename(std::string pipename);
int getPipeHandle(); //doubles as the read handle for anon pipe
void setPipeHandle(int pipehandle);
int getWrtPipeHandle();
void setWrtPipeHandle(int pipehandle);
void setPipename(const std::string &pipename);
HANDLE getPipeHandle(); //doubles as the read handle for anon pipe
void setPipeHandle(HANDLE pipehandle);
HANDLE getWrtPipeHandle();
void setWrtPipeHandle(HANDLE pipehandle);
long getConnectTimeout();
void setConnectTimeout(long seconds);
private:
std::string pipename_;
//Named pipe handles are R/W, while anonymous pipes are one or the other (half duplex).
int Pipe_, PipeWrt_;
HANDLE Pipe_, PipeWrt_;
long TimeoutSeconds_;
bool isAnonymous;
#ifndef _WIN32
//*NIX named pipe implementation uses domain socket
boost::shared_ptr<TSocket> dsocket;
#endif
};
#else
typedef TSocket TPipe;
#endif
}}} // apache::thrift::transport

View File

@ -32,30 +32,26 @@
namespace apache { namespace thrift { namespace transport {
#ifdef _WIN32
using namespace std;
using boost::shared_ptr;
//---- Constructors ----
TPipeServer::TPipeServer(string pipename, uint32_t bufsize) :
TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize) :
pipename_(pipename),
bufsize_(bufsize),
Pipe_(-1),
Pipe_(INVALID_HANDLE_VALUE),
maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT),
isAnonymous(false)
{
#ifdef _WIN32
if(pipename_.find("\\\\") == 0) {
pipename_ = "\\\\.\\pipe\\" + pipename_;
}
#else
dsrvsocket.reset(new TServerSocket(pipename));
#endif
setPipename(pipename);
}
TPipeServer::TPipeServer(string pipename, uint32_t bufsize, uint32_t maxconnections) :
TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize, uint32_t maxconnections) :
pipename_(pipename),
bufsize_(bufsize),
Pipe_(-1),
Pipe_(INVALID_HANDLE_VALUE),
isAnonymous(false)
{ //Restrict maxconns_ to 1-255
if(maxconnections == 0)
@ -65,39 +61,26 @@ TPipeServer::TPipeServer(string pipename, uint32_t bufsize, uint32_t maxconnecti
else
maxconns_ = maxconnections;
#ifdef _WIN32
if(pipename_.find("\\\\") == -1) {
pipename_ = "\\\\.\\pipe\\" + pipename_;
}
#else
dsrvsocket.reset(new TServerSocket(pipename));
#endif
setPipename(pipename);
}
TPipeServer::TPipeServer(string pipename) :
TPipeServer::TPipeServer(const std::string &pipename) :
pipename_(pipename),
bufsize_(1024),
Pipe_(-1),
Pipe_(INVALID_HANDLE_VALUE),
maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT),
isAnonymous(false)
{
#ifdef _WIN32
if(pipename_.find("\\\\") == 0) {
pipename_ = "\\\\.\\pipe\\" + pipename_;
}
#else
dsrvsocket.reset(new TServerSocket(pipename));
#endif
setPipename(pipename);
}
TPipeServer::TPipeServer(int bufsize) :
TPipeServer::TPipeServer(int bufsize) :
pipename_(""),
bufsize_(bufsize),
Pipe_(-1),
Pipe_(INVALID_HANDLE_VALUE),
maxconns_(1),
isAnonymous(true)
{
#ifdef _WIN32
//The anonymous pipe needs to be created first so that the server can
//pass the handles on to the client before the serve (acceptImpl)
//blocking call.
@ -105,28 +88,19 @@ TPipeServer::TPipeServer(int bufsize) :
GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError());
throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed");
}
#else
GlobalOutput.perror("TPipeServer: Anonymous pipes not yet supported under *NIX", -99);
throw TTransportException(TTransportException::NOT_OPEN, " Anonymous pipes not yet supported under *NIX");
#endif
}
TPipeServer::TPipeServer() :
TPipeServer::TPipeServer() :
pipename_(""),
bufsize_(1024),
Pipe_(-1),
Pipe_(INVALID_HANDLE_VALUE),
maxconns_(1),
isAnonymous(true)
{
#ifdef _WIN32
if (!TCreateAnonPipe()) {
GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError());
throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed");
}
#else
GlobalOutput.perror("TPipeServer: Anonymous pipes not yet supported under *NIX", -99);
throw TTransportException(TTransportException::NOT_OPEN, " Anonymous pipes not yet supported under *NIX");
#endif
}
//---- Destructor ----
@ -138,8 +112,6 @@ TPipeServer::~TPipeServer() {
// Transport callbacks
//---------------------------------------------------------
#ifdef _WIN32
shared_ptr<TTransport> TPipeServer::acceptImpl() {
shared_ptr<TPipe> client;
@ -148,11 +120,11 @@ shared_ptr<TTransport> TPipeServer::acceptImpl() {
//This 0-byte read serves merely as a blocking call.
byte buf;
DWORD br;
int fSuccess = ReadFile(
(HANDLE)Pipe_, // pipe handle
&buf, // buffer to receive reply
0, // size of buffer
&br, // number of bytes read
int fSuccess = ReadFile(
Pipe_, // pipe handle
&buf, // buffer to receive reply
0, // size of buffer
&br, // number of bytes read
NULL); // not overlapped
if ( !fSuccess && GetLastError() != ERROR_MORE_DATA ) {
@ -172,9 +144,9 @@ shared_ptr<TTransport> TPipeServer::acceptImpl() {
}
// Wait for the client to connect; if it succeeds, the
// function returns a nonzero value. If the function returns
// zero, GetLastError should return ERROR_PIPE_CONNECTED.
ConnectRet = ConnectNamedPipe((HANDLE)Pipe_, NULL) ?
// function returns a nonzero value. If the function returns
// zero, GetLastError should return ERROR_PIPE_CONNECTED.
ConnectRet = ConnectNamedPipe(Pipe_, NULL) ?
TRUE : (GetLastError() == ERROR_PIPE_CONNECTED);
if (ConnectRet == TRUE)
@ -196,27 +168,27 @@ shared_ptr<TTransport> TPipeServer::acceptImpl() {
}
void TPipeServer::interrupt() {
if(Pipe_ != -1) {
CancelIo((HANDLE)Pipe_);
if(Pipe_ != INVALID_HANDLE_VALUE) {
CancelIo(Pipe_);
}
}
void TPipeServer::close() {
if(!isAnonymous)
{
if(Pipe_ != -1) {
DisconnectNamedPipe((HANDLE)Pipe_);
CloseHandle((HANDLE)Pipe_);
Pipe_ = -1;
if(Pipe_ != INVALID_HANDLE_VALUE) {
DisconnectNamedPipe(Pipe_);
CloseHandle(Pipe_);
Pipe_ = INVALID_HANDLE_VALUE;
}
}
else
{
try {
CloseHandle((HANDLE)Pipe_);
CloseHandle((HANDLE)PipeW_);
CloseHandle((HANDLE)ClientAnonRead);
CloseHandle((HANDLE)ClientAnonWrite);
CloseHandle(Pipe_);
CloseHandle(PipeW_);
CloseHandle(ClientAnonRead);
CloseHandle(ClientAnonWrite);
}
catch(...) {
GlobalOutput.perror("TPipeServer anon close GLE=", GetLastError());
@ -255,26 +227,26 @@ bool TPipeServer::TCreateNamedPipe() {
sa.bInheritHandle = FALSE;
// Create an instance of the named pipe
HANDLE hPipe_ = CreateNamedPipe(
pipename_.c_str(), // pipe name
PIPE_ACCESS_DUPLEX, // read/write access
PIPE_TYPE_MESSAGE | // message type pipe
PIPE_READMODE_MESSAGE, // message-read mode
maxconns_, // max. instances
bufsize_, // output buffer size
bufsize_, // input buffer size
0, // client time-out
&sa); // default security attribute
HANDLE hPipe_ = CreateNamedPipe(
pipename_.c_str(), // pipe name
PIPE_ACCESS_DUPLEX, // read/write access
PIPE_TYPE_MESSAGE | // message type pipe
PIPE_READMODE_MESSAGE, // message-read mode
maxconns_, // max. instances
bufsize_, // output buffer size
bufsize_, // input buffer size
0, // client time-out
&sa); // default security attribute
if(hPipe_ == INVALID_HANDLE_VALUE)
{
Pipe_ = -1;
Pipe_ = INVALID_HANDLE_VALUE;
GlobalOutput.perror("TPipeServer::TCreateNamedPipe() GLE=", GetLastError());
throw TTransportException(TTransportException::NOT_OPEN, "TCreateNamedPipe() failed", GetLastError());
return false;
}
Pipe_ = (int)hPipe_;
Pipe_ = hPipe_;
return true;
}
@ -302,43 +274,14 @@ bool TPipeServer::TCreateAnonPipe() {
CloseHandle(PipeW_H);
return false;
}
ClientAnonRead = (int)ClientAnonReadH;
ClientAnonWrite = (int)ClientAnonWriteH;
Pipe_ = (int)Pipe_H;
PipeW_ = (int)PipeW_H;
ClientAnonRead = ClientAnonReadH;
ClientAnonWrite = ClientAnonWriteH;
Pipe_ = Pipe_H;
PipeW_ = PipeW_H;
return true;
}
#else
//*NIX implementation uses Unix Domain Sockets.
void TPipeServer::listen() {
dsrvsocket->listen();
}
shared_ptr<TTransport> TPipeServer::acceptImpl() {
// return boost::shared_dynamic_cast<apache::thrift::transport::TServerSocket>(dsrvsocket)->accept();
return dsrvsocket->accept();
}
void TPipeServer::interrupt() {
dsrvsocket->interrupt();
}
void TPipeServer::close() {
dsrvsocket->close();
}
bool TPipeServer::TCreateNamedPipe() {
return false; //placeholder
}
bool TPipeServer::TCreateAnonPipe() {
return false; //currently unimplemented
}
#endif //_WIN32
//---------------------------------------------------------
// Accessors
//---------------------------------------------------------
@ -347,8 +290,11 @@ string TPipeServer::getPipename() {
return pipename_;
}
void TPipeServer::setPipename(std::string pipename) {
pipename_ = pipename;
void TPipeServer::setPipename(const std::string &pipename) {
if(pipename.find("\\\\") == -1)
pipename_ = "\\\\.\\pipe\\" + pipename;
else
pipename_ = pipename;
}
int TPipeServer::getBufferSize() {
@ -359,21 +305,21 @@ void TPipeServer::setBufferSize(int bufsize) {
bufsize_ = bufsize;
}
int TPipeServer::getPipeHandle() {
HANDLE TPipeServer::getPipeHandle() {
return Pipe_;
}
int TPipeServer::getWrtPipeHandle()
HANDLE TPipeServer::getWrtPipeHandle()
{
return PipeW_;
}
int TPipeServer::getClientRdPipeHandle()
HANDLE TPipeServer::getClientRdPipeHandle()
{
return ClientAnonRead;
}
int TPipeServer::getClientWrtPipeHandle()
HANDLE TPipeServer::getClientWrtPipeHandle()
{
return ClientAnonWrite;
}
@ -385,5 +331,6 @@ bool TPipeServer::getAnonymous() {
void TPipeServer::setAnonymous(bool anon) {
isAnonymous = anon;
}
#endif //_WIN32
}}} // apache::thrift::transport

View File

@ -33,13 +33,14 @@ namespace apache { namespace thrift { namespace transport {
/**
* Windows Pipes implementation of TServerTransport.
*/
#ifdef _WIN32
class TPipeServer : public TServerTransport {
public:
//Constructors
// Named Pipe -
TPipeServer(std::string pipename, uint32_t bufsize);
TPipeServer(std::string pipename, uint32_t bufsize, uint32_t maxconnections);
TPipeServer(std::string pipename);
TPipeServer(const std::string &pipename, uint32_t bufsize);
TPipeServer(const std::string &pipename, uint32_t bufsize, uint32_t maxconnections);
TPipeServer(const std::string &pipename);
// Anonymous pipe -
TPipeServer(int bufsize);
TPipeServer();
@ -59,33 +60,30 @@ class TPipeServer : public TServerTransport {
public:
//Accessors
std::string getPipename();
void setPipename(std::string pipename);
void setPipename(const std::string &pipename);
int getBufferSize();
void setBufferSize(int bufsize);
int getPipeHandle(); //Named Pipe R/W -or- Anonymous pipe Read handle
int getWrtPipeHandle();
int getClientRdPipeHandle();
int getClientWrtPipeHandle();
HANDLE getPipeHandle(); //Named Pipe R/W -or- Anonymous pipe Read handle
HANDLE getWrtPipeHandle();
HANDLE getClientRdPipeHandle();
HANDLE getClientWrtPipeHandle();
bool getAnonymous();
void setAnonymous(bool anon);
private:
std::string pipename_;
uint32_t bufsize_;
int Pipe_; //Named Pipe (R/W) or Anonymous Pipe (R)
HANDLE Pipe_; //Named Pipe (R/W) or Anonymous Pipe (R)
uint32_t maxconns_;
int PipeW_; //Anonymous Pipe (W)
int ClientAnonRead, ClientAnonWrite; //Client side anonymous pipe handles
HANDLE PipeW_; //Anonymous Pipe (W)
HANDLE ClientAnonRead, ClientAnonWrite; //Client side anonymous pipe handles
//? Do we need duplicates to send to client?
bool isAnonymous;
public:
#ifndef _WIN32
//*NIX named pipe implementation uses domain socket
void listen(); //Only needed for domain sockets
boost::shared_ptr<TServerSocket> dsrvsocket;
#endif
};
#else //_WIN32
//*NIX named pipe implementation uses domain socket
typedef TServerSocket TPipeServer;
#endif
}}} // apache::thrift::transport

View File

@ -151,7 +151,7 @@ void TServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) {
}
void TServerSocket::listen() {
int sv[2];
SOCKET sv[2];
if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
GlobalOutput.perror("TServerSocket::listen() socketpair() ", errno);
intSock1_ = -1;
@ -243,7 +243,7 @@ void TServerSocket::listen() {
#ifdef IPV6_V6ONLY
if (res->ai_family == AF_INET6 && path_.empty()) {
int zero = 0;
if (-1 == setsockopt(serverSocket_, IPPROTO_IPV6, IPV6_V6ONLY,
if (-1 == setsockopt(serverSocket_, IPPROTO_IPV6, IPV6_V6ONLY,
cast_sockopt(&zero), sizeof(zero))) {
GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", errno);
}
@ -321,7 +321,7 @@ void TServerSocket::listen() {
#endif
} else {
do {
if (0 == ::bind(serverSocket_, res->ai_addr, res->ai_addrlen)) {
if (0 == ::bind(serverSocket_, res->ai_addr, static_cast<int>(res->ai_addrlen))) {
break;
}
// use short circuit evaluation here to only sleep if we need to
@ -358,7 +358,7 @@ void TServerSocket::listen() {
}
shared_ptr<TTransport> TServerSocket::acceptImpl() {
if (serverSocket_ < 0) {
if (serverSocket_ == -1) {
throw TTransportException(TTransportException::NOT_OPEN, "TServerSocket not listening");
}
@ -371,7 +371,7 @@ shared_ptr<TTransport> TServerSocket::acceptImpl() {
std::memset(fds, 0 , sizeof(fds));
fds[0].fd = serverSocket_;
fds[0].events = POLLIN;
if (intSock2_ >= 0) {
if (intSock2_ != -1) {
fds[1].fd = intSock2_;
fds[1].events = POLLIN;
}
@ -393,7 +393,7 @@ shared_ptr<TTransport> TServerSocket::acceptImpl() {
throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
} else if (ret > 0) {
// Check for an interrupt signal
if (intSock2_ >= 0 && (fds[1].revents & POLLIN)) {
if (intSock2_ != -1 && (fds[1].revents & POLLIN)) {
int8_t buf;
if (-1 == recv(intSock2_, cast_sockopt(&buf), sizeof(int8_t), 0)) {
GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ", errno);
@ -413,11 +413,11 @@ shared_ptr<TTransport> TServerSocket::acceptImpl() {
struct sockaddr_storage clientAddress;
int size = sizeof(clientAddress);
int clientSocket = ::accept(serverSocket_,
SOCKET clientSocket = ::accept(serverSocket_,
(struct sockaddr *) &clientAddress,
(socklen_t *) &size);
if (clientSocket < 0) {
if (clientSocket == -1) {
int errno_copy = errno;
GlobalOutput.perror("TServerSocket::acceptImpl() ::accept() ", errno_copy);
throw TTransportException(TTransportException::UNKNOWN, "accept()", errno_copy);
@ -445,16 +445,16 @@ shared_ptr<TTransport> TServerSocket::acceptImpl() {
client->setRecvTimeout(recvTimeout_);
}
client->setCachedAddress((sockaddr*) &clientAddress, size);
return client;
}
shared_ptr<TSocket> TServerSocket::createSocket(int clientSocket) {
shared_ptr<TSocket> TServerSocket::createSocket(SOCKET clientSocket) {
return shared_ptr<TSocket>(new TSocket(clientSocket));
}
void TServerSocket::interrupt() {
if (intSock1_ >= 0) {
if (intSock1_ != -1) {
int8_t byte = 0;
if (-1 == send(intSock1_, cast_sockopt(&byte), sizeof(int8_t), 0)) {
GlobalOutput.perror("TServerSocket::interrupt() send() ", errno);
@ -463,7 +463,7 @@ void TServerSocket::interrupt() {
}
void TServerSocket::close() {
if (serverSocket_ >= 0) {
if (serverSocket_ != -1) {
#ifdef _WIN32
shutdown(serverSocket_, SD_BOTH);
@ -474,10 +474,10 @@ void TServerSocket::close() {
#endif
}
if (intSock1_ >= 0) {
if (intSock1_ != -1) {
::close(intSock1_);
}
if (intSock2_ >= 0) {
if (intSock2_ != -1) {
::close(intSock2_);
}
serverSocket_ = -1;

View File

@ -22,6 +22,9 @@
#include "TServerTransport.h"
#include <boost/shared_ptr.hpp>
#ifdef __linux__
typedef int SOCKET;
#endif
namespace apache { namespace thrift { namespace transport {
@ -58,12 +61,12 @@ class TServerSocket : public TServerTransport {
protected:
boost::shared_ptr<TTransport> acceptImpl();
virtual boost::shared_ptr<TSocket> createSocket(int client);
virtual boost::shared_ptr<TSocket> createSocket(SOCKET client);
private:
int port_;
std::string path_;
int serverSocket_;
SOCKET serverSocket_;
int acceptBacklog_;
int sendTimeout_;
int recvTimeout_;
@ -73,8 +76,8 @@ class TServerSocket : public TServerTransport {
int tcpSendBuffer_;
int tcpRecvBuffer_;
int intSock1_;
int intSock2_;
SOCKET intSock1_;
SOCKET intSock2_;
};
}}} // apache::thrift::transport

View File

@ -129,7 +129,7 @@ TSocket::TSocket() :
cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
}
TSocket::TSocket(int socket) :
TSocket::TSocket(SOCKET socket) :
host_(""),
port_(0),
path_(""),
@ -151,7 +151,7 @@ TSocket::~TSocket() {
}
bool TSocket::isOpen() {
return (socket_ >= 0);
return (socket_ != -1);
}
bool TSocket::peek() {
@ -164,7 +164,7 @@ bool TSocket::peek() {
int errno_copy = errno;
#if defined __FreeBSD__ || defined __MACH__
/* shigin:
* freebsd returns -1 and ECONNRESET if socket was closed by
* freebsd returns -1 and ECONNRESET if socket was closed by
* the other side
*/
if (errno_copy == ECONNRESET)
@ -264,7 +264,7 @@ void TSocket::openConnection(struct addrinfo *res) {
#endif
} else {
ret = connect(socket_, res->ai_addr, res->ai_addrlen);
ret = connect(socket_, res->ai_addr, static_cast<int>(res->ai_addrlen));
}
// success case
@ -319,7 +319,7 @@ void TSocket::openConnection(struct addrinfo *res) {
fcntl(socket_, F_SETFL, flags);
if (path_.empty()) {
setCachedAddress(res->ai_addr, res->ai_addrlen);
setCachedAddress(res->ai_addr, static_cast<socklen_t>(res->ai_addrlen));
}
}
@ -382,7 +382,7 @@ void TSocket::local_open(){
try {
openConnection(res);
break;
} catch (TTransportException& ttx) {
} catch (TTransportException&) {
if (res->ai_next) {
close();
} else {
@ -398,7 +398,7 @@ void TSocket::local_open(){
}
void TSocket::close() {
if (socket_ >= 0) {
if (socket_ != -1) {
#ifdef _WIN32
shutdown(socket_, SD_BOTH);
@ -413,14 +413,14 @@ void TSocket::close() {
}
void TSocket::setSocketFD(int socket) {
if (socket_ >= 0) {
if (socket_ != -1) {
close();
}
socket_ = socket;
}
uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
if (socket_ < 0) {
if (socket_ == -1) {
throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket");
}
@ -489,7 +489,7 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
#if defined __FreeBSD__ || defined __MACH__
if (errno_copy == ECONNRESET) {
/* shigin: freebsd doesn't follow POSIX semantic of recv and fails with
* ECONNRESET if peer performed shutdown
* ECONNRESET if peer performed shutdown
* edhall: eliminated close() since we do that in the destructor.
*/
return 0;
@ -551,7 +551,7 @@ void TSocket::write(const uint8_t* buf, uint32_t len) {
}
uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) {
if (socket_ < 0) {
if (socket_ == -1) {
throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open socket");
}
@ -582,7 +582,7 @@ uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) {
throw TTransportException(TTransportException::UNKNOWN, "write() send()", errno_copy);
}
// Fail on blocked send
if (b == 0) {
throw TTransportException(TTransportException::NOT_OPEN, "Socket send returned 0.");
@ -609,7 +609,7 @@ void TSocket::setPort(int port) {
void TSocket::setLinger(bool on, int linger) {
lingerOn_ = on;
lingerVal_ = linger;
if (socket_ < 0) {
if (socket_ == -1) {
return;
}
@ -623,7 +623,7 @@ void TSocket::setLinger(bool on, int linger) {
void TSocket::setNoDelay(bool noDelay) {
noDelay_ = noDelay;
if (socket_ < 0 || !path_.empty()) {
if (socket_ == -1 || !path_.empty()) {
return;
}
@ -649,7 +649,7 @@ void TSocket::setRecvTimeout(int ms) {
}
recvTimeout_ = ms;
if (socket_ < 0) {
if (socket_ == -1) {
return;
}
@ -674,7 +674,7 @@ void TSocket::setSendTimeout(int ms) {
}
sendTimeout_ = ms;
if (socket_ < 0) {
if (socket_ == -1) {
return;
}
@ -708,7 +708,7 @@ std::string TSocket::getPeerHost() {
struct sockaddr* addrPtr;
socklen_t addrLen;
if (socket_ < 0) {
if (socket_ == -1) {
return host_;
}
@ -742,7 +742,7 @@ std::string TSocket::getPeerAddress() {
struct sockaddr* addrPtr;
socklen_t addrLen;
if (socket_ < 0) {
if (socket_ == -1) {
return peerAddress_;
}
@ -810,7 +810,7 @@ sockaddr* TSocket::getCachedAddress(socklen_t* len) const {
default:
return NULL;
}
}
}
bool TSocket::useLowMinRto_ = false;
void TSocket::setUseLowMinRto(bool useLowMinRto) {

View File

@ -32,6 +32,9 @@
#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif
#ifdef __linux__
typedef int SOCKET;
#endif
namespace apache { namespace thrift { namespace transport {
@ -197,7 +200,7 @@ class TSocket : public TVirtualTransport<TSocket> {
/**
* Returns the underlying socket file descriptor.
*/
int getSocketFD() {
SOCKET getSocketFD() {
return socket_;
}
@ -228,7 +231,7 @@ class TSocket : public TVirtualTransport<TSocket> {
/**
* Constructor to create socket from raw UNIX handle.
*/
TSocket(int socket);
TSocket(SOCKET socket);
/**
* Set a cache of the peer address (used when trivially available: e.g.
@ -259,7 +262,7 @@ class TSocket : public TVirtualTransport<TSocket> {
std::string path_;
/** Underlying UNIX socket handle */
int socket_;
SOCKET socket_;
/** Connect timeout in ms */
int connTimeout_;

View File

@ -174,7 +174,7 @@ void TSocketPool::setCurrentServer(const shared_ptr<TSocketPoolServer> &server)
/* TODO: without apc we ignore a lot of functionality from the php version */
void TSocketPool::open() {
unsigned int numServers = servers_.size();
size_t numServers = servers_.size();
if (numServers == 0) {
socket_ = -1;
throw TTransportException(TTransportException::NOT_OPEN);
@ -188,7 +188,7 @@ void TSocketPool::open() {
random_shuffle(servers_.begin(), servers_.end());
}
for (unsigned int i = 0; i < numServers; ++i) {
for (size_t i = 0; i < numServers; ++i) {
shared_ptr<TSocketPoolServer> &server = servers_[i];
// Impersonate the server socket
@ -204,7 +204,7 @@ void TSocketPool::open() {
if (server->lastFailTime_ > 0) {
// The server was marked as down, so check if enough time has elapsed to retry
int elapsedTime = time(NULL) - server->lastFailTime_;
time_t elapsedTime = time(NULL) - server->lastFailTime_;
if (elapsedTime > retryInterval_) {
retryIntervalPassed = true;
}

View File

@ -49,10 +49,10 @@ class TSocketPoolServer {
int port_;
// Socket for the server
int socket_;
SOCKET socket_;
// Last time connecting to this server failed
int lastFailTime_;
time_t lastFailTime_;
// Number of consecutive times connecting to this server failed
int consecutiveFailures_;
@ -178,7 +178,7 @@ class TSocketPool : public TSocket {
/** Retry interval in seconds, how long to not try a host if it has been
* marked as down.
*/
int retryInterval_;
time_t retryInterval_;
/** Max consecutive failures before marking a host down. */
int maxConsecutiveFailures_;

View File

@ -1,11 +1,11 @@
/* socketpair.c
* Copyright 2007 by Nathan C. Myers <ncm@cantrip.org>; some rights reserved.
* This code is Free Software. It may be copied freely, in original or
* This code is Free Software. It may be copied freely, in original or
* modified form, subject only to the restrictions that (1) the author is
* relieved from all responsibilities for any use for any purpose, and (2)
* this copyright notice must be retained, unchanged, in its entirety. If
* for any reason the author might be held responsible for any consequences
* of copying or use, license is withheld.
* of copying or use, license is withheld.
*/
/*
@ -33,10 +33,9 @@
#include <string.h>
// Win32
#include <Winsock2.h>
#include <WS2tcpip.h>
int socketpair(int d, int type, int protocol, int sv[2])
int socketpair(int d, int type, int protocol, SOCKET sv[2])
{
union {
struct sockaddr_in inaddr;
@ -54,17 +53,17 @@ int socketpair(int d, int type, int protocol, int sv[2])
}
listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (listener == INVALID_SOCKET)
if (listener == INVALID_SOCKET)
return SOCKET_ERROR;
memset(&a, 0, sizeof(a));
a.inaddr.sin_family = AF_INET;
a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
a.inaddr.sin_port = 0;
a.inaddr.sin_port = 0;
sv[0] = sv[1] = INVALID_SOCKET;
do {
if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR,
if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR,
(char*) &reuse, (socklen_t) sizeof(reuse)) == -1)
break;
if (bind(listener, &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR)

View File

@ -28,6 +28,9 @@
#error This is a MSVC header only.
#endif
int socketpair(int d, int type, int protocol, int sv[2]);
// Win32
#include <Winsock2.h>
int socketpair(int d, int type, int protocol, SOCKET sv[2]);
#endif // _THRIFT_WINDOWS_SOCKETPAIR_H_

View File

@ -32,7 +32,10 @@
#include "Config.h"
// Exclude rarely-used stuff from Windows headers
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <Windows.h>
#endif // _THRIFT_WINDOWS_STDAFX_H_

View File

@ -17,12 +17,9 @@
* under the License.
*/
#include "Fcntl.h"
#include "WinFcntl.h"
// Win32
#include <Winsock2.h>
int fcntl(int fd, int cmd, int flags)
int fcntl(SOCKET fd, int cmd, int flags)
{
if(cmd != F_GETFL && cmd != F_SETFL)
{

View File

@ -28,6 +28,9 @@
#error This is a MSVC header only.
#endif
// Win32
#include <Winsock2.h>
#define O_NONBLOCK 1
enum
@ -36,6 +39,6 @@ enum
F_SETFL,
};
int fcntl(int fd, int cmd, int flags);
int fcntl(SOCKET fd, int cmd, int flags);
#endif // _THRIFT_WINDOWS_FCNTL_H_

View File

@ -126,13 +126,12 @@ inline int poll_win32(LPWSAPOLLFD fdArray, ULONG nfds, INT timeout)
int sktready = select(1, read_fds_ptr, write_fds_ptr, NULL, time_out_ptr);
if(sktready > 0) {
for(ULONG i=0; i<read_fds.fd_count; i++) {
for(ULONG i=0; i<nfds; i++) {
fdArray[i].revents = 0;
if(FD_ISSET(fdArray[i].fd, &read_fds))
fdArray[i].revents = POLLIN;
}
for(ULONG i=0; i<write_fds.fd_count; i++) {
fdArray[i].revents |= POLLIN;
if(FD_ISSET(fdArray[i].fd, &write_fds))
fdArray[i].revents = POLLOUT;
fdArray[i].revents |= POLLOUT;
}
}
return sktready;

View File

@ -28,14 +28,22 @@
# error This is a MSVC header only.
#endif
#define NOMINMAX
#define BOOST_ALL_NO_LIB 1
#define BOOST_THREAD_NO_LIB 1
#ifndef NOMINMAX
# define NOMINMAX
#endif
#ifndef USE_BOOST_THREAD
# define BOOST_ALL_NO_LIB 1
# define BOOST_THREAD_NO_LIB 1
#endif
#define TARGET_WIN_XP
#ifdef TARGET_WIN_XP
# define WINVER 0x0501
# define _WIN32_WINNT 0x0501
# ifndef WINVER
# define WINVER 0x0501
# endif
# ifndef _WIN32_WINNT
# define _WIN32_WINNT 0x0501
# endif
#endif
#ifndef _WIN32_WINNT