THRIFT-4251: Fix JDK Epoll Bug in Thrift of TThreadedSelectorServer model.

Client: Java

This closes #1313
This commit is contained in:
Johnny-Liao 2017-07-25 14:23:28 +08:00 committed by James E. King, III
parent 8506121b3e
commit 9ffb41d94c
2 changed files with 115 additions and 39 deletions

View File

@ -19,15 +19,6 @@
package org.apache.thrift.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.thrift.TAsyncProcessor;
import org.apache.thrift.TByteArrayOutputStream;
import org.apache.thrift.TException;
@ -42,6 +33,15 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
/**
* Provides common methods and classes used by nonblocking TServer
* implementations.
@ -102,7 +102,7 @@ public abstract class AbstractNonblockingServer extends TServer {
/**
* Starts any threads required for serving.
*
*
* @return true if everything went ok, false if threads could not be started.
*/
protected abstract boolean startThreads();
@ -115,7 +115,7 @@ public abstract class AbstractNonblockingServer extends TServer {
/**
* Have the server transport start accepting connections.
*
*
* @return true if we started listening successfully, false if something went
* wrong.
*/
@ -139,7 +139,7 @@ public abstract class AbstractNonblockingServer extends TServer {
/**
* Perform an invocation. This method could behave several different ways -
* invoke immediately inline, queue for separate execution, etc.
*
*
* @return true if invocation was successfully requested, which is not a
* guarantee that invocation has completed. False if the request
* failed.
@ -152,7 +152,7 @@ public abstract class AbstractNonblockingServer extends TServer {
* corresponding to requests.
*/
protected abstract class AbstractSelectThread extends Thread {
protected final Selector selector;
protected Selector selector;
// List of FrameBuffers that want to change their selection interests.
protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
@ -285,21 +285,21 @@ public abstract class AbstractNonblockingServer extends TServer {
protected ByteBuffer buffer_;
protected final TByteArrayOutputStream response_;
// the frame that the TTransport should wrap.
protected final TMemoryInputTransport frameTrans_;
// the transport that should be used to connect to clients
protected final TTransport inTrans_;
protected final TTransport outTrans_;
// the input protocol to use on frames
protected final TProtocol inProt_;
// the output protocol to use on frames
protected final TProtocol outProt_;
// context associated with this connection
protected final ServerContext context_;
@ -328,7 +328,7 @@ public abstract class AbstractNonblockingServer extends TServer {
/**
* Give this FrameBuffer a chance to read. The selector loop should have
* received a read event for this FrameBuffer.
*
*
* @return true if the connection should live on, false if it should be
* closed
*/
@ -455,7 +455,7 @@ public abstract class AbstractNonblockingServer extends TServer {
public void close() {
// if we're being closed due to an error, we might have allocated a
// buffer that we need to subtract for our memory accounting.
if (state_ == FrameBufferState.READING_FRAME ||
if (state_ == FrameBufferState.READING_FRAME ||
state_ == FrameBufferState.READ_FRAME_COMPLETE ||
state_ == FrameBufferState.AWAITING_CLOSE) {
readBufferBytesAllocated.addAndGet(-buffer_.array().length);
@ -510,7 +510,7 @@ public abstract class AbstractNonblockingServer extends TServer {
public void invoke() {
frameTrans_.reset(buffer_.array());
response_.reset();
try {
if (eventHandler_ != null) {
eventHandler_.processContext(context_, inTrans_, outTrans_);
@ -530,7 +530,7 @@ public abstract class AbstractNonblockingServer extends TServer {
/**
* Perform a read into buffer.
*
*
* @return true if the read succeeded, false if there was an error or the
* connection closed.
*/

View File

@ -19,7 +19,15 @@
package org.apache.thrift.server;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
@ -37,24 +45,18 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Half-Sync/Half-Async server with a separate pool of threads to handle
* non-blocking I/O. Accepts are handled on a single thread, and a configurable
* number of nonblocking selector threads manage reading and writing of client
* connections. A synchronous worker thread pool handles processing of requests.
*
*
* Performs better than TNonblockingServer/THsHaServer in multi-core
* environments when the the bottleneck is CPU on the single selector thread
* handling I/O. In addition, because the accept handling is decoupled from
* reads/writes and invocation, the server has better ability to handle back-
* pressure from new connections (e.g. stop accepting when busy).
*
*
* Like TNonblockingServer, it relies on the use of TFramedTransport.
*/
public class TThreadedSelectorServer extends AbstractNonblockingServer {
@ -205,7 +207,7 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
/**
* Start the accept and selector threads running to deal with clients.
*
*
* @return true if everything went ok, false if we couldn't start for some
* reason.
*/
@ -349,7 +351,7 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
/**
* Set up the AcceptThead
*
*
* @throws IOException
*/
public AcceptThread(TNonblockingServerTransport serverTransport,
@ -478,10 +480,13 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
// Accepted connections added by the accept thread.
private final BlockingQueue<TNonblockingTransport> acceptedQueue;
private int SELECTOR_AUTO_REBUILD_THRESHOLD = 512;
private long MONITOR_PERIOD = 1000L;
private int jvmBug = 0;
/**
* Set up the SelectorThread with an unbounded queue for incoming accepts.
*
*
* @throws IOException
* if a selector cannot be created
*/
@ -491,7 +496,7 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
/**
* Set up the SelectorThread with an bounded queue for incoming accepts.
*
*
* @throws IOException
* if a selector cannot be created
*/
@ -501,7 +506,7 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
/**
* Set up the SelectorThread with a specified queue for connections.
*
*
* @param acceptedQueue
* The BlockingQueue implementation for holding incoming accepted
* connections.
@ -515,7 +520,7 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
/**
* Hands off an accepted connection to be handled by this thread. This
* method will block if the queue for new connections is at capacity.
*
*
* @param accepted
* The connection that has been accepted.
* @return true if the connection has been successfully added.
@ -566,8 +571,8 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
*/
private void select() {
try {
// wait for io events.
selector.select();
doSelect();
// process the io events we received
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
@ -596,6 +601,77 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
}
}
/**
* Do select and judge epoll bug happen.
* See : https://issues.apache.org/jira/browse/THRIFT-4251
*/
private void doSelect() throws IOException {
long beforeSelect = System.currentTimeMillis();
int selectedNums = selector.select();
long afterSelect = System.currentTimeMillis();
if (selectedNums == 0) {
jvmBug++;
} else {
jvmBug = 0;
}
long selectedTime = afterSelect - beforeSelect;
if (selectedTime >= MONITOR_PERIOD) {
jvmBug = 0;
} else if (jvmBug > SELECTOR_AUTO_REBUILD_THRESHOLD) {
LOGGER.warn("In {} ms happen {} times jvm bug; rebuilding selector.", MONITOR_PERIOD, jvmBug);
rebuildSelector();
selector.selectNow();
jvmBug = 0;
}
}
/**
* Replaces the current Selector of this SelectorThread with newly created Selector to work
* around the infamous epoll 100% CPU bug.
*/
private synchronized void rebuildSelector() {
final Selector oldSelector = selector;
if (oldSelector == null) {
return;
}
Selector newSelector = null;
try {
newSelector = Selector.open();
LOGGER.warn("Created new Selector.");
} catch (IOException e) {
LOGGER.error("Create new Selector error.", e);
}
for (SelectionKey key : oldSelector.selectedKeys()) {
if (!key.isValid() && key.readyOps() == 0)
continue;
SelectableChannel channel = key.channel();
Object attachment = key.attachment();
try {
if (attachment == null) {
channel.register(newSelector, key.readyOps());
} else {
channel.register(newSelector, key.readyOps(), attachment);
}
} catch (ClosedChannelException e) {
LOGGER.error("Register new selector key error.", e);
}
}
selector = newSelector;
try {
oldSelector.close();
} catch (IOException e) {
LOGGER.error("Close old selector error.", e);
}
LOGGER.warn("Replace new selector success.");
}
private void processAcceptedConnections() {
// Register accepted connections
while (!stopped_) {