From 7770fd863a46f4a8b0b6d3febc5d001c882f9d7a Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 11 Sep 2024 14:38:09 -0700 Subject: [PATCH 1/2] Follow-up to #1669 - per-channel dispatch concurrency PR #1669 by @danielmarbach adds the ability to configure consumer dispatch on a per-channel basis. * Test that consumer dispatch concurrency is set on the dispatcher. --- .../RabbitMQ.Client/PublicAPI.Shipped.txt | 5 +- .../client/api/ConnectionFactory.cs | 7 +-- .../RabbitMQ.Client/client/api/IConnection.cs | 5 +- .../client/api/IConnectionExtensions.cs | 6 --- .../RabbitMQ.Client/client/framing/Channel.cs | 3 +- .../client/framing/Constants.cs | 8 ++++ .../client/impl/AutorecoveringConnection.cs | 9 ++-- .../client/impl/ChannelBase.cs | 12 ++++- .../RabbitMQ.Client/client/impl/Connection.cs | 5 +- .../ConsumerDispatcherChannelBase.cs | 20 ++++---- .../IConsumerDispatcher.cs | 2 + projects/RabbitMQ.Client/util/Misc.cs | 48 +++++++++++++++++++ projects/Test/Common/IntegrationFixture.cs | 11 +++-- .../Test/Integration/TestAsyncConsumer.cs | 35 +++++++++++++- .../TestAsyncEventingBasicConsumer.cs | 9 +++- .../TestConcurrentAccessBase.cs.cs | 2 +- 16 files changed, 143 insertions(+), 44 deletions(-) create mode 100644 projects/RabbitMQ.Client/util/Misc.cs diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index 6af99e6f94..571643ea3f 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -892,7 +892,6 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! -const RabbitMQ.Client.ConnectionFactory.DefaultConsumerDispatchConcurrency = 1 -> ushort -RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! -static RabbitMQ.Client.IConnectionExtensions.CreateChannelAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort +RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency = 1, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! diff --git a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs index 5bb86ccb4c..360d3c9c73 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs @@ -92,11 +92,6 @@ namespace RabbitMQ.Client ///hosts with an empty name are not addressable. public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory { - /// - /// Default value for consumer dispatch concurrency. - /// - public const ushort DefaultConsumerDispatchConcurrency = 1; - /// /// Default value for the desired maximum channel number. Default: 2047. /// @@ -180,7 +175,7 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor /// /// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them. /// In addition to that consumers need to be thread/concurrency safe. - public ushort ConsumerDispatchConcurrency { get; set; } = DefaultConsumerDispatchConcurrency; + public ushort ConsumerDispatchConcurrency { get; set; } = Constants.DefaultConsumerDispatchConcurrency; /// The host to connect to. public string HostName { get; set; } = "localhost"; diff --git a/projects/RabbitMQ.Client/client/api/IConnection.cs b/projects/RabbitMQ.Client/client/api/IConnection.cs index aaad772dfa..44af08a6cc 100644 --- a/projects/RabbitMQ.Client/client/api/IConnection.cs +++ b/projects/RabbitMQ.Client/client/api/IConnection.cs @@ -239,12 +239,13 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo /// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading. /// can handle concurrency much more efficiently due to the non-blocking nature of the consumer. /// - /// Defaults to . + /// Defaults to . /// /// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them. /// In addition to that consumers need to be thread/concurrency safe. /// /// Cancellation token - Task CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default); + Task CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency, + CancellationToken cancellationToken = default); } } diff --git a/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs b/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs index ead665a03b..33b9bc64c4 100644 --- a/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs @@ -7,12 +7,6 @@ namespace RabbitMQ.Client { public static class IConnectionExtensions { - /// - /// Asynchronously create and return a fresh channel, session, and channel. - /// - public static Task CreateChannelAsync(this IConnection connection, CancellationToken cancellationToken = default) => - connection.CreateChannelAsync(ConnectionFactory.DefaultConsumerDispatchConcurrency, cancellationToken); - /// /// Asynchronously close this connection and all its channels. /// diff --git a/projects/RabbitMQ.Client/client/framing/Channel.cs b/projects/RabbitMQ.Client/client/framing/Channel.cs index 67aca8f697..e3a103d448 100644 --- a/projects/RabbitMQ.Client/client/framing/Channel.cs +++ b/projects/RabbitMQ.Client/client/framing/Channel.cs @@ -38,7 +38,8 @@ namespace RabbitMQ.Client.Framing.Impl { internal class Channel : ChannelBase { - public Channel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency) + public Channel(ConnectionConfig config, ISession session, + ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency) : base(config, session, consumerDispatchConcurrency) { } diff --git a/projects/RabbitMQ.Client/client/framing/Constants.cs b/projects/RabbitMQ.Client/client/framing/Constants.cs index b1f78236ee..4df40693d4 100644 --- a/projects/RabbitMQ.Client/client/framing/Constants.cs +++ b/projects/RabbitMQ.Client/client/framing/Constants.cs @@ -83,5 +83,13 @@ public static class Constants public const int NotImplemented = 540; ///(= 541) public const int InternalError = 541; + + /// + /// The default consumer dispatch concurrency. See + /// to set this value for every channel created on a connection, + /// and + /// for setting this value for a particular channel. + /// + public const ushort DefaultConsumerDispatchConcurrency = 1; } } diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index 1d398b9e8a..740641942b 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -37,6 +37,7 @@ using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Impl; +using RabbitMQ.Util; namespace RabbitMQ.Client.Framing.Impl { @@ -240,12 +241,14 @@ await CloseInnerConnectionAsync() } } - public async Task CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default) + public async Task CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency, + CancellationToken cancellationToken = default) { EnsureIsOpen(); - RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(consumerDispatchConcurrency, cancellationToken) + ushort cdc = Misc.DetermineConsumerDispatchConcurrency(_config, consumerDispatchConcurrency); + RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cdc, cancellationToken) .ConfigureAwait(false); - AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, consumerDispatchConcurrency); + AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc); await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken) .ConfigureAwait(false); return channel; diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 28d6065345..c563d21313 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -45,6 +45,7 @@ using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing.Impl; +using RabbitMQ.Util; namespace RabbitMQ.Client.Impl { @@ -73,10 +74,11 @@ internal abstract class ChannelBase : IChannel, IRecoverable internal readonly IConsumerDispatcher ConsumerDispatcher; - protected ChannelBase(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency) + protected ChannelBase(ConnectionConfig config, ISession session, + ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency) { ContinuationTimeout = config.ContinuationTimeout; - ConsumerDispatcher = new AsyncConsumerDispatcher(this, consumerDispatchConcurrency); + ConsumerDispatcher = BuildConsumerDispatcher(config, consumerDispatchConcurrency); Action onException = (exception, context) => OnCallbackException(CallbackExceptionEventArgs.Build(exception, context)); _basicAcksWrapper = new EventingWrapper("OnBasicAck", onException); @@ -92,6 +94,12 @@ protected ChannelBase(ConnectionConfig config, ISession session, ushort consumer Session = session; } + private IConsumerDispatcher BuildConsumerDispatcher(ConnectionConfig config, ushort perChannelConsumerDispatchConcurrency) + { + ushort cdc = Misc.DetermineConsumerDispatchConcurrency(config, perChannelConsumerDispatchConcurrency); + return new AsyncConsumerDispatcher(this, cdc); + } + internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10); public TimeSpan ContinuationTimeout { get; set; } diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 09369bb076..d276a749be 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -72,7 +72,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler) _sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize); _session0 = new MainSession(this, config.MaxInboundMessageBodySize); - _channel0 = new Channel(_config, _session0, ConnectionFactory.DefaultConsumerDispatchConcurrency); ; + _channel0 = new Channel(_config, _session0); ClientProperties = new Dictionary(_config.ClientProperties) { @@ -253,7 +253,8 @@ await CloseAsync(ea, true, } } - public Task CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default) + public Task CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency, + CancellationToken cancellationToken = default) { EnsureIsOpen(); ISession session = CreateSession(); diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index 0afe131513..4ffc3ea9f9 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -14,15 +14,17 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase, protected readonly ChannelReader _reader; private readonly ChannelWriter _writer; private readonly Task _worker; + private readonly ushort _concurrency; private bool _quiesce = false; private bool _disposed; internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency) { _channel = channel; + _concurrency = concurrency; var workChannel = Channel.CreateUnbounded(new UnboundedChannelOptions { - SingleReader = concurrency == 1, + SingleReader = _concurrency == 1, SingleWriter = false, AllowSynchronousContinuations = false }); @@ -30,14 +32,14 @@ internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency) _writer = workChannel.Writer; Func loopStart = ProcessChannelAsync; - if (concurrency == 1) + if (_concurrency == 1) { _worker = Task.Run(loopStart); } else { - var tasks = new Task[concurrency]; - for (int i = 0; i < concurrency; i++) + var tasks = new Task[_concurrency]; + for (int i = 0; i < _concurrency; i++) { tasks[i] = Task.Run(loopStart); } @@ -45,13 +47,9 @@ internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency) } } - public bool IsShutdown - { - get - { - return _quiesce; - } - } + public bool IsShutdown => _quiesce; + + public ushort Concurrency => _concurrency; public ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken) { diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs index 686036b9c1..4b5dd679a8 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs @@ -41,6 +41,8 @@ internal interface IConsumerDispatcher : IDisposable bool IsShutdown { get; } + ushort Concurrency { get; } + IAsyncBasicConsumer GetAndRemoveConsumer(string tag); ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken); diff --git a/projects/RabbitMQ.Client/util/Misc.cs b/projects/RabbitMQ.Client/util/Misc.cs new file mode 100644 index 0000000000..fa6832bc5a --- /dev/null +++ b/projects/RabbitMQ.Client/util/Misc.cs @@ -0,0 +1,48 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +using RabbitMQ.Client; + +namespace RabbitMQ.Util +{ + internal static class Misc + { + internal static ushort DetermineConsumerDispatchConcurrency(ConnectionConfig config, ushort perChannelConsumerDispatchConcurrency) + { + ushort cdc = config.ConsumerDispatchConcurrency; + if (perChannelConsumerDispatchConcurrency > Constants.DefaultConsumerDispatchConcurrency) + { + cdc = perChannelConsumerDispatchConcurrency; + } + return cdc; + } + } +} diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs index 3d4eff039c..ea447bdb5f 100644 --- a/projects/Test/Common/IntegrationFixture.cs +++ b/projects/Test/Common/IntegrationFixture.cs @@ -71,7 +71,7 @@ public abstract class IntegrationFixture : IAsyncLifetime protected readonly ITestOutputHelper _output; protected readonly string _testDisplayName; - protected readonly ushort _consumerDispatchConcurrency = 1; + protected readonly ushort _consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency; protected readonly bool _openChannel = true; public static readonly TimeSpan ShortSpan; @@ -109,7 +109,7 @@ static IntegrationFixture() } public IntegrationFixture(ITestOutputHelper output, - ushort consumerDispatchConcurrency = 1, + ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency, bool openChannel = true) { _consumerDispatchConcurrency = consumerDispatchConcurrency; @@ -143,8 +143,7 @@ public virtual async Task InitializeAsync() */ if (_connFactory == null) { - _connFactory = CreateConnectionFactory(); - _connFactory.ConsumerDispatchConcurrency = _consumerDispatchConcurrency; + _connFactory = CreateConnectionFactory(_consumerDispatchConcurrency); } if (_conn == null) @@ -517,13 +516,15 @@ protected static async Task WaitAsync(TaskCompletionSource tcs, TimeSpan t } } - protected ConnectionFactory CreateConnectionFactory() + protected ConnectionFactory CreateConnectionFactory( + ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency) { return new ConnectionFactory { ClientProvidedName = $"{_testDisplayName}:{Util.Now}:{GetConnectionIdx()}", ContinuationTimeout = WaitSpan, HandshakeContinuationTimeout = WaitSpan, + ConsumerDispatchConcurrency = consumerDispatchConcurrency }; } diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index d07a21b27e..2edda15c7a 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -36,6 +36,7 @@ using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; +using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; @@ -43,16 +44,20 @@ namespace Test.Integration { public class TestAsyncConsumer : IntegrationFixture { + private const ushort ConsumerDispatchConcurrency = 2; + private readonly ShutdownEventArgs _closeArgs = new ShutdownEventArgs(ShutdownInitiator.Application, Constants.ReplySuccess, "normal shutdown"); public TestAsyncConsumer(ITestOutputHelper output) - : base(output, consumerDispatchConcurrency: 2) + : base(output, consumerDispatchConcurrency: ConsumerDispatchConcurrency) { } [Fact] public async Task TestBasicRoundtripConcurrent() { + await ValidateConsumerDispatchConcurrency(); + AddCallbackExceptionHandlers(); _channel.DefaultConsumer = new DefaultAsyncConsumer(_channel, "_channel,", _output); @@ -146,6 +151,8 @@ public async Task TestBasicRoundtripConcurrent() [Fact] public async Task TestBasicRoundtripConcurrentManyMessages() { + await ValidateConsumerDispatchConcurrency(); + AddCallbackExceptionHandlers(); _channel.DefaultConsumer = new DefaultAsyncConsumer(_channel, "_channel,", _output); @@ -323,6 +330,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages() [Fact] public async Task TestBasicRejectAsync() { + await ValidateConsumerDispatchConcurrency(); + string queueName = GenerateQueueName(); var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -421,6 +430,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, [Fact] public async Task TestBasicAckAsync() { + await ValidateConsumerDispatchConcurrency(); + string queueName = GenerateQueueName(); const int messageCount = 1024; @@ -488,6 +499,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, [Fact] public async Task TestBasicNackAsync() { + await ValidateConsumerDispatchConcurrency(); + var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _conn.ConnectionShutdown += (o, ea) => @@ -561,6 +574,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, [Fact] public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() { + await ValidateConsumerDispatchConcurrency(); + AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0); var tasks = new List(); for (int i = 0; i < 256; i++) @@ -581,6 +596,8 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() [Fact] public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650() { + await ValidateConsumerDispatchConcurrency(); + string exchangeName = GenerateExchangeName(); string queue1Name = GenerateQueueName(); string queue2Name = GenerateQueueName(); @@ -650,6 +667,8 @@ await innerChannel.BasicPublishAsync(exchangeName, queue2Name, [Fact] public async Task TestCloseWithinEventHandler_GH1567() { + await ValidateConsumerDispatchConcurrency(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); QueueDeclareOk q = await _channel.QueueDeclareAsync(); @@ -679,6 +698,20 @@ await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, Assert.True(await tcs.Task); } + private async Task ValidateConsumerDispatchConcurrency() + { + ushort expectedConsumerDispatchConcurrency = (ushort)S_Random.Next(3, 10); + AutorecoveringChannel autorecoveringChannel = (AutorecoveringChannel)_channel; + Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency); + Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency); + using (IChannel ch = await _conn.CreateChannelAsync( + consumerDispatchConcurrency: expectedConsumerDispatchConcurrency)) + { + AutorecoveringChannel ach = (AutorecoveringChannel)ch; + Assert.Equal(expectedConsumerDispatchConcurrency, ach.ConsumerDispatcher.Concurrency); + } + } + private static void SetException(Exception ex, params TaskCompletionSource[] tcsAry) { foreach (TaskCompletionSource tcs in tcsAry) diff --git a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs index 762ca6e49e..b9285901a9 100644 --- a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs +++ b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs @@ -34,6 +34,7 @@ using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; +using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; @@ -41,6 +42,8 @@ namespace Test.Integration { public class TestAsyncEventingBasicConsumer : IntegrationFixture { + private const ushort ConsumerDispatchConcurrency = 2; + private readonly CancellationTokenSource _cts = new CancellationTokenSource(ShortSpan); private readonly CancellationTokenRegistration _ctr; private readonly TaskCompletionSource _onCallbackExceptionTcs = @@ -49,7 +52,7 @@ public class TestAsyncEventingBasicConsumer : IntegrationFixture new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); public TestAsyncEventingBasicConsumer(ITestOutputHelper output) - : base(output, consumerDispatchConcurrency: 2) + : base(output, consumerDispatchConcurrency: ConsumerDispatchConcurrency) { _ctr = _cts.Token.Register(OnTokenCanceled); } @@ -81,6 +84,10 @@ private Task AsyncConsumerOnReceived(object sender, BasicDeliverEventArgs @event [Fact] public async Task TestAsyncEventingBasicConsumer_GH1038() { + AutorecoveringChannel autorecoveringChannel = (AutorecoveringChannel)_channel; + Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency); + Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency); + string exchangeName = GenerateExchangeName(); string queueName = GenerateQueueName(); string routingKey = string.Empty; diff --git a/projects/Test/Integration/TestConcurrentAccessBase.cs.cs b/projects/Test/Integration/TestConcurrentAccessBase.cs.cs index ea83d6b5e8..21d862e823 100644 --- a/projects/Test/Integration/TestConcurrentAccessBase.cs.cs +++ b/projects/Test/Integration/TestConcurrentAccessBase.cs.cs @@ -42,7 +42,7 @@ public class TestConcurrentAccessBase : IntegrationFixture protected const ushort _messageCount = 200; public TestConcurrentAccessBase(ITestOutputHelper output, - ushort consumerDispatchConcurrency = 1, + ushort consumerDispatchConcurrency = RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency, bool openChannel = true) : base(output, consumerDispatchConcurrency, openChannel) { } From 601f501c858dfd4bf2d17f2cdf8c5c6741045cd7 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 12 Sep 2024 08:32:08 -0700 Subject: [PATCH 2/2] * We've come full circle ... restoring @danielmarbach's original changes. --- .../RabbitMQ.Client/PublicAPI.Shipped.txt | 2 +- .../RabbitMQ.Client/client/api/IConnection.cs | 4 +- .../RabbitMQ.Client/client/framing/Channel.cs | 3 +- .../client/framing/Constants.cs | 2 +- .../client/impl/AutorecoveringConnection.cs | 5 +- .../client/impl/ChannelBase.cs | 12 ++--- .../RabbitMQ.Client/client/impl/Connection.cs | 2 +- projects/RabbitMQ.Client/util/Misc.cs | 48 ------------------- 8 files changed, 11 insertions(+), 67 deletions(-) delete mode 100644 projects/RabbitMQ.Client/util/Misc.cs diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index 571643ea3f..a5b9889fbe 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -894,4 +894,4 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort -RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency = 1, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +RabbitMQ.Client.IConnection.CreateChannelAsync(ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! diff --git a/projects/RabbitMQ.Client/client/api/IConnection.cs b/projects/RabbitMQ.Client/client/api/IConnection.cs index 44af08a6cc..e28bec3c7c 100644 --- a/projects/RabbitMQ.Client/client/api/IConnection.cs +++ b/projects/RabbitMQ.Client/client/api/IConnection.cs @@ -239,13 +239,13 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo /// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading. /// can handle concurrency much more efficiently due to the non-blocking nature of the consumer. /// - /// Defaults to . + /// Defaults to null, which will use the value from /// /// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them. /// In addition to that consumers need to be thread/concurrency safe. /// /// Cancellation token - Task CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency, + Task CreateChannelAsync(ushort? consumerDispatchConcurrency = null, CancellationToken cancellationToken = default); } } diff --git a/projects/RabbitMQ.Client/client/framing/Channel.cs b/projects/RabbitMQ.Client/client/framing/Channel.cs index e3a103d448..c31147eee0 100644 --- a/projects/RabbitMQ.Client/client/framing/Channel.cs +++ b/projects/RabbitMQ.Client/client/framing/Channel.cs @@ -38,8 +38,7 @@ namespace RabbitMQ.Client.Framing.Impl { internal class Channel : ChannelBase { - public Channel(ConnectionConfig config, ISession session, - ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency) + public Channel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null) : base(config, session, consumerDispatchConcurrency) { } diff --git a/projects/RabbitMQ.Client/client/framing/Constants.cs b/projects/RabbitMQ.Client/client/framing/Constants.cs index 4df40693d4..637d53a785 100644 --- a/projects/RabbitMQ.Client/client/framing/Constants.cs +++ b/projects/RabbitMQ.Client/client/framing/Constants.cs @@ -87,7 +87,7 @@ public static class Constants /// /// The default consumer dispatch concurrency. See /// to set this value for every channel created on a connection, - /// and + /// and /// for setting this value for a particular channel. /// public const ushort DefaultConsumerDispatchConcurrency = 1; diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index 740641942b..e3918e4c5a 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -37,7 +37,6 @@ using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Impl; -using RabbitMQ.Util; namespace RabbitMQ.Client.Framing.Impl { @@ -241,11 +240,11 @@ await CloseInnerConnectionAsync() } } - public async Task CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency, + public async Task CreateChannelAsync(ushort? consumerDispatchConcurrency = null, CancellationToken cancellationToken = default) { EnsureIsOpen(); - ushort cdc = Misc.DetermineConsumerDispatchConcurrency(_config, consumerDispatchConcurrency); + ushort cdc = consumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency); RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cdc, cancellationToken) .ConfigureAwait(false); AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc); diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index c563d21313..97925a7b06 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -45,7 +45,6 @@ using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing.Impl; -using RabbitMQ.Util; namespace RabbitMQ.Client.Impl { @@ -75,10 +74,11 @@ internal abstract class ChannelBase : IChannel, IRecoverable internal readonly IConsumerDispatcher ConsumerDispatcher; protected ChannelBase(ConnectionConfig config, ISession session, - ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency) + ushort? perChannelConsumerDispatchConcurrency = null) { ContinuationTimeout = config.ContinuationTimeout; - ConsumerDispatcher = BuildConsumerDispatcher(config, consumerDispatchConcurrency); + ConsumerDispatcher = new AsyncConsumerDispatcher(this, + perChannelConsumerDispatchConcurrency.GetValueOrDefault(config.ConsumerDispatchConcurrency)); Action onException = (exception, context) => OnCallbackException(CallbackExceptionEventArgs.Build(exception, context)); _basicAcksWrapper = new EventingWrapper("OnBasicAck", onException); @@ -94,12 +94,6 @@ protected ChannelBase(ConnectionConfig config, ISession session, Session = session; } - private IConsumerDispatcher BuildConsumerDispatcher(ConnectionConfig config, ushort perChannelConsumerDispatchConcurrency) - { - ushort cdc = Misc.DetermineConsumerDispatchConcurrency(config, perChannelConsumerDispatchConcurrency); - return new AsyncConsumerDispatcher(this, cdc); - } - internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10); public TimeSpan ContinuationTimeout { get; set; } diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index d276a749be..8e8065b8d6 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -253,7 +253,7 @@ await CloseAsync(ea, true, } } - public Task CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency, + public Task CreateChannelAsync(ushort? consumerDispatchConcurrency = null, CancellationToken cancellationToken = default) { EnsureIsOpen(); diff --git a/projects/RabbitMQ.Client/util/Misc.cs b/projects/RabbitMQ.Client/util/Misc.cs deleted file mode 100644 index fa6832bc5a..0000000000 --- a/projects/RabbitMQ.Client/util/Misc.cs +++ /dev/null @@ -1,48 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. -//--------------------------------------------------------------------------- - -using RabbitMQ.Client; - -namespace RabbitMQ.Util -{ - internal static class Misc - { - internal static ushort DetermineConsumerDispatchConcurrency(ConnectionConfig config, ushort perChannelConsumerDispatchConcurrency) - { - ushort cdc = config.ConsumerDispatchConcurrency; - if (perChannelConsumerDispatchConcurrency > Constants.DefaultConsumerDispatchConcurrency) - { - cdc = perChannelConsumerDispatchConcurrency; - } - return cdc; - } - } -}