mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-08 03:08:53 +00:00
417 lines
13 KiB
D
417 lines
13 KiB
D
|
/*
|
|||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|||
|
* or more contributor license agreements. See the NOTICE file
|
|||
|
* distributed with this work for additional information
|
|||
|
* regarding copyright ownership. The ASF licenses this file
|
|||
|
* to you under the Apache License, Version 2.0 (the
|
|||
|
* "License"); you may not use this file except in compliance
|
|||
|
* with the License. You may obtain a copy of the License at
|
|||
|
*
|
|||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
|
*
|
|||
|
* Unless required by applicable law or agreed to in writing,
|
|||
|
* software distributed under the License is distributed on an
|
|||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|||
|
* KIND, either express or implied. See the License for the
|
|||
|
* specific language governing permissions and limitations
|
|||
|
* under the License.
|
|||
|
*/
|
|||
|
module client_pool_test;
|
|||
|
|
|||
|
import core.time : Duration, dur;
|
|||
|
import core.thread : Thread;
|
|||
|
import std.algorithm;
|
|||
|
import std.array;
|
|||
|
import std.conv;
|
|||
|
import std.exception;
|
|||
|
import std.getopt;
|
|||
|
import std.range;
|
|||
|
import std.stdio;
|
|||
|
import std.typecons;
|
|||
|
import thrift.base;
|
|||
|
import thrift.async.libevent;
|
|||
|
import thrift.async.socket;
|
|||
|
import thrift.codegen.base;
|
|||
|
import thrift.codegen.async_client;
|
|||
|
import thrift.codegen.async_client_pool;
|
|||
|
import thrift.codegen.client;
|
|||
|
import thrift.codegen.client_pool;
|
|||
|
import thrift.codegen.processor;
|
|||
|
import thrift.protocol.binary;
|
|||
|
import thrift.server.simple;
|
|||
|
import thrift.server.transport.socket;
|
|||
|
import thrift.transport.buffered;
|
|||
|
import thrift.transport.socket;
|
|||
|
import thrift.util.cancellation;
|
|||
|
import thrift.util.future;
|
|||
|
|
|||
|
// We use this as our RPC-layer exception here to make sure socket/… problems
|
|||
|
// (that would usually considered to be RPC layer faults) cause the tests to
|
|||
|
// fail, even though we are testing the RPC exception handling.
|
|||
|
class TestServiceException : TException {
|
|||
|
int port;
|
|||
|
}
|
|||
|
|
|||
|
interface TestService {
|
|||
|
int getPort();
|
|||
|
alias .TestServiceException TestServiceException;
|
|||
|
enum methodMeta = [TMethodMeta("getPort", [],
|
|||
|
[TExceptionMeta("a", 1, "TestServiceException")])];
|
|||
|
}
|
|||
|
|
|||
|
// Use some derived service, just to check that the pools handle inheritance
|
|||
|
// correctly.
|
|||
|
interface ExTestService : TestService {
|
|||
|
int[] getPortInArray();
|
|||
|
enum methodMeta = [TMethodMeta("getPortInArray", [],
|
|||
|
[TExceptionMeta("a", 1, "TestServiceException")])];
|
|||
|
}
|
|||
|
|
|||
|
class ExTestHandler : ExTestService {
|
|||
|
this(ushort port, Duration delay, bool failing, bool trace) {
|
|||
|
this.port = port;
|
|||
|
this.delay = delay;
|
|||
|
this.failing = failing;
|
|||
|
this.trace = trace;
|
|||
|
}
|
|||
|
|
|||
|
override int getPort() {
|
|||
|
if (trace) {
|
|||
|
stderr.writefln("getPort() called on %s (delay: %s, failing: %s)", port,
|
|||
|
delay, failing);
|
|||
|
}
|
|||
|
sleep();
|
|||
|
failIfEnabled();
|
|||
|
return port;
|
|||
|
}
|
|||
|
|
|||
|
override int[] getPortInArray() {
|
|||
|
return [getPort()];
|
|||
|
}
|
|||
|
|
|||
|
ushort port;
|
|||
|
Duration delay;
|
|||
|
bool failing;
|
|||
|
bool trace;
|
|||
|
|
|||
|
private:
|
|||
|
void sleep() {
|
|||
|
if (delay > dur!"hnsecs"(0)) Thread.sleep(delay);
|
|||
|
}
|
|||
|
|
|||
|
void failIfEnabled() {
|
|||
|
if (!failing) return;
|
|||
|
|
|||
|
auto e = new TestServiceException;
|
|||
|
e.port = port;
|
|||
|
throw e;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
class ServerThread : Thread {
|
|||
|
this(ExTestHandler handler, TCancellation cancellation) {
|
|||
|
super(&run);
|
|||
|
handler_ = handler;
|
|||
|
cancellation_ = cancellation;
|
|||
|
}
|
|||
|
private:
|
|||
|
void run() {
|
|||
|
try {
|
|||
|
auto protocolFactory = new TBinaryProtocolFactory!();
|
|||
|
auto processor = new TServiceProcessor!ExTestService(handler_);
|
|||
|
auto serverTransport = new TServerSocket(handler_.port);
|
|||
|
serverTransport.recvTimeout = dur!"seconds"(3);
|
|||
|
auto transportFactory = new TBufferedTransportFactory;
|
|||
|
|
|||
|
auto server = new TSimpleServer(
|
|||
|
processor, serverTransport, transportFactory, protocolFactory);
|
|||
|
server.serve(cancellation_);
|
|||
|
} catch (Exception e) {
|
|||
|
writefln("Server thread on port %s failed: %s", handler_.port, e);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
TCancellation cancellation_;
|
|||
|
ExTestHandler handler_;
|
|||
|
}
|
|||
|
|
|||
|
void main(string[] args) {
|
|||
|
bool trace;
|
|||
|
ushort port = 9090;
|
|||
|
getopt(args, "port", &port, "trace", &trace);
|
|||
|
|
|||
|
auto serverCancellation = new TCancellationOrigin;
|
|||
|
scope (exit) serverCancellation.trigger();
|
|||
|
|
|||
|
immutable ports = cast(immutable)array(map!"cast(ushort)a"(iota(port, port + 6)));
|
|||
|
|
|||
|
version (none) {
|
|||
|
// Cannot use this due to multiple DMD @@BUG@@s:
|
|||
|
// 1. »function D main is a nested function and cannot be accessed from array«
|
|||
|
// when calling array() on the result of the outer map() – would have to
|
|||
|
// manually do the eager evaluation/array conversion.
|
|||
|
// 2. »Zip.opSlice cannot get frame pointer to map« for the delay argument,
|
|||
|
// can be worked around by calling array() on the map result first.
|
|||
|
// 3. Even when using the workarounds for the last two points, the DMD-built
|
|||
|
// executable crashes when building without (sic!) inlining enabled,
|
|||
|
// the backtrace points into the first delegate literal.
|
|||
|
auto handlers = array(map!((args){
|
|||
|
return new ExTestHandler(args._0, args._1, args._2, trace);
|
|||
|
})(zip(
|
|||
|
ports,
|
|||
|
map!((a){ return dur!`msecs`(a); })([1, 10, 100, 1, 10, 100]),
|
|||
|
[false, false, false, true, true, true]
|
|||
|
)));
|
|||
|
} else {
|
|||
|
auto handlers = [
|
|||
|
new ExTestHandler(cast(ushort)(port + 0), dur!"msecs"(1), false, trace),
|
|||
|
new ExTestHandler(cast(ushort)(port + 1), dur!"msecs"(10), false, trace),
|
|||
|
new ExTestHandler(cast(ushort)(port + 2), dur!"msecs"(100), false, trace),
|
|||
|
new ExTestHandler(cast(ushort)(port + 3), dur!"msecs"(1), true, trace),
|
|||
|
new ExTestHandler(cast(ushort)(port + 4), dur!"msecs"(10), true, trace),
|
|||
|
new ExTestHandler(cast(ushort)(port + 5), dur!"msecs"(100), true, trace)
|
|||
|
];
|
|||
|
}
|
|||
|
|
|||
|
// Fire up the server threads.
|
|||
|
foreach (h; handlers) (new ServerThread(h, serverCancellation)).start();
|
|||
|
|
|||
|
// Give the servers some time to get up. This should really be accomplished
|
|||
|
// via a barrier here and in the preServe() hook.
|
|||
|
Thread.sleep(dur!"msecs"(10));
|
|||
|
|
|||
|
syncClientPoolTest(ports, handlers);
|
|||
|
asyncClientPoolTest(ports, handlers);
|
|||
|
asyncFastestClientPoolTest(ports, handlers);
|
|||
|
asyncAggregatorTest(ports, handlers);
|
|||
|
}
|
|||
|
|
|||
|
|
|||
|
void syncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
|
|||
|
auto clients = array(map!((a){
|
|||
|
return cast(TClientBase!ExTestService)tClient!ExTestService(
|
|||
|
tBinaryProtocol(new TSocket("127.0.0.1", a))
|
|||
|
);
|
|||
|
})(ports));
|
|||
|
|
|||
|
scope(exit) foreach (c; clients) c.outputProtocol.transport.close();
|
|||
|
|
|||
|
// Try the case where the first client succeeds.
|
|||
|
{
|
|||
|
enforce(makePool(clients).getPort() == ports[0]);
|
|||
|
}
|
|||
|
|
|||
|
// Try the case where all clients fail.
|
|||
|
{
|
|||
|
auto pool = makePool(clients[3 .. $]);
|
|||
|
auto e = cast(TCompoundOperationException)collectException(pool.getPort());
|
|||
|
enforce(e);
|
|||
|
enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
|
|||
|
ports[3 .. $]));
|
|||
|
}
|
|||
|
|
|||
|
// Try the case where the first clients fail, but a later one succeeds.
|
|||
|
{
|
|||
|
auto pool = makePool(clients[3 .. $] ~ clients[0 .. 3]);
|
|||
|
enforce(pool.getPortInArray() == [ports[0]]);
|
|||
|
}
|
|||
|
|
|||
|
// Make sure a client is properly deactivated when it has failed too often.
|
|||
|
{
|
|||
|
auto pool = makePool(clients);
|
|||
|
pool.faultDisableCount = 1;
|
|||
|
pool.faultDisableDuration = dur!"msecs"(50);
|
|||
|
|
|||
|
handlers[0].failing = true;
|
|||
|
enforce(pool.getPort() == ports[1]);
|
|||
|
|
|||
|
handlers[0].failing = false;
|
|||
|
enforce(pool.getPort() == ports[1]);
|
|||
|
|
|||
|
Thread.sleep(dur!"msecs"(50));
|
|||
|
enforce(pool.getPort() == ports[0]);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
auto makePool(TClientBase!ExTestService[] clients) {
|
|||
|
auto p = tClientPool(clients);
|
|||
|
p.permuteClients = false;
|
|||
|
p.rpcFaultFilter = (Exception e) {
|
|||
|
return (cast(TestServiceException)e !is null);
|
|||
|
};
|
|||
|
return p;
|
|||
|
}
|
|||
|
|
|||
|
|
|||
|
void asyncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
|
|||
|
auto manager = new TLibeventAsyncManager;
|
|||
|
scope (exit) manager.stop(dur!"hnsecs"(0));
|
|||
|
|
|||
|
auto clients = makeAsyncClients(manager, ports);
|
|||
|
scope(exit) foreach (c; clients) c.transport.close();
|
|||
|
|
|||
|
// Try the case where the first client succeeds.
|
|||
|
{
|
|||
|
enforce(makeAsyncPool(clients).getPort() == ports[0]);
|
|||
|
}
|
|||
|
|
|||
|
// Try the case where all clients fail.
|
|||
|
{
|
|||
|
auto pool = makeAsyncPool(clients[3 .. $]);
|
|||
|
auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
|
|||
|
enforce(e);
|
|||
|
enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
|
|||
|
ports[3 .. $]));
|
|||
|
}
|
|||
|
|
|||
|
// Try the case where the first clients fail, but a later one succeeds.
|
|||
|
{
|
|||
|
auto pool = makeAsyncPool(clients[3 .. $] ~ clients[0 .. 3]);
|
|||
|
enforce(pool.getPortInArray() == [ports[0]]);
|
|||
|
}
|
|||
|
|
|||
|
// Make sure a client is properly deactivated when it has failed too often.
|
|||
|
{
|
|||
|
auto pool = makeAsyncPool(clients);
|
|||
|
pool.faultDisableCount = 1;
|
|||
|
pool.faultDisableDuration = dur!"msecs"(50);
|
|||
|
|
|||
|
handlers[0].failing = true;
|
|||
|
enforce(pool.getPort() == ports[1]);
|
|||
|
|
|||
|
handlers[0].failing = false;
|
|||
|
enforce(pool.getPort() == ports[1]);
|
|||
|
|
|||
|
Thread.sleep(dur!"msecs"(50));
|
|||
|
enforce(pool.getPort() == ports[0]);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
auto makeAsyncPool(TAsyncClientBase!ExTestService[] clients) {
|
|||
|
auto p = tAsyncClientPool(clients);
|
|||
|
p.permuteClients = false;
|
|||
|
p.rpcFaultFilter = (Exception e) {
|
|||
|
return (cast(TestServiceException)e !is null);
|
|||
|
};
|
|||
|
return p;
|
|||
|
}
|
|||
|
|
|||
|
auto makeAsyncClients(TLibeventAsyncManager manager, in ushort[] ports) {
|
|||
|
// DMD @@BUG@@ workaround: Using array on the lazyHandlers map result leads
|
|||
|
// to »function D main is a nested function and cannot be accessed from array«.
|
|||
|
// Thus, we manually do the array conversion.
|
|||
|
auto lazyClients = map!((a){
|
|||
|
return new TAsyncClient!ExTestService(
|
|||
|
new TAsyncSocket(manager, "127.0.0.1", a),
|
|||
|
new TBufferedTransportFactory,
|
|||
|
new TBinaryProtocolFactory!(TBufferedTransport)
|
|||
|
);
|
|||
|
})(ports);
|
|||
|
TAsyncClientBase!ExTestService[] clients;
|
|||
|
foreach (c; lazyClients) clients ~= c;
|
|||
|
return clients;
|
|||
|
}
|
|||
|
|
|||
|
|
|||
|
void asyncFastestClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
|
|||
|
auto manager = new TLibeventAsyncManager;
|
|||
|
scope (exit) manager.stop(dur!"hnsecs"(0));
|
|||
|
|
|||
|
auto clients = makeAsyncClients(manager, ports);
|
|||
|
scope(exit) foreach (c; clients) c.transport.close();
|
|||
|
|
|||
|
// Make sure the fastest client wins, even if they are called in some other
|
|||
|
// order.
|
|||
|
{
|
|||
|
auto result = makeAsyncFastestPool(array(retro(clients))).getPort().waitGet();
|
|||
|
enforce(result == ports[0]);
|
|||
|
}
|
|||
|
|
|||
|
// Try the case where all clients fail.
|
|||
|
{
|
|||
|
auto pool = makeAsyncFastestPool(clients[3 .. $]);
|
|||
|
auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
|
|||
|
enforce(e);
|
|||
|
enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
|
|||
|
ports[3 .. $]));
|
|||
|
}
|
|||
|
|
|||
|
// Try the case where the first clients fail, but a later one succeeds.
|
|||
|
{
|
|||
|
auto pool = makeAsyncFastestPool(clients[1 .. $]);
|
|||
|
enforce(pool.getPortInArray() == [ports[1]]);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
auto makeAsyncFastestPool(TAsyncClientBase!ExTestService[] clients) {
|
|||
|
auto p = tAsyncFastestClientPool(clients);
|
|||
|
p.rpcFaultFilter = (Exception e) {
|
|||
|
return (cast(TestServiceException)e !is null);
|
|||
|
};
|
|||
|
return p;
|
|||
|
}
|
|||
|
|
|||
|
|
|||
|
void asyncAggregatorTest(const(ushort)[] ports, ExTestHandler[] handlers) {
|
|||
|
auto manager = new TLibeventAsyncManager;
|
|||
|
scope (exit) manager.stop(dur!"hnsecs"(0));
|
|||
|
|
|||
|
auto clients = makeAsyncClients(manager, ports);
|
|||
|
scope(exit) foreach (c; clients) c.transport.close();
|
|||
|
|
|||
|
auto aggregator = tAsyncAggregator(
|
|||
|
cast(TAsyncClientBase!ExTestService[])clients);
|
|||
|
|
|||
|
// Test aggregator range interface.
|
|||
|
{
|
|||
|
auto range = aggregator.getPort().range(dur!"msecs"(50));
|
|||
|
enforce(equal(range, ports[0 .. 2][]));
|
|||
|
enforce(equal(map!"a.port"(cast(TestServiceException[])range.exceptions),
|
|||
|
ports[3 .. $ - 1]));
|
|||
|
enforce(range.completedCount == 4);
|
|||
|
}
|
|||
|
|
|||
|
// Test default accumulator for scalars.
|
|||
|
{
|
|||
|
auto fullResult = aggregator.getPort().accumulate();
|
|||
|
enforce(fullResult.waitGet() == ports[0 .. 3]);
|
|||
|
|
|||
|
auto partialResult = aggregator.getPort().accumulate();
|
|||
|
Thread.sleep(dur!"msecs"(20));
|
|||
|
enforce(partialResult.finishGet() == ports[0 .. 2]);
|
|||
|
|
|||
|
}
|
|||
|
|
|||
|
// Test default accumulator for arrays.
|
|||
|
{
|
|||
|
auto fullResult = aggregator.getPortInArray().accumulate();
|
|||
|
enforce(fullResult.waitGet() == ports[0 .. 3]);
|
|||
|
|
|||
|
auto partialResult = aggregator.getPortInArray().accumulate();
|
|||
|
Thread.sleep(dur!"msecs"(20));
|
|||
|
enforce(partialResult.finishGet() == ports[0 .. 2]);
|
|||
|
}
|
|||
|
|
|||
|
// Test custom accumulator.
|
|||
|
{
|
|||
|
auto fullResult = aggregator.getPort().accumulate!(function(int[] results){
|
|||
|
return reduce!"a + b"(results);
|
|||
|
})();
|
|||
|
enforce(fullResult.waitGet() == ports[0] + ports[1] + ports[2]);
|
|||
|
|
|||
|
auto partialResult = aggregator.getPort().accumulate!(
|
|||
|
function(int[] results, Exception[] exceptions) {
|
|||
|
// Return a tuple of the parameters so we can check them outside of
|
|||
|
// this function (to verify the values, we need access to »ports«, but
|
|||
|
// due to DMD @@BUG5710@@, we can't use a delegate literal).f
|
|||
|
return tuple(results, exceptions);
|
|||
|
}
|
|||
|
)();
|
|||
|
Thread.sleep(dur!"msecs"(20));
|
|||
|
auto resultTuple = partialResult.finishGet();
|
|||
|
enforce(resultTuple._0 == ports[0 .. 2]);
|
|||
|
enforce(equal(map!"a.port"(cast(TestServiceException[])resultTuple._1),
|
|||
|
ports[3 .. $ - 1]));
|
|||
|
}
|
|||
|
}
|