Change Thrift c++ to new protocol wrapping transport model

Summary: Also cleaned up excessive .h/.cpp files into Utils files

Reviewed By: aditya


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664838 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Slee 2006-10-25 19:02:49 +00:00
parent b3bd81f479
commit 4af6ed71e1
26 changed files with 936 additions and 1018 deletions

View File

@ -10,13 +10,11 @@ libthrift_sources = src/concurrency/Monitor.cpp \
src/concurrency/ThreadManager.cpp \ src/concurrency/ThreadManager.cpp \
src/concurrency/TimerManager.cpp \ src/concurrency/TimerManager.cpp \
src/protocol/TBinaryProtocol.cpp \ src/protocol/TBinaryProtocol.cpp \
src/transport/TBufferedTransport.cpp \
src/transport/TBufferedFileWriter.cpp \ src/transport/TBufferedFileWriter.cpp \
src/transport/TBufferedRouterTransport.cpp \ src/transport/TBufferedRouterTransport.cpp \
src/transport/TFramedTransport.cpp \
src/transport/TMemoryBuffer.cpp \
src/transport/TSocket.cpp \ src/transport/TSocket.cpp \
src/transport/TServerSocket.cpp \ src/transport/TServerSocket.cpp \
src/transport/TTransportUtils.cpp \
src/server/TSimpleServer.cpp \ src/server/TSimpleServer.cpp \
src/server/TThreadPoolServer.cpp \ src/server/TThreadPoolServer.cpp \
src/server/TNonblockingServer.cpp src/server/TNonblockingServer.cpp
@ -52,19 +50,14 @@ include_protocol_HEADERS = \
include_transportdir = $(include_thriftdir)/transport include_transportdir = $(include_thriftdir)/transport
include_transport_HEADERS = \ include_transport_HEADERS = \
src/transport/TBufferedTransport.h \ src/transport/TBufferedFileWriter.h \
src/transport/TFramedTransport.h \ src/transport/TBufferedRouterTransport.h \
src/transport/TNullTransport.h \
src/transport/TMemoryBuffer.h \
src/transport/TServerSocket.h \ src/transport/TServerSocket.h \
src/transport/TServerTransport.h \ src/transport/TServerTransport.h \
src/transport/TSocket.h \ src/transport/TSocket.h \
src/transport/TTransport.h \ src/transport/TTransport.h \
src/transport/TTransportException.h \ src/transport/TTransportException.h \
src/transport/TTransportFactory.h \ src/transport/TTransportUtils.h
src/transport/TBufferedTransportFactory.h \
src/transport/TBufferedFileWriter.h \
src/transport/TBufferedRouterTransport.h
include_serverdir = $(include_thriftdir)/server include_serverdir = $(include_thriftdir)/server
include_server_HEADERS = \ include_server_HEADERS = \

View File

