Skip to content

Commit

Permalink
Merge pull request #1671 from rabbitmq/rabbitmq-dotnet-client-1669-fo…
Browse files Browse the repository at this point in the history
…llowup

Follow-up to #1669 - per-channel dispatch concurrency
  • Loading branch information
lukebakken authored Sep 12, 2024
2 parents 624cf2e + 601f501 commit 735bbca
Show file tree
Hide file tree
Showing 15 changed files with 87 additions and 44 deletions.
5 changes: 2 additions & 3 deletions projects/RabbitMQ.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,6 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
const RabbitMQ.Client.ConnectionFactory.DefaultConsumerDispatchConcurrency = 1 -> ushort
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
static RabbitMQ.Client.IConnectionExtensions.CreateChannelAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
7 changes: 1 addition & 6 deletions projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,6 @@ namespace RabbitMQ.Client
///hosts with an empty name are not addressable. </para></remarks>
public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
{
/// <summary>
/// Default value for consumer dispatch concurrency.
/// </summary>
public const ushort DefaultConsumerDispatchConcurrency = 1;

/// <summary>
/// Default value for the desired maximum channel number. Default: 2047.
/// </summary>
Expand Down Expand Up @@ -180,7 +175,7 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
/// </summary>
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
public ushort ConsumerDispatchConcurrency { get; set; } = DefaultConsumerDispatchConcurrency;
public ushort ConsumerDispatchConcurrency { get; set; } = Constants.DefaultConsumerDispatchConcurrency;

/// <summary>The host to connect to.</summary>
public string HostName { get; set; } = "localhost";
Expand Down
5 changes: 3 additions & 2 deletions projects/RabbitMQ.Client/client/api/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,13 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
///
/// Defaults to <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>.
/// Defaults to <c>null</c>, which will use the value from <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
///
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.
/// </param>
/// <param name="cancellationToken">Cancellation token</param>
Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default);
Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default);
}
}
6 changes: 0 additions & 6 deletions projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,6 @@ namespace RabbitMQ.Client
{
public static class IConnectionExtensions
{
/// <summary>
/// Asynchronously create and return a fresh channel, session, and channel.
/// </summary>
public static Task<IChannel> CreateChannelAsync(this IConnection connection, CancellationToken cancellationToken = default) =>
connection.CreateChannelAsync(ConnectionFactory.DefaultConsumerDispatchConcurrency, cancellationToken);

/// <summary>
/// Asynchronously close this connection and all its channels.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace RabbitMQ.Client.Framing.Impl
{
internal class Channel : ChannelBase
{
public Channel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
public Channel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null)
: base(config, session, consumerDispatchConcurrency)
{
}
Expand Down
8 changes: 8 additions & 0 deletions projects/RabbitMQ.Client/client/framing/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,13 @@ public static class Constants
public const int NotImplemented = 540;
///<summary>(= 541)</summary>
public const int InternalError = 541;

/// <summary>
/// The default consumer dispatch concurrency. See <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
/// to set this value for every channel created on a connection,
/// and <see cref="IConnection.CreateChannelAsync(ushort?, System.Threading.CancellationToken)"/>
/// for setting this value for a particular channel.
/// </summary>
public const ushort DefaultConsumerDispatchConcurrency = 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,14 @@ await CloseInnerConnectionAsync()
}
}

public async Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
public async Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default)
{
EnsureIsOpen();
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(consumerDispatchConcurrency, cancellationToken)
ushort cdc = consumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cdc, cancellationToken)
.ConfigureAwait(false);
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, consumerDispatchConcurrency);
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc);
await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);
return channel;
Expand Down
6 changes: 4 additions & 2 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,12 @@ internal abstract class ChannelBase : IChannel, IRecoverable

internal readonly IConsumerDispatcher ConsumerDispatcher;

protected ChannelBase(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
protected ChannelBase(ConnectionConfig config, ISession session,
ushort? perChannelConsumerDispatchConcurrency = null)
{
ContinuationTimeout = config.ContinuationTimeout;
ConsumerDispatcher = new AsyncConsumerDispatcher(this, consumerDispatchConcurrency);
ConsumerDispatcher = new AsyncConsumerDispatcher(this,
perChannelConsumerDispatchConcurrency.GetValueOrDefault(config.ConsumerDispatchConcurrency));
Action<Exception, string> onException = (exception, context) =>
OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
_basicAcksWrapper = new EventingWrapper<BasicAckEventArgs>("OnBasicAck", onException);
Expand Down
5 changes: 3 additions & 2 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)

_sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize);
_session0 = new MainSession(this, config.MaxInboundMessageBodySize);
_channel0 = new Channel(_config, _session0, ConnectionFactory.DefaultConsumerDispatchConcurrency); ;
_channel0 = new Channel(_config, _session0);

