THRIFT-1899 Delphi: Support for Multiplexing Services on any Transport, Protocol and Server

Patch: Jens Geyer
This commit is contained in:
Jens Geyer 2013-03-25 01:28:12 +02:00
parent cac2c5761e
commit 8a70196d00
8 changed files with 1118 additions and 0 deletions

View File

@ -0,0 +1,182 @@
(*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*)
unit Thrift.Processor.Multiplex;
interface
uses
SysUtils,
Generics.Collections,
Thrift,
Thrift.Protocol,
Thrift.Protocol.Multiplex;
{ TMultiplexedProcessor is a TProcessor allowing a single TServer to provide multiple services.
To do so, you instantiate the processor and then register additional processors with it,
as shown in the following example:
TMultiplexedProcessor processor = new TMultiplexedProcessor();
processor.registerProcessor(
"Calculator",
new Calculator.Processor(new CalculatorHandler()));
processor.registerProcessor(
"WeatherReport",
new WeatherReport.Processor(new WeatherReportHandler()));
TServerTransport t = new TServerSocket(9090);
TSimpleServer server = new TSimpleServer(processor, t);
server.serve();
}
type
IMultiplexedProcessor = interface( IProcessor)
['{810FF32D-22A2-4D58-B129-B0590703ECEC}']
// Register a service with this TMultiplexedProcessor. This allows us
// to broker requests to individual services by using the service name
// to select them at request time.
procedure RegisterProcessor( const serviceName : String; const processor : IProcessor);
end;
TMultiplexedProcessorImpl = class( TInterfacedObject, IMultiplexedProcessor, IProcessor)
private type
// Our goal was to work with any protocol. In order to do that, we needed
// to allow them to call readMessageBegin() and get a TMessage in exactly
// the standard format, without the service name prepended to TMessage.name.
TStoredMessageProtocol = class( TProtocolDecorator)
private
FMessageBegin : IMessage;
public
constructor Create( const protocol : IProtocol; const aMsgBegin : IMessage);
function ReadMessageBegin: IMessage; override;
end;
private
FServiceProcessorMap : TDictionary<String, IProcessor>;
public
constructor Create;
destructor Destroy; override;
// Register a service with this TMultiplexedProcessorImpl. This allows us
// to broker requests to individual services by using the service name
// to select them at request time.
procedure RegisterProcessor( const serviceName : String; const processor : IProcessor);
{ This implementation of process performs the following steps:
- Read the beginning of the message.
- Extract the service name from the message.
- Using the service name to locate the appropriate processor.
- Dispatch to the processor, with a decorated instance of TProtocol
that allows readMessageBegin() to return the original TMessage.
An exception is thrown if the message type is not CALL or ONEWAY
or if the service is unknown (or not properly registered).
}
function Process(const iprot, oprot : IProtocol) : Boolean;
end;
implementation
constructor TMultiplexedProcessorImpl.TStoredMessageProtocol.Create( const protocol : IProtocol; const aMsgBegin : IMessage);
begin
inherited Create( protocol);
FMessageBegin := aMsgBegin;
end;
function TMultiplexedProcessorImpl.TStoredMessageProtocol.ReadMessageBegin: IMessage;
begin
result := FMessageBegin;
end;
constructor TMultiplexedProcessorImpl.Create;
begin
inherited Create;
FServiceProcessorMap := TDictionary<string,IProcessor>.Create;
end;
destructor TMultiplexedProcessorImpl.Destroy;
begin
try
FreeAndNil( FServiceProcessorMap);
finally
inherited Destroy;
end;
end;
procedure TMultiplexedProcessorImpl.RegisterProcessor( const serviceName : String; const processor : IProcessor);
begin
FServiceProcessorMap.Add( serviceName, processor);
end;
function TMultiplexedProcessorImpl.Process(const iprot, oprot : IProtocol) : Boolean;
var msg, newMsg : IMessage;
idx : Integer;
sService : string;
processor : IProcessor;
protocol : IProtocol;
const
ERROR_INVALID_MSGTYPE = 'Message must be "call" or "oneway"';
ERROR_INCOMPATIBLE_PROT = 'No service name found in "%s". Client is expected to use TMultiplexProtocol.';
ERROR_UNKNOWN_SERVICE = 'Service "%s" is not registered with MultiplexedProcessor';
begin
// Use the actual underlying protocol (e.g. TBinaryProtocol) to read the message header.
// This pulls the message "off the wire", which we'll deal with at the end of this method.
msg := iprot.readMessageBegin();
if not (msg.Type_ in [TMessageType.Call, TMessageType.Oneway])
then raise TApplicationException.Create( TApplicationException.TExceptionType.InvalidMessageType,
ERROR_INVALID_MSGTYPE);
// Extract the service name
idx := Pos( TMultiplexedProtocol.SEPARATOR, msg.Name);
if idx < 1
then raise TApplicationException.Create( TApplicationException.TExceptionType.InvalidProtocol,
Format(ERROR_INCOMPATIBLE_PROT,[msg.Name]));
// Create a new TMessage, something that can be consumed by any TProtocol
sService := Copy( msg.Name, 1, idx-1);
if not FServiceProcessorMap.TryGetValue( sService, processor)
then raise TApplicationException.Create( TApplicationException.TExceptionType.InternalError,
Format(ERROR_UNKNOWN_SERVICE,[sService]));
// Create a new TMessage, removing the service name
Inc( idx, Length(TMultiplexedProtocol.SEPARATOR));
newMsg := TMessageImpl.Create( Copy( msg.Name, idx, MAXINT), msg.Type_, msg.SeqID);
// Dispatch processing to the stored processor
protocol := TStoredMessageProtocol.Create( iprot, newMsg);
result := processor.process( protocol, oprot);
end;
end.

