THRIFT-926. cpp: Fix inconsistencies in transport read() behavior

- TBufferedTransport::borrow() could block if not enough data was
  available.  Now it returns NULL immediately in this case, like all
  other transports.

- TBufferedTransport::read() could block some data was available in the
  readahead buffer, but not enough to satisfy the request.  It would
  attempt to call read() on the underlying transport, but this might
  block.  Now it just returns the remaining data in the readahead
  buffer.  The caller is responsible for calling read() again to get the
  rest of the data they want.

- TFrameTransport::read() threw an exception if read() on the underlying
  transport returned 0 when looking for a frame header.  Now
  TFrameTransport::read() returns 0, too.  (It still throws an exception
  if the underlying transport returns 0 after a partial frame or frame
  header has been read.)

- TFDTransport::read() threw an exception on EINTR.  Now it retries up
  to 5 times, similarly to the way TSocket::read() behaves.

- TZlibTransport::read() could block when less data than was requested
  is available.  Now it only calls read() on the underlying transport
  when it would otherwise have nothing to return.

  This does mean that TZlibTransport::read() now often returns less data
  than is actually available at the time.  This required updating
  several of the ZlibTest tests to use readAll() instead of read(),
  since they previously assumed read() would return all available data.

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005161 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
David Reiss 2010-10-06 17:10:40 +00:00
parent e5c435cccf
commit 0a2d81e816
6 changed files with 139 additions and 61 deletions

View File

