Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce memory usage by using the body directly instead of copying in BasicPublishAsync #1445

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
10 changes: 5 additions & 5 deletions projects/Benchmarks/WireFormatting/MethodFraming.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class MethodFramingBasicAck
public ushort Channel { get; set; }

[Benchmark]
internal RentedMemory BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel);
internal RentedOutgoingMemory BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel);
}

[Config(typeof(Config))]
Expand All @@ -41,13 +41,13 @@ public class MethodFramingBasicPublish
public int FrameMax { get; set; }

[Benchmark]
internal RentedMemory BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, Channel, FrameMax);
internal RentedOutgoingMemory BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, Channel, FrameMax);

[Benchmark]
internal RentedMemory BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
internal RentedOutgoingMemory BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);

[Benchmark]
internal RentedMemory BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
internal RentedOutgoingMemory BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
}

[Config(typeof(Config))]
Expand All @@ -60,6 +60,6 @@ public class MethodFramingChannelClose
public ushort Channel { get; set; }

[Benchmark]
internal RentedMemory ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel);
internal RentedOutgoingMemory ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel);
}
}
17 changes: 12 additions & 5 deletions projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ abstract RabbitMQ.Client.Exceptions.ProtocolException.ReplyCode.get -> ushort
const RabbitMQ.Client.AmqpTcpEndpoint.DefaultAmqpSslPort = 5671 -> int
const RabbitMQ.Client.AmqpTcpEndpoint.UseDefaultPort = -1 -> int
const RabbitMQ.Client.ConnectionFactory.DefaultChannelMax = 2047 -> ushort
const RabbitMQ.Client.ConnectionFactory.DefaultCopyBodyToMemoryThreshold = 2147483647 -> int
const RabbitMQ.Client.ConnectionFactory.DefaultFrameMax = 0 -> uint
const RabbitMQ.Client.ConnectionFactory.DefaultMaxMessageSize = 134217728 -> uint
const RabbitMQ.Client.ConnectionFactory.DefaultPass = "guest" -> string
Expand Down Expand Up @@ -212,6 +213,8 @@ RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.get -> int
RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.set -> void
RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.get -> System.TimeSpan
RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.set -> void
RabbitMQ.Client.ConnectionFactory.CopyBodyToMemoryThreshold.get -> int
RabbitMQ.Client.ConnectionFactory.CopyBodyToMemoryThreshold.set -> void
RabbitMQ.Client.ConnectionFactory.CreateConnection() -> RabbitMQ.Client.IConnection
RabbitMQ.Client.ConnectionFactory.CreateConnection(RabbitMQ.Client.IEndpointResolver endpointResolver, string clientProvidedName) -> RabbitMQ.Client.IConnection
RabbitMQ.Client.ConnectionFactory.CreateConnection(string clientProvidedName) -> RabbitMQ.Client.IConnection
Expand Down Expand Up @@ -507,8 +510,10 @@ RabbitMQ.Client.IChannel.BasicNackAsync(ulong deliveryTag, bool multiple, bool r
RabbitMQ.Client.IChannel.BasicNacks -> System.EventHandler<RabbitMQ.Client.Events.BasicNackEventArgs>
RabbitMQ.Client.IChannel.BasicPublish<TProperties>(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, in TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> void
RabbitMQ.Client.IChannel.BasicPublish<TProperties>(string exchange, string routingKey, in TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> void
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, in TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, in TProperties basicProperties, System.Buffers.ReadOnlySequence<byte> body = default(System.Buffers.ReadOnlySequence<byte>), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, in TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string! exchange, string! routingKey, in TProperties basicProperties, System.Buffers.ReadOnlySequence<byte> body = default(System.Buffers.ReadOnlySequence<byte>), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string! exchange, string! routingKey, in TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicQos(uint prefetchSize, ushort prefetchCount, bool global) -> void
RabbitMQ.Client.IChannel.BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicReject(ulong deliveryTag, bool requeue) -> void
Expand Down Expand Up @@ -583,6 +588,7 @@ RabbitMQ.Client.IConnection.ConnectionRecoveryError -> System.EventHandler<Rabbi
RabbitMQ.Client.IConnection.ConnectionShutdown -> System.EventHandler<RabbitMQ.Client.ShutdownEventArgs>
RabbitMQ.Client.IConnection.ConnectionUnblocked -> System.EventHandler<System.EventArgs>
RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecovery -> System.EventHandler<RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs>
RabbitMQ.Client.IConnection.CopyBodyToMemoryThreshold.get -> int
RabbitMQ.Client.IConnection.CreateChannel() -> RabbitMQ.Client.IChannel
RabbitMQ.Client.IConnection.CreateChannelAsync() -> System.Threading.Tasks.ValueTask<RabbitMQ.Client.IChannel>
RabbitMQ.Client.IConnection.Endpoint.get -> RabbitMQ.Client.AmqpTcpEndpoint
Expand Down Expand Up @@ -880,6 +886,7 @@ readonly RabbitMQ.Client.ConnectionConfig.AuthMechanisms -> System.Collections.G
readonly RabbitMQ.Client.ConnectionConfig.ClientProperties -> System.Collections.Generic.IDictionary<string, object>
readonly RabbitMQ.Client.ConnectionConfig.ClientProvidedName -> string
readonly RabbitMQ.Client.ConnectionConfig.ContinuationTimeout -> System.TimeSpan
readonly RabbitMQ.Client.ConnectionConfig.CopyBodyToMemoryThreshold -> int
readonly RabbitMQ.Client.ConnectionConfig.DispatchConsumerConcurrency -> int
readonly RabbitMQ.Client.ConnectionConfig.DispatchConsumersAsync -> bool
readonly RabbitMQ.Client.ConnectionConfig.HandshakeContinuationTimeout -> System.TimeSpan
Expand Down Expand Up @@ -953,11 +960,11 @@ static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.ValueTask<string>
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.ValueTask<string>
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, System.Collections.Generic.IDictionary<string, object> arguments, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.ValueTask<string>
static RabbitMQ.Client.IChannelExtensions.BasicPublish(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> void
static RabbitMQ.Client.IChannelExtensions.BasicPublish(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> void
static RabbitMQ.Client.IChannelExtensions.BasicPublish(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, bool? copyBody = null) -> void
static RabbitMQ.Client.IChannelExtensions.BasicPublish<T>(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.PublicationAddress addr, in T basicProperties, System.ReadOnlyMemory<byte> body) -> void
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync<T>(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.PublicationAddress addr, in T basicProperties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.Close(this RabbitMQ.Client.IChannel channel) -> void
static RabbitMQ.Client.IChannelExtensions.Close(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText) -> void
Expand Down
117 changes: 117 additions & 0 deletions projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#nullable enable

using System;
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Threading.Tasks;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client
{
internal sealed class RentedOutgoingMemory : IDisposable
{
private readonly TaskCompletionSource<bool>? _sendCompletionSource;
private bool _disposedValue;
private byte[]? _rentedArray;
private ReadOnlySequence<byte> _data;

public RentedOutgoingMemory(ReadOnlyMemory<byte> data, byte[]? rentedArray = null, bool waitSend = false)
: this(new ReadOnlySequence<byte>(data), rentedArray, waitSend)
{
}

public RentedOutgoingMemory(ReadOnlySequence<byte> data, byte[]? rentedArray = null, bool waitSend = false)
{
_data = data;
_rentedArray = rentedArray;

if (waitSend)
{
_sendCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}
}

internal int Size => (int)Data.Length;

public int RentedArraySize => _rentedArray?.Length ?? 0;

internal ReadOnlySequence<byte> Data
{
get
{
if (_disposedValue)
{
throw new ObjectDisposedException(nameof(RentedOutgoingMemory));
}

return _data;
}
}

/// <summary>
/// Mark the data as sent.
/// </summary>
/// <returns><c>true</c> if the object can be disposed, <c>false</c> if the <see cref="SocketFrameHandler"/> is waiting for the data to be sent.</returns>
public bool DidSend()
{
if (_sendCompletionSource is null)
{
return true;
}

_sendCompletionSource.SetResult(true);
return false;
}

/// <summary>
/// Wait for the data to be sent.
/// </summary>
/// <returns><c>true</c> if the data was sent and the object can be disposed.</returns>
public ValueTask<bool> WaitForDataSendAsync()
{
return _sendCompletionSource is null ? new ValueTask<bool>(false) : WaitForFinishCore();

async ValueTask<bool> WaitForFinishCore()
{
await _sendCompletionSource.Task.ConfigureAwait(false);
return true;
}
}

public void WriteTo(PipeWriter pipeWriter)
{
foreach (ReadOnlyMemory<byte> memory in Data)
{
pipeWriter.Write(memory.Span);
}
}

private void Dispose(bool disposing)
{
if (_disposedValue)
{
return;
}

Debug.Assert(_sendCompletionSource is null or { Task.IsCompleted: true }, "The send task should be completed before disposing.");
_disposedValue = true;

if (disposing)
{
_data = default;

if (_rentedArray != null)
{
ClientArrayPool.Return(_rentedArray);
_rentedArray = null;
}
}
}

public void Dispose()
{
Dispose(disposing: true);
}
}
}
13 changes: 12 additions & 1 deletion projects/RabbitMQ.Client/client/api/ConnectionConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,16 @@ public sealed class ConnectionConfig
/// </summary>
public readonly int DispatchConsumerConcurrency;

/// <summary>
/// The threshold for when to copy the body to a temporary array.
/// </summary>
/// <remarks>
/// When the body is larger than this threshold it will reuse the same buffer. Because of this
/// the buffer cannot be modified by the application. This causes
/// the socket (<see cref="SocketFrameHandler.WriteAsync"/>) to block until the frame is sent.
/// </remarks>
public readonly int CopyBodyToMemoryThreshold;

internal readonly Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> FrameHandlerFactoryAsync;

internal ConnectionConfig(string virtualHost, string userName, string password,
Expand All @@ -153,7 +163,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler,
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
bool dispatchConsumersAsync, int dispatchConsumerConcurrency,
Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync, int copyBodyToMemoryThreshold)
{
VirtualHost = virtualHost;
UserName = userName;
Expand All @@ -176,6 +186,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
DispatchConsumersAsync = dispatchConsumersAsync;
DispatchConsumerConcurrency = dispatchConsumerConcurrency;
FrameHandlerFactoryAsync = frameHandlerFactoryAsync;
CopyBodyToMemoryThreshold = copyBodyToMemoryThreshold;
}
}
}
25 changes: 23 additions & 2 deletions projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
/// </summary>
public const string DefaultVHost = "/";

/// <summary>
/// Default value for the copy body to memory threshold.
/// </summary>
public const int DefaultCopyBodyToMemoryThreshold = int.MaxValue;

/// <summary>
/// TLS versions enabled by default: TLSv1.2, v1.1, v1.0.
/// </summary>
Expand Down Expand Up @@ -361,6 +366,16 @@ public AmqpTcpEndpoint Endpoint
/// </summary>
public uint MaxMessageSize { get; set; } = DefaultMaxMessageSize;

/// <summary>
/// The threshold for when to copy the body to a temporary array.
/// </summary>
/// <remarks>
/// When the body is larger than this threshold it will reuse the same buffer. Because of this
/// the buffer cannot be modified by the application. This causes
/// the socket (<see cref="SocketFrameHandler.WriteAsync"/>) to block until the frame is sent.
/// </remarks>
public int CopyBodyToMemoryThreshold { get; set; } = DefaultCopyBodyToMemoryThreshold;

/// <summary>
/// The uri to use for the connection.
/// </summary>
Expand Down Expand Up @@ -748,7 +763,12 @@ public async ValueTask<IConnection> CreateConnectionAsync(IEndpointResolver endp
}
}

private ConnectionConfig CreateConfig(string clientProvidedName)
internal ConnectionConfig CreateConfig()
{
return CreateConfig(ClientProvidedName);
}

internal ConnectionConfig CreateConfig(string clientProvidedName)
{
return new ConnectionConfig(
VirtualHost,
Expand All @@ -771,7 +791,8 @@ private ConnectionConfig CreateConfig(string clientProvidedName)
RequestedConnectionTimeout,
DispatchConsumersAsync,
ConsumerDispatchConcurrency,
CreateFrameHandlerAsync);
CreateFrameHandlerAsync,
CopyBodyToMemoryThreshold);
}

internal async Task<IFrameHandler> CreateFrameHandlerAsync(
Expand Down
Loading