mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 02:45:22 +00:00
-- Change concept of protocol and transport factory
Summary: - Transport factories now wrap around one transport - Protocol factories now wrap around one transport (as opposed to a pair of input/output transports) - TServer now takes input/output transport and protocol factories The motivation for this change is that you could concievably want to use a different protocol or transport for input and output. An example is that incoming data is encoded using binary protocol but outgoing data is encrypted XML (with encryption being done on the transport level). This change should be mostly backwards compatible because the TServer classes have constructors that take a transport factory and use that for both the input and transport factories. The only change might be for anyone who is using the C++ client code directly i.e. instantiating TBinaryProtocol() directly because the constructor now only accepts one transport. Reviewed By: Slee Test Plan: Everything compiles (for both thrift and search). Notes: I am going to make the same changes in all the supported languages after this... git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664940 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bf05a6069e
commit
9abb0d69b7
@ -79,30 +79,30 @@ uint32_t TBinaryProtocol::writeSetEnd() {
|
||||
|
||||
uint32_t TBinaryProtocol::writeBool(const bool value) {
|
||||
uint8_t tmp = value ? 1 : 0;
|
||||
outputTransport_->write(&tmp, 1);
|
||||
trans_->write(&tmp, 1);
|
||||
return 1;
|
||||
}
|
||||
|
||||
uint32_t TBinaryProtocol::writeByte(const int8_t byte) {
|
||||
outputTransport_->write((uint8_t*)&byte, 1);
|
||||
trans_->write((uint8_t*)&byte, 1);
|
||||
return 1;
|
||||
}
|
||||
|
||||
uint32_t TBinaryProtocol::writeI16(const int16_t i16) {
|
||||
int16_t net = (int16_t)htons(i16);
|
||||
outputTransport_->write((uint8_t*)&net, 2);
|
||||
trans_->write((uint8_t*)&net, 2);
|
||||
return 2;
|
||||
}
|
||||
|
||||
uint32_t TBinaryProtocol::writeI32(const int32_t i32) {
|
||||
int32_t net = (int32_t)htonl(i32);
|
||||
outputTransport_->write((uint8_t*)&net, 4);
|
||||
trans_->write((uint8_t*)&net, 4);
|
||||
return 4;
|
||||
}
|
||||
|
||||
uint32_t TBinaryProtocol::writeI64(const int64_t i64) {
|
||||
int64_t net = (int64_t)htonll(i64);
|
||||
outputTransport_->write((uint8_t*)&net, 8);
|
||||
trans_->write((uint8_t*)&net, 8);
|
||||
return 8;
|
||||
}
|
||||
|
||||
@ -117,14 +117,14 @@ uint32_t TBinaryProtocol::writeDouble(const double dub) {
|
||||
b[5] = d[2];
|
||||
b[6] = d[1];
|
||||
b[7] = d[0];
|
||||
outputTransport_->write((uint8_t*)b, 8);
|
||||
trans_->write((uint8_t*)b, 8);
|
||||
return 8;
|
||||
}
|
||||
|
||||
|
||||
uint32_t TBinaryProtocol::writeString(const string& str) {
|
||||
uint32_t result = writeI32(str.size());
|
||||
outputTransport_->write((uint8_t*)str.data(), str.size());
|
||||
trans_->write((uint8_t*)str.data(), str.size());
|
||||
return result + str.size();
|
||||
}
|
||||
|
||||
@ -233,21 +233,21 @@ uint32_t TBinaryProtocol::readSetEnd() {
|
||||
|
||||
uint32_t TBinaryProtocol::readBool(bool& value) {
|
||||
uint8_t b[1];
|
||||
inputTransport_->readAll(b, 1);
|
||||
trans_->readAll(b, 1);
|
||||
value = *(int8_t*)b != 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
uint32_t TBinaryProtocol::readByte(int8_t& byte) {
|
||||
uint8_t b[1];
|
||||
inputTransport_->readAll(b, 1);
|
||||
trans_->readAll(b, 1);
|
||||
byte = *(int8_t*)b;
|
||||
return 1;
|
||||
}
|
||||
|
||||
uint32_t TBinaryProtocol::readI16(int16_t& i16) {
|
||||
uint8_t b[2];
|
||||
inputTransport_->readAll(b, 2);
|
||||
trans_->readAll(b, 2);
|
||||
i16 = *(int16_t*)b;
|
||||
i16 = (int16_t)ntohs(i16);
|
||||
return 2;
|
||||
@ -255,7 +255,7 @@ uint32_t TBinaryProtocol::readI16(int16_t& i16) {
|
||||
|
||||
uint32_t TBinaryProtocol::readI32(int32_t& i32) {
|
||||
uint8_t b[4];
|
||||
inputTransport_->readAll(b, 4);
|
||||
trans_->readAll(b, 4);
|
||||
i32 = *(int32_t*)b;
|
||||
i32 = (int32_t)ntohl(i32);
|
||||
return 4;
|
||||
@ -263,7 +263,7 @@ uint32_t TBinaryProtocol::readI32(int32_t& i32) {
|
||||
|
||||
uint32_t TBinaryProtocol::readI64(int64_t& i64) {
|
||||
uint8_t b[8];
|
||||
inputTransport_->readAll(b, 8);
|
||||
trans_->readAll(b, 8);
|
||||
i64 = *(int64_t*)b;
|
||||
i64 = (int64_t)ntohll(i64);
|
||||
return 8;
|
||||
@ -272,7 +272,7 @@ uint32_t TBinaryProtocol::readI64(int64_t& i64) {
|
||||
uint32_t TBinaryProtocol::readDouble(double& dub) {
|
||||
uint8_t b[8];
|
||||
uint8_t d[8];
|
||||
inputTransport_->readAll(b, 8);
|
||||
trans_->readAll(b, 8);
|
||||
d[0] = b[7];
|
||||
d[1] = b[6];
|
||||
d[2] = b[5];
|
||||
@ -294,7 +294,7 @@ uint32_t TBinaryProtocol::readString(string& str) {
|
||||
|
||||
// Use the heap here to prevent stack overflow for v. large strings
|
||||
uint8_t *b = new uint8_t[size];
|
||||
inputTransport_->readAll(b, size);
|
||||
trans_->readAll(b, size);
|
||||
str = string((char*)b, size);
|
||||
delete [] b;
|
||||
|
||||
|
@ -17,8 +17,8 @@ using namespace boost;
|
||||
*/
|
||||
class TBinaryProtocol : public TProtocol {
|
||||
public:
|
||||
TBinaryProtocol(shared_ptr<TTransport> in, shared_ptr<TTransport> out) :
|
||||
TProtocol(in, out) {}
|
||||
TBinaryProtocol(shared_ptr<TTransport> trans) :
|
||||
TProtocol(trans) {}
|
||||
|
||||
~TBinaryProtocol() {}
|
||||
|
||||
@ -137,9 +137,8 @@ class TBinaryProtocolFactory : public TProtocolFactory {
|
||||
|
||||
virtual ~TBinaryProtocolFactory() {}
|
||||
|
||||
std::pair<boost::shared_ptr<TProtocol>, boost::shared_ptr<TProtocol> > getIOProtocols(boost::shared_ptr<TTransport> in, boost::shared_ptr<TTransport> out) {
|
||||
boost::shared_ptr<TProtocol> prot(new TBinaryProtocol(in, out));
|
||||
return std::make_pair(prot, prot);
|
||||
boost::shared_ptr<TProtocol> getProtocol(boost::shared_ptr<TTransport> trans) {
|
||||
return boost::shared_ptr<TProtocol>(new TBinaryProtocol(trans));
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -282,22 +282,24 @@ class TProtocol {
|
||||
}
|
||||
}
|
||||
|
||||
shared_ptr<TTransport> getInputTransport() {
|
||||
return inputTransport_;
|
||||
inline shared_ptr<TTransport> getTransport() {
|
||||
return trans_;
|
||||
}
|
||||
|
||||
shared_ptr<TTransport> getOutputTransport() {
|
||||
return outputTransport_;
|
||||
// TODO: remove these two calls, they are for backwards
|
||||
// compatibility
|
||||
inline shared_ptr<TTransport> getInputTransport() {
|
||||
return trans_;
|
||||
}
|
||||
inline shared_ptr<TTransport> getOutputTransport() {
|
||||
return trans_;
|
||||
}
|
||||
|
||||
protected:
|
||||
TProtocol(shared_ptr<TTransport> in, shared_ptr<TTransport> out) :
|
||||
inputTransport_(in),
|
||||
outputTransport_(out) {}
|
||||
TProtocol(shared_ptr<TTransport> trans):
|
||||
trans_(trans) {}
|
||||
|
||||
shared_ptr<TTransport> inputTransport_;
|
||||
|
||||
shared_ptr<TTransport> outputTransport_;
|
||||
shared_ptr<TTransport> trans_;
|
||||
|
||||
private:
|
||||
TProtocol() {}
|
||||
@ -312,7 +314,7 @@ class TProtocolFactory {
|
||||
|
||||
virtual ~TProtocolFactory() {}
|
||||
|
||||
virtual std::pair<boost::shared_ptr<TProtocol>, boost::shared_ptr<TProtocol> > getIOProtocols(boost::shared_ptr<TTransport> in, boost::shared_ptr<TTransport> out) = 0;
|
||||
virtual boost::shared_ptr<TProtocol> getProtocol(boost::shared_ptr<TTransport> trans) = 0;
|
||||
};
|
||||
|
||||
}}} // facebook::thrift::protocol
|
||||
|
@ -28,17 +28,13 @@ namespace facebook { namespace thrift { namespace server {
|
||||
// Set flags, which also registers the event
|
||||
setFlags(eventFlags);
|
||||
|
||||
// TODO: this needs to be replaced by the new version of TTransportFactory
|
||||
factoryInputTransport_ = (s->getTransportFactory()->getIOTransports(inputTransport_)).first;
|
||||
// factoryOutputTransport_ = (transportFactory->getIOTransports(outputTransport_)).first;
|
||||
// get input/transports
|
||||
factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
|
||||
factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
|
||||
|
||||
// Create protocol
|
||||
std::pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
|
||||
iop = s->getProtocolFactory()->getIOProtocols(factoryInputTransport_ ,
|
||||
outputTransport_);
|
||||
inputProtocol_ = iop.first;
|
||||
outputProtocol_ = iop.second;
|
||||
|
||||
inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
|
||||
outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
|
||||
}
|
||||
|
||||
void TConnection::workSocket() {
|
||||
@ -353,7 +349,7 @@ void TConnection::close() {
|
||||
|
||||
// close any factory produced transports
|
||||
factoryInputTransport_->close();
|
||||
// factoryOutputTransport_->close();
|
||||
factoryOutputTransport_->close();
|
||||
|
||||
// Give this object back to the server that owns it
|
||||
server_->returnConnection(this);
|
||||
@ -366,7 +362,7 @@ void TConnection::close() {
|
||||
TConnection* TNonblockingServer::createConnection(int socket, short flags) {
|
||||
// Check the stack
|
||||
if (connectionStack_.empty()) {
|
||||
return new TConnection(socket, flags, this, this->getTransportFactory());
|
||||
return new TConnection(socket, flags, this);
|
||||
} else {
|
||||
TConnection* result = connectionStack_.top();
|
||||
connectionStack_.pop();
|
||||
|
@ -60,19 +60,31 @@ class TNonblockingServer : public TServer {
|
||||
TNonblockingServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TProtocolFactory> protocolFactory,
|
||||
int port) :
|
||||
TServer(processor, protocolFactory),
|
||||
TServer(processor),
|
||||
serverSocket_(0),
|
||||
port_(port),
|
||||
frameResponses_(true) {}
|
||||
frameResponses_(true) {
|
||||
setInputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
|
||||
setOutputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
|
||||
setInputProtocolFactory(protocolFactory);
|
||||
setOutputProtocolFactory(protocolFactory);
|
||||
}
|
||||
|
||||
TNonblockingServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TProtocolFactory> protocolFactory,
|
||||
shared_ptr<TTransportFactory> transportFactory,
|
||||
TNonblockingServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TTransportFactory> inputTransportFactory,
|
||||
shared_ptr<TTransportFactory> outputTransportFactory,
|
||||
shared_ptr<TProtocolFactory> inputProtocolFactory,
|
||||
shared_ptr<TProtocolFactory> outputProtocolFactory,
|
||||
int port) :
|
||||
TServer(processor, protocolFactory, transportFactory),
|
||||
TServer(processor),
|
||||
serverSocket_(0),
|
||||
port_(port),
|
||||
frameResponses_(true) {}
|
||||
frameResponses_(true) {
|
||||
setInputTransportFactory(inputTransportFactory);
|
||||
setOutputTransportFactory(outputTransportFactory);
|
||||
setInputProtocolFactory(inputProtocolFactory);
|
||||
setOutputProtocolFactory(outputProtocolFactory);
|
||||
}
|
||||
|
||||
~TNonblockingServer() {}
|
||||
|
||||
@ -175,13 +187,13 @@ class TConnection {
|
||||
|
||||
// extra transport generated by transport factory (e.g. BufferedRouterTransport)
|
||||
shared_ptr<TTransport> factoryInputTransport_;
|
||||
// shared_ptr<TTransport> factoryOutputTransport_;
|
||||
|
||||
// Protocol encoder
|
||||
shared_ptr<TProtocol> outputProtocol_;
|
||||
shared_ptr<TTransport> factoryOutputTransport_;
|
||||
|
||||
// Protocol decoder
|
||||
shared_ptr<TProtocol> inputProtocol_;
|
||||
|
||||
// Protocol encoder
|
||||
shared_ptr<TProtocol> outputProtocol_;
|
||||
|
||||
// Go into read mode
|
||||
void setRead() {
|
||||
@ -205,8 +217,7 @@ class TConnection {
|
||||
public:
|
||||
|
||||
// Constructor
|
||||
TConnection(int socket, short eventFlags, TNonblockingServer *s,
|
||||
shared_ptr<TTransportFactory> transportFactory) {
|
||||
TConnection(int socket, short eventFlags, TNonblockingServer *s) {
|
||||
readBuffer_ = (uint8_t*)malloc(1024);
|
||||
if (readBuffer_ == NULL) {
|
||||
throw new facebook::thrift::TException("Out of memory.");
|
||||
|
@ -38,73 +38,92 @@ public:
|
||||
return serverTransport_;
|
||||
}
|
||||
|
||||
shared_ptr<TTransportFactory> getTransportFactory() {
|
||||
return transportFactory_;
|
||||
shared_ptr<TTransportFactory> getInputTransportFactory() {
|
||||
return inputTransportFactory_;
|
||||
}
|
||||
|
||||
shared_ptr<TTransportFactory> getOutputTransportFactory() {
|
||||
return outputTransportFactory_;
|
||||
}
|
||||
|
||||
shared_ptr<TProtocolFactory> getProtocolFactory() {
|
||||
return protocolFactory_;
|
||||
shared_ptr<TProtocolFactory> getInputProtocolFactory() {
|
||||
return inputProtocolFactory_;
|
||||
}
|
||||
|
||||
shared_ptr<TProtocolFactory> getOutputProtocolFactory() {
|
||||
return outputProtocolFactory_;
|
||||
}
|
||||
|
||||
protected:
|
||||
TServer(shared_ptr<TProcessor> processor):
|
||||
processor_(processor) {
|
||||
setInputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
|
||||
setOutputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
|
||||
setInputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
|
||||
setOutputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
|
||||
}
|
||||
|
||||
TServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TServerTransport> serverTransport):
|
||||
processor_(processor),
|
||||
serverTransport_(serverTransport) {
|
||||
setInputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
|
||||
setOutputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
|
||||
setInputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
|
||||
setOutputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
|
||||
}
|
||||
|
||||
TServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TServerTransport> serverTransport,
|
||||
shared_ptr<TTransportFactory> transportFactory,
|
||||
shared_ptr<TProtocolFactory> protocolFactory) :
|
||||
shared_ptr<TProtocolFactory> protocolFactory):
|
||||
processor_(processor),
|
||||
serverTransport_(serverTransport),
|
||||
transportFactory_(transportFactory),
|
||||
protocolFactory_(protocolFactory) {}
|
||||
inputTransportFactory_(transportFactory),
|
||||
outputTransportFactory_(transportFactory),
|
||||
inputProtocolFactory_(protocolFactory),
|
||||
outputProtocolFactory_(protocolFactory) {}
|
||||
|
||||
TServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TServerTransport> serverTransport,
|
||||
shared_ptr<TTransportFactory> transportFactory) :
|
||||
shared_ptr<TTransportFactory> inputTransportFactory,
|
||||
shared_ptr<TTransportFactory> outputTransportFactory,
|
||||
shared_ptr<TProtocolFactory> inputProtocolFactory,
|
||||
shared_ptr<TProtocolFactory> outputProtocolFactory):
|
||||
processor_(processor),
|
||||
serverTransport_(serverTransport),
|
||||
transportFactory_(transportFactory) {
|
||||
protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
|
||||
}
|
||||
inputTransportFactory_(inputTransportFactory),
|
||||
outputTransportFactory_(outputTransportFactory),
|
||||
inputProtocolFactory_(inputProtocolFactory),
|
||||
outputProtocolFactory_(outputProtocolFactory) {}
|
||||
|
||||
TServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TServerTransport> serverTransport) :
|
||||
processor_(processor),
|
||||
serverTransport_(serverTransport) {
|
||||
transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
|
||||
protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
|
||||
}
|
||||
|
||||
TServer(shared_ptr<TProcessor> processor) :
|
||||
processor_(processor) {
|
||||
transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
|
||||
protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
|
||||
}
|
||||
|
||||
TServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TTransportFactory> transportFactory) :
|
||||
processor_(processor),
|
||||
transportFactory_(transportFactory) {
|
||||
protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
|
||||
}
|
||||
|
||||
TServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TProtocolFactory> protocolFactory) :
|
||||
processor_(processor) {
|
||||
transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
|
||||
protocolFactory_ = protocolFactory;
|
||||
}
|
||||
|
||||
TServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TProtocolFactory> protocolFactory,
|
||||
shared_ptr<TTransportFactory> transportFactory):
|
||||
processor_(processor),
|
||||
transportFactory_(transportFactory),
|
||||
protocolFactory_(protocolFactory) {}
|
||||
|
||||
// Class variables
|
||||
shared_ptr<TProcessor> processor_;
|
||||
shared_ptr<TServerTransport> serverTransport_;
|
||||
shared_ptr<TTransportFactory> transportFactory_;
|
||||
shared_ptr<TProtocolFactory> protocolFactory_;
|
||||
|
||||
shared_ptr<TTransportFactory> inputTransportFactory_;
|
||||
shared_ptr<TTransportFactory> outputTransportFactory_;
|
||||
|
||||
shared_ptr<TProtocolFactory> inputProtocolFactory_;
|
||||
shared_ptr<TProtocolFactory> outputProtocolFactory_;
|
||||
|
||||
void setInputTransportFactory(shared_ptr<TTransportFactory> inputTransportFactory) {
|
||||
inputTransportFactory_ = inputTransportFactory;
|
||||
}
|
||||
|
||||
void setOutputTransportFactory(shared_ptr<TTransportFactory> outputTransportFactory) {
|
||||
outputTransportFactory_ = outputTransportFactory;
|
||||
}
|
||||
|
||||
void setInputProtocolFactory(shared_ptr<TProtocolFactory> inputProtocolFactory) {
|
||||
inputProtocolFactory_ = inputProtocolFactory;
|
||||
}
|
||||
|
||||
void setOutputProtocolFactory(shared_ptr<TProtocolFactory> outputProtocolFactory) {
|
||||
outputProtocolFactory_ = outputProtocolFactory;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
}}} // facebook::thrift::server
|
||||
|
@ -14,8 +14,10 @@ namespace facebook { namespace thrift { namespace server {
|
||||
void TSimpleServer::serve() {
|
||||
|
||||
shared_ptr<TTransport> client;
|
||||
pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
|
||||
pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
|
||||
shared_ptr<TTransport> inputTransport;
|
||||
shared_ptr<TTransport> outputTransport;
|
||||
shared_ptr<TProtocol> inputProtocol;
|
||||
shared_ptr<TProtocol> outputProtocol;
|
||||
|
||||
try {
|
||||
// Start the server listening
|
||||
@ -29,12 +31,14 @@ void TSimpleServer::serve() {
|
||||
try {
|
||||
while (true) {
|
||||
client = serverTransport_->accept();
|
||||
iot = transportFactory_->getIOTransports(client);
|
||||
iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
|
||||
inputTransport = inputTransportFactory_->getTransport(client);
|
||||
outputTransport = outputTransportFactory_->getTransport(client);
|
||||
inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
|
||||
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
|
||||
try {
|
||||
while (processor_->process(iop.first, iop.second)) {
|
||||
while (processor_->process(inputProtocol, outputProtocol)) {
|
||||
// Peek ahead, is the remote side closed?
|
||||
if (!iot.first->peek()) {
|
||||
if (!inputTransport->peek()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -43,8 +47,8 @@ void TSimpleServer::serve() {
|
||||
} catch (TException& tx) {
|
||||
cerr << "TSimpleServer exception: " << tx.what() << endl;
|
||||
}
|
||||
iot.first->close();
|
||||
iot.second->close();
|
||||
inputTransport->close();
|
||||
outputTransport->close();
|
||||
client->close();
|
||||
}
|
||||
} catch (TTransportException& ttx) {
|
||||
|
@ -21,6 +21,16 @@ class TSimpleServer : public TServer {
|
||||
shared_ptr<TTransportFactory> transportFactory,
|
||||
shared_ptr<TProtocolFactory> protocolFactory) :
|
||||
TServer(processor, serverTransport, transportFactory, protocolFactory) {}
|
||||
|
||||
TSimpleServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TServerTransport> serverTransport,
|
||||
shared_ptr<TTransportFactory> inputTransportFactory,
|
||||
shared_ptr<TTransportFactory> outputTransportFactory,
|
||||
shared_ptr<TProtocolFactory> inputProtocolFactory,
|
||||
shared_ptr<TProtocolFactory> outputProtocolFactory):
|
||||
TServer(processor, serverTransport,
|
||||
inputTransportFactory, outputTransportFactory,
|
||||
inputProtocolFactory, outputProtocolFactory) {}
|
||||
|
||||
~TSimpleServer() {}
|
||||
|
||||
|
@ -29,7 +29,7 @@ public:
|
||||
void run() {
|
||||
try {
|
||||
while (processor_->process(input_, output_)) {
|
||||
if (!input_->getInputTransport()->peek()) {
|
||||
if (!input_->getTransport()->peek()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -40,8 +40,8 @@ public:
|
||||
} catch (...) {
|
||||
cerr << "TThreadPoolServer uncaught exception." << endl;
|
||||
}
|
||||
input_->getInputTransport()->close();
|
||||
output_->getOutputTransport()->close();
|
||||
input_->getTransport()->close();
|
||||
output_->getTransport()->close();
|
||||
}
|
||||
|
||||
private:
|
||||
@ -55,19 +55,31 @@ TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TServerTransport> serverTransport,
|
||||
shared_ptr<TTransportFactory> transportFactory,
|
||||
shared_ptr<TProtocolFactory> protocolFactory,
|
||||
|
||||
shared_ptr<ThreadManager> threadManager) :
|
||||
TServer(processor, serverTransport, transportFactory, protocolFactory),
|
||||
threadManager_(threadManager) {
|
||||
}
|
||||
threadManager_(threadManager) {}
|
||||
|
||||
TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TServerTransport> serverTransport,
|
||||
shared_ptr<TTransportFactory> inputTransportFactory,
|
||||
shared_ptr<TTransportFactory> outputTransportFactory,
|
||||
shared_ptr<TProtocolFactory> inputProtocolFactory,
|
||||
shared_ptr<TProtocolFactory> outputProtocolFactory,
|
||||
shared_ptr<ThreadManager> threadManager) :
|
||||
TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
|
||||
inputProtocolFactory, outputProtocolFactory),
|
||||
threadManager_(threadManager) {}
|
||||
|
||||
|
||||
TThreadPoolServer::~TThreadPoolServer() {}
|
||||
|
||||
void TThreadPoolServer::serve() {
|
||||
|
||||
shared_ptr<TTransport> client;
|
||||
pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
|
||||
pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
|
||||
shared_ptr<TTransport> inputTransport;
|
||||
shared_ptr<TTransport> outputTransport;
|
||||
shared_ptr<TProtocol> inputProtocol;
|
||||
shared_ptr<TProtocol> outputProtocol;
|
||||
|
||||
try {
|
||||
// Start the server listening
|
||||
@ -82,11 +94,13 @@ void TThreadPoolServer::serve() {
|
||||
// Fetch client from server
|
||||
client = serverTransport_->accept();
|
||||
// Make IO transports
|
||||
iot = transportFactory_->getIOTransports(client);
|
||||
iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
|
||||
inputTransport = inputTransportFactory_->getTransport(client);
|
||||
outputTransport = outputTransportFactory_->getTransport(client);
|
||||
inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
|
||||
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
|
||||
|
||||
// Add to threadmanager pool
|
||||
threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, iop.first, iop.second)));
|
||||
threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, inputProtocol, outputProtocol)));
|
||||
} catch (TTransportException& ttx) {
|
||||
break;
|
||||
}
|
||||
|
@ -24,6 +24,14 @@ public:
|
||||
shared_ptr<TProtocolFactory> protocolFactory,
|
||||
shared_ptr<ThreadManager> threadManager);
|
||||
|
||||
TThreadPoolServer(shared_ptr<TProcessor> processor,
|
||||
shared_ptr<TServerTransport> serverTransport,
|
||||
shared_ptr<TTransportFactory> inputTransportFactory,
|
||||
shared_ptr<TTransportFactory> outputTransportFactory,
|
||||
shared_ptr<TProtocolFactory> inputProtocolFactory,
|
||||
shared_ptr<TProtocolFactory> outputProtocolFactory,
|
||||
shared_ptr<ThreadManager> threadManager);
|
||||
|
||||
virtual ~TThreadPoolServer();
|
||||
|
||||
virtual void serve();
|
||||
|
@ -117,9 +117,8 @@ class TBufferedRouterTransportFactory : public TTransportFactory {
|
||||
/**
|
||||
* Wraps the transport into a buffered one.
|
||||
*/
|
||||
virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
|
||||
boost::shared_ptr<TTransport> buffered(new TBufferedRouterTransport(trans, rTrans_));
|
||||
return std::make_pair(buffered, buffered);
|
||||
virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
|
||||
return boost::shared_ptr<TTransport>(new TBufferedRouterTransport(trans, rTrans_));
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -155,8 +155,8 @@ class TTransportFactory {
|
||||
/**
|
||||
* Default implementation does nothing, just returns the transport given.
|
||||
*/
|
||||
virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
|
||||
return std::make_pair(trans, trans);
|
||||
virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
|
||||
return trans;
|
||||
}
|
||||
|
||||
};
|
||||
|
@ -122,9 +122,8 @@ class TBufferedTransportFactory : public TTransportFactory {
|
||||
/**
|
||||
* Wraps the transport into a buffered one.
|
||||
*/
|
||||
virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
|
||||
boost::shared_ptr<TTransport> buffered(new TBufferedTransport(trans));
|
||||
return std::make_pair(buffered, buffered);
|
||||
virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
|
||||
return boost::shared_ptr<TTransport>(new TBufferedTransport(trans));
|
||||
}
|
||||
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user