@ -2,14 +2,14 @@
#define _THRIFT_TPROCESSOR_H_ 1 #define _THRIFT_TPROCESSOR_H_ 1
#include <string> #include <string>
#include <transport/TTransport.h> #include <protocol/TProtocol.h>
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
namespace facebook { namespace thrift { namespace facebook { namespace thrift {
using namespace boost; using namespace boost;
using namespace facebook::thrift::transport; using namespace facebook::thrift::protocol;
/** /**
* A processor is a generic object that acts upon two streams of data, one * A processor is a generic object that acts upon two streams of data, one
@ -22,8 +22,13 @@ using namespace facebook::thrift::transport;
class TProcessor { class TProcessor {
public: public:
virtual ~TProcessor() {} virtual ~TProcessor() {}
virtual bool process(shared_ptr<TTransport> in, shared_ptr<TTransport> out) = 0;
bool process(shared_ptr<TTransport> io) { return process(io, io); } virtual bool process(shared_ptr<TProtocol> in,
shared_ptr<TProtocol> out) = 0;
bool process(shared_ptr<TProtocol> io) {
return process(io, io);
}
protected: protected:
TProcessor() {} TProcessor() {}

View File

@ -4,121 +4,109 @@ using std::string;
namespace facebook { namespace thrift { namespace protocol { namespace facebook { namespace thrift { namespace protocol {
uint32_t TBinaryProtocol::writeMessageBegin(shared_ptr<TTransport> out, uint32_t TBinaryProtocol::writeMessageBegin(const std::string name,
const std::string name,
const TMessageType messageType, const TMessageType messageType,
const int32_t seqid) const { const int32_t seqid) {
return return
writeString(out, name) + writeString(name) +
writeByte(out, (int8_t)messageType) + writeByte((int8_t)messageType) +
writeI32(out, seqid); writeI32(seqid);
} }
uint32_t TBinaryProtocol::writeMessageEnd(shared_ptr<TTransport> out) const { uint32_t TBinaryProtocol::writeMessageEnd() {
return 0; return 0;
} }
uint32_t TBinaryProtocol::writeStructBegin(shared_ptr<TTransport> out, uint32_t TBinaryProtocol::writeStructBegin(const string& name) {
const string& name) const {
return 0; return 0;
} }
uint32_t TBinaryProtocol::writeStructEnd(shared_ptr<TTransport> out) const { uint32_t TBinaryProtocol::writeStructEnd() {
return 0; return 0;
} }
uint32_t TBinaryProtocol::writeFieldBegin(shared_ptr<TTransport> out, uint32_t TBinaryProtocol::writeFieldBegin(const string& name,
const string& name,
const TType fieldType, const TType fieldType,
const int16_t fieldId) const { const int16_t fieldId) {
return return
writeByte(out, (int8_t)fieldType) + writeByte((int8_t)fieldType) +
writeI16(out, fieldId); writeI16(fieldId);
} }
uint32_t TBinaryProtocol::writeFieldEnd(shared_ptr<TTransport> out) const { uint32_t TBinaryProtocol::writeFieldEnd() {
return 0; return 0;
} }
uint32_t TBinaryProtocol::writeFieldStop(shared_ptr<TTransport> out) const { uint32_t TBinaryProtocol::writeFieldStop() {
return return
writeByte(out, (int8_t)T_STOP); writeByte((int8_t)T_STOP);
} }
uint32_t TBinaryProtocol::writeMapBegin(shared_ptr<TTransport> out, uint32_t TBinaryProtocol::writeMapBegin(const TType keyType,
const TType keyType,
const TType valType, const TType valType,
const uint32_t size) const { const uint32_t size) {
return return
writeByte(out, (int8_t)keyType) + writeByte((int8_t)keyType) +
writeByte(out, (int8_t)valType) + writeByte((int8_t)valType) +
writeI32(out, (int32_t)size); writeI32((int32_t)size);
} }
uint32_t TBinaryProtocol::writeMapEnd(shared_ptr<TTransport> out) const { uint32_t TBinaryProtocol::writeMapEnd() {
return 0; return 0;
} }
uint32_t TBinaryProtocol::writeListBegin(shared_ptr<TTransport> out, uint32_t TBinaryProtocol::writeListBegin(const TType elemType,
const TType elemType, const uint32_t size) {
const uint32_t size) const {
return return
writeByte(out, (int8_t) elemType) + writeByte((int8_t) elemType) +
writeI32(out, (int32_t)size); writeI32((int32_t)size);
} }
uint32_t TBinaryProtocol::writeListEnd(shared_ptr<TTransport> out) const { uint32_t TBinaryProtocol::writeListEnd() {
return 0; return 0;
} }
uint32_t TBinaryProtocol::writeSetBegin(shared_ptr<TTransport> out, uint32_t TBinaryProtocol::writeSetBegin(const TType elemType,
const TType elemType, const uint32_t size) {
const uint32_t size) const {
return return
writeByte(out, (int8_t)elemType) + writeByte((int8_t)elemType) +
writeI32(out, (int32_t)size); writeI32((int32_t)size);
} }
uint32_t TBinaryProtocol::writeSetEnd(shared_ptr<TTransport> out) const { uint32_t TBinaryProtocol::writeSetEnd() {
return 0; return 0;
} }
uint32_t TBinaryProtocol::writeBool(shared_ptr<TTransport> out, uint32_t TBinaryProtocol::writeBool(const bool value) {
const bool value) const {
uint8_t tmp = value ? 1 : 0; uint8_t tmp = value ? 1 : 0;
out->write(&tmp, 1); outputTransport_->write(&tmp, 1);
return 1; return 1;
} }
uint32_t TBinaryProtocol::writeByte(shared_ptr<TTransport> out, uint32_t TBinaryProtocol::writeByte(const int8_t byte) {
const int8_t byte) const { outputTransport_->write((uint8_t*)&byte, 1);
out->write((uint8_t*)&byte, 1);
return 1; return 1;
} }
uint32_t TBinaryProtocol::writeI16(shared_ptr<TTransport> out, uint32_t TBinaryProtocol::writeI16(const int16_t i16) {
const int16_t i16) const {
int16_t net = (int16_t)htons(i16); int16_t net = (int16_t)htons(i16);
out->write((uint8_t*)&net, 2); outputTransport_->write((uint8_t*)&net, 2);
return 2; return 2;
} }
uint32_t TBinaryProtocol::writeI32(shared_ptr<TTransport> out, uint32_t TBinaryProtocol::writeI32(const int32_t i32) {
const int32_t i32) const {
int32_t net = (int32_t)htonl(i32); int32_t net = (int32_t)htonl(i32);
out->write((uint8_t*)&net, 4); outputTransport_->write((uint8_t*)&net, 4);
return 4; return 4;
} }
uint32_t TBinaryProtocol::writeI64(shared_ptr<TTransport> out, uint32_t TBinaryProtocol::writeI64(const int64_t i64) {
const int64_t i64) const {
int64_t net = (int64_t)htonll(i64); int64_t net = (int64_t)htonll(i64);
out->write((uint8_t*)&net, 8); outputTransport_->write((uint8_t*)&net, 8);
return 8; return 8;
} }
uint32_t TBinaryProtocol::writeDouble(shared_ptr<TTransport> out, uint32_t TBinaryProtocol::writeDouble(const double dub) {
const double dub) const {
uint8_t b[8]; uint8_t b[8];
uint8_t* d = (uint8_t*)&dub; uint8_t* d = (uint8_t*)&dub;
b[0] = d[7]; b[0] = d[7];
@ -129,15 +117,14 @@ uint32_t TBinaryProtocol::writeDouble(shared_ptr<TTransport> out,
b[5] = d[2]; b[5] = d[2];
b[6] = d[1]; b[6] = d[1];
b[7] = d[0]; b[7] = d[0];
out->write((uint8_t*)b, 8); outputTransport_->write((uint8_t*)b, 8);
return 8; return 8;
} }
uint32_t TBinaryProtocol::writeString(shared_ptr<TTransport> out, uint32_t TBinaryProtocol::writeString(const string& str) {
const string& str) const { uint32_t result = writeI32(str.size());
uint32_t result = writeI32(out, str.size()); outputTransport_->write((uint8_t*)str.data(), str.size());
out->write((uint8_t*)str.data(), str.size());
return result + str.size(); return result + str.size();
} }
@ -145,159 +132,147 @@ uint32_t TBinaryProtocol::writeString(shared_ptr<TTransport> out,
* Reading functions * Reading functions
*/ */
uint32_t TBinaryProtocol::readMessageBegin(shared_ptr<TTransport> in, uint32_t TBinaryProtocol::readMessageBegin(std::string& name,
std::string& name,
TMessageType& messageType, TMessageType& messageType,
int32_t& seqid) const { int32_t& seqid) {
uint32_t result = 0; uint32_t result = 0;
int8_t type; int8_t type;
result+= readString(in, name); result+= readString(name);
result+= readByte(in, type); result+= readByte(type);
messageType = (TMessageType)type; messageType = (TMessageType)type;
result+= readI32(in, seqid); result+= readI32(seqid);
return result; return result;
} }
uint32_t TBinaryProtocol::readMessageEnd(shared_ptr<TTransport> in) const{ uint32_t TBinaryProtocol::readMessageEnd() {
return 0; return 0;
} }
uint32_t TBinaryProtocol::readStructBegin(shared_ptr<TTransport> in, uint32_t TBinaryProtocol::readStructBegin(string& name) {
string& name) const {
name = ""; name = "";
return 0; return 0;
} }
uint32_t TBinaryProtocol::readStructEnd(shared_ptr<TTransport> in) const { uint32_t TBinaryProtocol::readStructEnd() {
return 0; return 0;
} }
uint32_t TBinaryProtocol::readFieldBegin(shared_ptr<TTransport> in, uint32_t TBinaryProtocol::readFieldBegin(string& name,
string& name,
TType& fieldType, TType& fieldType,
int16_t& fieldId) const { int16_t& fieldId) {
uint32_t result = 0; uint32_t result = 0;
int8_t type; int8_t type;
result += readByte(in, type); result += readByte(type);
fieldType = (TType)type; fieldType = (TType)type;
if (fieldType == T_STOP) { if (fieldType == T_STOP) {
fieldId = 0; fieldId = 0;
return result; return result;
} }
result += readI16(in, fieldId); result += readI16(fieldId);
return result; return result;
} }
uint32_t TBinaryProtocol::readFieldEnd(shared_ptr<TTransport> in) const { uint32_t TBinaryProtocol::readFieldEnd() {
return 0; return 0;
} }
uint32_t TBinaryProtocol::readMapBegin(shared_ptr<TTransport> in, uint32_t TBinaryProtocol::readMapBegin(TType& keyType,
TType& keyType,
TType& valType, TType& valType,
uint32_t& size) const { uint32_t& size) {
int8_t k, v; int8_t k, v;
uint32_t result = 0; uint32_t result = 0;
int32_t sizei; int32_t sizei;
result += readByte(in, k); result += readByte(k);
keyType = (TType)k; keyType = (TType)k;
result += readByte(in, v); result += readByte(v);
valType = (TType)v; valType = (TType)v;
result += readI32(in, sizei); result += readI32(sizei);
// TODO(mcslee): check for negative size // TODO(mcslee): check for negative size
size = (uint32_t)sizei; size = (uint32_t)sizei;
return result; return result;
} }
uint32_t TBinaryProtocol::readMapEnd(shared_ptr<TTransport> in) const { uint32_t TBinaryProtocol::readMapEnd() {
return 0; return 0;
} }
uint32_t TBinaryProtocol::readListBegin(shared_ptr<TTransport> in, uint32_t TBinaryProtocol::readListBegin(TType& elemType,
TType& elemType, uint32_t& size) {
uint32_t& size) const {
int8_t e; int8_t e;
uint32_t result = 0; uint32_t result = 0;
int32_t sizei; int32_t sizei;
result += readByte(in, e); result += readByte(e);
elemType = (TType)e; elemType = (TType)e;
result += readI32(in, sizei); result += readI32(sizei);
// TODO(mcslee): check for negative size // TODO(mcslee): check for negative size
size = (uint32_t)sizei; size = (uint32_t)sizei;
return result; return result;
} }
uint32_t TBinaryProtocol::readListEnd(shared_ptr<TTransport> in) const { uint32_t TBinaryProtocol::readListEnd() {
return 0; return 0;
} }
uint32_t TBinaryProtocol::readSetBegin(shared_ptr<TTransport> in, uint32_t TBinaryProtocol::readSetBegin(TType& elemType,
TType& elemType, uint32_t& size) {
uint32_t& size) const {
int8_t e; int8_t e;
uint32_t result = 0; uint32_t result = 0;
int32_t sizei; int32_t sizei;
result += readByte(in, e); result += readByte(e);
elemType = (TType)e; elemType = (TType)e;
result += readI32(in, sizei); result += readI32(sizei);
// TODO(mcslee): check for negative size // TODO(mcslee): check for negative size
size = (uint32_t)sizei; size = (uint32_t)sizei;
return result; return result;
} }
uint32_t TBinaryProtocol::readSetEnd(shared_ptr<TTransport> in) const { uint32_t TBinaryProtocol::readSetEnd() {
return 0; return 0;
} }
uint32_t TBinaryProtocol::readBool(shared_ptr<TTransport> in, uint32_t TBinaryProtocol::readBool(bool& value) {
bool& value) const {
uint8_t b[1]; uint8_t b[1];
in->readAll(b, 1); inputTransport_->readAll(b, 1);
value = *(int8_t*)b != 0; value = *(int8_t*)b != 0;
return 1; return 1;
} }
uint32_t TBinaryProtocol::readByte(shared_ptr<TTransport> in, uint32_t TBinaryProtocol::readByte(int8_t& byte) {
int8_t& byte) const {
uint8_t b[1]; uint8_t b[1];
in->readAll(b, 1); inputTransport_->readAll(b, 1);
byte = *(int8_t*)b; byte = *(int8_t*)b;
return 1; return 1;
} }
uint32_t TBinaryProtocol::readI16(shared_ptr<TTransport> in, uint32_t TBinaryProtocol::readI16(int16_t& i16) {
int16_t& i16) const {
uint8_t b[2]; uint8_t b[2];
in->readAll(b, 2); inputTransport_->readAll(b, 2);
i16 = *(int16_t*)b; i16 = *(int16_t*)b;
i16 = (int16_t)ntohs(i16); i16 = (int16_t)ntohs(i16);
return 2; return 2;
} }
uint32_t TBinaryProtocol::readI32(shared_ptr<TTransport> in, uint32_t TBinaryProtocol::readI32(int32_t& i32) {
int32_t& i32) const {
uint8_t b[4]; uint8_t b[4];
in->readAll(b, 4); inputTransport_->readAll(b, 4);
i32 = *(int32_t*)b; i32 = *(int32_t*)b;
i32 = (int32_t)ntohl(i32); i32 = (int32_t)ntohl(i32);
return 4; return 4;
} }
uint32_t TBinaryProtocol::readI64(shared_ptr<TTransport> in, uint32_t TBinaryProtocol::readI64(int64_t& i64) {
int64_t& i64) const {
uint8_t b[8]; uint8_t b[8];
in->readAll(b, 8); inputTransport_->readAll(b, 8);
i64 = *(int64_t*)b; i64 = *(int64_t*)b;
i64 = (int64_t)ntohll(i64); i64 = (int64_t)ntohll(i64);
return 8; return 8;
} }
uint32_t TBinaryProtocol::readDouble(shared_ptr<TTransport> in, uint32_t TBinaryProtocol::readDouble(double& dub) {
double& dub) const {
uint8_t b[8]; uint8_t b[8];
uint8_t d[8]; uint8_t d[8];
in->readAll(b, 8); inputTransport_->readAll(b, 8);
d[0] = b[7]; d[0] = b[7];
d[1] = b[6]; d[1] = b[6];
d[2] = b[5]; d[2] = b[5];
@ -310,17 +285,16 @@ uint32_t TBinaryProtocol::readDouble(shared_ptr<TTransport> in,
return 8; return 8;
} }
uint32_t TBinaryProtocol::readString(shared_ptr<TTransport> in, uint32_t TBinaryProtocol::readString(string& str) {
string& str) const {
uint32_t result; uint32_t result;
int32_t size; int32_t size;
result = readI32(in, size); result = readI32(size);
// TODO(mcslee): check for negative size // TODO(mcslee): check for negative size
// Use the heap here to prevent stack overflow for v. large strings // Use the heap here to prevent stack overflow for v. large strings
uint8_t *b = new uint8_t[size]; uint8_t *b = new uint8_t[size];
in->readAll(b, size); inputTransport_->readAll(b, size);
str = string((char*)b, size); str = string((char*)b, size);
delete [] b; delete [] b;

View File

@ -17,139 +17,130 @@ using namespace boost;
*/ */
class TBinaryProtocol : public TProtocol { class TBinaryProtocol : public TProtocol {
public: public:
TBinaryProtocol() {} TBinaryProtocol(shared_ptr<TTransport> in, shared_ptr<TTransport> out) :
TProtocol(in, out) {}
~TBinaryProtocol() {} ~TBinaryProtocol() {}
/** /**
* Writing functions. * Writing functions.
*/ */
virtual uint32_t writeMessageBegin(shared_ptr<TTransport> out, virtual uint32_t writeMessageBegin(const std::string name,
const std::string name,
const TMessageType messageType, const TMessageType messageType,
const int32_t seqid) const; const int32_t seqid);
virtual uint32_t writeMessageEnd(shared_ptr<TTransport> out) const; virtual uint32_t writeMessageEnd();
uint32_t writeStructBegin(shared_ptr<TTransport> out, uint32_t writeStructBegin(const std::string& name);
const std::string& name) const;
uint32_t writeStructEnd(shared_ptr<TTransport> out) const; uint32_t writeStructEnd();
uint32_t writeFieldBegin(shared_ptr<TTransport> out, uint32_t writeFieldBegin(const std::string& name,
const std::string& name, const TType fieldType,
const TType fieldType, const int16_t fieldId);
const int16_t fieldId) const;
uint32_t writeFieldEnd(shared_ptr<TTransport> out) const; uint32_t writeFieldEnd();
uint32_t writeFieldStop(shared_ptr<TTransport> out) const; uint32_t writeFieldStop();
uint32_t writeMapBegin(shared_ptr<TTransport> out, uint32_t writeMapBegin(const TType keyType,
const TType keyType, const TType valType,
const TType valType, const uint32_t size);
const uint32_t size) const;
uint32_t writeMapEnd(shared_ptr<TTransport> out) const; uint32_t writeMapEnd();
uint32_t writeListBegin(shared_ptr<TTransport> out, uint32_t writeListBegin(const TType elemType,
const TType elemType, const uint32_t size);
const uint32_t size) const;
uint32_t writeListEnd(shared_ptr<TTransport> out) const; uint32_t writeListEnd();
uint32_t writeSetBegin(shared_ptr<TTransport> out, uint32_t writeSetBegin(const TType elemType,
const TType elemType, const uint32_t size);
const uint32_t size) const;
uint32_t writeSetEnd(shared_ptr<TTransport> out) const; uint32_t writeSetEnd();
uint32_t writeBool(shared_ptr<TTransport> out, uint32_t writeBool(const bool value);
const bool value) const;
uint32_t writeByte(shared_ptr<TTransport> out, uint32_t writeByte(const int8_t byte);
const int8_t byte) const;
uint32_t writeI16(shared_ptr<TTransport> out, uint32_t writeI16(const int16_t i16);
const int16_t i16) const;
uint32_t writeI32(shared_ptr<TTransport> out, uint32_t writeI32(const int32_t i32);
const int32_t i32) const;
uint32_t writeI64(shared_ptr<TTransport> out, uint32_t writeI64(const int64_t i64);
const int64_t i64) const;
uint32_t writeDouble(shared_ptr<TTransport> out, uint32_t writeDouble(const double dub);
const double dub) const;
uint32_t writeString(shared_ptr<TTransport> out, uint32_t writeString(const std::string& str);
const std::string& str) const;
/** /**
* Reading functions * Reading functions
*/ */
uint32_t readMessageBegin(shared_ptr<TTransport> in, uint32_t readMessageBegin(std::string& name,
std::string& name,
TMessageType& messageType, TMessageType& messageType,
int32_t& seqid) const; int32_t& seqid);
uint32_t readMessageEnd(shared_ptr<TTransport> in) const; uint32_t readMessageEnd();
uint32_t readStructBegin(shared_ptr<TTransport> in, uint32_t readStructBegin(std::string& name);
std::string& name) const;
uint32_t readStructEnd(shared_ptr<TTransport> in) const; uint32_t readStructEnd();
uint32_t readFieldBegin(shared_ptr<TTransport> in, uint32_t readFieldBegin(std::string& name,
std::string& name, TType& fieldType,
TType& fieldType, int16_t& fieldId);
int16_t& fieldId) const;
uint32_t readFieldEnd(shared_ptr<TTransport> in) const; uint32_t readFieldEnd();
uint32_t readMapBegin(shared_ptr<TTransport> in, uint32_t readMapBegin(TType& keyType,
TType& keyType, TType& valType,
TType& valType, uint32_t& size);
uint32_t& size) const;
uint32_t readMapEnd(shared_ptr<TTransport> in) const; uint32_t readMapEnd();
uint32_t readListBegin(shared_ptr<TTransport> in, uint32_t readListBegin(TType& elemType,
TType& elemType, uint32_t& size);
uint32_t& size) const;
uint32_t readListEnd(shared_ptr<TTransport> in) const; uint32_t readListEnd();
uint32_t readSetBegin(shared_ptr<TTransport> in, uint32_t readSetBegin(TType& elemType,
TType& elemType, uint32_t& size);
uint32_t& size) const;
uint32_t readSetEnd(shared_ptr<TTransport> in) const; uint32_t readSetEnd();
uint32_t readBool(shared_ptr<TTransport> in, uint32_t readBool(bool& value);
bool& value) const;
uint32_t readByte(shared_ptr<TTransport> in, uint32_t readByte(int8_t& byte);
int8_t& byte) const;
uint32_t readI16(shared_ptr<TTransport> in, uint32_t readI16(int16_t& i16);
int16_t& i16) const;
uint32_t readI32(shared_ptr<TTransport> in, uint32_t readI32(int32_t& i32);
int32_t& i32) const;
uint32_t readI64(shared_ptr<TTransport> in, uint32_t readI64(int64_t& i64);
int64_t& i64) const;
uint32_t readDouble(shared_ptr<TTransport> in, uint32_t readDouble(double& dub);
double& dub) const;
uint32_t readString(shared_ptr<TTransport> in, uint32_t readString(std::string& str);
std::string& str) const; };
/**
* Constructs binary protocol handlers
*/
class TBinaryProtocolFactory : public TProtocolFactory {
public:
TBinaryProtocolFactory() {}
virtual ~TBinaryProtocolFactory() {}
std::pair<boost::shared_ptr<TProtocol>, boost::shared_ptr<TProtocol> > getIOProtocols(boost::shared_ptr<TTransport> in, boost::shared_ptr<TTransport> out) {
boost::shared_ptr<TProtocol> prot(new TBinaryProtocol(in, out));
return std::make_pair(prot, prot);
}
}; };
}}} // facebook::thrift::protocol }}} // facebook::thrift::protocol

View File

@ -82,170 +82,144 @@ class TProtocol {
* Writing functions. * Writing functions.
*/ */
virtual uint32_t writeMessageBegin(shared_ptr<TTransport> out, virtual uint32_t writeMessageBegin(const std::string name,
const std::string name,
const TMessageType messageType, const TMessageType messageType,
const int32_t seqid) const = 0; const int32_t seqid) = 0;
virtual uint32_t writeMessageEnd(shared_ptr<TTransport> out) const = 0; virtual uint32_t writeMessageEnd() = 0;
virtual uint32_t writeStructBegin(shared_ptr<TTransport> out, virtual uint32_t writeStructBegin(const std::string& name) = 0;
const std::string& name) const = 0;
virtual uint32_t writeStructEnd(shared_ptr<TTransport> out) const = 0; virtual uint32_t writeStructEnd() = 0;
virtual uint32_t writeFieldBegin(shared_ptr<TTransport> out, virtual uint32_t writeFieldBegin(const std::string& name,
const std::string& name,
const TType fieldType, const TType fieldType,
const int16_t fieldId) const = 0; const int16_t fieldId) = 0;
virtual uint32_t writeFieldEnd(shared_ptr<TTransport> out) const = 0; virtual uint32_t writeFieldEnd() = 0;
virtual uint32_t writeFieldStop(shared_ptr<TTransport> out) const = 0; virtual uint32_t writeFieldStop() = 0;
virtual uint32_t writeMapBegin(shared_ptr<TTransport> out, virtual uint32_t writeMapBegin(const TType keyType,
const TType keyType,
const TType valType, const TType valType,
const uint32_t size) const = 0; const uint32_t size) = 0;
virtual uint32_t writeMapEnd(shared_ptr<TTransport> out) const = 0; virtual uint32_t writeMapEnd() = 0;
virtual uint32_t writeListBegin(shared_ptr<TTransport> out, virtual uint32_t writeListBegin(const TType elemType,
const TType elemType, const uint32_t size) = 0;
const uint32_t size) const = 0;
virtual uint32_t writeListEnd(shared_ptr<TTransport> out) const = 0; virtual uint32_t writeListEnd() = 0;
virtual uint32_t writeSetBegin(shared_ptr<TTransport> out, virtual uint32_t writeSetBegin(const TType elemType,
const TType elemType, const uint32_t size) = 0;
const uint32_t size) const = 0;
virtual uint32_t writeSetEnd(shared_ptr<TTransport> out) const = 0; virtual uint32_t writeSetEnd() = 0;
virtual uint32_t writeBool(shared_ptr<TTransport> out, virtual uint32_t writeBool(const bool value) = 0;
const bool value) const = 0;
virtual uint32_t writeByte(shared_ptr<TTransport> out, virtual uint32_t writeByte(const int8_t byte) = 0;
const int8_t byte) const = 0;
virtual uint32_t writeI16(shared_ptr<TTransport> out, virtual uint32_t writeI16(const int16_t i16) = 0;
const int16_t i16) const = 0;
virtual uint32_t writeI32(shared_ptr<TTransport> out, virtual uint32_t writeI32(const int32_t i32) = 0;
const int32_t i32) const = 0;
virtual uint32_t writeI64(shared_ptr<TTransport> out, virtual uint32_t writeI64(const int64_t i64) = 0;
const int64_t i64) const = 0;
virtual uint32_t writeDouble(shared_ptr<TTransport> out, virtual uint32_t writeDouble(const double dub) = 0;
const double dub) const = 0;
virtual uint32_t writeString(shared_ptr<TTransport> out, virtual uint32_t writeString(const std::string& str) = 0;
const std::string& str) const = 0;
/** /**
* Reading functions * Reading functions
*/ */
virtual uint32_t readMessageBegin(shared_ptr<TTransport> in, virtual uint32_t readMessageBegin(std::string& name,
std::string& name,
TMessageType& messageType, TMessageType& messageType,
int32_t& seqid) const = 0; int32_t& seqid) = 0;
virtual uint32_t readMessageEnd(shared_ptr<TTransport> in) const = 0; virtual uint32_t readMessageEnd() = 0;
virtual uint32_t readStructBegin(shared_ptr<TTransport> in, virtual uint32_t readStructBegin(std::string& name) = 0;
std::string& name) const = 0;
virtual uint32_t readStructEnd(shared_ptr<TTransport> in) const = 0; virtual uint32_t readStructEnd() = 0;
virtual uint32_t readFieldBegin(shared_ptr<TTransport> in, virtual uint32_t readFieldBegin(std::string& name,
std::string& name,
TType& fieldType, TType& fieldType,
int16_t& fieldId) const = 0; int16_t& fieldId) = 0;
virtual uint32_t readFieldEnd(shared_ptr<TTransport> in) const = 0; virtual uint32_t readFieldEnd() = 0;
virtual uint32_t readMapBegin(shared_ptr<TTransport> in, virtual uint32_t readMapBegin(TType& keyType,
TType& keyType,
TType& valType, TType& valType,
uint32_t& size) const = 0; uint32_t& size) = 0;
virtual uint32_t readMapEnd(shared_ptr<TTransport> in) const = 0; virtual uint32_t readMapEnd() = 0;
virtual uint32_t readListBegin(shared_ptr<TTransport> in, virtual uint32_t readListBegin(TType& elemType,
TType& elemType, uint32_t& size) = 0;
uint32_t& size) const = 0;
virtual uint32_t readListEnd(shared_ptr<TTransport> in) const = 0; virtual uint32_t readListEnd() = 0;
virtual uint32_t readSetBegin(shared_ptr<TTransport> in, virtual uint32_t readSetBegin(TType& elemType,
TType& elemType, uint32_t& size) = 0;
uint32_t& size) const = 0;
virtual uint32_t readSetEnd(shared_ptr<TTransport> in) const = 0; virtual uint32_t readSetEnd() = 0;
virtual uint32_t readBool(shared_ptr<TTransport> in, virtual uint32_t readBool(bool& value) = 0;
bool& value) const = 0;
virtual uint32_t readByte(shared_ptr<TTransport> in, virtual uint32_t readByte(int8_t& byte) = 0;
int8_t& byte) const = 0;
virtual uint32_t readI16(shared_ptr<TTransport> in, virtual uint32_t readI16(int16_t& i16) = 0;
int16_t& i16) const = 0;
virtual uint32_t readI32(shared_ptr<TTransport> in, virtual uint32_t readI32(int32_t& i32) = 0;
int32_t& i32) const = 0;
virtual uint32_t readI64(shared_ptr<TTransport> in, virtual uint32_t readI64(int64_t& i64) = 0;
int64_t& i64) const = 0;
virtual uint32_t readDouble(shared_ptr<TTransport> in, virtual uint32_t readDouble(double& dub) = 0;
double& dub) const = 0;
virtual uint32_t readString(shared_ptr<TTransport> in, virtual uint32_t readString(std::string& str) = 0;
std::string& str) const = 0;
/** /**
* Method to arbitrarily skip over data. * Method to arbitrarily skip over data.
*/ */
uint32_t skip(shared_ptr<TTransport> in, TType type) const { uint32_t skip(TType type) {
switch (type) { switch (type) {
case T_BOOL: case T_BOOL:
{ {
bool boolv; bool boolv;
return readBool(in, boolv); return readBool(boolv);
} }
case T_BYTE: case T_BYTE:
{ {
int8_t bytev; int8_t bytev;
return readByte(in, bytev); return readByte(bytev);
} }
case T_I16: case T_I16:
{ {
int16_t i16; int16_t i16;
return readI16(in, i16); return readI16(i16);
} }
case T_I32: case T_I32:
{ {
int32_t i32; int32_t i32;
return readI32(in, i32); return readI32(i32);
} }
case T_I64: case T_I64:
{ {
int64_t i64; int64_t i64;
return readI64(in, i64); return readI64(i64);
} }
case T_DOUBLE: case T_DOUBLE:
{ {
double dub; double dub;
return readDouble(in, dub); return readDouble(dub);
} }
case T_STRING: case T_STRING:
{ {
std::string str; std::string str;
return readString(in, str); return readString(str);
} }
case T_STRUCT: case T_STRUCT:
{ {
@ -253,16 +227,16 @@ class TProtocol {
std::string name; std::string name;
int16_t fid; int16_t fid;
TType ftype; TType ftype;
result += readStructBegin(in, name); result += readStructBegin(name);
while (true) { while (true) {
result += readFieldBegin(in, name, ftype, fid); result += readFieldBegin(name, ftype, fid);
if (ftype == T_STOP) { if (ftype == T_STOP) {
break; break;
} }
result += skip(in, ftype); result += skip(ftype);
result += readFieldEnd(in); result += readFieldEnd();
} }
result += readStructEnd(in); result += readStructEnd();
return result; return result;
} }
case T_MAP: case T_MAP:
@ -271,12 +245,12 @@ class TProtocol {
TType keyType; TType keyType;
TType valType; TType valType;
uint32_t i, size; uint32_t i, size;
result += readMapBegin(in, keyType, valType, size); result += readMapBegin(keyType, valType, size);
for (i = 0; i < size; i++) { for (i = 0; i < size; i++) {
result += skip(in, keyType); result += skip(keyType);
result += skip(in, valType); result += skip(valType);
} }
result += readMapEnd(in); result += readMapEnd();
return result; return result;
} }
case T_SET: case T_SET:
@ -284,11 +258,11 @@ class TProtocol {
uint32_t result = 0; uint32_t result = 0;
TType elemType; TType elemType;
uint32_t i, size; uint32_t i, size;
result += readSetBegin(in, elemType, size); result += readSetBegin(elemType, size);
for (i = 0; i < size; i++) { for (i = 0; i < size; i++) {
result += skip(in, elemType); result += skip(elemType);
} }
result += readSetEnd(in); result += readSetEnd();
return result; return result;
} }
case T_LIST: case T_LIST:
@ -296,11 +270,11 @@ class TProtocol {
uint32_t result = 0; uint32_t result = 0;
TType elemType; TType elemType;
uint32_t i, size; uint32_t i, size;
result += readListBegin(in, elemType, size); result += readListBegin(elemType, size);
for (i = 0; i < size; i++) { for (i = 0; i < size; i++) {
result += skip(in, elemType); result += skip(elemType);
} }
result += readListEnd(in); result += readListEnd();
return result; return result;
} }
default: default:
@ -308,10 +282,39 @@ class TProtocol {
} }
} }
shared_ptr<TTransport> getInputTransport() {
return inputTransport_;
}
shared_ptr<TTransport> getOutputTransport() {
return outputTransport_;
}
protected: protected:
TProtocol(shared_ptr<TTransport> in, shared_ptr<TTransport> out) :
inputTransport_(in),
outputTransport_(out) {}
shared_ptr<TTransport> inputTransport_;
shared_ptr<TTransport> outputTransport_;
private:
TProtocol() {} TProtocol() {}
}; };
/**
* Constructs input and output protocol objects given transports.
*/
class TProtocolFactory {
public:
TProtocolFactory() {}
virtual ~TProtocolFactory() {}
virtual std::pair<boost::shared_ptr<TProtocol>, boost::shared_ptr<TProtocol> > getIOProtocols(boost::shared_ptr<TTransport> in, boost::shared_ptr<TTransport> out) = 0;
};
}}} // facebook::thrift::protocol }}} // facebook::thrift::protocol
#endif // #define _THRIFT_PROTOCOL_TPROTOCOL_H_ 1 #endif // #define _THRIFT_PROTOCOL_TPROTOCOL_H_ 1

