THRIFT-2408 Named Pipe Transport Option for C#

Patch: Carl Yeksigian & Jens Geyer
This commit is contained in:
Jens Geyer 2014-03-20 00:52:18 +02:00
parent ec8daae710
commit fd62df75fa
6 changed files with 253 additions and 33 deletions

View File

@ -58,6 +58,8 @@ THRIFTCODE= \
src/Transport/THttpClient.cs \ src/Transport/THttpClient.cs \
src/Transport/THttpHandler.cs \ src/Transport/THttpHandler.cs \
src/Transport/TMemoryBuffer.cs \ src/Transport/TMemoryBuffer.cs \
src/Transport/TNamedPipeClientTransport.cs \
src/Transport/TNamedPipeServerTransport.cs \
src/TProcessor.cs \ src/TProcessor.cs \
src/TException.cs \ src/TException.cs \
src/TApplicationException.cs src/TApplicationException.cs

View File

@ -112,6 +112,8 @@
<Compile Include="Transport\TFramedTransport.cs" /> <Compile Include="Transport\TFramedTransport.cs" />
<Compile Include="Transport\THttpClient.cs" /> <Compile Include="Transport\THttpClient.cs" />
<Compile Include="Transport\THttpHandler.cs" /> <Compile Include="Transport\THttpHandler.cs" />
<Compile Include="Transport\TNamedPipeClientTransport.cs" />
<Compile Include="Transport\TNamedPipeServerTransport.cs" />
<Compile Include="Transport\TServerSocket.cs" /> <Compile Include="Transport\TServerSocket.cs" />
<Compile Include="Transport\TServerTransport.cs" /> <Compile Include="Transport\TServerTransport.cs" />
<Compile Include="Transport\TSocket.cs" /> <Compile Include="Transport\TSocket.cs" />

View File

