THRIFT-5624 suboptimal performance of the c# named pipe server transport in multithread servers

Client: netstd
Patch: Jens Geyer
This commit is contained in:
Jens Geyer 2022-09-09 13:39:33 +02:00 committed by Jens Geyer
parent 72d5912424
commit 60970c4e10
3 changed files with 184 additions and 80 deletions

View File

@ -28,7 +28,7 @@ namespace Thrift.Transport.Client
{
private NamedPipeClientStream PipeStream;
private readonly int ConnectTimeout;
private const int DEFAULT_CONNECT_TIMEOUT = 60 * 1000; // Timeout.Infinite is not a good default
private const int DEFAULT_CONNECT_TIMEOUT = 60 * 1000; // Timeout.Infinite is not a good default
public TNamedPipeTransport(string pipe, TConfiguration config, int timeout = DEFAULT_CONNECT_TIMEOUT)
: this(".", pipe, config, timeout)
@ -61,6 +61,8 @@ namespace Thrift.Transport.Client
{
if (PipeStream != null)
{
if (PipeStream.IsConnected)
PipeStream.Close();
PipeStream.Dispose();
PipeStream = null;
}
@ -107,20 +109,24 @@ namespace Thrift.Transport.Client
}
}
public override Task FlushAsync(CancellationToken cancellationToken)
public override async Task FlushAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
await PipeStream.FlushAsync(cancellationToken);
ResetConsumedMessageSize();
return Task.CompletedTask;
}
protected override void Dispose(bool disposing)
{
if(disposing)
if (disposing)
{
PipeStream?.Dispose();
if (PipeStream != null)
{
if (PipeStream.IsConnected)
PipeStream.Close();
PipeStream.Dispose();
PipeStream = null;
}
}
}
}

View File

