Skip to content

Commit

Permalink
Merge pull request #1585 from rabbitmq/rabbitmq-dotnet-client-1038
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin authored May 31, 2024
2 parents 137abc9 + 7005353 commit f8087b6
Showing 1 changed file with 110 additions and 0 deletions.
110 changes: 110 additions & 0 deletions projects/Test/Integration/TestAsyncEventingBasicConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2020 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Xunit;
using Xunit.Abstractions;

namespace Test.Integration
{
public class TestAsyncEventingBasicConsumer : IntegrationFixture
{
private readonly CancellationTokenSource _cts = new CancellationTokenSource(ShortSpan);
private readonly CancellationTokenRegistration _ctr;
private readonly TaskCompletionSource<bool> _onCallbackExceptionTcs =
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource<bool> _onReceivedTcs =
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

public TestAsyncEventingBasicConsumer(ITestOutputHelper output)
: base(output, dispatchConsumersAsync: true, consumerDispatchConcurrency: 2)
{
_ctr = _cts.Token.Register(OnTokenCanceled);
}

public override Task DisposeAsync()
{
_ctr.Dispose();
_cts.Dispose();
return base.DisposeAsync();
}

private void OnTokenCanceled()
{
_onCallbackExceptionTcs.TrySetCanceled();
_onReceivedTcs.TrySetCanceled();
}

private void ConsumerChannelOnCallbackException(object sender, CallbackExceptionEventArgs e)
{
_onCallbackExceptionTcs.TrySetResult(true);
}

private Task AsyncConsumerOnReceived(object sender, BasicDeliverEventArgs @event)
{
_onReceivedTcs.TrySetResult(true);
throw new Exception("from async subscriber");
}

[Fact]
public async Task TestAsyncEventingBasicConsumer_GH1038()
{
string exchangeName = GenerateExchangeName();
string queueName = GenerateQueueName();
string routingKey = string.Empty;

await _channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct);
await _channel.QueueDeclareAsync(queueName, false, false, true, null);
await _channel.QueueBindAsync(queueName, exchangeName, routingKey, null);

_channel.CallbackException += ConsumerChannelOnCallbackException;

//async subscriber
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += AsyncConsumerOnReceived;
await _channel.BasicConsumeAsync(queueName, false, consumer);

//publisher
using IChannel publisherChannel = await _conn.CreateChannelAsync();
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
var props = new BasicProperties();
await publisherChannel.BasicPublishAsync(exchangeName, "", props, messageBodyBytes);

await Task.WhenAll(_onReceivedTcs.Task, _onCallbackExceptionTcs.Task);
Assert.True(await _onReceivedTcs.Task);
Assert.True(await _onCallbackExceptionTcs.Task);
}
}
}

0 comments on commit f8087b6

Please sign in to comment.