@ -28,19 +28,23 @@ namespace apache { namespace thrift { namespace transport {
uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) {
uint32_t want = len;
uint32_t have = rBound_ - rBase_;
// We should only take the slow path if we can't satisfy the read
// with the data already in the buffer.
assert(have < want);
assert(have < len);
// Copy out whatever we have.
// If we have some date in the buffer, copy it out and return it.
// We have to return it without attempting to read more, since we aren't
// guaranteed that the underlying transport actually has more data, so
// attempting to read from it could block.
if (have > 0) {
memcpy(buf, rBase_, have);
want -= have;
buf += have;
setReadBuffer(rBuf_.get(), 0);
return have;
}
// No data is available in our buffer.
// Get more from underlying transport up to buffer size.
// Note that this makes a lot of sense if len < rBufSize_
// and almost no sense otherwise. TODO(dreiss): Fix that
@ -48,12 +52,11 @@ uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) {
setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
// Hand over whatever we have.
uint32_t give = std::min(want, static_cast<uint32_t>(rBound_ - rBase_));
uint32_t give = std::min(len, static_cast<uint32_t>(rBound_ - rBase_));
memcpy(buf, rBase_, give);
rBase_ += give;
want -= give;
return (len - want);
return give;
}
void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
@ -106,43 +109,9 @@ void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
}
const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
// If the request is bigger than our buffer, we are hosed.
if (*len > rBufSize_) {
return NULL;
}
// The number of bytes of data we have already.
uint32_t have = rBound_ - rBase_;
// The number of additional bytes we need from the underlying transport.
int32_t need = *len - have;
// The space from the start of the buffer to the end of our data.
uint32_t offset = rBound_ - rBuf_.get();
assert(need > 0);
// If we have less than half our buffer space available, shift the data
// we have down to the start. If the borrow is big compared to our buffer,
// this could be kind of a waste, but if the borrow is small, it frees up
// space at the end of our buffer to do a bigger single read from the
// underlying transport. Also, if our needs extend past the end of the
// buffer, we have to do a copy no matter what.
if ((offset > rBufSize_/2) || (offset + need > rBufSize_)) {
memmove(rBuf_.get(), rBase_, have);
setReadBuffer(rBuf_.get(), have);
offset = have;
}
// First try to fill up the buffer.
uint32_t got = transport_->read(rBound_, rBufSize_ - offset);
rBound_ += got;
need -= got;
// If that fails, readAll until we get what we need.
if (need > 0) {
rBound_ += transport_->readAll(rBound_, need);
}
*len = rBound_ - rBase_;
return rBase_;
// Simply return NULL. We don't know if there is actually data available on
// the underlying transport, so calling read() might block.
return NULL;
}
void TBufferedTransport::flush() {
@ -177,7 +146,10 @@ uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len) {
}
// Read another frame.
readFrame();
if (!readFrame()) {
// EOF. No frame available.
return 0;
}
// TODO(dreiss): Should we warn when reads cross frames?
@ -190,13 +162,33 @@ uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len) {
return (len - want);
}
void TFramedTransport::readFrame() {
bool TFramedTransport::readFrame() {
// TODO(dreiss): Think about using readv here, even though it would
// result in (gasp) read-ahead.
// Read the size of the next frame.
// We can't use readAll(&sz, sizeof(sz)), since that always throws an
// exception on EOF. We want to throw an exception only if EOF occurs after
// partial size data.
int32_t sz;
transport_->readAll((uint8_t*)&sz, sizeof(sz));
uint32_t size_bytes_read = 0;
while (size_bytes_read < sizeof(sz)) {
uint8_t* szp = reinterpret_cast<uint8_t*>(&sz) + size_bytes_read;
uint32_t bytes_read = transport_->read(szp, sizeof(sz) - size_bytes_read);
if (bytes_read == 0) {
if (size_bytes_read == 0) {
// EOF before any data was read.
return false;
} else {
// EOF after a partial frame header. Raise an exception.
throw TTransportException(TTransportException::END_OF_FILE,
"No more data to read after "
"partial frame header.");
}
}
size_bytes_read += bytes_read;
}
sz = ntohl(sz);
if (sz < 0) {
@ -210,6 +202,7 @@ void TFramedTransport::readFrame() {
}
transport_->readAll(rBuf_.get(), sz);
setReadBuffer(rBuf_.get(), sz);
return true;
}
void TFramedTransport::writeSlow(const uint8_t* buf, uint32_t len) {

View File

@ -389,8 +389,11 @@ class TFramedTransport
protected:
/**
* Reads a frame of input from the underlying stream.
*
* Returns true if a frame was read successfully, or false on EOF.
* (Raises a TTransportException if EOF occurs after a partial frame.)
*/
void readFrame();
bool readFrame();
void initPointers() {
setReadBuffer(NULL, 0);

View File

@ -45,14 +45,23 @@ void TFDTransport::close() {
}
uint32_t TFDTransport::read(uint8_t* buf, uint32_t len) {
ssize_t rv = ::read(fd_, buf, len);
if (rv < 0) {
int errno_copy = errno;
throw TTransportException(TTransportException::UNKNOWN,
"TFDTransport::read()",
errno_copy);
unsigned int maxRetries = 5; // same as the TSocket default
unsigned int retries = 0;
while (true) {
ssize_t rv = ::read(fd_, buf, len);
if (rv < 0) {
if (errno == EINTR && retries < maxRetries) {
// If interrupted, try again
++retries;
continue;
}
int errno_copy = errno;
throw TTransportException(TTransportException::UNKNOWN,
"TFDTransport::read()",
errno_copy);
}
return rv;
}
return rv;
}
void TFDTransport::write(const uint8_t* buf, uint32_t len) {

View File

@ -149,6 +149,14 @@ uint32_t TZlibTransport::read(uint8_t* buf, uint32_t len) {
return len;
}
// If we will need to read from the underlying transport to get more data,
// but we already have some data available, return it now. Reading from
// the underlying transport may block, and read() is only allowed to block
// when no data is available.
if (need < len && rstream_->avail_in == 0) {
return len - need;
}
// If we get to this point, we need to get some more data.
// If zlib has reported the end of a stream, we can't really do any more.

View File

@ -580,6 +580,64 @@ void test_read_part_available() {
clear_triggers();
}
template <class CoupledTransports>
void test_read_partial_midframe() {
CoupledTransports transports;
BOOST_REQUIRE(transports.in != NULL);
BOOST_REQUIRE(transports.out != NULL);
uint8_t write_buf[16];
uint8_t read_buf[16];
memset(write_buf, 'a', sizeof(write_buf));
// Attempt to read 10 bytes, when only 9 are available, but after we have
// already read part of the data that is available. This exercises a
// different code path for several of the transports.
//
// For transports that add their own framing (e.g., TFramedTransport and
// TFileTransport), the two flush calls break up the data in to a 10 byte
// frame and a 3 byte frame. The first read then puts us partway through the
// first frame, and then we attempt to read past the end of that frame, and
// through the next frame, too.
//
// For buffered transports that perform read-ahead (e.g.,
// TBufferedTransport), the read-ahead will most likely see all 13 bytes
// written on the first read. The next read will then attempt to read past
// the end of the read-ahead buffer.
//
// Flush 10 bytes, then 3 bytes. This creates 2 separate frames for
// transports that track framing internally.
transports.out->write(write_buf, 10);
transports.out->flush();
transports.out->write(write_buf, 3);
transports.out->flush();
// Now read 4 bytes, so that we are partway through the written data.
uint32_t bytes_read = transports.in->read(read_buf, 4);
BOOST_CHECK_EQUAL(bytes_read, 4);
// Now attempt to read 10 bytes. Only 9 more are available.
//
// We should be able to get all 9 bytes, but it might take multiple read
// calls, since it is valid for read() to return fewer bytes than requested.
// (Most transports do immediately return 9 bytes, but the framing transports
// tend to only return to the end of the current frame, which is 6 bytes in
// this case.)
uint32_t total_read = 0;
while (total_read < 9) {
set_trigger(3, transports.out, 1);
bytes_read = transports.in->read(read_buf, 10);
BOOST_REQUIRE_EQUAL(numTriggersFired, 0);
BOOST_REQUIRE_GT(bytes_read, 0);
total_read += bytes_read;
BOOST_REQUIRE_LE(total_read, 9);
}
BOOST_CHECK_EQUAL(total_read, 9);
clear_triggers();
}
template <class CoupledTransports>
void test_borrow_part_available() {
CoupledTransports transports;
@ -851,6 +909,12 @@ class TransportTestGen {
test_read_part_available<CoupledTransports>, name);
suite_->add(tc, expectedFailures);
snprintf(name, sizeof(name), "%s::test_read_partial_midframe()",
transportName);
tc = boost::unit_test::make_test_case(
test_read_partial_midframe<CoupledTransports>, name);
suite_->add(tc, expectedFailures);
snprintf(name, sizeof(name), "%s::test_read_none_available()",
transportName);
tc = boost::unit_test::make_test_case(

View File

@ -148,7 +148,7 @@ void test_write_then_read(const uint8_t* buf, uint32_t buf_len) {
zlib_trans->finish();
boost::shared_array<uint8_t> mirror(new uint8_t[buf_len]);
uint32_t got = zlib_trans->read(mirror.get(), buf_len);
uint32_t got = zlib_trans->readAll(mirror.get(), buf_len);
BOOST_REQUIRE_EQUAL(got, buf_len);
BOOST_CHECK_EQUAL(memcmp(mirror.get(), buf, buf_len), 0);
zlib_trans->verifyChecksum();
@ -172,7 +172,7 @@ void test_separate_checksum(const uint8_t* buf, uint32_t buf_len) {
tmp_buf.length()-1));
boost::shared_array<uint8_t> mirror(new uint8_t[buf_len]);
uint32_t got = zlib_trans->read(mirror.get(), buf_len);
uint32_t got = zlib_trans->readAll(mirror.get(), buf_len);
BOOST_REQUIRE_EQUAL(got, buf_len);
BOOST_CHECK_EQUAL(memcmp(mirror.get(), buf, buf_len), 0);
zlib_trans->verifyChecksum();
@ -193,7 +193,7 @@ void test_incomplete_checksum(const uint8_t* buf, uint32_t buf_len) {
tmp_buf.length());
boost::shared_array<uint8_t> mirror(new uint8_t[buf_len]);
uint32_t got = zlib_trans->read(mirror.get(), buf_len);
uint32_t got = zlib_trans->readAll(mirror.get(), buf_len);
BOOST_REQUIRE_EQUAL(got, buf_len);
BOOST_CHECK_EQUAL(memcmp(mirror.get(), buf, buf_len), 0);
try {
@ -233,7 +233,8 @@ void test_read_write_mix(const uint8_t* buf, uint32_t buf_len,
expected_read_len = buf_len - tot;
}
uint32_t got = zlib_trans->read(mirror.get() + tot, read_len);
BOOST_REQUIRE_EQUAL(got, expected_read_len);
BOOST_REQUIRE_LE(got, expected_read_len);
BOOST_REQUIRE_NE(got, 0);
tot += got;
}
@ -271,7 +272,7 @@ void test_invalid_checksum(const uint8_t* buf, uint32_t buf_len) {
boost::shared_array<uint8_t> mirror(new uint8_t[buf_len]);
try {
zlib_trans->read(mirror.get(), buf_len);
zlib_trans->readAll(mirror.get(), buf_len);
zlib_trans->verifyChecksum();
BOOST_ERROR("verifyChecksum() did not report an error");
} catch (TZlibTransportException& ex) {