View File

@ -153,7 +153,7 @@ void TConnection::transition() {
try { try {
// Invoke the processor // Invoke the processor
server_->getProcessor()->process(inputTransport_, outputTransport_); server_->getProcessor()->process(inputProtocol_, outputProtocol_);
} catch (TTransportException &x) { } catch (TTransportException &x) {
fprintf(stderr, "Server::process %s\n", x.getMessage().c_str()); fprintf(stderr, "Server::process %s\n", x.getMessage().c_str());
close(); close();

View File

@ -1,9 +1,9 @@
#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1 #define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
#include "Thrift.h" #include <Thrift.h>
#include "server/TServer.h" #include <server/TServer.h>
#include "transport/TMemoryBuffer.h" #include <transport/TTransportUtils.h>
#include <stack> #include <stack>
#include <event.h> #include <event.h>
@ -50,10 +50,8 @@ class TNonblockingServer : public TServer {
void handleEvent(int fd, short which); void handleEvent(int fd, short which);
public: public:
TNonblockingServer(shared_ptr<TProcessor> processor, TNonblockingServer(shared_ptr<TProcessor> processor, int port) :
shared_ptr<TServerOptions> options, TServer(processor),
int port) :
TServer(processor, options),
serverSocket_(0), serverSocket_(0),
port_(port), port_(port),
frameResponses_(true) {} frameResponses_(true) {}
@ -157,6 +155,12 @@ class TConnection {
// Transport that processor writes to // Transport that processor writes to
shared_ptr<TMemoryBuffer> outputTransport_; shared_ptr<TMemoryBuffer> outputTransport_;
// Protocol encoder
shared_ptr<TProtocol> outputProtocol_;
// Protocol decoder
shared_ptr<TProtocol> inputProtocol_;
// Go into read mode // Go into read mode
void setRead() { void setRead() {
setFlags(EV_READ | EV_PERSIST); setFlags(EV_READ | EV_PERSIST);
@ -190,6 +194,12 @@ class TConnection {
inputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_)); inputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
outputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer()); outputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
// Create protocol
std::pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
iop = s->getProtocolFactory()->getIOProtocols(inputTransport_, outputTransport_);
inputProtocol_ = iop.first;
outputProtocol_ = iop.second;
init(socket, eventFlags, s); init(socket, eventFlags, s);
} }

View File

@ -3,7 +3,7 @@
#include <TProcessor.h> #include <TProcessor.h>
#include <transport/TServerTransport.h> #include <transport/TServerTransport.h>
#include <transport/TTransportFactory.h> #include <protocol/TBinaryProtocol.h>
#include <concurrency/Thread.h> #include <concurrency/Thread.h>
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
@ -14,8 +14,6 @@ using namespace facebook::thrift;
using namespace facebook::thrift::transport; using namespace facebook::thrift::transport;
using namespace boost; using namespace boost;
class TServerOptions;
/** /**
* Thrift server. * Thrift server.
* *
@ -24,45 +22,61 @@ class TServerOptions;
class TServer : public concurrency::Runnable { class TServer : public concurrency::Runnable {
public: public:
virtual ~TServer() {} virtual ~TServer() {}
virtual void serve() = 0; virtual void serve() = 0;
// Allows running the server as a Runnable thread // Allows running the server as a Runnable thread
virtual void run() { serve(); } virtual void run() {
serve();
}
shared_ptr<TProcessor> getProcessor() { shared_ptr<TProcessor> getProcessor() {
return processor_; return processor_;
} }
shared_ptr<TProtocolFactory> getProtocolFactory() {
return protocolFactory_;
}
protected: protected:
TServer(shared_ptr<TProcessor> processor, TServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport, shared_ptr<TServerTransport> serverTransport,
shared_ptr<TTransportFactory> transportFactory, shared_ptr<TTransportFactory> transportFactory,
shared_ptr<TServerOptions> options) : shared_ptr<TProtocolFactory> protocolFactory) :
processor_(processor), processor_(processor),
serverTransport_(serverTransport), serverTransport_(serverTransport),
transportFactory_(transportFactory), transportFactory_(transportFactory),
options_(options) {} protocolFactory_(protocolFactory) {}
TServer(shared_ptr<TProcessor> processor, TServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerOptions> options) : shared_ptr<TServerTransport> serverTransport,
processor_(processor), options_(options) {} shared_ptr<TTransportFactory> transportFactory) :
processor_(processor),
serverTransport_(serverTransport),
transportFactory_(transportFactory) {
protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
}
TServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport) :
processor_(processor),
serverTransport_(serverTransport) {
transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
}
TServer(shared_ptr<TProcessor> processor) :
processor_(processor) {
transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
}
shared_ptr<TProcessor> processor_; shared_ptr<TProcessor> processor_;
shared_ptr<TServerTransport> serverTransport_; shared_ptr<TServerTransport> serverTransport_;
shared_ptr<TTransportFactory> transportFactory_; shared_ptr<TTransportFactory> transportFactory_;
shared_ptr<TServerOptions> options_; shared_ptr<TProtocolFactory> protocolFactory_;
}; };
/**
* Class to encapsulate all generic server options.
*/
class TServerOptions {
public:
// TODO(mcslee): Fill in getters/setters here
protected:
// TODO(mcslee): Fill data members in here
};
}}} // facebook::thrift::server }}} // facebook::thrift::server
#endif // #ifndef _THRIFT_SERVER_TSERVER_H_ #endif // #ifndef _THRIFT_SERVER_TSERVER_H_

