THRIFT-3449 TBaseAsyncProcessor fb.responseReady() never called for oneway functions

This commit is contained in:
Nobuaki Sukegawa 2016-03-18 22:27:38 +09:00
parent e134cbc6ff
commit 400ae6e76e
3 changed files with 55 additions and 25 deletions

View File

@ -3325,14 +3325,22 @@ void t_java_generator::generate_process_async_function(t_service* tservice, t_fu
indent(f_service_) << "try {" << endl;
indent(f_service_)
<< " fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);"
<< " fcall.sendResponse(fb, result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);"
<< endl;
indent(f_service_) << " return;" << endl;
indent(f_service_) << "} catch (org.apache.thrift.transport.TTransportException e) {" << endl;
indent_up();
f_service_ << indent()
<< "LOGGER.error(\"TTransportException writing to internal frame buffer\", e);"
<< endl
<< indent() << "fb.close();" << endl;
indent_down();
indent(f_service_) << "} catch (Exception e) {" << endl;
indent(f_service_) << " LOGGER.error(\"Exception writing to internal frame buffer\", e);"
<< endl;
indent_up();
f_service_ << indent() << "LOGGER.error(\"Exception writing to internal frame buffer\", e);"
<< endl
<< indent() << "onError(e);" << endl;
indent_down();
indent(f_service_) << "}" << endl;
indent(f_service_) << "fb.close();" << endl;
}
indent_down();
indent(f_service_) << "}" << endl;
@ -3341,7 +3349,21 @@ void t_java_generator::generate_process_async_function(t_service* tservice, t_fu
indent_up();
if (tfunction->is_oneway()) {
indent(f_service_) << "if (e instanceof org.apache.thrift.transport.TTransportException) {"
<< endl;
indent_up();
f_service_ << indent() << "LOGGER.error(\"TTransportException inside handler\", e);" << endl
<< indent() << "fb.close();" << endl;
indent_down();
indent(f_service_) << "} else {" << endl;
indent_up();
f_service_ << indent() << "LOGGER.error(\"Exception inside oneway handler\", e);" << endl;
indent_down();
indent(f_service_) << "}" << endl;
} else {
indent(f_service_) << "byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;" << endl;
indent(f_service_) << "org.apache.thrift.TSerializable msg;" << endl;

View File

@ -54,11 +54,13 @@ public class TBaseAsyncProcessor<I> implements TAsyncProcessor, TProcessor {
if (fn == null) {
TProtocolUtil.skip(in, TType.STRUCT);
in.readMessageEnd();
TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
x.write(out);
out.writeMessageEnd();
out.getTransport().flush();
if (!fn.isOneway()) {
TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
x.write(out);
out.writeMessageEnd();
out.getTransport().flush();
}
fb.responseReady();
return true;
}
@ -70,19 +72,24 @@ public class TBaseAsyncProcessor<I> implements TAsyncProcessor, TProcessor {
args.read(in);
} catch (TProtocolException e) {
in.readMessageEnd();
TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
x.write(out);
out.writeMessageEnd();
out.getTransport().flush();
if (!fn.isOneway()) {
TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
x.write(out);
out.writeMessageEnd();
out.getTransport().flush();
}
fb.responseReady();
return true;
}
in.readMessageEnd();
if (fn.isOneway()) {
fb.responseReady();
}
//start off processing function
AsyncMethodCallback resultHandler = fn.getResultHandler(fb,msg.seqid);
AsyncMethodCallback resultHandler = fn.getResultHandler(fb, msg.seqid);
try {
fn.start(iface, args, resultHandler);
} catch (Exception e) {

View File

@ -281,7 +281,7 @@ public abstract class ServerTestBase extends TestCase {
public static final String HOST = "localhost";
public static final int PORT = Integer.valueOf(
System.getProperty("test.port", "9090"));
protected static final int SOCKET_TIMEOUT = 1000;
protected static final int SOCKET_TIMEOUT = 1500;
private static final Xtruct XSTRUCT = new Xtruct("Zero", (byte) 1, -3, -5);
private static final Xtruct2 XSTRUCT2 = new Xtruct2((byte)1, XSTRUCT, 5);
@ -418,8 +418,7 @@ public abstract class ServerTestBase extends TestCase {
testInsanity(testClient);
testException(testClient);
testOneway(testClient);
// FIXME: a call after oneway does not work for async client
// testI32(testClient);
testI32(testClient);
transport.close();
stopServer();
@ -486,7 +485,10 @@ public abstract class ServerTestBase extends TestCase {
}
private void testOneway(ThriftTest.Client testClient) throws Exception {
testClient.testOneway(3);
long begin = System.currentTimeMillis();
testClient.testOneway(1);
long elapsed = System.currentTimeMillis() - begin;
assertTrue(elapsed < 500);
}
private void testSet(ThriftTest.Client testClient) throws TException {
@ -531,21 +533,20 @@ public abstract class ServerTestBase extends TestCase {
}
public void testTransportFactory() throws Exception {
for (TProtocolFactory protoFactory : getProtocols()) {
TestHandler handler = new TestHandler();
ThriftTest.Processor processor = new ThriftTest.Processor(handler);
final CallCountingTransportFactory factory = new CallCountingTransportFactory(new TFramedTransport.Factory());
startServer(processor, protoFactory, factory);
assertEquals(0, factory.count);
TSocket socket = new TSocket(HOST, PORT);
socket.setTimeout(SOCKET_TIMEOUT);
TTransport transport = getClientTransport(socket);
open(transport);
TProtocol protocol = protoFactory.getProtocol(transport);
ThriftTest.Client testClient = new ThriftTest.Client(protocol);
assertEquals(0, testClient.testByte((byte) 0));