mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 02:45:22 +00:00
Merge pull request #2191 from zeshuai007/Implements_TConfig
THRIFT-5237 Implement MAX_MESSAGE_SIZE and consolidate limits into a TConfiguration class(JAVA)
This commit is contained in:
commit
119030848c
@ -2526,7 +2526,7 @@ void t_java_generator::generate_java_bean_boilerplate(ostream& out, t_struct* ts
|
||||
}else{
|
||||
indent(out) << " : java.nio.ByteBuffer.wrap(" << field_name << ".clone());" << endl;
|
||||
}
|
||||
|
||||
|
||||
if (!bean_style_) {
|
||||
indent(out) << " return this;" << endl;
|
||||
}
|
||||
@ -3861,19 +3861,18 @@ void t_java_generator::generate_deserialize_container(ostream& out,
|
||||
// Declare variables, read header
|
||||
if (ttype->is_map()) {
|
||||
indent(out) << "org.apache.thrift.protocol.TMap " << obj
|
||||
<< " = new org.apache.thrift.protocol.TMap("
|
||||
<< " = iprot.readMapBegin("
|
||||
<< type_to_enum(((t_map*)ttype)->get_key_type()) << ", "
|
||||
<< type_to_enum(((t_map*)ttype)->get_val_type()) << ", "
|
||||
<< "iprot.readI32());" << endl;
|
||||
<< type_to_enum(((t_map*)ttype)->get_val_type()) << "); "<< endl;
|
||||
} else if (ttype->is_set()) {
|
||||
indent(out) << "org.apache.thrift.protocol.TSet " << obj
|
||||
<< " = new org.apache.thrift.protocol.TSet("
|
||||
<< type_to_enum(((t_set*)ttype)->get_elem_type()) << ", iprot.readI32());"
|
||||
<< " = iprot.readSetBegin("
|
||||
<< type_to_enum(((t_set*)ttype)->get_elem_type()) << ");"
|
||||
<< endl;
|
||||
} else if (ttype->is_list()) {
|
||||
indent(out) << "org.apache.thrift.protocol.TList " << obj
|
||||
<< " = new org.apache.thrift.protocol.TList("
|
||||
<< type_to_enum(((t_list*)ttype)->get_elem_type()) << ", iprot.readI32());"
|
||||
<< " = iprot.readListBegin("
|
||||
<< type_to_enum(((t_list*)ttype)->get_elem_type()) << ");"
|
||||
<< endl;
|
||||
}
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ import org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer;
|
||||
public interface TAsyncProcessor {
|
||||
/**
|
||||
* Process a single frame.
|
||||
|
||||
|
||||
* <b>Note:</b> Implementations must call fb.responseReady() once processing
|
||||
* is complete
|
||||
*
|
||||
|
101
lib/java/src/org/apache/thrift/TConfiguration.java
Normal file
101
lib/java/src/org/apache/thrift/TConfiguration.java
Normal file
@ -0,0 +1,101 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.thrift;
|
||||
|
||||
|
||||
public class TConfiguration {
|
||||
public static final int DEFAULT_MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
|
||||
public static final int DEFAULT_MAX_FRAME_SIZE = 16384000; // this value is used consistently across all Thrift libraries
|
||||
public static final int DEFAULT_RECURSION_DEPTH = 64;
|
||||
|
||||
private int maxMessageSize;
|
||||
private int maxFrameSize;
|
||||
private int recursionLimit;
|
||||
|
||||
public TConfiguration() {
|
||||
this(DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_FRAME_SIZE, DEFAULT_RECURSION_DEPTH);
|
||||
}
|
||||
public TConfiguration(int maxMessageSize, int maxFrameSize, int recursionLimit) {
|
||||
this.maxFrameSize = maxFrameSize;
|
||||
this.maxMessageSize = maxMessageSize;
|
||||
this.recursionLimit = recursionLimit;
|
||||
}
|
||||
|
||||
public int getMaxMessageSize() {
|
||||
return maxMessageSize;
|
||||
}
|
||||
|
||||
public int getMaxFrameSize() {
|
||||
return maxFrameSize;
|
||||
}
|
||||
|
||||
public int getRecursionLimit() {
|
||||
return recursionLimit;
|
||||
}
|
||||
|
||||
public void setMaxMessageSize(int maxMessageSize) {
|
||||
this.maxMessageSize = maxMessageSize;
|
||||
}
|
||||
|
||||
public void setMaxFrameSize(int maxFrameSize) {
|
||||
this.maxFrameSize = maxFrameSize;
|
||||
}
|
||||
|
||||
public void setRecursionLimit(int recursionLimit) {
|
||||
this.recursionLimit = recursionLimit;
|
||||
}
|
||||
|
||||
public static final TConfiguration DEFAULT = new Builder().build();
|
||||
|
||||
public static TConfiguration.Builder custom() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private int maxMessageSize ;
|
||||
private int maxFrameSize;
|
||||
private int recursionLimit ;
|
||||
|
||||
Builder() {
|
||||
super();
|
||||
this.maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
|
||||
this.maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
|
||||
this.recursionLimit = DEFAULT_RECURSION_DEPTH;
|
||||
}
|
||||
|
||||
public Builder setMaxMessageSize(int maxMessageSize) {
|
||||
this.maxMessageSize = maxMessageSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setMaxFrameSize(int maxFrameSize) {
|
||||
this.maxFrameSize = maxFrameSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setRecursionLimit(int recursionLimit) {
|
||||
this.recursionLimit = recursionLimit;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TConfiguration build() {
|
||||
return new TConfiguration(maxMessageSize, maxFrameSize, recursionLimit);
|
||||
}
|
||||
}
|
||||
}
|
@ -29,6 +29,7 @@ import org.apache.thrift.protocol.TProtocolFactory;
|
||||
import org.apache.thrift.protocol.TProtocolUtil;
|
||||
import org.apache.thrift.protocol.TType;
|
||||
import org.apache.thrift.transport.TMemoryInputTransport;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
|
||||
/**
|
||||
* Generic utility for easily deserializing objects from a byte array or Java
|
||||
@ -42,7 +43,7 @@ public class TDeserializer {
|
||||
/**
|
||||
* Create a new TDeserializer that uses the TBinaryProtocol by default.
|
||||
*/
|
||||
public TDeserializer() {
|
||||
public TDeserializer() throws TTransportException {
|
||||
this(new TBinaryProtocol.Factory());
|
||||
}
|
||||
|
||||
@ -52,8 +53,8 @@ public class TDeserializer {
|
||||
*
|
||||
* @param protocolFactory Factory to create a protocol
|
||||
*/
|
||||
public TDeserializer(TProtocolFactory protocolFactory) {
|
||||
trans_ = new TMemoryInputTransport();
|
||||
public TDeserializer(TProtocolFactory protocolFactory) throws TTransportException {
|
||||
trans_ = new TMemoryInputTransport(new TConfiguration());
|
||||
protocol_ = protocolFactory.getProtocol(trans_);
|
||||
}
|
||||
|
||||
@ -105,19 +106,19 @@ public class TDeserializer {
|
||||
|
||||
/**
|
||||
* Deserialize only a single Thrift object (addressed by recursively using field id)
|
||||
* from a byte record.
|
||||
* from a byte record.
|
||||
* @param tb The object to read into
|
||||
* @param bytes The serialized object to read from
|
||||
* @param fieldIdPathFirst First of the FieldId's that define a path tb
|
||||
* @param fieldIdPathRest The rest FieldId's that define a path tb
|
||||
* @throws TException
|
||||
* @throws TException
|
||||
*/
|
||||
public void partialDeserialize(TBase tb, byte[] bytes, TFieldIdEnum fieldIdPathFirst, TFieldIdEnum ... fieldIdPathRest) throws TException {
|
||||
try {
|
||||
if (locateField(bytes, fieldIdPathFirst, fieldIdPathRest) != null) {
|
||||
// if this line is reached, iprot will be positioned at the start of tb.
|
||||
tb.read(protocol_);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new TException(e);
|
||||
} finally {
|
||||
|
@ -75,7 +75,7 @@ import java.util.concurrent.TimeoutException;
|
||||
*
|
||||
*/
|
||||
public class TNonblockingMultiFetchClient {
|
||||
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(
|
||||
TNonblockingMultiFetchClient.class);
|
||||
|
||||
@ -86,7 +86,7 @@ public class TNonblockingMultiFetchClient {
|
||||
// time limit for fetching data from all servers (in second)
|
||||
private int fetchTimeoutSeconds;
|
||||
|
||||
// store request that will be sent to servers
|
||||
// store request that will be sent to servers
|
||||
private ByteBuffer requestBuf;
|
||||
private ByteBuffer requestBufDuplication;
|
||||
|
||||
@ -104,7 +104,7 @@ public class TNonblockingMultiFetchClient {
|
||||
this.fetchTimeoutSeconds = fetchTimeoutSeconds;
|
||||
this.requestBuf = requestBuf;
|
||||
this.servers = servers;
|
||||
|
||||
|
||||
stats = new TNonblockingMultiFetchStats();
|
||||
recvBuf = null;
|
||||
}
|
||||
@ -128,7 +128,7 @@ public class TNonblockingMultiFetchClient {
|
||||
if (requestBufDuplication == null) {
|
||||
requestBufDuplication = requestBuf.duplicate();
|
||||
}
|
||||
return requestBufDuplication;
|
||||
return requestBufDuplication;
|
||||
}
|
||||
}
|
||||
|
||||
@ -171,7 +171,7 @@ public class TNonblockingMultiFetchClient {
|
||||
task.cancel(true);
|
||||
LOGGER.error("Exception during fetch", ee);
|
||||
} catch (TimeoutException te) {
|
||||
// attempt to cancel execution of the task.
|
||||
// attempt to cancel execution of the task.
|
||||
task.cancel(true);
|
||||
LOGGER.error("Timeout for fetch", te);
|
||||
}
|
||||
@ -207,10 +207,10 @@ public class TNonblockingMultiFetchClient {
|
||||
// buffer for receiving response from servers
|
||||
recvBuf = new ByteBuffer[numTotalServers];
|
||||
// buffer for sending request
|
||||
ByteBuffer sendBuf[] = new ByteBuffer[numTotalServers];
|
||||
long numBytesRead[] = new long[numTotalServers];
|
||||
int frameSize[] = new int[numTotalServers];
|
||||
boolean hasReadFrameSize[] = new boolean[numTotalServers];
|
||||
ByteBuffer[] sendBuf = new ByteBuffer[numTotalServers];
|
||||
long[] numBytesRead = new long[numTotalServers];
|
||||
int[] frameSize = new int[numTotalServers];
|
||||
boolean[] hasReadFrameSize = new boolean[numTotalServers];
|
||||
|
||||
try {
|
||||
selector = Selector.open();
|
||||
@ -240,10 +240,11 @@ public class TNonblockingMultiFetchClient {
|
||||
} catch (Exception e) {
|
||||
stats.incNumConnectErrorServers();
|
||||
LOGGER.error("Set up socket to server {} error", server, e);
|
||||
|
||||
// free resource
|
||||
if (s != null) {
|
||||
try {s.close();} catch (Exception ex) {}
|
||||
}
|
||||
}
|
||||
if (key != null) {
|
||||
key.cancel();
|
||||
}
|
||||
@ -253,7 +254,7 @@ public class TNonblockingMultiFetchClient {
|
||||
// wait for events
|
||||
while (stats.getNumReadCompletedServers() +
|
||||
stats.getNumConnectErrorServers() < stats.getNumTotalServers()) {
|
||||
// if the thread is interrupted (e.g., task is cancelled)
|
||||
// if the thread is interrupted (e.g., task is cancelled)
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
return;
|
||||
}
|
||||
@ -380,4 +381,4 @@ public class TNonblockingMultiFetchClient {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ import org.apache.thrift.protocol.TBinaryProtocol;
|
||||
import org.apache.thrift.protocol.TProtocol;
|
||||
import org.apache.thrift.protocol.TProtocolFactory;
|
||||
import org.apache.thrift.transport.TIOStreamTransport;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
|
||||
/**
|
||||
* Generic utility for easily serializing objects into a byte array or Java
|
||||
@ -42,7 +43,7 @@ public class TSerializer {
|
||||
/**
|
||||
* This transport wraps that byte array
|
||||
*/
|
||||
private final TIOStreamTransport transport_ = new TIOStreamTransport(baos_);
|
||||
private final TIOStreamTransport transport_;
|
||||
|
||||
/**
|
||||
* Internal protocol used for serializing objects.
|
||||
@ -52,7 +53,7 @@ public class TSerializer {
|
||||
/**
|
||||
* Create a new TSerializer that uses the TBinaryProtocol by default.
|
||||
*/
|
||||
public TSerializer() {
|
||||
public TSerializer() throws TTransportException {
|
||||
this(new TBinaryProtocol.Factory());
|
||||
}
|
||||
|
||||
@ -62,7 +63,8 @@ public class TSerializer {
|
||||
*
|
||||
* @param protocolFactory Factory to create a protocol
|
||||
*/
|
||||
public TSerializer(TProtocolFactory protocolFactory) {
|
||||
public TSerializer(TProtocolFactory protocolFactory) throws TTransportException {
|
||||
transport_ = new TIOStreamTransport(new TConfiguration(), baos_);
|
||||
protocol_ = protocolFactory.getProtocol(transport_);
|
||||
}
|
||||
|
||||
|
@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.protocol.TProtocol;
|
||||
import org.apache.thrift.protocol.TProtocolFactory;
|
||||
import org.apache.thrift.transport.TFramedTransport;
|
||||
import org.apache.thrift.transport.layered.TFramedTransport;
|
||||
import org.apache.thrift.transport.TMemoryBuffer;
|
||||
import org.apache.thrift.transport.TNonblockingTransport;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
@ -98,7 +98,7 @@ public abstract class TAsyncMethodCall<T> {
|
||||
protected long getStartTime() {
|
||||
return startTime;
|
||||
}
|
||||
|
||||
|
||||
protected long getSequenceId() {
|
||||
return sequenceId;
|
||||
}
|
||||
@ -106,11 +106,11 @@ public abstract class TAsyncMethodCall<T> {
|
||||
public TAsyncClient getClient() {
|
||||
return client;
|
||||
}
|
||||
|
||||
|
||||
public boolean hasTimeout() {
|
||||
return timeout > 0;
|
||||
}
|
||||
|
||||
|
||||
public long getTimeoutTimestamp() {
|
||||
return timeout + startTime;
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets;
|
||||
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.transport.TTransport;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
|
||||
/**
|
||||
* Binary protocol implementation for thrift.
|
||||
@ -279,6 +280,8 @@ public class TBinaryProtocol extends TProtocol {
|
||||
@Override
|
||||
public TMap readMapBegin() throws TException {
|
||||
TMap map = new TMap(readByte(), readByte(), readI32());
|
||||
|
||||
checkReadBytesAvailable(map);
|
||||
checkContainerReadLength(map.size);
|
||||
return map;
|
||||
}
|
||||
@ -289,6 +292,8 @@ public class TBinaryProtocol extends TProtocol {
|
||||
@Override
|
||||
public TList readListBegin() throws TException {
|
||||
TList list = new TList(readByte(), readI32());
|
||||
|
||||
checkReadBytesAvailable(list);
|
||||
checkContainerReadLength(list.size);
|
||||
return list;
|
||||
}
|
||||
@ -299,6 +304,8 @@ public class TBinaryProtocol extends TProtocol {
|
||||
@Override
|
||||
public TSet readSetBegin() throws TException {
|
||||
TSet set = new TSet(readByte(), readI32());
|
||||
|
||||
checkReadBytesAvailable(set);
|
||||
checkContainerReadLength(set.size);
|
||||
return set;
|
||||
}
|
||||
@ -393,8 +400,6 @@ public class TBinaryProtocol extends TProtocol {
|
||||
public String readString() throws TException {
|
||||
int size = readI32();
|
||||
|
||||
checkStringReadLength(size);
|
||||
|
||||
if (trans_.getBytesRemainingInBuffer() >= size) {
|
||||
String s = new String(trans_.getBuffer(), trans_.getBufferPosition(),
|
||||
size, StandardCharsets.UTF_8);
|
||||
@ -429,11 +434,14 @@ public class TBinaryProtocol extends TProtocol {
|
||||
return ByteBuffer.wrap(buf);
|
||||
}
|
||||
|
||||
private void checkStringReadLength(int length) throws TProtocolException {
|
||||
private void checkStringReadLength(int length) throws TException {
|
||||
if (length < 0) {
|
||||
throw new TProtocolException(TProtocolException.NEGATIVE_SIZE,
|
||||
"Negative length: " + length);
|
||||
}
|
||||
|
||||
getTransport().checkReadBytesAvailable(length);
|
||||
|
||||
if (stringLengthLimit_ != NO_LENGTH_LIMIT && length > stringLengthLimit_) {
|
||||
throw new TProtocolException(TProtocolException.SIZE_LIMIT,
|
||||
"Length exceeded max allowed: " + length);
|
||||
@ -454,4 +462,28 @@ public class TBinaryProtocol extends TProtocol {
|
||||
private int readAll(byte[] buf, int off, int len) throws TException {
|
||||
return trans_.readAll(buf, off, len);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Return the minimum number of bytes a type will consume on the wire
|
||||
*/
|
||||
public int getMinSerializedSize(byte type) throws TTransportException {
|
||||
switch (type)
|
||||
{
|
||||
case 0: return 0; // Stop
|
||||
case 1: return 0; // Void
|
||||
case 2: return 1; // Bool sizeof(byte)
|
||||
case 3: return 1; // Byte sizeof(byte)
|
||||
case 4: return 8; // Double sizeof(double)
|
||||
case 6: return 2; // I16 sizeof(short)
|
||||
case 8: return 4; // I32 sizeof(int)
|
||||
case 10: return 8;// I64 sizeof(long)
|
||||
case 11: return 4; // string length sizeof(int)
|
||||
case 12: return 0; // empty struct
|
||||
case 13: return 4; // element count Map sizeof(int)
|
||||
case 14: return 4; // element count Set sizeof(int)
|
||||
case 15: return 4; // element count List sizeof(int)
|
||||
default: throw new TTransportException(TTransportException.UNKNOWN, "unrecognized type code");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ import java.nio.charset.StandardCharsets;
|
||||
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.transport.TTransport;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
|
||||
/**
|
||||
* TCompactProtocol2 is the Java implementation of the compact protocol specified
|
||||
@ -579,7 +580,9 @@ public class TCompactProtocol extends TProtocol {
|
||||
int size = readVarint32();
|
||||
checkContainerReadLength(size);
|
||||
byte keyAndValueType = size == 0 ? 0 : readByte();
|
||||
return new TMap(getTType((byte)(keyAndValueType >> 4)), getTType((byte)(keyAndValueType & 0xf)), size);
|
||||
TMap map = new TMap(getTType((byte)(keyAndValueType >> 4)), getTType((byte)(keyAndValueType & 0xf)), size);
|
||||
checkReadBytesAvailable(map);
|
||||
return map;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -595,8 +598,9 @@ public class TCompactProtocol extends TProtocol {
|
||||
size = readVarint32();
|
||||
}
|
||||
checkContainerReadLength(size);
|
||||
byte type = getTType(size_and_type);
|
||||
return new TList(type, size);
|
||||
TList list = new TList(getTType(size_and_type), size);
|
||||
checkReadBytesAvailable(list);
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -694,9 +698,9 @@ public class TCompactProtocol extends TProtocol {
|
||||
*/
|
||||
public ByteBuffer readBinary() throws TException {
|
||||
int length = readVarint32();
|
||||
checkStringReadLength(length);
|
||||
|
||||
if (length == 0) return EMPTY_BUFFER;
|
||||
|
||||
getTransport().checkReadBytesAvailable(length);
|
||||
if (trans_.getBytesRemainingInBuffer() >= length) {
|
||||
ByteBuffer bb = ByteBuffer.wrap(trans_.getBuffer(), trans_.getBufferPosition(), length);
|
||||
trans_.consumeBuffer(length);
|
||||
@ -719,11 +723,14 @@ public class TCompactProtocol extends TProtocol {
|
||||
return buf;
|
||||
}
|
||||
|
||||
private void checkStringReadLength(int length) throws TProtocolException {
|
||||
private void checkStringReadLength(int length) throws TException {
|
||||
if (length < 0) {
|
||||
throw new TProtocolException(TProtocolException.NEGATIVE_SIZE,
|
||||
"Negative length: " + length);
|
||||
}
|
||||
|
||||
getTransport().checkReadBytesAvailable(length);
|
||||
|
||||
if (stringLengthLimit_ != NO_LENGTH_LIMIT && length > stringLengthLimit_) {
|
||||
throw new TProtocolException(TProtocolException.SIZE_LIMIT,
|
||||
"Length exceeded max allowed: " + length);
|
||||
@ -901,4 +908,40 @@ public class TCompactProtocol extends TProtocol {
|
||||
private byte getCompactType(byte ttype) {
|
||||
return ttypeToCompactType[ttype];
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the minimum number of bytes a type will consume on the wire
|
||||
*/
|
||||
public int getMinSerializedSize(byte type) throws TTransportException {
|
||||
switch (type) {
|
||||
case 0:
|
||||
return 0; // Stop
|
||||
case 1:
|
||||
return 0; // Void
|
||||
case 2:
|
||||
return 1; // Bool sizeof(byte)
|
||||
case 3:
|
||||
return 1; // Byte sizeof(byte)
|
||||
case 4:
|
||||
return 8; // Double sizeof(double)
|
||||
case 6:
|
||||
return 1; // I16 sizeof(byte)
|
||||
case 8:
|
||||
return 1; // I32 sizeof(byte)
|
||||
case 10:
|
||||
return 1;// I64 sizeof(byte)
|
||||
case 11:
|
||||
return 1; // string length sizeof(byte)
|
||||
case 12:
|
||||
return 0; // empty struct
|
||||
case 13:
|
||||
return 1; // element count Map sizeof(byte)
|
||||
case 14:
|
||||
return 1; // element count Set sizeof(byte)
|
||||
case 15:
|
||||
return 1; // element count List sizeof(byte)
|
||||
default:
|
||||
throw new TTransportException(TTransportException.UNKNOWN, "unrecognized type code");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -28,6 +28,7 @@ import java.util.Stack;
|
||||
import org.apache.thrift.TByteArrayOutputStream;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.transport.TTransport;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
|
||||
/**
|
||||
* JSON protocol implementation for thrift.
|
||||
@ -591,17 +592,17 @@ public class TJSONProtocol extends TProtocol {
|
||||
|
||||
@Override
|
||||
public void writeByte(byte b) throws TException {
|
||||
writeJSONInteger((long)b);
|
||||
writeJSONInteger(b);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeI16(short i16) throws TException {
|
||||
writeJSONInteger((long)i16);
|
||||
writeJSONInteger(i16);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeI32(int i32) throws TException {
|
||||
writeJSONInteger((long)i32);
|
||||
writeJSONInteger(i32);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -895,7 +896,10 @@ public class TJSONProtocol extends TProtocol {
|
||||
byte valueType = getTypeIDForTypeName(readJSONString(false).get());
|
||||
int size = (int)readJSONInteger();
|
||||
readJSONObjectStart();
|
||||
return new TMap(keyType, valueType, size);
|
||||
TMap map = new TMap(keyType, valueType, size);
|
||||
|
||||
checkReadBytesAvailable(map);
|
||||
return map;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -909,7 +913,10 @@ public class TJSONProtocol extends TProtocol {
|
||||
readJSONArrayStart();
|
||||
byte elemType = getTypeIDForTypeName(readJSONString(false).get());
|
||||
int size = (int)readJSONInteger();
|
||||
return new TList(elemType, size);
|
||||
TList list = new TList(elemType, size);
|
||||
|
||||
checkReadBytesAvailable(list);
|
||||
return list;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -922,7 +929,10 @@ public class TJSONProtocol extends TProtocol {
|
||||
readJSONArrayStart();
|
||||
byte elemType = getTypeIDForTypeName(readJSONString(false).get());
|
||||
int size = (int)readJSONInteger();
|
||||
return new TSet(elemType, size);
|
||||
TSet set = new TSet(elemType, size);
|
||||
|
||||
checkReadBytesAvailable(set);
|
||||
return set;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -932,7 +942,7 @@ public class TJSONProtocol extends TProtocol {
|
||||
|
||||
@Override
|
||||
public boolean readBool() throws TException {
|
||||
return (readJSONInteger() == 0 ? false : true);
|
||||
return (readJSONInteger() != 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -952,7 +962,7 @@ public class TJSONProtocol extends TProtocol {
|
||||
|
||||
@Override
|
||||
public long readI64() throws TException {
|
||||
return (long) readJSONInteger();
|
||||
return readJSONInteger();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -962,7 +972,9 @@ public class TJSONProtocol extends TProtocol {
|
||||
|
||||
@Override
|
||||
public String readString() throws TException {
|
||||
return readJSONString(false).toString(StandardCharsets.UTF_8);
|
||||
String str = readJSONString(false).toString(StandardCharsets.UTF_8);
|
||||
getTransport().checkReadBytesAvailable(str.length() * getMinSerializedSize(TType.STRING));
|
||||
return str;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -970,4 +982,28 @@ public class TJSONProtocol extends TProtocol {
|
||||
return ByteBuffer.wrap(readJSONBase64());
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Return the minimum number of bytes a type will consume on the wire
|
||||
*/
|
||||
public int getMinSerializedSize(byte type) throws TTransportException {
|
||||
switch (type)
|
||||
{
|
||||
case 0: return 0; // Stop
|
||||
case 1: return 0; // Void
|
||||
case 2: return 1; // Bool
|
||||
case 3: return 1; // Byte
|
||||
case 4: return 1; // Double
|
||||
case 6: return 1; // I16
|
||||
case 8: return 1; // I32
|
||||
case 10: return 1;// I64
|
||||
case 11: return 2; // string length
|
||||
case 12: return 2; // empty struct
|
||||
case 13: return 2; // element count Map
|
||||
case 14: return 2; // element count Set
|
||||
case 15: return 2; // element count List
|
||||
default: throw new TTransportException(TTransportException.UNKNOWN, "unrecognized type code");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -57,6 +57,27 @@ public abstract class TProtocol {
|
||||
return trans_;
|
||||
}
|
||||
|
||||
protected void checkReadBytesAvailable(TMap map) throws TException {
|
||||
long elemSize = getMinSerializedSize(map.keyType) + getMinSerializedSize(map.valueType);
|
||||
trans_.checkReadBytesAvailable(map.size * elemSize);
|
||||
}
|
||||
|
||||
protected void checkReadBytesAvailable(TList list) throws TException {
|
||||
trans_.checkReadBytesAvailable(list.size * getMinSerializedSize(list.elemType));
|
||||
}
|
||||
|
||||
protected void checkReadBytesAvailable(TSet set) throws TException {
|
||||
trans_.checkReadBytesAvailable(set.size * getMinSerializedSize(set.elemType));
|
||||
}
|
||||
|
||||
/**
|
||||
* Return
|
||||
* @param type Returns the minimum amount of bytes needed to store the smallest possible instance of TType.
|
||||
* @return
|
||||
* @throws TException
|
||||
*/
|
||||
public abstract int getMinSerializedSize(byte type) throws TException;
|
||||
|
||||
/**
|
||||
* Writing methods.
|
||||
*/
|
||||
@ -152,7 +173,7 @@ public abstract class TProtocol {
|
||||
* be implemented for stateful protocols.
|
||||
*/
|
||||
public void reset() {}
|
||||
|
||||
|
||||
/**
|
||||
* Scheme accessor
|
||||
*/
|
||||
|
@ -31,7 +31,7 @@ import java.nio.ByteBuffer;
|
||||
* the behaviour of the enclosed <code>TProtocol</code>.
|
||||
*
|
||||
* <p>See p.175 of Design Patterns (by Gamma et al.)</p>
|
||||
*
|
||||
*
|
||||
* @see org.apache.thrift.protocol.TMultiplexedProtocol
|
||||
*/
|
||||
public abstract class TProtocolDecorator extends TProtocol {
|
||||
@ -210,4 +210,14 @@ public abstract class TProtocolDecorator extends TProtocol {
|
||||
public ByteBuffer readBinary() throws TException {
|
||||
return concreteProtocol.readBinary();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param type Returns the minimum amount of bytes needed to store the smallest possible instance of TType.
|
||||
* @return
|
||||
* @throws TException
|
||||
*/
|
||||
public int getMinSerializedSize(byte type) throws TException {
|
||||
return concreteProtocol.getMinSerializedSize(type);
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ import java.util.Stack;
|
||||
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.transport.TTransport;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
|
||||
/**
|
||||
* JSON protocol implementation for thrift.
|
||||
@ -480,4 +481,28 @@ public class TSimpleJSONProtocol extends TProtocol {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Return the minimum number of bytes a type will consume on the wire
|
||||
*/
|
||||
public int getMinSerializedSize(byte type) throws TException {
|
||||
switch (type)
|
||||
{
|
||||
case 0: return 0; // Stop
|
||||
case 1: return 0; // Void
|
||||
case 2: return 1; // Bool
|
||||
case 3: return 1; // Byte
|
||||
case 4: return 1; // Double
|
||||
case 6: return 1; // I16
|
||||
case 8: return 1; // I32
|
||||
case 10: return 1;// I64
|
||||
case 11: return 2; // string length
|
||||
case 12: return 2; // empty struct
|
||||
case 13: return 2; // element count Map
|
||||
case 14: return 2; // element count Set
|
||||
case 15: return 2; // element count List
|
||||
default: throw new TTransportException(TTransportException.UNKNOWN, "unrecognized type code");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -80,9 +80,9 @@ public final class TTupleProtocol extends TCompactProtocol {
|
||||
* extension). The byte-ordering of the result is big-endian which means the
|
||||
* most significant bit is in element 0. The bit at index 0 of the bit set is
|
||||
* assumed to be the least significant bit.
|
||||
*
|
||||
*
|
||||
* @param bits
|
||||
* @param vectorWidth
|
||||
* @param vectorWidth
|
||||
* @return a byte array of at least length 1
|
||||
*/
|
||||
public static byte[] toByteArray(BitSet bits, int vectorWidth) {
|
||||
@ -95,4 +95,27 @@ public final class TTupleProtocol extends TCompactProtocol {
|
||||
return bytes;
|
||||
}
|
||||
|
||||
public TMap readMapBegin(byte keyType, byte valTyep) throws TException {
|
||||
int size = super.readI32();
|
||||
TMap map = new TMap(keyType, valTyep, size);
|
||||
|
||||
checkReadBytesAvailable(map);
|
||||
return map;
|
||||
}
|
||||
|
||||
public TList readListBegin(byte type) throws TException {
|
||||
int size = super.readI32();
|
||||
TList list = new TList(type, size);
|
||||
|
||||
checkReadBytesAvailable(list);
|
||||
return list;
|
||||
}
|
||||
|
||||
public TSet readSetBegin(byte type) throws TException {
|
||||
return new TSet(readListBegin(type));
|
||||
}
|
||||
|
||||
public void readMapEnd() throws TException {}
|
||||
public void readListEnd() throws TException {}
|
||||
public void readSetEnd() throws TException {}
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ import org.apache.thrift.TAsyncProcessor;
|
||||
import org.apache.thrift.TByteArrayOutputStream;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.protocol.TProtocol;
|
||||
import org.apache.thrift.transport.TFramedTransport;
|
||||
import org.apache.thrift.transport.layered.TFramedTransport;
|
||||
import org.apache.thrift.transport.TIOStreamTransport;
|
||||
import org.apache.thrift.transport.TMemoryInputTransport;
|
||||
import org.apache.thrift.transport.TNonblockingServerTransport;
|
||||
@ -305,7 +305,7 @@ public abstract class AbstractNonblockingServer extends TServer {
|
||||
|
||||
public FrameBuffer(final TNonblockingTransport trans,
|
||||
final SelectionKey selectionKey,
|
||||
final AbstractSelectThread selectThread) {
|
||||
final AbstractSelectThread selectThread) throws TTransportException {
|
||||
trans_ = trans;
|
||||
selectionKey_ = selectionKey;
|
||||
selectThread_ = selectThread;
|
||||
@ -542,10 +542,7 @@ public abstract class AbstractNonblockingServer extends TServer {
|
||||
*/
|
||||
private boolean internalRead() {
|
||||
try {
|
||||
if (trans_.read(buffer_) < 0) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
return trans_.read(buffer_) >= 0;
|
||||
} catch (IOException e) {
|
||||
LOGGER.warn("Got an IOException in internalRead!", e);
|
||||
return false;
|
||||
@ -582,7 +579,7 @@ public abstract class AbstractNonblockingServer extends TServer {
|
||||
} // FrameBuffer
|
||||
|
||||
public class AsyncFrameBuffer extends FrameBuffer {
|
||||
public AsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
|
||||
public AsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) throws TTransportException {
|
||||
super(trans, selectionKey, selectThread);
|
||||
}
|
||||
|
||||
|
@ -215,7 +215,7 @@ public class TNonblockingServer extends AbstractNonblockingServer {
|
||||
|
||||
protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans,
|
||||
final SelectionKey selectionKey,
|
||||
final AbstractSelectThread selectThread) {
|
||||
final AbstractSelectThread selectThread) throws TTransportException {
|
||||
return processorFactory_.isAsyncProcessor() ?
|
||||
new AsyncFrameBuffer(trans, selectionKey, selectThread) :
|
||||
new FrameBuffer(trans, selectionKey, selectThread);
|
||||
@ -229,7 +229,7 @@ public class TNonblockingServer extends AbstractNonblockingServer {
|
||||
TNonblockingTransport client = null;
|
||||
try {
|
||||
// accept the connection
|
||||
client = (TNonblockingTransport)serverTransport.accept();
|
||||
client = serverTransport.accept();
|
||||
clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
|
||||
|
||||
// add this key to the map
|
||||
|
@ -457,7 +457,7 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
|
||||
|
||||
private TNonblockingTransport doAccept() {
|
||||
try {
|
||||
return (TNonblockingTransport) serverTransport.accept();
|
||||
return serverTransport.accept();
|
||||
} catch (TTransportException tte) {
|
||||
// something went wrong accepting.
|
||||
LOGGER.warn("Exception trying to accept!", tte);
|
||||
@ -685,7 +685,7 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
|
||||
|
||||
protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans,
|
||||
final SelectionKey selectionKey,
|
||||
final AbstractSelectThread selectThread) {
|
||||
final AbstractSelectThread selectThread) throws TTransportException {
|
||||
return processorFactory_.isAsyncProcessor() ?
|
||||
new AsyncFrameBuffer(trans, selectionKey, selectThread) :
|
||||
new FrameBuffer(trans, selectionKey, selectThread);
|
||||
@ -699,7 +699,7 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
|
||||
FrameBuffer frameBuffer = createFrameBuffer(accepted, clientKey, SelectorThread.this);
|
||||
|
||||
clientKey.attach(frameBuffer);
|
||||
} catch (IOException e) {
|
||||
} catch (IOException | TTransportException e) {
|
||||
LOGGER.warn("Failed to register accepted connection to selector!", e);
|
||||
if (clientKey != null) {
|
||||
cleanupSelectionKey(clientKey);
|
||||
|
@ -27,7 +27,7 @@ import java.util.Arrays;
|
||||
* rate slightly faster than the requested capacity with the (untested)
|
||||
* objective of avoiding expensive buffer allocations and copies.
|
||||
*/
|
||||
class AutoExpandingBuffer {
|
||||
public class AutoExpandingBuffer {
|
||||
private byte[] array;
|
||||
|
||||
public AutoExpandingBuffer(int initialCapacity) {
|
||||
|
@ -18,17 +18,20 @@
|
||||
*/
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
import org.apache.thrift.TConfiguration;
|
||||
|
||||
/**
|
||||
* TTransport for reading from an AutoExpandingBuffer.
|
||||
*/
|
||||
public class AutoExpandingBufferReadTransport extends TTransport {
|
||||
public class AutoExpandingBufferReadTransport extends TEndpointTransport {
|
||||
|
||||
private final AutoExpandingBuffer buf;
|
||||
|
||||
private int pos = 0;
|
||||
private int limit = 0;
|
||||
|
||||
public AutoExpandingBufferReadTransport(int initialCapacity) {
|
||||
public AutoExpandingBufferReadTransport(TConfiguration config, int initialCapacity) throws TTransportException {
|
||||
super(config);
|
||||
this.buf = new AutoExpandingBuffer(initialCapacity);
|
||||
}
|
||||
|
||||
|
@ -18,10 +18,12 @@
|
||||
*/
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
import org.apache.thrift.TConfiguration;
|
||||
|
||||
/**
|
||||
* TTransport for writing to an AutoExpandingBuffer.
|
||||
*/
|
||||
public final class AutoExpandingBufferWriteTransport extends TTransport {
|
||||
public final class AutoExpandingBufferWriteTransport extends TEndpointTransport {
|
||||
|
||||
private final AutoExpandingBuffer buf;
|
||||
private int pos;
|
||||
@ -38,7 +40,8 @@ public final class AutoExpandingBufferWriteTransport extends TTransport {
|
||||
* @throws IllegalArgumentException if frontReserve is less than zero
|
||||
* @throws IllegalArgumentException if frontReserve is greater than initialCapacity
|
||||
*/
|
||||
public AutoExpandingBufferWriteTransport(int initialCapacity, int frontReserve) {
|
||||
public AutoExpandingBufferWriteTransport(TConfiguration config, int initialCapacity, int frontReserve) throws TTransportException {
|
||||
super(config);
|
||||
if (initialCapacity < 1) {
|
||||
throw new IllegalArgumentException("initialCapacity");
|
||||
}
|
||||
|
@ -1,5 +1,7 @@
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
import org.apache.thrift.TConfiguration;
|
||||
|
||||
import java.nio.BufferOverflowException;
|
||||
import java.nio.BufferUnderflowException;
|
||||
import java.nio.ByteBuffer;
|
||||
@ -7,14 +9,16 @@ import java.nio.ByteBuffer;
|
||||
/**
|
||||
* ByteBuffer-backed implementation of TTransport.
|
||||
*/
|
||||
public final class TByteBuffer extends TTransport {
|
||||
public final class TByteBuffer extends TEndpointTransport {
|
||||
private final ByteBuffer byteBuffer;
|
||||
|
||||
/**
|
||||
* Creates a new TByteBuffer wrapping a given NIO ByteBuffer.
|
||||
*/
|
||||
public TByteBuffer(ByteBuffer byteBuffer) {
|
||||
public TByteBuffer(ByteBuffer byteBuffer) throws TTransportException {
|
||||
super(new TConfiguration());
|
||||
this.byteBuffer = byteBuffer;
|
||||
updateKnownMessageSize(byteBuffer.capacity());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -32,6 +36,9 @@ public final class TByteBuffer extends TTransport {
|
||||
|
||||
@Override
|
||||
public int read(byte[] buf, int off, int len) throws TTransportException {
|
||||
//
|
||||
checkReadBytesAvailable(len);
|
||||
|
||||
final int n = Math.min(byteBuffer.remaining(), len);
|
||||
if (n > 0) {
|
||||
try {
|
||||
|
100
lib/java/src/org/apache/thrift/transport/TEndpointTransport.java
Normal file
100
lib/java/src/org/apache/thrift/transport/TEndpointTransport.java
Normal file
@ -0,0 +1,100 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
import org.apache.thrift.TConfiguration;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public abstract class TEndpointTransport extends TTransport{
|
||||
|
||||
protected long getMaxMessageSize() { return getConfiguration().getMaxMessageSize(); }
|
||||
|
||||
protected long knownMessageSize;
|
||||
protected long remainingMessageSize;
|
||||
|
||||
private TConfiguration _configuration;
|
||||
public TConfiguration getConfiguration() {
|
||||
return _configuration;
|
||||
}
|
||||
|
||||
public TEndpointTransport( TConfiguration config) throws TTransportException {
|
||||
_configuration = Objects.isNull(config) ? new TConfiguration() : config;
|
||||
|
||||
resetConsumedMessageSize(-1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Resets RemainingMessageSize to the configured maximum
|
||||
* @param newSize
|
||||
*/
|
||||
protected void resetConsumedMessageSize(long newSize) throws TTransportException {
|
||||
// full reset
|
||||
if (newSize < 0)
|
||||
{
|
||||
knownMessageSize = getMaxMessageSize();
|
||||
remainingMessageSize = getMaxMessageSize();
|
||||
return;
|
||||
}
|
||||
|
||||
// update only: message size can shrink, but not grow
|
||||
if (newSize > knownMessageSize)
|
||||
throw new TTransportException(TTransportException.END_OF_FILE, "MaxMessageSize reached");
|
||||
|
||||
knownMessageSize = newSize;
|
||||
remainingMessageSize = newSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport).
|
||||
* Will throw if we already consumed too many bytes or if the new size is larger than allowed.
|
||||
* @param size
|
||||
*/
|
||||
public void updateKnownMessageSize(long size) throws TTransportException {
|
||||
long consumed = knownMessageSize - remainingMessageSize;
|
||||
resetConsumedMessageSize(size == 0 ? -1 : size);
|
||||
countConsumedMessageBytes(consumed);
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws if there are not enough bytes in the input stream to satisfy a read of numBytes bytes of data
|
||||
* @param numBytes
|
||||
*/
|
||||
public void checkReadBytesAvailable(long numBytes) throws TTransportException {
|
||||
if (remainingMessageSize < numBytes)
|
||||
throw new TTransportException(TTransportException.END_OF_FILE, "MaxMessageSize reached");
|
||||
}
|
||||
|
||||
/**
|
||||
* Consumes numBytes from the RemainingMessageSize.
|
||||
* @param numBytes
|
||||
*/
|
||||
protected void countConsumedMessageBytes(long numBytes) throws TTransportException {
|
||||
if (remainingMessageSize >= numBytes)
|
||||
{
|
||||
remainingMessageSize -= numBytes;
|
||||
}
|
||||
else
|
||||
{
|
||||
remainingMessageSize = 0;
|
||||
throw new TTransportException(TTransportException.END_OF_FILE, "MaxMessageSize reached");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -26,13 +26,14 @@ import java.io.OutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.thrift.TConfiguration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* FileTransport implementation of the TTransport interface.
|
||||
* Currently this is a straightforward port of the cpp implementation
|
||||
*
|
||||
*
|
||||
* It may make better sense to provide a basic stream access on top of the framed file format
|
||||
* The FileTransport can then be a user of this framed file format with some additional logic
|
||||
* for chunking.
|
||||
@ -44,7 +45,7 @@ public class TFileTransport extends TTransport {
|
||||
public static class TruncableBufferedInputStream extends BufferedInputStream {
|
||||
public void trunc() {
|
||||
pos = count = 0;
|
||||
}
|
||||
}
|
||||
public TruncableBufferedInputStream(InputStream in) {
|
||||
super(in);
|
||||
}
|
||||
@ -62,7 +63,7 @@ public class TFileTransport extends TTransport {
|
||||
/**
|
||||
* Initialize an event. Initially, it has no valid contents
|
||||
*
|
||||
* @param buf byte array buffer to store event
|
||||
* @param buf byte array buffer to store event
|
||||
*/
|
||||
public Event(byte[] buf) {
|
||||
buf_ = buf;
|
||||
@ -88,9 +89,9 @@ public class TFileTransport extends TTransport {
|
||||
|
||||
return(ndesired);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static class ChunkState {
|
||||
public static class ChunkState {
|
||||
/**
|
||||
* Chunk Size. Must be same across all implementations
|
||||
*/
|
||||
@ -111,7 +112,7 @@ public class TFileTransport extends TTransport {
|
||||
public long getOffset() { return (offset_);}
|
||||
}
|
||||
|
||||
public static enum TailPolicy {
|
||||
public enum TailPolicy {
|
||||
|
||||
NOWAIT(0, 0),
|
||||
WAIT_FOREVER(500, -1);
|
||||
@ -148,13 +149,13 @@ public class TFileTransport extends TTransport {
|
||||
TailPolicy currentPolicy_ = TailPolicy.NOWAIT;
|
||||
|
||||
|
||||
/**
|
||||
/**
|
||||
* Underlying file being read
|
||||
*/
|
||||
protected TSeekableFile inputFile_ = null;
|
||||
|
||||
/**
|
||||
* Underlying outputStream
|
||||
/**
|
||||
* Underlying outputStream
|
||||
*/
|
||||
protected OutputStream outputStream_ = null;
|
||||
|
||||
@ -181,7 +182,7 @@ public class TFileTransport extends TTransport {
|
||||
|
||||
/**
|
||||
* Get File Tailing Policy
|
||||
*
|
||||
*
|
||||
* @return current read policy
|
||||
*/
|
||||
public TailPolicy getTailPolicy() {
|
||||
@ -190,7 +191,7 @@ public class TFileTransport extends TTransport {
|
||||
|
||||
/**
|
||||
* Set file Tailing Policy
|
||||
*
|
||||
*
|
||||
* @param policy New policy to set
|
||||
* @return Old policy
|
||||
*/
|
||||
@ -203,7 +204,7 @@ public class TFileTransport extends TTransport {
|
||||
|
||||
/**
|
||||
* Initialize read input stream
|
||||
*
|
||||
*
|
||||
* @return input stream to read from file
|
||||
*/
|
||||
private InputStream createInputStream() throws TTransportException {
|
||||
@ -223,7 +224,7 @@ public class TFileTransport extends TTransport {
|
||||
|
||||
/**
|
||||
* Read (potentially tailing) an input stream
|
||||
*
|
||||
*
|
||||
* @param is InputStream to read from
|
||||
* @param buf Buffer to read into
|
||||
* @param off Offset in buffer to read into
|
||||
@ -232,7 +233,7 @@ public class TFileTransport extends TTransport {
|
||||
*
|
||||
* @return number of bytes read
|
||||
*/
|
||||
private int tailRead(InputStream is, byte[] buf,
|
||||
private int tailRead(InputStream is, byte[] buf,
|
||||
int off, int len, TailPolicy tp) throws TTransportException {
|
||||
int orig_len = len;
|
||||
try {
|
||||
@ -322,7 +323,7 @@ public class TFileTransport extends TTransport {
|
||||
// check if event is corrupted and do recovery as required
|
||||
if(esize > cs.getRemaining()) {
|
||||
throw new TTransportException("FileTransport error: bad event size");
|
||||
/*
|
||||
/*
|
||||
if(performRecovery()) {
|
||||
esize=0;
|
||||
} else {
|
||||
@ -361,7 +362,7 @@ public class TFileTransport extends TTransport {
|
||||
* Files are not opened in ctor - but in explicit open call
|
||||
*/
|
||||
public void open() throws TTransportException {
|
||||
if (isOpen())
|
||||
if (isOpen())
|
||||
throw new TTransportException(TTransportException.ALREADY_OPEN);
|
||||
|
||||
try {
|
||||
@ -406,7 +407,7 @@ public class TFileTransport extends TTransport {
|
||||
*
|
||||
* @param path File path to read and write from
|
||||
* @param readOnly Whether this is a read-only transport
|
||||
*/
|
||||
*/
|
||||
public TFileTransport(final String path, boolean readOnly) throws IOException {
|
||||
inputFile_ = new TStandardFile(path);
|
||||
readOnly_ = readOnly;
|
||||
@ -457,8 +458,8 @@ public class TFileTransport extends TTransport {
|
||||
* @throws TTransportException if there was an error reading data
|
||||
*/
|
||||
public int read(byte[] buf, int off, int len) throws TTransportException {
|
||||
if(!isOpen())
|
||||
throw new TTransportException(TTransportException.NOT_OPEN,
|
||||
if(!isOpen())
|
||||
throw new TTransportException(TTransportException.NOT_OPEN,
|
||||
"Must open before reading");
|
||||
|
||||
if(currentEvent_.getRemaining() == 0) {
|
||||
@ -471,14 +472,14 @@ public class TFileTransport extends TTransport {
|
||||
}
|
||||
|
||||
public int getNumChunks() throws TTransportException {
|
||||
if(!isOpen())
|
||||
throw new TTransportException(TTransportException.NOT_OPEN,
|
||||
if(!isOpen())
|
||||
throw new TTransportException(TTransportException.NOT_OPEN,
|
||||
"Must open before getNumChunks");
|
||||
try {
|
||||
long len = inputFile_.length();
|
||||
if(len == 0)
|
||||
return 0;
|
||||
else
|
||||
else
|
||||
return (((int)(len/cs.getChunkSize())) + 1);
|
||||
|
||||
} catch (IOException iox) {
|
||||
@ -487,8 +488,8 @@ public class TFileTransport extends TTransport {
|
||||
}
|
||||
|
||||
public int getCurChunk() throws TTransportException {
|
||||
if(!isOpen())
|
||||
throw new TTransportException(TTransportException.NOT_OPEN,
|
||||
if(!isOpen())
|
||||
throw new TTransportException(TTransportException.NOT_OPEN,
|
||||
"Must open before getCurChunk");
|
||||
return (cs.getChunkNum());
|
||||
|
||||
@ -496,8 +497,8 @@ public class TFileTransport extends TTransport {
|
||||
|
||||
|
||||
public void seekToChunk(int chunk) throws TTransportException {
|
||||
if(!isOpen())
|
||||
throw new TTransportException(TTransportException.NOT_OPEN,
|
||||
if(!isOpen())
|
||||
throw new TTransportException(TTransportException.NOT_OPEN,
|
||||
"Must open before seeking");
|
||||
|
||||
int numChunks = getNumChunks();
|
||||
@ -527,7 +528,7 @@ public class TFileTransport extends TTransport {
|
||||
}
|
||||
|
||||
if(chunk*cs.getChunkSize() != cs.getOffset()) {
|
||||
try { inputFile_.seek((long)chunk*cs.getChunkSize()); }
|
||||
try { inputFile_.seek((long)chunk*cs.getChunkSize()); }
|
||||
catch (IOException iox) {
|
||||
throw new TTransportException("Seek to chunk " +
|
||||
chunk + " " +iox.getMessage(), iox);
|
||||
@ -549,8 +550,8 @@ public class TFileTransport extends TTransport {
|
||||
}
|
||||
|
||||
public void seekToEnd() throws TTransportException {
|
||||
if(!isOpen())
|
||||
throw new TTransportException(TTransportException.NOT_OPEN,
|
||||
if(!isOpen())
|
||||
throw new TTransportException(TTransportException.NOT_OPEN,
|
||||
"Must open before seeking");
|
||||
seekToChunk(getNumChunks());
|
||||
}
|
||||
@ -577,9 +578,25 @@ public class TFileTransport extends TTransport {
|
||||
throw new TTransportException("Not Supported");
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public TConfiguration getConfiguration() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateKnownMessageSize(long size) throws TTransportException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkReadBytesAvailable(long numBytes) throws TTransportException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* test program
|
||||
*
|
||||
*
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
@ -594,7 +611,7 @@ public class TFileTransport extends TTransport {
|
||||
try {
|
||||
num_chunks = Integer.parseInt(args[1]);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Cannot parse " + args[1]);
|
||||
LOGGER.error("Cannot parse " + args[1]);
|
||||
printUsage();
|
||||
}
|
||||
}
|
||||
|
@ -37,6 +37,7 @@ import org.apache.http.client.HttpClient;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.entity.ByteArrayEntity;
|
||||
import org.apache.http.params.CoreConnectionPNames;
|
||||
import org.apache.thrift.TConfiguration;
|
||||
|
||||
/**
|
||||
* HTTP implementation of the TTransport interface. Used for working with a
|
||||
@ -51,7 +52,7 @@ import org.apache.http.params.CoreConnectionPNames;
|
||||
* HttpClient to THttpClient(String url, HttpClient client) will create an
|
||||
* instance which will use HttpURLConnection.
|
||||
*
|
||||
* When using HttpClient, the following configuration leads to 5-15%
|
||||
* When using HttpClient, the following configuration leads to 5-15%
|
||||
* better performance than the HttpURLConnection implementation:
|
||||
*
|
||||
* http.protocol.version=HttpVersion.HTTP_1_1
|
||||
@ -65,7 +66,7 @@ import org.apache.http.params.CoreConnectionPNames;
|
||||
* @see <a href="https://issues.apache.org/jira/browse/THRIFT-970">THRIFT-970</a>
|
||||
*/
|
||||
|
||||
public class THttpClient extends TTransport {
|
||||
public class THttpClient extends TEndpointTransport {
|
||||
|
||||
private URL url_ = null;
|
||||
|
||||
@ -80,14 +81,14 @@ public class THttpClient extends TTransport {
|
||||
private Map<String,String> customHeaders_ = null;
|
||||
|
||||
private final HttpHost host;
|
||||
|
||||
|
||||
private final HttpClient client;
|
||||
|
||||
|
||||
public static class Factory extends TTransportFactory {
|
||||
|
||||
|
||||
private final String url;
|
||||
private final HttpClient client;
|
||||
|
||||
|
||||
public Factory(String url) {
|
||||
this.url = url;
|
||||
this.client = null;
|
||||
@ -97,14 +98,14 @@ public class THttpClient extends TTransport {
|
||||
this.url = url;
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public TTransport getTransport(TTransport trans) {
|
||||
try {
|
||||
if (null != client) {
|
||||
return new THttpClient(url, client);
|
||||
return new THttpClient(trans.getConfiguration(), url, client);
|
||||
} else {
|
||||
return new THttpClient(url);
|
||||
return new THttpClient(trans.getConfiguration(), url);
|
||||
}
|
||||
} catch (TTransportException tte) {
|
||||
return null;
|
||||
@ -112,7 +113,8 @@ public class THttpClient extends TTransport {
|
||||
}
|
||||
}
|
||||
|
||||
public THttpClient(String url) throws TTransportException {
|
||||
public THttpClient(TConfiguration config, String url) throws TTransportException {
|
||||
super(config);
|
||||
try {
|
||||
url_ = new URL(url);
|
||||
this.client = null;
|
||||
@ -122,7 +124,30 @@ public class THttpClient extends TTransport {
|
||||
}
|
||||
}
|
||||
|
||||
public THttpClient(String url) throws TTransportException {
|
||||
super(new TConfiguration());
|
||||
try {
|
||||
url_ = new URL(url);
|
||||
this.client = null;
|
||||
this.host = null;
|
||||
} catch (IOException iox) {
|
||||
throw new TTransportException(iox);
|
||||
}
|
||||
}
|
||||
|
||||
public THttpClient(TConfiguration config, String url, HttpClient client) throws TTransportException {
|
||||
super(config);
|
||||
try {
|
||||
url_ = new URL(url);
|
||||
this.client = client;
|
||||
this.host = new HttpHost(url_.getHost(), -1 == url_.getPort() ? url_.getDefaultPort() : url_.getPort(), url_.getProtocol());
|
||||
} catch (IOException iox) {
|
||||
throw new TTransportException(iox);
|
||||
}
|
||||
}
|
||||
|
||||
public THttpClient(String url, HttpClient client) throws TTransportException {
|
||||
super(new TConfiguration());
|
||||
try {
|
||||
url_ = new URL(url);
|
||||
this.client = client;
|
||||
@ -168,7 +193,6 @@ public class THttpClient extends TTransport {
|
||||
try {
|
||||
inputStream_.close();
|
||||
} catch (IOException ioe) {
|
||||
;
|
||||
}
|
||||
inputStream_ = null;
|
||||
}
|
||||
@ -182,11 +206,16 @@ public class THttpClient extends TTransport {
|
||||
if (inputStream_ == null) {
|
||||
throw new TTransportException("Response buffer is empty, no request.");
|
||||
}
|
||||
|
||||
checkReadBytesAvailable(len);
|
||||
|
||||
try {
|
||||
int ret = inputStream_.read(buf, off, len);
|
||||
if (ret == -1) {
|
||||
throw new TTransportException("No more data available.");
|
||||
}
|
||||
countConsumedMessageBytes(ret);
|
||||
|
||||
return ret;
|
||||
} catch (IOException iox) {
|
||||
throw new TTransportException(iox);
|
||||
@ -214,7 +243,7 @@ public class THttpClient extends TTransport {
|
||||
}
|
||||
|
||||
private void flushUsingHttpClient() throws TTransportException {
|
||||
|
||||
|
||||
if (null == this.client) {
|
||||
throw new TTransportException("Null HttpClient, aborting.");
|
||||
}
|
||||
@ -224,22 +253,22 @@ public class THttpClient extends TTransport {
|
||||
requestBuffer_.reset();
|
||||
|
||||
HttpPost post = null;
|
||||
|
||||
|
||||
InputStream is = null;
|
||||
|
||||
try {
|
||||
|
||||
try {
|
||||
// Set request to path + query string
|
||||
post = new HttpPost(this.url_.getFile());
|
||||
|
||||
|
||||
//
|
||||
// Headers are added to the HttpPost instance, not
|
||||
// to HttpClient.
|
||||
//
|
||||
|
||||
|
||||
post.setHeader("Content-Type", "application/x-thrift");
|
||||
post.setHeader("Accept", "application/x-thrift");
|
||||
post.setHeader("User-Agent", "Java/THttpClient/HC");
|
||||
|
||||
|
||||
if (null != customHeaders_) {
|
||||
for (Map.Entry<String, String> header : customHeaders_.entrySet()) {
|
||||
post.setHeader(header.getKey(), header.getValue());
|
||||
@ -247,17 +276,17 @@ public class THttpClient extends TTransport {
|
||||
}
|
||||
|
||||
post.setEntity(new ByteArrayEntity(data));
|
||||
|
||||
|
||||
HttpResponse response = this.client.execute(this.host, post);
|
||||
int responseCode = response.getStatusLine().getStatusCode();
|
||||
|
||||
//
|
||||
//
|
||||
// Retrieve the inputstream BEFORE checking the status code so
|
||||
// resources get freed in the finally clause.
|
||||
//
|
||||
|
||||
is = response.getEntity().getContent();
|
||||
|
||||
|
||||
if (responseCode != HttpStatus.SC_OK) {
|
||||
throw new TTransportException("HTTP Response code: " + responseCode);
|
||||
}
|
||||
@ -268,10 +297,10 @@ public class THttpClient extends TTransport {
|
||||
// thrift struct is being read up the chain).
|
||||
// Proceeding differently might lead to exhaustion of connections and thus
|
||||
// to app failure.
|
||||
|
||||
|
||||
byte[] buf = new byte[1024];
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
|
||||
|
||||
int len = 0;
|
||||
do {
|
||||
len = is.read(buf);
|
||||
@ -279,7 +308,7 @@ public class THttpClient extends TTransport {
|
||||
baos.write(buf, 0, len);
|
||||
}
|
||||
} while (-1 != len);
|
||||
|
||||
|
||||
try {
|
||||
// Indicate we're done with the content.
|
||||
consume(response.getEntity());
|
||||
@ -287,7 +316,7 @@ public class THttpClient extends TTransport {
|
||||
// We ignore this exception, it might only mean the server has no
|
||||
// keep-alive capability.
|
||||
}
|
||||
|
||||
|
||||
inputStream_ = new ByteArrayInputStream(baos.toByteArray());
|
||||
} catch (IOException ioe) {
|
||||
// Abort method so the connection gets released back to the connection manager
|
||||
@ -296,6 +325,7 @@ public class THttpClient extends TTransport {
|
||||
}
|
||||
throw new TTransportException(ioe);
|
||||
} finally {
|
||||
resetConsumedMessageSize(-1);
|
||||
if (null != is) {
|
||||
// Close the entity's input stream, this will release the underlying connection
|
||||
try {
|
||||
@ -357,6 +387,8 @@ public class THttpClient extends TTransport {
|
||||
|
||||
} catch (IOException iox) {
|
||||
throw new TTransportException(iox);
|
||||
} finally {
|
||||
resetConsumedMessageSize(-1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
import org.apache.thrift.TConfiguration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -33,7 +34,7 @@ import java.io.OutputStream;
|
||||
* has to provide a variety of types of streams.
|
||||
*
|
||||
*/
|
||||
public class TIOStreamTransport extends TTransport {
|
||||
public class TIOStreamTransport extends TEndpointTransport {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(TIOStreamTransport.class.getName());
|
||||
|
||||
@ -47,23 +48,69 @@ public class TIOStreamTransport extends TTransport {
|
||||
* Subclasses can invoke the default constructor and then assign the input
|
||||
* streams in the open method.
|
||||
*/
|
||||
protected TIOStreamTransport() {}
|
||||
protected TIOStreamTransport(TConfiguration config) throws TTransportException {
|
||||
super(config);
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclasses can invoke the default constructor and then assign the input
|
||||
* streams in the open method.
|
||||
*/
|
||||
protected TIOStreamTransport() throws TTransportException {
|
||||
super(new TConfiguration());
|
||||
}
|
||||
|
||||
/**
|
||||
* Input stream constructor, constructs an input only transport.
|
||||
*
|
||||
* @param config
|
||||
* @param is Input stream to read from
|
||||
*/
|
||||
public TIOStreamTransport(TConfiguration config, InputStream is) throws TTransportException {
|
||||
super(config);
|
||||
inputStream_ = is;
|
||||
}
|
||||
/**
|
||||
* Input stream constructor, constructs an input only transport.
|
||||
*
|
||||
* @param is Input stream to read from
|
||||
*/
|
||||
public TIOStreamTransport(InputStream is) {
|
||||
public TIOStreamTransport(InputStream is) throws TTransportException {
|
||||
super(new TConfiguration());
|
||||
inputStream_ = is;
|
||||
}
|
||||
|
||||
/**
|
||||
* Output stream constructor, constructs an output only transport.
|
||||
*
|
||||
* @param config
|
||||
* @param os Output stream to write to
|
||||
*/
|
||||
public TIOStreamTransport(TConfiguration config, OutputStream os) throws TTransportException {
|
||||
super(config);
|
||||
outputStream_ = os;
|
||||
}
|
||||
|
||||
/**
|
||||
* Output stream constructor, constructs an output only transport.
|
||||
*
|
||||
* @param os Output stream to write to
|
||||
*/
|
||||
public TIOStreamTransport(OutputStream os) {
|
||||
public TIOStreamTransport(OutputStream os) throws TTransportException {
|
||||
super(new TConfiguration());
|
||||
outputStream_ = os;
|
||||
}
|
||||
|
||||
/**
|
||||
* Two-way stream constructor.
|
||||
*
|
||||
* @param config
|
||||
* @param is Input stream to read from
|
||||
* @param os Output stream to read from
|
||||
*/
|
||||
public TIOStreamTransport(TConfiguration config, InputStream is, OutputStream os) throws TTransportException {
|
||||
super(config);
|
||||
inputStream_ = is;
|
||||
outputStream_ = os;
|
||||
}
|
||||
|
||||
@ -73,7 +120,8 @@ public class TIOStreamTransport extends TTransport {
|
||||
* @param is Input stream to read from
|
||||
* @param os Output stream to read from
|
||||
*/
|
||||
public TIOStreamTransport(InputStream is, OutputStream os) {
|
||||
public TIOStreamTransport(InputStream is, OutputStream os) throws TTransportException {
|
||||
super(new TConfiguration());
|
||||
inputStream_ = is;
|
||||
outputStream_ = os;
|
||||
}
|
||||
@ -158,6 +206,9 @@ public class TIOStreamTransport extends TTransport {
|
||||
}
|
||||
try {
|
||||
outputStream_.flush();
|
||||
|
||||
resetConsumedMessageSize(-1);
|
||||
|
||||
} catch (IOException iox) {
|
||||
throw new TTransportException(TTransportException.UNKNOWN, iox);
|
||||
}
|
||||
|
@ -20,12 +20,14 @@
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
import org.apache.thrift.TByteArrayOutputStream;
|
||||
import org.apache.thrift.TConfiguration;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
|
||||
/**
|
||||
* Memory buffer-based implementation of the TTransport interface.
|
||||
*/
|
||||
public class TMemoryBuffer extends TTransport {
|
||||
public class TMemoryBuffer extends TEndpointTransport {
|
||||
/**
|
||||
* Create a TMemoryBuffer with an initial buffer size of <i>size</i>. The
|
||||
* internal buffer will grow as necessary to accommodate the size of the data
|
||||
@ -33,8 +35,24 @@ public class TMemoryBuffer extends TTransport {
|
||||
*
|
||||
* @param size the initial size of the buffer
|
||||
*/
|
||||
public TMemoryBuffer(int size) {
|
||||
public TMemoryBuffer(int size) throws TTransportException {
|
||||
super(new TConfiguration());
|
||||
arr_ = new TByteArrayOutputStream(size);
|
||||
updateKnownMessageSize(size);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a TMemoryBuffer with an initial buffer size of <i>size</i>. The
|
||||
* internal buffer will grow as necessary to accommodate the size of the data
|
||||
* being written to it.
|
||||
*
|
||||
* @param config
|
||||
* @param size the initial size of the buffer
|
||||
*/
|
||||
public TMemoryBuffer(TConfiguration config, int size) throws TTransportException {
|
||||
super(config);
|
||||
arr_ = new TByteArrayOutputStream(size);
|
||||
updateKnownMessageSize(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -53,9 +71,11 @@ public class TMemoryBuffer extends TTransport {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] buf, int off, int len) {
|
||||
public int read(byte[] buf, int off, int len) throws TTransportException {
|
||||
checkReadBytesAvailable(len);
|
||||
byte[] src = arr_.get();
|
||||
int amtToRead = (len > arr_.len() - pos_ ? arr_.len() - pos_ : len);
|
||||
|
||||
if (amtToRead > 0) {
|
||||
System.arraycopy(src, pos_, buf, off, amtToRead);
|
||||
pos_ += amtToRead;
|
||||
|
@ -18,21 +18,38 @@
|
||||
*/
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
public final class TMemoryInputTransport extends TTransport {
|
||||
import org.apache.thrift.TConfiguration;
|
||||
|
||||
public final class TMemoryInputTransport extends TEndpointTransport {
|
||||
|
||||
private byte[] buf_;
|
||||
private int pos_;
|
||||
private int endPos_;
|
||||
|
||||
public TMemoryInputTransport() {
|
||||
public TMemoryInputTransport() throws TTransportException {
|
||||
this(new TConfiguration());
|
||||
}
|
||||
|
||||
public TMemoryInputTransport(byte[] buf) {
|
||||
reset(buf);
|
||||
public TMemoryInputTransport(TConfiguration _configuration) throws TTransportException {
|
||||
this(_configuration, new byte[0]);
|
||||
}
|
||||
|
||||
public TMemoryInputTransport(byte[] buf, int offset, int length) {
|
||||
public TMemoryInputTransport(byte[] buf) throws TTransportException {
|
||||
this(new TConfiguration(), buf);
|
||||
}
|
||||
|
||||
public TMemoryInputTransport(TConfiguration _configuration, byte[] buf) throws TTransportException {
|
||||
this(_configuration, buf, 0, buf.length);
|
||||
}
|
||||
|
||||
public TMemoryInputTransport(byte[] buf, int offset, int length) throws TTransportException {
|
||||
this(new TConfiguration(), buf, offset, length);
|
||||
}
|
||||
|
||||
public TMemoryInputTransport(TConfiguration _configuration, byte[] buf, int offset, int length) throws TTransportException {
|
||||
super(_configuration);
|
||||
reset(buf, offset, length);
|
||||
updateKnownMessageSize(length);
|
||||
}
|
||||
|
||||
public void reset(byte[] buf) {
|
||||
@ -43,10 +60,20 @@ public final class TMemoryInputTransport extends TTransport {
|
||||
buf_ = buf;
|
||||
pos_ = offset;
|
||||
endPos_ = offset + length;
|
||||
try {
|
||||
resetConsumedMessageSize(-1);
|
||||
} catch (TTransportException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
buf_ = null;
|
||||
try {
|
||||
resetConsumedMessageSize(-1);
|
||||
} catch (TTransportException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -67,6 +94,7 @@ public final class TMemoryInputTransport extends TTransport {
|
||||
if (amtToRead > 0) {
|
||||
System.arraycopy(buf_, pos_, buf, off, amtToRead);
|
||||
consumeBuffer(amtToRead);
|
||||
countConsumedMessageBytes(amtToRead);
|
||||
}
|
||||
return amtToRead;
|
||||
}
|
||||
|
@ -22,18 +22,28 @@ package org.apache.thrift.transport;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.thrift.TByteArrayOutputStream;
|
||||
import org.apache.thrift.TConfiguration;
|
||||
|
||||
/**
|
||||
* In memory transport with separate buffers for input and output.
|
||||
*/
|
||||
public class TMemoryTransport extends TTransport {
|
||||
public class TMemoryTransport extends TEndpointTransport {
|
||||
|
||||
private final ByteBuffer inputBuffer;
|
||||
private final TByteArrayOutputStream outputBuffer;
|
||||
|
||||
public TMemoryTransport(byte[] input) {
|
||||
public TMemoryTransport(byte[] input) throws TTransportException {
|
||||
super(new TConfiguration());
|
||||
inputBuffer = ByteBuffer.wrap(input);
|
||||
outputBuffer = new TByteArrayOutputStream(1024);
|
||||
updateKnownMessageSize(input.length);
|
||||
}
|
||||
|
||||
public TMemoryTransport(TConfiguration config, byte[] input) throws TTransportException {
|
||||
super(config);
|
||||
inputBuffer = ByteBuffer.wrap(input);
|
||||
outputBuffer = new TByteArrayOutputStream(1024);
|
||||
updateKnownMessageSize(input.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -56,6 +66,7 @@ public class TMemoryTransport extends TTransport {
|
||||
|
||||
@Override
|
||||
public int read(byte[] buf, int off, int len) throws TTransportException {
|
||||
checkReadBytesAvailable(len);
|
||||
int remaining = inputBuffer.remaining();
|
||||
if (remaining < len) {
|
||||
throw new TTransportException(TTransportException.END_OF_FILE,
|
||||
|
@ -30,6 +30,7 @@ import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.SocketChannel;
|
||||
|
||||
import org.apache.thrift.TConfiguration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -47,7 +48,7 @@ public class TNonblockingSocket extends TNonblockingTransport {
|
||||
|
||||
private final SocketChannel socketChannel_;
|
||||
|
||||
public TNonblockingSocket(String host, int port) throws IOException {
|
||||
public TNonblockingSocket(String host, int port) throws IOException, TTransportException {
|
||||
this(host, port, 0);
|
||||
}
|
||||
|
||||
@ -57,7 +58,7 @@ public class TNonblockingSocket extends TNonblockingTransport {
|
||||
* @param port
|
||||
* @throws IOException
|
||||
*/
|
||||
public TNonblockingSocket(String host, int port, int timeout) throws IOException {
|
||||
public TNonblockingSocket(String host, int port, int timeout) throws IOException, TTransportException {
|
||||
this(SocketChannel.open(), timeout, new InetSocketAddress(host, port));
|
||||
}
|
||||
|
||||
@ -67,13 +68,19 @@ public class TNonblockingSocket extends TNonblockingTransport {
|
||||
* @param socketChannel Already created SocketChannel object
|
||||
* @throws IOException if there is an error setting up the streams
|
||||
*/
|
||||
public TNonblockingSocket(SocketChannel socketChannel) throws IOException {
|
||||
public TNonblockingSocket(SocketChannel socketChannel) throws IOException, TTransportException {
|
||||
this(socketChannel, 0, null);
|
||||
if (!socketChannel.isConnected()) throw new IOException("Socket must already be connected");
|
||||
}
|
||||
|
||||
private TNonblockingSocket(SocketChannel socketChannel, int timeout, SocketAddress socketAddress)
|
||||
throws IOException {
|
||||
throws IOException, TTransportException {
|
||||
this(new TConfiguration(), socketChannel, timeout, socketAddress);
|
||||
}
|
||||
|
||||
private TNonblockingSocket(TConfiguration config, SocketChannel socketChannel, int timeout, SocketAddress socketAddress)
|
||||
throws IOException, TTransportException {
|
||||
super(config);
|
||||
socketChannel_ = socketChannel;
|
||||
socketAddress_ = socketAddress;
|
||||
|
||||
|
@ -19,13 +19,19 @@
|
||||
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
import org.apache.thrift.TConfiguration;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
|
||||
public abstract class TNonblockingTransport extends TTransport {
|
||||
public abstract class TNonblockingTransport extends TEndpointTransport {
|
||||
|
||||
public TNonblockingTransport(TConfiguration config) throws TTransportException {
|
||||
super(config);
|
||||
}
|
||||
|
||||
/**
|
||||
* Non-blocking connection initialization.
|
||||
|
@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
|
||||
* TSocket and TServerSocket
|
||||
*/
|
||||
public class TSSLTransportFactory {
|
||||
|
||||
|
||||
private static final Logger LOGGER =
|
||||
LoggerFactory.getLogger(TSSLTransportFactory.class);
|
||||
|
||||
@ -350,7 +350,7 @@ public class TSSLTransportFactory {
|
||||
}
|
||||
isKeyStoreSet = true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Set the keystore, password, certificate type and the store type
|
||||
*
|
||||
@ -363,7 +363,7 @@ public class TSSLTransportFactory {
|
||||
this.keyStoreStream = keyStoreStream;
|
||||
setKeyStore("", keyPass, keyManagerType, keyStoreType);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Set the keystore and password
|
||||
*
|
||||
@ -373,7 +373,7 @@ public class TSSLTransportFactory {
|
||||
public void setKeyStore(String keyStore, String keyPass) {
|
||||
setKeyStore(keyStore, keyPass, null, null);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Set the keystore and password
|
||||
*
|
||||
@ -383,7 +383,7 @@ public class TSSLTransportFactory {
|
||||
public void setKeyStore(InputStream keyStoreStream, String keyPass) {
|
||||
setKeyStore(keyStoreStream, keyPass, null, null);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Set the truststore, password, certificate type and the store type
|
||||
*
|
||||
@ -403,7 +403,7 @@ public class TSSLTransportFactory {
|
||||
}
|
||||
isTrustStoreSet = true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Set the truststore, password, certificate type and the store type
|
||||
*
|
||||
@ -426,7 +426,7 @@ public class TSSLTransportFactory {
|
||||
public void setTrustStore(String trustStore, String trustPass) {
|
||||
setTrustStore(trustStore, trustPass, null, null);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Set the truststore and password
|
||||
*
|
||||
|
@ -47,14 +47,14 @@ public class TSaslClientTransport extends TSaslTransport {
|
||||
|
||||
/**
|
||||
* Uses the given <code>SaslClient</code>.
|
||||
*
|
||||
*
|
||||
* @param saslClient
|
||||
* The <code>SaslClient</code> to use for the subsequent SASL
|
||||
* negotiation.
|
||||
* @param transport
|
||||
* Transport underlying this one.
|
||||
*/
|
||||
public TSaslClientTransport(SaslClient saslClient, TTransport transport) {
|
||||
public TSaslClientTransport(SaslClient saslClient, TTransport transport) throws TTransportException {
|
||||
super(saslClient, transport);
|
||||
mechanism = saslClient.getMechanismName();
|
||||
}
|
||||
@ -63,14 +63,14 @@ public class TSaslClientTransport extends TSaslTransport {
|
||||
* Creates a <code>SaslClient</code> using the given SASL-specific parameters.
|
||||
* See the Java documentation for <code>Sasl.createSaslClient</code> for the
|
||||
* details of the parameters.
|
||||
*
|
||||
*
|
||||
* @param transport
|
||||
* The underlying Thrift transport.
|
||||
* @throws SaslException
|
||||
*/
|
||||
public TSaslClientTransport(String mechanism, String authorizationId, String protocol,
|
||||
String serverName, Map<String, String> props, CallbackHandler cbh, TTransport transport)
|
||||
throws SaslException {
|
||||
throws SaslException, TTransportException {
|
||||
super(Sasl.createSaslClient(new String[] { mechanism }, authorizationId, protocol, serverName,
|
||||
props, cbh), transport);
|
||||
this.mechanism = mechanism;
|
||||
|
@ -58,7 +58,7 @@ public class TSaslServerTransport extends TSaslTransport {
|
||||
* @param transport
|
||||
* Transport underlying this one.
|
||||
*/
|
||||
public TSaslServerTransport(TTransport transport) {
|
||||
public TSaslServerTransport(TTransport transport) throws TTransportException {
|
||||
super(transport);
|
||||
}
|
||||
|
||||
@ -71,12 +71,12 @@ public class TSaslServerTransport extends TSaslTransport {
|
||||
* The underlying Thrift transport.
|
||||
*/
|
||||
public TSaslServerTransport(String mechanism, String protocol, String serverName,
|
||||
Map<String, String> props, CallbackHandler cbh, TTransport transport) {
|
||||
Map<String, String> props, CallbackHandler cbh, TTransport transport) throws TTransportException {
|
||||
super(transport);
|
||||
addServerDefinition(mechanism, protocol, serverName, props, cbh);
|
||||
}
|
||||
|
||||
private TSaslServerTransport(Map<String, TSaslServerDefinition> serverDefinitionMap, TTransport transport) {
|
||||
private TSaslServerTransport(Map<String, TSaslServerDefinition> serverDefinitionMap, TTransport transport) throws TTransportException {
|
||||
super(transport);
|
||||
this.serverDefinitionMap.putAll(serverDefinitionMap);
|
||||
}
|
||||
@ -190,7 +190,7 @@ public class TSaslServerTransport extends TSaslTransport {
|
||||
* receives the same <code>TSaslServerTransport</code>.
|
||||
*/
|
||||
@Override
|
||||
public TTransport getTransport(TTransport base) {
|
||||
public TTransport getTransport(TTransport base) throws TTransportException {
|
||||
WeakReference<TSaslServerTransport> ret = transportMap.get(base);
|
||||
if (ret == null || ret.get() == null) {
|
||||
LOGGER.debug("transport map does not contain key", base);
|
||||
|
@ -20,6 +20,7 @@
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Objects;
|
||||
|
||||
import javax.security.sasl.Sasl;
|
||||
import javax.security.sasl.SaslClient;
|
||||
@ -28,6 +29,8 @@ import javax.security.sasl.SaslServer;
|
||||
|
||||
import org.apache.thrift.EncodingUtils;
|
||||
import org.apache.thrift.TByteArrayOutputStream;
|
||||
import org.apache.thrift.TConfiguration;
|
||||
import org.apache.thrift.transport.layered.TFramedTransport;
|
||||
import org.apache.thrift.transport.sasl.NegotiationStatus;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -36,7 +39,7 @@ import org.slf4j.LoggerFactory;
|
||||
* A superclass for SASL client/server thrift transports. A subclass need only
|
||||
* implement the <code>open</code> method.
|
||||
*/
|
||||
abstract class TSaslTransport extends TTransport {
|
||||
abstract class TSaslTransport extends TEndpointTransport {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(TSaslTransport.class);
|
||||
|
||||
@ -83,7 +86,8 @@ abstract class TSaslTransport extends TTransport {
|
||||
* @param underlyingTransport
|
||||
* The thrift transport which this transport is wrapping.
|
||||
*/
|
||||
protected TSaslTransport(TTransport underlyingTransport) {
|
||||
protected TSaslTransport(TTransport underlyingTransport) throws TTransportException {
|
||||
super(Objects.isNull(underlyingTransport.getConfiguration()) ? new TConfiguration() : underlyingTransport.getConfiguration());
|
||||
this.underlyingTransport = underlyingTransport;
|
||||
}
|
||||
|
||||
@ -96,7 +100,8 @@ abstract class TSaslTransport extends TTransport {
|
||||
* @param underlyingTransport
|
||||
* The thrift transport which this transport is wrapping.
|
||||
*/
|
||||
protected TSaslTransport(SaslClient saslClient, TTransport underlyingTransport) {
|
||||
protected TSaslTransport(SaslClient saslClient, TTransport underlyingTransport) throws TTransportException {
|
||||
super(Objects.isNull(underlyingTransport.getConfiguration()) ? new TConfiguration() : underlyingTransport.getConfiguration());
|
||||
sasl = new SaslParticipant(saslClient);
|
||||
this.underlyingTransport = underlyingTransport;
|
||||
}
|
||||
@ -151,7 +156,7 @@ abstract class TSaslTransport extends TTransport {
|
||||
}
|
||||
|
||||
int payloadBytes = EncodingUtils.decodeBigEndian(messageHeader, STATUS_BYTES);
|
||||
if (payloadBytes < 0 || payloadBytes > 104857600 /* 100 MB */) {
|
||||
if (payloadBytes < 0 || payloadBytes > getConfiguration().getMaxMessageSize() /* 100 MB */) {
|
||||
throw sendAndThrowMessage(
|
||||
NegotiationStatus.ERROR, "Invalid payload header length: " + payloadBytes);
|
||||
}
|
||||
|
@ -18,6 +18,8 @@
|
||||
*/
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
import org.apache.thrift.TConfiguration;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
|
||||
@ -25,26 +27,43 @@ import java.io.RandomAccessFile;
|
||||
/**
|
||||
* Basic file support for the TTransport interface
|
||||
*/
|
||||
public final class TSimpleFileTransport extends TTransport {
|
||||
public final class TSimpleFileTransport extends TEndpointTransport {
|
||||
|
||||
private RandomAccessFile file = null;
|
||||
private boolean readable;
|
||||
private boolean writable;
|
||||
private String path_;
|
||||
private RandomAccessFile file = null;
|
||||
private boolean readable;
|
||||
private boolean writable;
|
||||
private String path_;
|
||||
|
||||
|
||||
/**
|
||||
* Create a transport backed by a simple file
|
||||
*
|
||||
* Create a transport backed by a simple file
|
||||
*
|
||||
* @param path the path to the file to open/create
|
||||
* @param read true to support read operations
|
||||
* @param write true to support write operations
|
||||
* @param openFile true to open the file on construction
|
||||
* @throws TTransportException if file open fails
|
||||
*/
|
||||
public TSimpleFileTransport(String path, boolean read,
|
||||
public TSimpleFileTransport(String path, boolean read,
|
||||
boolean write, boolean openFile)
|
||||
throws TTransportException {
|
||||
this(new TConfiguration(), path, read, write, openFile);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a transport backed by a simple file
|
||||
*
|
||||
* @param config
|
||||
* @param path the path to the file to open/create
|
||||
* @param read true to support read operations
|
||||
* @param write true to support write operations
|
||||
* @param openFile true to open the file on construction
|
||||
* @throws TTransportException if file open fails
|
||||
*/
|
||||
public TSimpleFileTransport(TConfiguration config, String path, boolean read,
|
||||
boolean write, boolean openFile)
|
||||
throws TTransportException {
|
||||
super(config);
|
||||
if (path.length() <= 0) {
|
||||
throw new TTransportException("No path specified");
|
||||
}
|
||||
@ -58,11 +77,11 @@ public final class TSimpleFileTransport extends TTransport {
|
||||
open();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create a transport backed by a simple file
|
||||
* Create a transport backed by a simple file
|
||||
* Implicitly opens file to conform to C++ behavior.
|
||||
*
|
||||
*
|
||||
* @param path the path to the file to open/create
|
||||
* @param read true to support read operations
|
||||
* @param write true to support write operations
|
||||
@ -72,7 +91,7 @@ public final class TSimpleFileTransport extends TTransport {
|
||||
throws TTransportException {
|
||||
this(path, read, write, true);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create a transport backed by a simple read only disk file (implicitly opens
|
||||
* file)
|
||||
@ -95,7 +114,7 @@ public final class TSimpleFileTransport extends TTransport {
|
||||
}
|
||||
|
||||
/**
|
||||
* Open file if not previously opened.
|
||||
* Open file if not previously opened.
|
||||
*
|
||||
* @throws TTransportException if open fails
|
||||
*/
|
||||
@ -111,7 +130,7 @@ public final class TSimpleFileTransport extends TTransport {
|
||||
} catch (IOException ioe) {
|
||||
file = null;
|
||||
throw new TTransportException(ioe.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -131,7 +150,7 @@ public final class TSimpleFileTransport extends TTransport {
|
||||
}
|
||||
|
||||
/**
|
||||
* Read up to len many bytes into buf at offset
|
||||
* Read up to len many bytes into buf at offset
|
||||
*
|
||||
* @param buf houses bytes read
|
||||
* @param off offset into buff to begin writing to
|
||||
@ -144,6 +163,7 @@ public final class TSimpleFileTransport extends TTransport {
|
||||
if (!readable) {
|
||||
throw new TTransportException("Read operation on write only file");
|
||||
}
|
||||
checkReadBytesAvailable(len);
|
||||
int iBytesRead = 0;
|
||||
try {
|
||||
iBytesRead = file.read(buf, off, len);
|
||||
@ -155,7 +175,7 @@ public final class TSimpleFileTransport extends TTransport {
|
||||
}
|
||||
|
||||
/**
|
||||
* Write len many bytes from buff starting at offset
|
||||
* Write len many bytes from buff starting at offset
|
||||
*
|
||||
* @param buf buffer containing bytes to write
|
||||
* @param off offset into buffer to begin writing from
|
||||
@ -213,4 +233,4 @@ public final class TSimpleFileTransport extends TTransport {
|
||||
throw new TTransportException(ex.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
import org.apache.thrift.TConfiguration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -69,6 +70,7 @@ public class TSocket extends TIOStreamTransport {
|
||||
* @throws TTransportException if there is an error setting up the streams
|
||||
*/
|
||||
public TSocket(Socket socket) throws TTransportException {
|
||||
super(new TConfiguration());
|
||||
socket_ = socket;
|
||||
try {
|
||||
socket_.setSoLinger(false, 0);
|
||||
@ -93,23 +95,36 @@ public class TSocket extends TIOStreamTransport {
|
||||
* Creates a new unconnected socket that will connect to the given host
|
||||
* on the given port.
|
||||
*
|
||||
* @param config check config
|
||||
* @param host Remote host
|
||||
* @param port Remote port
|
||||
*/
|
||||
public TSocket(String host, int port) {
|
||||
this(host, port, 0);
|
||||
public TSocket(TConfiguration config, String host, int port) throws TTransportException {
|
||||
this(config, host, port, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new unconnected socket that will connect to the given host
|
||||
* on the given port.
|
||||
*
|
||||
* @param host Remote host
|
||||
* @param port Remote port
|
||||
*/
|
||||
public TSocket(String host, int port) throws TTransportException {
|
||||
this(new TConfiguration(), host, port, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new unconnected socket that will connect to the given host
|
||||
* on the given port.
|
||||
*
|
||||
* @param config check config
|
||||
* @param host Remote host
|
||||
* @param port Remote port
|
||||
* @param timeout Socket timeout and connection timeout
|
||||
*/
|
||||
public TSocket(String host, int port, int timeout) {
|
||||
this(host, port, timeout, timeout);
|
||||
public TSocket(TConfiguration config, String host, int port, int timeout) throws TTransportException {
|
||||
this(config, host, port, timeout, timeout);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -117,12 +132,14 @@ public class TSocket extends TIOStreamTransport {
|
||||
* on the given port, with a specific connection timeout and a
|
||||
* specific socket timeout.
|
||||
*
|
||||
* @param config check config
|
||||
* @param host Remote host
|
||||
* @param port Remote port
|
||||
* @param socketTimeout Socket timeout
|
||||
* @param connectTimeout Connection timeout
|
||||
*/
|
||||
public TSocket(String host, int port, int socketTimeout, int connectTimeout) {
|
||||
public TSocket(TConfiguration config, String host, int port, int socketTimeout, int connectTimeout) throws TTransportException {
|
||||
super(config);
|
||||
host_ = host;
|
||||
port_ = port;
|
||||
socketTimeout_ = socketTimeout;
|
||||
|
@ -19,6 +19,8 @@
|
||||
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
import org.apache.thrift.TConfiguration;
|
||||
|
||||
import java.io.Closeable;
|
||||
|
||||
/**
|
||||
@ -160,4 +162,10 @@ public abstract class TTransport implements Closeable {
|
||||
* @param len
|
||||
*/
|
||||
public void consumeBuffer(int len) {}
|
||||
|
||||
public abstract TConfiguration getConfiguration();
|
||||
|
||||
public abstract void updateKnownMessageSize(long size) throws TTransportException;
|
||||
|
||||
public abstract void checkReadBytesAvailable(long numBytes) throws TTransportException;
|
||||
}
|
||||
|
@ -34,7 +34,7 @@ public class TTransportFactory {
|
||||
* @param trans The base transport
|
||||
* @return Wrapped Transport
|
||||
*/
|
||||
public TTransport getTransport(TTransport trans) {
|
||||
public TTransport getTransport(TTransport trans) throws TTransportException {
|
||||
return trans;
|
||||
}
|
||||
|
||||
|
@ -18,9 +18,12 @@
|
||||
*/
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
import org.apache.thrift.TConfiguration;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Objects;
|
||||
import java.util.zip.Deflater;
|
||||
import java.util.zip.DeflaterOutputStream;
|
||||
import java.util.zip.Inflater;
|
||||
@ -38,7 +41,7 @@ public class TZlibTransport extends TIOStreamTransport {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TTransport getTransport(TTransport base) {
|
||||
public TTransport getTransport(TTransport base) throws TTransportException {
|
||||
return new TZlibTransport(base);
|
||||
}
|
||||
}
|
||||
@ -47,7 +50,7 @@ public class TZlibTransport extends TIOStreamTransport {
|
||||
* Constructs a new TZlibTransport instance.
|
||||
* @param transport the underlying transport to read from and write to
|
||||
*/
|
||||
public TZlibTransport(TTransport transport) {
|
||||
public TZlibTransport(TTransport transport) throws TTransportException {
|
||||
this(transport, Deflater.BEST_COMPRESSION);
|
||||
}
|
||||
|
||||
@ -56,7 +59,8 @@ public class TZlibTransport extends TIOStreamTransport {
|
||||
* @param transport the underlying transport to read from and write to
|
||||
* @param compressionLevel 0 for no compression, 9 for maximum compression
|
||||
*/
|
||||
public TZlibTransport(TTransport transport, int compressionLevel) {
|
||||
public TZlibTransport(TTransport transport, int compressionLevel) throws TTransportException {
|
||||
super(Objects.isNull(transport.getConfiguration()) ? new TConfiguration() : transport.getConfiguration());
|
||||
transport_ = transport;
|
||||
inputStream_ = new InflaterInputStream(new TTransportInputStream(transport_), new Inflater());
|
||||
outputStream_ = new DeflaterOutputStream(new TTransportOutputStream(transport_), new Deflater(compressionLevel, false), true);
|
||||
|
@ -16,7 +16,13 @@
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.thrift.transport;
|
||||
package org.apache.thrift.transport.layered;
|
||||
|
||||
|
||||
import org.apache.thrift.TConfiguration;
|
||||
import org.apache.thrift.transport.*;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* This transport is wire compatible with {@link TFramedTransport}, but makes
|
||||
@ -27,18 +33,18 @@ package org.apache.thrift.transport;
|
||||
*
|
||||
* This implementation is NOT threadsafe.
|
||||
*/
|
||||
public class TFastFramedTransport extends TTransport {
|
||||
public class TFastFramedTransport extends TLayeredTransport {
|
||||
|
||||
public static class Factory extends TTransportFactory {
|
||||
private final int initialCapacity;
|
||||
private final int maxLength;
|
||||
|
||||
public Factory() {
|
||||
this(DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
|
||||
this(DEFAULT_BUF_CAPACITY, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
|
||||
}
|
||||
|
||||
public Factory(int initialCapacity) {
|
||||
this(initialCapacity, DEFAULT_MAX_LENGTH);
|
||||
this(initialCapacity, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
|
||||
}
|
||||
|
||||
public Factory(int initialCapacity, int maxLength) {
|
||||
@ -47,7 +53,7 @@ public class TFastFramedTransport extends TTransport {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TTransport getTransport(TTransport trans) {
|
||||
public TTransport getTransport(TTransport trans) throws TTransportException {
|
||||
return new TFastFramedTransport(trans,
|
||||
initialCapacity,
|
||||
maxLength);
|
||||
@ -58,12 +64,7 @@ public class TFastFramedTransport extends TTransport {
|
||||
* How big should the default read and write buffers be?
|
||||
*/
|
||||
public static final int DEFAULT_BUF_CAPACITY = 1024;
|
||||
/**
|
||||
* How big is the largest allowable frame? Defaults to 16MB.
|
||||
*/
|
||||
public static final int DEFAULT_MAX_LENGTH = 16384000;
|
||||
|
||||
private final TTransport underlying;
|
||||
private final AutoExpandingBufferWriteTransport writeBuffer;
|
||||
private AutoExpandingBufferReadTransport readBuffer;
|
||||
private final int initialBufferCapacity;
|
||||
@ -75,8 +76,8 @@ public class TFastFramedTransport extends TTransport {
|
||||
* for initial buffer size and max frame length.
|
||||
* @param underlying Transport that real reads and writes will go through to.
|
||||
*/
|
||||
public TFastFramedTransport(TTransport underlying) {
|
||||
this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
|
||||
public TFastFramedTransport(TTransport underlying) throws TTransportException {
|
||||
this(underlying, DEFAULT_BUF_CAPACITY, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -87,8 +88,8 @@ public class TFastFramedTransport extends TTransport {
|
||||
* In practice, it's not critical to set this unless you know in advance that
|
||||
* your messages are going to be very large.
|
||||
*/
|
||||
public TFastFramedTransport(TTransport underlying, int initialBufferCapacity) {
|
||||
this(underlying, initialBufferCapacity, DEFAULT_MAX_LENGTH);
|
||||
public TFastFramedTransport(TTransport underlying, int initialBufferCapacity) throws TTransportException {
|
||||
this(underlying, initialBufferCapacity, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -102,27 +103,29 @@ public class TFastFramedTransport extends TTransport {
|
||||
* @param maxLength The max frame size you are willing to read. You can use
|
||||
* this parameter to limit how much memory can be allocated.
|
||||
*/
|
||||
public TFastFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) {
|
||||
this.underlying = underlying;
|
||||
public TFastFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) throws TTransportException {
|
||||
super(underlying);
|
||||
TConfiguration config = Objects.isNull(underlying.getConfiguration()) ? new TConfiguration() : underlying.getConfiguration();
|
||||
this.maxLength = maxLength;
|
||||
config.setMaxFrameSize(maxLength);
|
||||
this.initialBufferCapacity = initialBufferCapacity;
|
||||
readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity);
|
||||
writeBuffer = new AutoExpandingBufferWriteTransport(initialBufferCapacity, 4);
|
||||
readBuffer = new AutoExpandingBufferReadTransport(config, initialBufferCapacity);
|
||||
writeBuffer = new AutoExpandingBufferWriteTransport(config, initialBufferCapacity, 4);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
underlying.close();
|
||||
getInnerTransport().close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen() {
|
||||
return underlying.isOpen();
|
||||
return getInnerTransport().isOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() throws TTransportException {
|
||||
underlying.open();
|
||||
getInnerTransport().open();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -139,7 +142,7 @@ public class TFastFramedTransport extends TTransport {
|
||||
}
|
||||
|
||||
private void readFrame() throws TTransportException {
|
||||
underlying.readAll(i32buf , 0, 4);
|
||||
getInnerTransport().readAll(i32buf , 0, 4);
|
||||
int size = TFramedTransport.decodeFrameSize(i32buf);
|
||||
|
||||
if (size < 0) {
|
||||
@ -147,13 +150,13 @@ public class TFastFramedTransport extends TTransport {
|
||||
throw new TTransportException(TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!");
|
||||
}
|
||||
|
||||
if (size > maxLength) {
|
||||
if (size > getInnerTransport().getConfiguration().getMaxFrameSize()) {
|
||||
close();
|
||||
throw new TTransportException(TTransportException.CORRUPTED_DATA,
|
||||
"Frame size (" + size + ") larger than max length (" + maxLength + ")!");
|
||||
}
|
||||
|
||||
readBuffer.fill(underlying, size);
|
||||
readBuffer.fill(getInnerTransport(), size);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -169,18 +172,18 @@ public class TFastFramedTransport extends TTransport {
|
||||
/**
|
||||
* Only clears the read buffer!
|
||||
*/
|
||||
public void clear() {
|
||||
readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity);
|
||||
public void clear() throws TTransportException {
|
||||
readBuffer = new AutoExpandingBufferReadTransport(getConfiguration(), initialBufferCapacity);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() throws TTransportException {
|
||||
int payloadLength = writeBuffer.getLength() - 4;
|
||||
int payloadLength = writeBuffer.getLength() - 4;
|
||||
byte[] data = writeBuffer.getBuf().array();
|
||||
TFramedTransport.encodeFrameSize(payloadLength, data);
|
||||
underlying.write(data, 0, payloadLength + 4);
|
||||
getInnerTransport().write(data, 0, payloadLength + 4);
|
||||
writeBuffer.reset();
|
||||
underlying.flush();
|
||||
getInnerTransport().flush();
|
||||
}
|
||||
|
||||
@Override
|
@ -17,24 +17,22 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.thrift.transport;
|
||||
package org.apache.thrift.transport.layered;
|
||||
|
||||
import org.apache.thrift.TByteArrayOutputStream;
|
||||
import org.apache.thrift.TConfiguration;
|
||||
import org.apache.thrift.transport.TMemoryInputTransport;
|
||||
import org.apache.thrift.transport.TTransport;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.apache.thrift.transport.TTransportFactory;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* TFramedTransport is a buffered TTransport that ensures a fully read message
|
||||
* every time by preceding messages with a 4-byte frame size.
|
||||
*/
|
||||
public class TFramedTransport extends TTransport {
|
||||
|
||||
protected static final int DEFAULT_MAX_LENGTH = 16384000;
|
||||
|
||||
private int maxLength_;
|
||||
|
||||
/**
|
||||
* Underlying transport
|
||||
*/
|
||||
private TTransport transport_ = null;
|
||||
public class TFramedTransport extends TLayeredTransport {
|
||||
|
||||
/**
|
||||
* Buffer for output
|
||||
@ -45,14 +43,13 @@ public class TFramedTransport extends TTransport {
|
||||
/**
|
||||
* Buffer for input
|
||||
*/
|
||||
private final TMemoryInputTransport readBuffer_ =
|
||||
new TMemoryInputTransport(new byte[0]);
|
||||
private final TMemoryInputTransport readBuffer_;
|
||||
|
||||
public static class Factory extends TTransportFactory {
|
||||
private int maxLength_;
|
||||
|
||||
public Factory() {
|
||||
maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
|
||||
maxLength_ = TConfiguration.DEFAULT_MAX_FRAME_SIZE;
|
||||
}
|
||||
|
||||
public Factory(int maxLength) {
|
||||
@ -60,7 +57,7 @@ public class TFramedTransport extends TTransport {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TTransport getTransport(TTransport base) {
|
||||
public TTransport getTransport(TTransport base) throws TTransportException {
|
||||
return new TFramedTransport(base, maxLength_);
|
||||
}
|
||||
}
|
||||
@ -75,28 +72,28 @@ public class TFramedTransport extends TTransport {
|
||||
/**
|
||||
* Constructor wraps around another transport
|
||||
*/
|
||||
public TFramedTransport(TTransport transport, int maxLength) {
|
||||
transport_ = transport;
|
||||
maxLength_ = maxLength;
|
||||
public TFramedTransport(TTransport transport, int maxLength) throws TTransportException {
|
||||
super(transport);
|
||||
TConfiguration _configuration = Objects.isNull(transport.getConfiguration()) ? new TConfiguration() : transport.getConfiguration();
|
||||
_configuration.setMaxFrameSize(maxLength);
|
||||
writeBuffer_.write(sizeFiller_, 0, 4);
|
||||
readBuffer_= new TMemoryInputTransport(_configuration, new byte[0]);
|
||||
}
|
||||
|
||||
public TFramedTransport(TTransport transport) {
|
||||
transport_ = transport;
|
||||
maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
|
||||
writeBuffer_.write(sizeFiller_, 0, 4);
|
||||
public TFramedTransport(TTransport transport) throws TTransportException {
|
||||
this(transport, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
|
||||
}
|
||||
|
||||
public void open() throws TTransportException {
|
||||
transport_.open();
|
||||
getInnerTransport().open();
|
||||
}
|
||||
|
||||
public boolean isOpen() {
|
||||
return transport_.isOpen();
|
||||
return getInnerTransport().isOpen();
|
||||
}
|
||||
|
||||
public void close() {
|
||||
transport_.close();
|
||||
getInnerTransport().close();
|
||||
}
|
||||
|
||||
public int read(byte[] buf, int off, int len) throws TTransportException {
|
||||
@ -138,7 +135,7 @@ public class TFramedTransport extends TTransport {
|
||||
private final byte[] i32buf = new byte[4];
|
||||
|
||||
private void readFrame() throws TTransportException {
|
||||
transport_.readAll(i32buf, 0, 4);
|
||||
getInnerTransport().readAll(i32buf, 0, 4);
|
||||
int size = decodeFrameSize(i32buf);
|
||||
|
||||
if (size < 0) {
|
||||
@ -146,14 +143,14 @@ public class TFramedTransport extends TTransport {
|
||||
throw new TTransportException(TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!");
|
||||
}
|
||||
|
||||
if (size > maxLength_) {
|
||||
if (size > getInnerTransport().getConfiguration().getMaxFrameSize()) {
|
||||
close();
|
||||
throw new TTransportException(TTransportException.CORRUPTED_DATA,
|
||||
"Frame size (" + size + ") larger than max length (" + maxLength_ + ")!");
|
||||
"Frame size (" + size + ") larger than max length (" + getInnerTransport().getConfiguration().getMaxFrameSize() + ")!");
|
||||
}
|
||||
|
||||
byte[] buff = new byte[size];
|
||||
transport_.readAll(buff, 0, size);
|
||||
getInnerTransport().readAll(buff, 0, size);
|
||||
readBuffer_.reset(buff);
|
||||
}
|
||||
|
||||
@ -169,8 +166,8 @@ public class TFramedTransport extends TTransport {
|
||||
writeBuffer_.write(sizeFiller_, 0, 4); // make room for the next frame's size data
|
||||
|
||||
encodeFrameSize(len, buf); // this is the frame length without the filler
|
||||
transport_.write(buf, 0, len + 4); // we have to write the frame size and frame data
|
||||
transport_.flush();
|
||||
getInnerTransport().write(buf, 0, len + 4); // we have to write the frame size and frame data
|
||||
getInnerTransport().flush();
|
||||
}
|
||||
|
||||
public static final void encodeFrameSize(final int frameSize, final byte[] buf) {
|
@ -0,0 +1,52 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.thrift.transport.layered;
|
||||
|
||||
import org.apache.thrift.TConfiguration;
|
||||
import org.apache.thrift.transport.TTransport;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public abstract class TLayeredTransport extends TTransport{
|
||||
|
||||
private TTransport innerTransport;
|
||||
|
||||
public TConfiguration getConfiguration() {
|
||||
return innerTransport.getConfiguration();
|
||||
}
|
||||
|
||||
public TLayeredTransport(TTransport transport)
|
||||
{
|
||||
Objects.requireNonNull(transport, "TTransport cannot be null.");
|
||||
innerTransport = transport;
|
||||
}
|
||||
|
||||
public void updateKnownMessageSize(long size) throws TTransportException {
|
||||
innerTransport.updateKnownMessageSize(size);
|
||||
}
|
||||
|
||||
public void checkReadBytesAvailable(long numBytes) throws TTransportException {
|
||||
innerTransport.checkReadBytesAvailable(numBytes);
|
||||
}
|
||||
|
||||
public TTransport getInnerTransport() {
|
||||
return innerTransport;
|
||||
}
|
||||
}
|
@ -34,8 +34,8 @@ public class TestOptionals extends TestCase {
|
||||
assertEquals(true, EncodingUtils.testBit((short)0x8, 3));
|
||||
assertEquals(false, EncodingUtils.testBit((short)0x8, 4));
|
||||
|
||||
assertEquals((short)Short.MIN_VALUE, EncodingUtils.setBit((short)0, 15, true));
|
||||
assertEquals((short)0, EncodingUtils.setBit((short)Short.MIN_VALUE, 15, false));
|
||||
assertEquals(Short.MIN_VALUE, EncodingUtils.setBit((short)0, 15, true));
|
||||
assertEquals((short)0, EncodingUtils.setBit(Short.MIN_VALUE, 15, false));
|
||||
assertEquals(true, EncodingUtils.testBit(Short.MIN_VALUE, 15));
|
||||
assertEquals(false, EncodingUtils.testBit(Short.MIN_VALUE, 14));
|
||||
}
|
||||
|
@ -35,21 +35,21 @@ public class TestReuse extends TestStruct {
|
||||
|
||||
Reuse ru1 = new Reuse();
|
||||
HashSet<String> hs1 = new HashSet<String>();
|
||||
byte[] serBytes;
|
||||
byte[] serBytes;
|
||||
String st1 = new String("string1");
|
||||
String st2 = new String("string2");
|
||||
|
||||
ru1.setVal1(11);
|
||||
ru1.setVal2(hs1);
|
||||
ru1.addToVal2(st1);
|
||||
|
||||
|
||||
serBytes = binarySerializer.serialize(ru1);
|
||||
|
||||
// update hash set after serialization
|
||||
hs1.add(st2);
|
||||
|
||||
binaryDeserializer.deserialize(ru1, serBytes);
|
||||
|
||||
|
||||
assertTrue( ru1.getVal2() == hs1 );
|
||||
assertTrue( hs1.size() == 2 );
|
||||
}
|
||||
|
@ -40,6 +40,7 @@ import org.apache.thrift.server.THsHaServer.Args;
|
||||
import org.apache.thrift.transport.TNonblockingServerSocket;
|
||||
import org.apache.thrift.transport.TNonblockingSocket;
|
||||
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import thrift.test.CompactProtoTestStruct;
|
||||
import thrift.test.ExceptionWithAMap;
|
||||
import thrift.test.Srv;
|
||||
@ -233,7 +234,7 @@ public class TestTAsyncClientManager extends TestCase {
|
||||
assertEquals(numThreads * numCallsPerThread, numSuccesses);
|
||||
}
|
||||
|
||||
private Srv.AsyncClient getClient() throws IOException {
|
||||
private Srv.AsyncClient getClient() throws IOException, TTransportException {
|
||||
TNonblockingSocket clientSocket = new TNonblockingSocket(ServerTestBase.HOST, ServerTestBase.PORT);
|
||||
return new Srv.AsyncClient(new TBinaryProtocol.Factory(), clientManager_, clientSocket);
|
||||
}
|
||||
|
@ -18,24 +18,26 @@
|
||||
*/
|
||||
package org.apache.thrift.protocol;
|
||||
|
||||
import java.lang.Exception;
|
||||
import java.lang.Integer;
|
||||
import java.lang.String;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.thrift.Fixtures;
|
||||
import org.apache.thrift.TBase;
|
||||
import org.apache.thrift.TDeserializer;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.TSerializer;
|
||||
import org.apache.thrift.*;
|
||||
import org.apache.thrift.server.ServerTestBase;
|
||||
import org.apache.thrift.transport.TMemoryBuffer;
|
||||
|
||||
import thrift.test.CompactProtoTestStruct;
|
||||
import thrift.test.HolyMoley;
|
||||
import thrift.test.Nesting;
|
||||
import thrift.test.OneOfEach;
|
||||
import thrift.test.Srv;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import thrift.test.*;
|
||||
|
||||
public abstract class ProtocolTestBase extends TestCase {
|
||||
|
||||
@ -409,7 +411,7 @@ public abstract class ProtocolTestBase extends TestCase {
|
||||
}
|
||||
long serEnd = System.currentTimeMillis();
|
||||
long serElapsed = serEnd - serStart;
|
||||
System.out.println("Ser:\t" + serElapsed + "ms\t"
|
||||
System.out.println("Ser:\t" + serElapsed + "ms\t"
|
||||
+ ((double)serElapsed / NUM_REPS) + "ms per serialization");
|
||||
|
||||
HolyMoley cpts = new HolyMoley();
|
||||
@ -420,8 +422,109 @@ public abstract class ProtocolTestBase extends TestCase {
|
||||
}
|
||||
long deserEnd = System.currentTimeMillis();
|
||||
long deserElapsed = deserEnd - deserStart;
|
||||
System.out.println("Des:\t" + deserElapsed + "ms\t"
|
||||
System.out.println("Des:\t" + deserElapsed + "ms\t"
|
||||
+ ((double)deserElapsed / NUM_REPS) + "ms per deserialization");
|
||||
}
|
||||
}
|
||||
|
||||
private ServerTestBase.TestHandler testHandler = new ServerTestBase.TestHandler() {
|
||||
@Override
|
||||
public String testString(String thing) {
|
||||
thing = thing + " Apache Thrift Java " + thing;
|
||||
return thing;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Integer> testList(List<Integer> thing) {
|
||||
thing.addAll(thing);
|
||||
thing.addAll(thing);
|
||||
return thing;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Integer> testSet(Set<Integer> thing) {
|
||||
thing.addAll(thing.stream().map( x -> x + 100).collect(Collectors.toSet()));
|
||||
return thing;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> testStringMap(Map<String, String> thing) {
|
||||
thing.put("a", "123");
|
||||
thing.put(" x y ", " with spaces ");
|
||||
thing.put("same", "same");
|
||||
thing.put("0", "numeric key");
|
||||
thing.put("1", "");
|
||||
thing.put("ok", "2355555");
|
||||
thing.put("end", "0");
|
||||
return thing;
|
||||
}
|
||||
};
|
||||
|
||||
private TProtocol initConfig(int maxSize) throws TException{
|
||||
TConfiguration config = TConfiguration.custom().setMaxMessageSize(maxSize).build();
|
||||
TMemoryBuffer bufferTrans = new TMemoryBuffer(config, 0);
|
||||
return getFactory().getProtocol(bufferTrans);
|
||||
}
|
||||
|
||||
public void testReadCheckMaxMessageRequestForString() throws TException{
|
||||
TProtocol clientOutProto = initConfig(15);
|
||||
TProtocol clientInProto = initConfig(15);
|
||||
ThriftTest.Client testClient = new ThriftTest.Client(clientInProto, clientOutProto);
|
||||
ThriftTest.Processor testProcessor = new ThriftTest.Processor(testHandler);
|
||||
try {
|
||||
testClient.send_testString("test");
|
||||
testProcessor.process(clientOutProto, clientInProto);
|
||||
String result = testClient.recv_testString();
|
||||
System.out.println("----result: "+result);
|
||||
} catch (TException e) {
|
||||
assertEquals("MaxMessageSize reached", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void testReadCheckMaxMessageRequestForList() throws TException{
|
||||
TProtocol clientOutProto = initConfig(15);
|
||||
TProtocol clientInProto = initConfig(15);
|
||||
ThriftTest.Client testClient = new ThriftTest.Client(clientInProto, clientOutProto);
|
||||
ThriftTest.Processor testProcessor = new ThriftTest.Processor(testHandler);
|
||||
try {
|
||||
testClient.send_testList(Arrays.asList(1, 23242346, 888888, 90));
|
||||
testProcessor.process(clientOutProto, clientInProto);
|
||||
testClient.recv_testList();
|
||||
fail("Limitations not achieved as expected");
|
||||
} catch (TTransportException e) {
|
||||
assertEquals("MaxMessageSize reached", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void testReadCheckMaxMessageRequestForMap() throws TException{
|
||||
TProtocol clientOutProto = initConfig(13);
|
||||
TProtocol clientInProto = initConfig(13);
|
||||
ThriftTest.Client testClient = new ThriftTest.Client(clientInProto, clientOutProto);
|
||||
ThriftTest.Processor testProcessor = new ThriftTest.Processor(testHandler);
|
||||
Map<String, String> thing = new HashMap<>();
|
||||
thing.put("key", "Thrift");
|
||||
try {
|
||||
testClient.send_testStringMap(thing);
|
||||
testProcessor.process(clientOutProto, clientInProto);
|
||||
testClient.recv_testStringMap();
|
||||
fail("Limitations not achieved as expected");
|
||||
} catch (TTransportException e) {
|
||||
assertEquals("MaxMessageSize reached", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void testReadCheckMaxMessageRequestForSet() throws TException{
|
||||
TProtocol clientOutProto = initConfig(10);
|
||||
TProtocol clientInProto = initConfig(10);
|
||||
ThriftTest.Client testClient = new ThriftTest.Client(clientInProto, clientOutProto);
|
||||
ThriftTest.Processor testProcessor = new ThriftTest.Processor(testHandler);
|
||||
try {
|
||||
testClient.send_testSet(Stream.of(234, 0, 987087, 45, 88888888, 9).collect(Collectors.toSet()));
|
||||
testProcessor.process(clientOutProto, clientInProto);
|
||||
testClient.recv_testSet();
|
||||
fail("Limitations not achieved as expected");
|
||||
} catch (TTransportException e) {
|
||||
assertEquals("MaxMessageSize reached", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,38 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.thrift.protocol;
|
||||
|
||||
import org.apache.thrift.TDeserializer;
|
||||
import org.apache.thrift.TException;
|
||||
import thrift.test.Bonk;
|
||||
|
||||
public class TestTBinaryProtocol extends ProtocolTestBase {
|
||||
@Override
|
||||
protected TProtocolFactory getFactory() {
|
||||
return new TBinaryProtocol.Factory();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean canBeUsedNaked() {
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
@ -12,6 +12,7 @@ import org.apache.thrift.protocol.TProtocol;
|
||||
import org.apache.thrift.transport.TMemoryBuffer;
|
||||
import org.apache.thrift.transport.TTransport;
|
||||
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import thrift.test.HolyMoley;
|
||||
import thrift.test.Nesting;
|
||||
import thrift.test.OneOfEach;
|
||||
@ -20,6 +21,9 @@ public class TestStandardScheme extends TestCase {
|
||||
TSerializer serializer = new TSerializer();
|
||||
TDeserializer deserializer = new TDeserializer();
|
||||
|
||||
public TestStandardScheme() throws TTransportException {
|
||||
}
|
||||
|
||||
/**
|
||||
* This tests whether the Standard Scheme properly reads structs serialized
|
||||
* using an older version of thrift.
|
||||
|
@ -37,11 +37,12 @@ import org.apache.thrift.protocol.TBinaryProtocol;
|
||||
import org.apache.thrift.protocol.TCompactProtocol;
|
||||
import org.apache.thrift.protocol.TProtocol;
|
||||
import org.apache.thrift.protocol.TProtocolFactory;
|
||||
import org.apache.thrift.transport.TFramedTransport;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.apache.thrift.transport.layered.TFramedTransport;
|
||||
import org.apache.thrift.transport.TSocket;
|
||||
import org.apache.thrift.transport.TTransport;
|
||||
import org.apache.thrift.transport.TTransportFactory;
|
||||
import org.apache.thrift.transport.TFramedTransport.Factory;
|
||||
import org.apache.thrift.transport.layered.TFramedTransport.Factory;
|
||||
|
||||
import thrift.test.Insanity;
|
||||
import thrift.test.Numberz;
|
||||
@ -203,7 +204,7 @@ public abstract class ServerTestBase extends TestCase {
|
||||
System.out.print("testInsanity()\n");
|
||||
|
||||
HashMap<Numberz,Insanity> first_map = new HashMap<Numberz, Insanity>();
|
||||
HashMap<Numberz,Insanity> second_map = new HashMap<Numberz, Insanity>();;
|
||||
HashMap<Numberz,Insanity> second_map = new HashMap<Numberz, Insanity>();
|
||||
|
||||
first_map.put(Numberz.TWO, argument);
|
||||
first_map.put(Numberz.THREE, argument);
|
||||
@ -222,7 +223,7 @@ public abstract class ServerTestBase extends TestCase {
|
||||
public Xtruct testMulti(byte arg0, int arg1, long arg2, Map<Short,String> arg3, Numberz arg4, long arg5) {
|
||||
System.out.print("testMulti()\n");
|
||||
|
||||
Xtruct hello = new Xtruct();;
|
||||
Xtruct hello = new Xtruct();
|
||||
hello.string_thing = "Hello2";
|
||||
hello.byte_thing = arg0;
|
||||
hello.i32_thing = arg1;
|
||||
@ -268,7 +269,7 @@ public abstract class ServerTestBase extends TestCase {
|
||||
}
|
||||
|
||||
public void testOneway(int sleepFor) {
|
||||
System.out.println("testOneway(" + Integer.toString(sleepFor) +
|
||||
System.out.println("testOneway(" + sleepFor +
|
||||
") => sleeping...");
|
||||
try {
|
||||
Thread.sleep(sleepFor * SLEEP_DELAY);
|
||||
@ -533,7 +534,7 @@ public abstract class ServerTestBase extends TestCase {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TTransport getTransport(TTransport trans) {
|
||||
public TTransport getTransport(TTransport trans) throws TTransportException {
|
||||
count++;
|
||||
return factory.getTransport(trans);
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ import org.apache.thrift.TProcessor;
|
||||
import org.apache.thrift.protocol.TProtocol;
|
||||
import org.apache.thrift.protocol.TProtocolFactory;
|
||||
import org.apache.thrift.server.TNonblockingServer.Args;
|
||||
import org.apache.thrift.transport.TFramedTransport;
|
||||
import org.apache.thrift.transport.layered.TFramedTransport;
|
||||
import org.apache.thrift.transport.TNonblockingServerSocket;
|
||||
import org.apache.thrift.transport.TSocket;
|
||||
import org.apache.thrift.transport.TTransport;
|
||||
|
@ -22,6 +22,7 @@ package org.apache.thrift.test;
|
||||
|
||||
import org.apache.thrift.Fixtures;
|
||||
import org.apache.thrift.TBase;
|
||||
import org.apache.thrift.TConfiguration;
|
||||
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||
import org.apache.thrift.protocol.TProtocol;
|
||||
import org.apache.thrift.protocol.TProtocolFactory;
|
||||
@ -34,34 +35,37 @@ import thrift.test.OneOfEach;
|
||||
|
||||
public class SerializationBenchmark {
|
||||
private final static int HOW_MANY = 10000000;
|
||||
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
TProtocolFactory factory = new TBinaryProtocol.Factory();
|
||||
|
||||
testSerialization(factory, Fixtures.oneOfEach);
|
||||
testDeserialization(factory, Fixtures.oneOfEach, OneOfEach.class);
|
||||
}
|
||||
|
||||
|
||||
public static void testSerialization(TProtocolFactory factory, TBase object) throws Exception {
|
||||
TTransport trans = new TTransport() {
|
||||
public void write(byte[] bin, int x, int y) throws TTransportException {}
|
||||
public TConfiguration getConfiguration() {return new TConfiguration(); }
|
||||
public void updateKnownMessageSize(long size) throws TTransportException {}
|
||||
public void checkReadBytesAvailable(long numBytes) throws TTransportException {}
|
||||
public int read(byte[] bin, int x, int y) throws TTransportException {return 0;}
|
||||
public void close() {}
|
||||
public void open() {}
|
||||
public boolean isOpen() {return true;}
|
||||
};
|
||||
|
||||
|
||||
TProtocol proto = factory.getProtocol(trans);
|
||||
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
for (int i = 0; i < HOW_MANY; i++) {
|
||||
object.write(proto);
|
||||
}
|
||||
long endTime = System.currentTimeMillis();
|
||||
|
||||
|
||||
System.out.println("Serialization test time: " + (endTime - startTime) + " ms");
|
||||
}
|
||||
|
||||
|
||||
public static <T extends TBase> void testDeserialization(TProtocolFactory factory, T object, Class<T> klass) throws Exception {
|
||||
TMemoryBuffer buf = new TMemoryBuffer(0);
|
||||
object.write(factory.getProtocol(buf));
|
||||
@ -71,10 +75,10 @@ public class SerializationBenchmark {
|
||||
long startTime = System.currentTimeMillis();
|
||||
for (int i = 0; i < HOW_MANY; i++) {
|
||||
T o2 = klass.newInstance();
|
||||
o2.read(factory.getProtocol(new TMemoryInputTransport(serialized)));
|
||||
o2.read(factory.getProtocol(new TMemoryInputTransport(new TConfiguration(), serialized)));
|
||||
}
|
||||
long endTime = System.currentTimeMillis();
|
||||
|
||||
System.out.println("Deserialization test time: " + (endTime - startTime) + " ms");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -37,8 +37,8 @@ import org.apache.thrift.protocol.TJSONProtocol;
|
||||
import org.apache.thrift.protocol.TMultiplexedProtocol;
|
||||
import org.apache.thrift.protocol.TProtocol;
|
||||
import org.apache.thrift.protocol.TSimpleJSONProtocol;
|
||||
import org.apache.thrift.transport.TFastFramedTransport;
|
||||
import org.apache.thrift.transport.TFramedTransport;
|
||||
import org.apache.thrift.transport.layered.TFastFramedTransport;
|
||||
import org.apache.thrift.transport.layered.TFramedTransport;
|
||||
import org.apache.thrift.transport.THttpClient;
|
||||
import org.apache.thrift.transport.TSSLTransportFactory;
|
||||
import org.apache.thrift.transport.TSocket;
|
||||
@ -106,7 +106,7 @@ public class TestClient {
|
||||
} else if (args[i].equals("--zlib")) {
|
||||
zlib = true;
|
||||
} else if (args[i].equals("--client")) {
|
||||
http_client = true;
|
||||
http_client = true;
|
||||
} else if (args[i].equals("--help")) {
|
||||
System.out.println("Allowed options:");
|
||||
System.out.println(" --help\t\t\tProduce help message");
|
||||
@ -157,7 +157,7 @@ public class TestClient {
|
||||
if (transport_type.equals("http")) {
|
||||
String url = "http://" + host + ":" + port + "/test/service";
|
||||
if (http_client == true) {
|
||||
|
||||
|
||||
transport = new THttpClient(url, HttpClients.createDefault());
|
||||
} else {
|
||||
transport = new THttpClient(url);
|
||||
@ -775,7 +775,7 @@ public class TestClient {
|
||||
long onewayElapsedMillis = (System.nanoTime() - startOneway) / 1000000;
|
||||
if (onewayElapsedMillis > 200) {
|
||||
System.out.println("Oneway test took too long to execute failed: took " +
|
||||
Long.toString(onewayElapsedMillis) +
|
||||
onewayElapsedMillis +
|
||||
"ms");
|
||||
System.out.println("oneway calls are 'fire and forget' and therefore should not cause blocking.");
|
||||
System.out.println("Some transports (HTTP) have a required response, and typically this failure");
|
||||
@ -786,7 +786,7 @@ public class TestClient {
|
||||
returnCode |= ERR_BASETYPES;
|
||||
} else {
|
||||
System.out.println("Success - fire and forget only took " +
|
||||
Long.toString(onewayElapsedMillis) +
|
||||
onewayElapsedMillis +
|
||||
"ms");
|
||||
}
|
||||
|
||||
|
@ -19,29 +19,21 @@
|
||||
|
||||
package org.apache.thrift.test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||
import org.apache.thrift.protocol.TCompactProtocol;
|
||||
import org.apache.thrift.protocol.TJSONProtocol;
|
||||
import org.apache.thrift.protocol.TProtocol;
|
||||
import org.apache.thrift.protocol.TProtocolFactory;
|
||||
import org.apache.thrift.protocol.TMultiplexedProtocol;
|
||||
import org.apache.thrift.server.ServerContext;
|
||||
import org.apache.thrift.server.TServer;
|
||||
import org.apache.thrift.server.TServer.Args;
|
||||
import org.apache.thrift.server.TSimpleServer;
|
||||
import org.apache.thrift.server.TThreadPoolServer;
|
||||
import org.apache.thrift.server.ServerTestBase.TestHandler;
|
||||
import org.apache.thrift.server.TServerEventHandler;
|
||||
import org.apache.thrift.server.TThreadedSelectorServer;
|
||||
import org.apache.thrift.server.TNonblockingServer;
|
||||
import org.apache.thrift.transport.TFramedTransport;
|
||||
import org.apache.thrift.transport.TFastFramedTransport;
|
||||
import org.apache.thrift.transport.layered.TFramedTransport;
|
||||
import org.apache.thrift.transport.layered.TFastFramedTransport;
|
||||
import org.apache.thrift.transport.TZlibTransport;
|
||||
import org.apache.thrift.transport.TServerSocket;
|
||||
import org.apache.thrift.transport.TSSLTransportFactory;
|
||||
@ -50,14 +42,8 @@ import org.apache.thrift.transport.TTransportFactory;
|
||||
import org.apache.thrift.transport.TNonblockingServerSocket;
|
||||
import org.apache.thrift.TMultiplexedProcessor;
|
||||
|
||||
import thrift.test.Insanity;
|
||||
import thrift.test.Numberz;
|
||||
import thrift.test.SecondService;
|
||||
import thrift.test.ThriftTest;
|
||||
import thrift.test.Xception;
|
||||
import thrift.test.Xception2;
|
||||
import thrift.test.Xtruct;
|
||||
import thrift.test.Xtruct2;
|
||||
|
||||
public class TestServer {
|
||||
|
||||
|
@ -19,6 +19,8 @@
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
|
||||
import org.apache.thrift.TConfiguration;
|
||||
|
||||
public class ReadCountingTransport extends TTransport {
|
||||
public int readCount = 0;
|
||||
private TTransport trans;
|
||||
@ -58,4 +60,19 @@ public class ReadCountingTransport extends TTransport {
|
||||
throw new TTransportException(TTransportException.NOT_OPEN, "Transport is closed");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TConfiguration getConfiguration() {
|
||||
return trans.getConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateKnownMessageSize(long size) throws TTransportException {
|
||||
trans.updateKnownMessageSize(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkReadBytesAvailable(long numBytes) throws TTransportException {
|
||||
trans.checkReadBytesAvailable(numBytes);
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ package org.apache.thrift.transport;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.thrift.TConfiguration;
|
||||
|
||||
public class TestAutoExpandingBufferReadTransport extends TestCase {
|
||||
private static final byte[] HUNDRED_BYTES = new byte[100];
|
||||
@ -32,9 +33,9 @@ public class TestAutoExpandingBufferReadTransport extends TestCase {
|
||||
}
|
||||
|
||||
public void testIt() throws Exception {
|
||||
AutoExpandingBufferReadTransport t = new AutoExpandingBufferReadTransport(150);
|
||||
AutoExpandingBufferReadTransport t = new AutoExpandingBufferReadTransport(new TConfiguration(), 150);
|
||||
|
||||
TMemoryInputTransport membuf = new TMemoryInputTransport(HUNDRED_BYTES);
|
||||
TMemoryInputTransport membuf = new TMemoryInputTransport(new TConfiguration(), HUNDRED_BYTES);
|
||||
|
||||
t.fill(membuf, 100);
|
||||
assertEquals(100, t.getBytesRemainingInBuffer());
|
||||
|
@ -19,14 +19,18 @@
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.thrift.TConfiguration;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestAutoExpandingBufferWriteTransport {
|
||||
|
||||
private TConfiguration config = new TConfiguration();
|
||||
|
||||
@Test
|
||||
public void testIt() throws Exception {
|
||||
AutoExpandingBufferWriteTransport t = new AutoExpandingBufferWriteTransport(1, 0);
|
||||
AutoExpandingBufferWriteTransport t = new AutoExpandingBufferWriteTransport(config, 1, 0);
|
||||
assertEquals(0, t.getLength());
|
||||
assertEquals(1, t.getBuf().array().length);
|
||||
byte[] b1 = new byte[]{1,2,3};
|
||||
@ -43,7 +47,7 @@ public class TestAutoExpandingBufferWriteTransport {
|
||||
assertEquals(2, t.getLength());
|
||||
assertEquals(ByteBuffer.wrap(b2), ByteBuffer.wrap(t.getBuf().array(), 0, 2));
|
||||
|
||||
AutoExpandingBufferWriteTransport uut = new AutoExpandingBufferWriteTransport(8, 4);
|
||||
AutoExpandingBufferWriteTransport uut = new AutoExpandingBufferWriteTransport(config, 8, 4);
|
||||
assertEquals(4, uut.getLength());
|
||||
assertEquals(8, uut.getBuf().array().length);
|
||||
uut.write(b1);
|
||||
@ -53,17 +57,17 @@ public class TestAutoExpandingBufferWriteTransport {
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testBadInitialSize() throws IllegalArgumentException {
|
||||
new AutoExpandingBufferWriteTransport(0, 0);
|
||||
public void testBadInitialSize() throws IllegalArgumentException, TTransportException {
|
||||
new AutoExpandingBufferWriteTransport(config, 0, 0);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testBadFrontReserveSize() throws IllegalArgumentException {
|
||||
new AutoExpandingBufferWriteTransport(4, -1);
|
||||
public void testBadFrontReserveSize() throws IllegalArgumentException, TTransportException {
|
||||
new AutoExpandingBufferWriteTransport(config, 4, -1);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testTooSmallFrontReserveSize() throws IllegalArgumentException {
|
||||
new AutoExpandingBufferWriteTransport(4, 5);
|
||||
public void testTooSmallFrontReserveSize() throws IllegalArgumentException, TTransportException {
|
||||
new AutoExpandingBufferWriteTransport(config, 4, 5);
|
||||
}
|
||||
}
|
||||
|
@ -18,16 +18,18 @@
|
||||
*/
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
import org.apache.thrift.transport.layered.TFastFramedTransport;
|
||||
|
||||
public class TestTFastFramedTransport extends TestTFramedTransport {
|
||||
protected final static int INITIAL_CAPACITY = 50;
|
||||
|
||||
@Override
|
||||
protected TTransport getTransport(TTransport underlying) {
|
||||
protected TTransport getTransport(TTransport underlying) throws TTransportException {
|
||||
return new TFastFramedTransport(underlying, INITIAL_CAPACITY, 10 * 1024 * 1024);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TTransport getTransport(TTransport underlying, int maxLength) {
|
||||
protected TTransport getTransport(TTransport underlying, int maxLength) throws TTransportException {
|
||||
return new TFastFramedTransport(underlying, INITIAL_CAPACITY, maxLength);
|
||||
}
|
||||
}
|
||||
|
@ -27,14 +27,16 @@ import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.thrift.transport.layered.TFastFramedTransport;
|
||||
import org.apache.thrift.transport.layered.TFramedTransport;
|
||||
|
||||
public class TestTFramedTransport extends TestCase {
|
||||
|
||||
protected TTransport getTransport(TTransport underlying) {
|
||||
protected TTransport getTransport(TTransport underlying) throws TTransportException {
|
||||
return new TFramedTransport(underlying);
|
||||
}
|
||||
|
||||
protected TTransport getTransport(TTransport underlying, int maxLength) {
|
||||
protected TTransport getTransport(TTransport underlying, int maxLength) throws TTransportException {
|
||||
return new TFramedTransport(underlying, maxLength);
|
||||
}
|
||||
|
||||
@ -73,6 +75,7 @@ public class TestTFramedTransport extends TestCase {
|
||||
assertEquals(30, trans.read(new byte[30], 0, 30));
|
||||
assertEquals(2, countTrans.readCount);
|
||||
|
||||
// Known message size exceeded
|
||||
readBuf = new byte[220];
|
||||
assertEquals(220, trans.read(readBuf, 0, 220));
|
||||
assertTrue(Arrays.equals(readBuf, byteSequence(0, 219)));
|
||||
@ -149,8 +152,8 @@ public class TestTFramedTransport extends TestCase {
|
||||
DataOutputStream dos = new DataOutputStream(baos);
|
||||
dos.writeInt(50);
|
||||
dos.write(byteSequence(0, 49));
|
||||
dos.writeInt(75);
|
||||
dos.write(byteSequence(125, 200));
|
||||
dos.writeInt(50);
|
||||
dos.write(byteSequence(125, 175));
|
||||
|
||||
TMemoryBuffer membuf = new TMemoryBuffer(0);
|
||||
membuf.write(baos.toByteArray());
|
||||
@ -177,10 +180,11 @@ public class TestTFramedTransport extends TestCase {
|
||||
assertEquals(0, trans.getBytesRemainingInBuffer());
|
||||
assertEquals(50, trans.getBufferPosition());
|
||||
|
||||
// Known message size exceeded
|
||||
trans.read(readBuf, 0, 10);
|
||||
assertEquals(4, countTrans.readCount);
|
||||
assertTrue(Arrays.equals(readBuf, byteSequence(125,134)));
|
||||
assertEquals(65, trans.getBytesRemainingInBuffer());
|
||||
assertEquals(40, trans.getBytesRemainingInBuffer());
|
||||
assertEquals(10, trans.getBufferPosition());
|
||||
}
|
||||
|
||||
|
@ -61,7 +61,7 @@ public class TestTMemoryInputTransport extends TestCase {
|
||||
assertEquals(3, trans.getBytesRemainingInBuffer());
|
||||
}
|
||||
|
||||
public void testWithOffsetAndLength() throws Exception {
|
||||
public void testWithOffsetAndLength() throws TTransportException {
|
||||
byte[] input_buf = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
|
||||
TMemoryInputTransport trans = new TMemoryInputTransport(input_buf, 1, 3);
|
||||
assertEquals(1, trans.getBufferPosition());
|
||||
|
@ -40,6 +40,7 @@ import javax.security.sasl.SaslServerFactory;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.thrift.TConfiguration;
|
||||
import org.apache.thrift.TProcessor;
|
||||
import org.apache.thrift.protocol.TProtocolFactory;
|
||||
import org.apache.thrift.server.ServerTestBase;
|
||||
@ -409,9 +410,10 @@ public class TestTSaslTransports extends TestCase {
|
||||
private static class MockTTransport extends TTransport {
|
||||
|
||||
byte[] badHeader = null;
|
||||
private TMemoryInputTransport readBuffer = new TMemoryInputTransport();
|
||||
private TMemoryInputTransport readBuffer;
|
||||
|
||||
public MockTTransport(int mode) {
|
||||
public MockTTransport(int mode) throws TTransportException {
|
||||
readBuffer = new TMemoryInputTransport();
|
||||
if (mode==1) {
|
||||
// Invalid status byte
|
||||
badHeader = new byte[] { (byte)0xFF, (byte)0x00, (byte)0x00, (byte)0x00, (byte)0x05 };
|
||||
@ -443,25 +445,41 @@ public class TestTSaslTransports extends TestCase {
|
||||
|
||||
@Override
|
||||
public void write(byte[] buf, int off, int len) throws TTransportException {}
|
||||
|
||||
@Override
|
||||
public TConfiguration getConfiguration() {
|
||||
return readBuffer.getConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateKnownMessageSize(long size) throws TTransportException {
|
||||
readBuffer.updateKnownMessageSize(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkReadBytesAvailable(long numBytes) throws TTransportException {
|
||||
readBuffer.checkReadBytesAvailable(numBytes);
|
||||
}
|
||||
}
|
||||
|
||||
public void testBadHeader() {
|
||||
TSaslTransport saslTransport = new TSaslServerTransport(new MockTTransport(1));
|
||||
TSaslTransport saslTransport;
|
||||
try {
|
||||
saslTransport = new TSaslServerTransport(new MockTTransport(1));
|
||||
saslTransport.receiveSaslMessage();
|
||||
fail("Should have gotten an error due to incorrect status byte value.");
|
||||
} catch (TTransportException e) {
|
||||
assertEquals(e.getMessage(), "Invalid status -1");
|
||||
}
|
||||
saslTransport = new TSaslServerTransport(new MockTTransport(2));
|
||||
try {
|
||||
saslTransport = new TSaslServerTransport(new MockTTransport(2));
|
||||
saslTransport.receiveSaslMessage();
|
||||
fail("Should have gotten an error due to negative payload length.");
|
||||
} catch (TTransportException e) {
|
||||
assertEquals(e.getMessage(), "Invalid payload header length: -1");
|
||||
}
|
||||
saslTransport = new TSaslServerTransport(new MockTTransport(3));
|
||||
try {
|
||||
saslTransport = new TSaslServerTransport(new MockTTransport(3));
|
||||
saslTransport.receiveSaslMessage();
|
||||
fail("Should have gotten an error due to bogus (large) payload length.");
|
||||
} catch (TTransportException e) {
|
||||
|
@ -33,7 +33,7 @@ import junit.framework.TestCase;
|
||||
|
||||
public class TestTZlibTransport extends TestCase {
|
||||
|
||||
protected TTransport getTransport(TTransport underlying) {
|
||||
protected TTransport getTransport(TTransport underlying) throws TTransportException {
|
||||
return new TZlibTransport(underlying);
|
||||
}
|
||||
|
||||
|
@ -19,6 +19,8 @@
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
|
||||
import org.apache.thrift.TConfiguration;
|
||||
|
||||
public class WriteCountingTransport extends TTransport {
|
||||
public int writeCount = 0;
|
||||
private final TTransport trans;
|
||||
@ -51,4 +53,19 @@ public class WriteCountingTransport extends TTransport {
|
||||
public void flush() throws TTransportException {
|
||||
trans.flush();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TConfiguration getConfiguration() {
|
||||
return trans.getConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateKnownMessageSize(long size) throws TTransportException {
|
||||
trans.updateKnownMessageSize(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkReadBytesAvailable(long numBytes) throws TTransportException {
|
||||
trans.checkReadBytesAvailable(numBytes);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user