View File

@ -0,0 +1,107 @@
(*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*)
unit Thrift.Protocol.Multiplex;
interface
uses Thrift.Protocol;
{ TMultiplexedProtocol is a protocol-independent concrete decorator
that allows a Thrift client to communicate with a multiplexing Thrift server,
by prepending the service name to the function name during function calls.
NOTE: THIS IS NOT USED BY SERVERS.
On the server, use TMultiplexedProcessor to handle requests from a multiplexing client.
This example uses a single socket transport to invoke two services:
TSocket transport = new TSocket("localhost", 9090);
transport.open();
TBinaryProtocol protocol = new TBinaryProtocol(transport);
TMultiplexedProtocol mp = new TMultiplexedProtocol(protocol, "Calculator");
Calculator.Client service = new Calculator.Client(mp);
TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol, "WeatherReport");
WeatherReport.Client service2 = new WeatherReport.Client(mp2);
System.out.println(service.add(2,2));
System.out.println(service2.getTemperature());
}
type
TMultiplexedProtocol = class( TProtocolDecorator)
public const
{ Used to delimit the service name from the function name }
SEPARATOR = ':';
private
FServiceName : String;
public
{ Wrap the specified protocol, allowing it to be used to communicate with a multiplexing server.
The serviceName is required as it is prepended to the message header so that the multiplexing
server can broker the function call to the proper service.
Args:
protocol ....... Your communication protocol of choice, e.g. TBinaryProtocol.
serviceName .... The service name of the service communicating via this protocol.
}
constructor Create( const aProtocol : IProtocol; const aServiceName : string);
{ Prepends the service name to the function name, separated by SEPARATOR.
Args: The original message.
}
procedure WriteMessageBegin( const msg: IMessage); override;
end;
implementation
constructor TMultiplexedProtocol.Create(const aProtocol: IProtocol; const aServiceName: string);
begin
ASSERT( aServiceName <> '');
inherited Create(aProtocol);
FServiceName := aServiceName;
end;
procedure TMultiplexedProtocol.WriteMessageBegin( const msg: IMessage);
// Prepends the service name to the function name, separated by TMultiplexedProtocol.SEPARATOR.
var newMsg : IMessage;
begin
case msg.Type_ of
TMessageType.Call,
TMessageType.Oneway : begin
newMsg := TMessageImpl.Create( FServiceName + SEPARATOR + msg.Name, msg.Type_, msg.SeqID);
inherited WriteMessageBegin( newMsg);
end;
else
inherited WriteMessageBegin( msg);
end;
end;
end.

