Skip to content

Commit

Permalink
* Add lock object for disposal of Channels and Connections. Note: a `…
Browse files Browse the repository at this point in the history
…SemaphoreSlim` can't be used because it must be disposed as well, and that can't happen cleanly in a `Dispose` method.
  • Loading branch information
lukebakken committed Dec 23, 2024
1 parent 0b80ed3 commit 26bc93c
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 1 deletion.
13 changes: 13 additions & 0 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable

private AutorecoveringConnection _connection;
private RecoveryAwareChannel _innerChannel;

private bool _disposedValue;
private bool _isDisposing;
private readonly object _isDisposingLock = new();

private ushort _prefetchCountConsumer;
private ushort _prefetchCountGlobal;
Expand Down Expand Up @@ -269,6 +272,15 @@ public async ValueTask DisposeAsync()
return;
}

lock (_isDisposingLock)
{
if (_isDisposing)
{
return;
}
_isDisposing = true;
}

try
{
if (IsOpen)
Expand All @@ -282,6 +294,7 @@ await this.AbortAsync()
finally
{
_disposedValue = true;
_isDisposing = false;
}
}

Expand Down
29 changes: 28 additions & 1 deletion projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ internal partial class Channel : IChannel, IRecoverable

internal readonly IConsumerDispatcher ConsumerDispatcher;

private bool _disposedValue = false;
private bool _disposedValue;
private bool _isDisposing;
private readonly object _isDisposingLock = new();

public Channel(ISession session, CreateChannelOptions createChannelOptions)
{
Expand Down Expand Up @@ -546,6 +548,15 @@ protected virtual void Dispose(bool disposing)

if (disposing)
{
lock (_isDisposingLock)
{
if (_isDisposing)
{
return;
}
_isDisposing = true;
}

try
{
if (IsOpen)
Expand All @@ -561,14 +572,29 @@ protected virtual void Dispose(bool disposing)
finally
{
_disposedValue = true;
_isDisposing = false;
}
}
}

protected virtual async ValueTask DisposeAsyncCore(bool disposing)
{
if (_disposedValue)
{
return;
}

if (disposing)
{
lock (_isDisposingLock)
{
if (_isDisposing)
{
return;
}
_isDisposing = true;
}

try
{
if (IsOpen)
Expand All @@ -590,6 +616,7 @@ await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
finally
{
_disposedValue = true;
_isDisposing = false;
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions projects/RabbitMQ.Client/Impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ namespace RabbitMQ.Client.Framing
internal sealed partial class Connection : IConnection
{
private bool _disposedValue;
private bool _isDisposing;
private readonly object _isDisposingLock = new();

private volatile bool _closed;

private readonly ConnectionConfig _config;
Expand Down Expand Up @@ -502,6 +505,15 @@ public async ValueTask DisposeAsync()
return;
}

lock (_isDisposingLock)
{
if (_isDisposing)
{
return;
}
_isDisposing = true;
}

try
{
if (IsOpen)
Expand All @@ -523,6 +535,7 @@ await _channel0.DisposeAsync()
finally
{
_disposedValue = true;
_isDisposing = false;
}
}

Expand Down
49 changes: 49 additions & 0 deletions projects/Test/Integration/TestChannelShutdown.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Impl;
Expand Down Expand Up @@ -61,5 +63,52 @@ public async Task TestConsumerDispatcherShutdown()
await WaitAsync(tcs, TimeSpan.FromSeconds(5), "channel shutdown");
Assert.True(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after CloseAsync");
}

[Fact]
public async Task TestConcurrentDisposeAsync_GH1749()
{
bool sawCallbackException = false;
int channelShutdownCount = 0;

_channel.CallbackExceptionAsync += (channel, ea) =>
{
sawCallbackException = true;
return Task.CompletedTask;
};

_channel.ChannelShutdownAsync += (channel, args) =>
{
Interlocked.Increment(ref channelShutdownCount);
return Task.CompletedTask;
};

var disposeTasks = new List<ValueTask>
{
_channel.DisposeAsync(),
_channel.DisposeAsync(),
_channel.DisposeAsync()
};

foreach (ValueTask vt in disposeTasks)
{
await vt;
}

Assert.Equal(1, channelShutdownCount);
Assert.False(sawCallbackException);

disposeTasks.Clear();
disposeTasks.Add(_conn.DisposeAsync());
disposeTasks.Add(_conn.DisposeAsync());
disposeTasks.Add(_conn.DisposeAsync());

foreach (ValueTask vt in disposeTasks)
{
await vt;
}

_channel = null;
_conn = null;
}
}
}

0 comments on commit 26bc93c

Please sign in to comment.