Skip to content

Commit

Permalink
Merge pull request #1568 from rabbitmq/rabbitmq-dotnet-client-1567
Browse files Browse the repository at this point in the history
Can't close channel from consumer dispatcher
  • Loading branch information
lukebakken authored Jun 3, 2024
2 parents f8087b6 + a835088 commit 1a8d0cb
Showing 1 changed file with 34 additions and 2 deletions.
36 changes: 34 additions & 2 deletions projects/Test/Integration/TestAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -623,9 +623,9 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
});
};

// queue1 -> produce click to queue2
// queue1 -> produce click to queue2
// click -> exchange
// queue2 -> consume click from queue1
// queue2 -> consume click from queue1
await _channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true);
await _channel.QueueDeclareAsync(queue1Name);
await _channel.QueueBindAsync(queue1Name, exchangeName, queue1Name);
Expand Down Expand Up @@ -660,6 +660,38 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
Assert.True(await tcs.Task);
}

[Fact]
public async Task TestCloseWithinEventHandler_GH1567()
{
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

QueueDeclareOk q = await _channel.QueueDeclareAsync();
string queueName = q.QueueName;

var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += async (_, eventArgs) =>
{
await _channel.BasicCancelAsync(eventArgs.ConsumerTag);
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
_channel.CloseAsync().ContinueWith((_) =>
{
_channel.Dispose();
_channel = null;
});
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
tcs.TrySetResult(true);
};

await _channel.BasicConsumeAsync(consumer, queueName, true);

var bp = new BasicProperties();

await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName,
basicProperties: bp, mandatory: true, body: GetRandomBody(64));

Assert.True(await tcs.Task);
}

private static void SetException(Exception ex, params TaskCompletionSource<bool>[] tcsAry)
{
foreach (TaskCompletionSource<bool> tcs in tcsAry)
Expand Down

0 comments on commit 1a8d0cb

Please sign in to comment.