From 892088dc28845671e9f1b45d312ca784dc5ced63 Mon Sep 17 00:00:00 2001 From: Gerard Smit Date: Fri, 15 Dec 2023 03:53:20 +0100 Subject: [PATCH 01/11] Reduce memory usage by using the body directly in the pipe --- .../WireFormatting/MethodFraming.cs | 10 +- .../RabbitMQ.Client/RabbitMQ.Client.csproj | 1 + .../client/ChunckedSequence.cs | 76 ++++++++++++ .../client/RentedOutgoingMemory.cs | 116 ++++++++++++++++++ .../RabbitMQ.Client/client/api/IChannel.cs | 27 +++- .../client/impl/AutorecoveringChannel.cs | 17 ++- .../client/impl/ChannelBase.cs | 25 +++- .../RabbitMQ.Client/client/impl/Connection.cs | 4 +- projects/RabbitMQ.Client/client/impl/Frame.cs | 70 ++++++++--- .../client/impl/IFrameHandler.cs | 2 +- .../RabbitMQ.Client/client/impl/ISession.cs | 11 +- .../client/impl/SessionBase.cs | 21 +++- .../client/impl/SocketFrameHandler.cs | 18 +-- projects/Test/Unit/TestFrameFormatting.cs | 27 ++-- 14 files changed, 370 insertions(+), 55 deletions(-) create mode 100644 projects/RabbitMQ.Client/client/ChunckedSequence.cs create mode 100644 projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs diff --git a/projects/Benchmarks/WireFormatting/MethodFraming.cs b/projects/Benchmarks/WireFormatting/MethodFraming.cs index 7e66796e5f..6fc40f3097 100644 --- a/projects/Benchmarks/WireFormatting/MethodFraming.cs +++ b/projects/Benchmarks/WireFormatting/MethodFraming.cs @@ -19,7 +19,7 @@ public class MethodFramingBasicAck public ushort Channel { get; set; } [Benchmark] - internal RentedMemory BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel); + internal RentedOutgoingMemory BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel); } [Config(typeof(Config))] @@ -41,13 +41,13 @@ public class MethodFramingBasicPublish public int FrameMax { get; set; } [Benchmark] - internal RentedMemory BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, Channel, FrameMax); + internal RentedOutgoingMemory BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, Channel, FrameMax); [Benchmark] - internal RentedMemory BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax); + internal RentedOutgoingMemory BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax); [Benchmark] - internal RentedMemory BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax); + internal RentedOutgoingMemory BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax); } [Config(typeof(Config))] @@ -60,6 +60,6 @@ public class MethodFramingChannelClose public ushort Channel { get; set; } [Benchmark] - internal RentedMemory ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel); + internal RentedOutgoingMemory ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel); } } diff --git a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj index 88fb81ef82..ec70403ee9 100644 --- a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj +++ b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj @@ -67,6 +67,7 @@ + diff --git a/projects/RabbitMQ.Client/client/ChunckedSequence.cs b/projects/RabbitMQ.Client/client/ChunckedSequence.cs new file mode 100644 index 0000000000..e85aa04551 --- /dev/null +++ b/projects/RabbitMQ.Client/client/ChunckedSequence.cs @@ -0,0 +1,76 @@ +using System; +using System.Buffers; + +namespace RabbitMQ.Client; + +internal ref struct ChunkedSequence +{ + private ReadOnlyChunk _first; + private ReadOnlyChunk _current; + + private bool _changed; + private ReadOnlySequence? _sequence; + + public ChunkedSequence() + { + _first = _current = null; + _sequence = null; + _changed = false; + } + + public void Append(ReadOnlySequence sequence) + { + SequencePosition pos = sequence.Start; + while (sequence.TryGet(ref pos, out ReadOnlyMemory mem)) + { + Append(mem); + } + } + + public void Append(ReadOnlyMemory memory) + { + if (_current == null) + { + _first = _current = new ReadOnlyChunk(memory); + } + else + { + _current = _current.Append(memory); + } + + _changed = true; + } + + internal ReadOnlySequence GetSequence() + { + if (_changed) + { + _sequence = new ReadOnlySequence(_first, 0, _current, _current.Memory.Length); + } + else + { + _sequence ??= new ReadOnlySequence(); + } + + return _sequence.Value; + } + + private sealed class ReadOnlyChunk : ReadOnlySequenceSegment + { + public ReadOnlyChunk(ReadOnlyMemory memory) + { + Memory = memory; + } + + public ReadOnlyChunk Append(ReadOnlyMemory memory) + { + ReadOnlyChunk nextChunk = new(memory) + { + RunningIndex = RunningIndex + Memory.Length + }; + + Next = nextChunk; + return nextChunk; + } + } +} diff --git a/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs b/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs new file mode 100644 index 0000000000..2bad94a5f3 --- /dev/null +++ b/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs @@ -0,0 +1,116 @@ +#nullable enable + +using System; +using System.Buffers; +using System.IO.Pipelines; +using System.Threading.Tasks; +using Microsoft.Extensions.ObjectPool; + +namespace RabbitMQ.Client +{ + internal class RentedOutgoingMemory : IDisposable, IResettable + { + private static readonly ObjectPool s_pool = ObjectPool.Create(); + + private bool _disposedValue; + private TaskCompletionSource? _sendCompletionSource; + + internal ReadOnlySequence Data; + internal byte[]? RentedArray; + + internal int Size => (int) Data.Length; + + /// + /// Mark the data as sent. + /// + public void DidSend() + { + if (_sendCompletionSource is null) + { + Dispose(); + } + else + { + _sendCompletionSource.SetResult(true); + } + } + + /// + /// Wait for the data to be sent. + /// + /// A that completes when the data is sent. + public ValueTask WaitForDataSendAsync() + { + return _sendCompletionSource is null ? default : WaitForFinishCore(); + + async ValueTask WaitForFinishCore() + { + await _sendCompletionSource.Task.ConfigureAwait(false); + Dispose(); + } + } + + public void WriteTo(PipeWriter pipeWriter) + { + foreach (ReadOnlyMemory memory in Data) + { + pipeWriter.Write(memory.Span); + } + } + + private void Dispose(bool disposing) + { + if (_disposedValue) + { + return; + } + + if (disposing && RentedArray != null) + { + ClientArrayPool.Return(RentedArray); + } + + _disposedValue = true; + } + + public void Dispose() + { + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + + bool IResettable.TryReset() + { + if (!_disposedValue) + { + return false; + } + + _disposedValue = false; + RentedArray = default; + _sendCompletionSource = default; + Data = default; + return true; + } + + public static RentedOutgoingMemory Create(ReadOnlySequence mem, byte[] buffer, bool waitSend = false) + { + var rented = s_pool.Get(); + + rented.Data = mem; + rented.RentedArray = buffer; + + if (waitSend) + { + rented._sendCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + + return rented; + } + + public static RentedOutgoingMemory Create(ReadOnlyMemory mem, byte[] buffer, bool waitSend = false) + { + return Create(new ReadOnlySequence(mem), buffer, waitSend); + } + } +} diff --git a/projects/RabbitMQ.Client/client/api/IChannel.cs b/projects/RabbitMQ.Client/client/api/IChannel.cs index 9d3248d22a..20e074c764 100644 --- a/projects/RabbitMQ.Client/client/api/IChannel.cs +++ b/projects/RabbitMQ.Client/client/api/IChannel.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Buffers; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -258,7 +259,7 @@ void BasicPublish(CachedString exchange, CachedString routingKey, i /// Routing key must be shorter than 255 bytes. /// /// - ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false) + ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false, bool copyBody = true) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; /// @@ -269,7 +270,29 @@ ValueTask BasicPublishAsync(string exchange, string routingKey, in /// Routing key must be shorter than 255 bytes. /// /// - ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false) + ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence body = default, bool mandatory = false, bool copyBody = false) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader; + + /// + /// Asynchronously publishes a message. + /// + /// + /// + /// Routing key must be shorter than 255 bytes. + /// + /// + ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false, bool copyBody = true) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader; + + /// + /// Asynchronously publishes a message. + /// + /// + /// + /// Routing key must be shorter than 255 bytes. + /// + /// + ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence body = default, bool mandatory = false, bool copyBody = true) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; #nullable disable diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index d37f7a4764..71ccb8d9d2 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Buffers; using System.Collections.Generic; using System.Runtime.CompilerServices; using System.Threading; @@ -327,13 +328,21 @@ public void BasicPublish(CachedString exchange, CachedString routin where TProperties : IReadOnlyBasicProperties, IAmqpHeader => InnerChannel.BasicPublish(exchange, routingKey, in basicProperties, body, mandatory); - public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory) + public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory, bool copyBody = true) where TProperties : IReadOnlyBasicProperties, IAmqpHeader - => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory); + => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody); - public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory) + public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence body, bool mandatory, bool copyBody = true) where TProperties : IReadOnlyBasicProperties, IAmqpHeader - => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory); + => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody); + + public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory, bool copyBody = true) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader + => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody); + + public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence body, bool mandatory, bool copyBody = true) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader + => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody); public void BasicQos(uint prefetchSize, ushort prefetchCount, bool global) { diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 4aa0c31f16..7e93311978 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Buffers; using System.Collections.Generic; using System.Diagnostics; using System.IO; @@ -507,7 +508,7 @@ protected void ChannelSend(in TMethod method, in THeader heade } [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected ValueTask ModelSendAsync(in TMethod method, in THeader header, ReadOnlyMemory body) + protected ValueTask ModelSendAsync(in TMethod method, in THeader header, ReadOnlySequence body, bool copyBody) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader { @@ -516,7 +517,7 @@ protected ValueTask ModelSendAsync(in TMethod method, in THead _flowControlBlock.Wait(); } - return Session.TransmitAsync(in method, in header, body); + return Session.TransmitAsync(in method, in header, body, copyBody); } internal void OnCallbackException(CallbackExceptionEventArgs args) @@ -1258,7 +1259,7 @@ public void BasicPublish(CachedString exchange, CachedString routin ChannelSend(in cmd, in basicProperties, body); } - public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory) + public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence body, bool mandatory, bool copyBody = true) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { if (NextPublishSeqNo > 0) @@ -1270,10 +1271,16 @@ public ValueTask BasicPublishAsync(string exchange, string routingK } var cmd = new BasicPublish(exchange, routingKey, mandatory, default); - return ModelSendAsync(in cmd, in basicProperties, body); + return ModelSendAsync(in cmd, in basicProperties, body, copyBody); } - public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory) + public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory, bool copyBody = true) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader + { + return BasicPublishAsync(exchange, routingKey, in basicProperties, new ReadOnlySequence(body), mandatory, copyBody); + } + + public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence body, bool mandatory, bool copyBody = true) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { if (NextPublishSeqNo > 0) @@ -1285,7 +1292,13 @@ public ValueTask BasicPublishAsync(CachedString exchange, CachedStr } var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); - return ModelSendAsync(in cmd, in basicProperties, body); + return ModelSendAsync(in cmd, in basicProperties, body, copyBody); + } + + public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory, bool copyBody = true) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader + { + return BasicPublishAsync(exchange, routingKey, in basicProperties, new ReadOnlySequence(body), mandatory, copyBody); } public void UpdateSecret(string newSecret, string reason) diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 4417d991e3..e1eae85dd5 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -539,7 +539,7 @@ internal void OnCallbackException(CallbackExceptionEventArgs args) _callbackExceptionWrapper.Invoke(this, args); } - internal void Write(RentedMemory frames) + internal void Write(RentedOutgoingMemory frames) { ValueTask task = _frameHandler.WriteAsync(frames); if (!task.IsCompletedSuccessfully) @@ -548,7 +548,7 @@ internal void Write(RentedMemory frames) } } - internal ValueTask WriteAsync(RentedMemory frames) + internal ValueTask WriteAsync(RentedOutgoingMemory frames) { return _frameHandler.WriteAsync(frames); } diff --git a/projects/RabbitMQ.Client/client/impl/Frame.cs b/projects/RabbitMQ.Client/client/impl/Frame.cs index ad5eab4c19..a01f6d87ac 100644 --- a/projects/RabbitMQ.Client/client/impl/Frame.cs +++ b/projects/RabbitMQ.Client/client/impl/Frame.cs @@ -113,13 +113,29 @@ internal static class BodySegment public const int FrameSize = BaseFrameSize; [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static int WriteTo(Span span, ushort channel, ReadOnlySpan body) + public static int WriteTo(ref ChunkedSequence data, Memory buffer, ushort channel, ReadOnlySequence body, bool copyBody) { const int StartBodyArgument = StartPayload; - NetworkOrderSerializer.WriteUInt64(ref span.GetStart(), ((ulong)Constants.FrameBody << 56) | ((ulong)channel << 40) | ((ulong)body.Length << 8)); - body.CopyTo(span.Slice(StartBodyArgument)); - span[StartPayload + body.Length] = Constants.FrameEnd; - return body.Length + BaseFrameSize; + NetworkOrderSerializer.WriteUInt64(ref buffer.Span.GetStart(), ((ulong)Constants.FrameBody << 56) | ((ulong)channel << 40) | ((ulong)body.Length << 8)); + + if (copyBody) + { + int length = (int)body.Length; + Span span = buffer.Span; + + body.CopyTo(span.Slice(StartBodyArgument)); + span[StartPayload + length] = Constants.FrameEnd; + + data.Append(buffer.Slice(0, length + BaseFrameSize)); + + return length + BaseFrameSize; + } + + data.Append(buffer.Slice(0, StartBodyArgument)); + data.Append(body); + buffer.Span[StartBodyArgument] = Constants.FrameEnd; + data.Append(buffer.Slice(StartBodyArgument, 1)); + return BaseFrameSize; } } @@ -139,18 +155,18 @@ internal static class Heartbeat /// private static ReadOnlySpan Payload => new byte[] { Constants.FrameHeartbeat, 0, 0, 0, 0, 0, 0, Constants.FrameEnd }; - public static RentedMemory GetHeartbeatFrame() + public static RentedOutgoingMemory GetHeartbeatFrame() { // Is returned by SocketFrameHandler.WriteLoop byte[] buffer = ClientArrayPool.Rent(FrameSize); Payload.CopyTo(buffer); var mem = new ReadOnlyMemory(buffer, 0, FrameSize); - return new RentedMemory(mem, buffer); + return RentedOutgoingMemory.Create(mem, buffer); } } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static RentedMemory SerializeToFrames(ref T method, ushort channelNumber) + public static RentedOutgoingMemory SerializeToFrames(ref T method, ushort channelNumber) where T : struct, IOutgoingAmqpMethod { int size = Method.FrameSize + method.GetRequiredBufferSize(); @@ -161,35 +177,57 @@ public static RentedMemory SerializeToFrames(ref T method, ushort channelNumb System.Diagnostics.Debug.Assert(offset == size, $"Serialized to wrong size, expect {size}, offset {offset}"); var mem = new ReadOnlyMemory(array, 0, size); - return new RentedMemory(mem, array); + return RentedOutgoingMemory.Create(mem, array); } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static RentedMemory SerializeToFrames(ref TMethod method, ref THeader header, ReadOnlyMemory body, ushort channelNumber, int maxBodyPayloadBytes) + public static RentedOutgoingMemory SerializeToFrames(ref TMethod method, ref THeader header, ReadOnlySequence body, ushort channelNumber, int maxBodyPayloadBytes, bool copyBody = true) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader { - int remainingBodyBytes = body.Length; + int remainingBodyBytes = (int) body.Length; int size = Method.FrameSize + Header.FrameSize + method.GetRequiredBufferSize() + header.GetRequiredBufferSize() + - BodySegment.FrameSize * GetBodyFrameCount(maxBodyPayloadBytes, remainingBodyBytes) + remainingBodyBytes; + BodySegment.FrameSize * GetBodyFrameCount(maxBodyPayloadBytes, remainingBodyBytes); + + if (copyBody) + { + size += remainingBodyBytes; + } // Will be returned by SocketFrameWriter.WriteLoop byte[] array = ClientArrayPool.Rent(size); + ChunkedSequence sequence = new ChunkedSequence(); + Memory buffer = array.AsMemory(); + int offset = Method.WriteTo(array, channelNumber, ref method); offset += Header.WriteTo(array.AsSpan(offset), channelNumber, ref header, remainingBodyBytes); - ReadOnlySpan bodySpan = body.Span; + + sequence.Append(buffer.Slice(0, offset)); + buffer = buffer.Slice(offset); + + ReadOnlySequence remainingBody = body; while (remainingBodyBytes > 0) { int frameSize = remainingBodyBytes > maxBodyPayloadBytes ? maxBodyPayloadBytes : remainingBodyBytes; - offset += BodySegment.WriteTo(array.AsSpan(offset), channelNumber, bodySpan.Slice(bodySpan.Length - remainingBodyBytes, frameSize)); + int segmentSize = BodySegment.WriteTo(ref sequence, buffer, channelNumber, remainingBody.Slice(remainingBody.Length - remainingBodyBytes, frameSize), copyBody); + + buffer = buffer.Slice(segmentSize); + offset += segmentSize; remainingBodyBytes -= frameSize; } System.Diagnostics.Debug.Assert(offset == size, $"Serialized to wrong size, expect {size}, offset {offset}"); - var mem = new ReadOnlyMemory(array, 0, size); - return new RentedMemory(mem, array); + return RentedOutgoingMemory.Create(sequence.GetSequence(), array, waitSend: !copyBody); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static RentedOutgoingMemory SerializeToFrames(ref TMethod method, ref THeader header, ReadOnlyMemory body, ushort channelNumber, int maxBodyPayloadBytes) + where TMethod : struct, IOutgoingAmqpMethod + where THeader : IAmqpHeader + { + return SerializeToFrames(ref method, ref header, new ReadOnlySequence(body), channelNumber, maxBodyPayloadBytes); } [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs index beeaf85de2..df07a7e1ee 100644 --- a/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs +++ b/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs @@ -73,6 +73,6 @@ internal interface IFrameHandler Task SendProtocolHeaderAsync(CancellationToken cancellationToken); // TODO cancellation token for write timeout / cancellation? - ValueTask WriteAsync(RentedMemory frames); + ValueTask WriteAsync(RentedOutgoingMemory frames); } } diff --git a/projects/RabbitMQ.Client/client/impl/ISession.cs b/projects/RabbitMQ.Client/client/impl/ISession.cs index 1d5a34f76b..0ad03f0d6a 100644 --- a/projects/RabbitMQ.Client/client/impl/ISession.cs +++ b/projects/RabbitMQ.Client/client/impl/ISession.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Buffers; using System.Threading.Tasks; using RabbitMQ.Client.Framing.Impl; @@ -79,13 +80,21 @@ internal interface ISession void Transmit(in T cmd) where T : struct, IOutgoingAmqpMethod; + void Transmit(in TMethod cmd, in THeader header, ReadOnlySequence body) + where TMethod : struct, IOutgoingAmqpMethod + where THeader : IAmqpHeader; + void Transmit(in TMethod cmd, in THeader header, ReadOnlyMemory body) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader; ValueTask TransmitAsync(in T cmd) where T : struct, IOutgoingAmqpMethod; - ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlyMemory body) + ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlySequence body, bool copyBody = true) + where TMethod : struct, IOutgoingAmqpMethod + where THeader : IAmqpHeader; + + ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlyMemory body, bool copyBody = true) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader; } diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs index a433626878..35cf143427 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Buffers; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -151,7 +152,7 @@ public virtual ValueTask TransmitAsync(in T cmd) where T : struct, IOutgoingA return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ChannelNumber)); } - public void Transmit(in TMethod cmd, in THeader header, ReadOnlyMemory body) + public void Transmit(in TMethod cmd, in THeader header, ReadOnlySequence body) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader { @@ -163,7 +164,14 @@ public void Transmit(in TMethod cmd, in THeader header, ReadOn Connection.Write(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize)); } - public ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlyMemory body) + public void Transmit(in TMethod cmd, in THeader header, ReadOnlyMemory body) + where TMethod : struct, IOutgoingAmqpMethod + where THeader : IAmqpHeader + { + Transmit(cmd, header, new ReadOnlySequence(body)); + } + + public ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlySequence body, bool copyBody = true) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader { @@ -172,7 +180,14 @@ public ValueTask TransmitAsync(in TMethod cmd, in THeader head ThrowAlreadyClosedException(); } - return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize)); + return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize, copyBody)); + } + + public ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlyMemory body, bool copyBody = true) + where TMethod : struct, IOutgoingAmqpMethod + where THeader : IAmqpHeader + { + return TransmitAsync(cmd, header, new ReadOnlySequence(body), copyBody); } private void ThrowAlreadyClosedException() diff --git a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs index ef922b8c59..a2a9f42e36 100644 --- a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs +++ b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs @@ -48,8 +48,8 @@ internal sealed class SocketFrameHandler : IFrameHandler private readonly Func _socketFactory; private readonly TimeSpan _connectionTimeout; - private readonly ChannelWriter _channelWriter; - private readonly ChannelReader _channelReader; + private readonly ChannelWriter _channelWriter; + private readonly ChannelReader _channelReader; private readonly SemaphoreSlim _closingSemaphore = new SemaphoreSlim(1, 1); private IPAddress[] _amqpTcpEndpointAddresses; @@ -76,7 +76,7 @@ public SocketFrameHandler(AmqpTcpEndpoint amqpTcpEndpoint, _readTimeout = readTimeout; _writeTimeout = writeTimeout; - var channel = Channel.CreateBounded( + var channel = Channel.CreateBounded( new BoundedChannelOptions(128) { AllowSynchronousContinuations = false, @@ -296,7 +296,7 @@ await _pipeWriter.FlushAsync(cancellationToken) .ConfigureAwait(false); } - public async ValueTask WriteAsync(RentedMemory frames) + public async ValueTask WriteAsync(RentedOutgoingMemory frames) { if (_closed) { @@ -307,6 +307,9 @@ public async ValueTask WriteAsync(RentedMemory frames) { await _channelWriter.WriteAsync(frames) .ConfigureAwait(false); + + await frames.WaitForDataSendAsync() + .ConfigureAwait(false); } } @@ -332,17 +335,18 @@ private async Task WriteLoop() { while (await _channelReader.WaitToReadAsync().ConfigureAwait(false)) { - while (_channelReader.TryRead(out RentedMemory frames)) + while (_channelReader.TryRead(out RentedOutgoingMemory frames)) { try { - await _pipeWriter.WriteAsync(frames.Memory) + frames.WriteTo(_pipeWriter); + await _pipeWriter.FlushAsync() .ConfigureAwait(false); RabbitMqClientEventSource.Log.CommandSent(frames.Size); } finally { - frames.Dispose(); + frames.DidSend(); } } diff --git a/projects/Test/Unit/TestFrameFormatting.cs b/projects/Test/Unit/TestFrameFormatting.cs index 9bbb31e46d..43cd189dd8 100644 --- a/projects/Test/Unit/TestFrameFormatting.cs +++ b/projects/Test/Unit/TestFrameFormatting.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Buffers; using RabbitMQ.Client; using RabbitMQ.Client.Framing.Impl; using RabbitMQ.Client.Impl; @@ -42,12 +43,15 @@ public class TestFrameFormatting : WireFormattingFixture [Fact] public void HeartbeatFrame() { - RentedMemory sfc = Framing.Heartbeat.GetHeartbeatFrame(); - ReadOnlySpan frameSpan = sfc.Memory.Span; + RentedOutgoingMemory sfc = Framing.Heartbeat.GetHeartbeatFrame(); try { - Assert.Equal(8, frameSpan.Length); + Assert.Equal(8, sfc.Size); + + Span frameSpan = stackalloc byte[8]; + sfc.Data.CopyTo(frameSpan); + Assert.Equal(Constants.FrameHeartbeat, frameSpan[0]); Assert.Equal(0, frameSpan[1]); // channel Assert.Equal(0, frameSpan[2]); // channel @@ -59,7 +63,7 @@ public void HeartbeatFrame() } finally { - ClientArrayPool.Return(sfc.RentedArray); + sfc.Dispose(); } } @@ -133,14 +137,21 @@ public void MethodFrame() Assert.Equal(Constants.FrameEnd, frameBytes[18]); } - [Fact] - public void BodySegmentFrame() + [Theory] + [InlineData(true)] + [InlineData(false)] + public void BodySegmentFrame(bool copyBody) { const int Channel = 3; byte[] payload = new byte[4]; - byte[] frameBytes = new byte[RabbitMQ.Client.Impl.Framing.BodySegment.FrameSize + payload.Length]; - RabbitMQ.Client.Impl.Framing.BodySegment.WriteTo(frameBytes, Channel, payload); + byte[] buffer = new byte[RabbitMQ.Client.Impl.Framing.BodySegment.FrameSize + (copyBody ? payload.Length : 0)]; + + ChunkedSequence segment = new ChunkedSequence(); + + RabbitMQ.Client.Impl.Framing.BodySegment.WriteTo(ref segment, buffer, Channel, new ReadOnlySequence(payload), copyBody); + + byte[] frameBytes = segment.GetSequence().ToArray(); Assert.Equal(8, RabbitMQ.Client.Impl.Framing.BodySegment.FrameSize); Assert.Equal(Constants.FrameBody, frameBytes[0]); From 5e68abb23af9e6ea4b2feea8737f633bf3120ec8 Mon Sep 17 00:00:00 2001 From: Gerard Smit Date: Fri, 15 Dec 2023 12:27:04 +0100 Subject: [PATCH 02/11] Merge sequential memories --- .../client/ChunckedSequence.cs | 76 ---------------- .../client/RentedOutgoingMemory.cs | 29 +++--- .../client/impl/EmptyBasicProperty.cs | 2 +- projects/RabbitMQ.Client/client/impl/Frame.cs | 14 +-- .../RabbitMQ.Client/util/SequenceBuilder.cs | 90 +++++++++++++++++++ projects/Test/Unit/TestFrameFormatting.cs | 15 +++- projects/Test/Unit/TestSequenceBuilder.cs | 89 ++++++++++++++++++ 7 files changed, 217 insertions(+), 98 deletions(-) delete mode 100644 projects/RabbitMQ.Client/client/ChunckedSequence.cs create mode 100644 projects/RabbitMQ.Client/util/SequenceBuilder.cs create mode 100644 projects/Test/Unit/TestSequenceBuilder.cs diff --git a/projects/RabbitMQ.Client/client/ChunckedSequence.cs b/projects/RabbitMQ.Client/client/ChunckedSequence.cs deleted file mode 100644 index e85aa04551..0000000000 --- a/projects/RabbitMQ.Client/client/ChunckedSequence.cs +++ /dev/null @@ -1,76 +0,0 @@ -using System; -using System.Buffers; - -namespace RabbitMQ.Client; - -internal ref struct ChunkedSequence -{ - private ReadOnlyChunk _first; - private ReadOnlyChunk _current; - - private bool _changed; - private ReadOnlySequence? _sequence; - - public ChunkedSequence() - { - _first = _current = null; - _sequence = null; - _changed = false; - } - - public void Append(ReadOnlySequence sequence) - { - SequencePosition pos = sequence.Start; - while (sequence.TryGet(ref pos, out ReadOnlyMemory mem)) - { - Append(mem); - } - } - - public void Append(ReadOnlyMemory memory) - { - if (_current == null) - { - _first = _current = new ReadOnlyChunk(memory); - } - else - { - _current = _current.Append(memory); - } - - _changed = true; - } - - internal ReadOnlySequence GetSequence() - { - if (_changed) - { - _sequence = new ReadOnlySequence(_first, 0, _current, _current.Memory.Length); - } - else - { - _sequence ??= new ReadOnlySequence(); - } - - return _sequence.Value; - } - - private sealed class ReadOnlyChunk : ReadOnlySequenceSegment - { - public ReadOnlyChunk(ReadOnlyMemory memory) - { - Memory = memory; - } - - public ReadOnlyChunk Append(ReadOnlyMemory memory) - { - ReadOnlyChunk nextChunk = new(memory) - { - RunningIndex = RunningIndex + Memory.Length - }; - - Next = nextChunk; - return nextChunk; - } - } -} diff --git a/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs b/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs index 2bad94a5f3..9910ddf327 100644 --- a/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs +++ b/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs @@ -13,13 +13,13 @@ internal class RentedOutgoingMemory : IDisposable, IResettable private static readonly ObjectPool s_pool = ObjectPool.Create(); private bool _disposedValue; + private byte[]? _rentedArray; private TaskCompletionSource? _sendCompletionSource; - internal ReadOnlySequence Data; - internal byte[]? RentedArray; - internal int Size => (int) Data.Length; + internal ReadOnlySequence Data { get; private set; } + /// /// Mark the data as sent. /// @@ -28,6 +28,7 @@ public void DidSend() if (_sendCompletionSource is null) { Dispose(); + s_pool.Return(this); } else { @@ -47,6 +48,7 @@ async ValueTask WaitForFinishCore() { await _sendCompletionSource.Task.ConfigureAwait(false); Dispose(); + s_pool.Return(this); } } @@ -65,9 +67,14 @@ private void Dispose(bool disposing) return; } - if (disposing && RentedArray != null) + if (disposing) { - ClientArrayPool.Return(RentedArray); + if (_rentedArray != null) + { + ClientArrayPool.Return(_rentedArray); + Data = default; + _rentedArray = null; + } } _disposedValue = true; @@ -87,18 +94,18 @@ bool IResettable.TryReset() } _disposedValue = false; - RentedArray = default; - _sendCompletionSource = default; + _rentedArray = default; Data = default; + _sendCompletionSource = default; return true; } - public static RentedOutgoingMemory Create(ReadOnlySequence mem, byte[] buffer, bool waitSend = false) + public static RentedOutgoingMemory GetAndInitialize(ReadOnlySequence mem, byte[] buffer, bool waitSend = false) { var rented = s_pool.Get(); rented.Data = mem; - rented.RentedArray = buffer; + rented._rentedArray = buffer; if (waitSend) { @@ -108,9 +115,9 @@ public static RentedOutgoingMemory Create(ReadOnlySequence mem, byte[] buf return rented; } - public static RentedOutgoingMemory Create(ReadOnlyMemory mem, byte[] buffer, bool waitSend = false) + public static RentedOutgoingMemory GetAndInitialize(ReadOnlyMemory mem, byte[] buffer, bool waitSend = false) { - return Create(new ReadOnlySequence(mem), buffer, waitSend); + return GetAndInitialize(new ReadOnlySequence(mem), buffer, waitSend); } } } diff --git a/projects/RabbitMQ.Client/client/impl/EmptyBasicProperty.cs b/projects/RabbitMQ.Client/client/impl/EmptyBasicProperty.cs index 7868556dce..5547d413a1 100644 --- a/projects/RabbitMQ.Client/client/impl/EmptyBasicProperty.cs +++ b/projects/RabbitMQ.Client/client/impl/EmptyBasicProperty.cs @@ -8,7 +8,7 @@ namespace RabbitMQ.Client.client.impl #nullable enable internal readonly struct EmptyBasicProperty : IReadOnlyBasicProperties, IAmqpHeader { - internal static readonly EmptyBasicProperty Empty; + internal static readonly EmptyBasicProperty Empty = default; ushort IAmqpHeader.ProtocolClassId => ClassConstants.Basic; diff --git a/projects/RabbitMQ.Client/client/impl/Frame.cs b/projects/RabbitMQ.Client/client/impl/Frame.cs index a01f6d87ac..6a01f7ad48 100644 --- a/projects/RabbitMQ.Client/client/impl/Frame.cs +++ b/projects/RabbitMQ.Client/client/impl/Frame.cs @@ -113,7 +113,7 @@ internal static class BodySegment public const int FrameSize = BaseFrameSize; [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static int WriteTo(ref ChunkedSequence data, Memory buffer, ushort channel, ReadOnlySequence body, bool copyBody) + public static int WriteTo(ref SequenceBuilder data, Memory buffer, ushort channel, ReadOnlySequence body, bool copyBody) { const int StartBodyArgument = StartPayload; NetworkOrderSerializer.WriteUInt64(ref buffer.Span.GetStart(), ((ulong)Constants.FrameBody << 56) | ((ulong)channel << 40) | ((ulong)body.Length << 8)); @@ -161,7 +161,7 @@ public static RentedOutgoingMemory GetHeartbeatFrame() byte[] buffer = ClientArrayPool.Rent(FrameSize); Payload.CopyTo(buffer); var mem = new ReadOnlyMemory(buffer, 0, FrameSize); - return RentedOutgoingMemory.Create(mem, buffer); + return RentedOutgoingMemory.GetAndInitialize(mem, buffer); } } @@ -177,7 +177,7 @@ public static RentedOutgoingMemory SerializeToFrames(ref T method, ushort cha System.Diagnostics.Debug.Assert(offset == size, $"Serialized to wrong size, expect {size}, offset {offset}"); var mem = new ReadOnlyMemory(array, 0, size); - return RentedOutgoingMemory.Create(mem, array); + return RentedOutgoingMemory.GetAndInitialize(mem, array); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -198,20 +198,20 @@ public static RentedOutgoingMemory SerializeToFrames(ref TMeth // Will be returned by SocketFrameWriter.WriteLoop byte[] array = ClientArrayPool.Rent(size); - ChunkedSequence sequence = new ChunkedSequence(); + SequenceBuilder sequenceBuilder = new SequenceBuilder(); Memory buffer = array.AsMemory(); int offset = Method.WriteTo(array, channelNumber, ref method); offset += Header.WriteTo(array.AsSpan(offset), channelNumber, ref header, remainingBodyBytes); - sequence.Append(buffer.Slice(0, offset)); + sequenceBuilder.Append(buffer.Slice(0, offset)); buffer = buffer.Slice(offset); ReadOnlySequence remainingBody = body; while (remainingBodyBytes > 0) { int frameSize = remainingBodyBytes > maxBodyPayloadBytes ? maxBodyPayloadBytes : remainingBodyBytes; - int segmentSize = BodySegment.WriteTo(ref sequence, buffer, channelNumber, remainingBody.Slice(remainingBody.Length - remainingBodyBytes, frameSize), copyBody); + int segmentSize = BodySegment.WriteTo(ref sequenceBuilder, buffer, channelNumber, remainingBody.Slice(remainingBody.Length - remainingBodyBytes, frameSize), copyBody); buffer = buffer.Slice(segmentSize); offset += segmentSize; @@ -219,7 +219,7 @@ public static RentedOutgoingMemory SerializeToFrames(ref TMeth } System.Diagnostics.Debug.Assert(offset == size, $"Serialized to wrong size, expect {size}, offset {offset}"); - return RentedOutgoingMemory.Create(sequence.GetSequence(), array, waitSend: !copyBody); + return RentedOutgoingMemory.GetAndInitialize(sequenceBuilder.Build(), array, waitSend: !copyBody); } [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/projects/RabbitMQ.Client/util/SequenceBuilder.cs b/projects/RabbitMQ.Client/util/SequenceBuilder.cs new file mode 100644 index 0000000000..6d32607d6a --- /dev/null +++ b/projects/RabbitMQ.Client/util/SequenceBuilder.cs @@ -0,0 +1,90 @@ +#nullable enable +using System; +using System.Buffers; +using System.Runtime.InteropServices; +using RabbitMQ.Client.Impl; + +namespace RabbitMQ.Util; + +internal ref struct SequenceBuilder +{ + private Segment? _first; + private Segment? _current; + + public SequenceBuilder() + { + _first = _current = null; + } + + public void Append(ReadOnlySequence sequence) + { + SequencePosition pos = sequence.Start; + while (sequence.TryGet(ref pos, out ReadOnlyMemory mem)) + { + Append(mem); + } + } + + public void Append(ReadOnlyMemory memory) + { + if (_current == null) + { + _first = _current = new Segment(memory); + } + else if (!_current.TryMerge(memory)) + { + _current = _current.Append(memory); + } + } + + public ReadOnlySequence Build() + { + if (_first == null || _current == null) + { + return default; + } + + return new ReadOnlySequence(_first, 0, _current, _current.Memory.Length); + } + + private sealed class Segment : ReadOnlySequenceSegment + { + public Segment(ReadOnlyMemory memory) + { + Memory = memory; + } + + /// + /// Try to merge the next memory into this chunk. + /// + /// + /// Used in that can write the same array when the body is being copied. + /// + /// The next memory. + /// true if the memory was merged; otherwise false. + public bool TryMerge(ReadOnlyMemory next) + { + if (MemoryMarshal.TryGetArray(Memory, out ArraySegment segment) && + MemoryMarshal.TryGetArray(next, out ArraySegment nextSegment) && + segment.Array == nextSegment.Array && + nextSegment.Offset == segment.Offset + segment.Count) + { + Memory = segment.Array.AsMemory(segment.Offset, segment.Count + nextSegment.Count); + return true; + } + + return false; + } + + public Segment Append(ReadOnlyMemory memory) + { + Segment nextChunk = new(memory) + { + RunningIndex = RunningIndex + Memory.Length + }; + + Next = nextChunk; + return nextChunk; + } + } +} diff --git a/projects/Test/Unit/TestFrameFormatting.cs b/projects/Test/Unit/TestFrameFormatting.cs index 43cd189dd8..268f52596c 100644 --- a/projects/Test/Unit/TestFrameFormatting.cs +++ b/projects/Test/Unit/TestFrameFormatting.cs @@ -34,6 +34,7 @@ using RabbitMQ.Client; using RabbitMQ.Client.Framing.Impl; using RabbitMQ.Client.Impl; +using RabbitMQ.Util; using Xunit; namespace Test.Unit @@ -147,11 +148,19 @@ public void BodySegmentFrame(bool copyBody) byte[] payload = new byte[4]; byte[] buffer = new byte[RabbitMQ.Client.Impl.Framing.BodySegment.FrameSize + (copyBody ? payload.Length : 0)]; - ChunkedSequence segment = new ChunkedSequence(); + SequenceBuilder builder = new SequenceBuilder(); - RabbitMQ.Client.Impl.Framing.BodySegment.WriteTo(ref segment, buffer, Channel, new ReadOnlySequence(payload), copyBody); + RabbitMQ.Client.Impl.Framing.BodySegment.WriteTo(ref builder, buffer, Channel, new ReadOnlySequence(payload), copyBody); - byte[] frameBytes = segment.GetSequence().ToArray(); + var sequence = builder.Build(); + + if (copyBody) + { + // When copying the body, the memory is sequential + Assert.True(sequence.IsSingleSegment); + } + + byte[] frameBytes = sequence.ToArray(); Assert.Equal(8, RabbitMQ.Client.Impl.Framing.BodySegment.FrameSize); Assert.Equal(Constants.FrameBody, frameBytes[0]); diff --git a/projects/Test/Unit/TestSequenceBuilder.cs b/projects/Test/Unit/TestSequenceBuilder.cs new file mode 100644 index 0000000000..8bd8b7d8dd --- /dev/null +++ b/projects/Test/Unit/TestSequenceBuilder.cs @@ -0,0 +1,89 @@ +using System; +using System.Buffers; +using System.Runtime.InteropServices; +using RabbitMQ.Util; +using Xunit; + +namespace Test.Unit; + +public class TestSequenceBuilder +{ + [Fact] + public void TestSingleMemory() + { + // Arrange + byte[] array = new byte[] { 1, 2, 3, 4, 5, 6 }; + + // Act + SequenceBuilder builder = new SequenceBuilder(); + builder.Append(array); + + var sequence = builder.Build(); + + // Assert + Assert.Equal(6, sequence.Length); + Assert.True(sequence.IsSingleSegment); + Assert.True(MemoryMarshal.TryGetArray(sequence.First, out var segment)); + Assert.Equal(array, segment.Array); + Assert.Equal(0, segment.Offset); + Assert.Equal(6, segment.Count); + } + + [Fact] + public void TestMerge() + { + // Arrange + byte[] array = new byte[] { 1, 2, 3, 4, 5, 6 }; + Memory first = array.AsMemory(0, 3); + Memory second = array.AsMemory(3, 3); + + // Act + SequenceBuilder builder = new SequenceBuilder(); + + builder.Append(first); + builder.Append(second); + + var sequence = builder.Build(); + + // Assert + Assert.Equal(6, sequence.Length); + Assert.True(sequence.IsSingleSegment); + Assert.True(MemoryMarshal.TryGetArray(sequence.First, out var segment)); + Assert.Equal(array, segment.Array); + Assert.Equal(0, segment.Offset); + Assert.Equal(6, segment.Count); + } + + [Fact] + public void TestMultipleMemory() + { + // Arrange + byte[] first = new byte[] { 1, 2, 3 }; + byte[] second = new byte[] { 4, 5, 6 }; + + // Act + SequenceBuilder builder = new SequenceBuilder(); + + builder.Append(first); + builder.Append(second); + + var sequence = builder.Build(); + + // Assert + Assert.Equal(6, sequence.Length); + Assert.Equal(new byte[] { 1, 2, 3, 4, 5, 6 }, sequence.ToArray()); + + var enumerator = sequence.GetEnumerator(); + Assert.True(enumerator.MoveNext()); + Assert.True(MemoryMarshal.TryGetArray(enumerator.Current, out var segment)); + Assert.Equal(first, segment.Array); + Assert.Equal(0, segment.Offset); + Assert.Equal(3, segment.Count); + + Assert.True(enumerator.MoveNext()); + Assert.True(MemoryMarshal.TryGetArray(enumerator.Current, out segment)); + Assert.Equal(second, segment.Array); + Assert.Equal(0, segment.Offset); + Assert.Equal(3, segment.Count); + } +} From ae7ca17f3d81c24473a0dfdf192d8381390ed7eb Mon Sep 17 00:00:00 2001 From: Gerard Smit Date: Fri, 15 Dec 2023 12:42:09 +0100 Subject: [PATCH 03/11] Add configuration to set the copy threshold --- .../client/api/ConnectionConfig.cs | 13 ++++++++++++- .../client/api/ConnectionFactory.cs | 18 +++++++++++++++++- .../RabbitMQ.Client/client/api/IChannel.cs | 8 ++++---- .../client/impl/AutorecoveringChannel.cs | 8 ++++---- .../RabbitMQ.Client/client/impl/ChannelBase.cs | 10 +++++----- .../RabbitMQ.Client/client/impl/Connection.cs | 2 ++ .../RabbitMQ.Client/client/impl/ISession.cs | 4 ++-- .../RabbitMQ.Client/client/impl/SessionBase.cs | 8 +++++--- 8 files changed, 51 insertions(+), 20 deletions(-) diff --git a/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs b/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs index 3aba1f9ab2..451dd73209 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs @@ -143,6 +143,16 @@ public sealed class ConnectionConfig /// public readonly int DispatchConsumerConcurrency; + /// + /// The threshold for when to copy the body to a temporary array. + /// + /// + /// When the body is larger than this threshold it will reuse the same buffer. Because of this + /// the buffer cannot be modified by the application. This causes + /// the socket () to block until the frame is sent. + /// + public readonly int CopyBodyToMemoryThreshold; + internal readonly Func> FrameHandlerFactoryAsync; internal ConnectionConfig(string virtualHost, string userName, string password, @@ -153,7 +163,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password, TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler, TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout, bool dispatchConsumersAsync, int dispatchConsumerConcurrency, - Func> frameHandlerFactoryAsync) + Func> frameHandlerFactoryAsync, int copyBodyToMemoryThreshold) { VirtualHost = virtualHost; UserName = userName; @@ -176,6 +186,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password, DispatchConsumersAsync = dispatchConsumersAsync; DispatchConsumerConcurrency = dispatchConsumerConcurrency; FrameHandlerFactoryAsync = frameHandlerFactoryAsync; + CopyBodyToMemoryThreshold = copyBodyToMemoryThreshold; } } } diff --git a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs index 7a50ae9e85..32afead05a 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs @@ -138,6 +138,11 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor /// public const string DefaultVHost = "/"; + /// + /// Default value for the copy body to memory threshold. + /// + public const int DefaultCopyBodyToMemoryThreshold = int.MaxValue; + /// /// TLS versions enabled by default: TLSv1.2, v1.1, v1.0. /// @@ -361,6 +366,16 @@ public AmqpTcpEndpoint Endpoint /// public uint MaxMessageSize { get; set; } = DefaultMaxMessageSize; + /// + /// The threshold for when to copy the body to a temporary array. + /// + /// + /// When the body is larger than this threshold it will reuse the same buffer. Because of this + /// the buffer cannot be modified by the application. This causes + /// the socket () to block until the frame is sent. + /// + public int CopyBodyToMemoryThreshold { get; set; } = DefaultCopyBodyToMemoryThreshold; + /// /// The uri to use for the connection. /// @@ -771,7 +786,8 @@ private ConnectionConfig CreateConfig(string clientProvidedName) RequestedConnectionTimeout, DispatchConsumersAsync, ConsumerDispatchConcurrency, - CreateFrameHandlerAsync); + CreateFrameHandlerAsync, + CopyBodyToMemoryThreshold); } internal async Task CreateFrameHandlerAsync( diff --git a/projects/RabbitMQ.Client/client/api/IChannel.cs b/projects/RabbitMQ.Client/client/api/IChannel.cs index 20e074c764..e65874c27f 100644 --- a/projects/RabbitMQ.Client/client/api/IChannel.cs +++ b/projects/RabbitMQ.Client/client/api/IChannel.cs @@ -259,7 +259,7 @@ void BasicPublish(CachedString exchange, CachedString routingKey, i /// Routing key must be shorter than 255 bytes. /// /// - ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false, bool copyBody = true) + ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false, bool? copyBody = null) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; /// @@ -270,7 +270,7 @@ ValueTask BasicPublishAsync(string exchange, string routingKey, in /// Routing key must be shorter than 255 bytes. /// /// - ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence body = default, bool mandatory = false, bool copyBody = false) + ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence body = default, bool mandatory = false, bool? copyBody = null) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; /// @@ -281,7 +281,7 @@ ValueTask BasicPublishAsync(string exchange, string routingKey, in /// Routing key must be shorter than 255 bytes. /// /// - ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false, bool copyBody = true) + ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false, bool? copyBody = null) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; /// @@ -292,7 +292,7 @@ ValueTask BasicPublishAsync(CachedString exchange, CachedString rou /// Routing key must be shorter than 255 bytes. /// /// - ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence body = default, bool mandatory = false, bool copyBody = true) + ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence body = default, bool mandatory = false, bool? copyBody = null) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; #nullable disable diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index 71ccb8d9d2..5f9baba2a1 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -328,19 +328,19 @@ public void BasicPublish(CachedString exchange, CachedString routin where TProperties : IReadOnlyBasicProperties, IAmqpHeader => InnerChannel.BasicPublish(exchange, routingKey, in basicProperties, body, mandatory); - public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory, bool copyBody = true) + public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory, bool? copyBody = null) where TProperties : IReadOnlyBasicProperties, IAmqpHeader => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody); - public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence body, bool mandatory, bool copyBody = true) + public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence body, bool mandatory, bool? copyBody = null) where TProperties : IReadOnlyBasicProperties, IAmqpHeader => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody); - public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory, bool copyBody = true) + public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory, bool? copyBody = null) where TProperties : IReadOnlyBasicProperties, IAmqpHeader => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody); - public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence body, bool mandatory, bool copyBody = true) + public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence body, bool mandatory, bool? copyBody = null) where TProperties : IReadOnlyBasicProperties, IAmqpHeader => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody); diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 7e93311978..6e665227be 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -508,7 +508,7 @@ protected void ChannelSend(in TMethod method, in THeader heade } [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected ValueTask ModelSendAsync(in TMethod method, in THeader header, ReadOnlySequence body, bool copyBody) + protected ValueTask ModelSendAsync(in TMethod method, in THeader header, ReadOnlySequence body, bool? copyBody = null) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader { @@ -1259,7 +1259,7 @@ public void BasicPublish(CachedString exchange, CachedString routin ChannelSend(in cmd, in basicProperties, body); } - public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence body, bool mandatory, bool copyBody = true) + public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence body, bool mandatory, bool? copyBody = null) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { if (NextPublishSeqNo > 0) @@ -1274,13 +1274,13 @@ public ValueTask BasicPublishAsync(string exchange, string routingK return ModelSendAsync(in cmd, in basicProperties, body, copyBody); } - public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory, bool copyBody = true) + public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory, bool? copyBody = null) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { return BasicPublishAsync(exchange, routingKey, in basicProperties, new ReadOnlySequence(body), mandatory, copyBody); } - public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence body, bool mandatory, bool copyBody = true) + public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence body, bool mandatory, bool? copyBody = null) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { if (NextPublishSeqNo > 0) @@ -1295,7 +1295,7 @@ public ValueTask BasicPublishAsync(CachedString exchange, CachedStr return ModelSendAsync(in cmd, in basicProperties, body, copyBody); } - public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory, bool copyBody = true) + public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory, bool? copyBody = null) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { return BasicPublishAsync(exchange, routingKey, in basicProperties, new ReadOnlySequence(body), mandatory, copyBody); diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index e1eae85dd5..df70cc5e2d 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -101,6 +101,8 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler) public IDictionary? ServerProperties { get; private set; } + public int CopyBodyToMemoryThreshold => _config.CopyBodyToMemoryThreshold; + public IEnumerable ShutdownReport => _shutdownReport; private ShutdownReportEntry[] _shutdownReport = Array.Empty(); diff --git a/projects/RabbitMQ.Client/client/impl/ISession.cs b/projects/RabbitMQ.Client/client/impl/ISession.cs index 0ad03f0d6a..ca8076b439 100644 --- a/projects/RabbitMQ.Client/client/impl/ISession.cs +++ b/projects/RabbitMQ.Client/client/impl/ISession.cs @@ -90,11 +90,11 @@ void Transmit(in TMethod cmd, in THeader header, ReadOnlyMemor ValueTask TransmitAsync(in T cmd) where T : struct, IOutgoingAmqpMethod; - ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlySequence body, bool copyBody = true) + ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlySequence body, bool? copyBody = null) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader; - ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlyMemory body, bool copyBody = true) + ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlyMemory body, bool? copyBody = null) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader; } diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs index 35cf143427..6df61337e1 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs @@ -171,7 +171,7 @@ public void Transmit(in TMethod cmd, in THeader header, ReadOn Transmit(cmd, header, new ReadOnlySequence(body)); } - public ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlySequence body, bool copyBody = true) + public ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlySequence body, bool? copyBody = null) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader { @@ -180,10 +180,12 @@ public ValueTask TransmitAsync(in TMethod cmd, in THeader head ThrowAlreadyClosedException(); } - return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize, copyBody)); + copyBody ??= body.Length >= Connection.CopyBodyToMemoryThreshold; + + return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize, copyBody.Value)); } - public ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlyMemory body, bool copyBody = true) + public ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlyMemory body, bool? copyBody = null) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader { From 8a59fc6037a1248520a80b1194834c30e065ba08 Mon Sep 17 00:00:00 2001 From: Gerard Smit Date: Fri, 15 Dec 2023 14:22:06 +0100 Subject: [PATCH 04/11] Added tests for non-copying body --- .../client/RentedOutgoingMemory.cs | 6 +- .../client/api/ConnectionFactory.cs | 7 +- .../client/api/IChannelExtensions.cs | 10 +-- .../RabbitMQ.Client/client/api/IConnection.cs | 5 ++ .../client/impl/AutorecoveringConnection.cs | 4 +- .../RabbitMQ.Client/client/impl/Connection.cs | 17 +++++ .../client/impl/SessionBase.cs | 2 +- .../AsyncIntegration/TestBasicPublishAsync.cs | 71 +++++++++++++++++++ .../TestBasicPublishCopyBodyAsync.cs | 66 +++++++++++++++++ .../Test/Common/IntegrationFixtureBase.cs | 38 +++++++++- projects/Test/Common/TrackRentedByteResult.cs | 24 +++++++ .../Test/Unit/TestRentedOutgoingMemory.cs | 51 +++++++++++++ 12 files changed, 290 insertions(+), 11 deletions(-) create mode 100644 projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs create mode 100644 projects/Test/Common/TrackRentedByteResult.cs create mode 100644 projects/Test/Unit/TestRentedOutgoingMemory.cs diff --git a/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs b/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs index 9910ddf327..1f57c73e23 100644 --- a/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs +++ b/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs @@ -18,6 +18,8 @@ internal class RentedOutgoingMemory : IDisposable, IResettable internal int Size => (int) Data.Length; + public int RentedArraySize => _rentedArray?.Length ?? 0; + internal ReadOnlySequence Data { get; private set; } /// @@ -100,7 +102,7 @@ bool IResettable.TryReset() return true; } - public static RentedOutgoingMemory GetAndInitialize(ReadOnlySequence mem, byte[] buffer, bool waitSend = false) + public static RentedOutgoingMemory GetAndInitialize(ReadOnlySequence mem, byte[]? buffer = null, bool waitSend = false) { var rented = s_pool.Get(); @@ -115,7 +117,7 @@ public static RentedOutgoingMemory GetAndInitialize(ReadOnlySequence mem, return rented; } - public static RentedOutgoingMemory GetAndInitialize(ReadOnlyMemory mem, byte[] buffer, bool waitSend = false) + public static RentedOutgoingMemory GetAndInitialize(ReadOnlyMemory mem, byte[]? buffer = null, bool waitSend = false) { return GetAndInitialize(new ReadOnlySequence(mem), buffer, waitSend); } diff --git a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs index 32afead05a..ced8372ca6 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs @@ -763,7 +763,12 @@ public async ValueTask CreateConnectionAsync(IEndpointResolver endp } } - private ConnectionConfig CreateConfig(string clientProvidedName) + internal ConnectionConfig CreateConfig() + { + return CreateConfig(ClientProvidedName); + } + + internal ConnectionConfig CreateConfig(string clientProvidedName) { return new ConnectionConfig( VirtualHost, diff --git a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs index 74edd17fb2..f7ea8dcd80 100644 --- a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs @@ -140,14 +140,14 @@ public static ValueTask BasicPublishAsync(this IChannel channel, PublicationA public static void BasicPublish(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory body = default, bool mandatory = false) => channel.BasicPublish(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory); - public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory body = default, bool mandatory = false) - => channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory); + public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory body = default, bool mandatory = false, bool? copyBody = null) + => channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory, copyBody); - public static void BasicPublish(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false) + public static void BasicPublish(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false, bool? copyBody = null) => channel.BasicPublish(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory); - public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false) - => channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory); + public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false, bool? copyBody = null) + => channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory, copyBody); #nullable disable /// diff --git a/projects/RabbitMQ.Client/client/api/IConnection.cs b/projects/RabbitMQ.Client/client/api/IConnection.cs index 11653032a1..0df5edacb5 100644 --- a/projects/RabbitMQ.Client/client/api/IConnection.cs +++ b/projects/RabbitMQ.Client/client/api/IConnection.cs @@ -125,6 +125,11 @@ public interface IConnection : INetworkConnection, IDisposable /// IEnumerable ShutdownReport { get; } + /// + /// The threshold for when to copy the body to a temporary array. + /// + int CopyBodyToMemoryThreshold { get; } + /// /// Application-specific connection name, will be displayed in the management UI /// if RabbitMQ server supports it. This value doesn't have to be unique and cannot diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index 56962d8edc..9bc9830591 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -50,7 +50,7 @@ internal sealed partial class AutorecoveringConnection : IConnection private Connection _innerConnection; private bool _disposed; - private Connection InnerConnection + internal Connection InnerConnection { get { @@ -181,6 +181,8 @@ public event EventHandler RecoveringConsumer public IEnumerable ShutdownReport => InnerConnection.ShutdownReport; + public int CopyBodyToMemoryThreshold => InnerConnection.CopyBodyToMemoryThreshold; + public IProtocol Protocol => Endpoint.Protocol; public RecoveryAwareChannel CreateNonRecoveringChannel() diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index df70cc5e2d..a60a28a87c 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -58,6 +58,9 @@ internal sealed partial class Connection : IConnection private ShutdownEventArgs? _closeReason; public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason); + internal bool TrackRentedBytes = false; + internal uint RentedBytes; + internal Connection(ConnectionConfig config, IFrameHandler frameHandler) { _config = config; @@ -552,9 +555,23 @@ internal void Write(RentedOutgoingMemory frames) internal ValueTask WriteAsync(RentedOutgoingMemory frames) { + TrackRented(frames.RentedArraySize); + return _frameHandler.WriteAsync(frames); } + private void TrackRented(int size) + { + if (TrackRentedBytes && size > 0) + { +#if NET + Interlocked.Add(ref RentedBytes, (uint)size); +#else + Interlocked.Add(ref Unsafe.As(ref RentedBytes), size); +#endif + } + } + public void Dispose() { if (_disposed) diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs index 6df61337e1..610334256f 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs @@ -180,7 +180,7 @@ public ValueTask TransmitAsync(in TMethod cmd, in THeader head ThrowAlreadyClosedException(); } - copyBody ??= body.Length >= Connection.CopyBodyToMemoryThreshold; + copyBody ??= body.Length > Connection.CopyBodyToMemoryThreshold; return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize, copyBody.Value)); } diff --git a/projects/Test/AsyncIntegration/TestBasicPublishAsync.cs b/projects/Test/AsyncIntegration/TestBasicPublishAsync.cs index e6a526a130..f649f7dbf9 100644 --- a/projects/Test/AsyncIntegration/TestBasicPublishAsync.cs +++ b/projects/Test/AsyncIntegration/TestBasicPublishAsync.cs @@ -67,5 +67,76 @@ public async Task TestQueuePurgeAsync() Assert.True(await publishSyncSource.Task); Assert.Equal((uint)messageCount, await _channel.QueuePurgeAsync(q)); } + + [Fact] + public async Task TestNonCopyingBody() + { + const int size = 1024; + + QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null); + byte[] body = GetRandomBody(size); + + uint rentedBytes; + + using (var result = await TrackRentedBytes()) + { + await _channel.BasicPublishAsync(string.Empty, q, body, copyBody: false); + rentedBytes = result.RentedBytes; + } + + Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q)); + + // It is expected that the rented bytes is smaller than the size of the body + // since we're not copying the body. Only the frame headers are rented. + Assert.True(rentedBytes < size); + } + + [Fact] + public async Task TestCopyingBody() + { + const int size = 1024; + + QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null); + byte[] body = GetRandomBody(size); + + uint rentedBytes; + + using (var result = await TrackRentedBytes()) + { + await _channel.BasicPublishAsync(string.Empty, q, body, copyBody: true); + rentedBytes = result.RentedBytes; + } + + Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q)); + + // It is expected that the rented bytes is larger than the size of the body + // since the body is copied with the frame headers. + Assert.True(rentedBytes >= size); + } + + [Fact] + public async Task TestDefaultCopyingBody() + { + Assert.Equal(int.MaxValue, _conn.CopyBodyToMemoryThreshold); + + const int size = 1024; + + QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null); + byte[] body = GetRandomBody(size); + + uint rentedBytes; + + using (var result = await TrackRentedBytes()) + { + await _channel.BasicPublishAsync(string.Empty, q, body, copyBody: true); + rentedBytes = result.RentedBytes; + } + + Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q)); + + // It is expected that the rented bytes is larger than the size of the body + // since the body is copied with the frame headers. + Assert.True(rentedBytes >= size); + } } } diff --git a/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs b/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs new file mode 100644 index 0000000000..ef3184ced6 --- /dev/null +++ b/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs @@ -0,0 +1,66 @@ +using System.Threading.Tasks; +using RabbitMQ.Client; +using Xunit; +using Xunit.Abstractions; + +namespace Test.AsyncIntegration; + +public class TestBasicPublishCopyBodyAsync : AsyncIntegrationFixture +{ + public TestBasicPublishCopyBodyAsync(ITestOutputHelper output) : base(output) + { + } + + protected override ConnectionFactory CreateConnectionFactory() + { + var factory = base.CreateConnectionFactory(); + factory.CopyBodyToMemoryThreshold = 1024; + return factory; + } + + [Theory(Skip = "Parallelization is disabled for this collection")] + [InlineData(512)] + [InlineData(1024)] + public async Task TestNonCopyingBody(ushort size) + { + QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null); + byte[] body = GetRandomBody(size); + + uint rentedBytes; + + using (var result = await TrackRentedBytes()) + { + await _channel.BasicPublishAsync(string.Empty, q, body); + rentedBytes = result.RentedBytes; + } + + Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q)); + + // It is expected that the rented bytes is smaller than the size of the body + // since we're not copying the body. Only the frame headers are rented. + Assert.True(rentedBytes < size); + } + + [Theory] + [InlineData(1025)] + [InlineData(2048)] + public async Task TestCopyingBody(ushort size) + { + QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null); + byte[] body = GetRandomBody(size); + + uint rentedBytes; + + using (var result = await TrackRentedBytes()) + { + await _channel.BasicPublishAsync(string.Empty, q, body); + rentedBytes = result.RentedBytes; + } + + Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q)); + + // It is expected that the rented bytes is larger than the size of the body + // since the body is copied with the frame headers. + Assert.True(rentedBytes >= size); + } +} diff --git a/projects/Test/Common/IntegrationFixtureBase.cs b/projects/Test/Common/IntegrationFixtureBase.cs index b177dd6e15..cbbb8ad65f 100644 --- a/projects/Test/Common/IntegrationFixtureBase.cs +++ b/projects/Test/Common/IntegrationFixtureBase.cs @@ -38,6 +38,7 @@ using System.Reflection; using System.Text; using System.Threading; +using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing.Impl; @@ -48,6 +49,8 @@ namespace Test { public abstract class IntegrationFixtureBase : IDisposable { + private readonly SemaphoreSlim _byteTrackingLock = new SemaphoreSlim(1, 1); + private static bool s_isRunningInCI = false; private static bool s_isWindows = false; private static bool s_isVerbose = false; @@ -371,7 +374,7 @@ protected void Wait(ManualResetEventSlim latch, TimeSpan timeSpan, string desc) $"waiting {timeSpan.TotalSeconds} seconds on a latch for '{desc}' timed out"); } - protected ConnectionFactory CreateConnectionFactory() + protected virtual ConnectionFactory CreateConnectionFactory() { string now = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture); return new ConnectionFactory @@ -418,6 +421,39 @@ protected void HandleChannelShutdown(IChannel ch, ShutdownEventArgs args, Action a(args); } + protected async Task TrackRentedBytes() + { + Connection connection; + + if (_conn is AutorecoveringConnection autorecoveringConnection) + { + connection = autorecoveringConnection.InnerConnection as Connection; + } + else + { + connection = _conn as Connection; + } + + if (connection is null) + { + throw new InvalidOperationException("Cannot track rented bytes without a connection"); + } + + await _byteTrackingLock.WaitAsync(); + + try + { + connection.RentedBytes = 0; + connection.TrackRentedBytes = true; + return new TrackRentedByteResult(connection, _byteTrackingLock); + } + catch + { + _byteTrackingLock.Release(); + throw; + } + } + private static void InitIsRunningInCI() { bool ci; diff --git a/projects/Test/Common/TrackRentedByteResult.cs b/projects/Test/Common/TrackRentedByteResult.cs new file mode 100644 index 0000000000..48b9a0e50f --- /dev/null +++ b/projects/Test/Common/TrackRentedByteResult.cs @@ -0,0 +1,24 @@ +using System; +using System.Threading; +using RabbitMQ.Client.Framing.Impl; + +namespace Test; + +public sealed class TrackRentedByteResult : IDisposable +{ + private readonly Connection _connection; + private readonly SemaphoreSlim _byteTrackingLock; + + internal TrackRentedByteResult(Connection connection, SemaphoreSlim byteTrackingLock) + { + _connection = connection; + _byteTrackingLock = byteTrackingLock; + } + + public uint RentedBytes => _connection.RentedBytes; + + public void Dispose() + { + _byteTrackingLock.Release(); + } +} diff --git a/projects/Test/Unit/TestRentedOutgoingMemory.cs b/projects/Test/Unit/TestRentedOutgoingMemory.cs new file mode 100644 index 0000000000..e0dcebb9b2 --- /dev/null +++ b/projects/Test/Unit/TestRentedOutgoingMemory.cs @@ -0,0 +1,51 @@ +using RabbitMQ.Client; +using Xunit; + +namespace Test.Unit; + +public class TestRentedOutgoingMemory +{ + [Fact] + public void TestNonBlocking() + { + // Arrange + byte[] buffer = new byte[] { 1, 2, 3, 4, 5 }; + RentedOutgoingMemory rentedMemory = RentedOutgoingMemory.GetAndInitialize(buffer, waitSend: false); + + // Act + var waitTask = rentedMemory.WaitForDataSendAsync(); + + // Assert + Assert.True(waitTask.IsCompleted); + } + + [Fact] + public void TestBlocking() + { + // Arrange + byte[] buffer = new byte[] { 1, 2, 3, 4, 5 }; + RentedOutgoingMemory rentedMemory = RentedOutgoingMemory.GetAndInitialize(buffer, waitSend: true); + + // Act + var waitTask = rentedMemory.WaitForDataSendAsync(); + + // Assert + Assert.False(waitTask.IsCompleted); + } + + [Fact] + public void TestBlockingCompleted() + { + // Arrange + byte[] buffer = new byte[] { 1, 2, 3, 4, 5 }; + RentedOutgoingMemory rentedMemory = RentedOutgoingMemory.GetAndInitialize(buffer, waitSend: true); + + // Act + var waitTask = rentedMemory.WaitForDataSendAsync(); + + rentedMemory.DidSend(); + + // Assert + Assert.False(waitTask.IsCompleted); + } +} From f59728cc0b158c5e59ed99e74ed0c32aedaefdeb Mon Sep 17 00:00:00 2001 From: Gerard Smit Date: Fri, 15 Dec 2023 16:59:46 +0100 Subject: [PATCH 05/11] Fix incorrect test --- projects/Test/AsyncIntegration/TestBasicPublishAsync.cs | 8 ++++---- .../AsyncIntegration/TestBasicPublishCopyBodyAsync.cs | 4 ++-- projects/Test/Common/IntegrationFixtureBase.cs | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/projects/Test/AsyncIntegration/TestBasicPublishAsync.cs b/projects/Test/AsyncIntegration/TestBasicPublishAsync.cs index f649f7dbf9..fc89f5ccc7 100644 --- a/projects/Test/AsyncIntegration/TestBasicPublishAsync.cs +++ b/projects/Test/AsyncIntegration/TestBasicPublishAsync.cs @@ -78,7 +78,7 @@ public async Task TestNonCopyingBody() uint rentedBytes; - using (var result = await TrackRentedBytes()) + using (var result = await TrackRentedBytesAsync()) { await _channel.BasicPublishAsync(string.Empty, q, body, copyBody: false); rentedBytes = result.RentedBytes; @@ -101,7 +101,7 @@ public async Task TestCopyingBody() uint rentedBytes; - using (var result = await TrackRentedBytes()) + using (var result = await TrackRentedBytesAsync()) { await _channel.BasicPublishAsync(string.Empty, q, body, copyBody: true); rentedBytes = result.RentedBytes; @@ -126,9 +126,9 @@ public async Task TestDefaultCopyingBody() uint rentedBytes; - using (var result = await TrackRentedBytes()) + using (var result = await TrackRentedBytesAsync()) { - await _channel.BasicPublishAsync(string.Empty, q, body, copyBody: true); + await _channel.BasicPublishAsync(string.Empty, q, body); rentedBytes = result.RentedBytes; } diff --git a/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs b/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs index ef3184ced6..da8e125512 100644 --- a/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs +++ b/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs @@ -28,7 +28,7 @@ public async Task TestNonCopyingBody(ushort size) uint rentedBytes; - using (var result = await TrackRentedBytes()) + using (var result = await TrackRentedBytesAsync()) { await _channel.BasicPublishAsync(string.Empty, q, body); rentedBytes = result.RentedBytes; @@ -51,7 +51,7 @@ public async Task TestCopyingBody(ushort size) uint rentedBytes; - using (var result = await TrackRentedBytes()) + using (var result = await TrackRentedBytesAsync()) { await _channel.BasicPublishAsync(string.Empty, q, body); rentedBytes = result.RentedBytes; diff --git a/projects/Test/Common/IntegrationFixtureBase.cs b/projects/Test/Common/IntegrationFixtureBase.cs index cbbb8ad65f..1777e424f6 100644 --- a/projects/Test/Common/IntegrationFixtureBase.cs +++ b/projects/Test/Common/IntegrationFixtureBase.cs @@ -421,7 +421,7 @@ protected void HandleChannelShutdown(IChannel ch, ShutdownEventArgs args, Action a(args); } - protected async Task TrackRentedBytes() + protected async Task TrackRentedBytesAsync() { Connection connection; From 1f0a4bece21ab038bdddc0c77e0debe2f4da873c Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 15 Dec 2023 08:26:37 -0800 Subject: [PATCH 06/11] Fix formatting --- projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs | 2 +- projects/RabbitMQ.Client/client/impl/Frame.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs b/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs index 1f57c73e23..e8adc74bb5 100644 --- a/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs +++ b/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs @@ -16,7 +16,7 @@ internal class RentedOutgoingMemory : IDisposable, IResettable private byte[]? _rentedArray; private TaskCompletionSource? _sendCompletionSource; - internal int Size => (int) Data.Length; + internal int Size => (int)Data.Length; public int RentedArraySize => _rentedArray?.Length ?? 0; diff --git a/projects/RabbitMQ.Client/client/impl/Frame.cs b/projects/RabbitMQ.Client/client/impl/Frame.cs index 6a01f7ad48..a88e063b00 100644 --- a/projects/RabbitMQ.Client/client/impl/Frame.cs +++ b/projects/RabbitMQ.Client/client/impl/Frame.cs @@ -185,7 +185,7 @@ public static RentedOutgoingMemory SerializeToFrames(ref TMeth where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader { - int remainingBodyBytes = (int) body.Length; + int remainingBodyBytes = (int)body.Length; int size = Method.FrameSize + Header.FrameSize + method.GetRequiredBufferSize() + header.GetRequiredBufferSize() + BodySegment.FrameSize * GetBodyFrameCount(maxBodyPayloadBytes, remainingBodyBytes); From 51d6c667a2e2406e1933dcbf66a7c07e903a7f0e Mon Sep 17 00:00:00 2001 From: Gerard Smit Date: Fri, 15 Dec 2023 17:56:15 +0100 Subject: [PATCH 07/11] Re-enable test This test was disabled before I've implemented TrackRentedBytesAsync correctly. This should pass now. --- projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs b/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs index da8e125512..d2ba144007 100644 --- a/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs +++ b/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs @@ -18,7 +18,7 @@ protected override ConnectionFactory CreateConnectionFactory() return factory; } - [Theory(Skip = "Parallelization is disabled for this collection")] + [Theory] [InlineData(512)] [InlineData(1024)] public async Task TestNonCopyingBody(ushort size) From 75b8424555bb971435b502e8ec1f144e5143e341 Mon Sep 17 00:00:00 2001 From: Gerard Smit Date: Fri, 15 Dec 2023 19:40:39 +0100 Subject: [PATCH 08/11] Remove ObjectPool This complicates the tests. After returning the rented object back to the pool, the data is resetted. --- .../RabbitMQ.Client/RabbitMQ.Client.csproj | 1 - .../client/RentedOutgoingMemory.cs | 78 ++++++++----------- projects/RabbitMQ.Client/client/impl/Frame.cs | 6 +- .../Test/Unit/TestRentedOutgoingMemory.cs | 34 ++++---- 4 files changed, 58 insertions(+), 61 deletions(-) diff --git a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj index ec70403ee9..88fb81ef82 100644 --- a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj +++ b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj @@ -67,7 +67,6 @@ - diff --git a/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs b/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs index e8adc74bb5..396514302e 100644 --- a/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs +++ b/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs @@ -4,23 +4,48 @@ using System.Buffers; using System.IO.Pipelines; using System.Threading.Tasks; -using Microsoft.Extensions.ObjectPool; namespace RabbitMQ.Client { - internal class RentedOutgoingMemory : IDisposable, IResettable + internal class RentedOutgoingMemory : IDisposable { - private static readonly ObjectPool s_pool = ObjectPool.Create(); - private bool _disposedValue; private byte[]? _rentedArray; private TaskCompletionSource? _sendCompletionSource; + private ReadOnlySequence _data; + + public RentedOutgoingMemory(ReadOnlyMemory data, byte[]? rentedArray = null, bool waitSend = false) + : this(new ReadOnlySequence(data), rentedArray, waitSend) + { + } + + public RentedOutgoingMemory(ReadOnlySequence data, byte[]? rentedArray = null, bool waitSend = false) + { + _data = data; + _rentedArray = rentedArray; + + if (waitSend) + { + _sendCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + } internal int Size => (int)Data.Length; public int RentedArraySize => _rentedArray?.Length ?? 0; - internal ReadOnlySequence Data { get; private set; } + internal ReadOnlySequence Data + { + get + { + if (_disposedValue) + { + throw new ObjectDisposedException(nameof(RentedOutgoingMemory)); + } + + return _data; + } + } /// /// Mark the data as sent. @@ -30,7 +55,6 @@ public void DidSend() if (_sendCompletionSource is null) { Dispose(); - s_pool.Return(this); } else { @@ -50,7 +74,6 @@ async ValueTask WaitForFinishCore() { await _sendCompletionSource.Task.ConfigureAwait(false); Dispose(); - s_pool.Return(this); } } @@ -69,17 +92,18 @@ private void Dispose(bool disposing) return; } + _disposedValue = true; + if (disposing) { + _data = default; + if (_rentedArray != null) { ClientArrayPool.Return(_rentedArray); - Data = default; _rentedArray = null; } } - - _disposedValue = true; } public void Dispose() @@ -87,39 +111,5 @@ public void Dispose() Dispose(disposing: true); GC.SuppressFinalize(this); } - - bool IResettable.TryReset() - { - if (!_disposedValue) - { - return false; - } - - _disposedValue = false; - _rentedArray = default; - Data = default; - _sendCompletionSource = default; - return true; - } - - public static RentedOutgoingMemory GetAndInitialize(ReadOnlySequence mem, byte[]? buffer = null, bool waitSend = false) - { - var rented = s_pool.Get(); - - rented.Data = mem; - rented._rentedArray = buffer; - - if (waitSend) - { - rented._sendCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - } - - return rented; - } - - public static RentedOutgoingMemory GetAndInitialize(ReadOnlyMemory mem, byte[]? buffer = null, bool waitSend = false) - { - return GetAndInitialize(new ReadOnlySequence(mem), buffer, waitSend); - } } } diff --git a/projects/RabbitMQ.Client/client/impl/Frame.cs b/projects/RabbitMQ.Client/client/impl/Frame.cs index a88e063b00..98cd735b87 100644 --- a/projects/RabbitMQ.Client/client/impl/Frame.cs +++ b/projects/RabbitMQ.Client/client/impl/Frame.cs @@ -161,7 +161,7 @@ public static RentedOutgoingMemory GetHeartbeatFrame() byte[] buffer = ClientArrayPool.Rent(FrameSize); Payload.CopyTo(buffer); var mem = new ReadOnlyMemory(buffer, 0, FrameSize); - return RentedOutgoingMemory.GetAndInitialize(mem, buffer); + return new RentedOutgoingMemory(mem, buffer); } } @@ -177,7 +177,7 @@ public static RentedOutgoingMemory SerializeToFrames(ref T method, ushort cha System.Diagnostics.Debug.Assert(offset == size, $"Serialized to wrong size, expect {size}, offset {offset}"); var mem = new ReadOnlyMemory(array, 0, size); - return RentedOutgoingMemory.GetAndInitialize(mem, array); + return new RentedOutgoingMemory(mem, array); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -219,7 +219,7 @@ public static RentedOutgoingMemory SerializeToFrames(ref TMeth } System.Diagnostics.Debug.Assert(offset == size, $"Serialized to wrong size, expect {size}, offset {offset}"); - return RentedOutgoingMemory.GetAndInitialize(sequenceBuilder.Build(), array, waitSend: !copyBody); + return new RentedOutgoingMemory(sequenceBuilder.Build(), array, waitSend: !copyBody); } [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/projects/Test/Unit/TestRentedOutgoingMemory.cs b/projects/Test/Unit/TestRentedOutgoingMemory.cs index e0dcebb9b2..3ee2b46ac4 100644 --- a/projects/Test/Unit/TestRentedOutgoingMemory.cs +++ b/projects/Test/Unit/TestRentedOutgoingMemory.cs @@ -1,4 +1,5 @@ -using RabbitMQ.Client; +using System.Threading.Tasks; +using RabbitMQ.Client; using Xunit; namespace Test.Unit; @@ -6,46 +7,53 @@ namespace Test.Unit; public class TestRentedOutgoingMemory { [Fact] - public void TestNonBlocking() + public async Task TestNonBlocking() { // Arrange byte[] buffer = new byte[] { 1, 2, 3, 4, 5 }; - RentedOutgoingMemory rentedMemory = RentedOutgoingMemory.GetAndInitialize(buffer, waitSend: false); + RentedOutgoingMemory rentedMemory = new RentedOutgoingMemory(buffer, waitSend: false); // Act - var waitTask = rentedMemory.WaitForDataSendAsync(); + var waitTask = rentedMemory.WaitForDataSendAsync().AsTask(); + var timeoutTask = Task.Delay(100); + var completedTask = await Task.WhenAny(timeoutTask, waitTask); // Assert - Assert.True(waitTask.IsCompleted); + Assert.Equal(waitTask, completedTask); } [Fact] - public void TestBlocking() + public async Task TestBlocking() { // Arrange byte[] buffer = new byte[] { 1, 2, 3, 4, 5 }; - RentedOutgoingMemory rentedMemory = RentedOutgoingMemory.GetAndInitialize(buffer, waitSend: true); + RentedOutgoingMemory rentedMemory = new RentedOutgoingMemory(buffer, waitSend: true); // Act - var waitTask = rentedMemory.WaitForDataSendAsync(); + var waitTask = rentedMemory.WaitForDataSendAsync().AsTask(); + var timeoutTask = Task.Delay(100); + var completedTask = await Task.WhenAny(timeoutTask, waitTask); // Assert - Assert.False(waitTask.IsCompleted); + Assert.Equal(timeoutTask, completedTask); } [Fact] - public void TestBlockingCompleted() + public async Task TestBlockingCompleted() { // Arrange byte[] buffer = new byte[] { 1, 2, 3, 4, 5 }; - RentedOutgoingMemory rentedMemory = RentedOutgoingMemory.GetAndInitialize(buffer, waitSend: true); + RentedOutgoingMemory rentedMemory = new RentedOutgoingMemory(buffer, waitSend: true); // Act - var waitTask = rentedMemory.WaitForDataSendAsync(); + var waitTask = rentedMemory.WaitForDataSendAsync().AsTask(); + var timeoutTask = Task.Delay(100); rentedMemory.DidSend(); + var completedTask = await Task.WhenAny(timeoutTask, waitTask); + // Assert - Assert.False(waitTask.IsCompleted); + Assert.Equal(waitTask, completedTask); } } From 70e93763d3fb9f93452caacf3699e40372df5599 Mon Sep 17 00:00:00 2001 From: Gerard Smit Date: Fri, 15 Dec 2023 20:06:18 +0100 Subject: [PATCH 09/11] Improved disposing and fixed tests --- .../client/RentedOutgoingMemory.cs | 30 +++++++------- .../client/impl/SessionBase.cs | 2 +- .../client/impl/SocketFrameHandler.cs | 12 +++++- .../TestBasicPublishCopyBodyAsync.cs | 12 +++--- ...ncurrentAccessWithSharedConnectionAsync.cs | 41 ++++++++----------- .../Test/Unit/TestRentedOutgoingMemory.cs | 7 +++- 6 files changed, 55 insertions(+), 49 deletions(-) diff --git a/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs b/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs index 396514302e..6469806ddb 100644 --- a/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs +++ b/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs @@ -2,16 +2,18 @@ using System; using System.Buffers; +using System.Diagnostics; using System.IO.Pipelines; using System.Threading.Tasks; +using RabbitMQ.Client.Impl; namespace RabbitMQ.Client { - internal class RentedOutgoingMemory : IDisposable + internal sealed class RentedOutgoingMemory : IDisposable { + private readonly TaskCompletionSource? _sendCompletionSource; private bool _disposedValue; private byte[]? _rentedArray; - private TaskCompletionSource? _sendCompletionSource; private ReadOnlySequence _data; public RentedOutgoingMemory(ReadOnlyMemory data, byte[]? rentedArray = null, bool waitSend = false) @@ -50,30 +52,30 @@ internal ReadOnlySequence Data /// /// Mark the data as sent. /// - public void DidSend() + /// true if the object can be disposed, false if the is waiting for the data to be sent. + public bool DidSend() { if (_sendCompletionSource is null) { - Dispose(); - } - else - { - _sendCompletionSource.SetResult(true); + return true; } + + _sendCompletionSource.SetResult(true); + return false; } /// /// Wait for the data to be sent. /// - /// A that completes when the data is sent. - public ValueTask WaitForDataSendAsync() + /// true if the data was sent and the object can be disposed. + public ValueTask WaitForDataSendAsync() { - return _sendCompletionSource is null ? default : WaitForFinishCore(); + return _sendCompletionSource is null ? new ValueTask(false) : WaitForFinishCore(); - async ValueTask WaitForFinishCore() + async ValueTask WaitForFinishCore() { await _sendCompletionSource.Task.ConfigureAwait(false); - Dispose(); + return true; } } @@ -92,6 +94,7 @@ private void Dispose(bool disposing) return; } + Debug.Assert(_sendCompletionSource is null or { Task.IsCompleted: true }, "The send task should be completed before disposing."); _disposedValue = true; if (disposing) @@ -109,7 +112,6 @@ private void Dispose(bool disposing) public void Dispose() { Dispose(disposing: true); - GC.SuppressFinalize(this); } } } diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs index 610334256f..fe078ff338 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs @@ -180,7 +180,7 @@ public ValueTask TransmitAsync(in TMethod cmd, in THeader head ThrowAlreadyClosedException(); } - copyBody ??= body.Length > Connection.CopyBodyToMemoryThreshold; + copyBody ??= body.Length <= Connection.CopyBodyToMemoryThreshold; return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize, copyBody.Value)); } diff --git a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs index a2a9f42e36..b319533390 100644 --- a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs +++ b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs @@ -308,8 +308,13 @@ public async ValueTask WriteAsync(RentedOutgoingMemory frames) await _channelWriter.WriteAsync(frames) .ConfigureAwait(false); - await frames.WaitForDataSendAsync() + bool didSend = await frames.WaitForDataSendAsync() .ConfigureAwait(false); + + if (didSend) + { + frames.Dispose(); + } } } @@ -346,7 +351,10 @@ await _pipeWriter.FlushAsync() } finally { - frames.DidSend(); + if (frames.DidSend()) + { + frames.Dispose(); + } } } diff --git a/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs b/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs index d2ba144007..65e8e39000 100644 --- a/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs +++ b/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs @@ -36,9 +36,9 @@ public async Task TestNonCopyingBody(ushort size) Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q)); - // It is expected that the rented bytes is smaller than the size of the body - // since we're not copying the body. Only the frame headers are rented. - Assert.True(rentedBytes < size); + // It is expected that the rented bytes is larger than the size of the body + // since the body is copied with the frame headers. + Assert.True(rentedBytes >= size); } [Theory] @@ -59,8 +59,8 @@ public async Task TestCopyingBody(ushort size) Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q)); - // It is expected that the rented bytes is larger than the size of the body - // since the body is copied with the frame headers. - Assert.True(rentedBytes >= size); + // It is expected that the rented bytes is smaller than the size of the body + // since we're not copying the body. Only the frame headers are rented. + Assert.True(rentedBytes < size); } } diff --git a/projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs b/projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs index 9626bf4fa8..4e431ecbdc 100644 --- a/projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs +++ b/projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs @@ -55,37 +55,28 @@ public override async Task InitializeAsync() _conn.ConnectionShutdown += HandleConnectionShutdown; } - [Fact] - public Task TestConcurrentChannelOpenAndPublishingWithBlankMessagesAsync() + [Theory] + [InlineData(false)] + [InlineData(true)] + public Task TestConcurrentChannelOpenAndPublishingWithBlankMessagesAsync(bool copyBody) { - return TestConcurrentChannelOpenAndPublishingWithBodyAsync(Array.Empty(), 30); + return TestConcurrentChannelOpenAndPublishingWithBodyAsync(Array.Empty(), copyBody, 30); } - [Fact] - public Task TestConcurrentChannelOpenAndPublishingSize64Async() - { - return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(64); - } - - [Fact] - public Task TestConcurrentChannelOpenAndPublishingSize256Async() - { - return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(256); - } - - [Fact] - public Task TestConcurrentChannelOpenAndPublishingSize1024Async() - { - return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(1024); - } - - private Task TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(ushort length, int iterations = 30) + [Theory] + [InlineData(64, false)] + [InlineData(64, true)] + [InlineData(256, false)] + [InlineData(256, true)] + [InlineData(1024, false)] + [InlineData(1024, true)] + public Task TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(ushort length, bool copyBody, int iterations = 30) { byte[] body = GetRandomBody(length); - return TestConcurrentChannelOpenAndPublishingWithBodyAsync(body, iterations); + return TestConcurrentChannelOpenAndPublishingWithBodyAsync(body, copyBody, iterations); } - private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, int iterations) + private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, bool copyBody, int iterations) { return TestConcurrentChannelOperationsAsync(async (conn) => { @@ -128,7 +119,7 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in QueueDeclareOk q = await ch.QueueDeclareAsync(queue: string.Empty, passive: false, durable: false, exclusive: true, autoDelete: true, arguments: null); for (ushort j = 0; j < _messageCount; j++) { - await ch.BasicPublishAsync("", q.QueueName, body, mandatory: true); + await ch.BasicPublishAsync("", q.QueueName, body, mandatory: true, copyBody: copyBody); } Assert.True(await tcs.Task); diff --git a/projects/Test/Unit/TestRentedOutgoingMemory.cs b/projects/Test/Unit/TestRentedOutgoingMemory.cs index 3ee2b46ac4..b45adf217b 100644 --- a/projects/Test/Unit/TestRentedOutgoingMemory.cs +++ b/projects/Test/Unit/TestRentedOutgoingMemory.cs @@ -17,9 +17,12 @@ public async Task TestNonBlocking() var waitTask = rentedMemory.WaitForDataSendAsync().AsTask(); var timeoutTask = Task.Delay(100); var completedTask = await Task.WhenAny(timeoutTask, waitTask); + bool didSend = rentedMemory.DidSend(); // Assert Assert.Equal(waitTask, completedTask); + Assert.False(waitTask.Result); + Assert.True(didSend); } [Fact] @@ -49,11 +52,13 @@ public async Task TestBlockingCompleted() var waitTask = rentedMemory.WaitForDataSendAsync().AsTask(); var timeoutTask = Task.Delay(100); - rentedMemory.DidSend(); + bool didSend = rentedMemory.DidSend(); var completedTask = await Task.WhenAny(timeoutTask, waitTask); // Assert Assert.Equal(waitTask, completedTask); + Assert.True(waitTask.Result); + Assert.False(didSend); } } From 1f0b20ed5c49427e078a821fb44c274f6a9d7be9 Mon Sep 17 00:00:00 2001 From: Gerard Smit Date: Fri, 15 Dec 2023 20:10:29 +0100 Subject: [PATCH 10/11] Fix blocking warning --- projects/Test/Unit/TestRentedOutgoingMemory.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/projects/Test/Unit/TestRentedOutgoingMemory.cs b/projects/Test/Unit/TestRentedOutgoingMemory.cs index b45adf217b..955d43d50c 100644 --- a/projects/Test/Unit/TestRentedOutgoingMemory.cs +++ b/projects/Test/Unit/TestRentedOutgoingMemory.cs @@ -21,7 +21,7 @@ public async Task TestNonBlocking() // Assert Assert.Equal(waitTask, completedTask); - Assert.False(waitTask.Result); + Assert.False(!waitTask.IsCompleted || await waitTask); Assert.True(didSend); } @@ -39,6 +39,7 @@ public async Task TestBlocking() // Assert Assert.Equal(timeoutTask, completedTask); + Assert.False(waitTask.IsCompleted); } [Fact] @@ -58,7 +59,7 @@ public async Task TestBlockingCompleted() // Assert Assert.Equal(waitTask, completedTask); - Assert.True(waitTask.Result); + Assert.True(waitTask.IsCompleted && await waitTask); Assert.False(didSend); } } From 7df74eb5fd917a9a8987f13d7ff422218e15ce55 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 22 Dec 2023 06:03:53 -0800 Subject: [PATCH 11/11] Update PublicAPI.Unshipped --- .../RabbitMQ.Client/PublicAPI.Unshipped.txt | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index 239e593250..947b4e9d5d 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -2,6 +2,7 @@ abstract RabbitMQ.Client.Exceptions.ProtocolException.ReplyCode.get -> ushort const RabbitMQ.Client.AmqpTcpEndpoint.DefaultAmqpSslPort = 5671 -> int const RabbitMQ.Client.AmqpTcpEndpoint.UseDefaultPort = -1 -> int const RabbitMQ.Client.ConnectionFactory.DefaultChannelMax = 2047 -> ushort +const RabbitMQ.Client.ConnectionFactory.DefaultCopyBodyToMemoryThreshold = 2147483647 -> int const RabbitMQ.Client.ConnectionFactory.DefaultFrameMax = 0 -> uint const RabbitMQ.Client.ConnectionFactory.DefaultMaxMessageSize = 134217728 -> uint const RabbitMQ.Client.ConnectionFactory.DefaultPass = "guest" -> string @@ -212,6 +213,8 @@ RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.get -> int RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.set -> void RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.get -> System.TimeSpan RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.set -> void +RabbitMQ.Client.ConnectionFactory.CopyBodyToMemoryThreshold.get -> int +RabbitMQ.Client.ConnectionFactory.CopyBodyToMemoryThreshold.set -> void RabbitMQ.Client.ConnectionFactory.CreateConnection() -> RabbitMQ.Client.IConnection RabbitMQ.Client.ConnectionFactory.CreateConnection(RabbitMQ.Client.IEndpointResolver endpointResolver, string clientProvidedName) -> RabbitMQ.Client.IConnection RabbitMQ.Client.ConnectionFactory.CreateConnection(string clientProvidedName) -> RabbitMQ.Client.IConnection @@ -507,8 +510,10 @@ RabbitMQ.Client.IChannel.BasicNackAsync(ulong deliveryTag, bool multiple, bool r RabbitMQ.Client.IChannel.BasicNacks -> System.EventHandler RabbitMQ.Client.IChannel.BasicPublish(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, in TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> void RabbitMQ.Client.IChannel.BasicPublish(string exchange, string routingKey, in TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> void -RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, in TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask -RabbitMQ.Client.IChannel.BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask +RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, in TProperties basicProperties, System.Buffers.ReadOnlySequence body = default(System.Buffers.ReadOnlySequence), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask +RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, in TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask +RabbitMQ.Client.IChannel.BasicPublishAsync(string! exchange, string! routingKey, in TProperties basicProperties, System.Buffers.ReadOnlySequence body = default(System.Buffers.ReadOnlySequence), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask +RabbitMQ.Client.IChannel.BasicPublishAsync(string! exchange, string! routingKey, in TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.BasicQos(uint prefetchSize, ushort prefetchCount, bool global) -> void RabbitMQ.Client.IChannel.BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.BasicReject(ulong deliveryTag, bool requeue) -> void @@ -583,6 +588,7 @@ RabbitMQ.Client.IConnection.ConnectionRecoveryError -> System.EventHandler System.EventHandler RabbitMQ.Client.IConnection.ConnectionUnblocked -> System.EventHandler RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecovery -> System.EventHandler +RabbitMQ.Client.IConnection.CopyBodyToMemoryThreshold.get -> int RabbitMQ.Client.IConnection.CreateChannel() -> RabbitMQ.Client.IChannel RabbitMQ.Client.IConnection.CreateChannelAsync() -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IConnection.Endpoint.get -> RabbitMQ.Client.AmqpTcpEndpoint @@ -880,6 +886,7 @@ readonly RabbitMQ.Client.ConnectionConfig.AuthMechanisms -> System.Collections.G readonly RabbitMQ.Client.ConnectionConfig.ClientProperties -> System.Collections.Generic.IDictionary readonly RabbitMQ.Client.ConnectionConfig.ClientProvidedName -> string readonly RabbitMQ.Client.ConnectionConfig.ContinuationTimeout -> System.TimeSpan +readonly RabbitMQ.Client.ConnectionConfig.CopyBodyToMemoryThreshold -> int readonly RabbitMQ.Client.ConnectionConfig.DispatchConsumerConcurrency -> int readonly RabbitMQ.Client.ConnectionConfig.DispatchConsumersAsync -> bool readonly RabbitMQ.Client.ConnectionConfig.HandshakeContinuationTimeout -> System.TimeSpan @@ -953,11 +960,11 @@ static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, System.Collections.Generic.IDictionary arguments, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublish(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> void static RabbitMQ.Client.IChannelExtensions.BasicPublish(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> void +static RabbitMQ.Client.IChannelExtensions.BasicPublish(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false, bool? copyBody = null) -> void static RabbitMQ.Client.IChannelExtensions.BasicPublish(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.PublicationAddress addr, in T basicProperties, System.ReadOnlyMemory body) -> void -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.PublicationAddress addr, in T basicProperties, System.ReadOnlyMemory body) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.Close(this RabbitMQ.Client.IChannel channel) -> void static RabbitMQ.Client.IChannelExtensions.Close(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText) -> void