mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 02:45:22 +00:00
THRIFT-2382 contrib: sample for connecting Thrift with STOMP
Patch: Jens Geyer
This commit is contained in:
parent
c975bbcc9c
commit
7bea35a4c2
18
contrib/Stomp/README
Normal file
18
contrib/Stomp/README
Normal file
@ -0,0 +1,18 @@
|
||||
Sample code for STOMP-based Thrift clients and/or servers.
|
||||
|
||||
Although the sample Thrift STOMP Transport is written in
|
||||
Delphi/Pascal, it can easily serve as a starting point for
|
||||
similar implementations in other languages.
|
||||
|
||||
STOMP is a protocol widely supported by many messaging systems,
|
||||
such as Apache ActiveMQ, RabbitMQ and many others. In particular,
|
||||
it can be used to communicate with Service-Bus products like Rebus
|
||||
or NServiceBus, when running against a STOMP-capable MQ system.
|
||||
|
||||
A prerequisite for this sample is the Delphi STOMP Adapter written
|
||||
by Daniele Teti (http://www.danieleteti.it/stomp-client), currently
|
||||
hosted at Google Code (http://code.google.com/p/delphistompclient).
|
||||
|
||||
At the time of writing, the STOMP adapter does not fully support
|
||||
binary data. Please check whether this has been fixed, otherwise
|
||||
you have to use the JSON protocol (or to fix it on your own).
|
200
contrib/Stomp/Thrift.Transport.STOMP.pas
Normal file
200
contrib/Stomp/Thrift.Transport.STOMP.pas
Normal file
@ -0,0 +1,200 @@
|
||||
(*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*)
|
||||
|
||||
unit Thrift.Transport.STOMP;
|
||||
|
||||
interface
|
||||
|
||||
uses
|
||||
Classes,Windows, SysUtils,
|
||||
Thrift,
|
||||
Thrift.Transport,
|
||||
Thrift.Protocol,
|
||||
Thrift.Stream,
|
||||
StompClient,
|
||||
StompTypes;
|
||||
|
||||
type
|
||||
TStompTransportImpl = class( TStreamTransportImpl)
|
||||
strict private
|
||||
FData : TStringStream;
|
||||
FServer : string;
|
||||
FOutQueue : string;
|
||||
FStompCli : IStompClient;
|
||||
protected
|
||||
function GetIsOpen: Boolean; override;
|
||||
function Peek: Boolean; override;
|
||||
public
|
||||
constructor Create( const aServerAndPort, aOutQueue : string);
|
||||
destructor Destroy; override;
|
||||
|
||||
procedure Open(); override;
|
||||
procedure Close(); override;
|
||||
procedure Flush; override;
|
||||
end;
|
||||
|
||||
|
||||
TStompServerTransportImpl = class( TServerTransportImpl)
|
||||
strict private
|
||||
FServer : string;
|
||||
FInQueue : string;
|
||||
FClient : IStompClient;
|
||||
protected
|
||||
procedure Listen; override;
|
||||
procedure Close; override;
|
||||
function Accept( const fnAccepting: TProc): ITransport; override;
|
||||
public
|
||||
constructor Create( const aServerAndPort, aInQueue : string);
|
||||
destructor Destroy; override;
|
||||
end;
|
||||
|
||||
|
||||
const
|
||||
QUEUE_PREFIX = '/queue/';
|
||||
TOPIC_PREFIX = '/topic/';
|
||||
EXCHANGE_PREFIX = '/exchange/';
|
||||
|
||||
|
||||
implementation
|
||||
|
||||
|
||||
|
||||
constructor TStompTransportImpl.Create( const aServerAndPort, aOutQueue : string);
|
||||
var adapter : IThriftStream;
|
||||
begin
|
||||
FData := TStringStream.Create;
|
||||
FServer := aServerAndPort;
|
||||
FOutQueue := aOutQueue;
|
||||
|
||||
adapter := TThriftStreamAdapterDelphi.Create( FData, FALSE);
|
||||
inherited Create( nil, adapter); // output only
|
||||
end;
|
||||
|
||||
|
||||
destructor TStompTransportImpl.Destroy;
|
||||
begin
|
||||
inherited Destroy;
|
||||
FreeAndNil( FData);
|
||||
FStompCli := nil;
|
||||
end;
|
||||
|
||||
|
||||
function TStompTransportImpl.GetIsOpen: Boolean;
|
||||
begin
|
||||
result := (FStompCli <> nil);
|
||||
end;
|
||||
|
||||
|
||||
function TStompTransportImpl.Peek: Boolean;
|
||||
begin
|
||||
result := FALSE; // output only
|
||||
end;
|
||||
|
||||
|
||||
procedure TStompTransportImpl.Open;
|
||||
begin
|
||||
if FStompCli <> nil
|
||||
then raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen, 'already open')
|
||||
else FStompCli := StompUtils.NewStomp( FServer);
|
||||
end;
|
||||
|
||||
|
||||
procedure TStompTransportImpl.Close;
|
||||
begin
|
||||
FStompCli := nil;
|
||||
FData.Clear;
|
||||
end;
|
||||
|
||||
|
||||
procedure TStompTransportImpl.Flush;
|
||||
begin
|
||||
if FStompCli = nil
|
||||
then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'not open');
|
||||
|
||||
FStompCli.Send( FOutQueue, FData.DataString);
|
||||
FData.Clear;
|
||||
end;
|
||||
|
||||
|
||||
//--- TStompServerTransportImpl --------------------------------------------
|
||||
|
||||
|
||||
constructor TStompServerTransportImpl.Create( const aServerAndPort, aInQueue : string);
|
||||
begin
|
||||
inherited Create;
|
||||
FServer := aServerAndPort;
|
||||
FInQueue := aInQueue;
|
||||
end;
|
||||
|
||||
|
||||
destructor TStompServerTransportImpl.Destroy;
|
||||
begin
|
||||
try
|
||||
Close;
|
||||
finally
|
||||
inherited Destroy;
|
||||
end;
|
||||
end;
|
||||
|
||||
|
||||
procedure TStompServerTransportImpl.Listen;
|
||||
begin
|
||||
FClient := StompUtils.NewStomp(FServer);
|
||||
FClient.Subscribe( FInQueue);
|
||||
end;
|
||||
|
||||
|
||||
procedure TStompServerTransportImpl.Close;
|
||||
begin
|
||||
if FClient <> nil then begin
|
||||
FClient.Unsubscribe( FInQueue);
|
||||
FClient := nil;
|
||||
end;
|
||||
end;
|
||||
|
||||
|
||||
function TStompServerTransportImpl.Accept( const fnAccepting: TProc): ITransport;
|
||||
var frame : IStompFrame;
|
||||
adapter : IThriftStream;
|
||||
stream : TStringStream;
|
||||
begin
|
||||
if FClient = nil
|
||||
then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
|
||||
'Not connected.');
|
||||
|
||||
if Assigned(fnAccepting)
|
||||
then fnAccepting();
|
||||
|
||||
try
|
||||
frame := FClient.Receive(MAXINT);
|
||||
if frame = nil then Exit(nil);
|
||||
|
||||
stream := TStringStream.Create( frame.GetBody);
|
||||
adapter := TThriftStreamAdapterDelphi.Create( stream, TRUE);
|
||||
result := TStreamTransportImpl.Create( adapter, nil);
|
||||
|
||||
except
|
||||
on E: Exception
|
||||
do raise TTransportException.Create( E.ToString );
|
||||
end;
|
||||
end;
|
||||
|
||||
|
||||
end.
|
||||
|
Loading…
Reference in New Issue
Block a user