@ -24,41 +24,78 @@ using System.Threading.Tasks;
using System.ComponentModel;
using System.Security.AccessControl;
using System.Security.Principal;
using System.Collections.Generic;
using System.IO;
using System.Diagnostics;
#pragma warning disable CS1998 // async no await
namespace Thrift.Transport.Server
{
[Obsolete("NamedPipeClientFlags is deprecated, use NamedPipeServerFlags instead.")]
[Flags]
public enum NamedPipeClientFlags {
public enum NamedPipeClientFlags { // bad name
None = 0x00,
OnlyLocalClients = 0x01
};
[Flags]
public enum NamedPipeServerFlags
{
None = 0x00,
OnlyLocalClients = 0x01,
};
// ReSharper disable once InconsistentNaming
public class TNamedPipeServerTransport : TServerTransport
{
// to manage incoming connections, we set up a task for each stream to listen on
private struct TaskStreamPair
{
public NamedPipeServerStream Stream;
public Task Task;
public TaskStreamPair(NamedPipeServerStream stream, Task task)
{
Stream = stream;
Task = task;
}
}
/// <summary>
/// This is the address of the Pipe on the localhost.
/// </summary>
private readonly string _pipeAddress;
private bool _asyncMode = true;
private volatile bool _isPending = true;
private NamedPipeServerStream _stream = null;
private readonly List<TaskStreamPair> _streams = new List<TaskStreamPair>();
private readonly bool _onlyLocalClients = false; // compatibility default
private readonly byte _numListenPipes = 1; // compatibility default
public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeClientFlags flags)
public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeServerFlags flags, int numListenPipes)
: base(config)
{
if ((numListenPipes < 1) || (numListenPipes > 254))
throw new ArgumentOutOfRangeException(nameof(numListenPipes), "Value must be in the range of [1..254]");
_pipeAddress = pipeAddress;
_onlyLocalClients = flags.HasFlag(NamedPipeServerFlags.OnlyLocalClients);
_numListenPipes = (byte)numListenPipes;
}
[Obsolete("NamedPipeClientFlags is deprecated, use NamedPipeServerFlags instead.")]
public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeClientFlags flags, int numListenPipes = 1)
: base(config)
{
if ((numListenPipes < 1) || (numListenPipes > 254))
throw new ArgumentOutOfRangeException(nameof(numListenPipes), "Value must be in the range of [1..254]");
_pipeAddress = pipeAddress;
_onlyLocalClients = flags.HasFlag(NamedPipeClientFlags.OnlyLocalClients);
_numListenPipes = (byte)numListenPipes;
}
[Obsolete("This CTOR is deprecated, please use the other one instead.")]
public TNamedPipeServerTransport(string pipeAddress, TConfiguration config)
: base(config)
{
_pipeAddress = pipeAddress;
_onlyLocalClients = false;
}
public override bool IsOpen() {
return true;
@ -69,75 +106,112 @@ namespace Thrift.Transport.Server
// nothing to do here
}
public override void Close()
private static void Close(NamedPipeServerStream pipe)
{
if (_stream != null)
if (pipe != null)
{
try
{
if (_stream.IsConnected)
_stream.Disconnect();
_stream.Dispose();
if (pipe.IsConnected)
pipe.Disconnect();
}
finally
{
_stream = null;
_isPending = false;
pipe.Dispose();
}
}
}
public override void Close()
{
try
{
if (_streams != null)
{
while(_streams.Count > 0)
{
Close(_streams[0].Stream);
_streams.RemoveAt(0);
}
}
}
finally
{
_streams.Clear();
_isPending = false;
}
}
public override bool IsClientPending()
{
return _isPending;
}
private void EnsurePipeInstance()
private void EnsurePipeInstances()
{
if (_stream == null)
// set up a pool for accepting multiple calls when in multithread mode
// once connected, we hand that stream over to the processor and create a fresh one
try
{
const PipeDirection direction = PipeDirection.InOut;
const int maxconn = NamedPipeServerStream.MaxAllowedServerInstances;
const PipeTransmissionMode mode = PipeTransmissionMode.Byte;
const int inbuf = 4096;
const int outbuf = 4096;
var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None;
while (_streams.Count < _numListenPipes)
_streams.Add(CreatePipeInstance());
}
catch
{
// we might not be able to create all requested instances, e.g. due to some existing instances already processing calls
// if we have at least one pipe to listen on -> Good Enough(tm)
if (_streams.Count < 1)
throw; // no pipes is really bad
}
}
private TaskStreamPair CreatePipeInstance()
{
const PipeDirection direction = PipeDirection.InOut;
const int maxconn = NamedPipeServerStream.MaxAllowedServerInstances;
const PipeTransmissionMode mode = PipeTransmissionMode.Byte;
const int inbuf = 4096;
const int outbuf = 4096;
var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None;
// TODO: "CreatePipeNative" ist only a workaround, and there are have basically two possible outcomes:
// - once NamedPipeServerStream() gets a CTOR that supports pipesec, remove CreatePipeNative()
// - if 31190 gets resolved before, use _stream.SetAccessControl(pipesec) instead of CreatePipeNative()
// EITHER WAY,
// - if CreatePipeNative() finally gets removed, also remove "allow unsafe code" from the project settings
// TODO: "CreatePipeNative" ist only a workaround, and there are have basically two possible outcomes:
// - once NamedPipeServerStream() gets a CTOR that supports pipesec, remove CreatePipeNative()
// - if 31190 gets resolved before, use _stream.SetAccessControl(pipesec) instead of CreatePipeNative()
// EITHER WAY,
// - if CreatePipeNative() finally gets removed, also remove "allow unsafe code" from the project settings
try
NamedPipeServerStream instance;
try
{
var handle = CreatePipeNative(_pipeAddress, inbuf, outbuf, _onlyLocalClients);
if ((handle != null) && (!handle.IsInvalid))
{
var handle = CreatePipeNative(_pipeAddress, inbuf, outbuf, _onlyLocalClients);
if ((handle != null) && (!handle.IsInvalid))
{
_stream = new NamedPipeServerStream(PipeDirection.InOut, _asyncMode, false, handle);
handle = null; // we don't own it any longer
}
else
{
handle?.Dispose();
_stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf/*, pipesec*/);
}
instance = new NamedPipeServerStream(PipeDirection.InOut, _asyncMode, false, handle);
handle = null; // we don't own it any longer
}
catch (NotImplementedException) // Mono still does not support async, fallback to sync
else
{
if (_asyncMode)
{
options &= (~PipeOptions.Asynchronous);
_stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf);
_asyncMode = false;
}
else
{
throw;
}
handle?.Dispose();
instance = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf/*, pipesec*/);
}
}
catch (NotImplementedException) // Mono still does not support async, fallback to sync
{
if (_asyncMode)
{
options &= (~PipeOptions.Asynchronous);
instance = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf);
_asyncMode = false;
}
else
{
throw;
}
}
// the task gets added later
return new TaskStreamPair( instance, null);
}
@ -248,14 +322,28 @@ namespace Thrift.Transport.Server
{
try
{
EnsurePipeInstance();
EnsurePipeInstances();
await _stream.WaitForConnectionAsync(cancellationToken);
// fill the list and wait for any task to be completed
var tasks = new List<Task>();
for (var i = 0; i < _streams.Count; ++i)
{
if (_streams[i].Task == null)
{
var pair = _streams[i];
pair.Task = Task.Run(async () => await pair.Stream.WaitForConnectionAsync(cancellationToken), cancellationToken);
_streams[i] = pair;
}
var trans = new ServerTransport(_stream, Configuration);
_stream = null; // pass ownership to ServerTransport
tasks.Add(_streams[i].Task);
}
//_isPending = false;
// there must be an exact mapping between task index and stream index
Debug.Assert(_streams.Count == tasks.Count);
var index = Task.WaitAny(tasks.ToArray(), cancellationToken);
var trans = new ServerTransport(_streams[index].Stream, Configuration);
_streams.RemoveAt(index); // pass stream ownership to ServerTransport
return trans;
}
@ -296,8 +384,13 @@ namespace Thrift.Transport.Server
public override void Close()
{
PipeStream?.Dispose();
PipeStream = null;
if (PipeStream != null)
{
if (PipeStream.IsConnected)
PipeStream.Disconnect();
PipeStream.Dispose();
PipeStream = null;
}
}
public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
@ -341,19 +434,23 @@ namespace Thrift.Transport.Server
}
}
public override Task FlushAsync(CancellationToken cancellationToken)
public override async Task FlushAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
await PipeStream.FlushAsync(cancellationToken);
ResetConsumedMessageSize();
return Task.CompletedTask;
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
PipeStream?.Dispose();
if (PipeStream != null)
{
if (PipeStream.IsConnected)
PipeStream.Disconnect();
PipeStream.Dispose();
PipeStream = null;
}
}
}
}