ClientProperties = new Dictionary<string, object?>(_config.ClientProperties)
{
Expand Down Expand Up @@ -253,7 +253,8 @@ await CloseAsync(ea, true,
}
}

public Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
public Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default)
{
EnsureIsOpen();
ISession session = CreateSession();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,42 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
protected readonly ChannelReader<WorkStruct> _reader;
private readonly ChannelWriter<WorkStruct> _writer;
private readonly Task _worker;
private readonly ushort _concurrency;
private bool _quiesce = false;
private bool _disposed;

internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency)
{
_channel = channel;
_concurrency = concurrency;
var workChannel = Channel.CreateUnbounded<WorkStruct>(new UnboundedChannelOptions
{
SingleReader = concurrency == 1,
SingleReader = _concurrency == 1,
SingleWriter = false,
AllowSynchronousContinuations = false
});
_reader = workChannel.Reader;
_writer = workChannel.Writer;

Func<Task> loopStart = ProcessChannelAsync;
if (concurrency == 1)
if (_concurrency == 1)
{
_worker = Task.Run(loopStart);
}
else
{
var tasks = new Task[concurrency];
for (int i = 0; i < concurrency; i++)
var tasks = new Task[_concurrency];
for (int i = 0; i < _concurrency; i++)
{
tasks[i] = Task.Run(loopStart);
}
_worker = Task.WhenAll(tasks);
}
}

public bool IsShutdown
{
get
{
return _quiesce;
}
}
public bool IsShutdown => _quiesce;

public ushort Concurrency => _concurrency;

public ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ internal interface IConsumerDispatcher : IDisposable

bool IsShutdown { get; }

ushort Concurrency { get; }

IAsyncBasicConsumer GetAndRemoveConsumer(string tag);

ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken);
Expand Down
11 changes: 6 additions & 5 deletions projects/Test/Common/IntegrationFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public abstract class IntegrationFixture : IAsyncLifetime
protected readonly ITestOutputHelper _output;
protected readonly string _testDisplayName;

protected readonly ushort _consumerDispatchConcurrency = 1;
protected readonly ushort _consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency;
protected readonly bool _openChannel = true;

public static readonly TimeSpan ShortSpan;
Expand Down Expand Up @@ -109,7 +109,7 @@ static IntegrationFixture()
}

public IntegrationFixture(ITestOutputHelper output,
ushort consumerDispatchConcurrency = 1,
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
bool openChannel = true)
{
_consumerDispatchConcurrency = consumerDispatchConcurrency;
Expand Down Expand Up @@ -143,8 +143,7 @@ public virtual async Task InitializeAsync()
*/
if (_connFactory == null)
{
_connFactory = CreateConnectionFactory();
_connFactory.ConsumerDispatchConcurrency = _consumerDispatchConcurrency;
_connFactory = CreateConnectionFactory(_consumerDispatchConcurrency);
}

if (_conn == null)
Expand Down Expand Up @@ -517,13 +516,15 @@ protected static async Task WaitAsync(TaskCompletionSource<bool> tcs, TimeSpan t
}
}

