THRIFT-1864 java: implement event handler for non-blocking server

Patch: Vitali Lovich
This commit is contained in:
Roger Meier 2013-03-24 21:42:35 +01:00
parent ce6d1d709a
commit 9cda78844d
3 changed files with 49 additions and 31 deletions

View File

@ -281,7 +281,24 @@ public abstract class AbstractNonblockingServer extends TServer {
// the ByteBuffer we'll be using to write and read, depending on the state
private ByteBuffer buffer_;
private TByteArrayOutputStream response_;
private final TByteArrayOutputStream response_;
// the frame that the TTransport should wrap.
private final TMemoryInputTransport frameTrans_;
// the transport that should be used to connect to clients
private final TTransport inTrans_;
private final TTransport outTrans_;
// the input protocol to use on frames
private final TProtocol inProt_;
// the output protocol to use on frames
private final TProtocol outProt_;
// context associated with this connection
private final ServerContext context_;
public FrameBuffer(final TNonblockingTransport trans,
final SelectionKey selectionKey,
@ -290,6 +307,19 @@ public abstract class AbstractNonblockingServer extends TServer {
selectionKey_ = selectionKey;
selectThread_ = selectThread;
buffer_ = ByteBuffer.allocate(4);
frameTrans_ = new TMemoryInputTransport();
response_ = new TByteArrayOutputStream();
inTrans_ = inputTransportFactory_.getTransport(frameTrans_);
outTrans_ = outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
inProt_ = inputProtocolFactory_.getProtocol(inTrans_);
outProt_ = outputProtocolFactory_.getProtocol(outTrans_);
if (eventHandler_ != null) {
context_ = eventHandler_.createContext(inProt_, outProt_);
} else {
context_ = null;
}
}
/**
@ -426,6 +456,9 @@ public abstract class AbstractNonblockingServer extends TServer {
readBufferBytesAllocated.addAndGet(-buffer_.array().length);
}
trans_.close();
if (eventHandler_ != null) {
eventHandler_.deleteContext(context_, inProt_, outProt_);
}
}
/**
@ -470,12 +503,14 @@ public abstract class AbstractNonblockingServer extends TServer {
* Actually invoke the method signified by this FrameBuffer.
*/
public void invoke() {
TTransport inTrans = getInputTransport();
TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());
frameTrans_.reset(buffer_.array());
response_.reset();
try {
processorFactory_.getProcessor(inTrans).process(inProt, outProt);
if (eventHandler_ != null) {
eventHandler_.processContext(context_, inTrans_, outTrans_);
}
processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);
responseReady();
return;
} catch (TException te) {
@ -488,22 +523,6 @@ public abstract class AbstractNonblockingServer extends TServer {
requestSelectInterestChange();
}
/**
* Wrap the read buffer in a memory-based transport so a processor can read
* the data it needs to handle an invocation.
*/
private TTransport getInputTransport() {
return inputTransportFactory_.getTransport(new TMemoryInputTransport(buffer_.array()));
}
/**
* Get the transport that should be used by the invoker for responding.
*/
private TTransport getOutputTransport() {
response_ = new TByteArrayOutputStream();
return outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
}
/**
* Perform a read into buffer.
*
@ -550,13 +569,4 @@ public abstract class AbstractNonblockingServer extends TServer {
}
}
} // FrameBuffer
public void setServerEventHandler(TServerEventHandler eventHandler) {
throw new UnsupportedOperationException("Not supported yet.");
}
public TServerEventHandler getEventHandler() {
throw new UnsupportedOperationException("Not supported yet.");
}
}

View File

@ -150,6 +150,10 @@ public class TNonblockingServer extends AbstractNonblockingServer {
*/
public void run() {
try {
if (eventHandler_ != null) {
eventHandler_.preServe();
}
while (!stopped_) {
select();
processInterestChanges();

View File

@ -371,6 +371,10 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
*/
public void run() {
try {
if (eventHandler_ != null) {
eventHandler_.preServe();
}
while (!stopped_) {
select();
}