From fbe51dd2685f4df8336804d9b02521e212785aa7 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Wed, 22 May 2024 10:48:58 +0200 Subject: [PATCH] Use a custom awaitable async queue --- .../Client/HiveMQClientTrafficProcessor.cs | 331 ++++++++---------- .../Client/internal/AwaitableQueueX.cs | 67 ++++ .../HiveMQtt.Test/HiveMQClient/PublishTest.cs | 54 +-- 3 files changed, 249 insertions(+), 203 deletions(-) create mode 100644 Source/HiveMQtt/Client/internal/AwaitableQueueX.cs diff --git a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs index 26d6516e..04872f41 100644 --- a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs +++ b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs @@ -33,11 +33,11 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient { internal MQTT5Properties ConnectionProperties { get; set; } = new(); - internal ConcurrentQueue OutgoingPublishQueue { get; } = new(); + internal AwaitableQueueX OutgoingPublishQueue { get; } = new(); - internal ConcurrentQueue SendQueue { get; } = new(); + internal AwaitableQueueX SendQueue { get; } = new(); - internal ConcurrentQueue ReceivedQueue { get; } = new(); + internal AwaitableQueueX ReceivedQueue { get; } = new(); // Transactional packets indexed by packet identifier internal ConcurrentDictionary> TransactionQueue { get; } = new(); @@ -134,44 +134,37 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationTo continue; } - if (this.OutgoingPublishQueue.TryDequeue(out var publishPacket)) - { - FlushResult writeResult = default; + var publishPacket = await this.OutgoingPublishQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); + FlushResult writeResult = default; - Logger.Trace($"{this.Options.ClientId}-(PW)- --> Sending PublishPacket id={publishPacket.PacketIdentifier}"); - if (publishPacket.Message.QoS is QualityOfService.AtLeastOnceDelivery || - publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery) + Logger.Trace($"{this.Options.ClientId}-(PW)- --> Sending PublishPacket id={publishPacket.PacketIdentifier}"); + if (publishPacket.Message.QoS is QualityOfService.AtLeastOnceDelivery || + publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery) + { + // QoS > 0 - Add to transaction queue + if (!this.TransactionQueue.TryAdd(publishPacket.PacketIdentifier, new List { publishPacket })) { - // QoS > 0 - Add to transaction queue - if (!this.TransactionQueue.TryAdd(publishPacket.PacketIdentifier, new List { publishPacket })) - { - Logger.Warn($"Duplicate packet ID detected {publishPacket.PacketIdentifier} while queueing to transaction queue for an outgoing QoS {publishPacket.Message.QoS} publish ."); - continue; - } - } - - writeResult = await this.WriteAsync(publishPacket.Encode()).ConfigureAwait(false); - this.OnPublishSentEventLauncher(publishPacket); - - if (writeResult.IsCanceled) - { - Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Write Cancelled"); - break; + Logger.Warn($"Duplicate packet ID detected {publishPacket.PacketIdentifier} while queueing to transaction queue for an outgoing QoS {publishPacket.Message.QoS} publish ."); + continue; } + } - if (writeResult.IsCompleted) - { - Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter IsCompleted: end of the stream"); - break; - } + writeResult = await this.WriteAsync(publishPacket.Encode()).ConfigureAwait(false); + this.OnPublishSentEventLauncher(publishPacket); - this.lastCommunicationTimer.Restart(); + if (writeResult.IsCanceled) + { + Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Write Cancelled"); + break; } - else + + if (writeResult.IsCompleted) { - // Queue is empty - await Task.Delay(1).ConfigureAwait(false); - } // TryDequeue + Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter IsCompleted: end of the stream"); + break; + } + + this.lastCommunicationTimer.Restart(); } // while(true) Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Exiting...{this.ConnectState}"); @@ -203,92 +196,85 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => } // Logger.Trace($"{this.Options.ClientId}-(W)- {this.SendQueue.Count} packets waiting to be sent."); - if (this.SendQueue.TryDequeue(out var packet)) - { - FlushResult writeResult = default; + var packet = await this.SendQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); + FlushResult writeResult = default; - switch (packet) - { - // FIXME: Only one connect, subscribe or unsubscribe packet can be sent at a time. - case ConnectPacket connectPacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending ConnectPacket id={connectPacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(connectPacket.Encode()).ConfigureAwait(false); - this.OnConnectSentEventLauncher(connectPacket); - break; - case DisconnectPacket disconnectPacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending DisconnectPacket id={disconnectPacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(disconnectPacket.Encode()).ConfigureAwait(false); - this.OnDisconnectSentEventLauncher(disconnectPacket); - break; - case SubscribePacket subscribePacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending SubscribePacket id={subscribePacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(subscribePacket.Encode()).ConfigureAwait(false); - this.OnSubscribeSentEventLauncher(subscribePacket); - break; - case UnsubscribePacket unsubscribePacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending UnsubscribePacket id={unsubscribePacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(unsubscribePacket.Encode()).ConfigureAwait(false); - this.OnUnsubscribeSentEventLauncher(unsubscribePacket); - break; - case PublishPacket publishPacket: - throw new HiveMQttClientException("PublishPacket should be sent via ConnectionPublishWriterAsync."); - case PubAckPacket pubAckPacket: - // This is in response to a received Publish packet. Communication chain management - // was done in the receiver code. Just send the response. - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubAckPacket id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}"); - writeResult = await this.WriteAsync(pubAckPacket.Encode()).ConfigureAwait(false); - this.OnPubAckSentEventLauncher(pubAckPacket); - break; - case PubRecPacket pubRecPacket: - // This is in response to a received Publish packet. Communication chain management - // was done in the receiver code. Just send the response. - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRecPacket id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}"); - writeResult = await this.WriteAsync(pubRecPacket.Encode()).ConfigureAwait(false); - this.OnPubRecSentEventLauncher(pubRecPacket); - break; - case PubRelPacket pubRelPacket: - // This is in response to a received PubRec packet. Communication chain management - // was done in the receiver code. Just send the response. - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRelPacket id={pubRelPacket.PacketIdentifier} reason={pubRelPacket.ReasonCode}"); - writeResult = await this.WriteAsync(pubRelPacket.Encode()).ConfigureAwait(false); - this.OnPubRelSentEventLauncher(pubRelPacket); - break; - case PubCompPacket pubCompPacket: - // This is in response to a received PubRel packet. Communication chain management - // was done in the receiver code. Just send the response. - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubCompPacket id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}"); - writeResult = await this.WriteAsync(pubCompPacket.Encode()).ConfigureAwait(false); - this.OnPubCompSentEventLauncher(pubCompPacket); - break; - case PingReqPacket pingReqPacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PingReqPacket id={pingReqPacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(PingReqPacket.Encode()).ConfigureAwait(false); - this.OnPingReqSentEventLauncher(pingReqPacket); - break; - default: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Unknown packet type {packet}"); - break; - } // switch - - if (writeResult.IsCanceled) - { - Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Write Cancelled"); + switch (packet) + { + // FIXME: Only one connect, subscribe or unsubscribe packet can be sent at a time. + case ConnectPacket connectPacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending ConnectPacket id={connectPacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(connectPacket.Encode()).ConfigureAwait(false); + this.OnConnectSentEventLauncher(connectPacket); break; - } - - if (writeResult.IsCompleted) - { - Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter IsCompleted: end of the stream"); + case DisconnectPacket disconnectPacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending DisconnectPacket id={disconnectPacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(disconnectPacket.Encode()).ConfigureAwait(false); + this.OnDisconnectSentEventLauncher(disconnectPacket); break; - } + case SubscribePacket subscribePacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending SubscribePacket id={subscribePacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(subscribePacket.Encode()).ConfigureAwait(false); + this.OnSubscribeSentEventLauncher(subscribePacket); + break; + case UnsubscribePacket unsubscribePacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending UnsubscribePacket id={unsubscribePacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(unsubscribePacket.Encode()).ConfigureAwait(false); + this.OnUnsubscribeSentEventLauncher(unsubscribePacket); + break; + case PublishPacket publishPacket: + throw new HiveMQttClientException("PublishPacket should be sent via ConnectionPublishWriterAsync."); + case PubAckPacket pubAckPacket: + // This is in response to a received Publish packet. Communication chain management + // was done in the receiver code. Just send the response. + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubAckPacket id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}"); + writeResult = await this.WriteAsync(pubAckPacket.Encode()).ConfigureAwait(false); + this.OnPubAckSentEventLauncher(pubAckPacket); + break; + case PubRecPacket pubRecPacket: + // This is in response to a received Publish packet. Communication chain management + // was done in the receiver code. Just send the response. + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRecPacket id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}"); + writeResult = await this.WriteAsync(pubRecPacket.Encode()).ConfigureAwait(false); + this.OnPubRecSentEventLauncher(pubRecPacket); + break; + case PubRelPacket pubRelPacket: + // This is in response to a received PubRec packet. Communication chain management + // was done in the receiver code. Just send the response. + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRelPacket id={pubRelPacket.PacketIdentifier} reason={pubRelPacket.ReasonCode}"); + writeResult = await this.WriteAsync(pubRelPacket.Encode()).ConfigureAwait(false); + this.OnPubRelSentEventLauncher(pubRelPacket); + break; + case PubCompPacket pubCompPacket: + // This is in response to a received PubRel packet. Communication chain management + // was done in the receiver code. Just send the response. + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubCompPacket id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}"); + writeResult = await this.WriteAsync(pubCompPacket.Encode()).ConfigureAwait(false); + this.OnPubCompSentEventLauncher(pubCompPacket); + break; + case PingReqPacket pingReqPacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PingReqPacket id={pingReqPacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(PingReqPacket.Encode()).ConfigureAwait(false); + this.OnPingReqSentEventLauncher(pingReqPacket); + break; + default: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Unknown packet type {packet}"); + break; + } // switch - this.lastCommunicationTimer.Restart(); + if (writeResult.IsCanceled) + { + Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Write Cancelled"); + break; } - else + + if (writeResult.IsCompleted) { - // Queue is empty - await Task.Delay(1).ConfigureAwait(false); - } // TryDequeue + Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter IsCompleted: end of the stream"); + break; + } + + this.lastCommunicationTimer.Restart(); } // while(true) Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Exiting...{this.ConnectState}"); @@ -410,74 +396,67 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationTok } // Logger.Trace($"{this.Options.ClientId}-(RPH)- {this.ReceivedQueue.Count} received packets currently waiting to be processed."); - if (this.ReceivedQueue.TryDequeue(out var packet)) + var packet = await this.ReceivedQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); + if (this.Options.ClientMaximumPacketSize != null) { - if (this.Options.ClientMaximumPacketSize != null) + if (packet.PacketSize > this.Options.ClientMaximumPacketSize) { - if (packet.PacketSize > this.Options.ClientMaximumPacketSize) - { - Logger.Warn($"Received packet size {packet.PacketSize} exceeds maximum packet size {this.Options.ClientMaximumPacketSize}. Disconnecting."); - Logger.Debug($"{this.Options.ClientId}-(RPH)- Received packet size {packet.PacketSize} exceeds maximum packet size {this.Options.ClientMaximumPacketSize}. Disconnecting."); + Logger.Warn($"Received packet size {packet.PacketSize} exceeds maximum packet size {this.Options.ClientMaximumPacketSize}. Disconnecting."); + Logger.Debug($"{this.Options.ClientId}-(RPH)- Received packet size {packet.PacketSize} exceeds maximum packet size {this.Options.ClientMaximumPacketSize}. Disconnecting."); - var opts = new DisconnectOptions - { - ReasonCode = DisconnectReasonCode.PacketTooLarge, - ReasonString = "Packet Too Large", - }; - await this.DisconnectAsync(opts).ConfigureAwait(false); - return false; - } + var opts = new DisconnectOptions + { + ReasonCode = DisconnectReasonCode.PacketTooLarge, + ReasonString = "Packet Too Large", + }; + await this.DisconnectAsync(opts).ConfigureAwait(false); + return false; } - - switch (packet) - { - case ConnAckPacket connAckPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received ConnAck id={connAckPacket.PacketIdentifier}"); - this.ConnectionProperties = connAckPacket.Properties; - this.OnConnAckReceivedEventLauncher(connAckPacket); - break; - case DisconnectPacket disconnectPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Disconnect id={disconnectPacket.PacketIdentifier} {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); - Logger.Warn($"We shouldn't get the disconnect here - Disconnect received: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); - throw new HiveMQttClientException("Received Disconnect packet in ReceivedPacketsHandlerAsync"); - case PingRespPacket pingRespPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PingResp id={pingRespPacket.PacketIdentifier}"); - this.OnPingRespReceivedEventLauncher(pingRespPacket); - break; - case SubAckPacket subAckPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received SubAck id={subAckPacket.PacketIdentifier}"); - this.OnSubAckReceivedEventLauncher(subAckPacket); - break; - case UnsubAckPacket unsubAckPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received UnsubAck id={unsubAckPacket.PacketIdentifier}"); - this.OnUnsubAckReceivedEventLauncher(unsubAckPacket); - break; - case PublishPacket publishPacket: - this.HandleIncomingPublishPacket(publishPacket); - break; - case PubAckPacket pubAckPacket: - this.HandleIncomingPubAckPacket(pubAckPacket); - break; - case PubRecPacket pubRecPacket: - this.HandleIncomingPubRecPacket(pubRecPacket); - break; - case PubRelPacket pubRelPacket: - this.HandleIncomingPubRelPacket(pubRelPacket); - break; - case PubCompPacket pubCompPacket: - this.HandleIncomingPubCompPacket(pubCompPacket); - break; - default: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Unknown packet type. Will discard."); - Logger.Error($"Unrecognized packet received. Will discard. {packet}"); - break; - } // switch (packet) } - else + + switch (packet) { - // Queue is empty - await Task.Delay(1).ConfigureAwait(false); - } // TryDequeue + case ConnAckPacket connAckPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received ConnAck id={connAckPacket.PacketIdentifier}"); + this.ConnectionProperties = connAckPacket.Properties; + this.OnConnAckReceivedEventLauncher(connAckPacket); + break; + case DisconnectPacket disconnectPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Disconnect id={disconnectPacket.PacketIdentifier} {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); + Logger.Warn($"We shouldn't get the disconnect here - Disconnect received: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); + throw new HiveMQttClientException("Received Disconnect packet in ReceivedPacketsHandlerAsync"); + case PingRespPacket pingRespPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PingResp id={pingRespPacket.PacketIdentifier}"); + this.OnPingRespReceivedEventLauncher(pingRespPacket); + break; + case SubAckPacket subAckPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received SubAck id={subAckPacket.PacketIdentifier}"); + this.OnSubAckReceivedEventLauncher(subAckPacket); + break; + case UnsubAckPacket unsubAckPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received UnsubAck id={unsubAckPacket.PacketIdentifier}"); + this.OnUnsubAckReceivedEventLauncher(unsubAckPacket); + break; + case PublishPacket publishPacket: + this.HandleIncomingPublishPacket(publishPacket); + break; + case PubAckPacket pubAckPacket: + this.HandleIncomingPubAckPacket(pubAckPacket); + break; + case PubRecPacket pubRecPacket: + this.HandleIncomingPubRecPacket(pubRecPacket); + break; + case PubRelPacket pubRelPacket: + this.HandleIncomingPubRelPacket(pubRelPacket); + break; + case PubCompPacket pubCompPacket: + this.HandleIncomingPubCompPacket(pubCompPacket); + break; + default: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Unknown packet type. Will discard."); + Logger.Error($"Unrecognized packet received. Will discard. {packet}"); + break; + } // switch (packet) } // while (true) Logger.Trace($"{this.Options.ClientId}-(RPH)- ReceivedPacketsHandler Exiting...{this.ConnectState}"); diff --git a/Source/HiveMQtt/Client/internal/AwaitableQueueX.cs b/Source/HiveMQtt/Client/internal/AwaitableQueueX.cs new file mode 100644 index 00000000..16ee266c --- /dev/null +++ b/Source/HiveMQtt/Client/internal/AwaitableQueueX.cs @@ -0,0 +1,67 @@ +namespace HiveMQtt.Client.Internal; + +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +public class AwaitableQueueX : IDisposable +{ + private readonly SemaphoreSlim semaphore; + private readonly ConcurrentQueue queue; + + public AwaitableQueueX() + { + this.semaphore = new SemaphoreSlim(0); + this.queue = new ConcurrentQueue(); + } + + public void Enqueue(T item) + { + this.queue.Enqueue(item); + this.semaphore.Release(); + } + + public void EnqueueRange(IEnumerable source) + { + var n = 0; + foreach (var item in source) + { + this.queue.Enqueue(item); + n++; + } + + this.semaphore.Release(n); + } + + public async Task DequeueAsync(CancellationToken cancellationToken) + { + while (true) + { + await this.semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + + if (this.queue.TryDequeue(out var item)) + { + return item; + } + } + } + + public void Clear() + { + while (this.queue.TryDequeue(out _)) + { + this.semaphore.Release(); + } + } + + public int Count => this.queue.Count; + + public bool IsEmpty => this.queue.IsEmpty; + + public void Dispose() + { + this.semaphore.Dispose(); + GC.SuppressFinalize(this); + } +} diff --git a/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs index 535e32d1..b0f34c96 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs @@ -243,17 +243,17 @@ async void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs even Assert.Equal(10, client2MessageCount); Assert.Equal(10, client3MessageCount); - Assert.Empty(client1.OutgoingPublishQueue); - Assert.Empty(client2.OutgoingPublishQueue); - Assert.Empty(client3.OutgoingPublishQueue); + Assert.Equal(0, client1.OutgoingPublishQueue.Count); + Assert.Equal(0, client2.OutgoingPublishQueue.Count); + Assert.Equal(0, client3.OutgoingPublishQueue.Count); - Assert.Empty(client1.ReceivedQueue); - Assert.Empty(client2.ReceivedQueue); - Assert.Empty(client3.ReceivedQueue); + Assert.Equal(0, client1.ReceivedQueue.Count); + Assert.Equal(0, client2.ReceivedQueue.Count); + Assert.Equal(0, client3.ReceivedQueue.Count); - Assert.Empty(client1.SendQueue); - Assert.Empty(client2.SendQueue); - Assert.Empty(client3.SendQueue); + Assert.Equal(0, client1.SendQueue.Count); + Assert.Equal(0, client2.SendQueue.Count); + Assert.Equal(0, client3.SendQueue.Count); Assert.Empty(client1.TransactionQueue); Assert.Empty(client2.TransactionQueue); @@ -334,17 +334,17 @@ void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) Assert.Equal(10, client2MessageCount); Assert.Equal(10, client3MessageCount); - Assert.Empty(client1.OutgoingPublishQueue); - Assert.Empty(client2.OutgoingPublishQueue); - Assert.Empty(client3.OutgoingPublishQueue); + Assert.Equal(0, client1.OutgoingPublishQueue.Count); + Assert.Equal(0, client2.OutgoingPublishQueue.Count); + Assert.Equal(0, client3.OutgoingPublishQueue.Count); - Assert.Empty(client1.ReceivedQueue); - Assert.Empty(client2.ReceivedQueue); - Assert.Empty(client3.ReceivedQueue); + Assert.Equal(0, client1.ReceivedQueue.Count); + Assert.Equal(0, client2.ReceivedQueue.Count); + Assert.Equal(0, client3.ReceivedQueue.Count); - Assert.Empty(client1.SendQueue); - Assert.Empty(client2.SendQueue); - Assert.Empty(client3.SendQueue); + Assert.Equal(0, client1.SendQueue.Count); + Assert.Equal(0, client2.SendQueue.Count); + Assert.Equal(0, client3.SendQueue.Count); Assert.Empty(client1.TransactionQueue); Assert.Empty(client2.TransactionQueue); @@ -424,17 +424,17 @@ void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) Assert.Equal(10, client2MessageCount); Assert.Equal(10, client3MessageCount); - Assert.Empty(client1.OutgoingPublishQueue); - Assert.Empty(client2.OutgoingPublishQueue); - Assert.Empty(client3.OutgoingPublishQueue); + Assert.Equal(0, client1.OutgoingPublishQueue.Count); + Assert.Equal(0, client2.OutgoingPublishQueue.Count); + Assert.Equal(0, client3.OutgoingPublishQueue.Count); - Assert.Empty(client1.ReceivedQueue); - Assert.Empty(client2.ReceivedQueue); - Assert.Empty(client3.ReceivedQueue); + Assert.Equal(0, client1.ReceivedQueue.Count); + Assert.Equal(0, client2.ReceivedQueue.Count); + Assert.Equal(0, client3.ReceivedQueue.Count); - Assert.Empty(client1.SendQueue); - Assert.Empty(client2.SendQueue); - Assert.Empty(client3.SendQueue); + Assert.Equal(0, client1.SendQueue.Count); + Assert.Equal(0, client2.SendQueue.Count); + Assert.Equal(0, client3.SendQueue.Count); Assert.Empty(client1.TransactionQueue); Assert.Empty(client2.TransactionQueue);