mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 02:45:22 +00:00
7bea35a4c2
Patch: Jens Geyer
201 lines
4.5 KiB
ObjectPascal
201 lines
4.5 KiB
ObjectPascal
(*
|
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|
* or more contributor license agreements. See the NOTICE file
|
|
* distributed with this work for additional information
|
|
* regarding copyright ownership. The ASF licenses this file
|
|
* to you under the Apache License, Version 2.0 (the
|
|
* "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing,
|
|
* software distributed under the License is distributed on an
|
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
* KIND, either express or implied. See the License for the
|
|
* specific language governing permissions and limitations
|
|
* under the License.
|
|
*)
|
|
|
|
unit Thrift.Transport.STOMP;
|
|
|
|
interface
|
|
|
|
uses
|
|
Classes,Windows, SysUtils,
|
|
Thrift,
|
|
Thrift.Transport,
|
|
Thrift.Protocol,
|
|
Thrift.Stream,
|
|
StompClient,
|
|
StompTypes;
|
|
|
|
type
|
|
// Client-side Thrift transport that publishes to a STOMP destination.
// The constructor registers an in-memory stream (FData) as the inherited
// transport's output stream and passes no input stream; Flush hands the
// buffered content to the STOMP client. Peek always returns FALSE, i.e.
// this transport is output only.
TStompTransportImpl = class( TStreamTransportImpl)
strict private
  FData     : TStringStream;   // output buffer, created in Create and freed in Destroy
  FServer   : string;          // passed unchanged to StompUtils.NewStomp in Open
  FOutQueue : string;          // destination argument for IStompClient.Send
  FStompCli : IStompClient;    // nil while closed, assigned by Open
protected
  function Peek: Boolean; override;
  function GetIsOpen: Boolean; override;
public
  constructor Create( const aServerAndPort, aOutQueue : string);
  destructor Destroy; override;

  procedure Open(); override;
  procedure Close(); override;
  procedure Flush; override;
end;
|
|
|
|
|
|
// Server-side Thrift transport fed by a STOMP subscription.
// Listen creates a STOMP client and subscribes to FInQueue; Accept waits
// for one frame and wraps its body in a read-only stream transport.
TStompServerTransportImpl = class( TServerTransportImpl)
strict private
  FServer  : string;         // passed unchanged to StompUtils.NewStomp in Listen
  FInQueue : string;         // destination used for Subscribe / Unsubscribe
  FClient  : IStompClient;   // nil until Listen, reset to nil by Close
protected
  function Accept( const fnAccepting: TProc): ITransport; override;
  procedure Listen; override;
  procedure Close; override;
public
  constructor Create( const aServerAndPort, aInQueue : string);
  destructor Destroy; override;
end;
|
|
|
|
|
|
const
  // Destination name prefixes. Nothing in the visible part of this unit
  // uses them; presumably they are meant for callers that build the queue
  // names passed to the constructors - TODO confirm.
  QUEUE_PREFIX    = '/queue/';
  TOPIC_PREFIX    = '/topic/';
  EXCHANGE_PREFIX = '/exchange/';
|
|
|
|
|
|
implementation
|
|
|
|
|
|
|
|
// Stores the connection parameters and sets up the output buffer.
// No STOMP client is created here; that happens in Open.
//   aServerAndPort : kept in FServer, later handed to StompUtils.NewStomp
//   aOutQueue      : kept in FOutQueue, later used as the Send destination
constructor TStompTransportImpl.Create( const aServerAndPort, aOutQueue : string);
var
  outStream : IThriftStream;
begin
  FServer   := aServerAndPort;
  FOutQueue := aOutQueue;
  FData     := TStringStream.Create;

  // FALSE: presumably "adapter does not own the stream" - FData is freed
  // by this class' destructor (TODO confirm against the adapter class).
  outStream := TThriftStreamAdapterDelphi.Create( FData, FALSE);

  // nil input stream: this transport is output only
  inherited Create( nil, outStream);
end;
|
|
|
|
|
|
// Frees the output buffer and drops the STOMP client reference.
destructor TStompTransportImpl.Destroy;
begin
  // NOTE(review): the inherited destructor runs BEFORE FData is freed,
  // presumably so the base class lets go of the stream adapter that still
  // points at FData - keep this order unless confirmed otherwise.
  inherited;

  FreeAndNil( FData);
  FStompCli := nil;
end;
|
|
|
|
|
|
// The transport counts as open for as long as a STOMP client is held.
function TStompTransportImpl.GetIsOpen: Boolean;
begin
  Result := Assigned( FStompCli);
end;
|
|
|
|
|
|
// This transport has no input side, so there is never data to read.
function TStompTransportImpl.Peek: Boolean;
begin
  Result := FALSE;
end;
|
|
|
|
|
|
// Creates the STOMP client used by Flush.
// Raises TTransportException (AlreadyOpen) if a client already exists.
procedure TStompTransportImpl.Open;
begin
  if Assigned( FStompCli) then
    raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen, 'already open');

  // NOTE(review): FServer originates from a parameter named aServerAndPort
  // but is passed as the single argument of NewStomp - verify that the
  // STOMP client accepts a combined "server and port" value here.
  FStompCli := StompUtils.NewStomp( FServer);
