Thrift: Modifications to PeekProcessor to be able to support nested PeekProcessors

Reviewed by: boz

Test Plan: Tested with Falcon


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665100 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Wang 2007-04-13 20:10:39 +00:00
parent 8d725a2738
commit 5f45207ee9
3 changed files with 44 additions and 17 deletions

View File

@ -1,45 +1,63 @@
#include "PeekProcessor.h"
using namespace facebook::thrift::transport;
using namespace facebook::thrift::protocol;
using namespace facebook::thrift;
namespace facebook { namespace thrift { namespace processor {
PeekProcessor::PeekProcessor() {
memoryBuffer_.reset(new facebook::thrift::transport::TMemoryBuffer());
memoryBuffer_.reset(new TMemoryBuffer());
targetTransport_ = memoryBuffer_;
}
PeekProcessor::~PeekProcessor() {}
void PeekProcessor::initialize(boost::shared_ptr<facebook::thrift::TProcessor> actualProcessor,
boost::shared_ptr<facebook::thrift::protocol::TProtocolFactory> protocolFactory,
boost::shared_ptr<facebook::thrift::transport::TPipedTransportFactory> transportFactory) {
void PeekProcessor::initialize(boost::shared_ptr<TProcessor> actualProcessor,
boost::shared_ptr<TProtocolFactory> protocolFactory,
boost::shared_ptr<TPipedTransportFactory> transportFactory) {
actualProcessor_ = actualProcessor;
pipedProtocol_ = protocolFactory->getProtocol(memoryBuffer_);
pipedProtocol_ = protocolFactory->getProtocol(targetTransport_);
transportFactory_ = transportFactory;
transportFactory_->initializeTargetTransport(memoryBuffer_);
transportFactory_->initializeTargetTransport(targetTransport_);
}
boost::shared_ptr<facebook::thrift::transport::TTransport> PeekProcessor::getPipedTransport(boost::shared_ptr<facebook::thrift::transport::TTransport> in) {
boost::shared_ptr<TTransport> PeekProcessor::getPipedTransport(boost::shared_ptr<TTransport> in) {
return transportFactory_->getTransport(in);
}
bool PeekProcessor::process(boost::shared_ptr<facebook::thrift::protocol::TProtocol> in,
boost::shared_ptr<facebook::thrift::protocol::TProtocol> out) {
void PeekProcessor::setTargetTransport(boost::shared_ptr<TTransport> targetTransport) {
targetTransport_ = targetTransport;
if (boost::dynamic_pointer_cast<TMemoryBuffer>(targetTransport_)) {
memoryBuffer_ = boost::dynamic_pointer_cast<TMemoryBuffer>(targetTransport);
} else if (boost::dynamic_pointer_cast<TPipedTransport>(targetTransport_)) {
memoryBuffer_ = boost::dynamic_pointer_cast<TMemoryBuffer>(boost::dynamic_pointer_cast<TPipedTransport>(targetTransport_)->getTargetTransport());
}
if (!memoryBuffer_) {
throw TException("Target transport must be a TMemoryBuffer or a TPipedTransport with TMemoryBuffer");
}
}
bool PeekProcessor::process(boost::shared_ptr<TProtocol> in,
boost::shared_ptr<TProtocol> out) {
std::string fname;
facebook::thrift::protocol::TMessageType mtype;
TMessageType mtype;
int32_t seqid;
in->readMessageBegin(fname, mtype, seqid);
if (mtype != facebook::thrift::protocol::T_CALL) {
throw facebook::thrift::TException("Unexpected message type");
if (mtype != T_CALL) {
throw TException("Unexpected message type");
}
// Peek at the name
peekName(fname);
facebook::thrift::protocol::TType ftype;
TType ftype;
int16_t fid;
while (true) {
in->readFieldBegin(fname, ftype, fid);
if (ftype == facebook::thrift::protocol::T_STOP) {
if (ftype == T_STOP) {
break;
}
@ -73,9 +91,9 @@ void PeekProcessor::peekName(const std::string& fname) {
void PeekProcessor::peekBuffer(uint8_t* buffer, uint32_t size) {
}
void PeekProcessor::peek(boost::shared_ptr<facebook::thrift::protocol::TProtocol> in,
facebook::thrift::protocol::TType ftype,
int16_t fid) {
void PeekProcessor::peek(boost::shared_ptr<TProtocol> in,
TType ftype,
int16_t fid) {
in->skip(ftype);
}

View File

@ -37,6 +37,8 @@ class PeekProcessor : public facebook::thrift::TProcessor {
boost::shared_ptr<facebook::thrift::transport::TTransport> getPipedTransport(boost::shared_ptr<facebook::thrift::transport::TTransport> in);
void setTargetTransport(boost::shared_ptr<facebook::thrift::transport::TTransport> targetTransport);
virtual bool process(boost::shared_ptr<facebook::thrift::protocol::TProtocol> in,
boost::shared_ptr<facebook::thrift::protocol::TProtocol> out);
@ -54,6 +56,7 @@ class PeekProcessor : public facebook::thrift::TProcessor {
boost::shared_ptr<facebook::thrift::protocol::TProtocol> pipedProtocol_;
boost::shared_ptr<facebook::thrift::transport::TPipedTransportFactory> transportFactory_;
boost::shared_ptr<facebook::thrift::transport::TMemoryBuffer> memoryBuffer_;
boost::shared_ptr<facebook::thrift::transport::TTransport> targetTransport_;
};
}}} // facebook::thrift::processor

View File

@ -425,6 +425,7 @@ class TPipedTransport : virtual public TTransport {
void readEnd() {
if (pipeOnRead_) {
dstTrans_->write(rBuf_, rLen_);
dstTrans_->flush();
}
// reset state
@ -437,11 +438,16 @@ class TPipedTransport : virtual public TTransport {
void writeEnd() {
if (pipeOnWrite_) {
dstTrans_->write(wBuf_, wLen_);
dstTrans_->flush();
}
}
void flush();
boost::shared_ptr<TTransport> getTargetTransport() {
return dstTrans_;
}
protected:
boost::shared_ptr<TTransport> srcTrans_;
boost::shared_ptr<TTransport> dstTrans_;