View File

@ -437,6 +437,69 @@ type
procedure SetReadLength( readLength: Integer ); procedure SetReadLength( readLength: Integer );
end; end;
{ TProtocolDecorator forwards all requests to an enclosed TProtocol instance,
providing a way to author concise concrete decorator subclasses. The decorator
does not (and should not) modify the behaviour of the enclosed TProtocol
See p.175 of Design Patterns (by Gamma et al.)
}
TProtocolDecorator = class( TProtocolImpl)
private
FWrappedProtocol : IProtocol;
public
// Encloses the specified protocol.
// All operations will be forward to the given protocol. Must be non-null.
constructor Create( const aProtocol : IProtocol);
procedure WriteMessageBegin( const msg: IMessage); override;
procedure WriteMessageEnd; override;
procedure WriteStructBegin( const struc: IStruct); override;
procedure WriteStructEnd; override;
procedure WriteFieldBegin( const field: IField); override;
procedure WriteFieldEnd; override;
procedure WriteFieldStop; override;
procedure WriteMapBegin( const map: IMap); override;
procedure WriteMapEnd; override;
procedure WriteListBegin( const list: IList); override;
procedure WriteListEnd(); override;
procedure WriteSetBegin( const set_: ISet ); override;
procedure WriteSetEnd(); override;
procedure WriteBool( b: Boolean); override;
procedure WriteByte( b: ShortInt); override;
procedure WriteI16( i16: SmallInt); override;
procedure WriteI32( i32: Integer); override;
procedure WriteI64( const i64: Int64); override;
procedure WriteDouble( const d: Double); override;
procedure WriteString( const s: string ); override;
procedure WriteAnsiString( const s: AnsiString); override;
procedure WriteBinary( const b: TBytes); override;
function ReadMessageBegin: IMessage; override;
procedure ReadMessageEnd(); override;
function ReadStructBegin: IStruct; override;
procedure ReadStructEnd; override;
function ReadFieldBegin: IField; override;
procedure ReadFieldEnd(); override;
function ReadMapBegin: IMap; override;
procedure ReadMapEnd(); override;
function ReadListBegin: IList; override;
procedure ReadListEnd(); override;
function ReadSetBegin: ISet; override;
procedure ReadSetEnd(); override;
function ReadBool: Boolean; override;
function ReadByte: ShortInt; override;
function ReadI16: SmallInt; override;
function ReadI32: Integer; override;
function ReadI64: Int64; override;
function ReadDouble:Double; override;
function ReadBinary: TBytes; override;
function ReadString: string; override;
function ReadAnsiString: AnsiString; override;
end;
implementation implementation
function ConvertInt64ToDouble( const n: Int64): Double; function ConvertInt64ToDouble( const n: Int64): Double;
@ -1228,5 +1291,275 @@ begin
Result := TBinaryProtocolImpl.Create( trans, FStrictRead, FStrictWrite); Result := TBinaryProtocolImpl.Create( trans, FStrictRead, FStrictWrite);
end; end;
{ TProtocolDecorator }
constructor TProtocolDecorator.Create( const aProtocol : IProtocol);
begin
ASSERT( aProtocol <> nil);
inherited Create( aProtocol.Transport);
FWrappedProtocol := aProtocol;
end;
procedure TProtocolDecorator.WriteMessageBegin( const msg: IMessage);
begin
FWrappedProtocol.WriteMessageBegin( msg);
end;
procedure TProtocolDecorator.WriteMessageEnd;
begin
FWrappedProtocol.WriteMessageEnd;
end;
procedure TProtocolDecorator.WriteStructBegin( const struc: IStruct);
begin
FWrappedProtocol.WriteStructBegin( struc);
end;
procedure TProtocolDecorator.WriteStructEnd;
begin
FWrappedProtocol.WriteStructEnd;
end;
procedure TProtocolDecorator.WriteFieldBegin( const field: IField);
begin
FWrappedProtocol.WriteFieldBegin( field);
end;
procedure TProtocolDecorator.WriteFieldEnd;
begin
FWrappedProtocol.WriteFieldEnd;
end;
procedure TProtocolDecorator.WriteFieldStop;
begin
FWrappedProtocol.WriteFieldStop;
end;
procedure TProtocolDecorator.WriteMapBegin( const map: IMap);
begin
FWrappedProtocol.WriteMapBegin( map);
end;
procedure TProtocolDecorator.WriteMapEnd;
begin
FWrappedProtocol.WriteMapEnd;
end;
procedure TProtocolDecorator.WriteListBegin( const list: IList);
begin
FWrappedProtocol.WriteListBegin( list);
end;
procedure TProtocolDecorator.WriteListEnd();
begin
FWrappedProtocol.WriteListEnd();
end;
procedure TProtocolDecorator.WriteSetBegin( const set_: ISet );
begin
FWrappedProtocol.WriteSetBegin( set_);
end;
procedure TProtocolDecorator.WriteSetEnd();
begin
FWrappedProtocol.WriteSetEnd();
end;
procedure TProtocolDecorator.WriteBool( b: Boolean);
begin
FWrappedProtocol.WriteBool( b);
end;
procedure TProtocolDecorator.WriteByte( b: ShortInt);
begin
FWrappedProtocol.WriteByte( b);
end;
procedure TProtocolDecorator.WriteI16( i16: SmallInt);
begin
FWrappedProtocol.WriteI16( i16);
end;
procedure TProtocolDecorator.WriteI32( i32: Integer);
begin
FWrappedProtocol.WriteI32( i32);
end;
procedure TProtocolDecorator.WriteI64( const i64: Int64);
begin
FWrappedProtocol.WriteI64( i64);
end;
procedure TProtocolDecorator.WriteDouble( const d: Double);
begin
FWrappedProtocol.WriteDouble( d);
end;
procedure TProtocolDecorator.WriteString( const s: string );
begin
FWrappedProtocol.WriteString( s);
end;
procedure TProtocolDecorator.WriteAnsiString( const s: AnsiString);
begin
FWrappedProtocol.WriteAnsiString( s);
end;
procedure TProtocolDecorator.WriteBinary( const b: TBytes);
begin
FWrappedProtocol.WriteBinary( b);
end;
function TProtocolDecorator.ReadMessageBegin: IMessage;
begin
result := FWrappedProtocol.ReadMessageBegin;
end;
procedure TProtocolDecorator.ReadMessageEnd();
begin
FWrappedProtocol.ReadMessageEnd();
end;
function TProtocolDecorator.ReadStructBegin: IStruct;
begin
result := FWrappedProtocol.ReadStructBegin;
end;
procedure TProtocolDecorator.ReadStructEnd;
begin
FWrappedProtocol.ReadStructEnd;
end;
function TProtocolDecorator.ReadFieldBegin: IField;
begin
result := FWrappedProtocol.ReadFieldBegin;
end;
procedure TProtocolDecorator.ReadFieldEnd();
begin
FWrappedProtocol.ReadFieldEnd();
end;
function TProtocolDecorator.ReadMapBegin: IMap;
begin
result := FWrappedProtocol.ReadMapBegin;
end;
procedure TProtocolDecorator.ReadMapEnd();
begin
FWrappedProtocol.ReadMapEnd();
end;
function TProtocolDecorator.ReadListBegin: IList;
begin
result := FWrappedProtocol.ReadListBegin;
end;
procedure TProtocolDecorator.ReadListEnd();
begin
FWrappedProtocol.ReadListEnd();
end;
function TProtocolDecorator.ReadSetBegin: ISet;
begin
result := FWrappedProtocol.ReadSetBegin;
end;
procedure TProtocolDecorator.ReadSetEnd();
begin
FWrappedProtocol.ReadSetEnd();
end;
function TProtocolDecorator.ReadBool: Boolean;
begin
result := FWrappedProtocol.ReadBool;
end;
function TProtocolDecorator.ReadByte: ShortInt;
begin
result := FWrappedProtocol.ReadByte;
end;
function TProtocolDecorator.ReadI16: SmallInt;
begin
result := FWrappedProtocol.ReadI16;
end;
function TProtocolDecorator.ReadI32: Integer;
begin
result := FWrappedProtocol.ReadI32;
end;
function TProtocolDecorator.ReadI64: Int64;
begin
result := FWrappedProtocol.ReadI64;
end;
function TProtocolDecorator.ReadDouble:Double;
begin
result := FWrappedProtocol.ReadDouble;
end;
function TProtocolDecorator.ReadBinary: TBytes;
begin
result := FWrappedProtocol.ReadBinary;
end;
function TProtocolDecorator.ReadString: string;
begin
result := FWrappedProtocol.ReadString;
end;
function TProtocolDecorator.ReadAnsiString: AnsiString;
begin
result := FWrappedProtocol.ReadAnsiString;
end;
end. end.