View File

@ -14,7 +14,8 @@ namespace facebook { namespace thrift { namespace server {
void TSimpleServer::serve() { void TSimpleServer::serve() {
shared_ptr<TTransport> client; shared_ptr<TTransport> client;
pair<shared_ptr<TTransport>,shared_ptr<TTransport> > io; pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
try { try {
// Start the server listening // Start the server listening
@ -28,14 +29,15 @@ void TSimpleServer::serve() {
try { try {
while (true) { while (true) {
client = serverTransport_->accept(); client = serverTransport_->accept();
io = transportFactory_->getIOTransports(client); iot = transportFactory_->getIOTransports(client);
iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
try { try {
while (processor_->process(io.first, io.second)) {} while (processor_->process(iop.first, iop.second)) {}
} catch (TTransportException& ttx) { } catch (TTransportException& ttx) {
cerr << "TSimpleServer client died: " << ttx.getMessage() << endl; cerr << "TSimpleServer client died: " << ttx.getMessage() << endl;
} }
io.first->close(); iot.first->close();
io.second->close(); iot.second->close();
client->close(); client->close();
} }
} catch (TTransportException& ttx) { } catch (TTransportException& ttx) {

View File

@ -19,8 +19,8 @@ class TSimpleServer : public TServer {
TSimpleServer(shared_ptr<TProcessor> processor, TSimpleServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport, shared_ptr<TServerTransport> serverTransport,
shared_ptr<TTransportFactory> transportFactory, shared_ptr<TTransportFactory> transportFactory,
shared_ptr<TServerOptions> options) : shared_ptr<TProtocolFactory> protocolFactory) :
TServer(processor, serverTransport, transportFactory, options) {} TServer(processor, serverTransport, transportFactory, protocolFactory) {}
~TSimpleServer() {} ~TSimpleServer() {}

View File

@ -16,8 +16,8 @@ class TThreadPoolServer::Task: public Runnable {
public: public:
Task(shared_ptr<TProcessor> processor, Task(shared_ptr<TProcessor> processor,
shared_ptr<TTransport> input, shared_ptr<TProtocol> input,
shared_ptr<TTransport> output) : shared_ptr<TProtocol> output) :
processor_(processor), processor_(processor),
input_(input), input_(input),
output_(output) { output_(output) {
@ -35,23 +35,24 @@ public:
break; break;
} }
} }
input_->close(); input_->getInputTransport()->close();
output_->close(); output_->getOutputTransport()->close();
} }
private: private:
shared_ptr<TProcessor> processor_; shared_ptr<TProcessor> processor_;
shared_ptr<TTransport> input_; shared_ptr<TProtocol> input_;
shared_ptr<TTransport> output_; shared_ptr<TProtocol> output_;
}; };
TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor, TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport, shared_ptr<TServerTransport> serverTransport,
shared_ptr<TTransportFactory> transportFactory, shared_ptr<TTransportFactory> transportFactory,
shared_ptr<ThreadManager> threadManager, shared_ptr<TProtocolFactory> protocolFactory,
shared_ptr<TServerOptions> options) :
TServer(processor, serverTransport, transportFactory, options), shared_ptr<ThreadManager> threadManager) :
TServer(processor, serverTransport, transportFactory, protocolFactory),
threadManager_(threadManager) { threadManager_(threadManager) {
} }
@ -60,7 +61,8 @@ TThreadPoolServer::~TThreadPoolServer() {}
void TThreadPoolServer::serve() { void TThreadPoolServer::serve() {
shared_ptr<TTransport> client; shared_ptr<TTransport> client;
pair<shared_ptr<TTransport>,shared_ptr<TTransport> > io; pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
try { try {
// Start the server listening // Start the server listening
@ -75,9 +77,11 @@ void TThreadPoolServer::serve() {
// Fetch client from server // Fetch client from server
client = serverTransport_->accept(); client = serverTransport_->accept();
// Make IO transports // Make IO transports
io = transportFactory_->getIOTransports(client); iot = transportFactory_->getIOTransports(client);
iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
// Add to threadmanager pool // Add to threadmanager pool
threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, io.first, io.second))); threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, iop.first, iop.second)));
} catch (TTransportException& ttx) { } catch (TTransportException& ttx) {
break; break;
} }

