Skip to content

Commit

Permalink
* We've come full circle ... restoring @danielmarbach's original chan…
Browse files Browse the repository at this point in the history
…ges.
  • Loading branch information
lukebakken committed Sep 12, 2024
1 parent 7770fd8 commit 601f501
Show file tree
Hide file tree
Showing 8 changed files with 11 additions and 67 deletions.
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -894,4 +894,4 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client
RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency = 1, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/api/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,13 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
///
/// Defaults to <see cref="Constants.DefaultConsumerDispatchConcurrency"/>.
/// Defaults to <c>null</c>, which will use the value from <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
///
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.
/// </param>
/// <param name="cancellationToken">Cancellation token</param>
Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default);
}
}
3 changes: 1 addition & 2 deletions projects/RabbitMQ.Client/client/framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ namespace RabbitMQ.Client.Framing.Impl
{
internal class Channel : ChannelBase
{
public Channel(ConnectionConfig config, ISession session,
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
public Channel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null)
: base(config, session, consumerDispatchConcurrency)
{
}
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/framing/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static class Constants
/// <summary>
/// The default consumer dispatch concurrency. See <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
/// to set this value for every channel created on a connection,
/// and <see cref="IConnection.CreateChannelAsync(ushort, System.Threading.CancellationToken)"/>
/// and <see cref="IConnection.CreateChannelAsync(ushort?, System.Threading.CancellationToken)"/>
/// for setting this value for a particular channel.
/// </summary>
public const ushort DefaultConsumerDispatchConcurrency = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Impl;
using RabbitMQ.Util;

namespace RabbitMQ.Client.Framing.Impl
{
Expand Down Expand Up @@ -241,11 +240,11 @@ await CloseInnerConnectionAsync()
}
}

public async Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
public async Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default)
{
EnsureIsOpen();
ushort cdc = Misc.DetermineConsumerDispatchConcurrency(_config, consumerDispatchConcurrency);
ushort cdc = consumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cdc, cancellationToken)
.ConfigureAwait(false);
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc);
Expand Down
12 changes: 3 additions & 9 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Util;

namespace RabbitMQ.Client.Impl
{
Expand Down Expand Up @@ -75,10 +74,11 @@ internal abstract class ChannelBase : IChannel, IRecoverable
internal readonly IConsumerDispatcher ConsumerDispatcher;

protected ChannelBase(ConnectionConfig config, ISession session,
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
ushort? perChannelConsumerDispatchConcurrency = null)
{
ContinuationTimeout = config.ContinuationTimeout;
ConsumerDispatcher = BuildConsumerDispatcher(config, consumerDispatchConcurrency);
ConsumerDispatcher = new AsyncConsumerDispatcher(this,
perChannelConsumerDispatchConcurrency.GetValueOrDefault(config.ConsumerDispatchConcurrency));
Action<Exception, string> onException = (exception, context) =>
OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
_basicAcksWrapper = new EventingWrapper<BasicAckEventArgs>("OnBasicAck", onException);
Expand All @@ -94,12 +94,6 @@ protected ChannelBase(ConnectionConfig config, ISession session,
Session = session;
}

private IConsumerDispatcher BuildConsumerDispatcher(ConnectionConfig config, ushort perChannelConsumerDispatchConcurrency)
{
ushort cdc = Misc.DetermineConsumerDispatchConcurrency(config, perChannelConsumerDispatchConcurrency);
return new AsyncConsumerDispatcher(this, cdc);
}

internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10);
public TimeSpan ContinuationTimeout { get; set; }

Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ await CloseAsync(ea, true,
}
}

public Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
public Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default)
{
EnsureIsOpen();
Expand Down
48 changes: 0 additions & 48 deletions projects/RabbitMQ.Client/util/Misc.cs

This file was deleted.

0 comments on commit 601f501

Please sign in to comment.