THRIFT-4390: Fix bug where binary/buffered messages > 4K could not be read/written

Client: rs

This closes #1458
This commit is contained in:
Allen George 2017-12-11 11:44:11 -05:00 committed by James E. King, III
parent a15060acd8
commit cf7ba4ca32
4 changed files with 59 additions and 34 deletions

View File

@ -312,13 +312,6 @@ where
transport: transport,
}
}
fn write_transport(&mut self, buf: &[u8]) -> ::Result<()> {
self.transport
.write(buf)
.map(|_| ())
.map_err(From::from)
}
}
impl<T> TOutputProtocol for TBinaryOutputProtocol<T>
@ -384,7 +377,7 @@ where
fn write_bytes(&mut self, b: &[u8]) -> ::Result<()> {
self.write_i32(b.len() as i32)?;
self.write_transport(b)
self.transport.write_all(b).map_err(From::from)
}
fn write_bool(&mut self, b: bool) -> ::Result<()> {

View File

@ -174,6 +174,7 @@ where
C: Write,
{
buf: Vec<u8>,
cap: usize,
channel: C,
}
@ -191,8 +192,11 @@ where
/// `read_capacity` and an internal write buffer of size
/// `write_capacity` that wraps the given `TIoChannel`.
pub fn with_capacity(write_capacity: usize, channel: C) -> TBufferedWriteTransport<C> {
assert!(write_capacity > 0, "write buffer size must be a positive integer");
TBufferedWriteTransport {
buf: Vec::with_capacity(write_capacity),
cap: write_capacity,
channel: channel,
}
}
@ -203,13 +207,28 @@ where
C: Write,
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let avail_bytes = cmp::min(buf.len(), self.buf.capacity() - self.buf.len());
self.buf.extend_from_slice(&buf[..avail_bytes]);
assert!(
self.buf.len() <= self.buf.capacity(),
"copy overflowed buffer"
);
Ok(avail_bytes)
if !buf.is_empty() {
let mut avail_bytes;
loop {
avail_bytes = cmp::min(buf.len(), self.cap - self.buf.len());
if avail_bytes == 0 {
self.flush()?;
} else {
break;
}
}
let avail_bytes = avail_bytes;
self.buf.extend_from_slice(&buf[..avail_bytes]);
assert!(self.buf.len() <= self.cap, "copy overflowed buffer");
Ok(avail_bytes)
} else {
Ok(0)
}
}
fn flush(&mut self) -> io::Result<()> {
@ -364,14 +383,21 @@ mod tests {
}
#[test]
fn must_return_zero_if_nothing_can_be_written() {
fn must_return_error_when_nothing_can_be_written_to_underlying_channel() {
let mem = TBufferChannel::with_capacity(0, 0);
let mut t = TBufferedWriteTransport::with_capacity(0, mem);
let mut t = TBufferedWriteTransport::with_capacity(1, mem);
let b = vec![0; 10];
let r = t.write(&b);
assert_eq!(r.unwrap(), 0);
// should have written 1 byte
assert_eq!(r.unwrap(), 1);
// let's try again...
let r = t.write(&b[1..]);
// this time we'll error out because the auto-flush failed
assert!(r.is_err());
}
#[test]
@ -387,23 +413,35 @@ mod tests {
}
#[test]
fn must_return_zero_if_write_buffer_full() {
let mem = TBufferChannel::with_capacity(0, 0);
fn must_auto_flush_if_write_buffer_full() {
let mem = TBufferChannel::with_capacity(0, 8);
let mut t = TBufferedWriteTransport::with_capacity(4, mem);
let b = [0x00, 0x01, 0x02, 0x03];
let b0 = [0x00, 0x01, 0x02, 0x03];
let b1 = [0x04, 0x05, 0x06, 0x07];
// we've now filled the write buffer
let r = t.write(&b);
// write the first 4 bytes; we've now filled the transport's write buffer
let r = t.write(&b0);
assert_eq!(r.unwrap(), 4);
// try write the same bytes again - nothing should be writable
let r = t.write(&b);
assert_eq!(r.unwrap(), 0);
// try write the next 4 bytes; this causes the transport to auto-flush the first 4 bytes
let r = t.write(&b1);
assert_eq!(r.unwrap(), 4);
// check that in writing the second 4 bytes we auto-flushed the first 4 bytes
assert_eq_transport_num_written_bytes!(t, 4);
assert_eq_transport_written_bytes!(t, b0);
t.channel.empty_write_buffer();
// now flush the transport to push the second 4 bytes to the underlying channel
assert!(t.flush().is_ok());
// check that we wrote out the second 4 bytes
assert_eq_transport_written_bytes!(t, b1);
}
#[test]
fn must_only_write_to_inner_transport_on_flush() {
fn must_write_to_inner_transport_on_flush() {
let mem = TBufferChannel::with_capacity(10, 10);
let mut t = TBufferedWriteTransport::new(mem);

View File

@ -156,7 +156,7 @@ impl Read for TTcpChannel {
impl Write for TTcpChannel {
fn write(&mut self, b: &[u8]) -> io::Result<usize> {
self.if_set(|s| s.write_all(b)).map(|_| b.len())
self.if_set(|s| s.write(b))
}
fn flush(&mut self) -> io::Result<()> {

View File

@ -295,17 +295,11 @@
"rs-cpp_multic-compact_framed-ip",
"rs-cpp_multic_buffered-ip",
"rs-cpp_multic_framed-ip",
"rs-csharp_binary_buffered-ip",
"rs-csharp_binary_framed-ip",
"rs-csharp_compact_buffered-ip",
"rs-csharp_compact_framed-ip",
"rs-csharp_multi-binary_buffered-ip",
"rs-csharp_multi-binary_framed-ip",
"rs-csharp_multi_buffered-ip",
"rs-csharp_multi_framed-ip",
"rs-csharp_multic-compact_buffered-ip",
"rs-csharp_multic-compact_framed-ip",
"rs-csharp_multic_buffered-ip",
"rs-csharp_multic_framed-ip",
"rs-dart_binary_framed-ip",
"rs-dart_compact_framed-ip",