View File

@ -21,8 +21,8 @@ public:
TThreadPoolServer(shared_ptr<TProcessor> processor, TThreadPoolServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport, shared_ptr<TServerTransport> serverTransport,
shared_ptr<TTransportFactory> transportFactory, shared_ptr<TTransportFactory> transportFactory,
shared_ptr<ThreadManager> threadManager, shared_ptr<TProtocolFactory> protocolFactory,
shared_ptr<TServerOptions> options); shared_ptr<ThreadManager> threadManager);
virtual ~TThreadPoolServer(); virtual ~TThreadPoolServer();

View File

@ -87,6 +87,30 @@ class TBufferedRouterTransport : public TTransport {
uint32_t wLen_; uint32_t wLen_;
}; };
/**
* Wraps a transport into a bufferedRouter instance.
*
* @author Aditya Agarwal <aditya@facebook.com>
*/
class TBufferedRouterTransportFactory : public TTransportFactory {
public:
TBufferedRouterTransportFactory(boost::shared_ptr<TTransport> rTrans): rTrans_(rTrans) {}
virtual ~TBufferedRouterTransportFactory() {}
/**
* Wraps the transport into a buffered one.
*/
virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
boost::shared_ptr<TTransport> buffered(new TBufferedRouterTransport(trans, rTrans_));
return std::make_pair(buffered, buffered);
}
private:
boost::shared_ptr<TTransport> rTrans_;
};
}}} // facebook::thrift::transport }}} // facebook::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_ #endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_

