mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 02:45:22 +00:00
Memory-based transport rewrite.
The old implementations of the memory-based transports (TBufferedTransport, TFramedTransport, and TMemoryBuffer) shared very little code and all worked a bit differently. This change unifies them all as subclasses of a single base (TBufferBase) which handles the fast-path operations (when requests can be satisfied by the buffer) with inline methods (that will eventually be made nonvirtual in the template branch) and calls out to pure-virutal methods to handle full/empty buffers. All of the buffer-management is now done in terms of "base and bound" pointers rather than "pos" integers. These classes were moved to TBufferTransports.{h,cpp}. The .h is included in TTransportUtils for backwards compatibility. Also added a "TShortReadTransport" to assist testing transports. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665676 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9a4edfa07f
commit
28f298dd5d
@ -2337,7 +2337,7 @@ void t_cpp_generator::generate_service_skeleton(t_service* tservice) {
|
||||
"#include <protocol/TBinaryProtocol.h>" << endl <<
|
||||
"#include <server/TSimpleServer.h>" << endl <<
|
||||
"#include <transport/TServerSocket.h>" << endl <<
|
||||
"#include <transport/TTransportUtils.h>" << endl <<
|
||||
"#include <transport/TBufferTransports.h>" << endl <<
|
||||
endl <<
|
||||
"using namespace facebook::thrift;" << endl <<
|
||||
"using namespace facebook::thrift::protocol;" << endl <<
|
||||
|
@ -7,7 +7,7 @@
|
||||
#include <cstdlib>
|
||||
#include <iostream>
|
||||
|
||||
#include <transport/TTransportUtils.h>
|
||||
#include <transport/TBufferTransports.h>
|
||||
#include <transport/TFDTransport.h>
|
||||
#include <protocol/TBinaryProtocol.h>
|
||||
#include <protocol/TDebugProtocol.h>
|
||||
|
@ -41,6 +41,7 @@ libthrift_la_SOURCES = src/Thrift.cpp \
|
||||
src/transport/TSocketPool.cpp \
|
||||
src/transport/TServerSocket.cpp \
|
||||
src/transport/TTransportUtils.cpp \
|
||||
src/transport/TBufferTransports.cpp \
|
||||
src/server/TServer.cpp \
|
||||
src/server/TSimpleServer.cpp \
|
||||
src/server/TThreadPoolServer.cpp \
|
||||
@ -107,6 +108,7 @@ include_transport_HEADERS = \
|
||||
src/transport/TTransport.h \
|
||||
src/transport/TTransportException.h \
|
||||
src/transport/TTransportUtils.h \
|
||||
src/transport/TBufferTransports.h \
|
||||
src/transport/TZlibTransport.h
|
||||
|
||||
include_serverdir = $(include_thriftdir)/server
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include <TProcessor.h>
|
||||
#include <transport/TTransport.h>
|
||||
#include <transport/TTransportUtils.h>
|
||||
#include <transport/TBufferTransports.h>
|
||||
#include <boost/shared_ptr.hpp>
|
||||
|
||||
namespace facebook { namespace thrift { namespace processor {
|
||||
|
@ -12,8 +12,6 @@
|
||||
|
||||
#include <boost/shared_ptr.hpp>
|
||||
|
||||
#include <transport/TTransportUtils.h>
|
||||
|
||||
namespace facebook { namespace thrift { namespace protocol {
|
||||
|
||||
/*
|
||||
@ -158,6 +156,9 @@ class TDebugProtocolFactory : public TProtocolFactory {
|
||||
}}} // facebook::thrift::protocol
|
||||
|
||||
|
||||
// TODO(dreiss): Move (part of) ThriftDebugString into a .cpp file and remove this.
|
||||
#include <transport/TBufferTransports.h>
|
||||
|
||||
namespace facebook { namespace thrift {
|
||||
|
||||
template<typename ThriftStruct>
|
||||
|
@ -8,7 +8,6 @@
|
||||
#define _THRIFT_PROTOCOL_TJSONPROTOCOL_H_ 1
|
||||
|
||||
#include "TProtocol.h"
|
||||
#include <transport/TTransportUtils.h>
|
||||
|
||||
#include <stack>
|
||||
|
||||
@ -303,6 +302,9 @@ class TJSONProtocolFactory {
|
||||
}}} // facebook::thrift::protocol
|
||||
|
||||
|
||||
// TODO(dreiss): Move part of ThriftJSONString into a .cpp file and remove this.
|
||||
#include <transport/TBufferTransports.h>
|
||||
|
||||
namespace facebook { namespace thrift {
|
||||
|
||||
template<typename ThriftStruct>
|
||||
|
@ -9,7 +9,7 @@
|
||||
|
||||
#include <Thrift.h>
|
||||
#include <server/TServer.h>
|
||||
#include <transport/TTransportUtils.h>
|
||||
#include <transport/TBufferTransports.h>
|
||||
#include <concurrency/ThreadManager.h>
|
||||
#include <stack>
|
||||
#include <string>
|
||||
|
353
lib/cpp/src/transport/TBufferTransports.cpp
Normal file
353
lib/cpp/src/transport/TBufferTransports.cpp
Normal file
@ -0,0 +1,353 @@
|
||||
// Copyright (c) 2006- Facebook
|
||||
// Distributed under the Thrift Software License
|
||||
//
|
||||
// See accompanying file LICENSE or visit the Thrift site at:
|
||||
// http://developers.facebook.com/thrift/
|
||||
|
||||
#include <cassert>
|
||||
#include <algorithm>
|
||||
|
||||
#include <transport/TBufferTransports.h>
|
||||
|
||||
using std::string;
|
||||
|
||||
namespace facebook { namespace thrift { namespace transport {
|
||||
|
||||
|
||||
uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) {
|
||||
uint32_t want = len;
|
||||
uint32_t have = rBound_ - rBase_;
|
||||
|
||||
// We should only take the slow path if we can't satisfy the read
|
||||
// with the data already in the buffer.
|
||||
assert(have < want);
|
||||
|
||||
// Copy out whatever we have.
|
||||
if (have > 0) {
|
||||
memcpy(buf, rBase_, have);
|
||||
want -= have;
|
||||
buf += have;
|
||||
}
|
||||
// Get more from underlying transport up to buffer size.
|
||||
// Note that this makes a lot of sense if len < rBufSize_
|
||||
// and almost no sense otherwise. TODO(dreiss): Fix that
|
||||
// case (possibly including some readv hotness).
|
||||
setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
|
||||
|
||||
// Hand over whatever we have.
|
||||
uint32_t give = std::min(want, static_cast<uint32_t>(rBound_ - rBase_));
|
||||
memcpy(buf, rBase_, give);
|
||||
rBase_ += give;
|
||||
want -= give;
|
||||
|
||||
return (len - want);
|
||||
}
|
||||
|
||||
void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
|
||||
uint32_t have_bytes = wBase_ - wBuf_.get();
|
||||
uint32_t space = wBound_ - wBase_;
|
||||
// We should only take the slow path if we can't accomodate the write
|
||||
// with the free space already in the buffer.
|
||||
assert(wBound_ - wBase_ < static_cast<ptrdiff_t>(len));
|
||||
|
||||
// Now here's the tricky question: should we copy data from buf into our
|
||||
// internal buffer and write it from there, or should we just write out
|
||||
// the current internal buffer in one syscall and write out buf in another.
|
||||
// If our currently buffered data plus buf is at least double our buffer
|
||||
// size, we will have to do two syscalls no matter what (except in the
|
||||
// degenerate case when our buffer is empty), so there is no use copying.
|
||||
// Otherwise, there is sort of a sliding scale. If we have N-1 bytes
|
||||
// buffered and need to write 2, it would be crazy to do two syscalls.
|
||||
// On the other hand, if we have 2 bytes buffered and are writing 2N-3,
|
||||
// we can save a syscall in the short term by loading up our buffer, writing
|
||||
// it out, and copying the rest of the bytes into our buffer. Of course,
|
||||
// if we get another 2-byte write, we haven't saved any syscalls at all,
|
||||
// and have just copied nearly 2N bytes for nothing. Finding a perfect
|
||||
// policy would require predicting the size of future writes, so we're just
|
||||
// going to always eschew syscalls if we have less than 2N bytes to write.
|
||||
|
||||
// The case where we have to do two syscalls.
|
||||
// This case also covers the case where the buffer is empty,
|
||||
// but it is clearer (I think) to think of it as two separate cases.
|
||||
if ((have_bytes + len >= 2*wBufSize_) || (have_bytes == 0)) {
|
||||
// TODO(dreiss): writev
|
||||
if (have_bytes > 0) {
|
||||
transport_->write(wBuf_.get(), have_bytes);
|
||||
}
|
||||
transport_->write(buf, len);
|
||||
wBase_ = wBuf_.get();
|
||||
return;
|
||||
}
|
||||
|
||||
// Fill up our internal buffer for a write.
|
||||
memcpy(wBase_, buf, space);
|
||||
buf += space;
|
||||
len -= space;
|
||||
transport_->write(wBuf_.get(), wBufSize_);
|
||||
|
||||
// Copy the rest into our buffer.
|
||||
assert(len < wBufSize_);
|
||||
memcpy(wBuf_.get(), buf, len);
|
||||
wBase_ = wBuf_.get() + len;
|
||||
return;
|
||||
}
|
||||
|
||||
const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
|
||||
// If the request is bigger than our buffer, we are hosed.
|
||||
if (*len > rBufSize_) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// The number of bytes of data we have already.
|
||||
uint32_t have = rBound_ - rBase_;
|
||||
// The number of additional bytes we need from the underlying transport.
|
||||
uint32_t need = *len - have;
|
||||
// The space from the start of the buffer to the end of our data.
|
||||
uint32_t offset = rBound_ - rBuf_.get();
|
||||
assert(need > 0);
|
||||
|
||||
// If we have less than half our buffer space available, shift the data
|
||||
// we have down to the start. If the borrow is big compared to our buffer,
|
||||
// this could be kind of a waste, but if the borrow is small, it frees up
|
||||
// space at the end of our buffer to do a bigger single read from the
|
||||
// underlying transport. Also, if our needs extend past the end of the
|
||||
// buffer, we have to do a copy no matter what.
|
||||
if ((offset > rBufSize_/2) || (offset + need > rBufSize_)) {
|
||||
memmove(rBuf_.get(), rBase_, have);
|
||||
setReadBuffer(rBuf_.get(), have);
|
||||
}
|
||||
|
||||
// First try to fill up the buffer.
|
||||
uint32_t got = transport_->read(rBound_, rBufSize_ - have);
|
||||
rBound_ += got;
|
||||
need -= got;
|
||||
|
||||
// If that fails, readAll until we get what we need.
|
||||
if (need > 0) {
|
||||
rBound_ += transport_->readAll(rBound_, need);
|
||||
}
|
||||
|
||||
*len = rBound_ - rBase_;
|
||||
return rBase_;
|
||||
}
|
||||
|
||||
void TBufferedTransport::flush() {
|
||||
// Write out any data waiting in the write buffer.
|
||||
uint32_t have_bytes = wBase_ - wBuf_.get();
|
||||
if (have_bytes > 0) {
|
||||
transport_->write(wBuf_.get(), have_bytes);
|
||||
wBase_ = wBuf_.get();
|
||||
}
|
||||
|
||||
// Flush the underlying transport.
|
||||
transport_->flush();
|
||||
}
|
||||
|
||||
|
||||
uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len) {
|
||||
uint32_t want = len;
|
||||
uint32_t have = rBound_ - rBase_;
|
||||
|
||||
// We should only take the slow path if we can't satisfy the read
|
||||
// with the data already in the buffer.
|
||||
assert(have < want);
|
||||
|
||||
// Copy out whatever we have.
|
||||
if (have > 0) {
|
||||
memcpy(buf, rBase_, have);
|
||||
want -= have;
|
||||
buf += have;
|
||||
}
|
||||
|
||||
// Read another frame.
|
||||
readFrame();
|
||||
|
||||
// TODO(dreiss): Should we warn when reads cross frames?
|
||||
|
||||
// Hand over whatever we have.
|
||||
uint32_t give = std::min(want, static_cast<uint32_t>(rBound_ - rBase_));
|
||||
memcpy(buf, rBase_, give);
|
||||
rBase_ += give;
|
||||
want -= give;
|
||||
|
||||
return (len - want);
|
||||
}
|
||||
|
||||
void TFramedTransport::readFrame() {
|
||||
// TODO(dreiss): Think about using readv here, even though it would
|
||||
// result in (gasp) read-ahead.
|
||||
|
||||
// Read the size of the next frame.
|
||||
int32_t sz;
|
||||
transport_->readAll((uint8_t*)&sz, sizeof(sz));
|
||||
sz = ntohl(sz);
|
||||
|
||||
if (sz < 0) {
|
||||
throw TTransportException("Frame size has negative value");
|
||||
}
|
||||
|
||||
// Read the frame payload, and reset markers.
|
||||
if (sz > static_cast<int32_t>(rBufSize_)) {
|
||||
rBuf_.reset(new uint8_t[sz]);
|
||||
rBufSize_ = sz;
|
||||
}
|
||||
transport_->readAll(rBuf_.get(), sz);
|
||||
setReadBuffer(rBuf_.get(), sz);
|
||||
}
|
||||
|
||||
void TFramedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
|
||||
// Double buffer size until sufficient.
|
||||
uint32_t have = wBase_ - wBuf_.get();
|
||||
while (wBufSize_ < len + have) {
|
||||
wBufSize_ *= 2;
|
||||
}
|
||||
|
||||
// TODO(dreiss): Consider modifying this class to use malloc/free
|
||||
// so we can use realloc here.
|
||||
|
||||
// Allocate new buffer.
|
||||
uint8_t* new_buf = new uint8_t[wBufSize_];
|
||||
|
||||
// Copy the old buffer to the new one.
|
||||
memcpy(new_buf, wBuf_.get(), have);
|
||||
|
||||
// Now point buf to the new one.
|
||||
wBuf_.reset(new_buf);
|
||||
wBase_ = wBuf_.get() + have;
|
||||
wBound_ = wBuf_.get() + wBufSize_;
|
||||
|
||||
// Copy the data into the new buffer.
|
||||
memcpy(wBase_, buf, len);
|
||||
wBase_ += len;
|
||||
}
|
||||
|
||||
void TFramedTransport::flush() {
|
||||
int32_t sz_hbo, sz_nbo;
|
||||
assert(wBufSize_ > sizeof(sz_nbo));
|
||||
|
||||
// Slip the frame size into the start of the buffer.
|
||||
sz_hbo = wBase_ - (wBuf_.get() + sizeof(sz_nbo));
|
||||
sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo));
|
||||
memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo));
|
||||
|
||||
// Write size and frame body.
|
||||
transport_->write(wBuf_.get(), sizeof(sz_nbo)+sz_hbo);
|
||||
|
||||
// Reset our pointers.
|
||||
wBase_ = wBuf_.get();
|
||||
|
||||
// Pad the buffer so we can insert the size later.
|
||||
uint32_t pad = 0;
|
||||
this->write((uint8_t*)&pad, sizeof(pad));
|
||||
|
||||
// Flush the underlying transport.
|
||||
transport_->flush();
|
||||
}
|
||||
|
||||
const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
|
||||
// Don't try to be clever with shifting buffers.
|
||||
// If the fast path failed let the protocol use its slow path.
|
||||
// Besides, who is going to try to borrow across messages?
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give) {
|
||||
// Correct rBound_ so we can use the fast path in the future.
|
||||
rBound_ = wBase_;
|
||||
|
||||
// Decide how much to give.
|
||||
uint32_t give = std::min(len, available_read());
|
||||
|
||||
*out_start = rBase_;
|
||||
*out_give = give;
|
||||
|
||||
// Preincrement rBase_ so the caller doesn't have to.
|
||||
rBase_ += give;
|
||||
}
|
||||
|
||||
uint32_t TMemoryBuffer::readSlow(uint8_t* buf, uint32_t len) {
|
||||
uint8_t* start;
|
||||
uint32_t give;
|
||||
computeRead(len, &start, &give);
|
||||
|
||||
// Copy into the provided buffer.
|
||||
memcpy(buf, start, give);
|
||||
|
||||
return give;
|
||||
}
|
||||
|
||||
uint32_t TMemoryBuffer::readAppendToString(std::string& str, uint32_t len) {
|
||||
// Don't get some stupid assertion failure.
|
||||
if (buffer_ == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint8_t* start;
|
||||
uint32_t give;
|
||||
computeRead(len, &start, &give);
|
||||
|
||||
// Append to the provided string.
|
||||
str.append((char*)start, give);
|
||||
|
||||
return give;
|
||||
}
|
||||
|
||||
void TMemoryBuffer::ensureCanWrite(uint32_t len) {
|
||||
// Check available space
|
||||
uint32_t avail = available_write();
|
||||
if (len <= avail) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!owner_) {
|
||||
throw TTransportException("Insufficient space in external MemoryBuffer");
|
||||
}
|
||||
|
||||
// Grow the buffer as necessary.
|
||||
while (len > avail) {
|
||||
bufferSize_ *= 2;
|
||||
wBound_ = buffer_ + bufferSize_;
|
||||
avail = available_write();
|
||||
}
|
||||
|
||||
// Allocate into a new pointer so we don't bork ours if it fails.
|
||||
void* new_buffer = std::realloc(buffer_, bufferSize_);
|
||||
if (new_buffer == NULL) {
|
||||
throw TTransportException("Out of memory.");
|
||||
}
|
||||
|
||||
ptrdiff_t offset = (uint8_t*)new_buffer - buffer_;
|
||||
buffer_ += offset;
|
||||
rBase_ += offset;
|
||||
rBound_ += offset;
|
||||
wBase_ += offset;
|
||||
wBound_ += offset;
|
||||
}
|
||||
|
||||
void TMemoryBuffer::writeSlow(const uint8_t* buf, uint32_t len) {
|
||||
ensureCanWrite(len);
|
||||
|
||||
// Copy into the buffer and increment wBase_.
|
||||
memcpy(wBase_, buf, len);
|
||||
wBase_ += len;
|
||||
}
|
||||
|
||||
void TMemoryBuffer::wroteBytes(uint32_t len) {
|
||||
uint32_t avail = available_write();
|
||||
if (len > avail) {
|
||||
throw TTransportException("Client wrote more bytes than size of buffer.");
|
||||
}
|
||||
wBase_ += len;
|
||||
}
|
||||
|
||||
const uint8_t* TMemoryBuffer::borrowSlow(uint8_t* buf, uint32_t* len) {
|
||||
rBound_ = wBase_;
|
||||
if (available_read() >= *len) {
|
||||
*len = available_read();
|
||||
return rBase_;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
}}} // facebook::thrift::transport
|
664
lib/cpp/src/transport/TBufferTransports.h
Normal file
664
lib/cpp/src/transport/TBufferTransports.h
Normal file
@ -0,0 +1,664 @@
|
||||
// Copyright (c) 2006- Facebook
|
||||
// Distributed under the Thrift Software License
|
||||
//
|
||||
// See accompanying file LICENSE or visit the Thrift site at:
|
||||
// http://developers.facebook.com/thrift/
|
||||
|
||||
#ifndef _THRIFT_TRANSPORT_TDOUBLEBUFFERS_H_
|
||||
#define _THRIFT_TRANSPORT_TDOUBLEBUFFERS_H_ 1
|
||||
|
||||
#include "boost/scoped_array.hpp"
|
||||
|
||||
#include <transport/TTransport.h>
|
||||
|
||||
#ifdef __GNUC__
|
||||
#define TDB_LIKELY(val) (__builtin_expect((val), 1))
|
||||
#define TDB_UNLIKELY(val) (__builtin_expect((val), 0))
|
||||
#else
|
||||
#define TDB_LIKELY(val) (val)
|
||||
#define TDB_UNLIKELY(val) (val)
|
||||
#endif
|
||||
|
||||
namespace facebook { namespace thrift { namespace transport {
|
||||
|
||||
|
||||
/**
|
||||
* Base class for all transports that use read/write buffers for performance.
|
||||
*
|
||||
* TBufferBase is designed to implement the fast-path "memcpy" style
|
||||
* operations that work in the common case. It does so with small and
|
||||
* (eventually) nonvirtual, inlinable methods. TBufferBase is an abstract
|
||||
* class. Subclasses are expected to define the "slow path" operations
|
||||
* that have to be done when the buffers are full or empty.
|
||||
*
|
||||
* @author David Reiss <dreiss@facebook.com>
|
||||
*/
|
||||
class TBufferBase : public TTransport {
|
||||
|
||||
public:
|
||||
|
||||
/**
|
||||
* Fast-path read.
|
||||
*
|
||||
* When we have enough data buffered to fulfill the read, we can satisfy it
|
||||
* with a single memcpy, then adjust our internal pointers. If the buffer
|
||||
* is empty, we call out to our slow path, implemented by a subclass.
|
||||
* This method is meant to eventually be nonvirtual and inlinable.
|
||||
*/
|
||||
uint32_t read(uint8_t* buf, uint32_t len) {
|
||||
uint8_t* new_rBase = rBase_ + len;
|
||||
if (TDB_LIKELY(new_rBase <= rBound_)) {
|
||||
memcpy(buf, rBase_, len);
|
||||
rBase_ = new_rBase;
|
||||
return len;
|
||||
}
|
||||
return readSlow(buf, len);
|
||||
}
|
||||
|
||||
/**
|
||||
* Fast-path write.
|
||||
*
|
||||
* When we have enough empty space in our buffer to accomodate the write, we
|
||||
* can satisfy it with a single memcpy, then adjust our internal pointers.
|
||||
* If the buffer is full, we call out to our slow path, implemented by a
|
||||
* subclass. This method is meant to eventually be nonvirtual and
|
||||
* inlinable.
|
||||
*/
|
||||
void write(const uint8_t* buf, uint32_t len) {
|
||||
uint8_t* new_wBase = wBase_ + len;
|
||||
if (TDB_LIKELY(new_wBase <= wBound_)) {
|
||||
memcpy(wBase_, buf, len);
|
||||
wBase_ = new_wBase;
|
||||
return;
|
||||
}
|
||||
writeSlow(buf, len);
|
||||
}
|
||||
|
||||
/**
|
||||
* Fast-path borrow. A lot like the fast-path read.
|
||||
*/
|
||||
const uint8_t* borrow(uint8_t* buf, uint32_t* len) {
|
||||
if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) {
|
||||
// With strict aliasing, writing to len shouldn't force us to
|
||||
// refetch rBase_ from memory. TODO(dreiss): Verify this.
|
||||
*len = rBound_ - rBase_;
|
||||
return rBase_;
|
||||
}
|
||||
return borrowSlow(buf, len);
|
||||
}
|
||||
|
||||
/**
|
||||
* Consume doesn't require a slow path.
|
||||
*/
|
||||
void consume(uint32_t len) {
|
||||
if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) {
|
||||
rBase_ += len;
|
||||
} else {
|
||||
throw TTransportException(TTransportException::BAD_ARGS,
|
||||
"consume did not follow a borrow.");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected:
|
||||
|
||||
/// Slow path read.
|
||||
virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0;
|
||||
|
||||
/// Slow path read.
|
||||
virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0;
|
||||
|
||||
/**
|
||||
* Slow path borrow.
|
||||
*
|
||||
* POSTCONDITION: return == NULL || rBound_ - rBase_ >= *len
|
||||
*/
|
||||
virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0;
|
||||
|
||||
/**
|
||||
* Trivial constructor.
|
||||
*
|
||||
* Initialize pointers safely. Constructing is not a very
|
||||
* performance-sensitive operation, so it is okay to just leave it to
|
||||
* the concrete class to set up pointers correctly.
|
||||
*/
|
||||
TBufferBase()
|
||||
: rBase_(NULL)
|
||||
, rBound_(NULL)
|
||||
, wBase_(NULL)
|
||||
, wBound_(NULL)
|
||||
{}
|
||||
|
||||
/// Convenience mutator for setting the read buffer.
|
||||
void setReadBuffer(uint8_t* buf, uint32_t len) {
|
||||
rBase_ = buf;
|
||||
rBound_ = buf+len;
|
||||
}
|
||||
|
||||
/// Convenience mutator for setting the read buffer.
|
||||
void setWriteBuffer(uint8_t* buf, uint32_t len) {
|
||||
wBase_ = buf;
|
||||
wBound_ = buf+len;
|
||||
}
|
||||
|
||||
virtual ~TBufferBase() {}
|
||||
|
||||
/// Reads begin here.
|
||||
uint8_t* rBase_;
|
||||
/// Reads may extend to just before here.
|
||||
uint8_t* rBound_;
|
||||
|
||||
/// Writes begin here.
|
||||
uint8_t* wBase_;
|
||||
/// Writes may extend to just before here.
|
||||
uint8_t* wBound_;
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Buffered transport. For reads it will read more data than is requested
|
||||
* and will serve future data out of a local buffer. For writes, data is
|
||||
* stored to an in memory buffer before being written out.
|
||||
*
|
||||
* @author Mark Slee <mcslee@facebook.com>
|
||||
* @author David Reiss <dreiss@facebook.com>
|
||||
*/
|
||||
class TBufferedTransport : public TBufferBase {
|
||||
public:
|
||||
|
||||
static const int DEFAULT_BUFFER_SIZE = 512;
|
||||
|
||||
/// Use default buffer sizes.
|
||||
TBufferedTransport(boost::shared_ptr<TTransport> transport)
|
||||
: transport_(transport)
|
||||
, rBufSize_(DEFAULT_BUFFER_SIZE)
|
||||
, wBufSize_(DEFAULT_BUFFER_SIZE)
|
||||
, rBuf_(new uint8_t[rBufSize_])
|
||||
, wBuf_(new uint8_t[wBufSize_])
|
||||
{
|
||||
initPointers();
|
||||
}
|
||||
|
||||
/// Use specified buffer sizes.
|
||||
TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
|
||||
: transport_(transport)
|
||||
, rBufSize_(sz)
|
||||
, wBufSize_(sz)
|
||||
, rBuf_(new uint8_t[rBufSize_])
|
||||
, wBuf_(new uint8_t[wBufSize_])
|
||||
{
|
||||
initPointers();
|
||||
}
|
||||
|
||||
/// Use specified read and write buffer sizes.
|
||||
TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz)
|
||||
: transport_(transport)
|
||||
, rBufSize_(rsz)
|
||||
, wBufSize_(rsz)
|
||||
, rBuf_(new uint8_t[rBufSize_])
|
||||
, wBuf_(new uint8_t[wBufSize_])
|
||||
{
|
||||
initPointers();
|
||||
}
|
||||
|
||||
void open() {
|
||||
transport_->open();
|
||||
}
|
||||
|
||||
bool isOpen() {
|
||||
return transport_->isOpen();
|
||||
}
|
||||
|
||||
bool peek() {
|
||||
if (rBase_ == rBound_) {
|
||||
setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
|
||||
}
|
||||
return (rBound_ > rBase_);
|
||||
}
|
||||
|
||||
void close() {
|
||||
flush();
|
||||
transport_->close();
|
||||
}
|
||||
|
||||
virtual uint32_t readSlow(uint8_t* buf, uint32_t len);
|
||||
|
||||
virtual void writeSlow(const uint8_t* buf, uint32_t len);
|
||||
|
||||
void flush();
|
||||
|
||||
|
||||
/**
|
||||
* The following behavior is currently implemented by TBufferedTransport,
|
||||
* but that may change in a future version:
|
||||
* 1/ If len is at most rBufSize_, borrow will never return NULL.
|
||||
* Depending on the underlying transport, it could throw an exception
|
||||
* or hang forever.
|
||||
* 2/ Some borrow requests may copy bytes internally. However,
|
||||
* if len is at most rBufSize_/2, none of the copied bytes
|
||||
* will ever have to be copied again. For optimial performance,
|
||||
* stay under this limit.
|
||||
*/
|
||||
virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
|
||||
|
||||
boost::shared_ptr<TTransport> getUnderlyingTransport() {
|
||||
return transport_;
|
||||
}
|
||||
|
||||
protected:
|
||||
void initPointers() {
|
||||
setReadBuffer(rBuf_.get(), 0);
|
||||
setWriteBuffer(wBuf_.get(), wBufSize_);
|
||||
// Write size never changes.
|
||||
}
|
||||
|
||||
boost::shared_ptr<TTransport> transport_;
|
||||
|
||||
uint32_t rBufSize_;
|
||||
uint32_t wBufSize_;
|
||||
boost::scoped_array<uint8_t> rBuf_;
|
||||
boost::scoped_array<uint8_t> wBuf_;
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Wraps a transport into a buffered one.
|
||||
*
|
||||
* @author Mark Slee <mcslee@facebook.com>
|
||||
*/
|
||||
class TBufferedTransportFactory : public TTransportFactory {
|
||||
public:
|
||||
TBufferedTransportFactory() {}
|
||||
|
||||
virtual ~TBufferedTransportFactory() {}
|
||||
|
||||
/**
|
||||
* Wraps the transport into a buffered one.
|
||||
*/
|
||||
virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
|
||||
return boost::shared_ptr<TTransport>(new TBufferedTransport(trans));
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Framed transport. All writes go into an in-memory buffer until flush is
|
||||
* called, at which point the transport writes the length of the entire
|
||||
* binary chunk followed by the data payload. This allows the receiver on the
|
||||
* other end to always do fixed-length reads.
|
||||
*
|
||||
* @author Mark Slee <mcslee@facebook.com>
|
||||
* @author David Reiss <dreiss@facebook.com>
|
||||
*/
|
||||
class TFramedTransport : public TBufferBase {
|
||||
public:
|
||||
|
||||
static const int DEFAULT_BUFFER_SIZE = 512;
|
||||
|
||||
/// Use default buffer sizes.
|
||||
TFramedTransport(boost::shared_ptr<TTransport> transport)
|
||||
: transport_(transport)
|
||||
, rBufSize_(0)
|
||||
, wBufSize_(DEFAULT_BUFFER_SIZE)
|
||||
, rBuf_()
|
||||
, wBuf_(new uint8_t[wBufSize_])
|
||||
{
|
||||
initPointers();
|
||||
}
|
||||
|
||||
TFramedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
|
||||
: transport_(transport)
|
||||
, rBufSize_(0)
|
||||
, wBufSize_(sz)
|
||||
, rBuf_()
|
||||
, wBuf_(new uint8_t[wBufSize_])
|
||||
{
|
||||
initPointers();
|
||||
}
|
||||
|
||||
void open() {
|
||||
transport_->open();
|
||||
}
|
||||
|
||||
bool isOpen() {
|
||||
return transport_->isOpen();
|
||||
}
|
||||
|
||||
bool peek() {
|
||||
return (rBase_ < rBound_) || transport_->peek();
|
||||
}
|
||||
|
||||
void close() {
|
||||
flush();
|
||||
transport_->close();
|
||||
}
|
||||
|
||||
virtual uint32_t readSlow(uint8_t* buf, uint32_t len);
|
||||
|
||||
virtual void writeSlow(const uint8_t* buf, uint32_t len);
|
||||
|
||||
virtual void flush();
|
||||
|
||||
const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
|
||||
|
||||
boost::shared_ptr<TTransport> getUnderlyingTransport() {
|
||||
return transport_;
|
||||
}
|
||||
|
||||
protected:
|
||||
/**
|
||||
* Reads a frame of input from the underlying stream.
|
||||
*/
|
||||
void readFrame();
|
||||
|
||||
void initPointers() {
|
||||
setReadBuffer(NULL, 0);
|
||||
setWriteBuffer(wBuf_.get(), wBufSize_);
|
||||
|
||||
// Pad the buffer so we can insert the size later.
|
||||
int32_t pad = 0;
|
||||
this->write((uint8_t*)&pad, sizeof(pad));
|
||||
}
|
||||
|
||||
boost::shared_ptr<TTransport> transport_;
|
||||
|
||||
uint32_t rBufSize_;
|
||||
uint32_t wBufSize_;
|
||||
boost::scoped_array<uint8_t> rBuf_;
|
||||
boost::scoped_array<uint8_t> wBuf_;
|
||||
};
|
||||
|
||||
/**
|
||||
* Wraps a transport into a framed one.
|
||||
*
|
||||
* @author Dave Simpson <dave@powerset.com>
|
||||
*/
|
||||
class TFramedTransportFactory : public TTransportFactory {
|
||||
public:
|
||||
TFramedTransportFactory() {}
|
||||
|
||||
virtual ~TFramedTransportFactory() {}
|
||||
|
||||
/**
|
||||
* Wraps the transport into a framed one.
|
||||
*/
|
||||
virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
|
||||
return boost::shared_ptr<TTransport>(new TFramedTransport(trans));
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* A memory buffer is a tranpsort that simply reads from and writes to an
|
||||
* in memory buffer. Anytime you call write on it, the data is simply placed
|
||||
* into a buffer, and anytime you call read, data is read from that buffer.
|
||||
*
|
||||
* The buffers are allocated using C constructs malloc,realloc, and the size
|
||||
* doubles as necessary. We've considered using scoped
|
||||
*
|
||||
* @author Mark Slee <mcslee@facebook.com>
|
||||
* @author David Reiss <dreiss@facebook.com>
|
||||
*/
|
||||
class TMemoryBuffer : public TBufferBase {
|
||||
private:
|
||||
|
||||
// Common initialization done by all constructors.
|
||||
void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) {
|
||||
if (buf == NULL && size != 0) {
|
||||
assert(owner);
|
||||
buf = (uint8_t*)std::malloc(size);
|
||||
if (buf == NULL) {
|
||||
throw TTransportException("Out of memory");
|
||||
}
|
||||
}
|
||||
|
||||
buffer_ = buf;
|
||||
bufferSize_ = size;
|
||||
|
||||
rBase_ = buffer_;
|
||||
rBound_ = buffer_ + wPos;
|
||||
// TODO(dreiss): Investigate NULL-ing this if !owner.
|
||||
wBase_ = buffer_ + wPos;
|
||||
wBound_ = buffer_ + bufferSize_;
|
||||
|
||||
owner_ = owner;
|
||||
|
||||
// rBound_ is really an artifact. In principle, it should always be
|
||||
// equal to wBase_. We update it in a few places (computeRead, etc.).
|
||||
}
|
||||
|
||||
public:
|
||||
static const uint32_t defaultSize = 1024;
|
||||
|
||||
/**
|
||||
* This enum specifies how a TMemoryBuffer should treat
|
||||
* memory passed to it via constructors or resetBuffer.
|
||||
*
|
||||
* OBSERVE:
|
||||
* TMemoryBuffer will simply store a pointer to the memory.
|
||||
* It is the callers responsibility to ensure that the pointer
|
||||
* remains valid for the lifetime of the TMemoryBuffer,
|
||||
* and that it is properly cleaned up.
|
||||
* Note that no data can be written to observed buffers.
|
||||
*
|
||||
* COPY:
|
||||
* TMemoryBuffer will make an internal copy of the buffer.
|
||||
* The caller has no responsibilities.
|
||||
*
|
||||
* TAKE_OWNERSHIP:
|
||||
* TMemoryBuffer will become the "owner" of the buffer,
|
||||
* and will be responsible for freeing it.
|
||||
* The membory must have been allocated with malloc.
|
||||
*/
|
||||
enum MemoryPolicy {
|
||||
OBSERVE = 1,
|
||||
COPY = 2,
|
||||
TAKE_OWNERSHIP = 3,
|
||||
};
|
||||
|
||||
/**
|
||||
* Construct a TMemoryBuffer with a default-sized buffer,
|
||||
* owned by the TMemoryBuffer object.
|
||||
*/
|
||||
TMemoryBuffer() {
|
||||
initCommon(NULL, defaultSize, true, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a TMemoryBuffer with a buffer of a specified size,
|
||||
* owned by the TMemoryBuffer object.
|
||||
*
|
||||
* @param sz The initial size of the buffer.
|
||||
*/
|
||||
TMemoryBuffer(uint32_t sz) {
|
||||
initCommon(NULL, sz, true, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a TMemoryBuffer with buf as its initial contents.
|
||||
*
|
||||
* @param buf The initial contents of the buffer.
|
||||
* Note that, while buf is a non-const pointer,
|
||||
* TMemoryBuffer will not write to it if policy == OBSERVE,
|
||||
* so it is safe to const_cast<uint8_t*>(whatever).
|
||||
* @param sz The size of @c buf.
|
||||
* @param policy See @link MemoryPolicy @endlink .
|
||||
*/
|
||||
TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
|
||||
if (buf == NULL && sz != 0) {
|
||||
throw TTransportException(TTransportException::BAD_ARGS,
|
||||
"TMemoryBuffer given null buffer with non-zero size.");
|
||||
}
|
||||
|
||||
switch (policy) {
|
||||
case OBSERVE:
|
||||
case TAKE_OWNERSHIP:
|
||||
initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz);
|
||||
break;
|
||||
case COPY:
|
||||
initCommon(NULL, sz, true, 0);
|
||||
this->write(buf, sz);
|
||||
break;
|
||||
default:
|
||||
throw TTransportException(TTransportException::BAD_ARGS,
|
||||
"Invalid MemoryPolicy for TMemoryBuffer");
|
||||
}
|
||||
}
|
||||
|
||||
~TMemoryBuffer() {
|
||||
if (owner_) {
|
||||
std::free(buffer_);
|
||||
}
|
||||
}
|
||||
|
||||
bool isOpen() {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool peek() {
|
||||
return (rBase_ < wBase_);
|
||||
}
|
||||
|
||||
void open() {}
|
||||
|
||||
void close() {}
|
||||
|
||||
// TODO(dreiss): Make bufPtr const.
|
||||
void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
|
||||
*bufPtr = rBase_;
|
||||
*sz = wBase_ - rBase_;
|
||||
}
|
||||
|
||||
std::string getBufferAsString() {
|
||||
if (buffer_ == NULL) {
|
||||
return "";
|
||||
}
|
||||
uint8_t* buf;
|
||||
uint32_t sz;
|
||||
getBuffer(&buf, &sz);
|
||||
return std::string((char*)buf, (std::string::size_type)sz);
|
||||
}
|
||||
|
||||
void appendBufferToString(std::string& str) {
|
||||
if (buffer_ == NULL) {
|
||||
return;
|
||||
}
|
||||
uint8_t* buf;
|
||||
uint32_t sz;
|
||||
getBuffer(&buf, &sz);
|
||||
str.append((char*)buf, sz);
|
||||
}
|
||||
|
||||
void resetBuffer() {
|
||||
rBase_ = buffer_;
|
||||
rBound_ = buffer_;
|
||||
wBase_ = buffer_;
|
||||
// It isn't safe to write into a buffer we don't own.
|
||||
if (!owner_) {
|
||||
wBound_ = wBase_;
|
||||
bufferSize_ = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/// See constructor documentation.
|
||||
void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
|
||||
// Use a variant of the copy-and-swap trick for assignment operators.
|
||||
// This is sub-optimal in terms of performance for two reasons:
|
||||
// 1/ The constructing and swapping of the (small) values
|
||||
// in the temporary object takes some time, and is not necessary.
|
||||
// 2/ If policy == COPY, we allocate the new buffer before
|
||||
// freeing the old one, precluding the possibility of
|
||||
// reusing that memory.
|
||||
// I doubt that either of these problems could be optimized away,
|
||||
// but the second is probably no a common case, and the first is minor.
|
||||
// I don't expect resetBuffer to be a common operation, so I'm willing to
|
||||
// bite the performance bullet to make the method this simple.
|
||||
|
||||
// Construct the new buffer.
|
||||
TMemoryBuffer new_buffer(buf, sz, policy);
|
||||
// Move it into ourself.
|
||||
this->swap(new_buffer);
|
||||
// Our old self gets destroyed.
|
||||
}
|
||||
|
||||
std::string readAsString(uint32_t len) {
|
||||
std::string str;
|
||||
(void)readAppendToString(str, len);
|
||||
return str;
|
||||
}
|
||||
|
||||
uint32_t readAppendToString(std::string& str, uint32_t len);
|
||||
|
||||
void readEnd() {
|
||||
if (rBase_ == wBase_) {
|
||||
resetBuffer();
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t available_read() const {
|
||||
// Remember, wBase_ is the real rBound_.
|
||||
return wBase_ - rBase_;
|
||||
}
|
||||
|
||||
uint32_t available_write() const {
|
||||
return wBound_ - wBase_;
|
||||
}
|
||||
|
||||
// Returns a pointer to where the client can write data to append to
|
||||
// the TMemoryBuffer, and ensures the buffer is big enough to accomodate a
|
||||
// write of the provided length. The returned pointer is very convenient for
|
||||
// passing to read(), recv(), or similar. You must call wroteBytes() as soon
|
||||
// as data is written or the buffer will not be aware that data has changed.
|
||||
uint8_t* getWritePtr(uint32_t len) {
|
||||
ensureCanWrite(len);
|
||||
return wBase_;
|
||||
}
|
||||
|
||||
// Informs the buffer that the client has written 'len' bytes into storage
|
||||
// that had been provided by getWritePtr().
|
||||
void wroteBytes(uint32_t len);
|
||||
|
||||
protected:
|
||||
void swap(TMemoryBuffer& that) {
|
||||
using std::swap;
|
||||
swap(buffer_, that.buffer_);
|
||||
swap(bufferSize_, that.bufferSize_);
|
||||
|
||||
swap(rBase_, that.rBase_);
|
||||
swap(rBound_, that.rBound_);
|
||||
swap(wBase_, that.wBase_);
|
||||
swap(wBound_, that.wBound_);
|
||||
|
||||
swap(owner_, that.owner_);
|
||||
}
|
||||
|
||||
// Make sure there's at least 'len' bytes available for writing.
|
||||
void ensureCanWrite(uint32_t len);
|
||||
|
||||
// Compute the position and available data for reading.
|
||||
void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give);
|
||||
|
||||
uint32_t readSlow(uint8_t* buf, uint32_t len);
|
||||
|
||||
void writeSlow(const uint8_t* buf, uint32_t len);
|
||||
|
||||
const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
|
||||
|
||||
// Data buffer
|
||||
uint8_t* buffer_;
|
||||
|
||||
// Allocated buffer size
|
||||
uint32_t bufferSize_;
|
||||
|
||||
// Is this object the owner of the buffer?
|
||||
bool owner_;
|
||||
|
||||
// Don't forget to update constrctors, initCommon, and swap if
|
||||
// you add new members.
|
||||
};
|
||||
|
||||
}}} // facebook::thrift::transport
|
||||
|
||||
#endif // #ifndef _THRIFT_TRANSPORT_TDOUBLEBUFFERS_H_
|
@ -71,7 +71,7 @@ THttpClient::~THttpClient() {
|
||||
}
|
||||
|
||||
uint32_t THttpClient::read(uint8_t* buf, uint32_t len) {
|
||||
if (readBuffer_.available() == 0) {
|
||||
if (readBuffer_.available_read() == 0) {
|
||||
readBuffer_.resetBuffer();
|
||||
uint32_t got = readMoreData();
|
||||
if (got == 0) {
|
||||
|
@ -7,7 +7,7 @@
|
||||
#ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_
|
||||
#define _THRIFT_TRANSPORT_THTTPCLIENT_H_ 1
|
||||
|
||||
#include <transport/TTransportUtils.h>
|
||||
#include <transport/TBufferTransports.h>
|
||||
|
||||
namespace facebook { namespace thrift { namespace transport {
|
||||
|
||||
|
84
lib/cpp/src/transport/TShortReadTransport.h
Normal file
84
lib/cpp/src/transport/TShortReadTransport.h
Normal file
@ -0,0 +1,84 @@
|
||||
// Copyright (c) 2006- Facebook
|
||||
// Distributed under the Thrift Software License
|
||||
//
|
||||
// See accompanying file LICENSE or visit the Thrift site at:
|
||||
// http://developers.facebook.com/thrift/
|
||||
|
||||
#ifndef _THRIFT_TRANSPORT_TSHORTREADTRANSPORT_H_
|
||||
#define _THRIFT_TRANSPORT_TSHORTREADTRANSPORT_H_ 1
|
||||
|
||||
#include <cstdlib>
|
||||
|
||||
#include <transport/TTransport.h>
|
||||
|
||||
namespace facebook { namespace thrift { namespace transport { namespace test {
|
||||
|
||||
/**
|
||||
* This class is only meant for testing. It wraps another transport.
|
||||
* Calls to read are passed through with some probability. Otherwise,
|
||||
* the read amount is randomly reduced before being passed through.
|
||||
*
|
||||
* @author David Reiss <dreiss@facebook.com>
|
||||
*/
|
||||
class TShortReadTransport : public TTransport {
|
||||
public:
|
||||
TShortReadTransport(boost::shared_ptr<TTransport> transport, double full_prob)
|
||||
: transport_(transport)
|
||||
, fullProb_(full_prob)
|
||||
{}
|
||||
|
||||
bool isOpen() {
|
||||
return transport_->isOpen();
|
||||
}
|
||||
|
||||
bool peek() {
|
||||
return transport_->peek();
|
||||
}
|
||||
|
||||
void open() {
|
||||
transport_->open();
|
||||
}
|
||||
|
||||
void close() {
|
||||
transport_->close();
|
||||
}
|
||||
|
||||
uint32_t read(uint8_t* buf, uint32_t len) {
|
||||
if (len == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (rand()/(double)RAND_MAX >= fullProb_) {
|
||||
len = 1 + rand()%len;
|
||||
}
|
||||
return transport_->read(buf, len);
|
||||
}
|
||||
|
||||
void write(const uint8_t* buf, uint32_t len) {
|
||||
transport_->write(buf, len);
|
||||
}
|
||||
|
||||
void flush() {
|
||||
transport_->flush();
|
||||
}
|
||||
|
||||
const uint8_t* borrow(uint8_t* buf, uint32_t* len) {
|
||||
return transport_->borrow(buf, len);
|
||||
}
|
||||
|
||||
void consume(uint32_t len) {
|
||||
return transport_->consume(len);
|
||||
}
|
||||
|
||||
boost::shared_ptr<TTransport> getUnderlyingTransport() {
|
||||
return transport_;
|
||||
}
|
||||
|
||||
protected:
|
||||
boost::shared_ptr<TTransport> transport_;
|
||||
double fullProb_;
|
||||
};
|
||||
|
||||
}}}} // facebook::thrift::transport::test
|
||||
|
||||
#endif // #ifndef _THRIFT_TRANSPORT_TSHORTREADTRANSPORT_H_
|
@ -10,348 +10,6 @@ using std::string;
|
||||
|
||||
namespace facebook { namespace thrift { namespace transport {
|
||||
|
||||
uint32_t TBufferedTransport::read(uint8_t* buf, uint32_t len) {
|
||||
uint32_t need = len;
|
||||
|
||||
// We don't have enough data yet
|
||||
if (rLen_-rPos_ < need) {
|
||||
// Copy out whatever we have
|
||||
if (rLen_-rPos_ > 0) {
|
||||
memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
|
||||
need -= rLen_-rPos_;
|
||||
buf += rLen_-rPos_;
|
||||
}
|
||||
// Get more from underlying transport up to buffer size
|
||||
rLen_ = transport_->read(rBuf_, rBufSize_);
|
||||
rPos_ = 0;
|
||||
}
|
||||
|
||||
// Hand over whatever we have
|
||||
uint32_t give = need;
|
||||
if (rLen_-rPos_ < give) {
|
||||
give = rLen_-rPos_;
|
||||
}
|
||||
memcpy(buf, rBuf_+rPos_, give);
|
||||
rPos_ += give;
|
||||
need -= give;
|
||||
return (len - need);
|
||||
}
|
||||
|
||||
void TBufferedTransport::write(const uint8_t* buf, uint32_t len) {
|
||||
if (len == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
uint32_t pos = 0;
|
||||
|
||||
while ((len-pos) + wLen_ >= wBufSize_) {
|
||||
uint32_t copy = wBufSize_ - wLen_;
|
||||
memcpy(wBuf_ + wLen_, buf + pos, copy);
|
||||
|
||||
transport_->write(wBuf_, wBufSize_);
|
||||
pos += copy;
|
||||
wLen_ = 0;
|
||||
}
|
||||
|
||||
if ((len - pos) > 0) {
|
||||
memcpy(wBuf_ + wLen_, buf + pos, len - pos);
|
||||
wLen_ += len - pos;
|
||||
}
|
||||
}
|
||||
|
||||
const uint8_t* TBufferedTransport::borrow(uint8_t* buf, uint32_t* len) {
|
||||
// The number of additional bytes we need from the underlying transport.
|
||||
// Could be zero or negative.
|
||||
uint32_t need = *len - (rLen_-rPos_);
|
||||
|
||||
// If we have enough data, just hand over a pointer.
|
||||
if (need <= 0) {
|
||||
*len = rLen_-rPos_;
|
||||
return rBuf_+rPos_;
|
||||
}
|
||||
|
||||
// If the request is bigger than our buffer, we are hosed.
|
||||
if (*len > rBufSize_) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// If we have less than half our buffer available,
|
||||
// or we need more space than is in the buffer,
|
||||
// shift the data we have down to the start.
|
||||
if ((rLen_ > rBufSize_/2) || (rLen_+need > rBufSize_)) {
|
||||
memmove(rBuf_, rBuf_+rPos_, rLen_-rPos_);
|
||||
rLen_ -= rPos_;
|
||||
rPos_ = 0;
|
||||
}
|
||||
|
||||
// First try to fill up the buffer.
|
||||
uint32_t got = transport_->read(rBuf_+rLen_, rBufSize_-rLen_);
|
||||
rLen_ += got;
|
||||
need -= got;
|
||||
|
||||
// If that fails, readAll until we get what we need.
|
||||
if (need > 0) {
|
||||
rLen_ += transport_->readAll(rBuf_+rLen_, need);
|
||||
}
|
||||
|
||||
*len = rLen_-rPos_;
|
||||
return rBuf_+rPos_;
|
||||
}
|
||||
|
||||
void TBufferedTransport::consume(uint32_t len) {
|
||||
if (rLen_-rPos_ >= len) {
|
||||
rPos_ += len;
|
||||
} else {
|
||||
throw TTransportException(TTransportException::BAD_ARGS,
|
||||
"consume did not follow a borrow.");
|
||||
}
|
||||
}
|
||||
|
||||
void TBufferedTransport::flush() {
|
||||
// Write out any data waiting in the write buffer
|
||||
if (wLen_ > 0) {
|
||||
transport_->write(wBuf_, wLen_);
|
||||
wLen_ = 0;
|
||||
}
|
||||
|
||||
// Flush the underlying transport
|
||||
transport_->flush();
|
||||
}
|
||||
|
||||
uint32_t TFramedTransport::read(uint8_t* buf, uint32_t len) {
|
||||
if (!read_) {
|
||||
return transport_->read(buf, len);
|
||||
}
|
||||
|
||||
uint32_t need = len;
|
||||
|
||||
// We don't have enough data yet
|
||||
if (rLen_-rPos_ < need) {
|
||||
// Copy out whatever we have
|
||||
if (rLen_-rPos_ > 0) {
|
||||
memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
|
||||
need -= rLen_-rPos_;
|
||||
buf += rLen_-rPos_;
|
||||
}
|
||||
|
||||
// Read another chunk
|
||||
readFrame();
|
||||
}
|
||||
|
||||
// Hand over whatever we have
|
||||
uint32_t give = need;
|
||||
if (rLen_-rPos_ < give) {
|
||||
give = rLen_-rPos_;
|
||||
}
|
||||
memcpy(buf, rBuf_+rPos_, give);
|
||||
rPos_ += give;
|
||||
need -= give;
|
||||
return (len - need);
|
||||
}
|
||||
|
||||
void TFramedTransport::readFrame() {
|
||||
// Get rid of the old frame
|
||||
if (rBuf_ != NULL) {
|
||||
delete [] rBuf_;
|
||||
rBuf_ = NULL;
|
||||
}
|
||||
|
||||
// Read in the next chunk size
|
||||
int32_t sz;
|
||||
transport_->readAll((uint8_t*)&sz, 4);
|
||||
sz = (int32_t)ntohl(sz);
|
||||
|
||||
if (sz < 0) {
|
||||
throw TTransportException("Frame size has negative value");
|
||||
}
|
||||
|
||||
// Read the frame payload, reset markers
|
||||
rBuf_ = new uint8_t[sz];
|
||||
transport_->readAll(rBuf_, sz);
|
||||
rPos_ = 0;
|
||||
rLen_ = sz;
|
||||
}
|
||||
|
||||
void TFramedTransport::write(const uint8_t* buf, uint32_t len) {
|
||||
if (len == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Shortcut out if not write mode
|
||||
if (!write_) {
|
||||
transport_->write(buf, len);
|
||||
return;
|
||||
}
|
||||
|
||||
// Need to grow the buffer
|
||||
if (len + wLen_ >= wBufSize_) {
|
||||
|
||||
// Double buffer size until sufficient
|
||||
while (wBufSize_ < len + wLen_) {
|
||||
wBufSize_ *= 2;
|
||||
}
|
||||
|
||||
// Allocate new buffer
|
||||
uint8_t* wBuf2 = new uint8_t[wBufSize_];
|
||||
|
||||
// Copy the old buffer to the new one
|
||||
memcpy(wBuf2, wBuf_, wLen_);
|
||||
|
||||
// Now point buf to the new one
|
||||
delete [] wBuf_;
|
||||
wBuf_ = wBuf2;
|
||||
}
|
||||
|
||||
// Copy data into buffer
|
||||
memcpy(wBuf_ + wLen_, buf, len);
|
||||
wLen_ += len;
|
||||
}
|
||||
|
||||
void TFramedTransport::flush() {
|
||||
if (!write_) {
|
||||
transport_->flush();
|
||||
return;
|
||||
}
|
||||
|
||||
// Write frame size
|
||||
int32_t sz = wLen_;
|
||||
sz = (int32_t)htonl(sz);
|
||||
|
||||
transport_->write((const uint8_t*)&sz, 4);
|
||||
|
||||
// Write frame body
|
||||
if (wLen_ > 0) {
|
||||
transport_->write(wBuf_, wLen_);
|
||||
}
|
||||
|
||||
// All done
|
||||
wLen_ = 0;
|
||||
|
||||
// Flush the underlying
|
||||
transport_->flush();
|
||||
}
|
||||
|
||||
const uint8_t* TFramedTransport::borrow(uint8_t* buf, uint32_t* len) {
|
||||
// Don't try to be clever with shifting buffers.
|
||||
// If we have enough data, give a pointer to it,
|
||||
// otherwise let the protcol use its slow path.
|
||||
if (read_ && (rLen_-rPos_ >= *len)) {
|
||||
*len = rLen_-rPos_;
|
||||
return rBuf_+rPos_;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void TFramedTransport::consume(uint32_t len) {
|
||||
if (rLen_-rPos_ >= len) {
|
||||
rPos_ += len;
|
||||
} else {
|
||||
throw TTransportException(TTransportException::BAD_ARGS,
|
||||
"consume did not follow a borrow.");
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t TMemoryBuffer::read(uint8_t* buf, uint32_t len) {
|
||||
// Check avaible data for reading
|
||||
uint32_t avail = wPos_ - rPos_;
|
||||
if (avail == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Decide how much to give
|
||||
uint32_t give = len;
|
||||
if (avail < len) {
|
||||
give = avail;
|
||||
}
|
||||
|
||||
// Copy into buffer and increment rPos_
|
||||
memcpy(buf, buffer_ + rPos_, give);
|
||||
rPos_ += give;
|
||||
|
||||
return give;
|
||||
}
|
||||
|
||||
uint32_t TMemoryBuffer::readAppendToString(std::string& str, uint32_t len) {
|
||||
// Don't get some stupid assertion failure.
|
||||
if (buffer_ == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Check avaible data for reading
|
||||
uint32_t avail = wPos_ - rPos_;
|
||||
if (avail == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Device how much to give
|
||||
uint32_t give = len;
|
||||
if (avail < len) {
|
||||
give = avail;
|
||||
}
|
||||
|
||||
// Reserve memory, copy into string, and increment rPos_
|
||||
str.reserve(str.length()+give);
|
||||
str.append((char*)buffer_ + rPos_, give);
|
||||
rPos_ += give;
|
||||
|
||||
return give;
|
||||
}
|
||||
|
||||
void TMemoryBuffer::ensureCanWrite(uint32_t len) {
|
||||
// Check available space
|
||||
uint32_t avail = bufferSize_ - wPos_;
|
||||
if (len <= avail) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!owner_) {
|
||||
throw TTransportException("Insufficient space in external MemoryBuffer");
|
||||
}
|
||||
|
||||
// Grow the buffer as necessary
|
||||
while (len > avail) {
|
||||
bufferSize_ *= 2;
|
||||
avail = bufferSize_ - wPos_;
|
||||
}
|
||||
buffer_ = (uint8_t*)std::realloc(buffer_, bufferSize_);
|
||||
if (buffer_ == NULL) {
|
||||
throw TTransportException("Out of memory.");
|
||||
}
|
||||
}
|
||||
|
||||
void TMemoryBuffer::write(const uint8_t* buf, uint32_t len) {
|
||||
ensureCanWrite(len);
|
||||
|
||||
// Copy into the buffer and increment wPos_
|
||||
memcpy(buffer_ + wPos_, buf, len);
|
||||
wPos_ += len;
|
||||
}
|
||||
|
||||
void TMemoryBuffer::wroteBytes(uint32_t len) {
|
||||
uint32_t avail = bufferSize_ - wPos_;
|
||||
if (len > avail) {
|
||||
throw TTransportException("Client wrote more bytes than size of buffer.");
|
||||
}
|
||||
wPos_ += len;
|
||||
}
|
||||
|
||||
const uint8_t* TMemoryBuffer::borrow(uint8_t* buf, uint32_t* len) {
|
||||
if (wPos_-rPos_ >= *len) {
|
||||
*len = wPos_-rPos_;
|
||||
return buffer_ + rPos_;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void TMemoryBuffer::consume(uint32_t len) {
|
||||
if (wPos_-rPos_ >= len) {
|
||||
rPos_ += len;
|
||||
} else {
|
||||
throw TTransportException(TTransportException::BAD_ARGS,
|
||||
"consume did not follow a borrow.");
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t TPipedTransport::read(uint8_t* buf, uint32_t len) {
|
||||
uint32_t need = len;
|
||||
|
||||
|
@ -12,6 +12,8 @@
|
||||
#include <string>
|
||||
#include <algorithm>
|
||||
#include <transport/TTransport.h>
|
||||
// Include the buffered transports that used to be defined here.
|
||||
#include <transport/TBufferTransports.h>
|
||||
#include <transport/TFileTransport.h>
|
||||
|
||||
namespace facebook { namespace thrift { namespace transport {
|
||||
@ -43,497 +45,6 @@ class TNullTransport : public TTransport {
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Buffered transport. For reads it will read more data than is requested
|
||||
* and will serve future data out of a local buffer. For writes, data is
|
||||
* stored to an in memory buffer before being written out.
|
||||
*
|
||||
* @author Mark Slee <mcslee@facebook.com>
|
||||
*/
|
||||
class TBufferedTransport : public TTransport {
|
||||
public:
|
||||
TBufferedTransport(boost::shared_ptr<TTransport> transport) :
|
||||
transport_(transport),
|
||||
rBufSize_(512), rPos_(0), rLen_(0),
|
||||
wBufSize_(512), wLen_(0) {
|
||||
rBuf_ = new uint8_t[rBufSize_];
|
||||
wBuf_ = new uint8_t[wBufSize_];
|
||||
}
|
||||
|
||||
TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz) :
|
||||
transport_(transport),
|
||||
rBufSize_(sz), rPos_(0), rLen_(0),
|
||||
wBufSize_(sz), wLen_(0) {
|
||||
rBuf_ = new uint8_t[rBufSize_];
|
||||
wBuf_ = new uint8_t[wBufSize_];
|
||||
}
|
||||
|
||||
TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz) :
|
||||
transport_(transport),
|
||||
rBufSize_(rsz), rPos_(0), rLen_(0),
|
||||
wBufSize_(wsz), wLen_(0) {
|
||||
rBuf_ = new uint8_t[rBufSize_];
|
||||
wBuf_ = new uint8_t[wBufSize_];
|
||||
}
|
||||
|
||||
~TBufferedTransport() {
|
||||
delete [] rBuf_;
|
||||
delete [] wBuf_;
|
||||
}
|
||||
|
||||
bool isOpen() {
|
||||
return transport_->isOpen();
|
||||
}
|
||||
|
||||
bool peek() {
|
||||
if (rPos_ >= rLen_) {
|
||||
rLen_ = transport_->read(rBuf_, rBufSize_);
|
||||
rPos_ = 0;
|
||||
}
|
||||
return (rLen_ > rPos_);
|
||||
}
|
||||
|
||||
void open() {
|
||||
transport_->open();
|
||||
}
|
||||
|
||||
void close() {
|
||||
flush();
|
||||
transport_->close();
|
||||
}
|
||||
|
||||
uint32_t read(uint8_t* buf, uint32_t len);
|
||||
|
||||
void write(const uint8_t* buf, uint32_t len);
|
||||
|
||||
void flush();
|
||||
|
||||
/**
|
||||
* The following behavior is currently implemented by TBufferedTransport,
|
||||
* but that may change in a future version:
|
||||
* 1/ If len is at most rBufSize_, borrow will never return NULL.
|
||||
* Depending on the underlying transport, it could throw an exception
|
||||
* or hang forever.
|
||||
* 2/ Some borrow requests may copy bytes internally. However,
|
||||
* if len is at most rBufSize_/2, none of the copied bytes
|
||||
* will ever have to be copied again. For optimial performance,
|
||||
* stay under this limit.
|
||||
*/
|
||||
const uint8_t* borrow(uint8_t* buf, uint32_t* len);
|
||||
|
||||
void consume(uint32_t len);
|
||||
|
||||
boost::shared_ptr<TTransport> getUnderlyingTransport() {
|
||||
return transport_;
|
||||
}
|
||||
|
||||
protected:
|
||||
boost::shared_ptr<TTransport> transport_;
|
||||
uint8_t* rBuf_;
|
||||
uint32_t rBufSize_;
|
||||
uint32_t rPos_;
|
||||
uint32_t rLen_;
|
||||
|
||||
uint8_t* wBuf_;
|
||||
uint32_t wBufSize_;
|
||||
uint32_t wLen_;
|
||||
};
|
||||
|
||||
/**
|
||||
* Wraps a transport into a buffered one.
|
||||
*
|
||||
* @author Mark Slee <mcslee@facebook.com>
|
||||
*/
|
||||
class TBufferedTransportFactory : public TTransportFactory {
|
||||
public:
|
||||
TBufferedTransportFactory() {}
|
||||
|
||||
virtual ~TBufferedTransportFactory() {}
|
||||
|
||||
/**
|
||||
* Wraps the transport into a buffered one.
|
||||
*/
|
||||
virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
|
||||
return boost::shared_ptr<TTransport>(new TBufferedTransport(trans));
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
/**
|
||||
* Framed transport. All writes go into an in-memory buffer until flush is
|
||||
* called, at which point the transport writes the length of the entire
|
||||
* binary chunk followed by the data payload. This allows the receiver on the
|
||||
* other end to always do fixed-length reads.
|
||||
*
|
||||
* @author Mark Slee <mcslee@facebook.com>
|
||||
*/
|
||||
class TFramedTransport : public TTransport {
|
||||
public:
|
||||
TFramedTransport(boost::shared_ptr<TTransport> transport) :
|
||||
transport_(transport),
|
||||
rPos_(0),
|
||||
rLen_(0),
|
||||
read_(true),
|
||||
wBufSize_(512),
|
||||
wLen_(0),
|
||||
write_(true) {
|
||||
rBuf_ = NULL;
|
||||
wBuf_ = new uint8_t[wBufSize_];
|
||||
}
|
||||
|
||||
TFramedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz) :
|
||||
transport_(transport),
|
||||
rPos_(0),
|
||||
rLen_(0),
|
||||
read_(true),
|
||||
wBufSize_(sz),
|
||||
wLen_(0),
|
||||
write_(true) {
|
||||
rBuf_ = NULL;
|
||||
wBuf_ = new uint8_t[wBufSize_];
|
||||
}
|
||||
|
||||
~TFramedTransport() {
|
||||
if (rBuf_ != NULL) {
|
||||
delete [] rBuf_;
|
||||
}
|
||||
if (wBuf_ != NULL) {
|
||||
delete [] wBuf_;
|
||||
}
|
||||
}
|
||||
|
||||
void setRead(bool read) {
|
||||
read_ = read;
|
||||
}
|
||||
|
||||
void setWrite(bool write) {
|
||||
write_ = write;
|
||||
}
|
||||
|
||||
void open() {
|
||||
transport_->open();
|
||||
}
|
||||
|
||||
bool isOpen() {
|
||||
return transport_->isOpen();
|
||||
}
|
||||
|
||||
bool peek() {
|
||||
if (rPos_ < rLen_) {
|
||||
return true;
|
||||
}
|
||||
return transport_->peek();
|
||||
}
|
||||
|
||||
void close() {
|
||||
if (wLen_ > 0) {
|
||||
flush();
|
||||
}
|
||||
transport_->close();
|
||||
}
|
||||
|
||||
uint32_t read(uint8_t* buf, uint32_t len);
|
||||
|
||||
void write(const uint8_t* buf, uint32_t len);
|
||||
|
||||
void flush();
|
||||
|
||||
const uint8_t* borrow(uint8_t* buf, uint32_t* len);
|
||||
|
||||
void consume(uint32_t len);
|
||||
|
||||
boost::shared_ptr<TTransport> getUnderlyingTransport() {
|
||||
return transport_;
|
||||
}
|
||||
|
||||
protected:
|
||||
boost::shared_ptr<TTransport> transport_;
|
||||
uint8_t* rBuf_;
|
||||
uint32_t rPos_;
|
||||
uint32_t rLen_;
|
||||
bool read_;
|
||||
|
||||
uint8_t* wBuf_;
|
||||
uint32_t wBufSize_;
|
||||
uint32_t wLen_;
|
||||
bool write_;
|
||||
|
||||
/**
|
||||
* Reads a frame of input from the underlying stream.
|
||||
*/
|
||||
void readFrame();
|
||||
};
|
||||
|
||||
/**
|
||||
* Wraps a transport into a framed one.
|
||||
*
|
||||
* @author Dave Simpson <dave@powerset.com>
|
||||
*/
|
||||
class TFramedTransportFactory : public TTransportFactory {
|
||||
public:
|
||||
TFramedTransportFactory() {}
|
||||
|
||||
virtual ~TFramedTransportFactory() {}
|
||||
|
||||
/**
|
||||
* Wraps the transport into a framed one.
|
||||
*/
|
||||
virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
|
||||
return boost::shared_ptr<TTransport>(new TFramedTransport(trans));
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* A memory buffer is a tranpsort that simply reads from and writes to an
|
||||
* in memory buffer. Anytime you call write on it, the data is simply placed
|
||||
* into a buffer, and anytime you call read, data is read from that buffer.
|
||||
*
|
||||
* The buffers are allocated using C constructs malloc,realloc, and the size
|
||||
* doubles as necessary.
|
||||
*
|
||||
* @author Mark Slee <mcslee@facebook.com>
|
||||
* @author David Reiss <dreiss@facebook.com>
|
||||
*/
|
||||
class TMemoryBuffer : public TTransport {
|
||||
private:
|
||||
|
||||
// Common initialization done by all constructors.
|
||||
void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) {
|
||||
if (buf == NULL && size != 0) {
|
||||
assert(owner);
|
||||
buf = (uint8_t*)std::malloc(size);
|
||||
if (buf == NULL) {
|
||||
throw TTransportException("Out of memory");
|
||||
}
|
||||
}
|
||||
|
||||
buffer_ = buf;
|
||||
bufferSize_ = size;
|
||||
owner_ = owner;
|
||||
wPos_ = wPos;
|
||||
rPos_ = 0;
|
||||
}
|
||||
|
||||
// make sure there's at least 'len' bytes available for writing
|
||||
void ensureCanWrite(uint32_t len);
|
||||
|
||||
public:
|
||||
static const uint32_t defaultSize = 1024;
|
||||
|
||||
/**
|
||||
* This enum specifies how a TMemoryBuffer should treat
|
||||
* memory passed to it via constructors or resetBuffer.
|
||||
*
|
||||
* OBSERVE:
|
||||
* TMemoryBuffer will simply store a pointer to the memory.
|
||||
* It is the callers responsibility to ensure that the pointer
|
||||
* remains valid for the lifetime of the TMemoryBuffer,
|
||||
* and that it is properly cleaned up.
|
||||
* Note that no data can be written to observed buffers.
|
||||
*
|
||||
* COPY:
|
||||
* TMemoryBuffer will make an internal copy of the buffer.
|
||||
* The caller has no responsibilities.
|
||||
*
|
||||
* TAKE_OWNERSHIP:
|
||||
* TMemoryBuffer will become the "owner" of the buffer,
|
||||
* and will be responsible for freeing it.
|
||||
* The membory must have been allocated with malloc.
|
||||
*/
|
||||
enum MemoryPolicy {
|
||||
OBSERVE = 1,
|
||||
COPY = 2,
|
||||
TAKE_OWNERSHIP = 3,
|
||||
};
|
||||
|
||||
/**
|
||||
* Construct a TMemoryBuffer with a default-sized buffer,
|
||||
* owned by the TMemoryBuffer object.
|
||||
*/
|
||||
TMemoryBuffer() {
|
||||
initCommon(NULL, defaultSize, true, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a TMemoryBuffer with a buffer of a specified size,
|
||||
* owned by the TMemoryBuffer object.
|
||||
*
|
||||
* @param sz The initial size of the buffer.
|
||||
*/
|
||||
TMemoryBuffer(uint32_t sz) {
|
||||
initCommon(NULL, sz, true, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a TMemoryBuffer with buf as its initial contents.
|
||||
*
|
||||
* @param buf The initial contents of the buffer.
|
||||
* Note that, while buf is a non-const pointer,
|
||||
* TMemoryBuffer will not write to it if policy == OBSERVE,
|
||||
* so it is safe to const_cast<uint8_t*>(whatever).
|
||||
* @param sz The size of @c buf.
|
||||
* @param policy See @link MemoryPolicy @endlink .
|
||||
*/
|
||||
TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
|
||||
if (buf == NULL && sz != 0) {
|
||||
throw TTransportException(TTransportException::BAD_ARGS,
|
||||
"TMemoryBuffer given null buffer with non-zero size.");
|
||||
}
|
||||
|
||||
switch (policy) {
|
||||
case OBSERVE:
|
||||
case TAKE_OWNERSHIP:
|
||||
initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz);
|
||||
break;
|
||||
case COPY:
|
||||
initCommon(NULL, sz, true, 0);
|
||||
this->write(buf, sz);
|
||||
break;
|
||||
default:
|
||||
throw TTransportException(TTransportException::BAD_ARGS,
|
||||
"Invalid MemoryPolicy for TMemoryBuffer");
|
||||
}
|
||||
}
|
||||
|
||||
~TMemoryBuffer() {
|
||||
if (owner_) {
|
||||
std::free(buffer_);
|
||||
buffer_ = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
bool isOpen() {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool peek() {
|
||||
return (rPos_ < wPos_);
|
||||
}
|
||||
|
||||
void open() {}
|
||||
|
||||
void close() {}
|
||||
|
||||
// TODO(dreiss): Make bufPtr const.
|
||||
void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
|
||||
*bufPtr = buffer_;
|
||||
*sz = wPos_;
|
||||
}
|
||||
|
||||
std::string getBufferAsString() {
|
||||
if (buffer_ == NULL) {
|
||||
return "";
|
||||
}
|
||||
return std::string((char*)buffer_, (std::string::size_type)wPos_);
|
||||
}
|
||||
|
||||
void appendBufferToString(std::string& str) {
|
||||
if (buffer_ == NULL) {
|
||||
return;
|
||||
}
|
||||
str.append((char*)buffer_, wPos_);
|
||||
}
|
||||
|
||||
void resetBuffer() {
|
||||
wPos_ = 0;
|
||||
rPos_ = 0;
|
||||
// It isn't safe to write into a buffer we don't own.
|
||||
if (!owner_) {
|
||||
bufferSize_ = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/// See constructor documentation.
|
||||
void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
|
||||
// Use a variant of the copy-and-swap trick for assignment operators.
|
||||
// This is sub-optimal in terms of performance for two reasons:
|
||||
// 1/ The constructing and swapping of the (small) values
|
||||
// in the temporary object takes some time, and is not necessary.
|
||||
// 2/ If policy == COPY, we allocate the new buffer before
|
||||
// freeing the old one, precluding the possibility of
|
||||
// reusing that memory.
|
||||
// I doubt that either of these problems could be optimized away,
|
||||
// but the second is probably no a common case, and the first is minor.
|
||||
// I don't expect resetBuffer to be a common operation, so I'm willing to
|
||||
// bite the performance bullet to make the method this simple.
|
||||
|
||||
// Construct the new buffer.
|
||||
TMemoryBuffer new_buffer(buf, sz, policy);
|
||||
// Move it into ourself.
|
||||
this->swap(new_buffer);
|
||||
// Our old self gets destroyed.
|
||||
}
|
||||
|
||||
uint32_t read(uint8_t* buf, uint32_t len);
|
||||
|
||||
std::string readAsString(uint32_t len) {
|
||||
std::string str;
|
||||
(void)readAppendToString(str, len);
|
||||
return str;
|
||||
}
|
||||
|
||||
uint32_t readAppendToString(std::string& str, uint32_t len);
|
||||
|
||||
void readEnd() {
|
||||
if (rPos_ == wPos_) {
|
||||
resetBuffer();
|
||||
}
|
||||
}
|
||||
|
||||
void write(const uint8_t* buf, uint32_t len);
|
||||
|
||||
uint32_t available() const {
|
||||
return wPos_ - rPos_;
|
||||
}
|
||||
|
||||
const uint8_t* borrow(uint8_t* buf, uint32_t* len);
|
||||
|
||||
void consume(uint32_t len);
|
||||
|
||||
void swap(TMemoryBuffer& that) {
|
||||
using std::swap;
|
||||
swap(buffer_, that.buffer_);
|
||||
swap(bufferSize_, that.bufferSize_);
|
||||
swap(wPos_, that.wPos_);
|
||||
swap(owner_, that.owner_);
|
||||
}
|
||||
|
||||
// Returns a pointer to where the client can write data to append to
|
||||
// the TMemoryBuffer, and ensures the buffer is big enough to accomodate a
|
||||
// write of the provided length. The returned pointer is very convenient for
|
||||
// passing to read(), recv(), or similar. You must call wroteBytes() as soon
|
||||
// as data is written or the buffer will not be aware that data has changed.
|
||||
uint8_t* getWritePtr(uint32_t len) {
|
||||
ensureCanWrite(len);
|
||||
return buffer_ + wPos_;
|
||||
}
|
||||
|
||||
// Informs the buffer that the client has written 'len' bytes into storage
|
||||
// that had been provided by getWritePtr().
|
||||
void wroteBytes(uint32_t len);
|
||||
|
||||
private:
|
||||
// Data buffer
|
||||
uint8_t* buffer_;
|
||||
|
||||
// Allocated buffer size
|
||||
uint32_t bufferSize_;
|
||||
|
||||
// Where the write is at
|
||||
uint32_t wPos_;
|
||||
|
||||
// Where the reader is at
|
||||
uint32_t rPos_;
|
||||
|
||||
// Is this object the owner of the buffer?
|
||||
bool owner_;
|
||||
|
||||
// Don't forget to update constrctors, initCommon, and swap if
|
||||
// you add new members.
|
||||
};
|
||||
|
||||
/**
|
||||
* TPipedTransport. This transport allows piping of a request from one
|
||||
* transport to another either when readEnd() or writeEnd(). The typical
|
||||
|
@ -1,6 +1,6 @@
|
||||
#include <iostream>
|
||||
#include <cmath>
|
||||
#include <transport/TTransportUtils.h>
|
||||
#include <transport/TBufferTransports.h>
|
||||
#include <protocol/TBinaryProtocol.h>
|
||||
#include <protocol/TJSONProtocol.h>
|
||||
#include "gen-cpp/DebugProtoTest_types.h"
|
||||
|
@ -21,7 +21,7 @@ g++ -Wall -g -I../lib/cpp/src -I/usr/local/include/boost-1_33_1 \
|
||||
#include "gen-cpp/DebugProtoTest_types.h"
|
||||
#include "gen-cpp/OptionalRequiredTest_types.h"
|
||||
#include <protocol/TDenseProtocol.h>
|
||||
#include <transport/TTransportUtils.h>
|
||||
#include <transport/TBufferTransports.h>
|
||||
|
||||
|
||||
// Can't use memcmp here. GCC is too smart.
|
||||
|
@ -1,6 +1,6 @@
|
||||
#include <iostream>
|
||||
#include <cmath>
|
||||
#include <transport/TTransportUtils.h>
|
||||
#include <transport/TBufferTransports.h>
|
||||
#include <protocol/TJSONProtocol.h>
|
||||
#include "gen-cpp/DebugProtoTest_types.h"
|
||||
|
||||
|
@ -40,7 +40,8 @@ TESTS = \
|
||||
|
||||
UnitTests_SOURCES = \
|
||||
UnitTestMain.cpp \
|
||||
TMemoryBufferTest.cpp
|
||||
TMemoryBufferTest.cpp \
|
||||
TBufferBaseTest.cpp
|
||||
|
||||
UnitTests_LDADD = libtestgencpp.la
|
||||
|
||||
|
@ -3,7 +3,7 @@
|
||||
#include <iostream>
|
||||
#include <protocol/TDebugProtocol.h>
|
||||
#include <protocol/TBinaryProtocol.h>
|
||||
#include <transport/TTransportUtils.h>
|
||||
#include <transport/TBufferTransports.h>
|
||||
#include "gen-cpp/OptionalRequiredTest_types.h"
|
||||
|
||||
using std::cout;
|
||||
|
589
test/TBufferBaseTest.cpp
Normal file
589
test/TBufferBaseTest.cpp
Normal file
@ -0,0 +1,589 @@
|
||||
#include <algorithm>
|
||||
#include <boost/foreach.hpp>
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include <transport/TBufferTransports.h>
|
||||
#include <transport/TShortReadTransport.h>
|
||||
|
||||
using std::string;
|
||||
using boost::shared_ptr;
|
||||
using facebook::thrift::transport::TMemoryBuffer;
|
||||
using facebook::thrift::transport::TBufferedTransport;
|
||||
using facebook::thrift::transport::TFramedTransport;
|
||||
using facebook::thrift::transport::test::TShortReadTransport;
|
||||
|
||||
#define foreach BOOST_FOREACH
|
||||
|
||||
// Shamelessly copied from ZlibTransport. TODO: refactor.
|
||||
unsigned int dist[][5000] = {
|
||||
{ 1<<15 },
|
||||
|
||||
{
|
||||
5,13,9,1,8,9,11,13,18,48,24,13,21,13,5,11,35,2,4,20,17,72,27,14,15,4,7,26,
|
||||
12,1,14,9,2,16,29,41,7,24,4,27,14,4,1,4,25,3,6,34,10,8,50,2,14,13,55,29,3,
|
||||
43,53,49,14,4,10,32,27,48,1,3,1,11,5,17,16,51,17,30,15,11,9,2,2,11,52,12,2,
|
||||
13,94,1,19,1,38,2,8,43,8,33,7,30,8,17,22,2,15,14,12,34,2,12,6,37,29,74,3,
|
||||
165,16,11,17,5,14,3,10,7,37,11,24,7,1,3,12,37,8,9,34,17,12,8,21,13,37,1,4,
|
||||
30,14,78,4,15,2,40,37,17,12,36,82,14,4,1,4,7,17,11,16,88,77,2,3,15,3,34,11,
|
||||
5,79,22,34,8,4,4,40,22,24,28,9,13,3,34,27,9,16,39,16,39,13,2,4,3,41,26,10,4,
|
||||
33,4,7,12,5,6,3,10,30,8,21,16,58,19,9,0,47,7,13,11,19,15,7,53,57,2,13,28,22,
|
||||
3,16,9,25,33,12,40,7,12,64,7,14,24,44,9,2,14,11,2,58,1,26,30,11,9,5,24,7,9,
|
||||
94,2,10,21,5,5,4,5,6,179,9,18,2,7,13,31,41,17,4,36,3,21,6,26,8,15,18,44,27,
|
||||
11,9,25,7,0,14,2,12,20,23,13,2,163,9,5,15,65,2,14,6,8,98,11,15,14,34,2,3,10,
|
||||
22,9,92,7,10,32,67,13,3,4,35,8,2,1,5,0,26,381,7,27,8,2,16,93,4,19,5,8,25,9,
|
||||
31,14,4,21,5,3,9,22,56,4,18,3,11,18,6,4,3,40,12,16,110,8,35,14,1,18,40,9,12,
|
||||
14,3,11,7,57,13,18,116,53,19,22,7,16,11,5,8,21,16,1,75,21,20,1,28,2,6,1,7,
|
||||
19,38,5,6,9,9,4,1,7,55,36,62,5,4,4,24,15,1,12,35,48,20,5,17,1,5,26,15,4,54,
|
||||
13,5,5,15,5,19,32,29,31,7,6,40,7,80,11,18,8,128,48,6,12,84,13,4,7,2,13,9,16,
|
||||
17,3,254,1,4,181,8,44,7,6,24,27,9,23,14,34,16,22,25,10,3,3,4,4,12,2,12,6,7,
|
||||
13,58,13,6,11,19,53,11,66,18,19,10,4,13,2,5,49,58,1,67,7,21,64,14,11,14,8,3,
|
||||
26,33,91,31,20,7,9,42,39,4,3,55,11,10,0,7,4,75,8,12,0,27,3,8,9,0,12,12,23,
|
||||
28,23,20,4,13,30,2,22,20,19,30,6,22,2,6,4,24,7,19,55,86,5,33,2,161,6,7,1,62,
|
||||
13,3,72,12,12,9,7,12,10,5,10,29,1,5,22,13,13,5,2,12,3,7,14,18,2,3,46,21,17,
|
||||
15,19,3,27,5,16,45,31,10,8,17,18,18,3,7,24,6,55,9,3,6,12,10,12,8,91,9,4,4,4,
|
||||
27,29,16,5,7,22,43,28,11,14,8,11,28,109,55,71,40,3,8,22,26,15,44,3,25,29,5,
|
||||
3,32,17,12,3,29,27,25,15,11,8,40,39,38,17,3,9,11,2,32,11,6,20,48,75,27,3,7,
|
||||
54,12,95,12,7,24,23,2,13,8,15,16,5,12,4,17,7,19,88,2,6,13,115,45,12,21,2,86,
|
||||
74,9,7,5,16,32,16,2,21,18,6,34,5,18,260,7,12,16,44,19,92,31,7,8,2,9,0,0,15,
|
||||
8,38,4,8,20,18,2,83,3,3,4,9,5,3,10,3,5,29,15,7,11,8,48,17,23,2,17,4,11,22,
|
||||
21,64,8,8,4,19,95,0,17,28,9,11,20,71,5,11,18,12,13,45,49,4,1,33,32,23,13,5,
|
||||
52,2,2,16,3,4,7,12,2,1,12,6,24,1,22,155,21,3,45,4,12,44,26,5,40,36,9,9,8,20,
|
||||
35,31,3,2,32,50,10,8,37,2,75,35,22,15,192,8,11,23,1,4,29,6,8,8,5,12,18,32,4,
|
||||
7,12,2,0,0,9,5,48,11,35,3,1,123,6,29,8,11,8,23,51,16,6,63,12,2,5,4,14,2,15,
|
||||
7,14,3,2,7,17,32,8,8,10,1,23,62,2,49,6,49,47,23,3,20,7,11,39,10,24,6,15,5,5,
|
||||
11,8,16,36,8,13,20,3,10,44,7,52,7,10,36,6,15,10,5,11,4,14,19,17,10,12,3,6,
|
||||
23,4,13,94,70,7,36,7,38,7,28,8,4,15,3,19,4,33,39,21,109,4,80,6,40,4,432,4,4,
|
||||
7,8,3,31,8,28,37,34,10,2,21,5,22,0,7,36,14,12,6,24,1,21,5,9,2,29,20,54,113,
|
||||
13,31,39,27,6,0,27,4,5,2,43,7,8,57,8,62,7,9,12,22,90,30,6,19,7,10,20,6,5,58,
|
||||
32,30,41,4,10,25,13,3,8,7,10,2,9,6,151,44,16,12,16,20,8,3,18,11,17,4,10,45,
|
||||
15,8,56,38,52,25,40,14,4,17,15,8,2,19,7,8,26,30,2,3,180,8,26,17,38,35,5,16,
|
||||
28,5,15,56,13,14,18,9,15,83,27,3,9,4,11,8,27,27,44,10,12,8,3,48,14,7,9,4,4,
|
||||
8,4,5,9,122,8,14,12,19,17,21,4,29,63,21,17,10,12,18,47,10,10,53,4,18,16,4,8,
|
||||
118,9,5,12,9,11,9,3,12,32,3,23,2,15,3,3,30,3,17,235,15,22,9,299,14,17,1,5,
|
||||
16,8,3,7,3,13,2,7,6,4,8,66,2,13,6,15,16,47,3,36,5,7,10,24,1,9,9,8,13,16,26,
|
||||
12,7,24,21,18,49,23,39,10,41,4,13,4,27,11,12,12,19,4,147,8,10,9,40,21,2,83,
|
||||
10,5,6,11,25,9,50,57,40,12,12,21,1,3,24,23,9,3,9,13,2,3,12,57,8,11,13,15,26,
|
||||
15,10,47,36,4,25,1,5,8,5,4,0,12,49,5,19,4,6,16,14,6,10,69,10,33,29,7,8,61,
|
||||
12,4,0,3,7,6,3,16,29,27,38,4,21,0,24,3,2,1,19,16,22,2,8,138,11,7,7,3,12,22,
|
||||
3,16,5,7,3,53,9,10,32,14,5,7,3,6,22,9,59,26,8,7,58,5,16,11,55,7,4,11,146,91,
|
||||
8,13,18,14,6,8,8,31,26,22,6,11,30,11,30,15,18,31,3,48,17,7,6,4,9,2,25,3,35,
|
||||
13,13,7,8,4,31,10,8,10,4,3,45,10,23,2,7,259,17,21,13,14,3,26,3,8,27,4,18,9,
|
||||
66,7,12,5,8,17,4,23,55,41,51,2,32,26,66,4,21,14,12,65,16,22,17,5,14,2,29,24,
|
||||
7,3,36,2,43,53,86,5,28,4,58,13,49,121,6,2,73,2,1,47,4,2,27,10,35,28,27,10,
|
||||
17,10,56,7,10,14,28,20,24,40,7,4,7,3,10,11,32,6,6,3,15,11,54,573,2,3,6,2,3,
|
||||
14,64,4,16,12,16,42,10,26,4,6,11,69,18,27,2,2,17,22,9,13,22,11,6,1,15,49,3,
|
||||
14,1
|
||||
},
|
||||
|
||||
{
|
||||
11,11,11,15,47,1,3,1,23,5,8,18,3,23,15,21,1,7,19,10,26,1,17,11,31,21,41,18,
|
||||
34,4,9,58,19,3,3,36,5,18,13,3,14,4,9,10,4,19,56,15,3,5,3,11,27,9,4,10,13,4,
|
||||
11,6,9,2,18,3,10,19,11,4,53,4,2,2,3,4,58,16,3,0,5,30,2,11,93,10,2,14,10,6,2,
|
||||
115,2,25,16,22,38,101,4,18,13,2,145,51,45,15,14,15,13,20,7,24,5,13,14,30,40,
|
||||
10,4,107,12,24,14,39,12,6,13,20,7,7,11,5,18,18,45,22,6,39,3,2,1,51,9,11,4,
|
||||
13,9,38,44,8,11,9,15,19,9,23,17,17,17,13,9,9,1,10,4,18,6,2,9,5,27,32,72,8,
|
||||
37,9,4,10,30,17,20,15,17,66,10,4,73,35,37,6,4,16,117,45,13,4,75,5,24,65,10,
|
||||
4,9,4,13,46,5,26,29,10,4,4,52,3,13,18,63,6,14,9,24,277,9,88,2,48,27,123,14,
|
||||
61,7,5,10,8,7,90,3,10,3,3,48,17,13,10,18,33,2,19,36,6,21,1,16,12,5,6,2,16,
|
||||
15,29,88,28,2,15,6,11,4,6,11,3,3,4,18,9,53,5,4,3,33,8,9,8,6,7,36,9,62,14,2,
|
||||
1,10,1,16,7,32,7,23,20,11,10,23,2,1,0,9,16,40,2,81,5,22,8,5,4,37,51,37,10,
|
||||
19,57,11,2,92,31,6,39,10,13,16,8,20,6,9,3,10,18,25,23,12,30,6,2,26,7,64,18,
|
||||
6,30,12,13,27,7,10,5,3,33,24,99,4,23,4,1,27,7,27,49,8,20,16,3,4,13,9,22,67,
|
||||
28,3,10,16,3,2,10,4,8,1,8,19,3,85,6,21,1,9,16,2,30,10,33,12,4,9,3,1,60,38,6,
|
||||
24,32,3,14,3,40,8,34,115,5,9,27,5,96,3,40,6,15,5,8,22,112,5,5,25,17,58,2,7,
|
||||
36,21,52,1,3,95,12,21,4,11,8,59,24,5,21,4,9,15,8,7,21,3,26,5,11,6,7,17,65,
|
||||
14,11,10,2,17,5,12,22,4,4,2,21,8,112,3,34,63,35,2,25,1,2,15,65,23,0,3,5,15,
|
||||
26,27,9,5,48,11,15,4,9,5,33,20,15,1,18,19,11,24,40,10,21,74,6,6,32,30,40,5,
|
||||
4,7,44,10,25,46,16,12,5,40,7,18,5,18,9,12,8,4,25,5,6,36,4,43,8,9,12,35,17,4,
|
||||
8,9,11,27,5,10,17,40,8,12,4,18,9,18,12,20,25,39,42,1,24,13,22,15,7,112,35,3,
|
||||
7,17,33,2,5,5,19,8,4,12,24,14,13,2,1,13,6,5,19,11,7,57,0,19,6,117,48,14,8,
|
||||
10,51,17,12,14,2,5,8,9,15,4,48,53,13,22,4,25,12,11,19,45,5,2,6,54,22,9,15,9,
|
||||
13,2,7,11,29,82,16,46,4,26,14,26,40,22,4,26,6,18,13,4,4,20,3,3,7,12,17,8,9,
|
||||
23,6,20,7,25,23,19,5,15,6,23,15,11,19,11,3,17,59,8,18,41,4,54,23,44,75,13,
|
||||
20,6,11,2,3,1,13,10,3,7,12,3,4,7,8,30,6,6,7,3,32,9,5,28,6,114,42,13,36,27,
|
||||
59,6,93,13,74,8,69,140,3,1,17,48,105,6,11,5,15,1,10,10,14,8,53,0,8,24,60,2,
|
||||
6,35,2,12,32,47,16,17,75,2,5,4,37,28,10,5,9,57,4,59,5,12,13,7,90,5,11,5,24,
|
||||
22,13,30,1,2,10,9,6,19,3,18,47,2,5,7,9,35,15,3,6,1,21,14,14,18,14,9,12,8,73,
|
||||
6,19,3,32,9,14,17,17,5,55,23,6,16,28,3,11,48,4,6,6,6,12,16,30,10,30,27,51,
|
||||
18,29,2,3,15,1,76,0,16,33,4,27,3,62,4,10,2,4,8,15,9,41,26,22,2,4,20,4,49,0,
|
||||
8,1,57,13,12,39,3,63,10,19,34,35,2,7,8,29,72,4,10,0,77,8,6,7,9,15,21,9,4,1,
|
||||
20,23,1,9,18,9,15,36,4,7,6,15,5,7,7,40,2,9,22,2,3,20,4,12,34,13,6,18,15,1,
|
||||
38,20,12,7,16,3,19,85,12,16,18,16,2,17,1,13,8,6,12,15,97,17,12,9,3,21,15,12,
|
||||
23,44,81,26,30,2,5,17,6,6,0,22,42,19,6,19,41,14,36,7,3,56,7,9,3,2,6,9,69,3,
|
||||
15,4,30,28,29,7,9,15,17,17,6,1,6,153,9,33,5,12,14,16,28,3,8,7,14,12,4,6,36,
|
||||
9,24,13,13,4,2,9,15,19,9,53,7,13,4,150,17,9,2,6,12,7,3,5,58,19,58,28,8,14,3,
|
||||
20,3,0,32,56,7,5,4,27,1,68,4,29,13,5,58,2,9,65,41,27,16,15,12,14,2,10,9,24,
|
||||
3,2,9,2,2,3,14,32,10,22,3,13,11,4,6,39,17,0,10,5,5,10,35,16,19,14,1,8,63,19,
|
||||
14,8,56,10,2,12,6,12,6,7,16,2,9,9,12,20,73,25,13,21,17,24,5,32,8,12,25,8,14,
|
||||
16,5,23,3,7,6,3,11,24,6,30,4,21,13,28,4,6,29,15,5,17,6,26,8,15,8,3,7,7,50,
|
||||
11,30,6,2,28,56,16,24,25,23,24,89,31,31,12,7,22,4,10,17,3,3,8,11,13,5,3,27,
|
||||
1,12,1,14,8,10,29,2,5,2,2,20,10,0,31,10,21,1,48,3,5,43,4,5,18,13,5,18,25,34,
|
||||
18,3,5,22,16,3,4,20,3,9,3,25,6,6,44,21,3,12,7,5,42,3,2,14,4,36,5,3,45,51,15,
|
||||
9,11,28,9,7,6,6,12,26,5,14,10,11,42,55,13,21,4,28,6,7,23,27,11,1,41,36,0,32,
|
||||
15,26,2,3,23,32,11,2,15,7,29,26,144,33,20,12,7,21,10,7,11,65,46,10,13,20,32,
|
||||
4,4,5,19,2,19,15,49,41,1,75,10,11,25,1,2,45,11,8,27,18,10,60,28,29,12,30,19,
|
||||
16,4,24,11,19,27,17,49,18,7,40,13,19,22,8,55,12,11,3,6,5,11,8,10,22,5,9,9,
|
||||
25,7,17,7,64,1,24,2,12,17,44,4,12,27,21,11,10,7,47,5,9,13,12,38,27,21,7,29,
|
||||
7,1,17,3,3,5,48,62,10,3,11,17,15,15,6,3,8,10,8,18,19,13,3,9,7,6,44,9,10,4,
|
||||
43,8,6,6,14,20,38,24,2,4,5,5,7,5,9,39,8,44,40,9,19,7,3,15,25,2,37,18,15,9,5,
|
||||
8,32,10,5,18,4,7,46,20,17,23,4,11,16,18,31,11,3,11,1,14,1,25,4,27,13,13,39,
|
||||
14,6,6,35,6,16,13,11,122,21,15,20,24,10,5,152,15,39,5,20,16,9,14,7,53,6,3,8,
|
||||
19,63,32,6,2,3,20,1,19,5,13,42,15,4,6,68,31,46,11,38,10,24,5,5,8,9,12,3,35,
|
||||
46,26,16,2,8,4,74,16,44,4,5,1,16,4,14,23,16,69,15,42,31,14,7,7,6,97,14,40,1,
|
||||
8,7,34,9,39,19,13,15,10,21,18,10,5,15,38,7,5,12,7,20,15,4,11,6,14,5,17,7,39,
|
||||
35,36,18,20,26,22,4,2,36,21,64,0,5,9,10,6,4,1,7,3,1,3,3,4,10,20,90,2,22,48,
|
||||
16,23,2,33,40,1,21,21,17,20,8,8,12,4,83,14,48,4,21,3,9,27,5,11,40,15,9,3,16,
|
||||
17,9,11,4,24,31,17,3,4,2,11,1,8,4,8,6,41,17,4,13,3,7,17,8,27,5,13,6,10,7,13,
|
||||
12,18,13,60,18,3,8,1,12,125,2,7,16,2,11,2,4,7,26,5,9,14,14,16,8,14,7,14,6,9,
|
||||
13,9,6,4,26,35,49,36,55,3,9,6,40,26,23,31,19,41,2,10,31,6,54,5,69,16,7,8,16,
|
||||
1,5,7,4,22,7,7,5,4,48,11,13,3,98,4,11,19,4,2,14,7,34,7,10,3,2,12,7,6,2,5,118
|
||||
},
|
||||
};
|
||||
|
||||
uint8_t data[1<<15];
|
||||
string data_str;
|
||||
void init_data() {
|
||||
static bool initted = false;
|
||||
if (initted) return;
|
||||
initted = true;
|
||||
|
||||
// Repeatability. Kind of.
|
||||
std::srand(42);
|
||||
for (int i = 0; i < (int)(sizeof(data)/sizeof(data[0])); ++i) {
|
||||
data[i] = (uint8_t)rand();
|
||||
}
|
||||
|
||||
data_str.assign((char*)data, sizeof(data));
|
||||
}
|
||||
|
||||
|
||||
BOOST_AUTO_TEST_SUITE( TBufferBaseTest )
|
||||
|
||||
BOOST_AUTO_TEST_CASE( test_MemoryBuffer_Write_GetBuffer ) {
|
||||
init_data();
|
||||
|
||||
for (int d1 = 0; d1 < 3; d1++) {
|
||||
TMemoryBuffer buffer(16);
|
||||
int offset = 0;
|
||||
int index = 0;
|
||||
|
||||
while (offset < 1<<15) {
|
||||
buffer.write(&data[offset], dist[d1][index]);
|
||||
offset += dist[d1][index];
|
||||
index++;
|
||||
}
|
||||
|
||||
string output = buffer.getBufferAsString();
|
||||
BOOST_CHECK_EQUAL(data_str, output);
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE( test_MemoryBuffer_Write_Read ) {
|
||||
init_data();
|
||||
|
||||
for (int d1 = 0; d1 < 3; d1++) {
|
||||
for (int d2 = 0; d2 < 3; d2++) {
|
||||
TMemoryBuffer buffer(16);
|
||||
uint8_t data_out[1<<15];
|
||||
int offset;
|
||||
int index;
|
||||
|
||||
offset = 0;
|
||||
index = 0;
|
||||
while (offset < 1<<15) {
|
||||
buffer.write(&data[offset], dist[d1][index]);
|
||||
offset += dist[d1][index];
|
||||
index++;
|
||||
}
|
||||
|
||||
offset = 0;
|
||||
index = 0;
|
||||
while (offset < 1<<15) {
|
||||
unsigned int got = buffer.read(&data_out[offset], dist[d2][index]);
|
||||
BOOST_CHECK_EQUAL(got, dist[d2][index]);
|
||||
offset += dist[d2][index];
|
||||
index++;
|
||||
}
|
||||
|
||||
BOOST_CHECK(!memcmp(data, data_out, sizeof(data)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE( test_MemoryBuffer_Write_ReadString ) {
|
||||
init_data();
|
||||
|
||||
for (int d1 = 0; d1 < 3; d1++) {
|
||||
for (int d2 = 0; d2 < 3; d2++) {
|
||||
TMemoryBuffer buffer(16);
|
||||
string output;
|
||||
int offset;
|
||||
int index;
|
||||
|
||||
offset = 0;
|
||||
index = 0;
|
||||
while (offset < 1<<15) {
|
||||
buffer.write(&data[offset], dist[d1][index]);
|
||||
offset += dist[d1][index];
|
||||
index++;
|
||||
}
|
||||
|
||||
offset = 0;
|
||||
index = 0;
|
||||
while (offset < 1<<15) {
|
||||
unsigned int got = buffer.readAppendToString(output, dist[d2][index]);
|
||||
BOOST_CHECK_EQUAL(got, dist[d2][index]);
|
||||
offset += dist[d2][index];
|
||||
index++;
|
||||
}
|
||||
|
||||
BOOST_CHECK_EQUAL(output, data_str);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE( test_MemoryBuffer_Write_Read_Multi1 ) {
|
||||
init_data();
|
||||
|
||||
// Do shorter writes and reads so we don't align to power-of-two boundaries.
|
||||
|
||||
for (int d1 = 0; d1 < 3; d1++) {
|
||||
for (int d2 = 0; d2 < 3; d2++) {
|
||||
TMemoryBuffer buffer(16);
|
||||
uint8_t data_out[1<<15];
|
||||
int offset;
|
||||
int index;
|
||||
|
||||
for (int iter = 0; iter < 6; iter++) {
|
||||
offset = 0;
|
||||
index = 0;
|
||||
while (offset < (1<<15)-42) {
|
||||
buffer.write(&data[offset], dist[d1][index]);
|
||||
offset += dist[d1][index];
|
||||
index++;
|
||||
}
|
||||
|
||||
offset = 0;
|
||||
index = 0;
|
||||
while (offset < (1<<15)-42) {
|
||||
buffer.read(&data_out[offset], dist[d2][index]);
|
||||
offset += dist[d2][index];
|
||||
index++;
|
||||
}
|
||||
|
||||
BOOST_CHECK(!memcmp(data, data_out, (1<<15)-42));
|
||||
|
||||
// Pull out the extra data.
|
||||
buffer.read(data_out, 42);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE( test_MemoryBuffer_Write_Read_Multi2 ) {
|
||||
init_data();
|
||||
|
||||
// Do shorter writes and reads so we don't align to power-of-two boundaries.
|
||||
// Pull the buffer out of the loop so its state gets worked harder.
|
||||
TMemoryBuffer buffer(16);
|
||||
|
||||
for (int d1 = 0; d1 < 3; d1++) {
|
||||
for (int d2 = 0; d2 < 3; d2++) {
|
||||
uint8_t data_out[1<<15];
|
||||
int offset;
|
||||
int index;
|
||||
|
||||
for (int iter = 0; iter < 6; iter++) {
|
||||
offset = 0;
|
||||
index = 0;
|
||||
while (offset < (1<<15)-42) {
|
||||
buffer.write(&data[offset], dist[d1][index]);
|
||||
offset += dist[d1][index];
|
||||
index++;
|
||||
}
|
||||
|
||||
offset = 0;
|
||||
index = 0;
|
||||
while (offset < (1<<15)-42) {
|
||||
buffer.read(&data_out[offset], dist[d2][index]);
|
||||
offset += dist[d2][index];
|
||||
index++;
|
||||
}
|
||||
|
||||
BOOST_CHECK(!memcmp(data, data_out, (1<<15)-42));
|
||||
|
||||
// Pull out the extra data.
|
||||
buffer.read(data_out, 42);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE( test_MemoryBuffer_Write_Read_Incomplete ) {
|
||||
init_data();
|
||||
|
||||
// Do shorter writes and reads so we don't align to power-of-two boundaries.
|
||||
// Pull the buffer out of the loop so its state gets worked harder.
|
||||
|
||||
for (int d1 = 0; d1 < 3; d1++) {
|
||||
for (int d2 = 0; d2 < 3; d2++) {
|
||||
TMemoryBuffer buffer(16);
|
||||
uint8_t data_out[1<<13];
|
||||
|
||||
int write_offset = 0;
|
||||
int write_index = 0;
|
||||
unsigned int to_write = (1<<14)-42;
|
||||
while (to_write > 0) {
|
||||
int write_amt = std::min(dist[d1][write_index], to_write);
|
||||
buffer.write(&data[write_offset], write_amt);
|
||||
write_offset += write_amt;
|
||||
write_index++;
|
||||
to_write -= write_amt;
|
||||
}
|
||||
|
||||
int read_offset = 0;
|
||||
int read_index = 0;
|
||||
unsigned int to_read = (1<<13)-42;
|
||||
while (to_read > 0) {
|
||||
int read_amt = std::min(dist[d2][read_index], to_read);
|
||||
int got = buffer.read(&data_out[read_offset], read_amt);
|
||||
BOOST_CHECK_EQUAL(got, read_amt);
|
||||
read_offset += read_amt;
|
||||
read_index++;
|
||||
to_read -= read_amt;
|
||||
}
|
||||
|
||||
BOOST_CHECK(!memcmp(data, data_out, (1<<13)-42));
|
||||
|
||||
int second_offset = write_offset;
|
||||
int second_index = write_index-1;
|
||||
unsigned int to_second = (1<<14)+42;
|
||||
while (to_second > 0) {
|
||||
int second_amt = std::min(dist[d1][second_index], to_second);
|
||||
//printf("%d\n", second_amt);
|
||||
buffer.write(&data[second_offset], second_amt);
|
||||
second_offset += second_amt;
|
||||
second_index++;
|
||||
to_second -= second_amt;
|
||||
}
|
||||
|
||||
string output = buffer.getBufferAsString();
|
||||
BOOST_CHECK_EQUAL(data_str.substr((1<<13)-42), output);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE( test_BufferedTransport_Write ) {
|
||||
init_data();
|
||||
|
||||
int sizes[] = {
|
||||
12, 15, 16, 17, 20,
|
||||
501, 512, 523,
|
||||
2000, 2048, 2096,
|
||||
1<<14, 1<<17,
|
||||
};
|
||||
|
||||
foreach (int size, sizes) {
|
||||
for (int d1 = 0; d1 < 3; d1++) {
|
||||
shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer(16));
|
||||
TBufferedTransport trans(buffer, size);
|
||||
|
||||
int offset = 0;
|
||||
int index = 0;
|
||||
while (offset < 1<<15) {
|
||||
trans.write(&data[offset], dist[d1][index]);
|
||||
offset += dist[d1][index];
|
||||
index++;
|
||||
}
|
||||
trans.flush();
|
||||
|
||||
string output = buffer->getBufferAsString();
|
||||
BOOST_CHECK_EQUAL(data_str, output);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE( test_BufferedTransport_Read_Full ) {
|
||||
init_data();
|
||||
|
||||
int sizes[] = {
|
||||
12, 15, 16, 17, 20,
|
||||
501, 512, 523,
|
||||
2000, 2048, 2096,
|
||||
1<<14, 1<<17,
|
||||
};
|
||||
|
||||
foreach (int size, sizes) {
|
||||
for (int d1 = 0; d1 < 3; d1++) {
|
||||
shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer(data, sizeof(data)));
|
||||
TBufferedTransport trans(buffer, size);
|
||||
uint8_t data_out[1<<15];
|
||||
|
||||
int offset = 0;
|
||||
int index = 0;
|
||||
while (offset < 1<<15) {
|
||||
// Note: this doesn't work with "read" because TBufferedTransport
|
||||
// doesn't try loop over reads, so we get short reads. We don't
|
||||
// check the return value, so that messes us up.
|
||||
trans.readAll(&data_out[offset], dist[d1][index]);
|
||||
offset += dist[d1][index];
|
||||
index++;
|
||||
}
|
||||
|
||||
BOOST_CHECK(!memcmp(data, data_out, sizeof(data)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE( test_BufferedTransport_Read_Short ) {
|
||||
init_data();
|
||||
|
||||
int sizes[] = {
|
||||
12, 15, 16, 17, 20,
|
||||
501, 512, 523,
|
||||
2000, 2048, 2096,
|
||||
1<<14, 1<<17,
|
||||
};
|
||||
|
||||
foreach (int size, sizes) {
|
||||
for (int d1 = 0; d1 < 3; d1++) {
|
||||
shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer(data, sizeof(data)));
|
||||
shared_ptr<TShortReadTransport> tshort(new TShortReadTransport(buffer, 0.125));
|
||||
TBufferedTransport trans(buffer, size);
|
||||
uint8_t data_out[1<<15];
|
||||
|
||||
int offset = 0;
|
||||
int index = 0;
|
||||
while (offset < 1<<15) {
|
||||
// Note: this doesn't work with "read" because TBufferedTransport
|
||||
// doesn't try loop over reads, so we get short reads. We don't
|
||||
// check the return value, so that messes us up.
|
||||
trans.readAll(&data_out[offset], dist[d1][index]);
|
||||
offset += dist[d1][index];
|
||||
index++;
|
||||
}
|
||||
|
||||
BOOST_CHECK(!memcmp(data, data_out, sizeof(data)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE( test_FramedTransport_Write ) {
|
||||
init_data();
|
||||
|
||||
int sizes[] = {
|
||||
12, 15, 16, 17, 20,
|
||||
501, 512, 523,
|
||||
2000, 2048, 2096,
|
||||
1<<14, 1<<17,
|
||||
};
|
||||
|
||||
foreach (int size, sizes) {
|
||||
for (int d1 = 0; d1 < 3; d1++) {
|
||||
shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer(16));
|
||||
TFramedTransport trans(buffer, size);
|
||||
|
||||
int offset = 0;
|
||||
int index = 0;
|
||||
while (offset < 1<<15) {
|
||||
trans.write(&data[offset], dist[d1][index]);
|
||||
offset += dist[d1][index];
|
||||
index++;
|
||||
}
|
||||
trans.flush();
|
||||
|
||||
int32_t frame_size = -1;
|
||||
buffer->read(reinterpret_cast<uint8_t*>(&frame_size), sizeof(frame_size));
|
||||
frame_size = (int32_t)ntohl((uint32_t)frame_size);
|
||||
BOOST_CHECK_EQUAL(frame_size, 1<<15);
|
||||
BOOST_CHECK_EQUAL(data_str.size(), (unsigned int)frame_size);
|
||||
string output = buffer->getBufferAsString();
|
||||
BOOST_CHECK_EQUAL(data_str, output);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE( test_FramedTransport_Read ) {
|
||||
init_data();
|
||||
|
||||
for (int d1 = 0; d1 < 3; d1++) {
|
||||
uint8_t data_out[1<<15];
|
||||
shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer());
|
||||
TFramedTransport trans(buffer);
|
||||
int32_t length = sizeof(data);
|
||||
length = (int32_t)htonl((uint32_t)length);
|
||||
buffer->write(reinterpret_cast<uint8_t*>(&length), sizeof(length));
|
||||
buffer->write(data, sizeof(data));
|
||||
|
||||
int offset = 0;
|
||||
int index = 0;
|
||||
while (offset < 1<<15) {
|
||||
// This should work with read because we have one huge frame.
|
||||
trans.read(&data_out[offset], dist[d1][index]);
|
||||
offset += dist[d1][index];
|
||||
index++;
|
||||
}
|
||||
|
||||
BOOST_CHECK(!memcmp(data, data_out, sizeof(data)));
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE( test_FramedTransport_Write_Read ) {
|
||||
init_data();
|
||||
|
||||
int sizes[] = {
|
||||
12, 15, 16, 17, 20,
|
||||
501, 512, 523,
|
||||
2000, 2048, 2096,
|
||||
1<<14, 1<<17,
|
||||
};
|
||||
|
||||
int probs[] = { 1, 2, 4, 8, 16, 32, };
|
||||
|
||||
foreach (int size, sizes) {
|
||||
foreach (int prob, probs) {
|
||||
for (int d1 = 0; d1 < 3; d1++) {
|
||||
shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer(16));
|
||||
TFramedTransport trans(buffer, size);
|
||||
uint8_t data_out[1<<15];
|
||||
std::vector<int> flush_sizes;
|
||||
|
||||
int write_offset = 0;
|
||||
int write_index = 0;
|
||||
int flush_size = 0;
|
||||
while (write_offset < 1<<15) {
|
||||
trans.write(&data[write_offset], dist[d1][write_index]);
|
||||
write_offset += dist[d1][write_index];
|
||||
flush_size += dist[d1][write_index];
|
||||
write_index++;
|
||||
if (rand()%prob == 0) {
|
||||
flush_sizes.push_back(flush_size);
|
||||
flush_size = 0;
|
||||
trans.flush();
|
||||
}
|
||||
}
|
||||
if (flush_size != 0) {
|
||||
flush_sizes.push_back(flush_size);
|
||||
flush_size = 0;
|
||||
trans.flush();
|
||||
}
|
||||
|
||||
int read_offset = 0;
|
||||
int read_index = 0;
|
||||
foreach (int fsize, flush_sizes) {
|
||||
// We are exploiting an implementation detail of TFramedTransport.
|
||||
// The read buffer starts empty and it will never do more than one
|
||||
// readFrame per read, so we should always get exactly one frame.
|
||||
int got = trans.read(&data_out[read_offset], 1<<15);
|
||||
BOOST_CHECK_EQUAL(got, fsize);
|
||||
read_offset += got;
|
||||
read_index++;
|
||||
}
|
||||
|
||||
BOOST_CHECK_EQUAL((unsigned int)read_offset, sizeof(data));
|
||||
BOOST_CHECK(!memcmp(data, data_out, sizeof(data)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
@ -2,7 +2,7 @@
|
||||
#include <iostream>
|
||||
#include <climits>
|
||||
#include <cassert>
|
||||
#include <transport/TTransportUtils.h>
|
||||
#include <transport/TBufferTransports.h>
|
||||
#include <protocol/TBinaryProtocol.h>
|
||||
#include "gen-cpp/ThriftTest_types.h"
|
||||
|
||||
|
@ -2,6 +2,7 @@
|
||||
#include <stdexcept>
|
||||
#include <Thrift.h>
|
||||
#include <transport/TTransportUtils.h>
|
||||
#include <transport/TBufferTransports.h>
|
||||
using namespace std;
|
||||
using boost::shared_ptr;
|
||||
using facebook::thrift::transport::TTransportException;
|
||||
|
@ -11,7 +11,7 @@ g++ -Wall -g -I../lib/cpp/src -I/usr/local/include/boost-1_33_1 \
|
||||
#include <cassert>
|
||||
#include <fstream>
|
||||
#include <iostream>
|
||||
#include <transport/TTransportUtils.h>
|
||||
#include <transport/TBufferTransports.h>
|
||||
#include <transport/TZlibTransport.h>
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user