THRIFT-711. java: TFramedTransport should support direct buffer access

This patch adds direct buffer read access to TFramedTransport as well as a simple test for reading, direct buffer reading, and writing.

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@918142 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bryan Duxbury 2010-03-02 18:39:57 +00:00
parent 3e60fa6a54
commit 0f52f07201
3 changed files with 186 additions and 6 deletions

View File

@ -186,6 +186,8 @@
classpathref="test.classpath" failonerror="true" />
<java classname="org.apache.thrift.test.PartialDeserializeTest"
classpathref="test.classpath" failonerror="true" />
<java classname="org.apache.thrift.test.transport.TFramedTransportTest"
classpathref="test.classpath" failonerror="true" />
</target>
<target name="testclient" description="Run a test client">

View File

@ -19,8 +19,6 @@
package org.apache.thrift.transport;
import java.io.ByteArrayInputStream;
import org.apache.thrift.TByteArrayOutputStream;
/**
@ -43,7 +41,7 @@ public class TFramedTransport extends TTransport {
/**
* Buffer for input
*/
private ByteArrayInputStream readBuffer_ = null;
private TMemoryInputTransport readBuffer_ = new TMemoryInputTransport(new byte[0]);
public static class Factory extends TTransportFactory {
public Factory() {
@ -87,8 +85,24 @@ public class TFramedTransport extends TTransport {
return readBuffer_.read(buf, off, len);
}
public byte[] getBuffer() {
return readBuffer_.getBuffer();
}
public int getBufferPosition() {
return readBuffer_.getBufferPosition();
}
public int getBytesRemainingInBuffer() {
return readBuffer_.getBytesRemainingInBuffer();
}
public void consumeBuffer(int len) {
readBuffer_.consumeBuffer(len);
}
private final byte[] i32rd = new byte[4];
private void readFrame() throws TTransportException {
byte[] i32rd = new byte[4];
transport_.readAll(i32rd, 0, 4);
int size =
((i32rd[0] & 0xff) << 24) |
@ -99,10 +113,10 @@ public class TFramedTransport extends TTransport {
if (size < 0) {
throw new TTransportException("Read a negative frame size (" + size + ")!");
}
byte[] buff = new byte[size];
transport_.readAll(buff, 0, size);
readBuffer_ = new ByteArrayInputStream(buff);
readBuffer_.reset(buff);
}
public void write(byte[] buf, int off, int len) throws TTransportException {

View File

@ -0,0 +1,164 @@
package org.apache.thrift.test.transport;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TMemoryBuffer;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
public class TFramedTransportTest {
public static class WriteCountingTransport extends TTransport {
private int writeCount = 0;
private final TTransport trans;
public WriteCountingTransport(TTransport underlying) {
trans = underlying;
}
@Override
public void close() {}
@Override
public boolean isOpen() {return true;}
@Override
public void open() throws TTransportException {}
@Override
public int read(byte[] buf, int off, int len) throws TTransportException {
return 0;
}
@Override
public void write(byte[] buf, int off, int len) throws TTransportException {
writeCount ++;
trans.write(buf, off, len);
}
}
public static class ReadCountingTransport extends TTransport {
public int readCount = 0;
private TTransport trans;
public ReadCountingTransport(TTransport underlying) {
trans = underlying;
}
@Override
public void close() {}
@Override
public boolean isOpen() {return true;}
@Override
public void open() throws TTransportException {}
@Override
public int read(byte[] buf, int off, int len) throws TTransportException {
readCount++;
return trans.read(buf, off, len);
}
@Override
public void write(byte[] buf, int off, int len) throws TTransportException {}
}
public static void main(String[] args) throws TTransportException, IOException {
testWrite();
testRead();
testDirectRead();
}
private static void testWrite() throws TTransportException, IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
WriteCountingTransport countingTrans = new WriteCountingTransport(new TIOStreamTransport(baos));
TTransport trans = new TFramedTransport(countingTrans);
trans.write(byteSequence(0,100));
failUnless(countingTrans.writeCount == 0);
trans.write(byteSequence(101,200));
trans.write(byteSequence(201,255));
failUnless(countingTrans.writeCount == 0);
trans.flush();
failUnless(countingTrans.writeCount == 2);
DataInputStream din = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
failUnless(din.readInt() == 256);
byte[] buf = new byte[256];
din.read(buf, 0, 256);
failUnless(Arrays.equals(byteSequence(0,255), buf));
}
private static void testRead() throws IOException, TTransportException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
dos.writeInt(50);
dos.write(byteSequence(0, 49));
TMemoryBuffer membuf = new TMemoryBuffer(0);
membuf.write(baos.toByteArray());
ReadCountingTransport countTrans = new ReadCountingTransport(membuf);
TFramedTransport trans = new TFramedTransport(countTrans);
byte[] readBuf = new byte[10];
trans.read(readBuf, 0, 10);
failUnless(Arrays.equals(readBuf, byteSequence(0,9)));
trans.read(readBuf, 0, 10);
failUnless(Arrays.equals(readBuf, byteSequence(10,19)));
failUnless(countTrans.readCount == 2);
}
private static void testDirectRead() throws IOException, TTransportException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
dos.writeInt(50);
dos.write(byteSequence(0, 49));
TMemoryBuffer membuf = new TMemoryBuffer(0);
membuf.write(baos.toByteArray());
ReadCountingTransport countTrans = new ReadCountingTransport(membuf);
TFramedTransport trans = new TFramedTransport(countTrans);
failUnless(trans.getBytesRemainingInBuffer() == 0);
byte[] readBuf = new byte[10];
trans.read(readBuf, 0, 10);
failUnless(Arrays.equals(readBuf, byteSequence(0,9)));
failUnless(trans.getBytesRemainingInBuffer() == 40);
failUnless(trans.getBufferPosition() == 10);
trans.consumeBuffer(5);
failUnless(trans.getBytesRemainingInBuffer() == 35);
failUnless(trans.getBufferPosition() == 15);
failUnless(countTrans.readCount == 2);
}
private static void failUnless(boolean b) {
if (!b) {
throw new RuntimeException();
}
}
private static byte[] byteSequence(int start, int end) {
byte[] result = new byte[end-start+1];
for (int i = 0; i <= (end-start); i++) {
result[i] = (byte)(start+i);
}
return result;
}
}