From 30a244d5831055ea96107080cdaff3763a75ecfd Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 19 Dec 2024 11:11:51 -0800 Subject: [PATCH 1/2] Ensure `DisposeAsync` and `Dispose` are idempotent Fixes #1749 * Add test project * Add `FirstChanceException` logging. * Add code to repeatedly close connection. * Allow passing hostname to GH-1749 program as the first arg. * Toxiproxy tests fixup. * Standardize on `_disposedValue` name. * Ensure `_disposedValue` is set in `finally` block. * Add lock object for disposal of Channels and Connections. Note: a `SemaphoreSlim` can't be used because it must be disposed as well, and that can't happen cleanly in a `Dispose` method. --- RabbitMQDotNetClient.sln | 7 + projects/Applications/GH-1647/GH-1647.csproj | 2 +- projects/Applications/GH-1749/GH-1749.csproj | 19 ++ projects/Applications/GH-1749/Program.cs | 216 ++++++++++++++++++ projects/Applications/GH-1749/Util.cs | 140 ++++++++++++ .../ConsumerDispatcherChannelBase.cs | 68 +++--- .../Impl/AutorecoveringChannel.cs | 51 ++++- .../AutorecoveringConnection.Recording.cs | 20 +- .../Impl/AutorecoveringConnection.Recovery.cs | 8 +- .../Impl/AutorecoveringConnection.cs | 27 ++- projects/RabbitMQ.Client/Impl/Channel.cs | 109 +++++++-- projects/RabbitMQ.Client/Impl/Connection.cs | 31 ++- .../Test/Integration/TestChannelShutdown.cs | 49 ++++ projects/Test/Integration/TestToxiproxy.cs | 28 ++- 14 files changed, 672 insertions(+), 103 deletions(-) create mode 100644 projects/Applications/GH-1749/GH-1749.csproj create mode 100644 projects/Applications/GH-1749/Program.cs create mode 100644 projects/Applications/GH-1749/Util.cs diff --git a/RabbitMQDotNetClient.sln b/RabbitMQDotNetClient.sln index 4307f8500..deb687a47 100644 --- a/RabbitMQDotNetClient.sln +++ b/RabbitMQDotNetClient.sln @@ -46,6 +46,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1647", "projects\Applica EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublisherConfirms", "projects\Applications\PublisherConfirms\PublisherConfirms.csproj", "{13149F73-2CDB-4ECF-BF2C-403860045751}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1749", "projects\Applications\GH-1749\GH-1749.csproj", "{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -108,6 +110,10 @@ Global {13149F73-2CDB-4ECF-BF2C-403860045751}.Debug|Any CPU.Build.0 = Debug|Any CPU {13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.ActiveCfg = Release|Any CPU {13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.Build.0 = Release|Any CPU + {725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -123,6 +129,7 @@ Global {AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} {64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69} {13149F73-2CDB-4ECF-BF2C-403860045751} = {D21B282C-49E6-4A30-887B-9626D94B8D69} + {725D9986-ACD1-424E-AF4C-2BEB407D2BD9} = {D21B282C-49E6-4A30-887B-9626D94B8D69} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1} diff --git a/projects/Applications/GH-1647/GH-1647.csproj b/projects/Applications/GH-1647/GH-1647.csproj index f2591d159..f08f12982 100644 --- a/projects/Applications/GH-1647/GH-1647.csproj +++ b/projects/Applications/GH-1647/GH-1647.csproj @@ -9,7 +9,7 @@ - + diff --git a/projects/Applications/GH-1749/GH-1749.csproj b/projects/Applications/GH-1749/GH-1749.csproj new file mode 100644 index 000000000..9e44f6447 --- /dev/null +++ b/projects/Applications/GH-1749/GH-1749.csproj @@ -0,0 +1,19 @@ + + + + Exe + net8.0 + GH_1749 + enable + enable + + + + + + + + + + + diff --git a/projects/Applications/GH-1749/Program.cs b/projects/Applications/GH-1749/Program.cs new file mode 100644 index 000000000..a1cdf9cbb --- /dev/null +++ b/projects/Applications/GH-1749/Program.cs @@ -0,0 +1,216 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task + +using System.Runtime.ExceptionServices; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace GH_1749 +{ + class GH1749Consumer : AsyncDefaultBasicConsumer + { + public GH1749Consumer(IChannel channel) : base(channel) + { + } + + protected override Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default) + { + Console.WriteLine("{0} [INFO] OnCancelAsync, tags[0]: {1}", Util.Now, consumerTags[0]); + return base.OnCancelAsync(consumerTags, cancellationToken); + } + } + + static class Program + { + const string DefaultHostName = "localhost"; + const string ConnectionClientProvidedName = "GH_1749"; + static readonly CancellationTokenSource s_cancellationTokenSource = new(); + static readonly CancellationToken s_cancellationToken = s_cancellationTokenSource.Token; + + static Util? s_util; + + static async Task Main(string[] args) + { + string hostname = DefaultHostName; + if (args.Length > 0) + { + hostname = args[0]; + } + + s_util = new Util(hostname, "guest", "guest"); + + AppDomain.CurrentDomain.FirstChanceException += CurrentDomain_FirstChanceException; + + ConnectionFactory connectionFactory = new() + { + HostName = hostname, + AutomaticRecoveryEnabled = true, + UserName = "guest", + Password = "guest", + ClientProvidedName = ConnectionClientProvidedName + }; + + var channelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true); + await using var connection = await connectionFactory.CreateConnectionAsync(); + + connection.RecoverySucceededAsync += (object sender, AsyncEventArgs ea) => + { + Console.WriteLine("{0} [INFO] saw RecoverySucceededAsync, event: {1}", Now, ea); + _ = CloseConnectionAsync(); + return Task.CompletedTask; + }; + + connection.CallbackExceptionAsync += Connection_CallbackExceptionAsync; + + connection.ConnectionBlockedAsync += Connection_ConnectionBlockedAsync; + connection.ConnectionUnblockedAsync += Connection_ConnectionUnblockedAsync; + + connection.ConnectionRecoveryErrorAsync += Connection_ConnectionRecoveryErrorAsync; + + connection.ConnectionShutdownAsync += (object sender, ShutdownEventArgs ea) => + { + Console.WriteLine("{0} [INFO] saw ConnectionShutdownAsync, event: {1}", Now, ea); + return Task.CompletedTask; + }; + + connection.ConsumerTagChangeAfterRecoveryAsync += Connection_ConsumerTagChangeAfterRecoveryAsync; + connection.QueueNameChangedAfterRecoveryAsync += Connection_QueueNameChangedAfterRecoveryAsync; + + connection.RecoveringConsumerAsync += Connection_RecoveringConsumerAsync; + + await using var channel = await connection.CreateChannelAsync(options: channelOptions); + + channel.CallbackExceptionAsync += Channel_CallbackExceptionAsync; + channel.ChannelShutdownAsync += Channel_ChannelShutdownAsync; + + QueueDeclareOk queue = await channel.QueueDeclareAsync(); + + var consumer = new GH1749Consumer(channel); + await channel.BasicConsumeAsync(queue.QueueName, true, consumer); + + _ = CloseConnectionAsync(); + + Console.WriteLine("{0} [INFO] consumer is running", Util.Now); + Console.ReadLine(); + } + + static async Task CloseConnectionAsync() + { + if (s_util is null) + { + throw new NullReferenceException("s_util"); + } + + try + { + Console.WriteLine("{0} [INFO] start closing connection: {1}", Now, ConnectionClientProvidedName); + await s_util.CloseConnectionAsync(ConnectionClientProvidedName); + Console.WriteLine("{0} [INFO] done closing connection: {1}", Now, ConnectionClientProvidedName); + } + catch (Exception ex) + { + Console.Error.WriteLine("{0} [ERROR] error while closing connection: {1}", Now, ex); + } + } + + private static string Now => Util.Now; + + private static Task Channel_CallbackExceptionAsync(object sender, CallbackExceptionEventArgs ea) + { + Console.WriteLine("{0} [INFO] channel saw CallbackExceptionAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] channel CallbackExceptionAsync, exception: {1}", Now, ea.Exception); + return Task.CompletedTask; + } + + private static Task Channel_ChannelShutdownAsync(object sender, ShutdownEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ChannelShutdownAsync, event: {1}", Now, ea); + return Task.CompletedTask; + } + + private static void CurrentDomain_FirstChanceException(object? sender, FirstChanceExceptionEventArgs e) + { + if (e.Exception is ObjectDisposedException) + { + Console.WriteLine("{0} [INFO] saw FirstChanceException, exception: {1}", Now, e.Exception); + } + } + + private static Task Connection_CallbackExceptionAsync(object sender, CallbackExceptionEventArgs ea) + { + Console.WriteLine("{0} [INFO] connection saw CallbackExceptionAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] connection CallbackExceptionAsync, exception: {1}", Now, ea.Exception); + return Task.CompletedTask; + } + + private static Task Connection_ConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ConnectionBlockedAsync, event: {1}", Now, ea); + return Task.CompletedTask; + } + + private static Task Connection_ConnectionUnblockedAsync(object sender, AsyncEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ConnectionUnlockedAsync, event: {1}", Now, ea); + return Task.CompletedTask; + } + + private static Task Connection_ConnectionRecoveryErrorAsync(object sender, ConnectionRecoveryErrorEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ConnectionRecoveryErrorAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] ConnectionRecoveryErrorAsync, exception: {1}", Now, ea.Exception); + return Task.CompletedTask; + } + + private static Task Connection_ConsumerTagChangeAfterRecoveryAsync(object sender, ConsumerTagChangedAfterRecoveryEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ConsumerTagChangeAfterRecoveryAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] ConsumerTagChangeAfterRecoveryAsync, tags: {1} {2}", Now, ea.TagBefore, ea.TagAfter); + return Task.CompletedTask; + } + + private static Task Connection_QueueNameChangedAfterRecoveryAsync(object sender, QueueNameChangedAfterRecoveryEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw QueueNameChangedAfterRecoveryAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] QueueNameChangedAfterRecoveryAsync, queue names: {1} {2}", Now, ea.NameBefore, ea.NameAfter); + return Task.CompletedTask; + } + + private static Task Connection_RecoveringConsumerAsync(object sender, RecoveringConsumerEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw RecoveringConsumerAsync, event: {1}, tag: {2}", Now, ea, ea.ConsumerTag); + return Task.CompletedTask; + } + } +} + diff --git a/projects/Applications/GH-1749/Util.cs b/projects/Applications/GH-1749/Util.cs new file mode 100644 index 000000000..1cc2debda --- /dev/null +++ b/projects/Applications/GH-1749/Util.cs @@ -0,0 +1,140 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task + +using System.Globalization; +using EasyNetQ.Management.Client; + +namespace GH_1749 +{ + public class Util : IDisposable + { + private static readonly Random s_random = Random.Shared; + private readonly ManagementClient _managementClient; + + public Util() : this("localhost", "guest", "guest") + { + } + + public Util(string hostname, string managementUsername, string managementPassword) + { + if (string.IsNullOrEmpty(managementUsername)) + { + managementUsername = "guest"; + } + + if (string.IsNullOrEmpty(managementPassword)) + { + throw new ArgumentNullException(nameof(managementPassword)); + } + + var managementUri = new Uri($"http://{hostname}:15672"); + _managementClient = new ManagementClient(managementUri, managementUsername, managementPassword); + } + + public static string Now => DateTime.UtcNow.ToString("s", CultureInfo.InvariantCulture); + + public static Random S_Random + { + get + { + return s_random; + } + } + + public async Task CloseConnectionAsync(string connectionClientProvidedName) + { + ushort tries = 1; + EasyNetQ.Management.Client.Model.Connection? connectionToClose = null; + do + { + IReadOnlyList connections; + try + { + do + { + ushort delayMilliseconds = (ushort)(tries * 2 * 100); + if (delayMilliseconds > 1000) + { + delayMilliseconds = 1000; + } + + await Task.Delay(TimeSpan.FromMilliseconds(delayMilliseconds)); + + connections = await _managementClient.GetConnectionsAsync(); + } while (connections.Count == 0); + + connectionToClose = connections.Where(c0 => + { + if (c0.ClientProperties.ContainsKey("connection_name")) + { + object? maybeConnectionName = c0.ClientProperties["connection_name"]; + if (maybeConnectionName is string connectionNameStr) + { + return string.Equals(connectionNameStr, connectionClientProvidedName, StringComparison.InvariantCultureIgnoreCase); + } + } + + return false; + }).FirstOrDefault(); + } + catch (ArgumentNullException) + { + // Sometimes we see this in GitHub CI + tries++; + continue; + } + + if (connectionToClose != null) + { + try + { + await _managementClient.CloseConnectionAsync(connectionToClose); + return; + } + catch (UnexpectedHttpStatusCodeException) + { + tries++; + } + } + } while (tries <= 10); + + if (connectionToClose == null) + { + throw new InvalidOperationException( + $"{Now} [ERROR] could not find/delete connection: '{connectionClientProvidedName}'"); + } + } + + public void Dispose() => _managementClient.Dispose(); + } +} diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index 92c273957..76ceaae20 100644 --- a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -45,7 +45,7 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase, private readonly Task _worker; private readonly ushort _concurrency; private bool _quiesce = false; - private bool _disposed; + private bool _disposedValue; internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency) { @@ -85,7 +85,7 @@ internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency) public ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken) { - if (false == _disposed && false == _quiesce) + if (false == _disposedValue && false == _quiesce) { AddConsumer(consumer, consumerTag); WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag); @@ -101,7 +101,7 @@ public ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body, CancellationToken cancellationToken) { - if (false == _disposed && false == _quiesce) + if (false == _disposedValue && false == _quiesce) { IAsyncBasicConsumer consumer = GetConsumerOrDefault(consumerTag); var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body); @@ -115,7 +115,7 @@ public ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, public ValueTask HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken) { - if (false == _disposed && false == _quiesce) + if (false == _disposedValue && false == _quiesce) { IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag); WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag); @@ -129,7 +129,7 @@ public ValueTask HandleBasicCancelOkAsync(string consumerTag, CancellationToken public ValueTask HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken) { - if (false == _disposed && false == _quiesce) + if (false == _disposedValue && false == _quiesce) { IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag); WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag); @@ -148,7 +148,7 @@ public void Quiesce() public async Task WaitForShutdownAsync() { - if (_disposed) + if (_disposedValue) { return; } @@ -202,6 +202,15 @@ protected override Task InternalShutdownAsync() protected abstract Task ProcessChannelAsync(); + protected enum WorkType : byte + { + Shutdown, + Cancel, + CancelOk, + Deliver, + ConsumeOk + } + protected readonly struct WorkStruct : IDisposable { public readonly IAsyncBasicConsumer Consumer; @@ -276,42 +285,35 @@ public static WorkStruct CreateDeliver(IAsyncBasicConsumer consumer, string cons public void Dispose() => Body.Dispose(); } - protected enum WorkType : byte + public void Dispose() { - Shutdown, - Cancel, - CancelOk, - Deliver, - ConsumeOk + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); } protected virtual void Dispose(bool disposing) { - if (!_disposed) + if (_disposedValue) { - try - { - if (disposing) - { - Quiesce(); - } - } - catch - { - // CHOMP - } - finally + return; + } + + try + { + if (disposing) { - _disposed = true; + Quiesce(); } } - } - - public void Dispose() - { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method - Dispose(disposing: true); - GC.SuppressFinalize(this); + catch + { + // CHOMP + } + finally + { + _disposedValue = true; + } } } } diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index d98734f19..92f2fd781 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -47,7 +47,10 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable private AutorecoveringConnection _connection; private RecoveryAwareChannel _innerChannel; - private bool _disposed; + + private bool _disposedValue; + private bool _isDisposing; + private readonly object _isDisposingLock = new(); private ushort _prefetchCountConsumer; private ushort _prefetchCountGlobal; @@ -143,7 +146,7 @@ public IAsyncBasicConsumer? DefaultConsumer public bool IsClosed => !IsOpen; - public bool IsOpen => !_disposed && _innerChannel.IsOpen; + public bool IsOpen => !_disposedValue && _innerChannel.IsOpen; public string? CurrentQueue => InnerChannel.CurrentQueue; @@ -155,7 +158,7 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection con throw new InvalidOperationException("recordedEntitiesSemaphore must be held"); } - if (_disposed) + if (_disposedValue) { return false; } @@ -192,7 +195,7 @@ await newChannel.TxSelectAsync(cancellationToken) * with the resulting basic.ack never getting sent out. */ - if (_disposed) + if (_disposedValue) { await newChannel.AbortAsync(CancellationToken.None) .ConfigureAwait(false); @@ -252,23 +255,47 @@ await _connection.DeleteRecordedChannelAsync(this, public override string ToString() => InnerChannel.ToString(); - public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); + public void Dispose() + { + if (_disposedValue) + { + return; + } + + DisposeAsync().AsTask().GetAwaiter().GetResult(); + } public async ValueTask DisposeAsync() { - if (_disposed) + if (_disposedValue) { return; } - if (IsOpen) + lock (_isDisposingLock) { - await this.AbortAsync() - .ConfigureAwait(false); + if (_isDisposing) + { + return; + } + _isDisposing = true; } - _recordedConsumerTags.Clear(); - _disposed = true; + try + { + if (IsOpen) + { + await this.AbortAsync() + .ConfigureAwait(false); + } + + _recordedConsumerTags.Clear(); + } + finally + { + _disposedValue = true; + _isDisposing = false; + } } public ValueTask GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) => InnerChannel.GetNextPublishSequenceNumberAsync(cancellationToken); @@ -477,7 +504,7 @@ public Task TxSelectAsync(CancellationToken cancellationToken) [MethodImpl(MethodImplOptions.AggressiveInlining)] private void ThrowIfDisposed() { - if (_disposed) + if (_disposedValue) { ThrowDisposed(); } diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recording.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recording.cs index ef8f21c35..c8063e949 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recording.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recording.cs @@ -53,7 +53,7 @@ internal sealed partial class AutorecoveringConnection internal async ValueTask RecordExchangeAsync(RecordedExchange exchange, bool recordedEntitiesSemaphoreHeld) { - if (_disposed) + if (_disposedValue) { return; } @@ -85,7 +85,7 @@ private void DoRecordExchange(in RecordedExchange exchange) internal async ValueTask DeleteRecordedExchangeAsync(string exchangeName, bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { - if (_disposed) + if (_disposedValue) { return; } @@ -133,7 +133,7 @@ await DeleteAutoDeleteExchangeAsync(binding.Source, internal async ValueTask DeleteAutoDeleteExchangeAsync(string exchangeName, bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { - if (_disposed) + if (_disposedValue) { return; } @@ -187,7 +187,7 @@ bool AnyBindingsOnExchange(string exchange) internal async ValueTask RecordQueueAsync(RecordedQueue queue, bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { - if (_disposed) + if (_disposedValue) { return; } @@ -219,7 +219,7 @@ private void DoRecordQueue(RecordedQueue queue) internal async ValueTask DeleteRecordedQueueAsync(string queueName, bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { - if (_disposed) + if (_disposedValue) { return; } @@ -268,7 +268,7 @@ await DeleteAutoDeleteExchangeAsync(binding.Source, internal async ValueTask RecordBindingAsync(RecordedBinding binding, bool recordedEntitiesSemaphoreHeld) { - if (_disposed) + if (_disposedValue) { return; } @@ -300,7 +300,7 @@ private void DoRecordBinding(in RecordedBinding binding) internal async ValueTask DeleteRecordedBindingAsync(RecordedBinding rb, bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { - if (_disposed) + if (_disposedValue) { return; } @@ -332,7 +332,7 @@ private void DoDeleteRecordedBinding(in RecordedBinding rb) internal async ValueTask RecordConsumerAsync(RecordedConsumer consumer, bool recordedEntitiesSemaphoreHeld) { - if (_disposed) + if (_disposedValue) { return; } @@ -369,7 +369,7 @@ private void DoRecordConsumer(in RecordedConsumer consumer) internal async ValueTask DeleteRecordedConsumerAsync(string consumerTag, bool recordedEntitiesSemaphoreHeld) { - if (_disposed) + if (_disposedValue) { return; } @@ -466,7 +466,7 @@ private void DoAddRecordedChannel(AutorecoveringChannel channel) internal async Task DeleteRecordedChannelAsync(AutorecoveringChannel channel, bool channelsSemaphoreHeld, bool recordedEntitiesSemaphoreHeld) { - if (_disposed) + if (_disposedValue) { return; } diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs index 7eaed0479..da1eb2b61 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs @@ -285,7 +285,7 @@ await maybeNewInnerConnection.OpenAsync(cancellationToken) private async ValueTask RecoverExchangesAsync(IConnection connection, bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { - if (_disposed) + if (_disposedValue) { return; } @@ -337,7 +337,7 @@ await _recordedEntitiesSemaphore.WaitAsync(cancellationToken) private async Task RecoverQueuesAsync(IConnection connection, bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { - if (_disposed) + if (_disposedValue) { return; } @@ -451,7 +451,7 @@ void UpdateConsumerQueue(string oldName, string newName) private async ValueTask RecoverBindingsAsync(IConnection connection, bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { - if (_disposed) + if (_disposedValue) { return; } @@ -503,7 +503,7 @@ await _recordedEntitiesSemaphore.WaitAsync(cancellationToken) internal async ValueTask RecoverConsumersAsync(AutorecoveringChannel channelToRecover, IChannel channelToUse, bool recordedEntitiesSemaphoreHeld = false, CancellationToken cancellationToken = default) { - if (_disposed) + if (_disposedValue) { return; } diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs index f38de44a1..37eab3613 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs @@ -49,7 +49,7 @@ internal sealed partial class AutorecoveringConnection : IConnection private readonly IEndpointResolver _endpoints; private Connection _innerConnection; - private bool _disposed; + private bool _disposedValue; private Connection InnerConnection { @@ -268,11 +268,19 @@ await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, ca return autorecoveringChannel; } - public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); + public void Dispose() + { + if (_disposedValue) + { + return; + } + + DisposeAsync().AsTask().GetAwaiter().GetResult(); + } public async ValueTask DisposeAsync() { - if (_disposed) + if (_disposedValue) { return; } @@ -281,6 +289,11 @@ public async ValueTask DisposeAsync() { await _innerConnection.DisposeAsync() .ConfigureAwait(false); + + _channels.Clear(); + _recordedEntitiesSemaphore.Dispose(); + _channelsSemaphore.Dispose(); + _recoveryCancellationTokenSource.Dispose(); } catch (OperationInterruptedException) { @@ -288,11 +301,7 @@ await _innerConnection.DisposeAsync() } finally { - _channels.Clear(); - _recordedEntitiesSemaphore.Dispose(); - _channelsSemaphore.Dispose(); - _recoveryCancellationTokenSource.Dispose(); - _disposed = true; + _disposedValue = true; } } @@ -302,7 +311,7 @@ private void EnsureIsOpen() [MethodImpl(MethodImplOptions.AggressiveInlining)] private void ThrowIfDisposed() { - if (_disposed) + if (_disposedValue) { ThrowDisposed(); } diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 4e73a9a08..d5300b9c4 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -61,6 +61,10 @@ internal partial class Channel : IChannel, IRecoverable internal readonly IConsumerDispatcher ConsumerDispatcher; + private bool _disposedValue; + private bool _isDisposing; + private readonly object _isDisposingLock = new(); + public Channel(ISession session, CreateChannelOptions createChannelOptions) { ContinuationTimeout = createChannelOptions.ContinuationTimeout; @@ -514,47 +518,106 @@ public override string ToString() void IDisposable.Dispose() { + if (_disposedValue) + { + return; + } + Dispose(true); } + public async ValueTask DisposeAsync() + { + if (_disposedValue) + { + return; + } + + await DisposeAsyncCore(true) + .ConfigureAwait(false); + + Dispose(false); + } + protected virtual void Dispose(bool disposing) { + if (_disposedValue) + { + return; + } + if (disposing) { - if (IsOpen) + lock (_isDisposingLock) { - this.AbortAsync().GetAwaiter().GetResult(); + if (_isDisposing) + { + return; + } + _isDisposing = true; } - ConsumerDispatcher.Dispose(); - _rpcSemaphore.Dispose(); - _confirmSemaphore.Dispose(); - _outstandingPublisherConfirmationsRateLimiter?.Dispose(); - } - } - - public async ValueTask DisposeAsync() - { - await DisposeAsyncCore() - .ConfigureAwait(false); + try + { + if (IsOpen) + { + this.AbortAsync().GetAwaiter().GetResult(); + } - Dispose(false); + ConsumerDispatcher.Dispose(); + _rpcSemaphore.Dispose(); + _confirmSemaphore.Dispose(); + _outstandingPublisherConfirmationsRateLimiter?.Dispose(); + } + finally + { + _disposedValue = true; + _isDisposing = false; + } + } } - protected virtual async ValueTask DisposeAsyncCore() + protected virtual async ValueTask DisposeAsyncCore(bool disposing) { - if (IsOpen) + if (_disposedValue) { - await this.AbortAsync().ConfigureAwait(false); + return; } - ConsumerDispatcher.Dispose(); - _rpcSemaphore.Dispose(); - _confirmSemaphore.Dispose(); - if (_outstandingPublisherConfirmationsRateLimiter is not null) + if (disposing) { - await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync() - .ConfigureAwait(false); + lock (_isDisposingLock) + { + if (_isDisposing) + { + return; + } + _isDisposing = true; + } + + try + { + if (IsOpen) + { + await this.AbortAsync() + .ConfigureAwait(false); + } + + ConsumerDispatcher.Dispose(); + _rpcSemaphore.Dispose(); + _confirmSemaphore.Dispose(); + + if (_outstandingPublisherConfirmationsRateLimiter is not null) + { + await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync() + .ConfigureAwait(false); + } + } + finally + { + _disposedValue = true; + _isDisposing = false; + } } } diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index af26bb7ac..3272bf1f8 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -45,7 +45,10 @@ namespace RabbitMQ.Client.Framing { internal sealed partial class Connection : IConnection { - private bool _disposed; + private bool _disposedValue; + private bool _isDisposing; + private readonly object _isDisposingLock = new(); + private volatile bool _closed; private readonly ConnectionConfig _config; @@ -485,15 +488,32 @@ internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellatio return _frameHandler.WriteAsync(frames, cancellationToken); } - public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); + public void Dispose() + { + if (_disposedValue) + { + return; + } + + DisposeAsync().AsTask().GetAwaiter().GetResult(); + } public async ValueTask DisposeAsync() { - if (_disposed) + if (_disposedValue) { return; } + lock (_isDisposingLock) + { + if (_isDisposing) + { + return; + } + _isDisposing = true; + } + try { if (IsOpen) @@ -514,14 +534,15 @@ await _channel0.DisposeAsync() } finally { - _disposed = true; + _disposedValue = true; + _isDisposing = false; } } [MethodImpl(MethodImplOptions.AggressiveInlining)] private void ThrowIfDisposed() { - if (_disposed) + if (_disposedValue) { ThrowObjectDisposedException(); } diff --git a/projects/Test/Integration/TestChannelShutdown.cs b/projects/Test/Integration/TestChannelShutdown.cs index 37af0ec19..b62f8b058 100644 --- a/projects/Test/Integration/TestChannelShutdown.cs +++ b/projects/Test/Integration/TestChannelShutdown.cs @@ -30,6 +30,8 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Impl; @@ -61,5 +63,52 @@ public async Task TestConsumerDispatcherShutdown() await WaitAsync(tcs, TimeSpan.FromSeconds(5), "channel shutdown"); Assert.True(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after CloseAsync"); } + + [Fact] + public async Task TestConcurrentDisposeAsync_GH1749() + { + bool sawCallbackException = false; + int channelShutdownCount = 0; + + _channel.CallbackExceptionAsync += (channel, ea) => + { + sawCallbackException = true; + return Task.CompletedTask; + }; + + _channel.ChannelShutdownAsync += (channel, args) => + { + Interlocked.Increment(ref channelShutdownCount); + return Task.CompletedTask; + }; + + var disposeTasks = new List + { + _channel.DisposeAsync(), + _channel.DisposeAsync(), + _channel.DisposeAsync() + }; + + foreach (ValueTask vt in disposeTasks) + { + await vt; + } + + Assert.Equal(1, channelShutdownCount); + Assert.False(sawCallbackException); + + disposeTasks.Clear(); + disposeTasks.Add(_conn.DisposeAsync()); + disposeTasks.Add(_conn.DisposeAsync()); + disposeTasks.Add(_conn.DisposeAsync()); + + foreach (ValueTask vt in disposeTasks) + { + await vt; + } + + _channel = null; + _conn = null; + } } } diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs index 1629dc69b..507ac12e5 100644 --- a/projects/Test/Integration/TestToxiproxy.cs +++ b/projects/Test/Integration/TestToxiproxy.cs @@ -41,12 +41,14 @@ using Xunit; using Xunit.Abstractions; +#nullable enable + namespace Test.Integration { public class TestToxiproxy : IntegrationFixture { private readonly TimeSpan _heartbeatTimeout = TimeSpan.FromSeconds(1); - private ToxiproxyManager _toxiproxyManager; + private ToxiproxyManager? _toxiproxyManager; private int _proxyPort; public TestToxiproxy(ITestOutputHelper output) : base(output) @@ -61,14 +63,24 @@ public override Task InitializeAsync() Assert.Null(_conn); Assert.Null(_channel); - _toxiproxyManager = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows); - _proxyPort = ToxiproxyManager.ProxyPort; - return _toxiproxyManager.InitializeAsync(); + if (AreToxiproxyTestsEnabled) + { + _toxiproxyManager = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows); + _proxyPort = ToxiproxyManager.ProxyPort; + return _toxiproxyManager.InitializeAsync(); + } + else + { + return Task.CompletedTask; + } } public override async Task DisposeAsync() { - await _toxiproxyManager.DisposeAsync(); + if (_toxiproxyManager is not null) + { + await _toxiproxyManager.DisposeAsync(); + } await base.DisposeAsync(); } @@ -77,6 +89,7 @@ public override async Task DisposeAsync() public async Task TestCloseConnection() { Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); + Assert.NotNull(_toxiproxyManager); ConnectionFactory cf = CreateConnectionFactory(); cf.Port = _proxyPort; @@ -199,6 +212,7 @@ async Task PublishLoop() public async Task TestThatStoppedSocketResultsInHeartbeatTimeout() { Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); + Assert.NotNull(_toxiproxyManager); ConnectionFactory cf = CreateConnectionFactory(); cf.Port = _proxyPort; @@ -246,6 +260,7 @@ await Assert.ThrowsAsync(() => public async Task TestTcpReset_GH1464() { Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); + Assert.NotNull(_toxiproxyManager); ConnectionFactory cf = CreateConnectionFactory(); cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), _proxyPort); @@ -298,6 +313,7 @@ public async Task TestTcpReset_GH1464() public async Task TestPublisherConfirmationThrottling() { Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); + Assert.NotNull(_toxiproxyManager); const int TotalMessageCount = 64; const int MaxOutstandingConfirms = 8; @@ -397,7 +413,7 @@ private bool AreToxiproxyTestsEnabled { get { - string s = Environment.GetEnvironmentVariable("RABBITMQ_TOXIPROXY_TESTS"); + string? s = Environment.GetEnvironmentVariable("RABBITMQ_TOXIPROXY_TESTS"); if (string.IsNullOrEmpty(s)) { From a2a09673a535058fe6c196c21f0a4994c276ecb5 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 26 Dec 2024 14:57:24 -0800 Subject: [PATCH 2/2] * Add basic test to see what dispose after a channel exception does. * Modify `CreateChannel` app to try and trigger GH1751 --- .../Applications/CreateChannel/Program.cs | 77 ++++++++++++++----- projects/RabbitMQ.Client/Impl/Channel.cs | 4 + projects/Test/Integration/TestQueueDeclare.cs | 16 ++++ 3 files changed, 79 insertions(+), 18 deletions(-) diff --git a/projects/Applications/CreateChannel/Program.cs b/projects/Applications/CreateChannel/Program.cs index 69d96e257..27c61621b 100644 --- a/projects/Applications/CreateChannel/Program.cs +++ b/projects/Applications/CreateChannel/Program.cs @@ -30,11 +30,12 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Diagnostics; -using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; namespace CreateChannel { @@ -44,49 +45,89 @@ public static class Program private const int ChannelsToOpen = 50; private static int channelsOpened; - private static AutoResetEvent doneEvent; public static async Task Main() { - doneEvent = new AutoResetEvent(false); + var doneTcs = new TaskCompletionSource(); var connectionFactory = new ConnectionFactory { }; await using IConnection connection = await connectionFactory.CreateConnectionAsync(); var watch = Stopwatch.StartNew(); - _ = Task.Run(async () => + var workTask = Task.Run(async () => { - var channels = new IChannel[ChannelsToOpen]; - for (int i = 0; i < Repeats; i++) + try { - for (int j = 0; j < channels.Length; j++) + var channelOpenTasks = new List>(); + var channelDisposeTasks = new List(); + var channels = new List(); + for (int i = 0; i < Repeats; i++) { - channels[j] = await connection.CreateChannelAsync(); - channelsOpened++; - } + for (int j = 0; j < ChannelsToOpen; j++) + { + channelOpenTasks.Add(connection.CreateChannelAsync()); + } - for (int j = 0; j < channels.Length; j++) - { - await channels[j].DisposeAsync(); + for (int j = 0; j < channelOpenTasks.Count; j++) + { + IChannel ch = await channelOpenTasks[j]; + if (j % 8 == 0) + { + try + { + await ch.QueueDeclarePassiveAsync(Guid.NewGuid().ToString()); + } + catch (OperationInterruptedException) + { + await ch.DisposeAsync(); + } + catch (Exception ex) + { + _ = Console.Error.WriteLineAsync($"{DateTime.Now:s} [ERROR] {ex}"); + } + } + else + { + channels.Add(ch); + channelsOpened++; + } + } + channelOpenTasks.Clear(); + + for (int j = 0; j < channels.Count; j++) + { + channelDisposeTasks.Add(channels[j].DisposeAsync()); + } + + for (int j = 0; j < channels.Count; j++) + { + await channelDisposeTasks[j]; + } + channelDisposeTasks.Clear(); } - } - doneEvent.Set(); + doneTcs.SetResult(true); + } + catch (Exception ex) + { + doneTcs.SetException(ex); + } }); Console.WriteLine($"{Repeats} times opening {ChannelsToOpen} channels on a connection. => Total channel open/close: {Repeats * ChannelsToOpen}"); Console.WriteLine(); Console.WriteLine("Opened"); - while (!doneEvent.WaitOne(500)) + while (false == doneTcs.Task.IsCompleted) { Console.WriteLine($"{channelsOpened,5}"); + await Task.Delay(150); } watch.Stop(); Console.WriteLine($"{channelsOpened,5}"); Console.WriteLine(); - Console.WriteLine($"Took {watch.Elapsed.TotalMilliseconds} ms"); + Console.WriteLine($"Took {watch.Elapsed}"); - Console.ReadLine(); + await workTask; } } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index d5300b9c4..31d056abc 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -714,6 +714,7 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag) protected async Task HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) { + // TODO add check for Disposing / Disposed var channelClose = new ChannelClose(cmd.MethodSpan); SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer, channelClose._replyCode, @@ -730,11 +731,14 @@ await ModelSendAsync(in method, cancellationToken) await Session.NotifyAsync(cancellationToken) .ConfigureAwait(false); + return true; } protected async Task HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken) { + // TODO add check for Disposing / Disposed + /* * Note: * This call _must_ come before completing the async continuation diff --git a/projects/Test/Integration/TestQueueDeclare.cs b/projects/Test/Integration/TestQueueDeclare.cs index 79d0c9106..96282707b 100644 --- a/projects/Test/Integration/TestQueueDeclare.cs +++ b/projects/Test/Integration/TestQueueDeclare.cs @@ -57,6 +57,22 @@ public async Task TestQueueDeclareAsync() Assert.Equal(q, passiveDeclareResult.QueueName); } + [Fact] + public async Task TestPassiveQueueDeclareException_GH1749() + { + string q = GenerateQueueName(); + try + { + await _channel.QueueDeclarePassiveAsync(q); + } + catch (Exception ex) + { + _output.WriteLine("{0} ex: {1}", _testDisplayName, ex); + await _channel.DisposeAsync(); + _channel = null; + } + } + [Fact] public async Task TestConcurrentQueueDeclareAndBindAsync() {