From 4af6ed71e16884fd2e7fbec2de241209e0dec468 Mon Sep 17 00:00:00 2001 From: Mark Slee Date: Wed, 25 Oct 2006 19:02:49 +0000 Subject: [PATCH] Change Thrift c++ to new protocol wrapping transport model Summary: Also cleaned up excessive .h/.cpp files into Utils files Reviewed By: aditya git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664838 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/Makefile.am | 15 +- lib/cpp/src/TProcessor.h | 13 +- lib/cpp/src/protocol/TBinaryProtocol.cpp | 206 +++++------- lib/cpp/src/protocol/TBinaryProtocol.h | 151 ++++----- lib/cpp/src/protocol/TProtocol.h | 199 +++++------ lib/cpp/src/server/TNonblockingServer.cpp | 2 +- lib/cpp/src/server/TNonblockingServer.h | 24 +- lib/cpp/src/server/TServer.h | 52 +-- lib/cpp/src/server/TSimpleServer.cpp | 12 +- lib/cpp/src/server/TSimpleServer.h | 4 +- lib/cpp/src/server/TThreadPoolServer.cpp | 28 +- lib/cpp/src/server/TThreadPoolServer.h | 4 +- .../src/transport/TBufferedRouterTransport.h | 24 ++ .../TBufferedRouterTransportFactory.h | 35 -- lib/cpp/src/transport/TBufferedTransport.cpp | 65 ---- lib/cpp/src/transport/TBufferedTransport.h | 83 ----- .../src/transport/TBufferedTransportFactory.h | 33 -- lib/cpp/src/transport/TFramedTransport.cpp | 121 ------- lib/cpp/src/transport/TFramedTransport.h | 101 ------ lib/cpp/src/transport/TMemoryBuffer.cpp | 45 --- lib/cpp/src/transport/TMemoryBuffer.h | 116 ------- lib/cpp/src/transport/TNullTransport.h | 28 -- lib/cpp/src/transport/TTransport.h | 25 +- lib/cpp/src/transport/TTransportFactory.h | 33 -- lib/cpp/src/transport/TTransportUtils.cpp | 219 ++++++++++++ lib/cpp/src/transport/TTransportUtils.h | 316 ++++++++++++++++++ 26 files changed, 936 insertions(+), 1018 deletions(-) delete mode 100644 lib/cpp/src/transport/TBufferedRouterTransportFactory.h delete mode 100644 lib/cpp/src/transport/TBufferedTransport.cpp delete mode 100644 lib/cpp/src/transport/TBufferedTransport.h delete mode 100644 lib/cpp/src/transport/TBufferedTransportFactory.h delete mode 100644 lib/cpp/src/transport/TFramedTransport.cpp delete mode 100644 lib/cpp/src/transport/TFramedTransport.h delete mode 100644 lib/cpp/src/transport/TMemoryBuffer.cpp delete mode 100644 lib/cpp/src/transport/TMemoryBuffer.h delete mode 100644 lib/cpp/src/transport/TNullTransport.h delete mode 100644 lib/cpp/src/transport/TTransportFactory.h create mode 100644 lib/cpp/src/transport/TTransportUtils.cpp create mode 100644 lib/cpp/src/transport/TTransportUtils.h diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index 134325c20..084404894 100644 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -10,13 +10,11 @@ libthrift_sources = src/concurrency/Monitor.cpp \ src/concurrency/ThreadManager.cpp \ src/concurrency/TimerManager.cpp \ src/protocol/TBinaryProtocol.cpp \ - src/transport/TBufferedTransport.cpp \ src/transport/TBufferedFileWriter.cpp \ src/transport/TBufferedRouterTransport.cpp \ - src/transport/TFramedTransport.cpp \ - src/transport/TMemoryBuffer.cpp \ src/transport/TSocket.cpp \ src/transport/TServerSocket.cpp \ + src/transport/TTransportUtils.cpp \ src/server/TSimpleServer.cpp \ src/server/TThreadPoolServer.cpp \ src/server/TNonblockingServer.cpp @@ -52,19 +50,14 @@ include_protocol_HEADERS = \ include_transportdir = $(include_thriftdir)/transport include_transport_HEADERS = \ - src/transport/TBufferedTransport.h \ - src/transport/TFramedTransport.h \ - src/transport/TNullTransport.h \ - src/transport/TMemoryBuffer.h \ + src/transport/TBufferedFileWriter.h \ + src/transport/TBufferedRouterTransport.h \ src/transport/TServerSocket.h \ src/transport/TServerTransport.h \ src/transport/TSocket.h \ src/transport/TTransport.h \ src/transport/TTransportException.h \ - src/transport/TTransportFactory.h \ - src/transport/TBufferedTransportFactory.h \ - src/transport/TBufferedFileWriter.h \ - src/transport/TBufferedRouterTransport.h + src/transport/TTransportUtils.h include_serverdir = $(include_thriftdir)/server include_server_HEADERS = \ diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h index f905b1d7a..8ce92853b 100644 --- a/lib/cpp/src/TProcessor.h +++ b/lib/cpp/src/TProcessor.h @@ -2,14 +2,14 @@ #define _THRIFT_TPROCESSOR_H_ 1 #include -#include +#include #include namespace facebook { namespace thrift { using namespace boost; -using namespace facebook::thrift::transport; +using namespace facebook::thrift::protocol; /** * A processor is a generic object that acts upon two streams of data, one @@ -22,8 +22,13 @@ using namespace facebook::thrift::transport; class TProcessor { public: virtual ~TProcessor() {} - virtual bool process(shared_ptr in, shared_ptr out) = 0; - bool process(shared_ptr io) { return process(io, io); } + + virtual bool process(shared_ptr in, + shared_ptr out) = 0; + + bool process(shared_ptr io) { + return process(io, io); + } protected: TProcessor() {} diff --git a/lib/cpp/src/protocol/TBinaryProtocol.cpp b/lib/cpp/src/protocol/TBinaryProtocol.cpp index 9fb2ec367..481012c8e 100644 --- a/lib/cpp/src/protocol/TBinaryProtocol.cpp +++ b/lib/cpp/src/protocol/TBinaryProtocol.cpp @@ -4,121 +4,109 @@ using std::string; namespace facebook { namespace thrift { namespace protocol { -uint32_t TBinaryProtocol::writeMessageBegin(shared_ptr out, - const std::string name, +uint32_t TBinaryProtocol::writeMessageBegin(const std::string name, const TMessageType messageType, - const int32_t seqid) const { + const int32_t seqid) { return - writeString(out, name) + - writeByte(out, (int8_t)messageType) + - writeI32(out, seqid); + writeString(name) + + writeByte((int8_t)messageType) + + writeI32(seqid); } -uint32_t TBinaryProtocol::writeMessageEnd(shared_ptr out) const { +uint32_t TBinaryProtocol::writeMessageEnd() { return 0; } -uint32_t TBinaryProtocol::writeStructBegin(shared_ptr out, - const string& name) const { +uint32_t TBinaryProtocol::writeStructBegin(const string& name) { return 0; } -uint32_t TBinaryProtocol::writeStructEnd(shared_ptr out) const { +uint32_t TBinaryProtocol::writeStructEnd() { return 0; } -uint32_t TBinaryProtocol::writeFieldBegin(shared_ptr out, - const string& name, +uint32_t TBinaryProtocol::writeFieldBegin(const string& name, const TType fieldType, - const int16_t fieldId) const { + const int16_t fieldId) { return - writeByte(out, (int8_t)fieldType) + - writeI16(out, fieldId); + writeByte((int8_t)fieldType) + + writeI16(fieldId); } -uint32_t TBinaryProtocol::writeFieldEnd(shared_ptr out) const { +uint32_t TBinaryProtocol::writeFieldEnd() { return 0; } -uint32_t TBinaryProtocol::writeFieldStop(shared_ptr out) const { +uint32_t TBinaryProtocol::writeFieldStop() { return - writeByte(out, (int8_t)T_STOP); + writeByte((int8_t)T_STOP); } -uint32_t TBinaryProtocol::writeMapBegin(shared_ptr out, - const TType keyType, +uint32_t TBinaryProtocol::writeMapBegin(const TType keyType, const TType valType, - const uint32_t size) const { + const uint32_t size) { return - writeByte(out, (int8_t)keyType) + - writeByte(out, (int8_t)valType) + - writeI32(out, (int32_t)size); + writeByte((int8_t)keyType) + + writeByte((int8_t)valType) + + writeI32((int32_t)size); } -uint32_t TBinaryProtocol::writeMapEnd(shared_ptr out) const { +uint32_t TBinaryProtocol::writeMapEnd() { return 0; } -uint32_t TBinaryProtocol::writeListBegin(shared_ptr out, - const TType elemType, - const uint32_t size) const { +uint32_t TBinaryProtocol::writeListBegin(const TType elemType, + const uint32_t size) { return - writeByte(out, (int8_t) elemType) + - writeI32(out, (int32_t)size); + writeByte((int8_t) elemType) + + writeI32((int32_t)size); } -uint32_t TBinaryProtocol::writeListEnd(shared_ptr out) const { +uint32_t TBinaryProtocol::writeListEnd() { return 0; } -uint32_t TBinaryProtocol::writeSetBegin(shared_ptr out, - const TType elemType, - const uint32_t size) const { +uint32_t TBinaryProtocol::writeSetBegin(const TType elemType, + const uint32_t size) { return - writeByte(out, (int8_t)elemType) + - writeI32(out, (int32_t)size); + writeByte((int8_t)elemType) + + writeI32((int32_t)size); } -uint32_t TBinaryProtocol::writeSetEnd(shared_ptr out) const { +uint32_t TBinaryProtocol::writeSetEnd() { return 0; } -uint32_t TBinaryProtocol::writeBool(shared_ptr out, - const bool value) const { +uint32_t TBinaryProtocol::writeBool(const bool value) { uint8_t tmp = value ? 1 : 0; - out->write(&tmp, 1); + outputTransport_->write(&tmp, 1); return 1; } -uint32_t TBinaryProtocol::writeByte(shared_ptr out, - const int8_t byte) const { - out->write((uint8_t*)&byte, 1); +uint32_t TBinaryProtocol::writeByte(const int8_t byte) { + outputTransport_->write((uint8_t*)&byte, 1); return 1; } -uint32_t TBinaryProtocol::writeI16(shared_ptr out, - const int16_t i16) const { +uint32_t TBinaryProtocol::writeI16(const int16_t i16) { int16_t net = (int16_t)htons(i16); - out->write((uint8_t*)&net, 2); + outputTransport_->write((uint8_t*)&net, 2); return 2; } -uint32_t TBinaryProtocol::writeI32(shared_ptr out, - const int32_t i32) const { +uint32_t TBinaryProtocol::writeI32(const int32_t i32) { int32_t net = (int32_t)htonl(i32); - out->write((uint8_t*)&net, 4); + outputTransport_->write((uint8_t*)&net, 4); return 4; } -uint32_t TBinaryProtocol::writeI64(shared_ptr out, - const int64_t i64) const { +uint32_t TBinaryProtocol::writeI64(const int64_t i64) { int64_t net = (int64_t)htonll(i64); - out->write((uint8_t*)&net, 8); + outputTransport_->write((uint8_t*)&net, 8); return 8; } -uint32_t TBinaryProtocol::writeDouble(shared_ptr out, - const double dub) const { +uint32_t TBinaryProtocol::writeDouble(const double dub) { uint8_t b[8]; uint8_t* d = (uint8_t*)&dub; b[0] = d[7]; @@ -129,15 +117,14 @@ uint32_t TBinaryProtocol::writeDouble(shared_ptr out, b[5] = d[2]; b[6] = d[1]; b[7] = d[0]; - out->write((uint8_t*)b, 8); + outputTransport_->write((uint8_t*)b, 8); return 8; } -uint32_t TBinaryProtocol::writeString(shared_ptr out, - const string& str) const { - uint32_t result = writeI32(out, str.size()); - out->write((uint8_t*)str.data(), str.size()); +uint32_t TBinaryProtocol::writeString(const string& str) { + uint32_t result = writeI32(str.size()); + outputTransport_->write((uint8_t*)str.data(), str.size()); return result + str.size(); } @@ -145,159 +132,147 @@ uint32_t TBinaryProtocol::writeString(shared_ptr out, * Reading functions */ -uint32_t TBinaryProtocol::readMessageBegin(shared_ptr in, - std::string& name, +uint32_t TBinaryProtocol::readMessageBegin(std::string& name, TMessageType& messageType, - int32_t& seqid) const { + int32_t& seqid) { uint32_t result = 0; int8_t type; - result+= readString(in, name); - result+= readByte(in, type); + result+= readString(name); + result+= readByte(type); messageType = (TMessageType)type; - result+= readI32(in, seqid); + result+= readI32(seqid); return result; } -uint32_t TBinaryProtocol::readMessageEnd(shared_ptr in) const{ +uint32_t TBinaryProtocol::readMessageEnd() { return 0; } -uint32_t TBinaryProtocol::readStructBegin(shared_ptr in, - string& name) const { +uint32_t TBinaryProtocol::readStructBegin(string& name) { name = ""; return 0; } -uint32_t TBinaryProtocol::readStructEnd(shared_ptr in) const { +uint32_t TBinaryProtocol::readStructEnd() { return 0; } -uint32_t TBinaryProtocol::readFieldBegin(shared_ptr in, - string& name, +uint32_t TBinaryProtocol::readFieldBegin(string& name, TType& fieldType, - int16_t& fieldId) const { + int16_t& fieldId) { uint32_t result = 0; int8_t type; - result += readByte(in, type); + result += readByte(type); fieldType = (TType)type; if (fieldType == T_STOP) { fieldId = 0; return result; } - result += readI16(in, fieldId); + result += readI16(fieldId); return result; } -uint32_t TBinaryProtocol::readFieldEnd(shared_ptr in) const { +uint32_t TBinaryProtocol::readFieldEnd() { return 0; } -uint32_t TBinaryProtocol::readMapBegin(shared_ptr in, - TType& keyType, +uint32_t TBinaryProtocol::readMapBegin(TType& keyType, TType& valType, - uint32_t& size) const { + uint32_t& size) { int8_t k, v; uint32_t result = 0; int32_t sizei; - result += readByte(in, k); + result += readByte(k); keyType = (TType)k; - result += readByte(in, v); + result += readByte(v); valType = (TType)v; - result += readI32(in, sizei); + result += readI32(sizei); // TODO(mcslee): check for negative size size = (uint32_t)sizei; return result; } -uint32_t TBinaryProtocol::readMapEnd(shared_ptr in) const { +uint32_t TBinaryProtocol::readMapEnd() { return 0; } -uint32_t TBinaryProtocol::readListBegin(shared_ptr in, - TType& elemType, - uint32_t& size) const { +uint32_t TBinaryProtocol::readListBegin(TType& elemType, + uint32_t& size) { int8_t e; uint32_t result = 0; int32_t sizei; - result += readByte(in, e); + result += readByte(e); elemType = (TType)e; - result += readI32(in, sizei); + result += readI32(sizei); // TODO(mcslee): check for negative size size = (uint32_t)sizei; return result; } -uint32_t TBinaryProtocol::readListEnd(shared_ptr in) const { +uint32_t TBinaryProtocol::readListEnd() { return 0; } -uint32_t TBinaryProtocol::readSetBegin(shared_ptr in, - TType& elemType, - uint32_t& size) const { +uint32_t TBinaryProtocol::readSetBegin(TType& elemType, + uint32_t& size) { int8_t e; uint32_t result = 0; int32_t sizei; - result += readByte(in, e); + result += readByte(e); elemType = (TType)e; - result += readI32(in, sizei); + result += readI32(sizei); // TODO(mcslee): check for negative size size = (uint32_t)sizei; return result; } -uint32_t TBinaryProtocol::readSetEnd(shared_ptr in) const { +uint32_t TBinaryProtocol::readSetEnd() { return 0; } -uint32_t TBinaryProtocol::readBool(shared_ptr in, - bool& value) const { +uint32_t TBinaryProtocol::readBool(bool& value) { uint8_t b[1]; - in->readAll(b, 1); + inputTransport_->readAll(b, 1); value = *(int8_t*)b != 0; return 1; } -uint32_t TBinaryProtocol::readByte(shared_ptr in, - int8_t& byte) const { +uint32_t TBinaryProtocol::readByte(int8_t& byte) { uint8_t b[1]; - in->readAll(b, 1); + inputTransport_->readAll(b, 1); byte = *(int8_t*)b; return 1; } -uint32_t TBinaryProtocol::readI16(shared_ptr in, - int16_t& i16) const { +uint32_t TBinaryProtocol::readI16(int16_t& i16) { uint8_t b[2]; - in->readAll(b, 2); + inputTransport_->readAll(b, 2); i16 = *(int16_t*)b; i16 = (int16_t)ntohs(i16); return 2; } -uint32_t TBinaryProtocol::readI32(shared_ptr in, - int32_t& i32) const { +uint32_t TBinaryProtocol::readI32(int32_t& i32) { uint8_t b[4]; - in->readAll(b, 4); + inputTransport_->readAll(b, 4); i32 = *(int32_t*)b; i32 = (int32_t)ntohl(i32); return 4; } -uint32_t TBinaryProtocol::readI64(shared_ptr in, - int64_t& i64) const { +uint32_t TBinaryProtocol::readI64(int64_t& i64) { uint8_t b[8]; - in->readAll(b, 8); + inputTransport_->readAll(b, 8); i64 = *(int64_t*)b; i64 = (int64_t)ntohll(i64); return 8; } -uint32_t TBinaryProtocol::readDouble(shared_ptr in, - double& dub) const { +uint32_t TBinaryProtocol::readDouble(double& dub) { uint8_t b[8]; uint8_t d[8]; - in->readAll(b, 8); + inputTransport_->readAll(b, 8); d[0] = b[7]; d[1] = b[6]; d[2] = b[5]; @@ -310,17 +285,16 @@ uint32_t TBinaryProtocol::readDouble(shared_ptr in, return 8; } -uint32_t TBinaryProtocol::readString(shared_ptr in, - string& str) const { +uint32_t TBinaryProtocol::readString(string& str) { uint32_t result; int32_t size; - result = readI32(in, size); + result = readI32(size); // TODO(mcslee): check for negative size // Use the heap here to prevent stack overflow for v. large strings uint8_t *b = new uint8_t[size]; - in->readAll(b, size); + inputTransport_->readAll(b, size); str = string((char*)b, size); delete [] b; diff --git a/lib/cpp/src/protocol/TBinaryProtocol.h b/lib/cpp/src/protocol/TBinaryProtocol.h index 7f36a5788..de9a836f1 100644 --- a/lib/cpp/src/protocol/TBinaryProtocol.h +++ b/lib/cpp/src/protocol/TBinaryProtocol.h @@ -17,139 +17,130 @@ using namespace boost; */ class TBinaryProtocol : public TProtocol { public: - TBinaryProtocol() {} + TBinaryProtocol(shared_ptr in, shared_ptr out) : + TProtocol(in, out) {} + ~TBinaryProtocol() {} /** * Writing functions. */ - virtual uint32_t writeMessageBegin(shared_ptr out, - const std::string name, + virtual uint32_t writeMessageBegin(const std::string name, const TMessageType messageType, - const int32_t seqid) const; + const int32_t seqid); - virtual uint32_t writeMessageEnd(shared_ptr out) const; + virtual uint32_t writeMessageEnd(); - uint32_t writeStructBegin(shared_ptr out, - const std::string& name) const; + uint32_t writeStructBegin(const std::string& name); - uint32_t writeStructEnd(shared_ptr out) const; + uint32_t writeStructEnd(); - uint32_t writeFieldBegin(shared_ptr out, - const std::string& name, - const TType fieldType, - const int16_t fieldId) const; + uint32_t writeFieldBegin(const std::string& name, + const TType fieldType, + const int16_t fieldId); - uint32_t writeFieldEnd(shared_ptr out) const; + uint32_t writeFieldEnd(); - uint32_t writeFieldStop(shared_ptr out) const; + uint32_t writeFieldStop(); - uint32_t writeMapBegin(shared_ptr out, - const TType keyType, - const TType valType, - const uint32_t size) const; + uint32_t writeMapBegin(const TType keyType, + const TType valType, + const uint32_t size); - uint32_t writeMapEnd(shared_ptr out) const; + uint32_t writeMapEnd(); - uint32_t writeListBegin(shared_ptr out, - const TType elemType, - const uint32_t size) const; + uint32_t writeListBegin(const TType elemType, + const uint32_t size); - uint32_t writeListEnd(shared_ptr out) const; + uint32_t writeListEnd(); - uint32_t writeSetBegin(shared_ptr out, - const TType elemType, - const uint32_t size) const; + uint32_t writeSetBegin(const TType elemType, + const uint32_t size); - uint32_t writeSetEnd(shared_ptr out) const; + uint32_t writeSetEnd(); - uint32_t writeBool(shared_ptr out, - const bool value) const; + uint32_t writeBool(const bool value); - uint32_t writeByte(shared_ptr out, - const int8_t byte) const; + uint32_t writeByte(const int8_t byte); - uint32_t writeI16(shared_ptr out, - const int16_t i16) const; + uint32_t writeI16(const int16_t i16); - uint32_t writeI32(shared_ptr out, - const int32_t i32) const; + uint32_t writeI32(const int32_t i32); - uint32_t writeI64(shared_ptr out, - const int64_t i64) const; + uint32_t writeI64(const int64_t i64); - uint32_t writeDouble(shared_ptr out, - const double dub) const; + uint32_t writeDouble(const double dub); - uint32_t writeString(shared_ptr out, - const std::string& str) const; + uint32_t writeString(const std::string& str); /** * Reading functions */ - uint32_t readMessageBegin(shared_ptr in, - std::string& name, + uint32_t readMessageBegin(std::string& name, TMessageType& messageType, - int32_t& seqid) const; + int32_t& seqid); - uint32_t readMessageEnd(shared_ptr in) const; + uint32_t readMessageEnd(); - uint32_t readStructBegin(shared_ptr in, - std::string& name) const; + uint32_t readStructBegin(std::string& name); - uint32_t readStructEnd(shared_ptr in) const; + uint32_t readStructEnd(); - uint32_t readFieldBegin(shared_ptr in, - std::string& name, - TType& fieldType, - int16_t& fieldId) const; + uint32_t readFieldBegin(std::string& name, + TType& fieldType, + int16_t& fieldId); - uint32_t readFieldEnd(shared_ptr in) const; + uint32_t readFieldEnd(); - uint32_t readMapBegin(shared_ptr in, - TType& keyType, - TType& valType, - uint32_t& size) const; + uint32_t readMapBegin(TType& keyType, + TType& valType, + uint32_t& size); - uint32_t readMapEnd(shared_ptr in) const; + uint32_t readMapEnd(); - uint32_t readListBegin(shared_ptr in, - TType& elemType, - uint32_t& size) const; + uint32_t readListBegin(TType& elemType, + uint32_t& size); - uint32_t readListEnd(shared_ptr in) const; + uint32_t readListEnd(); - uint32_t readSetBegin(shared_ptr in, - TType& elemType, - uint32_t& size) const; + uint32_t readSetBegin(TType& elemType, + uint32_t& size); - uint32_t readSetEnd(shared_ptr in) const; + uint32_t readSetEnd(); - uint32_t readBool(shared_ptr in, - bool& value) const; + uint32_t readBool(bool& value); - uint32_t readByte(shared_ptr in, - int8_t& byte) const; + uint32_t readByte(int8_t& byte); - uint32_t readI16(shared_ptr in, - int16_t& i16) const; + uint32_t readI16(int16_t& i16); - uint32_t readI32(shared_ptr in, - int32_t& i32) const; + uint32_t readI32(int32_t& i32); - uint32_t readI64(shared_ptr in, - int64_t& i64) const; + uint32_t readI64(int64_t& i64); - uint32_t readDouble(shared_ptr in, - double& dub) const; + uint32_t readDouble(double& dub); - uint32_t readString(shared_ptr in, - std::string& str) const; + uint32_t readString(std::string& str); +}; + +/** + * Constructs binary protocol handlers + */ +class TBinaryProtocolFactory : public TProtocolFactory { + public: + TBinaryProtocolFactory() {} + + virtual ~TBinaryProtocolFactory() {} + + std::pair, boost::shared_ptr > getIOProtocols(boost::shared_ptr in, boost::shared_ptr out) { + boost::shared_ptr prot(new TBinaryProtocol(in, out)); + return std::make_pair(prot, prot); + } }; }}} // facebook::thrift::protocol diff --git a/lib/cpp/src/protocol/TProtocol.h b/lib/cpp/src/protocol/TProtocol.h index 60d85dcdb..8077b27e6 100644 --- a/lib/cpp/src/protocol/TProtocol.h +++ b/lib/cpp/src/protocol/TProtocol.h @@ -82,170 +82,144 @@ class TProtocol { * Writing functions. */ - virtual uint32_t writeMessageBegin(shared_ptr out, - const std::string name, + virtual uint32_t writeMessageBegin(const std::string name, const TMessageType messageType, - const int32_t seqid) const = 0; + const int32_t seqid) = 0; - virtual uint32_t writeMessageEnd(shared_ptr out) const = 0; + virtual uint32_t writeMessageEnd() = 0; - virtual uint32_t writeStructBegin(shared_ptr out, - const std::string& name) const = 0; + virtual uint32_t writeStructBegin(const std::string& name) = 0; - virtual uint32_t writeStructEnd(shared_ptr out) const = 0; + virtual uint32_t writeStructEnd() = 0; - virtual uint32_t writeFieldBegin(shared_ptr out, - const std::string& name, + virtual uint32_t writeFieldBegin(const std::string& name, const TType fieldType, - const int16_t fieldId) const = 0; + const int16_t fieldId) = 0; - virtual uint32_t writeFieldEnd(shared_ptr out) const = 0; + virtual uint32_t writeFieldEnd() = 0; - virtual uint32_t writeFieldStop(shared_ptr out) const = 0; + virtual uint32_t writeFieldStop() = 0; - virtual uint32_t writeMapBegin(shared_ptr out, - const TType keyType, + virtual uint32_t writeMapBegin(const TType keyType, const TType valType, - const uint32_t size) const = 0; + const uint32_t size) = 0; - virtual uint32_t writeMapEnd(shared_ptr out) const = 0; + virtual uint32_t writeMapEnd() = 0; - virtual uint32_t writeListBegin(shared_ptr out, - const TType elemType, - const uint32_t size) const = 0; + virtual uint32_t writeListBegin(const TType elemType, + const uint32_t size) = 0; - virtual uint32_t writeListEnd(shared_ptr out) const = 0; + virtual uint32_t writeListEnd() = 0; - virtual uint32_t writeSetBegin(shared_ptr out, - const TType elemType, - const uint32_t size) const = 0; + virtual uint32_t writeSetBegin(const TType elemType, + const uint32_t size) = 0; - virtual uint32_t writeSetEnd(shared_ptr out) const = 0; + virtual uint32_t writeSetEnd() = 0; - virtual uint32_t writeBool(shared_ptr out, - const bool value) const = 0; + virtual uint32_t writeBool(const bool value) = 0; - virtual uint32_t writeByte(shared_ptr out, - const int8_t byte) const = 0; + virtual uint32_t writeByte(const int8_t byte) = 0; - virtual uint32_t writeI16(shared_ptr out, - const int16_t i16) const = 0; + virtual uint32_t writeI16(const int16_t i16) = 0; - virtual uint32_t writeI32(shared_ptr out, - const int32_t i32) const = 0; + virtual uint32_t writeI32(const int32_t i32) = 0; - virtual uint32_t writeI64(shared_ptr out, - const int64_t i64) const = 0; + virtual uint32_t writeI64(const int64_t i64) = 0; - virtual uint32_t writeDouble(shared_ptr out, - const double dub) const = 0; + virtual uint32_t writeDouble(const double dub) = 0; - virtual uint32_t writeString(shared_ptr out, - const std::string& str) const = 0; + virtual uint32_t writeString(const std::string& str) = 0; /** * Reading functions */ - virtual uint32_t readMessageBegin(shared_ptr in, - std::string& name, + virtual uint32_t readMessageBegin(std::string& name, TMessageType& messageType, - int32_t& seqid) const = 0; + int32_t& seqid) = 0; - virtual uint32_t readMessageEnd(shared_ptr in) const = 0; + virtual uint32_t readMessageEnd() = 0; - virtual uint32_t readStructBegin(shared_ptr in, - std::string& name) const = 0; + virtual uint32_t readStructBegin(std::string& name) = 0; - virtual uint32_t readStructEnd(shared_ptr in) const = 0; + virtual uint32_t readStructEnd() = 0; - virtual uint32_t readFieldBegin(shared_ptr in, - std::string& name, + virtual uint32_t readFieldBegin(std::string& name, TType& fieldType, - int16_t& fieldId) const = 0; + int16_t& fieldId) = 0; - virtual uint32_t readFieldEnd(shared_ptr in) const = 0; + virtual uint32_t readFieldEnd() = 0; - virtual uint32_t readMapBegin(shared_ptr in, - TType& keyType, + virtual uint32_t readMapBegin(TType& keyType, TType& valType, - uint32_t& size) const = 0; + uint32_t& size) = 0; - virtual uint32_t readMapEnd(shared_ptr in) const = 0; + virtual uint32_t readMapEnd() = 0; - virtual uint32_t readListBegin(shared_ptr in, - TType& elemType, - uint32_t& size) const = 0; + virtual uint32_t readListBegin(TType& elemType, + uint32_t& size) = 0; - virtual uint32_t readListEnd(shared_ptr in) const = 0; + virtual uint32_t readListEnd() = 0; - virtual uint32_t readSetBegin(shared_ptr in, - TType& elemType, - uint32_t& size) const = 0; + virtual uint32_t readSetBegin(TType& elemType, + uint32_t& size) = 0; - virtual uint32_t readSetEnd(shared_ptr in) const = 0; + virtual uint32_t readSetEnd() = 0; - virtual uint32_t readBool(shared_ptr in, - bool& value) const = 0; + virtual uint32_t readBool(bool& value) = 0; - virtual uint32_t readByte(shared_ptr in, - int8_t& byte) const = 0; + virtual uint32_t readByte(int8_t& byte) = 0; - virtual uint32_t readI16(shared_ptr in, - int16_t& i16) const = 0; + virtual uint32_t readI16(int16_t& i16) = 0; - virtual uint32_t readI32(shared_ptr in, - int32_t& i32) const = 0; + virtual uint32_t readI32(int32_t& i32) = 0; - virtual uint32_t readI64(shared_ptr in, - int64_t& i64) const = 0; + virtual uint32_t readI64(int64_t& i64) = 0; - virtual uint32_t readDouble(shared_ptr in, - double& dub) const = 0; + virtual uint32_t readDouble(double& dub) = 0; - virtual uint32_t readString(shared_ptr in, - std::string& str) const = 0; + virtual uint32_t readString(std::string& str) = 0; /** * Method to arbitrarily skip over data. */ - uint32_t skip(shared_ptr in, TType type) const { + uint32_t skip(TType type) { switch (type) { case T_BOOL: { bool boolv; - return readBool(in, boolv); + return readBool(boolv); } case T_BYTE: { int8_t bytev; - return readByte(in, bytev); + return readByte(bytev); } case T_I16: { int16_t i16; - return readI16(in, i16); + return readI16(i16); } case T_I32: { int32_t i32; - return readI32(in, i32); + return readI32(i32); } case T_I64: { int64_t i64; - return readI64(in, i64); + return readI64(i64); } case T_DOUBLE: { double dub; - return readDouble(in, dub); + return readDouble(dub); } case T_STRING: { std::string str; - return readString(in, str); + return readString(str); } case T_STRUCT: { @@ -253,16 +227,16 @@ class TProtocol { std::string name; int16_t fid; TType ftype; - result += readStructBegin(in, name); + result += readStructBegin(name); while (true) { - result += readFieldBegin(in, name, ftype, fid); + result += readFieldBegin(name, ftype, fid); if (ftype == T_STOP) { break; } - result += skip(in, ftype); - result += readFieldEnd(in); + result += skip(ftype); + result += readFieldEnd(); } - result += readStructEnd(in); + result += readStructEnd(); return result; } case T_MAP: @@ -271,12 +245,12 @@ class TProtocol { TType keyType; TType valType; uint32_t i, size; - result += readMapBegin(in, keyType, valType, size); + result += readMapBegin(keyType, valType, size); for (i = 0; i < size; i++) { - result += skip(in, keyType); - result += skip(in, valType); + result += skip(keyType); + result += skip(valType); } - result += readMapEnd(in); + result += readMapEnd(); return result; } case T_SET: @@ -284,11 +258,11 @@ class TProtocol { uint32_t result = 0; TType elemType; uint32_t i, size; - result += readSetBegin(in, elemType, size); + result += readSetBegin(elemType, size); for (i = 0; i < size; i++) { - result += skip(in, elemType); + result += skip(elemType); } - result += readSetEnd(in); + result += readSetEnd(); return result; } case T_LIST: @@ -296,11 +270,11 @@ class TProtocol { uint32_t result = 0; TType elemType; uint32_t i, size; - result += readListBegin(in, elemType, size); + result += readListBegin(elemType, size); for (i = 0; i < size; i++) { - result += skip(in, elemType); + result += skip(elemType); } - result += readListEnd(in); + result += readListEnd(); return result; } default: @@ -308,10 +282,39 @@ class TProtocol { } } + shared_ptr getInputTransport() { + return inputTransport_; + } + + shared_ptr getOutputTransport() { + return outputTransport_; + } + protected: + TProtocol(shared_ptr in, shared_ptr out) : + inputTransport_(in), + outputTransport_(out) {} + + shared_ptr inputTransport_; + + shared_ptr outputTransport_; + + private: TProtocol() {} }; +/** + * Constructs input and output protocol objects given transports. + */ +class TProtocolFactory { + public: + TProtocolFactory() {} + + virtual ~TProtocolFactory() {} + + virtual std::pair, boost::shared_ptr > getIOProtocols(boost::shared_ptr in, boost::shared_ptr out) = 0; +}; + }}} // facebook::thrift::protocol #endif // #define _THRIFT_PROTOCOL_TPROTOCOL_H_ 1 diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index bc39877a0..75a209ea8 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -153,7 +153,7 @@ void TConnection::transition() { try { // Invoke the processor - server_->getProcessor()->process(inputTransport_, outputTransport_); + server_->getProcessor()->process(inputProtocol_, outputProtocol_); } catch (TTransportException &x) { fprintf(stderr, "Server::process %s\n", x.getMessage().c_str()); close(); diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h index ec024c031..faa572ff3 100644 --- a/lib/cpp/src/server/TNonblockingServer.h +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -1,9 +1,9 @@ #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ #define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1 -#include "Thrift.h" -#include "server/TServer.h" -#include "transport/TMemoryBuffer.h" +#include +#include +#include #include #include @@ -50,10 +50,8 @@ class TNonblockingServer : public TServer { void handleEvent(int fd, short which); public: - TNonblockingServer(shared_ptr processor, - shared_ptr options, - int port) : - TServer(processor, options), + TNonblockingServer(shared_ptr processor, int port) : + TServer(processor), serverSocket_(0), port_(port), frameResponses_(true) {} @@ -157,6 +155,12 @@ class TConnection { // Transport that processor writes to shared_ptr outputTransport_; + // Protocol encoder + shared_ptr outputProtocol_; + + // Protocol decoder + shared_ptr inputProtocol_; + // Go into read mode void setRead() { setFlags(EV_READ | EV_PERSIST); @@ -190,6 +194,12 @@ class TConnection { inputTransport_ = shared_ptr(new TMemoryBuffer(readBuffer_, readBufferSize_)); outputTransport_ = shared_ptr(new TMemoryBuffer()); + // Create protocol + std::pair,shared_ptr > iop; + iop = s->getProtocolFactory()->getIOProtocols(inputTransport_, outputTransport_); + inputProtocol_ = iop.first; + outputProtocol_ = iop.second; + init(socket, eventFlags, s); } diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h index 293aa2852..4b20432dc 100644 --- a/lib/cpp/src/server/TServer.h +++ b/lib/cpp/src/server/TServer.h @@ -3,7 +3,7 @@ #include #include -#include +#include #include #include @@ -14,8 +14,6 @@ using namespace facebook::thrift; using namespace facebook::thrift::transport; using namespace boost; -class TServerOptions; - /** * Thrift server. * @@ -24,45 +22,61 @@ class TServerOptions; class TServer : public concurrency::Runnable { public: virtual ~TServer() {} + virtual void serve() = 0; // Allows running the server as a Runnable thread - virtual void run() { serve(); } + virtual void run() { + serve(); + } shared_ptr getProcessor() { return processor_; } + shared_ptr getProtocolFactory() { + return protocolFactory_; + } + protected: TServer(shared_ptr processor, shared_ptr serverTransport, shared_ptr transportFactory, - shared_ptr options) : + shared_ptr protocolFactory) : processor_(processor), serverTransport_(serverTransport), transportFactory_(transportFactory), - options_(options) {} + protocolFactory_(protocolFactory) {} TServer(shared_ptr processor, - shared_ptr options) : - processor_(processor), options_(options) {} + shared_ptr serverTransport, + shared_ptr transportFactory) : + processor_(processor), + serverTransport_(serverTransport), + transportFactory_(transportFactory) { + protocolFactory_ = boost::shared_ptr(new TBinaryProtocolFactory()); + } + + TServer(shared_ptr processor, + shared_ptr serverTransport) : + processor_(processor), + serverTransport_(serverTransport) { + transportFactory_ = boost::shared_ptr(new TTransportFactory()); + protocolFactory_ = boost::shared_ptr(new TBinaryProtocolFactory()); + } + + TServer(shared_ptr processor) : + processor_(processor) { + transportFactory_ = boost::shared_ptr(new TTransportFactory()); + protocolFactory_ = boost::shared_ptr(new TBinaryProtocolFactory()); + } shared_ptr processor_; shared_ptr serverTransport_; shared_ptr transportFactory_; - shared_ptr options_; + shared_ptr protocolFactory_; }; -/** - * Class to encapsulate all generic server options. - */ -class TServerOptions { - public: - // TODO(mcslee): Fill in getters/setters here - protected: - // TODO(mcslee): Fill data members in here -}; - }}} // facebook::thrift::server #endif // #ifndef _THRIFT_SERVER_TSERVER_H_ diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp index 63b6c6be3..3eb035eed 100644 --- a/lib/cpp/src/server/TSimpleServer.cpp +++ b/lib/cpp/src/server/TSimpleServer.cpp @@ -14,7 +14,8 @@ namespace facebook { namespace thrift { namespace server { void TSimpleServer::serve() { shared_ptr client; - pair,shared_ptr > io; + pair,shared_ptr > iot; + pair,shared_ptr > iop; try { // Start the server listening @@ -28,14 +29,15 @@ void TSimpleServer::serve() { try { while (true) { client = serverTransport_->accept(); - io = transportFactory_->getIOTransports(client); + iot = transportFactory_->getIOTransports(client); + iop = protocolFactory_->getIOProtocols(iot.first, iot.second); try { - while (processor_->process(io.first, io.second)) {} + while (processor_->process(iop.first, iop.second)) {} } catch (TTransportException& ttx) { cerr << "TSimpleServer client died: " << ttx.getMessage() << endl; } - io.first->close(); - io.second->close(); + iot.first->close(); + iot.second->close(); client->close(); } } catch (TTransportException& ttx) { diff --git a/lib/cpp/src/server/TSimpleServer.h b/lib/cpp/src/server/TSimpleServer.h index a0d22a76d..647051970 100644 --- a/lib/cpp/src/server/TSimpleServer.h +++ b/lib/cpp/src/server/TSimpleServer.h @@ -19,8 +19,8 @@ class TSimpleServer : public TServer { TSimpleServer(shared_ptr processor, shared_ptr serverTransport, shared_ptr transportFactory, - shared_ptr options) : - TServer(processor, serverTransport, transportFactory, options) {} + shared_ptr protocolFactory) : + TServer(processor, serverTransport, transportFactory, protocolFactory) {} ~TSimpleServer() {} diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp index 43f746353..7885f0f5a 100644 --- a/lib/cpp/src/server/TThreadPoolServer.cpp +++ b/lib/cpp/src/server/TThreadPoolServer.cpp @@ -16,8 +16,8 @@ class TThreadPoolServer::Task: public Runnable { public: Task(shared_ptr processor, - shared_ptr input, - shared_ptr output) : + shared_ptr input, + shared_ptr output) : processor_(processor), input_(input), output_(output) { @@ -35,23 +35,24 @@ public: break; } } - input_->close(); - output_->close(); + input_->getInputTransport()->close(); + output_->getOutputTransport()->close(); } private: shared_ptr processor_; - shared_ptr input_; - shared_ptr output_; + shared_ptr input_; + shared_ptr output_; }; TThreadPoolServer::TThreadPoolServer(shared_ptr processor, shared_ptr serverTransport, shared_ptr transportFactory, - shared_ptr threadManager, - shared_ptr options) : - TServer(processor, serverTransport, transportFactory, options), + shared_ptr protocolFactory, + + shared_ptr threadManager) : + TServer(processor, serverTransport, transportFactory, protocolFactory), threadManager_(threadManager) { } @@ -60,7 +61,8 @@ TThreadPoolServer::~TThreadPoolServer() {} void TThreadPoolServer::serve() { shared_ptr client; - pair,shared_ptr > io; + pair,shared_ptr > iot; + pair,shared_ptr > iop; try { // Start the server listening @@ -75,9 +77,11 @@ void TThreadPoolServer::serve() { // Fetch client from server client = serverTransport_->accept(); // Make IO transports - io = transportFactory_->getIOTransports(client); + iot = transportFactory_->getIOTransports(client); + iop = protocolFactory_->getIOProtocols(iot.first, iot.second); + // Add to threadmanager pool - threadManager_->add(shared_ptr(new TThreadPoolServer::Task(processor_, io.first, io.second))); + threadManager_->add(shared_ptr(new TThreadPoolServer::Task(processor_, iop.first, iop.second))); } catch (TTransportException& ttx) { break; } diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h index b8f8b47fc..5c5899e93 100644 --- a/lib/cpp/src/server/TThreadPoolServer.h +++ b/lib/cpp/src/server/TThreadPoolServer.h @@ -21,8 +21,8 @@ public: TThreadPoolServer(shared_ptr processor, shared_ptr serverTransport, shared_ptr transportFactory, - shared_ptr threadManager, - shared_ptr options); + shared_ptr protocolFactory, + shared_ptr threadManager); virtual ~TThreadPoolServer(); diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.h b/lib/cpp/src/transport/TBufferedRouterTransport.h index b01faac8f..b89198e11 100644 --- a/lib/cpp/src/transport/TBufferedRouterTransport.h +++ b/lib/cpp/src/transport/TBufferedRouterTransport.h @@ -87,6 +87,30 @@ class TBufferedRouterTransport : public TTransport { uint32_t wLen_; }; + +/** + * Wraps a transport into a bufferedRouter instance. + * + * @author Aditya Agarwal + */ +class TBufferedRouterTransportFactory : public TTransportFactory { + public: + TBufferedRouterTransportFactory(boost::shared_ptr rTrans): rTrans_(rTrans) {} + + virtual ~TBufferedRouterTransportFactory() {} + + /** + * Wraps the transport into a buffered one. + */ + virtual std::pair, boost::shared_ptr > getIOTransports(boost::shared_ptr trans) { + boost::shared_ptr buffered(new TBufferedRouterTransport(trans, rTrans_)); + return std::make_pair(buffered, buffered); + } + + private: + boost::shared_ptr rTrans_; +}; + }}} // facebook::thrift::transport #endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_ diff --git a/lib/cpp/src/transport/TBufferedRouterTransportFactory.h b/lib/cpp/src/transport/TBufferedRouterTransportFactory.h deleted file mode 100644 index 2b82570c8..000000000 --- a/lib/cpp/src/transport/TBufferedRouterTransportFactory.h +++ /dev/null @@ -1,35 +0,0 @@ -#ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORTFACTORY_H_ -#define _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORTFACTORY_H_ 1 - -#include -#include -#include - -namespace facebook { namespace thrift { namespace transport { - -/** - * Wraps a transport into a bufferedRouter instance. - * - * @author Aditya Agarwal - */ -class TBufferedRouterTransportFactory : public TTransportFactory { - public: - TBufferedRouterTransportFactory(boost::shared_ptr rTrans): rTrans_(rTrans) {} - - virtual ~TBufferedRouterTransportFactory() {} - - /** - * Wraps the transport into a buffered one. - */ - virtual std::pair, boost::shared_ptr > getIOTransports(boost::shared_ptr trans) { - boost::shared_ptr buffered(new TBufferedRouterTransport(trans, rTrans_)); - return std::make_pair(buffered, buffered); - } - - private: - boost::shared_ptr rTrans_; -}; - -}}} - -#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORTFACTORY_H_ diff --git a/lib/cpp/src/transport/TBufferedTransport.cpp b/lib/cpp/src/transport/TBufferedTransport.cpp deleted file mode 100644 index 9dfd63a39..000000000 --- a/lib/cpp/src/transport/TBufferedTransport.cpp +++ /dev/null @@ -1,65 +0,0 @@ -#include "TBufferedTransport.h" -using std::string; - -namespace facebook { namespace thrift { namespace transport { - -uint32_t TBufferedTransport::read(uint8_t* buf, uint32_t len) { - uint32_t need = len; - - // We don't have enough data yet - if (rLen_-rPos_ < need) { - // Copy out whatever we have - if (rLen_-rPos_ > 0) { - memcpy(buf, rBuf_+rPos_, rLen_-rPos_); - need -= rLen_-rPos_; - buf += rLen_-rPos_; - } - // Get more from underlying transport up to buffer size - // TODO: should this be a readAll? - rLen_ = transport_->read(rBuf_, rBufSize_); - rPos_ = 0; - } - - // Hand over whatever we have - uint32_t give = need; - if (rLen_-rPos_ < give) { - give = rLen_-rPos_; - } - memcpy(buf, rBuf_+rPos_, give); - rPos_ += give; - need -= give; - return (len - need); -} - -void TBufferedTransport::write(const uint8_t* buf, uint32_t len) { - if (len == 0) { - return; - } - - if (len + wLen_ >= wBufSize_) { - uint32_t copy = wBufSize_ - wLen_; - memcpy(wBuf_ + wLen_, buf, copy); - transport_->write(wBuf_, wBufSize_); - - wLen_ = len - copy; - if (wLen_ > 0) { - memcpy(wBuf_, buf+copy, wLen_); - } - } else { - memcpy(wBuf_+wLen_, buf, len); - wLen_ += len; - } -} - -void TBufferedTransport::flush() { - // Write out any data waiting in the write buffer - if (wLen_ > 0) { - transport_->write(wBuf_, wLen_); - wLen_ = 0; - } - - // Flush the underlying transport - transport_->flush(); -} - -}}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TBufferedTransport.h b/lib/cpp/src/transport/TBufferedTransport.h deleted file mode 100644 index 9992777ba..000000000 --- a/lib/cpp/src/transport/TBufferedTransport.h +++ /dev/null @@ -1,83 +0,0 @@ -#ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_ -#define _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_ 1 - -#include "TTransport.h" -#include - -#include - -namespace facebook { namespace thrift { namespace transport { - -using namespace boost; - -/** - * Buffered transport. For reads it will read more data than is requested - * and will serve future data out of a local buffer. For writes, data is - * stored to an in memory buffer before being written out. - * - * @author Mark Slee - */ -class TBufferedTransport : public TTransport { - public: - TBufferedTransport(shared_ptr transport) : - transport_(transport), - rBufSize_(512), rPos_(0), rLen_(0), - wBufSize_(512), wLen_(0) { - rBuf_ = new uint8_t[rBufSize_]; - wBuf_ = new uint8_t[wBufSize_]; - } - - TBufferedTransport(shared_ptr transport, uint32_t sz) : - transport_(transport), - rBufSize_(sz), rPos_(0), rLen_(0), - wBufSize_(sz), wLen_(0) { - rBuf_ = new uint8_t[rBufSize_]; - wBuf_ = new uint8_t[wBufSize_]; - } - - TBufferedTransport(shared_ptr transport, uint32_t rsz, uint32_t wsz) : - transport_(transport), - rBufSize_(rsz), rPos_(0), rLen_(0), - wBufSize_(wsz), wLen_(0) { - rBuf_ = new uint8_t[rBufSize_]; - wBuf_ = new uint8_t[wBufSize_]; - } - - ~TBufferedTransport() { - delete [] rBuf_; - delete [] wBuf_; - } - - bool isOpen() { - return transport_->isOpen(); - } - - void open() { - transport_->open(); - } - - void close() { - transport_->close(); - } - - uint32_t read(uint8_t* buf, uint32_t len); - - void write(const uint8_t* buf, uint32_t len); - - void flush(); - - protected: - shared_ptr transport_; - uint8_t* rBuf_; - uint32_t rBufSize_; - uint32_t rPos_; - uint32_t rLen_; - - uint8_t* wBuf_; - uint32_t wBufSize_; - uint32_t wLen_; -}; - -}}} // facebook::thrift::transport - -#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_ diff --git a/lib/cpp/src/transport/TBufferedTransportFactory.h b/lib/cpp/src/transport/TBufferedTransportFactory.h deleted file mode 100644 index c6e87b18d..000000000 --- a/lib/cpp/src/transport/TBufferedTransportFactory.h +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORTFACTORY_H_ -#define _THRIFT_TRANSPORT_TBUFFEREDTRANSPORTFACTORY_H_ 1 - -#include -#include -#include - -namespace facebook { namespace thrift { namespace transport { - -/** - * Wraps a transport into a buffered one. - * - * @author Mark Slee - */ -class TBufferedTransportFactory : public TTransportFactory { - public: - TBufferedTransportFactory() {} - - virtual ~TBufferedTransportFactory() {} - - /** - * Wraps the transport into a buffered one. - */ - virtual std::pair, boost::shared_ptr > getIOTransports(boost::shared_ptr trans) { - boost::shared_ptr buffered(new TBufferedTransport(trans)); - return std::make_pair(buffered, buffered); - } - -}; - -}}} - -#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_ diff --git a/lib/cpp/src/transport/TFramedTransport.cpp b/lib/cpp/src/transport/TFramedTransport.cpp deleted file mode 100644 index a05fd27e2..000000000 --- a/lib/cpp/src/transport/TFramedTransport.cpp +++ /dev/null @@ -1,121 +0,0 @@ -#include -#include - -using std::string; - -namespace facebook { namespace thrift { namespace transport { - -uint32_t TFramedTransport::read(uint8_t* buf, uint32_t len) { - if (!read_) { - return transport_->read(buf, len); - } - - uint32_t need = len; - - // We don't have enough data yet - if (rLen_-rPos_ < need) { - // Copy out whatever we have - if (rLen_-rPos_ > 0) { - memcpy(buf, rBuf_+rPos_, rLen_-rPos_); - need -= rLen_-rPos_; - buf += rLen_-rPos_; - } - - // Read another chunk - readFrame(); - } - - // Hand over whatever we have - uint32_t give = need; - if (rLen_-rPos_ < give) { - give = rLen_-rPos_; - } - memcpy(buf, rBuf_+rPos_, give); - rPos_ += give; - need -= give; - return (len - need); -} - -void TFramedTransport::readFrame() { - // Get rid of the old frame - if (rBuf_ != NULL) { - delete [] rBuf_; - rBuf_ = NULL; - } - - // Read in the next chunk size - int32_t sz; - transport_->readAll((uint8_t*)&sz, 4); - sz = (int32_t)ntohl(sz); - - if (sz < 0) { - throw new TTransportException("Frame size has negative value"); - } - - // Read the frame payload, reset markers - rBuf_ = new uint8_t[sz]; - transport_->readAll(rBuf_, sz); - rPos_ = 0; - rLen_ = sz; -} - -void TFramedTransport::write(const uint8_t* buf, uint32_t len) { - if (len == 0) { - return; - } - - // Shortcut out if not write mode - if (!write_) { - transport_->write(buf, len); - return; - } - - // Need to grow the buffer - if (len + wLen_ >= wBufSize_) { - - // Double buffer size until sufficient - while (wBufSize_ < len + wLen_) { - wBufSize_ *= 2; - } - - // Allocate new buffer - uint8_t* wBuf2 = new uint8_t[wBufSize_]; - - // Copy the old buffer to the new one - memcpy(wBuf2, wBuf_, wLen_); - - // Now point buf to the new one - delete [] wBuf_; - wBuf_ = wBuf2; - } - - // Copy data into buffer - memcpy(wBuf_ + wLen_, buf, len); - wLen_ += len; -} - -void TFramedTransport::flush() { - if (!write_) { - transport_->flush(); - return; - } - - // Write frame size - int32_t sz = wLen_; - sz = (int32_t)htonl(sz); - - transport_->write((const uint8_t*)&sz, 4); - - // Write frame body - if (sz > 0) { - transport_->write(wBuf_, wLen_); - } - - // All done - wLen_ = 0; - - // Flush the underlying - transport_->flush(); -} - -}}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TFramedTransport.h b/lib/cpp/src/transport/TFramedTransport.h deleted file mode 100644 index 37b82b428..000000000 --- a/lib/cpp/src/transport/TFramedTransport.h +++ /dev/null @@ -1,101 +0,0 @@ -#ifndef _THRIFT_TRANSPORT_TFRAMEDTRANSPORT_H_ -#define _THRIFT_TRANSPORT_TFRAMEDTRANSPORT_H_ 1 - -#include "TTransport.h" -#include -#include - -namespace facebook { namespace thrift { namespace transport { - -using namespace boost; - -/** - * Framed transport. All writes go into an in-memory buffer until flush is - * called, at which point the transport writes the length of the entire - * binary chunk followed by the data payload. This allows the receiver on the - * other end to always do fixed-length reads. - * - * @author Mark Slee - */ -class TFramedTransport : public TTransport { - public: - TFramedTransport(shared_ptr transport) : - transport_(transport), - rPos_(0), - rLen_(0), - read_(true), - wBufSize_(512), - wLen_(0), - write_(true) { - rBuf_ = NULL; - wBuf_ = new uint8_t[wBufSize_]; - } - - TFramedTransport(shared_ptr transport, uint32_t sz) : - transport_(transport), - rPos_(0), - rLen_(0), - read_(true), - wBufSize_(sz), - wLen_(0), - write_(true) { - rBuf_ = NULL; - wBuf_ = new uint8_t[wBufSize_]; - } - - ~TFramedTransport() { - if (rBuf_ != NULL) { - delete [] rBuf_; - } - if (wBuf_ != NULL) { - delete [] wBuf_; - } - } - - void setRead(bool read) { - read_ = read; - } - - void setWrite(bool write) { - write_ = write; - } - - void open() { - transport_->open(); - } - - bool isOpen() { - return transport_->isOpen(); - } - - void close() { - transport_->close(); - } - - uint32_t read(uint8_t* buf, uint32_t len); - - void write(const uint8_t* buf, uint32_t len); - - void flush(); - - protected: - shared_ptr transport_; - uint8_t* rBuf_; - uint32_t rPos_; - uint32_t rLen_; - bool read_; - - uint8_t* wBuf_; - uint32_t wBufSize_; - uint32_t wLen_; - bool write_; - - /** - * Reads a frame of input from the underlying stream. - */ - void readFrame(); -}; - -}}} // facebook::thrift::transport - -#endif // #ifndef _THRIFT_TRANSPORT_TFRAMEDTRANSPORT_H_ diff --git a/lib/cpp/src/transport/TMemoryBuffer.cpp b/lib/cpp/src/transport/TMemoryBuffer.cpp deleted file mode 100644 index 084f29787..000000000 --- a/lib/cpp/src/transport/TMemoryBuffer.cpp +++ /dev/null @@ -1,45 +0,0 @@ -#include "TMemoryBuffer.h" - -namespace facebook { namespace thrift { namespace transport { - -uint32_t TMemoryBuffer::read(uint8_t* buf, uint32_t len) { - // Check avaible data for reading - uint32_t avail = wPos_ - rPos_; - - // Device how much to give - uint32_t give = len; - if (avail < len) { - give = avail; - } - - // Copy into buffer and increment rPos_ - memcpy(buf, buffer_ + rPos_, give); - rPos_ += give; - - return give; -} - -void TMemoryBuffer::write(const uint8_t* buf, uint32_t len) { - // Check available space - uint32_t avail = bufferSize_ - wPos_; - - // Grow the buffer - if (len > avail) { - if (!owner_) { - throw TTransportException("Insufficient space in external MemoryBuffer"); - } - while (len > avail) { - bufferSize_ *= 2; - buffer_ = (uint8_t*)realloc(buffer_, bufferSize_); - if (buffer_ == NULL) { - throw TTransportException("Out of memory."); - } - } - } - - // Copy into the buffer and increment wPos_ - memcpy(buffer_ + wPos_, buf, len); - wPos_ += len; -} - -}}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TMemoryBuffer.h b/lib/cpp/src/transport/TMemoryBuffer.h deleted file mode 100644 index 9aa31352b..000000000 --- a/lib/cpp/src/transport/TMemoryBuffer.h +++ /dev/null @@ -1,116 +0,0 @@ -#ifndef _THRIFT_TRANSPORT_TMEMORYBUFFER_H_ -#define _THRIFT_TRANSPORT_TMEMORYBUFFER_H_ 1 - -#include "TTransport.h" -#include - -namespace facebook { namespace thrift { namespace transport { - -/** - * A memory buffer is a tranpsort that simply reads from and writes to an - * in memory buffer. Anytime you call write on it, the data is simply placed - * into a buffer, and anytime you call read, data is read from that buffer. - * - * The buffers are allocated using C constructs malloc,realloc, and the size - * doubles as necessary. - * - * @author Mark Slee - */ -class TMemoryBuffer : public TTransport { - public: - TMemoryBuffer() { - owner_ = true; - bufferSize_ = 1024; - buffer_ = (uint8_t*)malloc(bufferSize_); - if (buffer_ == NULL) { - throw TTransportException("Out of memory"); - } - wPos_ = 0; - rPos_ = 0; - } - - TMemoryBuffer(uint32_t sz) { - owner_ = true; - bufferSize_ = sz; - buffer_ = (uint8_t*)malloc(bufferSize_); - if (buffer_ == NULL) { - throw TTransportException("Out of memory"); - } - wPos_ = 0; - rPos_ = 0; - } - - TMemoryBuffer(uint8_t* buf, int sz) { - owner_ = false; - buffer_ = buf; - bufferSize_ = sz; - wPos_ = sz; - rPos_ = 0; - } - - ~TMemoryBuffer() { - if (owner_) { - if (buffer_ != NULL) { - free(buffer_); - buffer_ = NULL; - } - } - } - - bool isOpen() { - return true; - } - - - void open() {} - - void close() {} - - void getBuffer(uint8_t** bufPtr, uint32_t* sz) { - *bufPtr = buffer_; - *sz = wPos_; - } - - void resetBuffer() { - wPos_ = 0; - rPos_ = 0; - } - - void resetBuffer(uint8_t* buf, uint32_t sz) { - if (owner_) { - if (buffer_ != NULL) { - free(buffer_); - } - } - owner_ = false; - buffer_ = buf; - bufferSize_ = sz; - wPos_ = sz; - rPos_ = 0; - } - - uint32_t read(uint8_t* buf, uint32_t len); - - void write(const uint8_t* buf, uint32_t len); - - private: - // Data buffer - uint8_t* buffer_; - - // Allocated buffer size - uint32_t bufferSize_; - - // Where the write is at - uint32_t wPos_; - - // Where the reader is at - uint32_t rPos_; - - // Is this object the owner of the buffer? - bool owner_; - -}; - -}}} // facebook::thrift::transport - -#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_ diff --git a/lib/cpp/src/transport/TNullTransport.h b/lib/cpp/src/transport/TNullTransport.h deleted file mode 100644 index dd2999f59..000000000 --- a/lib/cpp/src/transport/TNullTransport.h +++ /dev/null @@ -1,28 +0,0 @@ -#ifndef _THRIFT_TRANSPORT_TNULLTRANSPORT_H_ -#define _THRIFT_TRANSPORT_TNULLTRANSPORT_H_ 1 - -#include "TTransport.h" - -namespace facebook { namespace thrift { namespace transport { - -/** - * The null transport is a dummy transport that doesn't actually do anything. - * It's sort of an analogy to /dev/null, you can never read anything from it - * and it will let you write anything you want to it, though it won't actually - * go anywhere. - * - * @author Mark Slee - */ -class TNullTransport : public TTransport { - public: - TNullTransport() {} - ~TNullTransport() {} - - bool isOpen() { return true; } - void open() { } - void write(const std::string& s) {} -}; - -}}} // facebook::thrift::transport - -#endif // #ifndef _THRIFT_TRANSPORT_TNULLTRANSPORT_H_ diff --git a/lib/cpp/src/transport/TTransport.h b/lib/cpp/src/transport/TTransport.h index c5456d53c..7b4cbe14c 100644 --- a/lib/cpp/src/transport/TTransport.h +++ b/lib/cpp/src/transport/TTransport.h @@ -1,7 +1,8 @@ #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_ #define _THRIFT_TRANSPORT_TTRANSPORT_H_ 1 -#include "Thrift.h" +#include +#include #include #include @@ -124,6 +125,28 @@ class TTransport { TTransport() {} }; +/** + * Generic factory class to make an input and output transport out of a + * source transport. Commonly used inside servers to make input and output + * streams out of raw clients. + * + * @author Mark Slee + */ +class TTransportFactory { + public: + TTransportFactory() {} + + virtual ~TTransportFactory() {} + + /** + * Default implementation does nothing, just returns the transport given. + */ + virtual std::pair, boost::shared_ptr > getIOTransports(boost::shared_ptr trans) { + return std::make_pair(trans, trans); + } + +}; + }}} // facebook::thrift::transport #endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_ diff --git a/lib/cpp/src/transport/TTransportFactory.h b/lib/cpp/src/transport/TTransportFactory.h deleted file mode 100644 index abd1048d8..000000000 --- a/lib/cpp/src/transport/TTransportFactory.h +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_ -#define _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_ 1 - -#include -#include - -namespace facebook { namespace thrift { namespace transport { - -/** - * Generic factory class to make an input and output transport out of a - * source transport. Commonly used inside servers to make input and output - * streams out of raw clients. - * - * @author Mark Slee - */ -class TTransportFactory { - public: - TTransportFactory() {} - - virtual ~TTransportFactory() {} - - /** - * Default implementation does nothing, just returns the transport given. - */ - virtual std::pair, boost::shared_ptr > getIOTransports(boost::shared_ptr trans) { - return std::make_pair(trans, trans); - } - -}; - -}}} - -#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_ diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp new file mode 100644 index 000000000..d9f477509 --- /dev/null +++ b/lib/cpp/src/transport/TTransportUtils.cpp @@ -0,0 +1,219 @@ +#include + +using std::string; + +namespace facebook { namespace thrift { namespace transport { + +uint32_t TBufferedTransport::read(uint8_t* buf, uint32_t len) { + uint32_t need = len; + + // We don't have enough data yet + if (rLen_-rPos_ < need) { + // Copy out whatever we have + if (rLen_-rPos_ > 0) { + memcpy(buf, rBuf_+rPos_, rLen_-rPos_); + need -= rLen_-rPos_; + buf += rLen_-rPos_; + } + // Get more from underlying transport up to buffer size + // TODO: should this be a readAll? + rLen_ = transport_->read(rBuf_, rBufSize_); + rPos_ = 0; + } + + // Hand over whatever we have + uint32_t give = need; + if (rLen_-rPos_ < give) { + give = rLen_-rPos_; + } + memcpy(buf, rBuf_+rPos_, give); + rPos_ += give; + need -= give; + return (len - need); +} + +void TBufferedTransport::write(const uint8_t* buf, uint32_t len) { + if (len == 0) { + return; + } + + if (len + wLen_ >= wBufSize_) { + uint32_t copy = wBufSize_ - wLen_; + memcpy(wBuf_ + wLen_, buf, copy); + transport_->write(wBuf_, wBufSize_); + + wLen_ = len - copy; + if (wLen_ > 0) { + memcpy(wBuf_, buf+copy, wLen_); + } + } else { + memcpy(wBuf_+wLen_, buf, len); + wLen_ += len; + } +} + +void TBufferedTransport::flush() { + // Write out any data waiting in the write buffer + if (wLen_ > 0) { + transport_->write(wBuf_, wLen_); + wLen_ = 0; + } + + // Flush the underlying transport + transport_->flush(); +} + +uint32_t TFramedTransport::read(uint8_t* buf, uint32_t len) { + if (!read_) { + return transport_->read(buf, len); + } + + uint32_t need = len; + + // We don't have enough data yet + if (rLen_-rPos_ < need) { + // Copy out whatever we have + if (rLen_-rPos_ > 0) { + memcpy(buf, rBuf_+rPos_, rLen_-rPos_); + need -= rLen_-rPos_; + buf += rLen_-rPos_; + } + + // Read another chunk + readFrame(); + } + + // Hand over whatever we have + uint32_t give = need; + if (rLen_-rPos_ < give) { + give = rLen_-rPos_; + } + memcpy(buf, rBuf_+rPos_, give); + rPos_ += give; + need -= give; + return (len - need); +} + +void TFramedTransport::readFrame() { + // Get rid of the old frame + if (rBuf_ != NULL) { + delete [] rBuf_; + rBuf_ = NULL; + } + + // Read in the next chunk size + int32_t sz; + transport_->readAll((uint8_t*)&sz, 4); + sz = (int32_t)ntohl(sz); + + if (sz < 0) { + throw new TTransportException("Frame size has negative value"); + } + + // Read the frame payload, reset markers + rBuf_ = new uint8_t[sz]; + transport_->readAll(rBuf_, sz); + rPos_ = 0; + rLen_ = sz; +} + +void TFramedTransport::write(const uint8_t* buf, uint32_t len) { + if (len == 0) { + return; + } + + // Shortcut out if not write mode + if (!write_) { + transport_->write(buf, len); + return; + } + + // Need to grow the buffer + if (len + wLen_ >= wBufSize_) { + + // Double buffer size until sufficient + while (wBufSize_ < len + wLen_) { + wBufSize_ *= 2; + } + + // Allocate new buffer + uint8_t* wBuf2 = new uint8_t[wBufSize_]; + + // Copy the old buffer to the new one + memcpy(wBuf2, wBuf_, wLen_); + + // Now point buf to the new one + delete [] wBuf_; + wBuf_ = wBuf2; + } + + // Copy data into buffer + memcpy(wBuf_ + wLen_, buf, len); + wLen_ += len; +} + +void TFramedTransport::flush() { + if (!write_) { + transport_->flush(); + return; + } + + // Write frame size + int32_t sz = wLen_; + sz = (int32_t)htonl(sz); + + transport_->write((const uint8_t*)&sz, 4); + + // Write frame body + if (sz > 0) { + transport_->write(wBuf_, wLen_); + } + + // All done + wLen_ = 0; + + // Flush the underlying + transport_->flush(); +} + +uint32_t TMemoryBuffer::read(uint8_t* buf, uint32_t len) { + // Check avaible data for reading + uint32_t avail = wPos_ - rPos_; + + // Device how much to give + uint32_t give = len; + if (avail < len) { + give = avail; + } + + // Copy into buffer and increment rPos_ + memcpy(buf, buffer_ + rPos_, give); + rPos_ += give; + + return give; +} + +void TMemoryBuffer::write(const uint8_t* buf, uint32_t len) { + // Check available space + uint32_t avail = bufferSize_ - wPos_; + + // Grow the buffer + if (len > avail) { + if (!owner_) { + throw TTransportException("Insufficient space in external MemoryBuffer"); + } + while (len > avail) { + bufferSize_ *= 2; + buffer_ = (uint8_t*)realloc(buffer_, bufferSize_); + if (buffer_ == NULL) { + throw TTransportException("Out of memory."); + } + } + } + + // Copy into the buffer and increment wPos_ + memcpy(buffer_ + wPos_, buf, len); + wPos_ += len; +} + +}}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h new file mode 100644 index 000000000..8d8d093e4 --- /dev/null +++ b/lib/cpp/src/transport/TTransportUtils.h @@ -0,0 +1,316 @@ +#ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_ +#define _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_ 1 + +#include + +namespace facebook { namespace thrift { namespace transport { + +/** + * The null transport is a dummy transport that doesn't actually do anything. + * It's sort of an analogy to /dev/null, you can never read anything from it + * and it will let you write anything you want to it, though it won't actually + * go anywhere. + * + * @author Mark Slee + */ +class TNullTransport : public TTransport { + public: + TNullTransport() {} + + ~TNullTransport() {} + + bool isOpen() { + return true; + } + + void open() {} + + void write(const std::string& s) {} +}; + + +/** + * Buffered transport. For reads it will read more data than is requested + * and will serve future data out of a local buffer. For writes, data is + * stored to an in memory buffer before being written out. + * + * @author Mark Slee + */ +class TBufferedTransport : public TTransport { + public: + TBufferedTransport(boost::shared_ptr transport) : + transport_(transport), + rBufSize_(512), rPos_(0), rLen_(0), + wBufSize_(512), wLen_(0) { + rBuf_ = new uint8_t[rBufSize_]; + wBuf_ = new uint8_t[wBufSize_]; + } + + TBufferedTransport(boost::shared_ptr transport, uint32_t sz) : + transport_(transport), + rBufSize_(sz), rPos_(0), rLen_(0), + wBufSize_(sz), wLen_(0) { + rBuf_ = new uint8_t[rBufSize_]; + wBuf_ = new uint8_t[wBufSize_]; + } + + TBufferedTransport(boost::shared_ptr transport, uint32_t rsz, uint32_t wsz) : + transport_(transport), + rBufSize_(rsz), rPos_(0), rLen_(0), + wBufSize_(wsz), wLen_(0) { + rBuf_ = new uint8_t[rBufSize_]; + wBuf_ = new uint8_t[wBufSize_]; + } + + ~TBufferedTransport() { + delete [] rBuf_; + delete [] wBuf_; + } + + bool isOpen() { + return transport_->isOpen(); + } + + void open() { + transport_->open(); + } + + void close() { + transport_->close(); + } + + uint32_t read(uint8_t* buf, uint32_t len); + + void write(const uint8_t* buf, uint32_t len); + + void flush(); + + protected: + boost::shared_ptr transport_; + uint8_t* rBuf_; + uint32_t rBufSize_; + uint32_t rPos_; + uint32_t rLen_; + + uint8_t* wBuf_; + uint32_t wBufSize_; + uint32_t wLen_; +}; + +/** + * Wraps a transport into a buffered one. + * + * @author Mark Slee + */ +class TBufferedTransportFactory : public TTransportFactory { + public: + TBufferedTransportFactory() {} + + virtual ~TBufferedTransportFactory() {} + + /** + * Wraps the transport into a buffered one. + */ + virtual std::pair, boost::shared_ptr > getIOTransports(boost::shared_ptr trans) { + boost::shared_ptr buffered(new TBufferedTransport(trans)); + return std::make_pair(buffered, buffered); + } + +}; + +/** + * Framed transport. All writes go into an in-memory buffer until flush is + * called, at which point the transport writes the length of the entire + * binary chunk followed by the data payload. This allows the receiver on the + * other end to always do fixed-length reads. + * + * @author Mark Slee + */ +class TFramedTransport : public TTransport { + public: + TFramedTransport(boost::shared_ptr transport) : + transport_(transport), + rPos_(0), + rLen_(0), + read_(true), + wBufSize_(512), + wLen_(0), + write_(true) { + rBuf_ = NULL; + wBuf_ = new uint8_t[wBufSize_]; + } + + TFramedTransport(boost::shared_ptr transport, uint32_t sz) : + transport_(transport), + rPos_(0), + rLen_(0), + read_(true), + wBufSize_(sz), + wLen_(0), + write_(true) { + rBuf_ = NULL; + wBuf_ = new uint8_t[wBufSize_]; + } + + ~TFramedTransport() { + if (rBuf_ != NULL) { + delete [] rBuf_; + } + if (wBuf_ != NULL) { + delete [] wBuf_; + } + } + + void setRead(bool read) { + read_ = read; + } + + void setWrite(bool write) { + write_ = write; + } + + void open() { + transport_->open(); + } + + bool isOpen() { + return transport_->isOpen(); + } + + void close() { + transport_->close(); + } + + uint32_t read(uint8_t* buf, uint32_t len); + + void write(const uint8_t* buf, uint32_t len); + + void flush(); + + protected: + boost::shared_ptr transport_; + uint8_t* rBuf_; + uint32_t rPos_; + uint32_t rLen_; + bool read_; + + uint8_t* wBuf_; + uint32_t wBufSize_; + uint32_t wLen_; + bool write_; + + /** + * Reads a frame of input from the underlying stream. + */ + void readFrame(); +}; + +/** + * A memory buffer is a tranpsort that simply reads from and writes to an + * in memory buffer. Anytime you call write on it, the data is simply placed + * into a buffer, and anytime you call read, data is read from that buffer. + * + * The buffers are allocated using C constructs malloc,realloc, and the size + * doubles as necessary. + * + * @author Mark Slee + */ +class TMemoryBuffer : public TTransport { + public: + TMemoryBuffer() { + owner_ = true; + bufferSize_ = 1024; + buffer_ = (uint8_t*)malloc(bufferSize_); + if (buffer_ == NULL) { + throw TTransportException("Out of memory"); + } + wPos_ = 0; + rPos_ = 0; + } + + TMemoryBuffer(uint32_t sz) { + owner_ = true; + bufferSize_ = sz; + buffer_ = (uint8_t*)malloc(bufferSize_); + if (buffer_ == NULL) { + throw TTransportException("Out of memory"); + } + wPos_ = 0; + rPos_ = 0; + } + + TMemoryBuffer(uint8_t* buf, int sz) { + owner_ = false; + buffer_ = buf; + bufferSize_ = sz; + wPos_ = sz; + rPos_ = 0; + } + + ~TMemoryBuffer() { + if (owner_) { + if (buffer_ != NULL) { + free(buffer_); + buffer_ = NULL; + } + } + } + + bool isOpen() { + return true; + } + + + void open() {} + + void close() {} + + void getBuffer(uint8_t** bufPtr, uint32_t* sz) { + *bufPtr = buffer_; + *sz = wPos_; + } + + void resetBuffer() { + wPos_ = 0; + rPos_ = 0; + } + + void resetBuffer(uint8_t* buf, uint32_t sz) { + if (owner_) { + if (buffer_ != NULL) { + free(buffer_); + } + } + owner_ = false; + buffer_ = buf; + bufferSize_ = sz; + wPos_ = sz; + rPos_ = 0; + } + + uint32_t read(uint8_t* buf, uint32_t len); + + void write(const uint8_t* buf, uint32_t len); + + private: + // Data buffer + uint8_t* buffer_; + + // Allocated buffer size + uint32_t bufferSize_; + + // Where the write is at + uint32_t wPos_; + + // Where the reader is at + uint32_t rPos_; + + // Is this object the owner of the buffer? + bool owner_; + +}; + +}}} // facebook::thrift::transport + + +#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_