From a2a09673a535058fe6c196c21f0a4994c276ecb5 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 26 Dec 2024 14:57:24 -0800 Subject: [PATCH] * Add basic test to see what dispose after a channel exception does. * Modify `CreateChannel` app to try and trigger GH1751 --- .../Applications/CreateChannel/Program.cs | 77 ++++++++++++++----- projects/RabbitMQ.Client/Impl/Channel.cs | 4 + projects/Test/Integration/TestQueueDeclare.cs | 16 ++++ 3 files changed, 79 insertions(+), 18 deletions(-) diff --git a/projects/Applications/CreateChannel/Program.cs b/projects/Applications/CreateChannel/Program.cs index 69d96e257..27c61621b 100644 --- a/projects/Applications/CreateChannel/Program.cs +++ b/projects/Applications/CreateChannel/Program.cs @@ -30,11 +30,12 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Diagnostics; -using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; namespace CreateChannel { @@ -44,49 +45,89 @@ public static class Program private const int ChannelsToOpen = 50; private static int channelsOpened; - private static AutoResetEvent doneEvent; public static async Task Main() { - doneEvent = new AutoResetEvent(false); + var doneTcs = new TaskCompletionSource(); var connectionFactory = new ConnectionFactory { }; await using IConnection connection = await connectionFactory.CreateConnectionAsync(); var watch = Stopwatch.StartNew(); - _ = Task.Run(async () => + var workTask = Task.Run(async () => { - var channels = new IChannel[ChannelsToOpen]; - for (int i = 0; i < Repeats; i++) + try { - for (int j = 0; j < channels.Length; j++) + var channelOpenTasks = new List>(); + var channelDisposeTasks = new List(); + var channels = new List(); + for (int i = 0; i < Repeats; i++) { - channels[j] = await connection.CreateChannelAsync(); - channelsOpened++; - } + for (int j = 0; j < ChannelsToOpen; j++) + { + channelOpenTasks.Add(connection.CreateChannelAsync()); + } - for (int j = 0; j < channels.Length; j++) - { - await channels[j].DisposeAsync(); + for (int j = 0; j < channelOpenTasks.Count; j++) + { + IChannel ch = await channelOpenTasks[j]; + if (j % 8 == 0) + { + try + { + await ch.QueueDeclarePassiveAsync(Guid.NewGuid().ToString()); + } + catch (OperationInterruptedException) + { + await ch.DisposeAsync(); + } + catch (Exception ex) + { + _ = Console.Error.WriteLineAsync($"{DateTime.Now:s} [ERROR] {ex}"); + } + } + else + { + channels.Add(ch); + channelsOpened++; + } + } + channelOpenTasks.Clear(); + + for (int j = 0; j < channels.Count; j++) + { + channelDisposeTasks.Add(channels[j].DisposeAsync()); + } + + for (int j = 0; j < channels.Count; j++) + { + await channelDisposeTasks[j]; + } + channelDisposeTasks.Clear(); } - } - doneEvent.Set(); + doneTcs.SetResult(true); + } + catch (Exception ex) + { + doneTcs.SetException(ex); + } }); Console.WriteLine($"{Repeats} times opening {ChannelsToOpen} channels on a connection. => Total channel open/close: {Repeats * ChannelsToOpen}"); Console.WriteLine(); Console.WriteLine("Opened"); - while (!doneEvent.WaitOne(500)) + while (false == doneTcs.Task.IsCompleted) { Console.WriteLine($"{channelsOpened,5}"); + await Task.Delay(150); } watch.Stop(); Console.WriteLine($"{channelsOpened,5}"); Console.WriteLine(); - Console.WriteLine($"Took {watch.Elapsed.TotalMilliseconds} ms"); + Console.WriteLine($"Took {watch.Elapsed}"); - Console.ReadLine(); + await workTask; } } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index d5300b9c4..31d056abc 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -714,6 +714,7 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag) protected async Task HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) { + // TODO add check for Disposing / Disposed var channelClose = new ChannelClose(cmd.MethodSpan); SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer, channelClose._replyCode, @@ -730,11 +731,14 @@ await ModelSendAsync(in method, cancellationToken) await Session.NotifyAsync(cancellationToken) .ConfigureAwait(false); + return true; } protected async Task HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken) { + // TODO add check for Disposing / Disposed + /* * Note: * This call _must_ come before completing the async continuation diff --git a/projects/Test/Integration/TestQueueDeclare.cs b/projects/Test/Integration/TestQueueDeclare.cs index 79d0c9106..96282707b 100644 --- a/projects/Test/Integration/TestQueueDeclare.cs +++ b/projects/Test/Integration/TestQueueDeclare.cs @@ -57,6 +57,22 @@ public async Task TestQueueDeclareAsync() Assert.Equal(q, passiveDeclareResult.QueueName); } + [Fact] + public async Task TestPassiveQueueDeclareException_GH1749() + { + string q = GenerateQueueName(); + try + { + await _channel.QueueDeclarePassiveAsync(q); + } + catch (Exception ex) + { + _output.WriteLine("{0} ex: {1}", _testDisplayName, ex); + await _channel.DisposeAsync(); + _channel = null; + } + } + [Fact] public async Task TestConcurrentQueueDeclareAndBindAsync() {