mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 02:45:22 +00:00
THRIFT-928. cpp: Include request/response size in processor callbacks
Required updating transport interface. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005129 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
18cd0f0334
commit
ef7200f6e8
@ -2351,9 +2351,9 @@ void t_cpp_generator::generate_process_function(t_service* tservice,
|
||||
indent() << argsname << " args;" << endl <<
|
||||
indent() << "args.read(iprot);" << endl <<
|
||||
indent() << "iprot->readMessageEnd();" << endl <<
|
||||
indent() << "iprot->getTransport()->readEnd();" << endl << endl <<
|
||||
indent() << "uint32_t bytes = iprot->getTransport()->readEnd();" << endl << endl <<
|
||||
indent() << "if (eventHandler_.get() != NULL) {" << endl <<
|
||||
indent() << " eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
|
||||
indent() << " eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl <<
|
||||
indent() << "}" << endl <<
|
||||
endl;
|
||||
|
||||
@ -2460,10 +2460,10 @@ void t_cpp_generator::generate_process_function(t_service* tservice,
|
||||
indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
|
||||
indent() << "result.write(oprot);" << endl <<
|
||||
indent() << "oprot->writeMessageEnd();" << endl <<
|
||||
indent() << "oprot->getTransport()->writeEnd();" << endl <<
|
||||
indent() << "bytes = oprot->getTransport()->writeEnd();" << endl <<
|
||||
indent() << "oprot->getTransport()->flush();" << endl << endl <<
|
||||
indent() << "if (eventHandler_.get() != NULL) {" << endl <<
|
||||
indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
|
||||
indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl <<
|
||||
indent() << "}" << endl;
|
||||
|
||||
// Close function
|
||||
|
@ -59,7 +59,7 @@ class TProcessorEventHandler {
|
||||
/**
|
||||
* Called between reading arguments and calling the handler.
|
||||
*/
|
||||
virtual void postRead(void* ctx, const char* fn_name) {}
|
||||
virtual void postRead(void* ctx, const char* fn_name, uint32_t bytes) {}
|
||||
|
||||
/**
|
||||
* Called between calling the handler and writing the response.
|
||||
@ -69,7 +69,7 @@ class TProcessorEventHandler {
|
||||
/**
|
||||
* Called after writing the response.
|
||||
*/
|
||||
virtual void postWrite(void* ctx, const char* fn_name) {}
|
||||
virtual void postWrite(void* ctx, const char* fn_name, uint32_t bytes) {}
|
||||
|
||||
/**
|
||||
* Called when an async function call completes successfully.
|
||||
|
@ -262,6 +262,10 @@ void TFramedTransport::flush() {
|
||||
transport_->flush();
|
||||
}
|
||||
|
||||
uint32_t TFramedTransport::writeEnd() {
|
||||
return wBase_ - wBuf_.get();
|
||||
}
|
||||
|
||||
const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
|
||||
// Don't try to be clever with shifting buffers.
|
||||
// If the fast path failed let the protocol use its slow path.
|
||||
@ -269,6 +273,10 @@ const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
uint32_t TFramedTransport::readEnd() {
|
||||
// include framing bytes
|
||||
return rBound_ - rBuf_.get() + sizeof(uint32_t);
|
||||
}
|
||||
|
||||
void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give) {
|
||||
// Correct rBound_ so we can use the fast path in the future.
|
||||
|
@ -348,6 +348,10 @@ class TFramedTransport : public TBufferBase {
|
||||
|
||||
virtual void flush();
|
||||
|
||||
uint32_t readEnd();
|
||||
|
||||
uint32_t writeEnd();
|
||||
|
||||
const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
|
||||
|
||||
boost::shared_ptr<TTransport> getUnderlyingTransport() {
|
||||
@ -612,10 +616,18 @@ class TMemoryBuffer : public TBufferBase {
|
||||
|
||||
uint32_t readAppendToString(std::string& str, uint32_t len);
|
||||
|
||||
void readEnd() {
|
||||
// return number of bytes read
|
||||
uint32_t readEnd() {
|
||||
uint32_t bytes = rBase_ - buffer_;
|
||||
if (rBase_ == wBase_) {
|
||||
resetBuffer();
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
// Return number of bytes written
|
||||
uint32_t writeEnd() {
|
||||
return wBase_ - buffer_;
|
||||
}
|
||||
|
||||
uint32_t available_read() const {
|
||||
|
@ -66,13 +66,14 @@ uint32_t THttpTransport::read(uint8_t* buf, uint32_t len) {
|
||||
return readBuffer_.read(buf, len);
|
||||
}
|
||||
|
||||
void THttpTransport::readEnd() {
|
||||
uint32_t THttpTransport::readEnd() {
|
||||
// Read any pending chunked data (footers etc.)
|
||||
if (chunked_) {
|
||||
while (!chunkedDone_) {
|
||||
readChunked();
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint32_t THttpTransport::readMoreData() {
|
||||
|
@ -55,7 +55,7 @@ class THttpTransport : public TTransport {
|
||||
|
||||
uint32_t read(uint8_t* buf, uint32_t len);
|
||||
|
||||
void readEnd();
|
||||
uint32_t readEnd();
|
||||
|
||||
void write(const uint8_t* buf, uint32_t len);
|
||||
|
||||
|
@ -116,10 +116,11 @@ class TTransport {
|
||||
* This can be over-ridden to perform a transport-specific action
|
||||
* e.g. logging the request to a file
|
||||
*
|
||||
* @return number of bytes read if available, 0 otherwise.
|
||||
*/
|
||||
virtual void readEnd() {
|
||||
virtual uint32_t readEnd() {
|
||||
// default behaviour is to do nothing
|
||||
return;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -137,10 +138,11 @@ class TTransport {
|
||||
* This can be over-ridden to perform a transport-specific action
|
||||
* at the end of a request.
|
||||
*
|
||||
* @return number of bytes written if available, 0 otherwise
|
||||
*/
|
||||
virtual void writeEnd() {
|
||||
virtual uint32_t writeEnd() {
|
||||
// default behaviour is to do nothing
|
||||
return;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -149,7 +151,9 @@ class TTransport {
|
||||
*
|
||||
* @throws TTransportException if an error occurs
|
||||
*/
|
||||
virtual void flush() {}
|
||||
virtual void flush() {
|
||||
// default behaviour is to do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to return a pointer to \c len bytes, possibly copied into \c buf.
|
||||
|
@ -135,16 +135,16 @@ uint32_t TPipedFileReaderTransport::readAll(uint8_t* buf, uint32_t len) {
|
||||
return have;
|
||||
}
|
||||
|
||||
void TPipedFileReaderTransport::readEnd() {
|
||||
TPipedTransport::readEnd();
|
||||
uint32_t TPipedFileReaderTransport::readEnd() {
|
||||
return TPipedTransport::readEnd();
|
||||
}
|
||||
|
||||
void TPipedFileReaderTransport::write(const uint8_t* buf, uint32_t len) {
|
||||
TPipedTransport::write(buf, len);
|
||||
}
|
||||
|
||||
void TPipedFileReaderTransport::writeEnd() {
|
||||
TPipedTransport::writeEnd();
|
||||
uint32_t TPipedFileReaderTransport::writeEnd() {
|
||||
return TPipedTransport::writeEnd();
|
||||
}
|
||||
|
||||
void TPipedFileReaderTransport::flush() {
|
||||
|
@ -136,7 +136,7 @@ class TPipedTransport : virtual public TTransport {
|
||||
|
||||
uint32_t read(uint8_t* buf, uint32_t len);
|
||||
|
||||
void readEnd() {
|
||||
uint32_t readEnd() {
|
||||
|
||||
if (pipeOnRead_) {
|
||||
dstTrans_->write(rBuf_, rPos_);
|
||||
@ -148,18 +148,22 @@ class TPipedTransport : virtual public TTransport {
|
||||
// If requests are being pipelined, copy down our read-ahead data,
|
||||
// then reset our state.
|
||||
int read_ahead = rLen_ - rPos_;
|
||||
uint32_t bytes = rPos_;
|
||||
memcpy(rBuf_, rBuf_ + rPos_, read_ahead);
|
||||
rPos_ = 0;
|
||||
rLen_ = read_ahead;
|
||||
|
||||
return bytes;
|
||||
}
|
||||
|
||||
void write(const uint8_t* buf, uint32_t len);
|
||||
|
||||
void writeEnd() {
|
||||
uint32_t writeEnd() {
|
||||
if (pipeOnWrite_) {
|
||||
dstTrans_->write(wBuf_, wLen_);
|
||||
dstTrans_->flush();
|
||||
}
|
||||
return wLen_;
|
||||
}
|
||||
|
||||
void flush();
|
||||
@ -237,9 +241,9 @@ class TPipedFileReaderTransport : public TPipedTransport,
|
||||
void close();
|
||||
uint32_t read(uint8_t* buf, uint32_t len);
|
||||
uint32_t readAll(uint8_t* buf, uint32_t len);
|
||||
void readEnd();
|
||||
uint32_t readEnd();
|
||||
void write(const uint8_t* buf, uint32_t len);
|
||||
void writeEnd();
|
||||
uint32_t writeEnd();
|
||||
void flush();
|
||||
|
||||
// TFileReaderTransport functions
|
||||
|
@ -298,13 +298,13 @@ class TestProcessorEventHandler : public TProcessorEventHandler {
|
||||
virtual void preRead(void* ctx, const char* fn_name) {
|
||||
communicate("preRead", ctx, fn_name);
|
||||
}
|
||||
virtual void postRead(void* ctx, const char* fn_name) {
|
||||
virtual void postRead(void* ctx, const char* fn_name, uint32_t bytes) {
|
||||
communicate("postRead", ctx, fn_name);
|
||||
}
|
||||
virtual void preWrite(void* ctx, const char* fn_name) {
|
||||
communicate("preWrite", ctx, fn_name);
|
||||
}
|
||||
virtual void postWrite(void* ctx, const char* fn_name) {
|
||||
virtual void postWrite(void* ctx, const char* fn_name, uint32_t bytes) {
|
||||
communicate("postWrite", ctx, fn_name);
|
||||
}
|
||||
virtual void asyncComplete(void* ctx, const char* fn_name) {
|
||||
|
Loading…
Reference in New Issue
Block a user