View File

@ -1,35 +0,0 @@
#ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORTFACTORY_H_
#define _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORTFACTORY_H_ 1
#include <transport/TTransportFactory.h>
#include <transport/TBufferedRouterTransport.h>
#include <boost/shared_ptr.hpp>
namespace facebook { namespace thrift { namespace transport {
/**
* Wraps a transport into a bufferedRouter instance.
*
* @author Aditya Agarwal <aditya@facebook.com>
*/
class TBufferedRouterTransportFactory : public TTransportFactory {
public:
TBufferedRouterTransportFactory(boost::shared_ptr<TTransport> rTrans): rTrans_(rTrans) {}
virtual ~TBufferedRouterTransportFactory() {}
/**
* Wraps the transport into a buffered one.
*/
virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
boost::shared_ptr<TTransport> buffered(new TBufferedRouterTransport(trans, rTrans_));
return std::make_pair(buffered, buffered);
}
private:
boost::shared_ptr<TTransport> rTrans_;
};
}}}
#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORTFACTORY_H_

View File

@ -1,65 +0,0 @@
#include "TBufferedTransport.h"
using std::string;
namespace facebook { namespace thrift { namespace transport {
uint32_t TBufferedTransport::read(uint8_t* buf, uint32_t len) {
uint32_t need = len;
// We don't have enough data yet
if (rLen_-rPos_ < need) {
// Copy out whatever we have
if (rLen_-rPos_ > 0) {
memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
need -= rLen_-rPos_;
buf += rLen_-rPos_;
}
// Get more from underlying transport up to buffer size
// TODO: should this be a readAll?
rLen_ = transport_->read(rBuf_, rBufSize_);
rPos_ = 0;
}
// Hand over whatever we have
uint32_t give = need;
if (rLen_-rPos_ < give) {
give = rLen_-rPos_;
}
memcpy(buf, rBuf_+rPos_, give);
rPos_ += give;
need -= give;
return (len - need);
}
void TBufferedTransport::write(const uint8_t* buf, uint32_t len) {
if (len == 0) {
return;
}
if (len + wLen_ >= wBufSize_) {
uint32_t copy = wBufSize_ - wLen_;
memcpy(wBuf_ + wLen_, buf, copy);
transport_->write(wBuf_, wBufSize_);
wLen_ = len - copy;
if (wLen_ > 0) {
memcpy(wBuf_, buf+copy, wLen_);
}
} else {
memcpy(wBuf_+wLen_, buf, len);
wLen_ += len;
}
}
void TBufferedTransport::flush() {
// Write out any data waiting in the write buffer
if (wLen_ > 0) {
transport_->write(wBuf_, wLen_);
wLen_ = 0;
}
// Flush the underlying transport
transport_->flush();
}
}}} // facebook::thrift::transport

View File

@ -1,83 +0,0 @@
#ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_
#define _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_ 1
#include "TTransport.h"
#include <string>
#include <boost/shared_ptr.hpp>
namespace facebook { namespace thrift { namespace transport {
using namespace boost;
/**
* Buffered transport. For reads it will read more data than is requested
* and will serve future data out of a local buffer. For writes, data is
* stored to an in memory buffer before being written out.
*
* @author Mark Slee <mcslee@facebook.com>
*/
class TBufferedTransport : public TTransport {
public:
TBufferedTransport(shared_ptr<TTransport> transport) :
transport_(transport),
rBufSize_(512), rPos_(0), rLen_(0),
wBufSize_(512), wLen_(0) {
rBuf_ = new uint8_t[rBufSize_];
wBuf_ = new uint8_t[wBufSize_];
}
TBufferedTransport(shared_ptr<TTransport> transport, uint32_t sz) :
transport_(transport),
rBufSize_(sz), rPos_(0), rLen_(0),
wBufSize_(sz), wLen_(0) {
rBuf_ = new uint8_t[rBufSize_];
wBuf_ = new uint8_t[wBufSize_];
}
TBufferedTransport(shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz) :
transport_(transport),
rBufSize_(rsz), rPos_(0), rLen_(0),
wBufSize_(wsz), wLen_(0) {
rBuf_ = new uint8_t[rBufSize_];
wBuf_ = new uint8_t[wBufSize_];
}
~TBufferedTransport() {
delete [] rBuf_;
delete [] wBuf_;
}
bool isOpen() {
return transport_->isOpen();
}
void open() {
transport_->open();
}
void close() {
transport_->close();
}
uint32_t read(uint8_t* buf, uint32_t len);
void write(const uint8_t* buf, uint32_t len);
void flush();
protected:
shared_ptr<TTransport> transport_;
uint8_t* rBuf_;
uint32_t rBufSize_;
uint32_t rPos_;
uint32_t rLen_;
uint8_t* wBuf_;
uint32_t wBufSize_;
uint32_t wLen_;
};
}}} // facebook::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_

View File

@ -1,33 +0,0 @@
#ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORTFACTORY_H_
#define _THRIFT_TRANSPORT_TBUFFEREDTRANSPORTFACTORY_H_ 1
#include <transport/TTransportFactory.h>
#include <transport/TBufferedTransport.h>
#include <boost/shared_ptr.hpp>
namespace facebook { namespace thrift { namespace transport {
/**
* Wraps a transport into a buffered one.
*
* @author Mark Slee <mcslee@facebook.com>
*/
class TBufferedTransportFactory : public TTransportFactory {
public:
TBufferedTransportFactory() {}
virtual ~TBufferedTransportFactory() {}
/**
* Wraps the transport into a buffered one.
*/
virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
boost::shared_ptr<TTransport> buffered(new TBufferedTransport(trans));
return std::make_pair(buffered, buffered);
}
};
}}}
#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_

View File

@ -1,121 +0,0 @@
#include <transport/TFramedTransport.h>
#include <netinet/in.h>
using std::string;
namespace facebook { namespace thrift { namespace transport {
uint32_t TFramedTransport::read(uint8_t* buf, uint32_t len) {
if (!read_) {
return transport_->read(buf, len);
}
uint32_t need = len;
// We don't have enough data yet
if (rLen_-rPos_ < need) {
// Copy out whatever we have
if (rLen_-rPos_ > 0) {
memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
need -= rLen_-rPos_;
buf += rLen_-rPos_;
}
// Read another chunk
readFrame();
}
// Hand over whatever we have
uint32_t give = need;
if (rLen_-rPos_ < give) {
give = rLen_-rPos_;
}
memcpy(buf, rBuf_+rPos_, give);
rPos_ += give;
need -= give;
return (len - need);
}
void TFramedTransport::readFrame() {
// Get rid of the old frame
if (rBuf_ != NULL) {
delete [] rBuf_;
rBuf_ = NULL;
}
// Read in the next chunk size
int32_t sz;
transport_->readAll((uint8_t*)&sz, 4);
sz = (int32_t)ntohl(sz);
if (sz < 0) {
throw new TTransportException("Frame size has negative value");
}
// Read the frame payload, reset markers
rBuf_ = new uint8_t[sz];
transport_->readAll(rBuf_, sz);
rPos_ = 0;
rLen_ = sz;
}
void TFramedTransport::write(const uint8_t* buf, uint32_t len) {
if (len == 0) {
return;
}
// Shortcut out if not write mode
if (!write_) {
transport_->write(buf, len);
return;
}
// Need to grow the buffer
if (len + wLen_ >= wBufSize_) {
// Double buffer size until sufficient
while (wBufSize_ < len + wLen_) {
wBufSize_ *= 2;
}
// Allocate new buffer
uint8_t* wBuf2 = new uint8_t[wBufSize_];
// Copy the old buffer to the new one
memcpy(wBuf2, wBuf_, wLen_);
// Now point buf to the new one
delete [] wBuf_;
wBuf_ = wBuf2;
}
// Copy data into buffer
memcpy(wBuf_ + wLen_, buf, len);
wLen_ += len;
}
void TFramedTransport::flush() {
if (!write_) {
transport_->flush();
return;
}
// Write frame size
int32_t sz = wLen_;
sz = (int32_t)htonl(sz);
transport_->write((const uint8_t*)&sz, 4);
// Write frame body
if (sz > 0) {
transport_->write(wBuf_, wLen_);
}
// All done
wLen_ = 0;
// Flush the underlying
transport_->flush();
}
}}} // facebook::thrift::transport

View File

