From 26bc93c9027918e21f91b801a5647268e8b8f92e Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 23 Dec 2024 11:51:25 -0800 Subject: [PATCH] * Add lock object for disposal of Channels and Connections. Note: a `SemaphoreSlim` can't be used because it must be disposed as well, and that can't happen cleanly in a `Dispose` method. --- .../Impl/AutorecoveringChannel.cs | 13 +++++ projects/RabbitMQ.Client/Impl/Channel.cs | 29 ++++++++++- projects/RabbitMQ.Client/Impl/Connection.cs | 13 +++++ .../Test/Integration/TestChannelShutdown.cs | 49 +++++++++++++++++++ 4 files changed, 103 insertions(+), 1 deletion(-) diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index b8e6825e0..92f2fd781 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -47,7 +47,10 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable private AutorecoveringConnection _connection; private RecoveryAwareChannel _innerChannel; + private bool _disposedValue; + private bool _isDisposing; + private readonly object _isDisposingLock = new(); private ushort _prefetchCountConsumer; private ushort _prefetchCountGlobal; @@ -269,6 +272,15 @@ public async ValueTask DisposeAsync() return; } + lock (_isDisposingLock) + { + if (_isDisposing) + { + return; + } + _isDisposing = true; + } + try { if (IsOpen) @@ -282,6 +294,7 @@ await this.AbortAsync() finally { _disposedValue = true; + _isDisposing = false; } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 3bd46d091..d5300b9c4 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -61,7 +61,9 @@ internal partial class Channel : IChannel, IRecoverable internal readonly IConsumerDispatcher ConsumerDispatcher; - private bool _disposedValue = false; + private bool _disposedValue; + private bool _isDisposing; + private readonly object _isDisposingLock = new(); public Channel(ISession session, CreateChannelOptions createChannelOptions) { @@ -546,6 +548,15 @@ protected virtual void Dispose(bool disposing) if (disposing) { + lock (_isDisposingLock) + { + if (_isDisposing) + { + return; + } + _isDisposing = true; + } + try { if (IsOpen) @@ -561,14 +572,29 @@ protected virtual void Dispose(bool disposing) finally { _disposedValue = true; + _isDisposing = false; } } } protected virtual async ValueTask DisposeAsyncCore(bool disposing) { + if (_disposedValue) + { + return; + } + if (disposing) { + lock (_isDisposingLock) + { + if (_isDisposing) + { + return; + } + _isDisposing = true; + } + try { if (IsOpen) @@ -590,6 +616,7 @@ await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync() finally { _disposedValue = true; + _isDisposing = false; } } } diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index 52c8d77c4..3272bf1f8 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -46,6 +46,9 @@ namespace RabbitMQ.Client.Framing internal sealed partial class Connection : IConnection { private bool _disposedValue; + private bool _isDisposing; + private readonly object _isDisposingLock = new(); + private volatile bool _closed; private readonly ConnectionConfig _config; @@ -502,6 +505,15 @@ public async ValueTask DisposeAsync() return; } + lock (_isDisposingLock) + { + if (_isDisposing) + { + return; + } + _isDisposing = true; + } + try { if (IsOpen) @@ -523,6 +535,7 @@ await _channel0.DisposeAsync() finally { _disposedValue = true; + _isDisposing = false; } } diff --git a/projects/Test/Integration/TestChannelShutdown.cs b/projects/Test/Integration/TestChannelShutdown.cs index 37af0ec19..b62f8b058 100644 --- a/projects/Test/Integration/TestChannelShutdown.cs +++ b/projects/Test/Integration/TestChannelShutdown.cs @@ -30,6 +30,8 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Impl; @@ -61,5 +63,52 @@ public async Task TestConsumerDispatcherShutdown() await WaitAsync(tcs, TimeSpan.FromSeconds(5), "channel shutdown"); Assert.True(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after CloseAsync"); } + + [Fact] + public async Task TestConcurrentDisposeAsync_GH1749() + { + bool sawCallbackException = false; + int channelShutdownCount = 0; + + _channel.CallbackExceptionAsync += (channel, ea) => + { + sawCallbackException = true; + return Task.CompletedTask; + }; + + _channel.ChannelShutdownAsync += (channel, args) => + { + Interlocked.Increment(ref channelShutdownCount); + return Task.CompletedTask; + }; + + var disposeTasks = new List + { + _channel.DisposeAsync(), + _channel.DisposeAsync(), + _channel.DisposeAsync() + }; + + foreach (ValueTask vt in disposeTasks) + { + await vt; + } + + Assert.Equal(1, channelShutdownCount); + Assert.False(sawCallbackException); + + disposeTasks.Clear(); + disposeTasks.Add(_conn.DisposeAsync()); + disposeTasks.Add(_conn.DisposeAsync()); + disposeTasks.Add(_conn.DisposeAsync()); + + foreach (ValueTask vt in disposeTasks) + { + await vt; + } + + _channel = null; + _conn = null; + } } }