View File

@ -0,0 +1,130 @@
(*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*)
unit Multiplex.Client.Main;
{.$DEFINE StressTest} // activate to stress-test the server with frequent connects/disconnects
{.$DEFINE PerfTest} // activate to activate the performance test
interface
uses
Windows, SysUtils, Classes,
DateUtils,
Generics.Collections,
Thrift,
Thrift.Protocol,
Thrift.Protocol.Multiplex,
Thrift.Transport.Pipes,
Thrift.Transport,
Thrift.Stream,
Thrift.Collections,
Benchmark, // in gen-delphi folder
Aggr, // in gen-delphi folder
Multiplex.Test.Common;
type
TTestClient = class
protected
FProtocol : IProtocol;
procedure ParseArgs( const args: array of string);
procedure Setup;
procedure Run;
public
constructor Create( const args: array of string);
class procedure Execute( const args: array of string);
end;
implementation
type
IServiceClient = interface
['{7745C1C2-AB20-43BA-B6F0-08BF92DE0BAC}']
procedure Test;
end;
//--- TTestClient -------------------------------------
class procedure TTestClient.Execute( const args: array of string);
var client : TTestClient;
begin
client := TTestClient.Create(args);
try
client.Run;
finally
client.Free;
end;
end;
constructor TTestClient.Create( const args: array of string);
begin
ParseArgs(args);
Setup;
end;
procedure TTestClient.ParseArgs( const args: array of string);
begin
if Length(args) <> 0
then raise Exception.Create('No args accepted so far');
end;
procedure TTestClient.Setup;
var trans : ITransport;
begin
trans := TSocketImpl.Create( 'localhost', 9090);
trans := TFramedTransportImpl.Create( trans);
trans.Open;
FProtocol := TBinaryProtocolImpl.Create( trans, TRUE, TRUE);
end;
procedure TTestClient.Run;
var bench : TBenchmarkService.Iface;
aggr : TAggr.Iface;
multiplex : IProtocol;
i : Integer;
begin
try
multiplex := TMultiplexedProtocol.Create( FProtocol, NAME_BENCHMARKSERVICE);
bench := TBenchmarkService.TClient.Create( multiplex);
multiplex := TMultiplexedProtocol.Create( FProtocol, NAME_AGGR);
aggr := TAggr.TClient.Create( multiplex);
for i := 1 to 10
do aggr.addValue( bench.fibonacci(i));
for i in aggr.getValues
do Write(IntToStr(i)+' ');
WriteLn;
except
on e:Exception do Writeln(#10+e.Message);
end;
end;
end.

View File

@ -0,0 +1,201 @@
(*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*)
unit Multiplex.Server.Main;
{$WARN SYMBOL_PLATFORM OFF}
{.$DEFINE RunEndless} // activate to interactively stress-test the server stop routines via Ctrl+C
interface
uses
Windows, SysUtils,
Generics.Collections,
Thrift.Console,
Thrift.Server,
Thrift.Transport,
Thrift.Transport.Pipes,
Thrift.Protocol,
Thrift.Protocol.Multiplex,
Thrift.Processor.Multiplex,
Thrift.Collections,
Thrift.Utils,
Thrift,
Benchmark, // in gen-delphi folder
Aggr, // in gen-delphi folder
Multiplex.Test.Common,
Contnrs;
type
TTestServer = class
public type
ITestHandler = interface
['{CAE09AAB-80FB-48E9-B3A8-7F9B96F5419A}']
procedure SetServer( const AServer : IServer );
end;
protected type
TTestHandlerImpl = class( TInterfacedObject, ITestHandler)
private
FServer : IServer;
protected
// ITestHandler
procedure SetServer( const AServer : IServer );
property Server : IServer read FServer write SetServer;
end;
TBenchmarkServiceImpl = class( TTestHandlerImpl, TBenchmarkService.Iface)
protected
// TBenchmarkService.Iface
function fibonacci(n: ShortInt): Integer;
end;
TAggrImpl = class( TTestHandlerImpl, TAggr.Iface)
protected
FList : IThriftList<Integer>;
// TAggr.Iface
procedure addValue(value: Integer);
function getValues(): IThriftList<Integer>;
public
constructor Create;
destructor Destroy; override;
end;
public
class procedure Execute( const args: array of string);
end;
implementation
{ TTestServer.TTestHandlerImpl }
procedure TTestServer.TTestHandlerImpl.SetServer( const AServer: IServer);
begin
FServer := AServer;
end;
{ TTestServer.TBenchmarkServiceImpl }
function TTestServer.TBenchmarkServiceImpl.fibonacci(n: ShortInt): Integer;
var prev, next : Integer;
begin
prev := 0;
result := 1;
while n > 0 do begin
next := result + prev;
prev := result;
result := next;
Dec(n);
end;
end;
{ TTestServer.TAggrImpl }
constructor TTestServer.TAggrImpl.Create;
begin
inherited Create;
FList := TThriftListImpl<Integer>.Create;
end;
destructor TTestServer.TAggrImpl.Destroy;
begin
try
FreeAndNil( FList);
finally
inherited Destroy;
end;
end;
procedure TTestServer.TAggrImpl.addValue(value: Integer);
begin
FList.Add( value);
end;
function TTestServer.TAggrImpl.getValues(): IThriftList<Integer>;
begin
result := FList;
end;
{ TTestServer }
class procedure TTestServer.Execute( const args: array of string);
var
TransportFactory : ITransportFactory;
ProtocolFactory : IProtocolFactory;
ServerTrans : IServerTransport;
benchHandler : TBenchmarkService.Iface;
aggrHandler : TAggr.Iface;
benchProcessor : IProcessor;
aggrProcessor : IProcessor;
multiplex : IMultiplexedProcessor;
ServerEngine : IServer;
begin
try
// create protocol factory, default to BinaryProtocol
ProtocolFactory := TBinaryProtocolImpl.TFactory.Create( TRUE, TRUE);
servertrans := TServerSocketImpl.Create( 9090, 0, FALSE);
TransportFactory := TFramedTransportImpl.TFactory.Create;
benchHandler := TBenchmarkServiceImpl.Create;
benchProcessor := TBenchmarkService.TProcessorImpl.Create( benchHandler);
aggrHandler := TAggrImpl.Create;
aggrProcessor := TAggr.TProcessorImpl.Create( aggrHandler);
multiplex := TMultiplexedProcessorImpl.Create;
multiplex.RegisterProcessor( NAME_BENCHMARKSERVICE, benchProcessor);
multiplex.RegisterProcessor( NAME_AGGR, aggrProcessor);
ServerEngine := TSimpleServer.Create( multiplex,
ServerTrans,
TransportFactory,
ProtocolFactory);
(benchHandler as ITestHandler).SetServer( ServerEngine);
(aggrHandler as ITestHandler).SetServer( ServerEngine);
Console.WriteLine('Starting the server ...');
ServerEngine.serve();
(benchHandler as ITestHandler).SetServer( nil);
(aggrHandler as ITestHandler).SetServer( nil);
except
on E: Exception do
begin
Console.Write( E.Message);
end;
end;
Console.WriteLine( 'done.');
end;
end.

View File

@ -0,0 +1,65 @@
(*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*)
program Multiplex.Test.Client;
{$APPTYPE CONSOLE}
uses
SysUtils,
Multiplex.Client.Main in 'Multiplex.Client.Main.pas',
Thrift in '..\..\src\Thrift.pas',
Thrift.Transport in '..\..\src\Thrift.Transport.pas',
Thrift.Transport.Pipes in '..\..\src\Thrift.Transport.Pipes.pas',
Thrift.Protocol in '..\..\src\Thrift.Protocol.pas',
Thrift.Protocol.Multiplex in '..\..\src\Thrift.Protocol.Multiplex.pas',
Thrift.Collections in '..\..\src\Thrift.Collections.pas',
Thrift.Server in '..\..\src\Thrift.Server.pas',
Thrift.Stream in '..\..\src\Thrift.Stream.pas',
Thrift.Console in '..\..\src\Thrift.Console.pas',
Thrift.Utils in '..\..\src\Thrift.Utils.pas';
var
nParamCount : Integer;
args : array of string;
i : Integer;
arg : string;
s : string;
begin
try
Writeln( 'Multiplex TestClient '+Thrift.Version);
nParamCount := ParamCount;
SetLength( args, nParamCount);
for i := 1 to nParamCount do
begin
arg := ParamStr( i );
args[i-1] := arg;
end;
TTestClient.Execute( args );
Readln;
except
on E: Exception do begin
Writeln(E.ClassName, ': ', E.Message);
ExitCode := $FFFF;
end;
end;
end.

View File

@ -0,0 +1,35 @@
(*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*)
unit Multiplex.Test.Common;
interface
const
NAME_BENCHMARKSERVICE = 'BenchmarkService';
NAME_AGGR = 'Aggr';
implementation
// nix
end.

View File

@ -0,0 +1,65 @@
(*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*)
program Multiplex.Test.Server;
{$APPTYPE CONSOLE}
uses
SysUtils,
Multiplex.Server.Main in 'Multiplex.Server.Main.pas',
Thrift in '..\..\src\Thrift.pas',
Thrift.Transport in '..\..\src\Thrift.Transport.pas',
Thrift.Transport.Pipes in '..\..\src\Thrift.Transport.Pipes.pas',
Thrift.Protocol in '..\..\src\Thrift.Protocol.pas',
Thrift.Protocol.Multiplex in '..\..\src\Thrift.Protocol.Multiplex.pas',
Thrift.Processor.Multiplex in '..\..\src\Thrift.Processor.Multiplex.pas',
Thrift.Collections in '..\..\src\Thrift.Collections.pas',
Thrift.Server in '..\..\src\Thrift.Server.pas',
Thrift.Console in '..\..\src\Thrift.Console.pas',
Thrift.Utils in '..\..\src\Thrift.Utils.pas',
Thrift.Stream in '..\..\src\Thrift.Stream.pas';
var
nParamCount : Integer;
args : array of string;
i : Integer;
arg : string;
s : string;
begin
try
Writeln( 'Multiplex TestServer '+Thrift.Version);
nParamCount := ParamCount;
SetLength( args, nParamCount);
for i := 1 to nParamCount do
begin
arg := ParamStr( i );
args[i-1] := arg;
end;
TTestServer.Execute( args );
Writeln('Press ENTER to close ... '); Readln;
except
on E: Exception do
Writeln(E.ClassName, ': ', E.Message);
end;
end.