@ -1,101 +0,0 @@
#ifndef _THRIFT_TRANSPORT_TFRAMEDTRANSPORT_H_
#define _THRIFT_TRANSPORT_TFRAMEDTRANSPORT_H_ 1
#include "TTransport.h"
#include <string>
#include <boost/shared_ptr.hpp>
namespace facebook { namespace thrift { namespace transport {
using namespace boost;
/**
* Framed transport. All writes go into an in-memory buffer until flush is
* called, at which point the transport writes the length of the entire
* binary chunk followed by the data payload. This allows the receiver on the
* other end to always do fixed-length reads.
*
* @author Mark Slee <mcslee@facebook.com>
*/
class TFramedTransport : public TTransport {
public:
TFramedTransport(shared_ptr<TTransport> transport) :
transport_(transport),
rPos_(0),
rLen_(0),
read_(true),
wBufSize_(512),
wLen_(0),
write_(true) {
rBuf_ = NULL;
wBuf_ = new uint8_t[wBufSize_];
}
TFramedTransport(shared_ptr<TTransport> transport, uint32_t sz) :
transport_(transport),
rPos_(0),
rLen_(0),
read_(true),
wBufSize_(sz),
wLen_(0),
write_(true) {
rBuf_ = NULL;
wBuf_ = new uint8_t[wBufSize_];
}
~TFramedTransport() {
if (rBuf_ != NULL) {
delete [] rBuf_;
}
if (wBuf_ != NULL) {
delete [] wBuf_;
}
}
void setRead(bool read) {
read_ = read;
}
void setWrite(bool write) {
write_ = write;
}
void open() {
transport_->open();
}
bool isOpen() {
return transport_->isOpen();
}
void close() {
transport_->close();
}
uint32_t read(uint8_t* buf, uint32_t len);
void write(const uint8_t* buf, uint32_t len);
void flush();
protected:
shared_ptr<TTransport> transport_;
uint8_t* rBuf_;
uint32_t rPos_;
uint32_t rLen_;
bool read_;
uint8_t* wBuf_;
uint32_t wBufSize_;
uint32_t wLen_;
bool write_;
/**
* Reads a frame of input from the underlying stream.
*/
void readFrame();
};
}}} // facebook::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_TFRAMEDTRANSPORT_H_

View File

@ -1,45 +0,0 @@
#include "TMemoryBuffer.h"
namespace facebook { namespace thrift { namespace transport {
uint32_t TMemoryBuffer::read(uint8_t* buf, uint32_t len) {
// Check avaible data for reading
uint32_t avail = wPos_ - rPos_;
// Device how much to give
uint32_t give = len;
if (avail < len) {
give = avail;
}
// Copy into buffer and increment rPos_
memcpy(buf, buffer_ + rPos_, give);
rPos_ += give;
return give;
}
void TMemoryBuffer::write(const uint8_t* buf, uint32_t len) {
// Check available space
uint32_t avail = bufferSize_ - wPos_;
// Grow the buffer
if (len > avail) {
if (!owner_) {
throw TTransportException("Insufficient space in external MemoryBuffer");
}
while (len > avail) {
bufferSize_ *= 2;
buffer_ = (uint8_t*)realloc(buffer_, bufferSize_);
if (buffer_ == NULL) {
throw TTransportException("Out of memory.");
}
}
}
// Copy into the buffer and increment wPos_
memcpy(buffer_ + wPos_, buf, len);
wPos_ += len;
}
}}} // facebook::thrift::transport

View File

@ -1,116 +0,0 @@
#ifndef _THRIFT_TRANSPORT_TMEMORYBUFFER_H_
#define _THRIFT_TRANSPORT_TMEMORYBUFFER_H_ 1
#include "TTransport.h"
#include <string>
namespace facebook { namespace thrift { namespace transport {
/**
* A memory buffer is a tranpsort that simply reads from and writes to an
* in memory buffer. Anytime you call write on it, the data is simply placed
* into a buffer, and anytime you call read, data is read from that buffer.
*
* The buffers are allocated using C constructs malloc,realloc, and the size
* doubles as necessary.
*
* @author Mark Slee <mcslee@facebook.com>
*/
class TMemoryBuffer : public TTransport {
public:
TMemoryBuffer() {
owner_ = true;
bufferSize_ = 1024;
buffer_ = (uint8_t*)malloc(bufferSize_);
if (buffer_ == NULL) {
throw TTransportException("Out of memory");
}
wPos_ = 0;
rPos_ = 0;
}
TMemoryBuffer(uint32_t sz) {
owner_ = true;
bufferSize_ = sz;
buffer_ = (uint8_t*)malloc(bufferSize_);
if (buffer_ == NULL) {
throw TTransportException("Out of memory");
}
wPos_ = 0;
rPos_ = 0;
}
TMemoryBuffer(uint8_t* buf, int sz) {
owner_ = false;
buffer_ = buf;
bufferSize_ = sz;
wPos_ = sz;
rPos_ = 0;
}
~TMemoryBuffer() {
if (owner_) {
if (buffer_ != NULL) {
free(buffer_);
buffer_ = NULL;
}
}
}
bool isOpen() {
return true;
}
void open() {}
void close() {}
void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
*bufPtr = buffer_;
*sz = wPos_;
}
void resetBuffer() {
wPos_ = 0;
rPos_ = 0;
}
void resetBuffer(uint8_t* buf, uint32_t sz) {
if (owner_) {
if (buffer_ != NULL) {
free(buffer_);
}
}
owner_ = false;
buffer_ = buf;
bufferSize_ = sz;
wPos_ = sz;
rPos_ = 0;
}
uint32_t read(uint8_t* buf, uint32_t len);
void write(const uint8_t* buf, uint32_t len);
private:
// Data buffer
uint8_t* buffer_;
// Allocated buffer size
uint32_t bufferSize_;
// Where the write is at
uint32_t wPos_;
// Where the reader is at
uint32_t rPos_;
// Is this object the owner of the buffer?
bool owner_;
};
}}} // facebook::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_

View File

@ -1,28 +0,0 @@
#ifndef _THRIFT_TRANSPORT_TNULLTRANSPORT_H_
#define _THRIFT_TRANSPORT_TNULLTRANSPORT_H_ 1
#include "TTransport.h"
namespace facebook { namespace thrift { namespace transport {
/**
* The null transport is a dummy transport that doesn't actually do anything.
* It's sort of an analogy to /dev/null, you can never read anything from it
* and it will let you write anything you want to it, though it won't actually
* go anywhere.
*
* @author Mark Slee <mcslee@facebook.com>
*/
class TNullTransport : public TTransport {
public:
TNullTransport() {}
~TNullTransport() {}
bool isOpen() { return true; }
void open() { }
void write(const std::string& s) {}
};
}}} // facebook::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_TNULLTRANSPORT_H_

View File

@ -1,7 +1,8 @@
#ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_ #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_
#define _THRIFT_TRANSPORT_TTRANSPORT_H_ 1 #define _THRIFT_TRANSPORT_TTRANSPORT_H_ 1
#include "Thrift.h" #include <Thrift.h>
#include <boost/shared_ptr.hpp>
#include <transport/TTransportException.h> #include <transport/TTransportException.h>
#include <string> #include <string>
@ -124,6 +125,28 @@ class TTransport {
TTransport() {} TTransport() {}
}; };
/**
* Generic factory class to make an input and output transport out of a
* source transport. Commonly used inside servers to make input and output
* streams out of raw clients.
*
* @author Mark Slee <mcslee@facebook.com>
*/
class TTransportFactory {
public:
TTransportFactory() {}
virtual ~TTransportFactory() {}
/**
* Default implementation does nothing, just returns the transport given.
*/
virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
return std::make_pair(trans, trans);
}
};
}}} // facebook::thrift::transport }}} // facebook::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_ #endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_

View File

@ -1,33 +0,0 @@
#ifndef _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_
#define _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_ 1
#include <transport/TTransport.h>
#include <boost/shared_ptr.hpp>
namespace facebook { namespace thrift { namespace transport {
/**
* Generic factory class to make an input and output transport out of a
* source transport. Commonly used inside servers to make input and output
* streams out of raw clients.
*
* @author Mark Slee <mcslee@facebook.com>
*/
class TTransportFactory {
public:
TTransportFactory() {}
virtual ~TTransportFactory() {}
/**
* Default implementation does nothing, just returns the transport given.
*/
virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
return std::make_pair(trans, trans);
}
};
}}}
#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_

View File

