From 4b66a9d8812cf2c19f15382c56ec8e1327b6f2a1 Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Thu, 5 Mar 2020 00:46:21 +0100 Subject: [PATCH] THRIFT-5133: Use ArrayPool when reading and writing strings to improve performance Client: netstd Patch: Mikel Blanchard This closes #2057 --- .../CompactProtocolBenchmarks.cs | 75 +++++++++++++++++++ .../Benchmarks/Thrift.Benchmarks/Program.cs | 29 +++++++ .../Thrift.Benchmarks.csproj | 35 +++++++++ lib/netstd/Directory.Build.props | 7 ++ lib/netstd/Thrift.sln | 21 +++++- .../Thrift/Protocol/TCompactProtocol.cs | 35 ++++++--- lib/netstd/Thrift/Thrift.csproj | 2 +- .../Thrift/Transport/Client/THttpTransport.cs | 4 + .../Transport/Client/TNamedPipeTransport.cs | 4 + .../Transport/Client/TStreamTransport.cs | 6 +- .../Server/TNamedPipeServerTransport.cs | 4 + 11 files changed, 209 insertions(+), 13 deletions(-) create mode 100644 lib/netstd/Benchmarks/Thrift.Benchmarks/CompactProtocolBenchmarks.cs create mode 100644 lib/netstd/Benchmarks/Thrift.Benchmarks/Program.cs create mode 100644 lib/netstd/Benchmarks/Thrift.Benchmarks/Thrift.Benchmarks.csproj create mode 100644 lib/netstd/Directory.Build.props diff --git a/lib/netstd/Benchmarks/Thrift.Benchmarks/CompactProtocolBenchmarks.cs b/lib/netstd/Benchmarks/Thrift.Benchmarks/CompactProtocolBenchmarks.cs new file mode 100644 index 000000000..cb6b07f94 --- /dev/null +++ b/lib/netstd/Benchmarks/Thrift.Benchmarks/CompactProtocolBenchmarks.cs @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.IO; +using System.Threading.Tasks; + +using BenchmarkDotNet.Attributes; + +using Thrift.Protocol; +using Thrift.Transport.Client; + +namespace Thrift.Benchmarks +{ + [MemoryDiagnoser] + public class CompactProtocolBenchmarks + { + private MemoryStream _Stream; + private TCompactProtocol _Protocol; + + [Params(10000)] + public int NumberOfOperationsPerIteration { get; set; } + + [GlobalSetup] + public void GlobalSetup() + { + _Stream = new MemoryStream(); + var transport = new TStreamTransport(_Stream, _Stream, null); + _Protocol = new TCompactProtocol(transport); + } + + [GlobalCleanup] + public void GlobalCleanup() + { + _Protocol.Dispose(); + } + + [Benchmark] + public async Task WriteString() + { + for (int i = 0; i < NumberOfOperationsPerIteration; i++) + { + await _Protocol.WriteStringAsync("Thrift String Benchmark"); + + _Stream.Seek(0, SeekOrigin.Begin); + } + } + + [Benchmark] + public async Task ReadString() + { + await _Protocol.WriteStringAsync("Thrift String Benchmark"); + + for (int i = 0; i < NumberOfOperationsPerIteration; i++) + { + _Stream.Seek(0, SeekOrigin.Begin); + + await _Protocol.ReadStringAsync(); + } + } + } +} diff --git a/lib/netstd/Benchmarks/Thrift.Benchmarks/Program.cs b/lib/netstd/Benchmarks/Thrift.Benchmarks/Program.cs new file mode 100644 index 000000000..923d73ef5 --- /dev/null +++ b/lib/netstd/Benchmarks/Thrift.Benchmarks/Program.cs @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using BenchmarkDotNet.Running; + +namespace Thrift.Benchmarks +{ + internal static class Program + { + public static void Main(string[] args) + { + BenchmarkSwitcher.FromAssembly(typeof(Program).Assembly).Run(args); + } + } +} diff --git a/lib/netstd/Benchmarks/Thrift.Benchmarks/Thrift.Benchmarks.csproj b/lib/netstd/Benchmarks/Thrift.Benchmarks/Thrift.Benchmarks.csproj new file mode 100644 index 000000000..35138d863 --- /dev/null +++ b/lib/netstd/Benchmarks/Thrift.Benchmarks/Thrift.Benchmarks.csproj @@ -0,0 +1,35 @@ + + + + + Exe + netcoreapp3.1;net48 + false + + + + + + + + + + + diff --git a/lib/netstd/Directory.Build.props b/lib/netstd/Directory.Build.props new file mode 100644 index 000000000..3bd9541a4 --- /dev/null +++ b/lib/netstd/Directory.Build.props @@ -0,0 +1,7 @@ + + + + + + + diff --git a/lib/netstd/Thrift.sln b/lib/netstd/Thrift.sln index 2952eb0d8..58c76ced9 100644 --- a/lib/netstd/Thrift.sln +++ b/lib/netstd/Thrift.sln @@ -1,6 +1,6 @@ Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio 15 -VisualStudioVersion = 15.0.26730.12 +# Visual Studio Version 16 +VisualStudioVersion = 16.0.29905.134 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{ED5A45B0-07D1-4507-96B7-83FBD3D031CA}" EndProject @@ -12,6 +12,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Thrift.Tests", "Tests\Thrif EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Thrift.PublicInterfaces.Compile.Tests", "Tests\Thrift.PublicInterfaces.Compile.Tests\Thrift.PublicInterfaces.Compile.Tests.csproj", "{A6AE021D-61CB-4D84-A103-0B663C62AE2C}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Benchmarks", "Benchmarks", "{BF7B896B-8BB6-447C-84F8-26871882A14A}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Thrift.Benchmarks", "Benchmarks\Thrift.Benchmarks\Thrift.Benchmarks.csproj", "{D0559DFF-6632-446C-9EFC-C750DA20B1D9}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -70,6 +74,18 @@ Global {A6AE021D-61CB-4D84-A103-0B663C62AE2C}.Release|x64.Build.0 = Release|Any CPU {A6AE021D-61CB-4D84-A103-0B663C62AE2C}.Release|x86.ActiveCfg = Release|Any CPU {A6AE021D-61CB-4D84-A103-0B663C62AE2C}.Release|x86.Build.0 = Release|Any CPU + {D0559DFF-6632-446C-9EFC-C750DA20B1D9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D0559DFF-6632-446C-9EFC-C750DA20B1D9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D0559DFF-6632-446C-9EFC-C750DA20B1D9}.Debug|x64.ActiveCfg = Debug|Any CPU + {D0559DFF-6632-446C-9EFC-C750DA20B1D9}.Debug|x64.Build.0 = Debug|Any CPU + {D0559DFF-6632-446C-9EFC-C750DA20B1D9}.Debug|x86.ActiveCfg = Debug|Any CPU + {D0559DFF-6632-446C-9EFC-C750DA20B1D9}.Debug|x86.Build.0 = Debug|Any CPU + {D0559DFF-6632-446C-9EFC-C750DA20B1D9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D0559DFF-6632-446C-9EFC-C750DA20B1D9}.Release|Any CPU.Build.0 = Release|Any CPU + {D0559DFF-6632-446C-9EFC-C750DA20B1D9}.Release|x64.ActiveCfg = Release|Any CPU + {D0559DFF-6632-446C-9EFC-C750DA20B1D9}.Release|x64.Build.0 = Release|Any CPU + {D0559DFF-6632-446C-9EFC-C750DA20B1D9}.Release|x86.ActiveCfg = Release|Any CPU + {D0559DFF-6632-446C-9EFC-C750DA20B1D9}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -78,6 +94,7 @@ Global {837F4084-AAD7-45F5-BC96-10E05A669DB4} = {ED5A45B0-07D1-4507-96B7-83FBD3D031CA} {0790D388-1A3C-4423-8CF2-C97074A8B68B} = {ED5A45B0-07D1-4507-96B7-83FBD3D031CA} {A6AE021D-61CB-4D84-A103-0B663C62AE2C} = {ED5A45B0-07D1-4507-96B7-83FBD3D031CA} + {D0559DFF-6632-446C-9EFC-C750DA20B1D9} = {BF7B896B-8BB6-447C-84F8-26871882A14A} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {FD20BC4A-0109-41D8-8C0C-893E784D7EF9} diff --git a/lib/netstd/Thrift/Protocol/TCompactProtocol.cs b/lib/netstd/Thrift/Protocol/TCompactProtocol.cs index a8a46f2a4..bb531f42e 100644 --- a/lib/netstd/Thrift/Protocol/TCompactProtocol.cs +++ b/lib/netstd/Thrift/Protocol/TCompactProtocol.cs @@ -16,6 +16,7 @@ // under the License. using System; +using System.Buffers; using System.Buffers.Binary; using System.Collections.Generic; using System.Diagnostics; @@ -66,7 +67,7 @@ namespace Thrift.Protocol // minimize memory allocations by means of an preallocated bytes buffer // The value of 128 is arbitrarily chosen, the required minimum size must be sizeof(long) - private byte[] PreAllocatedBuffer = new byte[128]; + private readonly byte[] PreAllocatedBuffer = new byte[128]; private struct VarInt { @@ -411,11 +412,19 @@ namespace Thrift.Protocol return; } - var bytes = Encoding.UTF8.GetBytes(str); + var buf = ArrayPool.Shared.Rent(Encoding.UTF8.GetByteCount(str)); + try + { + var numberOfBytes = Encoding.UTF8.GetBytes(str, 0, str.Length, buf, 0); - Int32ToVarInt((uint) bytes.Length, ref PreAllocatedVarInt); - await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken); - await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken); + Int32ToVarInt((uint)numberOfBytes, ref PreAllocatedVarInt); + await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken); + await Trans.WriteAsync(buf, 0, numberOfBytes, cancellationToken); + } + finally + { + ArrayPool.Shared.Return(buf); + } } public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken) @@ -706,9 +715,17 @@ namespace Thrift.Protocol } Transport.CheckReadBytesAvailable(length); - var buf = new byte[length]; - await Trans.ReadAllAsync(buf, 0, length, cancellationToken); - return Encoding.UTF8.GetString(buf, 0, length); + + var buf = ArrayPool.Shared.Rent(length); + try + { + await Trans.ReadAllAsync(buf, 0, length, cancellationToken); + return Encoding.UTF8.GetString(buf, 0, length); + } + finally + { + ArrayPool.Shared.Return(buf); + } } public override async ValueTask ReadBinaryAsync(CancellationToken cancellationToken) @@ -717,7 +734,7 @@ namespace Thrift.Protocol var length = (int) await ReadVarInt32Async(cancellationToken); if (length == 0) { - return new byte[0]; + return Array.Empty(); } // read data diff --git a/lib/netstd/Thrift/Thrift.csproj b/lib/netstd/Thrift/Thrift.csproj index e40db00d2..b63a12a66 100644 --- a/lib/netstd/Thrift/Thrift.csproj +++ b/lib/netstd/Thrift/Thrift.csproj @@ -19,7 +19,7 @@ --> - netstandard2.0 + netstandard2.1;netstandard2.0 Thrift Thrift true diff --git a/lib/netstd/Thrift/Transport/Client/THttpTransport.cs b/lib/netstd/Thrift/Transport/Client/THttpTransport.cs index bbd94fa98..0790cc882 100644 --- a/lib/netstd/Thrift/Transport/Client/THttpTransport.cs +++ b/lib/netstd/Thrift/Transport/Client/THttpTransport.cs @@ -109,7 +109,11 @@ namespace Thrift.Transport.Client try { +#if NETSTANDARD2_1 + var ret = await _inputStream.ReadAsync(new Memory(buffer, offset, length), cancellationToken); +#else var ret = await _inputStream.ReadAsync(buffer, offset, length, cancellationToken); +#endif if (ret == -1) { throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "No more data available"); diff --git a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs index f7f10b71a..8dab6a063 100644 --- a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs +++ b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs @@ -72,7 +72,11 @@ namespace Thrift.Transport.Client } CheckReadBytesAvailable(length); +#if NETSTANDARD2_1 + var numRead = await PipeStream.ReadAsync(new Memory(buffer, offset, length), cancellationToken); +#else var numRead = await PipeStream.ReadAsync(buffer, offset, length, cancellationToken); +#endif CountConsumedMessageBytes(numRead); return numRead; } diff --git a/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs b/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs index e04b3b333..ccadad025 100644 --- a/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs +++ b/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - +using System; using System.IO; using System.Threading; using System.Threading.Tasks; @@ -82,7 +82,11 @@ namespace Thrift.Transport.Client "Cannot read from null inputstream"); } +#if NETSTANDARD2_1 + return await InputStream.ReadAsync(new Memory(buffer, offset, length), cancellationToken); +#else return await InputStream.ReadAsync(buffer, offset, length, cancellationToken); +#endif } public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) diff --git a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs index a8b64c495..4b82cbd22 100644 --- a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs +++ b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs @@ -277,7 +277,11 @@ namespace Thrift.Transport.Server } CheckReadBytesAvailable(length); +#if NETSTANDARD2_1 + var numBytes = await PipeStream.ReadAsync(new Memory(buffer, offset, length), cancellationToken); +#else var numBytes = await PipeStream.ReadAsync(buffer, offset, length, cancellationToken); +#endif CountConsumedMessageBytes(numBytes); return numBytes; }