From 17f8cbffc585f1cd542609973f88872ea806c476 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Thu, 27 Jun 2024 10:27:59 +0200 Subject: [PATCH] Refactor Packet ID Generation (#179) --- Source/HiveMQtt/Client/HiveMQClient.cs | 27 +++--- .../Client/HiveMQClientTrafficProcessor.cs | 74 +++++++++++----- Source/HiveMQtt/Client/HiveMQClientUtil.cs | 15 ---- .../Client/Options/HiveMQClientOptions.cs | 2 +- .../Client/internal/BoundedDictionaryX.cs | 7 ++ .../Client/internal/PacketIDManagerr.cs | 87 +++++++++++++++++++ .../HiveMQtt/MQTT5/Packets/PublishPacket.cs | 10 +++ .../MQTT5/Packets/UnsubscribePacket.cs | 2 +- 8 files changed, 175 insertions(+), 49 deletions(-) create mode 100644 Source/HiveMQtt/Client/internal/PacketIDManagerr.cs diff --git a/Source/HiveMQtt/Client/HiveMQClient.cs b/Source/HiveMQtt/Client/HiveMQClient.cs index c2ec4fd2..8ee5b5a7 100644 --- a/Source/HiveMQtt/Client/HiveMQClient.cs +++ b/Source/HiveMQtt/Client/HiveMQClient.cs @@ -97,7 +97,7 @@ public async Task ConnectAsync() // Construct the MQTT Connect packet and queue to send var connPacket = new ConnectPacket(this.Options); - Logger.Trace($"Queuing packet for send: {connPacket.GetType().Name} id={connPacket.PacketIdentifier}"); + Logger.Trace($"Queuing CONNECT packet for send."); this.SendQueue.Enqueue(connPacket); ConnAckPacket connAck; @@ -174,7 +174,7 @@ public async Task DisconnectAsync(DisconnectOptions? options = null) EventHandler eventHandler = TaskHandler; this.OnDisconnectSent += eventHandler; - Logger.Trace($"Queuing packet for send: {disconnectPacket.GetType().Name} id={disconnectPacket.PacketIdentifier}"); + Logger.Trace($"Queuing DISCONNECT packet for send."); this.SendQueue.Enqueue(disconnectPacket); try @@ -201,23 +201,25 @@ public async Task PublishAsync(MQTT5PublishMessage message, Cance { message.Validate(); - var packetIdentifier = this.GeneratePacketIdentifier(); - var publishPacket = new PublishPacket(message, (ushort)packetIdentifier); - // QoS 0: Fast Service if (message.QoS == QualityOfService.AtMostOnceDelivery) { - Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}"); + var publishPacket = new PublishPacket(message, 0); + Logger.Trace($"Queuing QoS 0 publish packet for send: {publishPacket.GetType().Name}"); + this.OutgoingPublishQueue.Enqueue(publishPacket); return new PublishResult(publishPacket.Message); } else if (message.QoS == QualityOfService.AtLeastOnceDelivery) { // QoS 1: Acknowledged Delivery - Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}"); + var packetIdentifier = await this.PacketIDManager.GetAvailablePacketIDAsync().ConfigureAwait(false); + var publishPacket = new PublishPacket(message, (ushort)packetIdentifier); + PubAckPacket pubAckPacket; + + Logger.Trace($"Queuing QoS 1 publish packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}"); this.OutgoingPublishQueue.Enqueue(publishPacket); - PubAckPacket pubAckPacket; try { // Wait on the QoS 1 handshake @@ -236,8 +238,11 @@ public async Task PublishAsync(MQTT5PublishMessage message, Cance else if (message.QoS == QualityOfService.ExactlyOnceDelivery) { // QoS 2: Assured Delivery + var packetIdentifier = await this.PacketIDManager.GetAvailablePacketIDAsync().ConfigureAwait(false); + var publishPacket = new PublishPacket(message, (ushort)packetIdentifier); PublishResult? publishResult = null; - Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}"); + + Logger.Trace($"Queuing QoS 2 publish packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}"); this.OutgoingPublishQueue.Enqueue(publishPacket); List packetList; @@ -314,7 +319,7 @@ public async Task SubscribeAsync(SubscribeOptions options) // FIXME: We should only ever have one subscribe in flight at any time (for now) // Construct the MQTT Subscribe packet - var packetIdentifier = this.GeneratePacketIdentifier(); + var packetIdentifier = await this.PacketIDManager.GetAvailablePacketIDAsync().ConfigureAwait(false); var subscribePacket = new SubscribePacket(options, (ushort)packetIdentifier); // Setup the task completion source to wait for the SUBACK @@ -422,7 +427,7 @@ public async Task UnsubscribeAsync(UnsubscribeOptions unsubOp // Fire the corresponding event this.BeforeUnsubscribeEventLauncher(unsubOptions.Subscriptions); - var packetIdentifier = this.GeneratePacketIdentifier(); + var packetIdentifier = await this.PacketIDManager.GetAvailablePacketIDAsync().ConfigureAwait(false); var unsubscribePacket = new UnsubscribePacket(unsubOptions, (ushort)packetIdentifier); var taskCompletionSource = new TaskCompletionSource(); diff --git a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs index eae79c81..f7120ced 100644 --- a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs +++ b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs @@ -44,6 +44,8 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient // Outgoing Publish QoS > 0 in-flight transactions indexed by packet identifier internal BoundedDictionaryX> OPubTransactionQueue { get; set; } + internal PacketIDManager PacketIDManager { get; } = new(); + private readonly Stopwatch lastCommunicationTimer = new(); /// @@ -101,6 +103,7 @@ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) => Task Logger.Debug($"{this.Options.ClientId}-(CM)- OPubTransactionQueue:....{this.OPubTransactionQueue.Count}/{this.OPubTransactionQueue.Capacity}"); Logger.Debug($"{this.Options.ClientId}-(CM)- IPubTransactionQueue:....{this.IPubTransactionQueue.Count}/{this.IPubTransactionQueue.Capacity}"); Logger.Debug($"{this.Options.ClientId}-(CM)- # of Subscriptions:......{this.Subscriptions.Count}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- PacketIDsInUse:..........{this.PacketIDManager.Count}"); // Background Tasks Health Check await this.RunTaskHealthCheckAsync(this.ConnectionWriterTask, "ConnectionWriter").ConfigureAwait(false); @@ -157,10 +160,11 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationToken) = FlushResult writeResult = default; var publishPacket = await this.OutgoingPublishQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); - Logger.Trace($"{this.Options.ClientId}-(PW)- --> Sending PublishPacket id={publishPacket.PacketIdentifier}"); if (publishPacket.Message.QoS is QualityOfService.AtLeastOnceDelivery || publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery) { + Logger.Trace($"{this.Options.ClientId}-(PW)- --> Sending QoS={publishPacket.Message.QoS} PublishPacket id={publishPacket.PacketIdentifier}"); + // QoS > 0 - Add to transaction queue. OPubTransactionQueue will block when necessary // to respect the broker's ReceiveMaximum var success = await this.OPubTransactionQueue.AddAsync( @@ -174,6 +178,10 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationToken) = continue; } } + else + { + Logger.Trace($"{this.Options.ClientId}-(PW)- --> Sending QoS 0 PublishPacket"); + } writeResult = await this.WriteAsync(publishPacket.Encode()).ConfigureAwait(false); this.OnPublishSentEventLauncher(publishPacket); @@ -243,12 +251,12 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task. { // FIXME: Only one connect, subscribe or unsubscribe packet can be sent at a time. case ConnectPacket connectPacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending ConnectPacket id={connectPacket.PacketIdentifier}"); + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending ConnectPacket"); writeResult = await this.WriteAsync(connectPacket.Encode()).ConfigureAwait(false); this.OnConnectSentEventLauncher(connectPacket); break; case DisconnectPacket disconnectPacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending DisconnectPacket id={disconnectPacket.PacketIdentifier}"); + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending DisconnectPacket"); writeResult = await this.WriteAsync(disconnectPacket.Encode()).ConfigureAwait(false); this.OnDisconnectSentEventLauncher(disconnectPacket); break; @@ -266,7 +274,7 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task. case PubAckPacket pubAckPacket: Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubAckPacket id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}"); writeResult = await this.WriteAsync(pubAckPacket.Encode()).ConfigureAwait(false); - this.HandleSentPubAckPacket(pubAckPacket); + await this.HandleSentPubAckPacketAsync(pubAckPacket).ConfigureAwait(false); break; case PubRecPacket pubRecPacket: Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRecPacket id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}"); @@ -281,11 +289,11 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task. case PubCompPacket pubCompPacket: Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubCompPacket id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}"); writeResult = await this.WriteAsync(pubCompPacket.Encode()).ConfigureAwait(false); - this.HandleSentPubCompPacket(pubCompPacket); + await this.HandleSentPubCompPacketAsync(pubCompPacket).ConfigureAwait(false); break; case PingReqPacket pingReqPacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PingReqPacket id={pingReqPacket.PacketIdentifier}"); + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PingReqPacket"); writeResult = await this.WriteAsync(PingReqPacket.Encode()).ConfigureAwait(false); this.OnPingReqSentEventLauncher(pingReqPacket); break; @@ -496,23 +504,23 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => break; case PublishPacket publishPacket: - this.HandleIncomingPublishPacket(publishPacket); + await this.HandleIncomingPublishPacketAsync(publishPacket).ConfigureAwait(false); break; case PubAckPacket pubAckPacket: - this.HandleIncomingPubAckPacket(pubAckPacket); + await this.HandleIncomingPubAckPacketAsync(pubAckPacket).ConfigureAwait(false); break; case PubRecPacket pubRecPacket: - this.HandleIncomingPubRecPacket(pubRecPacket); + await this.HandleIncomingPubRecPacketAsync(pubRecPacket).ConfigureAwait(false); break; case PubRelPacket pubRelPacket: this.HandleIncomingPubRelPacket(pubRelPacket); break; case PubCompPacket pubCompPacket: - this.HandleIncomingPubCompPacket(pubCompPacket); + await this.HandleIncomingPubCompPacketAsync(pubCompPacket).ConfigureAwait(false); break; case PingRespPacket pingRespPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PingResp id={pingRespPacket.PacketIdentifier}"); + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PingResp"); this.OnPingRespReceivedEventLauncher(pingRespPacket); break; @@ -557,7 +565,7 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => /// The received ConnAck packet. internal void HandleIncomingConnAckPacket(ConnAckPacket connAckPacket) { - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received ConnAck id={connAckPacket.PacketIdentifier}"); + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received ConnAck"); if (connAckPacket.ReasonCode == ConnAckReasonCode.Success && connAckPacket.Properties.ReceiveMaximum != null) { Logger.Debug($"{this.Options.ClientId}-(RPH)- <-- Broker says limit concurrent incoming QoS 1 and QoS 2 publishes to {connAckPacket.Properties.ReceiveMaximum}."); @@ -587,14 +595,16 @@ internal async Task HandleIncomingDisconnectPacketAsync(DisconnectPacket disconn /// Handle an incoming Publish packet. /// /// The received publish packet. - internal async void HandleIncomingPublishPacket(PublishPacket publishPacket) + /// A task that represents the asynchronous operation. + internal async Task HandleIncomingPublishPacketAsync(PublishPacket publishPacket) { - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Publish id={publishPacket.PacketIdentifier}"); - this.OnPublishReceivedEventLauncher(publishPacket); bool success; + this.OnPublishReceivedEventLauncher(publishPacket); + if (publishPacket.Message.QoS is QualityOfService.AtMostOnceDelivery) { + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received QoS 0 Publish"); this.OnMessageReceivedEventLauncher(publishPacket); } else if (publishPacket.Message.QoS is QualityOfService.AtLeastOnceDelivery) @@ -603,7 +613,8 @@ internal async void HandleIncomingPublishPacket(PublishPacket publishPacket) // by ConnectionReaderAsync to enforce the client's ReceiveMaximum // Send a PubAck and update the chain. Once the PubAckPacket is sent, // the transaction chain will be deleted and the appropriate events will be - // launched in HandleSentPubAckPacket. + // launched in HandleSentPubAckPacketAsync. + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received QoS 1 Publish id={publishPacket.PacketIdentifier}"); var pubAckResponse = new PubAckPacket(publishPacket.PacketIdentifier, PubAckReasonCode.Success); success = this.IPubTransactionQueue.TryGetValue(publishPacket.PacketIdentifier, out var publishQoS1Chain); @@ -620,6 +631,7 @@ internal async void HandleIncomingPublishPacket(PublishPacket publishPacket) { Logger.Error($"QoS1: Couldn't update Publish --> PubAck QoS1 Chain for packet identifier {publishPacket.PacketIdentifier}. Discarded."); this.IPubTransactionQueue.Remove(publishPacket.PacketIdentifier, out _); + await this.PacketIDManager.MarkPacketIDAsAvailableAsync(publishPacket.PacketIdentifier).ConfigureAwait(false); var opts = new DisconnectOptions { @@ -640,6 +652,7 @@ internal async void HandleIncomingPublishPacket(PublishPacket publishPacket) // by ConnectionReaderAsync to enforce the client's ReceiveMaximum. // Send a PubRec and add to QoS2 transaction register. Once PubComp is sent, // Subscribers will be notified and the transaction chain will be deleted. + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received QoS 2 Publish id={publishPacket.PacketIdentifier}"); var pubRecResponse = new PubRecPacket(publishPacket.PacketIdentifier, PubRecReasonCode.Success); // Get the QoS2 transaction chain for this packet identifier and add the PubRec to it @@ -653,6 +666,7 @@ internal async void HandleIncomingPublishPacket(PublishPacket publishPacket) { Logger.Error($"QoS2: Couldn't update Publish --> PubRec QoS2 Chain for packet identifier {publishPacket.PacketIdentifier}. Discarded."); this.IPubTransactionQueue.Remove(publishPacket.PacketIdentifier, out _); + await this.PacketIDManager.MarkPacketIDAsAvailableAsync(publishPacket.PacketIdentifier).ConfigureAwait(false); } } else @@ -670,8 +684,9 @@ internal async void HandleIncomingPublishPacket(PublishPacket publishPacket) /// Handle an incoming PubAck packet. /// /// The received PubAck packet. + /// A task that represents the asynchronous operation. /// Raised if the packet identifier is unknown. - internal void HandleIncomingPubAckPacket(PubAckPacket pubAckPacket) + internal async Task HandleIncomingPubAckPacketAsync(PubAckPacket pubAckPacket) { Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PubAck id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}"); this.OnPubAckReceivedEventLauncher(pubAckPacket); @@ -690,13 +705,16 @@ internal void HandleIncomingPubAckPacket(PubAckPacket pubAckPacket) { Logger.Warn($"QoS1: Received PubAck with an unknown packet identifier {pubAckPacket.PacketIdentifier}. Discarded."); } + + await this.PacketIDManager.MarkPacketIDAsAvailableAsync(pubAckPacket.PacketIdentifier).ConfigureAwait(false); } /// /// Handle an incoming PubRec packet. /// /// The received PubRec packet. - internal async void HandleIncomingPubRecPacket(PubRecPacket pubRecPacket) + /// A task that represents the asynchronous operation. + internal async Task HandleIncomingPubRecPacketAsync(PubRecPacket pubRecPacket) { Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PubRec id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}"); this.OnPubRecReceivedEventLauncher(pubRecPacket); @@ -723,6 +741,7 @@ internal async void HandleIncomingPubRecPacket(PubRecPacket pubRecPacket) { Logger.Error($"QoS2: Couldn't update PubRec --> PubRel QoS2 Chain for packet identifier {pubRecPacket.PacketIdentifier}."); this.OPubTransactionQueue.Remove(pubRecPacket.PacketIdentifier, out _); + await this.PacketIDManager.MarkPacketIDAsAvailableAsync(pubRecPacket.PacketIdentifier).ConfigureAwait(false); // FIXME: Send an appropriate disconnect packet? await this.HandleDisconnectionAsync(false).ConfigureAwait(false); @@ -781,7 +800,9 @@ internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket) /// Handle an incoming PubComp packet. /// /// The received PubComp packet. - internal void HandleSentPubAckPacket(PubAckPacket pubAckPacket) + /// Raised if the packet identifier is unknown. + /// A task that represents the asynchronous operation. + internal async Task HandleSentPubAckPacketAsync(PubAckPacket pubAckPacket) { // Remove the transaction chain from the transaction queue var success = this.IPubTransactionQueue.Remove(pubAckPacket.PacketIdentifier, out var publishQoS1Chain); @@ -803,6 +824,9 @@ internal void HandleSentPubAckPacket(PubAckPacket pubAckPacket) Logger.Warn($"QoS1: Couldn't remove PubAck --> Publish QoS1 Chain for packet identifier {pubAckPacket.PacketIdentifier}."); } + // Release the packet identifier + await this.PacketIDManager.MarkPacketIDAsAvailableAsync(pubAckPacket.PacketIdentifier).ConfigureAwait(false); + // The Packet Event this.OnPubAckSentEventLauncher(pubAckPacket); } @@ -811,7 +835,8 @@ internal void HandleSentPubAckPacket(PubAckPacket pubAckPacket) /// Action to take once a PubComp packet is sent. /// /// The sent PubComp packet. - internal void HandleSentPubCompPacket(PubCompPacket pubCompPacket) + /// A task that represents the asynchronous operation. + internal async Task HandleSentPubCompPacketAsync(PubCompPacket pubCompPacket) { Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Sent PubComp id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}"); @@ -832,6 +857,9 @@ internal void HandleSentPubCompPacket(PubCompPacket pubCompPacket) } } + // Release the packet identifier + await this.PacketIDManager.MarkPacketIDAsAvailableAsync(pubCompPacket.PacketIdentifier).ConfigureAwait(false); + // Trigger the general event this.OnPubCompSentEventLauncher(pubCompPacket); } @@ -841,7 +869,8 @@ internal void HandleSentPubCompPacket(PubCompPacket pubCompPacket) /// /// The received PubComp packet. /// Raised if the packet identifier is unknown. - internal void HandleIncomingPubCompPacket(PubCompPacket pubCompPacket) + /// A task that represents the asynchronous operation. + internal async Task HandleIncomingPubCompPacketAsync(PubCompPacket pubCompPacket) { Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PubComp id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}"); this.OnPubCompReceivedEventLauncher(pubCompPacket); @@ -862,6 +891,9 @@ internal void HandleIncomingPubCompPacket(PubCompPacket pubCompPacket) { Logger.Warn($"QoS2: Received PubComp with an unknown packet identifier {pubCompPacket.PacketIdentifier}. Discarded."); } + + // Release the packet identifier + await this.PacketIDManager.MarkPacketIDAsAvailableAsync(pubCompPacket.PacketIdentifier).ConfigureAwait(false); } /// diff --git a/Source/HiveMQtt/Client/HiveMQClientUtil.cs b/Source/HiveMQtt/Client/HiveMQClientUtil.cs index 6adbd2f3..9aebdd11 100644 --- a/Source/HiveMQtt/Client/HiveMQClientUtil.cs +++ b/Source/HiveMQtt/Client/HiveMQClientUtil.cs @@ -23,7 +23,6 @@ namespace HiveMQtt.Client; public partial class HiveMQClient : IDisposable, IHiveMQClient { private bool disposed; - private int lastPacketId; /// /// Validates whether a subscription already exists. @@ -131,20 +130,6 @@ public static bool MatchTopic(string pattern, string candidate) return Regex.IsMatch(candidate, regexp); } - /// - /// Generate a packet identifier. - /// - /// A valid packet identifier. - protected int GeneratePacketIdentifier() - { - if (this.lastPacketId == ushort.MaxValue) - { - this.lastPacketId = 0; - } - - return Interlocked.Increment(ref this.lastPacketId); - } - /// /// https://learn.microsoft.com/en-us/dotnet/api/system.idisposable?view=net-6.0. /// diff --git a/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs b/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs index c1fb077e..2c32722d 100644 --- a/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs +++ b/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs @@ -286,7 +286,7 @@ public void Validate() this.GenerateClientID(); } - if (this.ClientId.Length > 23) + if (this.ClientId is not null && this.ClientId.Length > 23) { Logger.Info($"Client ID {this.ClientId} is longer than 23 characters. This may cause issues with some brokers."); } diff --git a/Source/HiveMQtt/Client/internal/BoundedDictionaryX.cs b/Source/HiveMQtt/Client/internal/BoundedDictionaryX.cs index 2b285002..9fd4886d 100644 --- a/Source/HiveMQtt/Client/internal/BoundedDictionaryX.cs +++ b/Source/HiveMQtt/Client/internal/BoundedDictionaryX.cs @@ -145,6 +145,13 @@ public bool Remove(TKey key, out TVal value) /// true if the item was retrieved; otherwise, false. public bool TryGetValue(TKey key, out TVal value) => this.dictionary.TryGetValue(key, out value); + /// + /// Determines whether the dictionary contains the specified key. + /// + /// The key to locate in the dictionary. + /// true if the dictionary contains an element with the specified key; otherwise, false. + public bool ContainsKey(TKey key) => this.dictionary.ContainsKey(key); + /// /// Removes all items from the dictionary. /// diff --git a/Source/HiveMQtt/Client/internal/PacketIDManagerr.cs b/Source/HiveMQtt/Client/internal/PacketIDManagerr.cs new file mode 100644 index 00000000..76a16215 --- /dev/null +++ b/Source/HiveMQtt/Client/internal/PacketIDManagerr.cs @@ -0,0 +1,87 @@ +namespace HiveMQtt.Client.Internal; + +using System.Collections; + +public class PacketIDManager +{ + private HashSet PacketIDsInUse { get; } = new(); + + private BitArray PacketIDBitArray { get; } = new BitArray(65536); + + private SemaphoreSlim SemLock { get; } = new(1, 1); + + private int LastPacketId { get; set; } = 1; + + public PacketIDManager() => this.PacketIDBitArray.SetAll(false); + + /// + /// Gets the next available packet ID. + /// + /// The next available packet ID. + public async Task GetAvailablePacketIDAsync() + { + // Obtain the lock + await this.SemLock.WaitAsync().ConfigureAwait(false); + + var candidate = this.FindNextAvailablePacketID(); + this.PacketIDsInUse.Add(candidate); + this.PacketIDBitArray[candidate] = true; + + // Release the lock + this.SemLock.Release(); + + return candidate; + } + + /// + /// Marks a packet ID as available. + /// + /// The packet ID to mark as available. + /// A representing the asynchronous operation. + public async Task MarkPacketIDAsAvailableAsync(int packetId) + { + // Obtain the lock + await this.SemLock.WaitAsync().ConfigureAwait(false); + + this.PacketIDsInUse.Remove(packetId); + this.PacketIDBitArray[packetId] = false; + + // Release the lock + this.SemLock.Release(); + } + + /// + /// Finds the next available packet ID. + /// + /// The next available packet ID. + /// Thrown when no available packet IDs are available. + internal int FindNextAvailablePacketID() + { + // Loop through starting at the last served packet ID + for (var i = this.LastPacketId; i <= 65535; i++) + { + if (!this.PacketIDsInUse.Contains(i) && !this.PacketIDBitArray[i]) + { + this.LastPacketId = i; + return i; + } + } + + // We hit the end of the range, loop from the beginning + for (var i = 1; i < this.LastPacketId; i++) + { + if (!this.PacketIDsInUse.Contains(i) && !this.PacketIDBitArray[i]) + { + this.LastPacketId = i; + return i; + } + } + + throw new InvalidOperationException("No available packet IDs"); + } + + /// + /// Gets the number of packet IDs in use. + /// + public int Count => this.PacketIDsInUse.Count; +} diff --git a/Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs b/Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs index ffd58a7f..cdeaa4ec 100644 --- a/Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs +++ b/Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs @@ -37,9 +37,19 @@ public class PublishPacket : ControlPacket /// A unique packet identifier for the packet to be created. public PublishPacket(MQTT5PublishMessage message, int packetIdentifier) { + // PacketIdentifier is only used in transactional packets. The invalid value 0 is used when on QoS 0 + // packets since it won't be encoded anyways. this.PacketIdentifier = (ushort)packetIdentifier; this.Message = message; + if (this.Message.QoS != QualityOfService.AtMostOnceDelivery) + { + if (this.PacketIdentifier is < 1 or > 65535) + { + throw new ArgumentException("PacketIdentifier must be a valid value for QoS 1 and QoS 2 packets."); + } + } + // Setup the QoS 1 TaskCompletionSource so users can simply call // // await PublishPacket.OnPublishQoS1CompleteTCS diff --git a/Source/HiveMQtt/MQTT5/Packets/UnsubscribePacket.cs b/Source/HiveMQtt/MQTT5/Packets/UnsubscribePacket.cs index a097b9dc..0e356a5a 100644 --- a/Source/HiveMQtt/MQTT5/Packets/UnsubscribePacket.cs +++ b/Source/HiveMQtt/MQTT5/Packets/UnsubscribePacket.cs @@ -57,7 +57,7 @@ public byte[] Encode() using (var vhAndPayloadStream = new MemoryStream()) { // Variable Header - EncodeTwoByteInteger(vhAndPayloadStream, this.PacketIdentifier); + _ = EncodeTwoByteInteger(vhAndPayloadStream, (int)this.PacketIdentifier); this.EncodeProperties(vhAndPayloadStream); // Payload