end;
|
|
|
|
|
|
// Closes the transport: lets go of the STOMP client and throws away
// whatever is still sitting in the output buffer.
procedure TStompTransportImpl.Close;
begin
  FStompCli := nil;   // GetIsOpen reports FALSE from here on
  FData.Clear;        // unflushed data is discarded, not sent
end;
|
|
|
|
|
|
// Sends everything buffered since the last Flush as one message to
// FOutQueue and empties the buffer.
// Raises TTransportException (NotOpen) when Open has not been called.
// Exceptions raised by the STOMP client's Send are passed on unchanged.
procedure TStompTransportImpl.Flush;
begin
  if FStompCli = nil
  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'not open');

  try
    // NOTE(review): DataString turns the buffered bytes into a string via
    // the stream's encoding - confirm this is lossless for the Thrift
    // protocol used on top of this transport.
    FStompCli.Send( FOutQueue, FData.DataString);
  finally
    // Always reset the buffer. Without this, a failed Send would leave the
    // old bytes in place and they would be prepended to the next message.
    FData.Clear;
  end;
end;
|
|
|
|
|
|
//--- TStompServerTransportImpl --------------------------------------------
|
|
|
|
|
|
// Remembers the connection parameters; nothing is connected or subscribed
// until Listen is called.
//   aServerAndPort : kept in FServer, later handed to StompUtils.NewStomp
//   aInQueue       : kept in FInQueue, the destination to subscribe to
constructor TStompServerTransportImpl.Create( const aServerAndPort, aInQueue : string);
begin
  inherited Create;

  FInQueue := aInQueue;
  FServer  := aServerAndPort;
end;
|
|
|
|
|
|
// Closes the subscription (if any) before the object goes away.
destructor TStompServerTransportImpl.Destroy;
begin
  try
    Close;
  finally
    // run the inherited destructor even when Close raised
    inherited;
  end;
end;
|
|
|
|
|
|
// Creates a STOMP client for FServer and subscribes it to FInQueue.
// A client already held in FClient is replaced without being unsubscribed.
procedure TStompServerTransportImpl.Listen;
begin
  FClient := StompUtils.NewStomp( FServer);
  FClient.Subscribe( FInQueue);
end;
|
|
|
|
|
|
// Cancels the subscription on FInQueue and drops the STOMP client.
// Does nothing when no client exists (Listen not called, or already closed).
// Exceptions raised by Unsubscribe are passed on to the caller.
procedure TStompServerTransportImpl.Close;
begin
  if FClient = nil
  then Exit;

  try
    FClient.Unsubscribe( FInQueue);
  finally
    // Release the client even when Unsubscribe fails, so that the transport
    // does not keep a client whose unsubscribe already failed: Accept then
    // reports "Not connected." and a repeated Close becomes a no-op.
    FClient := nil;
  end;
end;
|
|
|
|
|
|
// Waits for the next STOMP frame and returns its body wrapped in a
// stream transport that has an input stream only.
//   fnAccepting : optional callback, invoked once right before waiting
// Returns nil when Receive delivers no frame.
// Raises TTransportException (NotOpen) when Listen has not been called;
// any exception raised while receiving or wrapping the frame is re-raised
// as a TTransportException carrying the original exception's text.
function TStompServerTransportImpl.Accept( const fnAccepting: TProc): ITransport;
var
  msg         : IStompFrame;
  bodyStream  : TStringStream;
  bodyAdapter : IThriftStream;
begin
  if not Assigned( FClient) then
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'Not connected.');

  if Assigned( fnAccepting) then
    fnAccepting();

  try
    msg := FClient.Receive( MAXINT);
    if msg = nil then begin
      Result := nil;
      Exit;
    end;

    bodyStream  := TStringStream.Create( msg.GetBody);
    // TRUE: presumably hands ownership of bodyStream to the adapter, as the
    // stream is not freed anywhere in this method (TODO confirm).
    bodyAdapter := TThriftStreamAdapterDelphi.Create( bodyStream, TRUE);
    Result      := TStreamTransportImpl.Create( bodyAdapter, nil);
  except
    on E: Exception do
      raise TTransportException.Create( E.ToString );
  end;
end;
|
|
|
|
|
|
end.
|
|
|