mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 10:48:51 +00:00
b95b0ffa72
Client: D Patch: David Nadlinger D program language library and additions git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1304085 13f79535-47bb-0310-9956-ffa450edef68
397 lines
12 KiB
D
397 lines
12 KiB
D
/*
|
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|
* or more contributor license agreements. See the NOTICE file
|
|
* distributed with this work for additional information
|
|
* regarding copyright ownership. The ASF licenses this file
|
|
* to you under the Apache License, Version 2.0 (the
|
|
* "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless enforced by applicable law or agreed to in writing,
|
|
* software distributed under the License is distributed on an
|
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
* KIND, either express or implied. See the License for the
|
|
* specific language governing permissions and limitations
|
|
* under the License.
|
|
*/
|
|
module async_test;
|
|
|
|
import core.atomic;
|
|
import core.sync.condition : Condition;
|
|
import core.sync.mutex : Mutex;
|
|
import core.thread : dur, Thread, ThreadGroup;
|
|
import std.conv : text;
|
|
import std.datetime;
|
|
import std.getopt;
|
|
import std.exception : collectException, enforce;
|
|
import std.parallelism : TaskPool;
|
|
import std.stdio;
|
|
import std.string;
|
|
import std.variant : Variant;
|
|
import thrift.base;
|
|
import thrift.async.base;
|
|
import thrift.async.libevent;
|
|
import thrift.async.socket;
|
|
import thrift.async.ssl;
|
|
import thrift.codegen.async_client;
|
|
import thrift.codegen.async_client_pool;
|
|
import thrift.codegen.base;
|
|
import thrift.codegen.processor;
|
|
import thrift.protocol.base;
|
|
import thrift.protocol.binary;
|
|
import thrift.server.base;
|
|
import thrift.server.simple;
|
|
import thrift.server.transport.socket;
|
|
import thrift.server.transport.ssl;
|
|
import thrift.transport.base;
|
|
import thrift.transport.buffered;
|
|
import thrift.transport.ssl;
|
|
import thrift.util.cancellation;
|
|
|
|
version (Posix) {
|
|
import core.stdc.signal;
|
|
import core.sys.posix.signal;
|
|
|
|
// Disable SIGPIPE because SSL server will write to broken socket after
|
|
// client disconnected (see TSSLSocket docs).
|
|
shared static this() {
|
|
signal(SIGPIPE, SIG_IGN);
|
|
}
|
|
}
|
|
|
|
interface AsyncTest {
|
|
string echo(string value);
|
|
string delayedEcho(string value, long milliseconds);
|
|
|
|
void fail(string reason);
|
|
void delayedFail(string reason, long milliseconds);
|
|
|
|
enum methodMeta = [
|
|
TMethodMeta("fail", [], [TExceptionMeta("ate", 1, "AsyncTestException")]),
|
|
TMethodMeta("delayedFail", [], [TExceptionMeta("ate", 1, "AsyncTestException")])
|
|
];
|
|
alias .AsyncTestException AsyncTestException;
|
|
}
|
|
|
|
class AsyncTestException : TException {
|
|
string reason;
|
|
mixin TStructHelpers!();
|
|
}
|
|
|
|
void main(string[] args) {
|
|
ushort port = 9090;
|
|
ushort managerCount = 2;
|
|
ushort serversPerManager = 5;
|
|
ushort threadsPerServer = 10;
|
|
uint iterations = 10;
|
|
bool ssl;
|
|
bool trace;
|
|
|
|
getopt(args,
|
|
"iterations", &iterations,
|
|
"managers", &managerCount,
|
|
"port", &port,
|
|
"servers-per-manager", &serversPerManager,
|
|
"ssl", &ssl,
|
|
"threads-per-server", &threadsPerServer,
|
|
"trace", &trace,
|
|
);
|
|
|
|
TTransportFactory clientTransportFactory;
|
|
TSSLContext serverSSLContext;
|
|
if (ssl) {
|
|
auto clientSSLContext = new TSSLContext();
|
|
with (clientSSLContext) {
|
|
ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
|
|
authenticate = true;
|
|
loadTrustedCertificates("./trusted-ca-certificate.pem");
|
|
}
|
|
clientTransportFactory = new TAsyncSSLSocketFactory(clientSSLContext);
|
|
|
|
serverSSLContext = new TSSLContext();
|
|
with (serverSSLContext) {
|
|
serverSide = true;
|
|
loadCertificate("./server-certificate.pem");
|
|
loadPrivateKey("./server-private-key.pem");
|
|
ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
|
|
}
|
|
} else {
|
|
clientTransportFactory = new TBufferedTransportFactory;
|
|
}
|
|
|
|
|
|
auto serverCancel = new TCancellationOrigin;
|
|
scope(exit) {
|
|
writeln("Triggering server shutdown...");
|
|
serverCancel.trigger();
|
|
writeln("done.");
|
|
}
|
|
|
|
auto managers = new TLibeventAsyncManager[managerCount];
|
|
scope (exit) foreach (ref m; managers) clear(m);
|
|
|
|
auto clientsThreads = new ThreadGroup;
|
|
foreach (managerIndex, ref manager; managers) {
|
|
manager = new TLibeventAsyncManager;
|
|
foreach (serverIndex; 0 .. serversPerManager) {
|
|
auto currentPort = cast(ushort)
|
|
(port + managerIndex * serversPerManager + serverIndex);
|
|
|
|
// Start the server and wait until it is up and running.
|
|
auto servingMutex = new Mutex;
|
|
auto servingCondition = new Condition(servingMutex);
|
|
auto handler = new PreServeNotifyHandler(servingMutex, servingCondition);
|
|
synchronized (servingMutex) {
|
|
(new ServerThread!TSimpleServer(currentPort, serverSSLContext, trace,
|
|
serverCancel, handler)).start();
|
|
servingCondition.wait();
|
|
}
|
|
|
|
// We only run the timing tests for the first server on each async
|
|
// manager, so that we don't get spurious timing errors becaue of
|
|
// ordering issues.
|
|
auto runTimingTests = (serverIndex == 0);
|
|
|
|
auto c = new ClientsThread(manager, currentPort, clientTransportFactory,
|
|
threadsPerServer, iterations, runTimingTests, trace);
|
|
clientsThreads.add(c);
|
|
c.start();
|
|
}
|
|
}
|
|
clientsThreads.joinAll();
|
|
}
|
|
|
|
class AsyncTestHandler : AsyncTest {
|
|
this(bool trace) {
|
|
trace_ = trace;
|
|
}
|
|
|
|
override string echo(string value) {
|
|
if (trace_) writefln(`echo("%s")`, value);
|
|
return value;
|
|
}
|
|
|
|
override string delayedEcho(string value, long milliseconds) {
|
|
if (trace_) writef(`delayedEcho("%s", %s ms)... `, value, milliseconds);
|
|
Thread.sleep(dur!"msecs"(milliseconds));
|
|
if (trace_) writeln("returning.");
|
|
|
|
return value;
|
|
}
|
|
|
|
override void fail(string reason) {
|
|
if (trace_) writefln(`fail("%s")`, reason);
|
|
auto ate = new AsyncTestException;
|
|
ate.reason = reason;
|
|
throw ate;
|
|
}
|
|
|
|
override void delayedFail(string reason, long milliseconds) {
|
|
if (trace_) writef(`delayedFail("%s", %s ms)... `, reason, milliseconds);
|
|
Thread.sleep(dur!"msecs"(milliseconds));
|
|
if (trace_) writeln("returning.");
|
|
|
|
auto ate = new AsyncTestException;
|
|
ate.reason = reason;
|
|
throw ate;
|
|
}
|
|
|
|
private:
|
|
bool trace_;
|
|
AsyncTestException ate_;
|
|
}
|
|
|
|
class PreServeNotifyHandler : TServerEventHandler {
|
|
this(Mutex servingMutex, Condition servingCondition) {
|
|
servingMutex_ = servingMutex;
|
|
servingCondition_ = servingCondition;
|
|
}
|
|
|
|
void preServe() {
|
|
synchronized (servingMutex_) {
|
|
servingCondition_.notifyAll();
|
|
}
|
|
}
|
|
Variant createContext(TProtocol input, TProtocol output) { return Variant.init; }
|
|
void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {}
|
|
void preProcess(Variant serverContext, TTransport transport) {}
|
|
|
|
private:
|
|
Mutex servingMutex_;
|
|
Condition servingCondition_;
|
|
}
|
|
|
|
class ServerThread(ServerType) : Thread {
|
|
this(ushort port, TSSLContext sslContext, bool trace,
|
|
TCancellation cancellation, TServerEventHandler eventHandler
|
|
) {
|
|
port_ = port;
|
|
sslContext_ = sslContext;
|
|
trace_ = trace;
|
|
cancellation_ = cancellation;
|
|
eventHandler_ = eventHandler;
|
|
|
|
super(&run);
|
|
}
|
|
|
|
void run() {
|
|
TServerSocket serverSocket;
|
|
if (sslContext_) {
|
|
serverSocket = new TSSLServerSocket(port_, sslContext_);
|
|
} else {
|
|
serverSocket = new TServerSocket(port_);
|
|
}
|
|
auto transportFactory = new TBufferedTransportFactory;
|
|
auto protocolFactory = new TBinaryProtocolFactory!();
|
|
auto processor = new TServiceProcessor!AsyncTest(new AsyncTestHandler(trace_));
|
|
|
|
auto server = new ServerType(processor, serverSocket, transportFactory,
|
|
protocolFactory);
|
|
server.eventHandler = eventHandler_;
|
|
|
|
writefln("Starting server on port %s...", port_);
|
|
server.serve(cancellation_);
|
|
writefln("Server thread on port %s done.", port_);
|
|
}
|
|
|
|
private:
|
|
ushort port_;
|
|
bool trace_;
|
|
TCancellation cancellation_;
|
|
TSSLContext sslContext_;
|
|
TServerEventHandler eventHandler_;
|
|
}
|
|
|
|
class ClientsThread : Thread {
|
|
this(TAsyncSocketManager manager, ushort port, TTransportFactory tf,
|
|
ushort threads, uint iterations, bool runTimingTests, bool trace
|
|
) {
|
|
manager_ = manager;
|
|
port_ = port;
|
|
transportFactory_ = tf;
|
|
threads_ = threads;
|
|
iterations_ = iterations;
|
|
runTimingTests_ = runTimingTests;
|
|
trace_ = trace;
|
|
super(&run);
|
|
}
|
|
|
|
void run() {
|
|
auto transport = new TAsyncSocket(manager_, "localhost", port_);
|
|
|
|
{
|
|
auto client = new TAsyncClient!AsyncTest(
|
|
transport,
|
|
transportFactory_,
|
|
new TBinaryProtocolFactory!()
|
|
);
|
|
transport.open();
|
|
auto clientThreads = new ThreadGroup;
|
|
foreach (clientId; 0 .. threads_) {
|
|
clientThreads.create({
|
|
auto c = clientId;
|
|
return {
|
|
foreach (i; 0 .. iterations_) {
|
|
immutable id = text(port_, ":", c, ":", i);
|
|
|
|
{
|
|
if (trace_) writefln(`Calling echo("%s")... `, id);
|
|
auto a = client.echo(id);
|
|
enforce(a == id);
|
|
if (trace_) writefln(`echo("%s") done.`, id);
|
|
}
|
|
|
|
{
|
|
if (trace_) writefln(`Calling fail("%s")... `, id);
|
|
auto a = cast(AsyncTestException)collectException(client.fail(id).waitGet());
|
|
enforce(a && a.reason == id);
|
|
if (trace_) writefln(`fail("%s") done.`, id);
|
|
}
|
|
}
|
|
};
|
|
}());
|
|
}
|
|
clientThreads.joinAll();
|
|
transport.close();
|
|
}
|
|
|
|
if (runTimingTests_) {
|
|
auto client = new TAsyncClient!AsyncTest(
|
|
transport,
|
|
transportFactory_,
|
|
new TBinaryProtocolFactory!TBufferedTransport
|
|
);
|
|
|
|
// Temporarily redirect error logs to stdout, as SSL errors on the server
|
|
// side are expected when the client terminates aburptly (as is the case
|
|
// in the timeout test).
|
|
auto oldErrorLogSink = g_errorLogSink;
|
|
g_errorLogSink = g_infoLogSink;
|
|
scope (exit) g_errorLogSink = oldErrorLogSink;
|
|
|
|
foreach (i; 0 .. iterations_) {
|
|
transport.open();
|
|
|
|
immutable id = text(port_, ":", i);
|
|
|
|
{
|
|
if (trace_) writefln(`Calling delayedEcho("%s", 100 ms)...`, id);
|
|
auto a = client.delayedEcho(id, 100);
|
|
enforce(!a.completion.wait(dur!"usecs"(1)),
|
|
text("wait() succeded early (", a.get(), ", ", id, ")."));
|
|
enforce(!a.completion.wait(dur!"usecs"(1)),
|
|
text("wait() succeded early (", a.get(), ", ", id, ")."));
|
|
enforce(a.completion.wait(dur!"msecs"(200)),
|
|
text("wait() didn't succeed as expected (", id, ")."));
|
|
enforce(a.get() == id);
|
|
if (trace_) writefln(`... delayedEcho("%s") done.`, id);
|
|
}
|
|
|
|
{
|
|
if (trace_) writefln(`Calling delayedFail("%s", 100 ms)... `, id);
|
|
auto a = client.delayedFail(id, 100);
|
|
enforce(!a.completion.wait(dur!"usecs"(1)),
|
|
text("wait() succeded early (", id, ", ", collectException(a.get()), ")."));
|
|
enforce(!a.completion.wait(dur!"usecs"(1)),
|
|
text("wait() succeded early (", id, ", ", collectException(a.get()), ")."));
|
|
enforce(a.completion.wait(dur!"msecs"(200)),
|
|
text("wait() didn't succeed as expected (", id, ")."));
|
|
auto e = cast(AsyncTestException)collectException(a.get());
|
|
enforce(e && e.reason == id);
|
|
if (trace_) writefln(`... delayedFail("%s") done.`, id);
|
|
}
|
|
|
|
{
|
|
transport.recvTimeout = dur!"msecs"(50);
|
|
|
|
if (trace_) write(`Calling delayedEcho("socketTimeout", 100 ms)... `);
|
|
auto a = client.delayedEcho("socketTimeout", 100);
|
|
auto e = cast(TTransportException)collectException(a.waitGet());
|
|
enforce(e, text("Operation didn't fail as expected (", id, ")."));
|
|
enforce(e.type == TTransportException.Type.TIMED_OUT,
|
|
text("Wrong timeout exception type (", id, "): ", e));
|
|
if (trace_) writeln(`timed out as expected.`);
|
|
|
|
// Wait until the server thread reset before the next iteration.
|
|
Thread.sleep(dur!"msecs"(50));
|
|
transport.recvTimeout = dur!"hnsecs"(0);
|
|
}
|
|
|
|
transport.close();
|
|
}
|
|
}
|
|
|
|
writefln("Clients thread for port %s done.", port_);
|
|
}
|
|
|
|
TAsyncSocketManager manager_;
|
|
ushort port_;
|
|
TTransportFactory transportFactory_;
|
|
ushort threads_;
|
|
uint iterations_;
|
|
bool runTimingTests_;
|
|
bool trace_;
|
|
}
|