diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index 6af99e6f9..a5b9889fb 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -892,7 +892,6 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! -const RabbitMQ.Client.ConnectionFactory.DefaultConsumerDispatchConcurrency = 1 -> ushort -RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! -static RabbitMQ.Client.IConnectionExtensions.CreateChannelAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort +RabbitMQ.Client.IConnection.CreateChannelAsync(ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! diff --git a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs index 5bb86ccb4..360d3c9c7 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs @@ -92,11 +92,6 @@ namespace RabbitMQ.Client ///hosts with an empty name are not addressable. public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory { - /// - /// Default value for consumer dispatch concurrency. - /// - public const ushort DefaultConsumerDispatchConcurrency = 1; - /// /// Default value for the desired maximum channel number. Default: 2047. /// @@ -180,7 +175,7 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor /// /// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them. /// In addition to that consumers need to be thread/concurrency safe. - public ushort ConsumerDispatchConcurrency { get; set; } = DefaultConsumerDispatchConcurrency; + public ushort ConsumerDispatchConcurrency { get; set; } = Constants.DefaultConsumerDispatchConcurrency; /// The host to connect to. public string HostName { get; set; } = "localhost"; diff --git a/projects/RabbitMQ.Client/client/api/IConnection.cs b/projects/RabbitMQ.Client/client/api/IConnection.cs index aaad772df..e28bec3c7 100644 --- a/projects/RabbitMQ.Client/client/api/IConnection.cs +++ b/projects/RabbitMQ.Client/client/api/IConnection.cs @@ -239,12 +239,13 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo /// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading. /// can handle concurrency much more efficiently due to the non-blocking nature of the consumer. /// - /// Defaults to . + /// Defaults to null, which will use the value from /// /// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them. /// In addition to that consumers need to be thread/concurrency safe. /// /// Cancellation token - Task CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default); + Task CreateChannelAsync(ushort? consumerDispatchConcurrency = null, + CancellationToken cancellationToken = default); } } diff --git a/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs b/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs index ead665a03..33b9bc64c 100644 --- a/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs @@ -7,12 +7,6 @@ namespace RabbitMQ.Client { public static class IConnectionExtensions { - /// - /// Asynchronously create and return a fresh channel, session, and channel. - /// - public static Task CreateChannelAsync(this IConnection connection, CancellationToken cancellationToken = default) => - connection.CreateChannelAsync(ConnectionFactory.DefaultConsumerDispatchConcurrency, cancellationToken); - /// /// Asynchronously close this connection and all its channels. /// diff --git a/projects/RabbitMQ.Client/client/framing/Channel.cs b/projects/RabbitMQ.Client/client/framing/Channel.cs index 67aca8f69..c31147eee 100644 --- a/projects/RabbitMQ.Client/client/framing/Channel.cs +++ b/projects/RabbitMQ.Client/client/framing/Channel.cs @@ -38,7 +38,7 @@ namespace RabbitMQ.Client.Framing.Impl { internal class Channel : ChannelBase { - public Channel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency) + public Channel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null) : base(config, session, consumerDispatchConcurrency) { } diff --git a/projects/RabbitMQ.Client/client/framing/Constants.cs b/projects/RabbitMQ.Client/client/framing/Constants.cs index b1f78236e..637d53a78 100644 --- a/projects/RabbitMQ.Client/client/framing/Constants.cs +++ b/projects/RabbitMQ.Client/client/framing/Constants.cs @@ -83,5 +83,13 @@ public static class Constants public const int NotImplemented = 540; ///(= 541) public const int InternalError = 541; + + /// + /// The default consumer dispatch concurrency. See + /// to set this value for every channel created on a connection, + /// and + /// for setting this value for a particular channel. + /// + public const ushort DefaultConsumerDispatchConcurrency = 1; } } diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index 1d398b9e8..e3918e4c5 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -240,12 +240,14 @@ await CloseInnerConnectionAsync() } } - public async Task CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default) + public async Task CreateChannelAsync(ushort? consumerDispatchConcurrency = null, + CancellationToken cancellationToken = default) { EnsureIsOpen(); - RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(consumerDispatchConcurrency, cancellationToken) + ushort cdc = consumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency); + RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cdc, cancellationToken) .ConfigureAwait(false); - AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, consumerDispatchConcurrency); + AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc); await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken) .ConfigureAwait(false); return channel; diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 28d606534..97925a7b0 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -73,10 +73,12 @@ internal abstract class ChannelBase : IChannel, IRecoverable internal readonly IConsumerDispatcher ConsumerDispatcher; - protected ChannelBase(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency) + protected ChannelBase(ConnectionConfig config, ISession session, + ushort? perChannelConsumerDispatchConcurrency = null) { ContinuationTimeout = config.ContinuationTimeout; - ConsumerDispatcher = new AsyncConsumerDispatcher(this, consumerDispatchConcurrency); + ConsumerDispatcher = new AsyncConsumerDispatcher(this, + perChannelConsumerDispatchConcurrency.GetValueOrDefault(config.ConsumerDispatchConcurrency)); Action onException = (exception, context) => OnCallbackException(CallbackExceptionEventArgs.Build(exception, context)); _basicAcksWrapper = new EventingWrapper("OnBasicAck", onException); diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 09369bb07..8e8065b8d 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -72,7 +72,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler) _sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize); _session0 = new MainSession(this, config.MaxInboundMessageBodySize); - _channel0 = new Channel(_config, _session0, ConnectionFactory.DefaultConsumerDispatchConcurrency); ; + _channel0 = new Channel(_config, _session0); ClientProperties = new Dictionary(_config.ClientProperties) { @@ -253,7 +253,8 @@ await CloseAsync(ea, true, } } - public Task CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default) + public Task CreateChannelAsync(ushort? consumerDispatchConcurrency = null, + CancellationToken cancellationToken = default) { EnsureIsOpen(); ISession session = CreateSession(); diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index 0afe13151..4ffc3ea9f 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -14,15 +14,17 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase, protected readonly ChannelReader _reader; private readonly ChannelWriter _writer; private readonly Task _worker; + private readonly ushort _concurrency; private bool _quiesce = false; private bool _disposed; internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency) { _channel = channel; + _concurrency = concurrency; var workChannel = Channel.CreateUnbounded(new UnboundedChannelOptions { - SingleReader = concurrency == 1, + SingleReader = _concurrency == 1, SingleWriter = false, AllowSynchronousContinuations = false }); @@ -30,14 +32,14 @@ internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency) _writer = workChannel.Writer; Func loopStart = ProcessChannelAsync; - if (concurrency == 1) + if (_concurrency == 1) { _worker = Task.Run(loopStart); } else { - var tasks = new Task[concurrency]; - for (int i = 0; i < concurrency; i++) + var tasks = new Task[_concurrency]; + for (int i = 0; i < _concurrency; i++) { tasks[i] = Task.Run(loopStart); } @@ -45,13 +47,9 @@ internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency) } } - public bool IsShutdown - { - get - { - return _quiesce; - } - } + public bool IsShutdown => _quiesce; + + public ushort Concurrency => _concurrency; public ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken) { diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs index 686036b9c..4b5dd679a 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs @@ -41,6 +41,8 @@ internal interface IConsumerDispatcher : IDisposable bool IsShutdown { get; } + ushort Concurrency { get; } + IAsyncBasicConsumer GetAndRemoveConsumer(string tag); ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken); diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs index 3d4eff039..ea447bdb5 100644 --- a/projects/Test/Common/IntegrationFixture.cs +++ b/projects/Test/Common/IntegrationFixture.cs @@ -71,7 +71,7 @@ public abstract class IntegrationFixture : IAsyncLifetime protected readonly ITestOutputHelper _output; protected readonly string _testDisplayName; - protected readonly ushort _consumerDispatchConcurrency = 1; + protected readonly ushort _consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency; protected readonly bool _openChannel = true; public static readonly TimeSpan ShortSpan; @@ -109,7 +109,7 @@ static IntegrationFixture() } public IntegrationFixture(ITestOutputHelper output, - ushort consumerDispatchConcurrency = 1, + ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency, bool openChannel = true) { _consumerDispatchConcurrency = consumerDispatchConcurrency; @@ -143,8 +143,7 @@ public virtual async Task InitializeAsync() */ if (_connFactory == null) { - _connFactory = CreateConnectionFactory(); - _connFactory.ConsumerDispatchConcurrency = _consumerDispatchConcurrency; + _connFactory = CreateConnectionFactory(_consumerDispatchConcurrency); } if (_conn == null) @@ -517,13 +516,15 @@ protected static async Task WaitAsync(TaskCompletionSource tcs, TimeSpan t } } - protected ConnectionFactory CreateConnectionFactory() + protected ConnectionFactory CreateConnectionFactory( + ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency) { return new ConnectionFactory { ClientProvidedName = $"{_testDisplayName}:{Util.Now}:{GetConnectionIdx()}", ContinuationTimeout = WaitSpan, HandshakeContinuationTimeout = WaitSpan, + ConsumerDispatchConcurrency = consumerDispatchConcurrency }; } diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index d07a21b27..2edda15c7 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -36,6 +36,7 @@ using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; +using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; @@ -43,16 +44,20 @@ namespace Test.Integration { public class TestAsyncConsumer : IntegrationFixture { + private const ushort ConsumerDispatchConcurrency = 2; + private readonly ShutdownEventArgs _closeArgs = new ShutdownEventArgs(ShutdownInitiator.Application, Constants.ReplySuccess, "normal shutdown"); public TestAsyncConsumer(ITestOutputHelper output) - : base(output, consumerDispatchConcurrency: 2) + : base(output, consumerDispatchConcurrency: ConsumerDispatchConcurrency) { } [Fact] public async Task TestBasicRoundtripConcurrent() { + await ValidateConsumerDispatchConcurrency(); + AddCallbackExceptionHandlers(); _channel.DefaultConsumer = new DefaultAsyncConsumer(_channel, "_channel,", _output); @@ -146,6 +151,8 @@ public async Task TestBasicRoundtripConcurrent() [Fact] public async Task TestBasicRoundtripConcurrentManyMessages() { + await ValidateConsumerDispatchConcurrency(); + AddCallbackExceptionHandlers(); _channel.DefaultConsumer = new DefaultAsyncConsumer(_channel, "_channel,", _output); @@ -323,6 +330,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages() [Fact] public async Task TestBasicRejectAsync() { + await ValidateConsumerDispatchConcurrency(); + string queueName = GenerateQueueName(); var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -421,6 +430,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, [Fact] public async Task TestBasicAckAsync() { + await ValidateConsumerDispatchConcurrency(); + string queueName = GenerateQueueName(); const int messageCount = 1024; @@ -488,6 +499,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, [Fact] public async Task TestBasicNackAsync() { + await ValidateConsumerDispatchConcurrency(); + var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _conn.ConnectionShutdown += (o, ea) => @@ -561,6 +574,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, [Fact] public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() { + await ValidateConsumerDispatchConcurrency(); + AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0); var tasks = new List(); for (int i = 0; i < 256; i++) @@ -581,6 +596,8 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() [Fact] public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650() { + await ValidateConsumerDispatchConcurrency(); + string exchangeName = GenerateExchangeName(); string queue1Name = GenerateQueueName(); string queue2Name = GenerateQueueName(); @@ -650,6 +667,8 @@ await innerChannel.BasicPublishAsync(exchangeName, queue2Name, [Fact] public async Task TestCloseWithinEventHandler_GH1567() { + await ValidateConsumerDispatchConcurrency(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); QueueDeclareOk q = await _channel.QueueDeclareAsync(); @@ -679,6 +698,20 @@ await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, Assert.True(await tcs.Task); } + private async Task ValidateConsumerDispatchConcurrency() + { + ushort expectedConsumerDispatchConcurrency = (ushort)S_Random.Next(3, 10); + AutorecoveringChannel autorecoveringChannel = (AutorecoveringChannel)_channel; + Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency); + Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency); + using (IChannel ch = await _conn.CreateChannelAsync( + consumerDispatchConcurrency: expectedConsumerDispatchConcurrency)) + { + AutorecoveringChannel ach = (AutorecoveringChannel)ch; + Assert.Equal(expectedConsumerDispatchConcurrency, ach.ConsumerDispatcher.Concurrency); + } + } + private static void SetException(Exception ex, params TaskCompletionSource[] tcsAry) { foreach (TaskCompletionSource tcs in tcsAry) diff --git a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs index 762ca6e49..b9285901a 100644 --- a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs +++ b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs @@ -34,6 +34,7 @@ using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; +using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; @@ -41,6 +42,8 @@ namespace Test.Integration { public class TestAsyncEventingBasicConsumer : IntegrationFixture { + private const ushort ConsumerDispatchConcurrency = 2; + private readonly CancellationTokenSource _cts = new CancellationTokenSource(ShortSpan); private readonly CancellationTokenRegistration _ctr; private readonly TaskCompletionSource _onCallbackExceptionTcs = @@ -49,7 +52,7 @@ public class TestAsyncEventingBasicConsumer : IntegrationFixture new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); public TestAsyncEventingBasicConsumer(ITestOutputHelper output) - : base(output, consumerDispatchConcurrency: 2) + : base(output, consumerDispatchConcurrency: ConsumerDispatchConcurrency) { _ctr = _cts.Token.Register(OnTokenCanceled); } @@ -81,6 +84,10 @@ private Task AsyncConsumerOnReceived(object sender, BasicDeliverEventArgs @event [Fact] public async Task TestAsyncEventingBasicConsumer_GH1038() { + AutorecoveringChannel autorecoveringChannel = (AutorecoveringChannel)_channel; + Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency); + Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency); + string exchangeName = GenerateExchangeName(); string queueName = GenerateQueueName(); string routingKey = string.Empty; diff --git a/projects/Test/Integration/TestConcurrentAccessBase.cs.cs b/projects/Test/Integration/TestConcurrentAccessBase.cs.cs index ea83d6b5e..21d862e82 100644 --- a/projects/Test/Integration/TestConcurrentAccessBase.cs.cs +++ b/projects/Test/Integration/TestConcurrentAccessBase.cs.cs @@ -42,7 +42,7 @@ public class TestConcurrentAccessBase : IntegrationFixture protected const ushort _messageCount = 200; public TestConcurrentAccessBase(ITestOutputHelper output, - ushort consumerDispatchConcurrency = 1, + ushort consumerDispatchConcurrency = RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency, bool openChannel = true) : base(output, consumerDispatchConcurrency, openChannel) { }