Skip to content

Commit

Permalink
* Add basic test to see what dispose after a channel exception does.
Browse files Browse the repository at this point in the history
* Modify `CreateChannel` app to try and trigger GH1751
  • Loading branch information
lukebakken committed Dec 26, 2024
1 parent 30a244d commit a2a0967
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 18 deletions.
77 changes: 59 additions & 18 deletions projects/Applications/CreateChannel/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;

namespace CreateChannel
{
Expand All @@ -44,49 +45,89 @@ public static class Program
private const int ChannelsToOpen = 50;

private static int channelsOpened;
private static AutoResetEvent doneEvent;

public static async Task Main()
{
doneEvent = new AutoResetEvent(false);
var doneTcs = new TaskCompletionSource<bool>();

var connectionFactory = new ConnectionFactory { };
await using IConnection connection = await connectionFactory.CreateConnectionAsync();

var watch = Stopwatch.StartNew();
_ = Task.Run(async () =>
var workTask = Task.Run(async () =>
{
var channels = new IChannel[ChannelsToOpen];
for (int i = 0; i < Repeats; i++)
try
{
for (int j = 0; j < channels.Length; j++)
var channelOpenTasks = new List<Task<IChannel>>();
var channelDisposeTasks = new List<ValueTask>();
var channels = new List<IChannel>();
for (int i = 0; i < Repeats; i++)
{
channels[j] = await connection.CreateChannelAsync();
channelsOpened++;
}
for (int j = 0; j < ChannelsToOpen; j++)
{
channelOpenTasks.Add(connection.CreateChannelAsync());
}

for (int j = 0; j < channels.Length; j++)
{
await channels[j].DisposeAsync();
for (int j = 0; j < channelOpenTasks.Count; j++)
{
IChannel ch = await channelOpenTasks[j];
if (j % 8 == 0)
{
try
{
await ch.QueueDeclarePassiveAsync(Guid.NewGuid().ToString());
}
catch (OperationInterruptedException)
{
await ch.DisposeAsync();
}
catch (Exception ex)
{
_ = Console.Error.WriteLineAsync($"{DateTime.Now:s} [ERROR] {ex}");
}
}
else
{
channels.Add(ch);
channelsOpened++;
}
}
channelOpenTasks.Clear();

for (int j = 0; j < channels.Count; j++)
{
channelDisposeTasks.Add(channels[j].DisposeAsync());
}

for (int j = 0; j < channels.Count; j++)
{
await channelDisposeTasks[j];
}
channelDisposeTasks.Clear();
}
}

doneEvent.Set();
doneTcs.SetResult(true);
}
catch (Exception ex)
{
doneTcs.SetException(ex);
}
});

Console.WriteLine($"{Repeats} times opening {ChannelsToOpen} channels on a connection. => Total channel open/close: {Repeats * ChannelsToOpen}");
Console.WriteLine();
Console.WriteLine("Opened");
while (!doneEvent.WaitOne(500))
while (false == doneTcs.Task.IsCompleted)
{
Console.WriteLine($"{channelsOpened,5}");
await Task.Delay(150);
}
watch.Stop();
Console.WriteLine($"{channelsOpened,5}");
Console.WriteLine();
Console.WriteLine($"Took {watch.Elapsed.TotalMilliseconds} ms");
Console.WriteLine($"Took {watch.Elapsed}");

Console.ReadLine();
await workTask;
}
}
}
4 changes: 4 additions & 0 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag)

protected async Task<bool> HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
// TODO add check for Disposing / Disposed
var channelClose = new ChannelClose(cmd.MethodSpan);
SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer,
channelClose._replyCode,
Expand All @@ -730,11 +731,14 @@ await ModelSendAsync(in method, cancellationToken)

await Session.NotifyAsync(cancellationToken)
.ConfigureAwait(false);

return true;
}

protected async Task<bool> HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
// TODO add check for Disposing / Disposed

/*
* Note:
* This call _must_ come before completing the async continuation
Expand Down
16 changes: 16 additions & 0 deletions projects/Test/Integration/TestQueueDeclare.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,22 @@ public async Task TestQueueDeclareAsync()
Assert.Equal(q, passiveDeclareResult.QueueName);
}

[Fact]
public async Task TestPassiveQueueDeclareException_GH1749()
{
string q = GenerateQueueName();
try
{
await _channel.QueueDeclarePassiveAsync(q);
}
catch (Exception ex)
{
_output.WriteLine("{0} ex: {1}", _testDisplayName, ex);
await _channel.DisposeAsync();
_channel = null;
}
}

[Fact]
public async Task TestConcurrentQueueDeclareAndBindAsync()
{
Expand Down

0 comments on commit a2a0967

Please sign in to comment.