protected ConnectionFactory CreateConnectionFactory()
protected ConnectionFactory CreateConnectionFactory(
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
{
return new ConnectionFactory
{
ClientProvidedName = $"{_testDisplayName}:{Util.Now}:{GetConnectionIdx()}",
ContinuationTimeout = WaitSpan,
HandshakeContinuationTimeout = WaitSpan,
ConsumerDispatchConcurrency = consumerDispatchConcurrency
};
}

Expand Down
35 changes: 34 additions & 1 deletion projects/Test/Integration/TestAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,28 @@
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Impl;
using Xunit;
using Xunit.Abstractions;

namespace Test.Integration
{
public class TestAsyncConsumer : IntegrationFixture
{
private const ushort ConsumerDispatchConcurrency = 2;

private readonly ShutdownEventArgs _closeArgs = new ShutdownEventArgs(ShutdownInitiator.Application, Constants.ReplySuccess, "normal shutdown");

public TestAsyncConsumer(ITestOutputHelper output)
: base(output, consumerDispatchConcurrency: 2)
: base(output, consumerDispatchConcurrency: ConsumerDispatchConcurrency)
{
}

[Fact]
public async Task TestBasicRoundtripConcurrent()
{
await ValidateConsumerDispatchConcurrency();

AddCallbackExceptionHandlers();
_channel.DefaultConsumer = new DefaultAsyncConsumer(_channel, "_channel,", _output);

Expand Down Expand Up @@ -146,6 +151,8 @@ public async Task TestBasicRoundtripConcurrent()
[Fact]
public async Task TestBasicRoundtripConcurrentManyMessages()
{
await ValidateConsumerDispatchConcurrency();

AddCallbackExceptionHandlers();
_channel.DefaultConsumer = new DefaultAsyncConsumer(_channel, "_channel,", _output);

Expand Down Expand Up @@ -323,6 +330,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
[Fact]
public async Task TestBasicRejectAsync()
{
await ValidateConsumerDispatchConcurrency();

string queueName = GenerateQueueName();

var publishSyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down Expand Up @@ -421,6 +430,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
[Fact]
public async Task TestBasicAckAsync()
{
await ValidateConsumerDispatchConcurrency();

string queueName = GenerateQueueName();

const int messageCount = 1024;
Expand Down Expand Up @@ -488,6 +499,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
[Fact]
public async Task TestBasicNackAsync()
{
await ValidateConsumerDispatchConcurrency();

var publishSyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

_conn.ConnectionShutdown += (o, ea) =>
Expand Down Expand Up @@ -561,6 +574,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
[Fact]
public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer()
{
await ValidateConsumerDispatchConcurrency();

AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0);
var tasks = new List<Task>();
for (int i = 0; i < 256; i++)
Expand All @@ -581,6 +596,8 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer()
[Fact]
public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
{
await ValidateConsumerDispatchConcurrency();

string exchangeName = GenerateExchangeName();
string queue1Name = GenerateQueueName();
string queue2Name = GenerateQueueName();
Expand Down Expand Up @@ -650,6 +667,8 @@ await innerChannel.BasicPublishAsync(exchangeName, queue2Name,
[Fact]
public async Task TestCloseWithinEventHandler_GH1567()
{
await ValidateConsumerDispatchConcurrency();

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

QueueDeclareOk q = await _channel.QueueDeclareAsync();
Expand Down Expand Up @@ -679,6 +698,20 @@ await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName,
Assert.True(await tcs.Task);
}

private async Task ValidateConsumerDispatchConcurrency()
{
ushort expectedConsumerDispatchConcurrency = (ushort)S_Random.Next(3, 10);
AutorecoveringChannel autorecoveringChannel = (AutorecoveringChannel)_channel;
Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
using (IChannel ch = await _conn.CreateChannelAsync(
consumerDispatchConcurrency: expectedConsumerDispatchConcurrency))
{
AutorecoveringChannel ach = (AutorecoveringChannel)ch;
Assert.Equal(expectedConsumerDispatchConcurrency, ach.ConsumerDispatcher.Concurrency);
}
}

private static void SetException(Exception ex, params TaskCompletionSource<bool>[] tcsAry)
{
foreach (TaskCompletionSource<bool> tcs in tcsAry)
Expand Down
9 changes: 8 additions & 1 deletion projects/Test/Integration/TestAsyncEventingBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Impl;
using Xunit;
using Xunit.Abstractions;

namespace Test.Integration
{
public class TestAsyncEventingBasicConsumer : IntegrationFixture
{
private const ushort ConsumerDispatchConcurrency = 2;

private readonly CancellationTokenSource _cts = new CancellationTokenSource(ShortSpan);
private readonly CancellationTokenRegistration _ctr;
private readonly TaskCompletionSource<bool> _onCallbackExceptionTcs =
Expand All @@ -49,7 +52,7 @@ public class TestAsyncEventingBasicConsumer : IntegrationFixture
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

public TestAsyncEventingBasicConsumer(ITestOutputHelper output)
: base(output, consumerDispatchConcurrency: 2)
: base(output, consumerDispatchConcurrency: ConsumerDispatchConcurrency)
{
_ctr = _cts.Token.Register(OnTokenCanceled);
}
Expand Down Expand Up @@ -81,6 +84,10 @@ private Task AsyncConsumerOnReceived(object sender, BasicDeliverEventArgs @event
[Fact]
public async Task TestAsyncEventingBasicConsumer_GH1038()
{
AutorecoveringChannel autorecoveringChannel = (AutorecoveringChannel)_channel;
Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);

string exchangeName = GenerateExchangeName();
string queueName = GenerateQueueName();
string routingKey = string.Empty;
Expand Down
Loading

0 comments on commit 735bbca

Please sign in to comment.