View File

@ -204,7 +204,6 @@ namespace ThriftTest
{
//public TServer Server { get; set; }
private readonly int handlerID;
private readonly StringBuilder sb = new();
private readonly TestLogDelegate logger;
public TestHandlerAsync()
@ -216,11 +215,12 @@ namespace ThriftTest
public void TestConsoleLogger(string msg, params object[] values)
{
sb.Clear();
var sb = new StringBuilder();
sb.AppendFormat("handler{0:D3}:", handlerID);
sb.AppendFormat(msg, values);
sb.AppendLine();
Console.Write(sb.ToString());
lock (typeof(Console))
Console.Write(sb.ToString());
}
public Task testVoid(CancellationToken cancellationToken)
@ -298,7 +298,7 @@ namespace ThriftTest
public Task<Dictionary<int, int>> testMap(Dictionary<int, int>? thing, CancellationToken cancellationToken)
{
sb.Clear();
var sb = new StringBuilder();
sb.Append("testMap({{");
if (thing != null)
{
@ -323,7 +323,7 @@ namespace ThriftTest
public Task<Dictionary<string, string>> testStringMap(Dictionary<string, string>? thing, CancellationToken cancellationToken)
{
sb.Clear();
var sb = new StringBuilder();
sb.Append("testStringMap({{");
if (thing != null)
{
@ -348,7 +348,7 @@ namespace ThriftTest
public Task<HashSet<int>> testSet(HashSet<int>? thing, CancellationToken cancellationToken)
{
sb.Clear();
var sb = new StringBuilder();
sb.Append("testSet({{");
if (thing != null)
{
@ -373,7 +373,7 @@ namespace ThriftTest
public Task<List<int>> testList(List<int>? thing, CancellationToken cancellationToken)
{
sb.Clear();
var sb = new StringBuilder();
sb.Append("testList({{");
if (thing != null)
{
@ -590,7 +590,8 @@ namespace ThriftTest
{
case TransportChoice.NamedPipe:
Debug.Assert(param.pipe != null);
trans = new TNamedPipeServerTransport(param.pipe, Configuration, NamedPipeClientFlags.OnlyLocalClients);
var numListen = (param.server == ServerChoice.Simple) ? 1 : 16;
trans = new TNamedPipeServerTransport(param.pipe, Configuration, NamedPipeServerFlags.OnlyLocalClients, numListen);
break;