@ -0,0 +1,219 @@
#include <transport/TTransportUtils.h>
using std::string;
namespace facebook { namespace thrift { namespace transport {
uint32_t TBufferedTransport::read(uint8_t* buf, uint32_t len) {
uint32_t need = len;
// We don't have enough data yet
if (rLen_-rPos_ < need) {
// Copy out whatever we have
if (rLen_-rPos_ > 0) {
memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
need -= rLen_-rPos_;
buf += rLen_-rPos_;
}
// Get more from underlying transport up to buffer size
// TODO: should this be a readAll?
rLen_ = transport_->read(rBuf_, rBufSize_);
rPos_ = 0;
}
// Hand over whatever we have
uint32_t give = need;
if (rLen_-rPos_ < give) {
give = rLen_-rPos_;
}
memcpy(buf, rBuf_+rPos_, give);
rPos_ += give;
need -= give;
return (len - need);
}
void TBufferedTransport::write(const uint8_t* buf, uint32_t len) {
if (len == 0) {
return;
}
if (len + wLen_ >= wBufSize_) {
uint32_t copy = wBufSize_ - wLen_;
memcpy(wBuf_ + wLen_, buf, copy);
transport_->write(wBuf_, wBufSize_);
wLen_ = len - copy;
if (wLen_ > 0) {
memcpy(wBuf_, buf+copy, wLen_);
}
} else {
memcpy(wBuf_+wLen_, buf, len);
wLen_ += len;
}
}
void TBufferedTransport::flush() {
// Write out any data waiting in the write buffer
if (wLen_ > 0) {
transport_->write(wBuf_, wLen_);
wLen_ = 0;
}
// Flush the underlying transport
transport_->flush();
}
uint32_t TFramedTransport::read(uint8_t* buf, uint32_t len) {
if (!read_) {
return transport_->read(buf, len);
}
uint32_t need = len;
// We don't have enough data yet
if (rLen_-rPos_ < need) {
// Copy out whatever we have
if (rLen_-rPos_ > 0) {
memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
need -= rLen_-rPos_;
buf += rLen_-rPos_;
}
// Read another chunk
readFrame();
}
// Hand over whatever we have
uint32_t give = need;
if (rLen_-rPos_ < give) {
give = rLen_-rPos_;
}
memcpy(buf, rBuf_+rPos_, give);
rPos_ += give;
need -= give;
return (len - need);
}
void TFramedTransport::readFrame() {
// Get rid of the old frame
if (rBuf_ != NULL) {
delete [] rBuf_;
rBuf_ = NULL;
}
// Read in the next chunk size
int32_t sz;
transport_->readAll((uint8_t*)&sz, 4);
sz = (int32_t)ntohl(sz);
if (sz < 0) {
throw new TTransportException("Frame size has negative value");
}
// Read the frame payload, reset markers
rBuf_ = new uint8_t[sz];
transport_->readAll(rBuf_, sz);
rPos_ = 0;
rLen_ = sz;
}
void TFramedTransport::write(const uint8_t* buf, uint32_t len) {
if (len == 0) {
return;
}
// Shortcut out if not write mode
if (!write_) {
transport_->write(buf, len);
return;
}
// Need to grow the buffer
if (len + wLen_ >= wBufSize_) {
// Double buffer size until sufficient
while (wBufSize_ < len + wLen_) {
wBufSize_ *= 2;
}
// Allocate new buffer
uint8_t* wBuf2 = new uint8_t[wBufSize_];
// Copy the old buffer to the new one
memcpy(wBuf2, wBuf_, wLen_);
// Now point buf to the new one
delete [] wBuf_;
wBuf_ = wBuf2;
}
// Copy data into buffer
memcpy(wBuf_ + wLen_, buf, len);
wLen_ += len;
}
void TFramedTransport::flush() {
if (!write_) {
transport_->flush();
return;
}
// Write frame size
int32_t sz = wLen_;
sz = (int32_t)htonl(sz);
transport_->write((const uint8_t*)&sz, 4);
// Write frame body
if (sz > 0) {
transport_->write(wBuf_, wLen_);
}
// All done
wLen_ = 0;
// Flush the underlying
transport_->flush();
}
uint32_t TMemoryBuffer::read(uint8_t* buf, uint32_t len) {
// Check avaible data for reading
uint32_t avail = wPos_ - rPos_;
// Device how much to give
uint32_t give = len;
if (avail < len) {
give = avail;
}
// Copy into buffer and increment rPos_
memcpy(buf, buffer_ + rPos_, give);
rPos_ += give;
return give;
}
void TMemoryBuffer::write(const uint8_t* buf, uint32_t len) {
// Check available space
uint32_t avail = bufferSize_ - wPos_;
// Grow the buffer
if (len > avail) {
if (!owner_) {
throw TTransportException("Insufficient space in external MemoryBuffer");
}
while (len > avail) {
bufferSize_ *= 2;
buffer_ = (uint8_t*)realloc(buffer_, bufferSize_);
if (buffer_ == NULL) {
throw TTransportException("Out of memory.");
}
}
}
// Copy into the buffer and increment wPos_
memcpy(buffer_ + wPos_, buf, len);
wPos_ += len;
}
}}} // facebook::thrift::transport

View File

@ -0,0 +1,316 @@
#ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_
#define _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_ 1
#include <transport/TTransport.h>
namespace facebook { namespace thrift { namespace transport {
/**
* The null transport is a dummy transport that doesn't actually do anything.
* It's sort of an analogy to /dev/null, you can never read anything from it
* and it will let you write anything you want to it, though it won't actually
* go anywhere.
*
* @author Mark Slee <mcslee@facebook.com>
*/
class TNullTransport : public TTransport {
public:
TNullTransport() {}
~TNullTransport() {}
bool isOpen() {
return true;
}
void open() {}
void write(const std::string& s) {}
};
/**
* Buffered transport. For reads it will read more data than is requested
* and will serve future data out of a local buffer. For writes, data is
* stored to an in memory buffer before being written out.
*
* @author Mark Slee <mcslee@facebook.com>
*/
class TBufferedTransport : public TTransport {
public:
TBufferedTransport(boost::shared_ptr<TTransport> transport) :
transport_(transport),
rBufSize_(512), rPos_(0), rLen_(0),
wBufSize_(512), wLen_(0) {
rBuf_ = new uint8_t[rBufSize_];
wBuf_ = new uint8_t[wBufSize_];
}
TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz) :
transport_(transport),
rBufSize_(sz), rPos_(0), rLen_(0),
wBufSize_(sz), wLen_(0) {
rBuf_ = new uint8_t[rBufSize_];
wBuf_ = new uint8_t[wBufSize_];
}
TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz) :
transport_(transport),
rBufSize_(rsz), rPos_(0), rLen_(0),
wBufSize_(wsz), wLen_(0) {
rBuf_ = new uint8_t[rBufSize_];
wBuf_ = new uint8_t[wBufSize_];
}
~TBufferedTransport() {
delete [] rBuf_;
delete [] wBuf_;
}
bool isOpen() {
return transport_->isOpen();
}
void open() {
transport_->open();
}
void close() {
transport_->close();
}
uint32_t read(uint8_t* buf, uint32_t len);
void write(const uint8_t* buf, uint32_t len);
void flush();
protected:
boost::shared_ptr<TTransport> transport_;
uint8_t* rBuf_;
uint32_t rBufSize_;
uint32_t rPos_;
uint32_t rLen_;
uint8_t* wBuf_;
uint32_t wBufSize_;
uint32_t wLen_;
};
/**
* Wraps a transport into a buffered one.
*
* @author Mark Slee <mcslee@facebook.com>
*/
class TBufferedTransportFactory : public TTransportFactory {
public:
TBufferedTransportFactory() {}
virtual ~TBufferedTransportFactory() {}
/**
* Wraps the transport into a buffered one.
*/
virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
boost::shared_ptr<TTransport> buffered(new TBufferedTransport(trans));
return std::make_pair(buffered, buffered);
}
};
/**
* Framed transport. All writes go into an in-memory buffer until flush is
* called, at which point the transport writes the length of the entire
* binary chunk followed by the data payload. This allows the receiver on the
* other end to always do fixed-length reads.
*
* @author Mark Slee <mcslee@facebook.com>
*/
class TFramedTransport : public TTransport {
public:
TFramedTransport(boost::shared_ptr<TTransport> transport) :
transport_(transport),
rPos_(0),
rLen_(0),
read_(true),
wBufSize_(512),
wLen_(0),
write_(true) {
rBuf_ = NULL;
wBuf_ = new uint8_t[wBufSize_];
}
TFramedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz) :
transport_(transport),
rPos_(0),
rLen_(0),
read_(true),
wBufSize_(sz),
wLen_(0),
write_(true) {
rBuf_ = NULL;
wBuf_ = new uint8_t[wBufSize_];
}
~TFramedTransport() {
if (rBuf_ != NULL) {
delete [] rBuf_;
}
if (wBuf_ != NULL) {
delete [] wBuf_;
}
}
void setRead(bool read) {
read_ = read;
}
void setWrite(bool write) {
write_ = write;
}
void open() {
transport_->open();
}
bool isOpen() {
return transport_->isOpen();
}
void close() {
transport_->close();
}
uint32_t read(uint8_t* buf, uint32_t len);
void write(const uint8_t* buf, uint32_t len);
void flush();
protected:
boost::shared_ptr<TTransport> transport_;
uint8_t* rBuf_;
uint32_t rPos_;
uint32_t rLen_;
bool read_;
uint8_t* wBuf_;
uint32_t wBufSize_;
uint32_t wLen_;
bool write_;
/**
* Reads a frame of input from the underlying stream.
*/
void readFrame();
};
/**
* A memory buffer is a tranpsort that simply reads from and writes to an
* in memory buffer. Anytime you call write on it, the data is simply placed
* into a buffer, and anytime you call read, data is read from that buffer.
*
* The buffers are allocated using C constructs malloc,realloc, and the size
* doubles as necessary.
*
* @author Mark Slee <mcslee@facebook.com>
*/
class TMemoryBuffer : public TTransport {
public:
TMemoryBuffer() {
owner_ = true;
bufferSize_ = 1024;
buffer_ = (uint8_t*)malloc(bufferSize_);
if (buffer_ == NULL) {
throw TTransportException("Out of memory");
}
wPos_ = 0;
rPos_ = 0;
}
TMemoryBuffer(uint32_t sz) {
owner_ = true;
bufferSize_ = sz;
buffer_ = (uint8_t*)malloc(bufferSize_);
if (buffer_ == NULL) {
throw TTransportException("Out of memory");
}
wPos_ = 0;
rPos_ = 0;
}
TMemoryBuffer(uint8_t* buf, int sz) {
owner_ = false;
buffer_ = buf;
bufferSize_ = sz;
wPos_ = sz;
rPos_ = 0;
}
~TMemoryBuffer() {
if (owner_) {
if (buffer_ != NULL) {
free(buffer_);
buffer_ = NULL;
}
}
}
bool isOpen() {
return true;
}
void open() {}
void close() {}
void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
*bufPtr = buffer_;
*sz = wPos_;
}
void resetBuffer() {
wPos_ = 0;
rPos_ = 0;
}
void resetBuffer(uint8_t* buf, uint32_t sz) {
if (owner_) {
if (buffer_ != NULL) {
free(buffer_);
}
}
owner_ = false;
buffer_ = buf;
bufferSize_ = sz;
wPos_ = sz;
rPos_ = 0;
}
uint32_t read(uint8_t* buf, uint32_t len);
void write(const uint8_t* buf, uint32_t len);
private:
// Data buffer
uint8_t* buffer_;
// Allocated buffer size
uint32_t bufferSize_;
// Where the write is at
uint32_t wPos_;
// Where the reader is at
uint32_t rPos_;
// Is this object the owner of the buffer?
bool owner_;
};
}}} // facebook::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_