From ef7200f6e825db54edfa4736192446c96b2ae1d4 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Wed, 6 Oct 2010 17:09:42 +0000 Subject: [PATCH] THRIFT-928. cpp: Include request/response size in processor callbacks Required updating transport interface. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005129 13f79535-47bb-0310-9956-ffa450edef68 --- compiler/cpp/src/generate/t_cpp_generator.cc | 8 ++++---- lib/cpp/src/TProcessor.h | 4 ++-- lib/cpp/src/transport/TBufferTransports.cpp | 8 ++++++++ lib/cpp/src/transport/TBufferTransports.h | 14 +++++++++++++- lib/cpp/src/transport/THttpTransport.cpp | 3 ++- lib/cpp/src/transport/THttpTransport.h | 2 +- lib/cpp/src/transport/TTransport.h | 14 +++++++++----- lib/cpp/src/transport/TTransportUtils.cpp | 8 ++++---- lib/cpp/src/transport/TTransportUtils.h | 12 ++++++++---- test/cpp/src/TestServer.cpp | 4 ++-- 10 files changed, 53 insertions(+), 24 deletions(-) diff --git a/compiler/cpp/src/generate/t_cpp_generator.cc b/compiler/cpp/src/generate/t_cpp_generator.cc index c2f9e2e76..e372b9741 100644 --- a/compiler/cpp/src/generate/t_cpp_generator.cc +++ b/compiler/cpp/src/generate/t_cpp_generator.cc @@ -2351,9 +2351,9 @@ void t_cpp_generator::generate_process_function(t_service* tservice, indent() << argsname << " args;" << endl << indent() << "args.read(iprot);" << endl << indent() << "iprot->readMessageEnd();" << endl << - indent() << "iprot->getTransport()->readEnd();" << endl << endl << + indent() << "uint32_t bytes = iprot->getTransport()->readEnd();" << endl << endl << indent() << "if (eventHandler_.get() != NULL) {" << endl << - indent() << " eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << " eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl << indent() << "}" << endl << endl; @@ -2460,10 +2460,10 @@ void t_cpp_generator::generate_process_function(t_service* tservice, indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl << indent() << "result.write(oprot);" << endl << indent() << "oprot->writeMessageEnd();" << endl << - indent() << "oprot->getTransport()->writeEnd();" << endl << + indent() << "bytes = oprot->getTransport()->writeEnd();" << endl << indent() << "oprot->getTransport()->flush();" << endl << endl << indent() << "if (eventHandler_.get() != NULL) {" << endl << - indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl << indent() << "}" << endl; // Close function diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h index 896f5ae6f..7858166ee 100644 --- a/lib/cpp/src/TProcessor.h +++ b/lib/cpp/src/TProcessor.h @@ -59,7 +59,7 @@ class TProcessorEventHandler { /** * Called between reading arguments and calling the handler. */ - virtual void postRead(void* ctx, const char* fn_name) {} + virtual void postRead(void* ctx, const char* fn_name, uint32_t bytes) {} /** * Called between calling the handler and writing the response. @@ -69,7 +69,7 @@ class TProcessorEventHandler { /** * Called after writing the response. */ - virtual void postWrite(void* ctx, const char* fn_name) {} + virtual void postWrite(void* ctx, const char* fn_name, uint32_t bytes) {} /** * Called when an async function call completes successfully. diff --git a/lib/cpp/src/transport/TBufferTransports.cpp b/lib/cpp/src/transport/TBufferTransports.cpp index 6097130b9..c76f661e8 100644 --- a/lib/cpp/src/transport/TBufferTransports.cpp +++ b/lib/cpp/src/transport/TBufferTransports.cpp @@ -262,6 +262,10 @@ void TFramedTransport::flush() { transport_->flush(); } +uint32_t TFramedTransport::writeEnd() { + return wBase_ - wBuf_.get(); +} + const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) { // Don't try to be clever with shifting buffers. // If the fast path failed let the protocol use its slow path. @@ -269,6 +273,10 @@ const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) { return NULL; } +uint32_t TFramedTransport::readEnd() { + // include framing bytes + return rBound_ - rBuf_.get() + sizeof(uint32_t); +} void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give) { // Correct rBound_ so we can use the fast path in the future. diff --git a/lib/cpp/src/transport/TBufferTransports.h b/lib/cpp/src/transport/TBufferTransports.h index b542fd5e3..f81a6a0da 100644 --- a/lib/cpp/src/transport/TBufferTransports.h +++ b/lib/cpp/src/transport/TBufferTransports.h @@ -348,6 +348,10 @@ class TFramedTransport : public TBufferBase { virtual void flush(); + uint32_t readEnd(); + + uint32_t writeEnd(); + const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); boost::shared_ptr getUnderlyingTransport() { @@ -612,10 +616,18 @@ class TMemoryBuffer : public TBufferBase { uint32_t readAppendToString(std::string& str, uint32_t len); - void readEnd() { + // return number of bytes read + uint32_t readEnd() { + uint32_t bytes = rBase_ - buffer_; if (rBase_ == wBase_) { resetBuffer(); } + return bytes; + } + + // Return number of bytes written + uint32_t writeEnd() { + return wBase_ - buffer_; } uint32_t available_read() const { diff --git a/lib/cpp/src/transport/THttpTransport.cpp b/lib/cpp/src/transport/THttpTransport.cpp index 4010d6b89..0934f1b66 100644 --- a/lib/cpp/src/transport/THttpTransport.cpp +++ b/lib/cpp/src/transport/THttpTransport.cpp @@ -66,13 +66,14 @@ uint32_t THttpTransport::read(uint8_t* buf, uint32_t len) { return readBuffer_.read(buf, len); } -void THttpTransport::readEnd() { +uint32_t THttpTransport::readEnd() { // Read any pending chunked data (footers etc.) if (chunked_) { while (!chunkedDone_) { readChunked(); } } + return 0; } uint32_t THttpTransport::readMoreData() { diff --git a/lib/cpp/src/transport/THttpTransport.h b/lib/cpp/src/transport/THttpTransport.h index e71dcbd13..cd58bcb74 100644 --- a/lib/cpp/src/transport/THttpTransport.h +++ b/lib/cpp/src/transport/THttpTransport.h @@ -55,7 +55,7 @@ class THttpTransport : public TTransport { uint32_t read(uint8_t* buf, uint32_t len); - void readEnd(); + uint32_t readEnd(); void write(const uint8_t* buf, uint32_t len); diff --git a/lib/cpp/src/transport/TTransport.h b/lib/cpp/src/transport/TTransport.h index f9e20cea9..b9c35f0b9 100644 --- a/lib/cpp/src/transport/TTransport.h +++ b/lib/cpp/src/transport/TTransport.h @@ -116,10 +116,11 @@ class TTransport { * This can be over-ridden to perform a transport-specific action * e.g. logging the request to a file * + * @return number of bytes read if available, 0 otherwise. */ - virtual void readEnd() { + virtual uint32_t readEnd() { // default behaviour is to do nothing - return; + return 0; } /** @@ -137,10 +138,11 @@ class TTransport { * This can be over-ridden to perform a transport-specific action * at the end of a request. * + * @return number of bytes written if available, 0 otherwise */ - virtual void writeEnd() { + virtual uint32_t writeEnd() { // default behaviour is to do nothing - return; + return 0; } /** @@ -149,7 +151,9 @@ class TTransport { * * @throws TTransportException if an error occurs */ - virtual void flush() {} + virtual void flush() { + // default behaviour is to do nothing + } /** * Attempts to return a pointer to \c len bytes, possibly copied into \c buf. diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp index a840fa6c1..72289bca4 100644 --- a/lib/cpp/src/transport/TTransportUtils.cpp +++ b/lib/cpp/src/transport/TTransportUtils.cpp @@ -135,16 +135,16 @@ uint32_t TPipedFileReaderTransport::readAll(uint8_t* buf, uint32_t len) { return have; } -void TPipedFileReaderTransport::readEnd() { - TPipedTransport::readEnd(); +uint32_t TPipedFileReaderTransport::readEnd() { + return TPipedTransport::readEnd(); } void TPipedFileReaderTransport::write(const uint8_t* buf, uint32_t len) { TPipedTransport::write(buf, len); } -void TPipedFileReaderTransport::writeEnd() { - TPipedTransport::writeEnd(); +uint32_t TPipedFileReaderTransport::writeEnd() { + return TPipedTransport::writeEnd(); } void TPipedFileReaderTransport::flush() { diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h index d65c91674..8b0c076fb 100644 --- a/lib/cpp/src/transport/TTransportUtils.h +++ b/lib/cpp/src/transport/TTransportUtils.h @@ -136,7 +136,7 @@ class TPipedTransport : virtual public TTransport { uint32_t read(uint8_t* buf, uint32_t len); - void readEnd() { + uint32_t readEnd() { if (pipeOnRead_) { dstTrans_->write(rBuf_, rPos_); @@ -148,18 +148,22 @@ class TPipedTransport : virtual public TTransport { // If requests are being pipelined, copy down our read-ahead data, // then reset our state. int read_ahead = rLen_ - rPos_; + uint32_t bytes = rPos_; memcpy(rBuf_, rBuf_ + rPos_, read_ahead); rPos_ = 0; rLen_ = read_ahead; + + return bytes; } void write(const uint8_t* buf, uint32_t len); - void writeEnd() { + uint32_t writeEnd() { if (pipeOnWrite_) { dstTrans_->write(wBuf_, wLen_); dstTrans_->flush(); } + return wLen_; } void flush(); @@ -237,9 +241,9 @@ class TPipedFileReaderTransport : public TPipedTransport, void close(); uint32_t read(uint8_t* buf, uint32_t len); uint32_t readAll(uint8_t* buf, uint32_t len); - void readEnd(); + uint32_t readEnd(); void write(const uint8_t* buf, uint32_t len); - void writeEnd(); + uint32_t writeEnd(); void flush(); // TFileReaderTransport functions diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp index 18bdc548c..369237e5c 100644 --- a/test/cpp/src/TestServer.cpp +++ b/test/cpp/src/TestServer.cpp @@ -298,13 +298,13 @@ class TestProcessorEventHandler : public TProcessorEventHandler { virtual void preRead(void* ctx, const char* fn_name) { communicate("preRead", ctx, fn_name); } - virtual void postRead(void* ctx, const char* fn_name) { + virtual void postRead(void* ctx, const char* fn_name, uint32_t bytes) { communicate("postRead", ctx, fn_name); } virtual void preWrite(void* ctx, const char* fn_name) { communicate("preWrite", ctx, fn_name); } - virtual void postWrite(void* ctx, const char* fn_name) { + virtual void postWrite(void* ctx, const char* fn_name, uint32_t bytes) { communicate("postWrite", ctx, fn_name); } virtual void asyncComplete(void* ctx, const char* fn_name) {