@ -0,0 +1,72 @@
using System.IO.Pipes;
namespace Thrift.Transport
{
public class TNamedPipeClientTransport : TTransport
{
private NamedPipeClientStream client;
private string ServerName;
private string PipeName;
public TNamedPipeClientTransport(string pipe)
{
ServerName = ".";
PipeName = pipe;
}
public TNamedPipeClientTransport(string server, string pipe)
{
ServerName = (server != "") ? server : ".";
PipeName = pipe;
}
public override bool IsOpen
{
get { return client != null && client.IsConnected; }
}
public override void Open()
{
if (IsOpen)
{
throw new TTransportException(TTransportException.ExceptionType.AlreadyOpen);
}
client = new NamedPipeClientStream(ServerName, PipeName, PipeDirection.InOut, PipeOptions.None);
client.Connect();
}
public override void Close()
{
if (client != null)
{
client.Close();
client = null;
}
}
public override int Read(byte[] buf, int off, int len)
{
if (client == null)
{
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
return client.Read(buf, off, len);
}
public override void Write(byte[] buf, int off, int len)
{
if (client == null)
{
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
client.Write(buf, off, len);
}
protected override void Dispose(bool disposing)
{
client.Dispose();
}
}
}

View File

@ -0,0 +1,113 @@
using System;
using System.Collections.Generic;
using System.IO.Pipes;
namespace Thrift.Transport
{
public class TNamedPipeServerTransport : TServerTransport
{
/// <summary>
/// This is the address of the Pipe on the localhost.
/// </summary>
private readonly string pipeAddress;
NamedPipeServerStream stream = null;
public TNamedPipeServerTransport(string pipeAddress)
{
this.pipeAddress = pipeAddress;
}
public override void Listen()
{
// nothing to do here
}
public override void Close()
{
if (stream != null)
{
try
{
stream.Close();
stream.Dispose();
}
finally
{
stream = null;
}
}
}
private void EnsurePipeInstance()
{
if( stream == null)
stream = new NamedPipeServerStream(
pipeAddress, PipeDirection.InOut, 254,
PipeTransmissionMode.Byte,
PipeOptions.None, 4096, 4096 /*TODO: security*/);
}
protected override TTransport AcceptImpl()
{
try
{
EnsurePipeInstance();
stream.WaitForConnection();
var trans = new ServerTransport(stream);
stream = null; // pass ownership to ServerTransport
return trans;
}
catch (Exception e)
{
Close();
throw new TTransportException(TTransportException.ExceptionType.NotOpen, e.Message);
}
}
private class ServerTransport : TTransport
{
private NamedPipeServerStream server;
public ServerTransport(NamedPipeServerStream server)
{
this.server = server;
}
public override bool IsOpen
{
get { return server != null && server.IsConnected; }
}
public override void Open()
{
}
public override void Close()
{
if (server != null) server.Close();
}
public override int Read(byte[] buf, int off, int len)
{
if (server == null)
{
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
return server.Read(buf, off, len);
}
public override void Write(byte[] buf, int off, int len)
{
if (server == null)
{
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
server.Write(buf, off, len);
}
protected override void Dispose(bool disposing)
{
server.Dispose();
}
}
}
}

View File

@ -37,7 +37,7 @@ namespace Test
{ {
string host = "localhost"; string host = "localhost";
int port = 9090; int port = 9090;
string url = null; string url = null, pipe = null;
int numThreads = 1; int numThreads = 1;
bool buffered = false, framed = false; bool buffered = false, framed = false;
@ -72,6 +72,11 @@ namespace Test
framed = true; framed = true;
Console.WriteLine("Using framed transport"); Console.WriteLine("Using framed transport");
} }
else if (args[i] == "-pipe") // -pipe <name>
{
pipe = args[++i];
Console.WriteLine("Using named pipes transport");
}
else if (args[i] == "-t") else if (args[i] == "-t")
{ {
numThreads = Convert.ToInt32(args[++i]); numThreads = Convert.ToInt32(args[++i]);
@ -94,7 +99,14 @@ namespace Test
threads[test] = t; threads[test] = t;
if (url == null) if (url == null)
{ {
TTransport trans = new TSocket(host, port); // endpoint transport
TTransport trans = null;
if( pipe != null)
trans = new TNamedPipeClientTransport(pipe);
else
trans = new TSocket(host, port);
// layered transport
if (buffered) if (buffered)
trans = new TBufferedTransport(trans as TStreamTransport); trans = new TBufferedTransport(trans as TStreamTransport);
if (framed) if (framed)

View File

@ -322,29 +322,39 @@ namespace Test
try try
{ {
bool useBufferedSockets = false, useFramed = false; bool useBufferedSockets = false, useFramed = false;
int port = 9090; int port = 9090, i = 0;
string pipe = null;
if (args.Length > 0) if (args.Length > 0)
{ {
port = int.Parse(args[0]); i = 0;
if (args[i] == "-pipe") // -pipe name
if (args.Length > 1)
{ {
if ( args[1] == "raw" ) pipe = args[++i];
}
else // default to port number (compatibility)
{
port = int.Parse(args[i]);
}
++i;
if (args.Length > i)
{
if ( args[i] == "raw" )
{ {
// as default // as default
} }
else if ( args[1] == "buffered" ) else if ( args[i] == "buffered" )
{ {
useBufferedSockets = true; useBufferedSockets = true;
} }
else if ( args[1] == "framed" ) else if (args[i] == "framed")
{ {
useFramed = true; useFramed = true;
} }
else else
{ {
// Fall back to the older boolean syntax // Fall back to the older boolean syntax
bool.TryParse(args[1], out useBufferedSockets); bool.TryParse(args[i], out useBufferedSockets);
} }
} }
} }
@ -354,14 +364,22 @@ namespace Test
ThriftTest.Processor testProcessor = new ThriftTest.Processor(testHandler); ThriftTest.Processor testProcessor = new ThriftTest.Processor(testHandler);
// Transport // Transport
TServerSocket tServerSocket = new TServerSocket(port, 0, useBufferedSockets); TServerTransport trans;
if( pipe != null)
{
trans = new TNamedPipeServerTransport(pipe);
}
else
{
trans = new TServerSocket(port, 0, useBufferedSockets);
}
// Simple Server // Simple Server
TServer serverEngine; TServer serverEngine;
if ( useFramed ) if ( useFramed )
serverEngine = new TSimpleServer(testProcessor, tServerSocket, new TFramedTransport.Factory()); serverEngine = new TSimpleServer(testProcessor, trans, new TFramedTransport.Factory());
else else
serverEngine = new TSimpleServer(testProcessor, tServerSocket); serverEngine = new TSimpleServer(testProcessor, trans);
// ThreadPool Server // ThreadPool Server
// serverEngine = new TThreadPoolServer(testProcessor, tServerSocket); // serverEngine = new TThreadPoolServer(testProcessor, tServerSocket);
@ -372,7 +390,8 @@ namespace Test
testHandler.server = serverEngine; testHandler.server = serverEngine;
// Run it // Run it
Console.WriteLine("Starting the server on port " + port + string where = ( pipe != null ? "on pipe "+pipe : "on port " + port);
Console.WriteLine("Starting the server " +where+
(useBufferedSockets ? " with buffered socket" : "") + (useBufferedSockets ? " with buffered socket" : "") +
(useFramed ? " with framed transport" : "") + (useFramed ? " with framed transport" : "") +
"..."); "...");