diff --git a/Source/HiveMQtt/Client/HiveMQClient.cs b/Source/HiveMQtt/Client/HiveMQClient.cs index 51a23851..482dfac6 100644 --- a/Source/HiveMQtt/Client/HiveMQClient.cs +++ b/Source/HiveMQtt/Client/HiveMQClient.cs @@ -57,6 +57,10 @@ public HiveMQClient(HiveMQClientOptions? options = null) this.Options = options; this.cancellationTokenSource = new CancellationTokenSource(); + this.ClientReceiveSemaphore = new SemaphoreSlim(this.Options.ClientReceiveMaximum); + + // Set protocol default until ConnAck is received + this.BrokerReceiveSemaphore = new SemaphoreSlim(65535); } /// @@ -167,7 +171,9 @@ public async Task DisconnectAsync(DisconnectOptions? options = null) try { - disconnectPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false); + disconnectPacket = await taskCompletionSource.Task + .WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs)) + .ConfigureAwait(false); } catch (TimeoutException) { @@ -179,9 +185,7 @@ public async Task DisconnectAsync(DisconnectOptions? options = null) this.OnDisconnectSent -= eventHandler; } - await this.HandleDisconnectionAsync().ConfigureAwait(false); - - return true; + return await this.HandleDisconnectionAsync().ConfigureAwait(false); } /// @@ -210,8 +214,25 @@ public async Task PublishAsync(MQTT5PublishMessage message) Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}"); this.OutgoingPublishQueue.Enqueue(publishPacket); - // Wait on the QoS 1 handshake - var pubAckPacket = await publishPacket.OnPublishQoS1CompleteTCS.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false); + PubAckPacket pubAckPacket; + try + { + // Wait on the QoS 1 handshake + pubAckPacket = await publishPacket.OnPublishQoS1CompleteTCS.Task + .WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs)) + .ConfigureAwait(false); + } + catch (TimeoutException) + { + Logger.Error("PublishAsync: QoS 1 timeout. No PUBACK response received in time."); + var disconnectOptions = new DisconnectOptions + { + ReasonCode = DisconnectReasonCode.UnspecifiedError, + }; + await this.DisconnectAsync(disconnectOptions).ConfigureAwait(false); + throw; + } + return new PublishResult(publishPacket.Message, pubAckPacket); } else if (message.QoS == QualityOfService.ExactlyOnceDelivery) @@ -225,24 +246,20 @@ public async Task PublishAsync(MQTT5PublishMessage message) try { // Wait on the QoS 2 handshake - packetList = await publishPacket.OnPublishQoS2CompleteTCS.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false); + packetList = await publishPacket.OnPublishQoS2CompleteTCS.Task + .WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs)) + .ConfigureAwait(false); } catch (TimeoutException) { Logger.Error("PublishAsync: QoS 2 timeout. No response received in time."); - // Remove the transaction chain - if (this.TransactionQueue.Remove(publishPacket.PacketIdentifier, out var publishQoS2Chain)) + var disconnectOptions = new DisconnectOptions { - Logger.Debug($"PublishAsync: QoS 2 timeout. Removing transaction chain for packet identifier {publishPacket.PacketIdentifier}."); - } - - // Prepare PublishResult - publishResult = new PublishResult(publishPacket.Message) - { - QoS2ReasonCode = null, + ReasonCode = DisconnectReasonCode.UnspecifiedError, }; - return publishResult; + await this.DisconnectAsync(disconnectOptions).ConfigureAwait(false); + throw; } foreach (var packet in packetList) @@ -331,7 +348,9 @@ public async Task SubscribeAsync(SubscribeOptions options) SubscribeResult subscribeResult; try { - subAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false); + subAck = await taskCompletionSource.Task + .WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs)) + .ConfigureAwait(false); } catch (TimeoutException) { @@ -441,7 +460,9 @@ public async Task UnsubscribeAsync(UnsubscribeOptions unsubOp UnsubscribeResult unsubscribeResult; try { - unsubAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false); + unsubAck = await taskCompletionSource.Task + .WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs)) + .ConfigureAwait(false); // FIXME: Validate that the packet identifier matches } @@ -488,6 +509,9 @@ private async Task HandleDisconnectionAsync(bool clean = true) // Cancel all background tasks and close the socket this.ConnectState = ConnectState.Disconnected; + + // Don't use CancelAsync here to maintain backwards compatibility + // with >=.net6.0. CancelAsync was introduced in .net8.0 this.cancellationTokenSource.Cancel(); this.CloseSocket(); diff --git a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs index f6a3e4d5..a3cbb6d5 100644 --- a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs +++ b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs @@ -39,8 +39,15 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient internal AwaitableQueueX ReceivedQueue { get; } = new(); - // Transactional packets indexed by packet identifier - internal ConcurrentDictionary> TransactionQueue { get; } = new(); + // Incoming Publish QoS > 0 packets indexed by packet identifier + internal ConcurrentDictionary> IPubTransactionQueue { get; } = new(); + + // Outgoing Publish QoS > 0 packets indexed by packet identifier + internal ConcurrentDictionary> OPubTransactionQueue { get; } = new(); + + private SemaphoreSlim BrokerReceiveSemaphore { get; set; } + + internal SemaphoreSlim ClientReceiveSemaphore { get; } private readonly Stopwatch lastCommunicationTimer = new(); @@ -74,6 +81,7 @@ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) => Task { var keepAlivePeriod = this.Options.KeepAlive / 2; Logger.Trace($"{this.Options.ClientId}-(CM)- Starting...{this.ConnectState}"); + this.lastCommunicationTimer.Start(); while (true) { @@ -95,12 +103,14 @@ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) => Task } // Dumping Client State - Logger.Trace($"{this.Options.ClientId}-(CM)- {this.ConnectState} lastCommunicationTimer:{this.lastCommunicationTimer.Elapsed}"); - Logger.Trace($"{this.Options.ClientId}-(CM)- SendQueue:............{this.SendQueue.Count}"); - Logger.Trace($"{this.Options.ClientId}-(CM)- ReceivedQueue:........{this.ReceivedQueue.Count}"); - Logger.Trace($"{this.Options.ClientId}-(CM)- OutgoingPublishQueue:.{this.OutgoingPublishQueue.Count}"); - Logger.Trace($"{this.Options.ClientId}-(CM)- TransactionQueue:.....{this.TransactionQueue.Count}"); - Logger.Trace($"{this.Options.ClientId}-(CM)- # of Subscriptions:...{this.Subscriptions.Count}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- {this.ConnectState} lastCommunicationTimer:{this.lastCommunicationTimer.Elapsed}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- SendQueue:............{this.SendQueue.Count}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- ReceivedQueue:........{this.ReceivedQueue.Count}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- OutgoingPublishQueue:.{this.OutgoingPublishQueue.Count}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- BrokerReceiveMaxSem...{this.BrokerReceiveSemaphore.CurrentCount}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- OPubTransactionQueue:.{this.OPubTransactionQueue.Count}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- IPubTransactionQueue:.{this.IPubTransactionQueue.Count}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- # of Subscriptions:...{this.Subscriptions.Count}"); await this.RunTaskHealthCheckAsync(this.ConnectionWriterTask, "ConnectionWriter").ConfigureAwait(false); await this.RunTaskHealthCheckAsync(this.ConnectionReaderTask, "ConnectionReader").ConfigureAwait(false); @@ -127,7 +137,6 @@ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) => Task private Task ConnectionPublishWriterAsync(CancellationToken cancellationToken) => Task.Run( async () => { - this.lastCommunicationTimer.Start(); Logger.Trace($"{this.Options.ClientId}-(PW)- Starting...{this.ConnectState}"); while (true) @@ -141,30 +150,26 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationToken) = while (this.ConnectState != ConnectState.Connected) { Logger.Trace($"{this.Options.ClientId}-(PW)- Not connected. Waiting for connect..."); - await Task.Delay(1000).ConfigureAwait(false); + await Task.Delay(500).ConfigureAwait(false); continue; } - // Logger.Trace($"{this.Options.ClientId}-(PW)- {this.OutgoingPublishQueue.Count} publish packets waiting to be sent."); - var receiveMaximum = this.ConnectionProperties.ReceiveMaximum ?? 65535; - if (this.TransactionQueue.Count >= receiveMaximum) - { - Logger.Debug($"The Maximum number of publishes have been sent to broker. Waiting for existing transactions to complete."); - await Task.Delay(10).ConfigureAwait(false); - continue; - } - - var publishPacket = await this.OutgoingPublishQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); FlushResult writeResult = default; + var publishPacket = await this.OutgoingPublishQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); Logger.Trace($"{this.Options.ClientId}-(PW)- --> Sending PublishPacket id={publishPacket.PacketIdentifier}"); if (publishPacket.Message.QoS is QualityOfService.AtLeastOnceDelivery || publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery) { + // We have the next qos>0 publish packet to send + // Respect the broker's ReceiveMaximum + await this.BrokerReceiveSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + // QoS > 0 - Add to transaction queue - if (!this.TransactionQueue.TryAdd(publishPacket.PacketIdentifier, new List { publishPacket })) + if (!this.OPubTransactionQueue.TryAdd(publishPacket.PacketIdentifier, new List { publishPacket })) { Logger.Warn($"Duplicate packet ID detected {publishPacket.PacketIdentifier} while queueing to transaction queue for an outgoing QoS {publishPacket.Message.QoS} publish ."); + this.BrokerReceiveSemaphore.Release(); continue; } } @@ -183,8 +188,6 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationToken) = Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter IsCompleted: end of the stream"); break; } - - this.lastCommunicationTimer.Restart(); } // while(true) Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Exiting...{this.ConnectState}"); @@ -196,7 +199,6 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationToken) = private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task.Run( async () => { - this.lastCommunicationTimer.Start(); Logger.Trace($"{this.Options.ClientId}-(W)- Starting...{this.ConnectState}"); while (true) @@ -216,7 +218,6 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task. continue; } - // Logger.Trace($"{this.Options.ClientId}-(W)- {this.SendQueue.Count} packets waiting to be sent."); var packet = await this.SendQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); FlushResult writeResult = default; @@ -246,29 +247,21 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task. case PublishPacket publishPacket: throw new HiveMQttClientException("PublishPacket should be sent via ConnectionPublishWriterAsync."); case PubAckPacket pubAckPacket: - // This is in response to a received Publish packet. Communication chain management - // was done in the receiver code. Just send the response. Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubAckPacket id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}"); writeResult = await this.WriteAsync(pubAckPacket.Encode()).ConfigureAwait(false); this.OnPubAckSentEventLauncher(pubAckPacket); break; case PubRecPacket pubRecPacket: - // This is in response to a received Publish packet. Communication chain management - // was done in the receiver code. Just send the response. Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRecPacket id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}"); writeResult = await this.WriteAsync(pubRecPacket.Encode()).ConfigureAwait(false); this.OnPubRecSentEventLauncher(pubRecPacket); break; case PubRelPacket pubRelPacket: - // This is in response to a received PubRec packet. Communication chain management - // was done in the receiver code. Just send the response. Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRelPacket id={pubRelPacket.PacketIdentifier} reason={pubRelPacket.ReasonCode}"); writeResult = await this.WriteAsync(pubRelPacket.Encode()).ConfigureAwait(false); this.OnPubRelSentEventLauncher(pubRelPacket); break; case PubCompPacket pubCompPacket: - // This is in response to a received PubRel packet. Communication chain management - // was done in the receiver code. Just send the response. Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubCompPacket id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}"); writeResult = await this.WriteAsync(pubCompPacket.Encode()).ConfigureAwait(false); this.OnPubCompSentEventLauncher(pubCompPacket); @@ -294,8 +287,6 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task. Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter IsCompleted: end of the stream"); break; } - - this.lastCommunicationTimer.Restart(); } // while(true) Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Exiting...{this.ConnectState}"); @@ -382,13 +373,32 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => this.OnDisconnectReceivedEventLauncher(disconnectPacket); break; } - else - { - Logger.Trace($"{this.Options.ClientId}-(R)- <-- Received {decodedPacket.GetType().Name} id: {decodedPacket.PacketIdentifier}. Adding to receivedQueue."); - // Add the packet to the received queue for processing later by ReceivedPacketsHandlerAsync - this.ReceivedQueue.Enqueue(decodedPacket); + if (decodedPacket is PublishPacket publishPacket) + { + // Limit the number of concurrent incoming QoS 1 and QoS 2 transactions + if (publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery || + publishPacket.Message.QoS is QualityOfService.AtLeastOnceDelivery) + { + while (true) + { + if (this.IPubTransactionQueue.Count >= this.Options.ClientReceiveMaximum) + { + Logger.Trace($"-(R)- The Maximum number of concurrent publishes have been received from broker. Applying back-pressure and waiting for existing transactions to complete."); + await Task.Delay(500).ConfigureAwait(false); + } + else + { + break; + } + } // while (true) + } } + + Logger.Trace($"{this.Options.ClientId}-(R)- <-- Received {decodedPacket.GetType().Name} id: {decodedPacket.PacketIdentifier}. Adding to receivedQueue."); + + // Add the packet to the received queue for processing later by ReceivedPacketsHandlerAsync + this.ReceivedQueue.Enqueue(decodedPacket); } // while (buffer.Length > 0 await Task.Yield(); @@ -415,7 +425,6 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => break; } - // Logger.Trace($"{this.Options.ClientId}-(RPH)- {this.ReceivedQueue.Count} received packets currently waiting to be processed."); var packet = await this.ReceivedQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); if (this.Options.ClientMaximumPacketSize != null) { @@ -438,13 +447,17 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => { case ConnAckPacket connAckPacket: Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received ConnAck id={connAckPacket.PacketIdentifier}"); + if (connAckPacket.ReasonCode == ConnAckReasonCode.Success && connAckPacket.Properties.ReceiveMaximum != null) + { + Logger.Debug($"{this.Options.ClientId}-(RPH)- <-- Broker says limit concurrent incoming QoS 1 and QoS 2 publishes to {connAckPacket.Properties.ReceiveMaximum}."); + + // Replace the BrokerReceiveSemaphore with a new one with the broker's ReceiveMaximum + this.BrokerReceiveSemaphore = new SemaphoreSlim((int)connAckPacket.Properties.ReceiveMaximum); + } + this.ConnectionProperties = connAckPacket.Properties; this.OnConnAckReceivedEventLauncher(connAckPacket); break; - case DisconnectPacket disconnectPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Disconnect id={disconnectPacket.PacketIdentifier} {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); - Logger.Warn($"We shouldn't get the disconnect here - Disconnect received: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); - throw new HiveMQttClientException("Received Disconnect packet in ReceivedPacketsHandlerAsync"); case PingRespPacket pingRespPacket: Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PingResp id={pingRespPacket.PacketIdentifier}"); this.OnPingRespReceivedEventLauncher(pingRespPacket); @@ -472,6 +485,11 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => case PubCompPacket pubCompPacket: this.HandleIncomingPubCompPacket(pubCompPacket); break; + case DisconnectPacket disconnectPacket: + // Disconnects are handled immediate and shouldn't be received here + // We leave this just as a sanity backup + Logger.Error($"{this.Options.ClientId}-(RPH)- Incorrectly received Disconnect packet in ReceivedPacketsHandlerAsync"); + throw new HiveMQttClientException("Received Disconnect packet in ReceivedPacketsHandlerAsync"); default: Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Unknown packet type. Will discard."); Logger.Error($"Unrecognized packet received. Will discard. {packet}"); @@ -492,24 +510,25 @@ internal void HandleIncomingPublishPacket(PublishPacket publishPacket) Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Publish id={publishPacket.PacketIdentifier}"); this.OnPublishReceivedEventLauncher(publishPacket); - if (publishPacket.Message.QoS is QualityOfService.AtLeastOnceDelivery) + if (publishPacket.Message.QoS is QualityOfService.AtMostOnceDelivery) + { + this.OnMessageReceivedEventLauncher(publishPacket); + } + else if (publishPacket.Message.QoS is QualityOfService.AtLeastOnceDelivery) { - // We've received a QoS 1 publish. Send a PubAck. + // We've received a QoS 1 publish. Send a PubAck and notify subscribers. var pubAckResponse = new PubAckPacket(publishPacket.PacketIdentifier, PubAckReasonCode.Success); - - // FIXME We should wait until puback is sent before launching event - // FIXME Check DUP flag setting this.SendQueue.Enqueue(pubAckResponse); - publishPacket.OnPublishQoS1CompleteEventLauncher(pubAckResponse); + this.OnMessageReceivedEventLauncher(publishPacket); } else if (publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery) { // We've received a QoS 2 publish. Send a PubRec and add to QoS2 transaction register. + // When we get the PubRel, we'll notify subscribers and send the PubComp in HandleIncomingPubRelPacket var pubRecResponse = new PubRecPacket(publishPacket.PacketIdentifier, PubRecReasonCode.Success); var publishQoS2Chain = new List { publishPacket, pubRecResponse }; - // FIXME: Wait for QoS 2 transaction to complete before calling OnMessageReceivedEventLauncher??? - if (!this.TransactionQueue.TryAdd(publishPacket.PacketIdentifier, publishQoS2Chain)) + if (!this.IPubTransactionQueue.TryAdd(publishPacket.PacketIdentifier, publishQoS2Chain)) { Logger.Warn($"Duplicate packet ID detected {publishPacket.PacketIdentifier} while queueing to transaction queue for an incoming QoS {publishPacket.Message.QoS} publish ."); pubRecResponse.ReasonCode = PubRecReasonCode.PacketIdentifierInUse; @@ -517,12 +536,10 @@ internal void HandleIncomingPublishPacket(PublishPacket publishPacket) this.SendQueue.Enqueue(pubRecResponse); } - - this.OnMessageReceivedEventLauncher(publishPacket); } /// - /// Handle an incoming ConnAck packet. + /// Handle an incoming PubAck packet. /// /// The received PubAck packet. /// Raised if the packet identifier is unknown. @@ -531,11 +548,17 @@ internal void HandleIncomingPubAckPacket(PubAckPacket pubAckPacket) Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PubAck id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}"); this.OnPubAckReceivedEventLauncher(pubAckPacket); + // This is in response to a publish that we sent // Remove the transaction chain from the transaction queue - if (this.TransactionQueue.Remove(pubAckPacket.PacketIdentifier, out var publishQoS1Chain)) + if (this.OPubTransactionQueue.Remove(pubAckPacket.PacketIdentifier, out var publishQoS1Chain)) { var publishPacket = (PublishPacket)publishQoS1Chain.First(); + // We sent a QoS1 publish and received a PubAck. The transaction is complete. + + // Release the semaphore + this.BrokerReceiveSemaphore.Release(); + // Trigger the packet specific event publishPacket.OnPublishQoS1CompleteEventLauncher(pubAckPacket); } @@ -549,13 +572,14 @@ internal void HandleIncomingPubAckPacket(PubAckPacket pubAckPacket) /// Handle an incoming PubRec packet. /// /// The received PubRec packet. - internal void HandleIncomingPubRecPacket(PubRecPacket pubRecPacket) + internal async void HandleIncomingPubRecPacket(PubRecPacket pubRecPacket) { Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PubRec id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}"); this.OnPubRecReceivedEventLauncher(pubRecPacket); + // This is in response to a publish that we sent // Find the QoS2 transaction chain for this packet identifier - if (this.TransactionQueue.TryGetValue(pubRecPacket.PacketIdentifier, out var originalPublishQoS2Chain)) + if (this.OPubTransactionQueue.TryGetValue(pubRecPacket.PacketIdentifier, out var originalPublishQoS2Chain)) { var originalPublishPacket = (PublishPacket)originalPublishQoS2Chain.First(); @@ -571,9 +595,14 @@ internal void HandleIncomingPubRecPacket(PubRecPacket pubRecPacket) }; // Update the chain in the queue - if (!this.TransactionQueue.TryUpdate(pubRecPacket.PacketIdentifier, newPublishQoS2Chain, originalPublishQoS2Chain)) + if (!this.OPubTransactionQueue.TryUpdate(pubRecPacket.PacketIdentifier, newPublishQoS2Chain, originalPublishQoS2Chain)) { - Logger.Warn($"QoS2: Couldn't update PubRec --> PubRel QoS2 Chain for packet identifier {pubRecPacket.PacketIdentifier}."); + Logger.Error($"QoS2: Couldn't update PubRec --> PubRel QoS2 Chain for packet identifier {pubRecPacket.PacketIdentifier}."); + this.OPubTransactionQueue.Remove(pubRecPacket.PacketIdentifier, out _); + this.BrokerReceiveSemaphore.Release(); + + // FIXME: Send an appropriate disconnect packet + await this.HandleDisconnectionAsync(false).ConfigureAwait(false); } // Send the PUBREL response @@ -596,7 +625,8 @@ internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket) Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PubRel id={pubRelPacket.PacketIdentifier} reason={pubRelPacket.ReasonCode}"); this.OnPubRelReceivedEventLauncher(pubRelPacket); - if (this.TransactionQueue.TryGetValue(pubRelPacket.PacketIdentifier, out var originalPublishQoS2Chain)) + // This is in response to a publish that we received and already sent a pubrec + if (this.IPubTransactionQueue.TryGetValue(pubRelPacket.PacketIdentifier, out var originalPublishQoS2Chain)) { var originalPublishPacket = (PublishPacket)originalPublishQoS2Chain.First(); @@ -604,7 +634,7 @@ internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket) var pubCompResponsePacket = new PubCompPacket(pubRelPacket.PacketIdentifier, PubCompReasonCode.Success); // This QoS2 transaction chain is done. Remove it from the transaction queue. - if (this.TransactionQueue.TryRemove(pubRelPacket.PacketIdentifier, out var publishQoS2Chain)) + if (this.IPubTransactionQueue.TryRemove(pubRelPacket.PacketIdentifier, out var publishQoS2Chain)) { // Update the chain with the latest packets for the event launcher publishQoS2Chain.Add(pubRelPacket); @@ -612,6 +642,7 @@ internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket) // Trigger the packet specific event originalPublishPacket.OnPublishQoS2CompleteEventLauncher(publishQoS2Chain); + this.OnMessageReceivedEventLauncher(originalPublishPacket); } else { @@ -641,14 +672,18 @@ internal void HandleIncomingPubCompPacket(PubCompPacket pubCompPacket) Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PubComp id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}"); this.OnPubCompReceivedEventLauncher(pubCompPacket); + // This is in response to a QoS2 publish that we sent // Remove the QoS 2 transaction chain from the queue - if (this.TransactionQueue.Remove(pubCompPacket.PacketIdentifier, out var publishQoS2Chain)) + if (this.OPubTransactionQueue.Remove(pubCompPacket.PacketIdentifier, out var publishQoS2Chain)) { var originalPublishPacket = (PublishPacket)publishQoS2Chain.First(); // Update the chain with this PubComp packet for the event launcher publishQoS2Chain.Add(pubCompPacket); + // Release the semaphore + this.BrokerReceiveSemaphore.Release(); + // Trigger the packet specific event with the entire chain originalPublishPacket.OnPublishQoS2CompleteEventLauncher(publishQoS2Chain); } @@ -672,7 +707,9 @@ internal ValueTask WriteAsync(ReadOnlyMemory source, Cancella throw new HiveMQttClientException("Writer is null"); } - return this.Writer.WriteAsync(source, cancellationToken); + var writeResult = this.Writer.WriteAsync(source, cancellationToken); + this.lastCommunicationTimer.Restart(); + return writeResult; } /// @@ -689,7 +726,7 @@ internal async ValueTask ReadAsync(CancellationToken cancellationTok } var readResult = await this.Reader.ReadAsync(cancellationToken).ConfigureAwait(false); - Logger.Trace($"ReadAsync: Read Buffer Length {readResult.Buffer.Length}"); + this.lastCommunicationTimer.Restart(); return readResult; } } diff --git a/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs b/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs index 0971f6cd..f9cf49e0 100644 --- a/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs +++ b/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs @@ -44,7 +44,9 @@ public HiveMQClientOptions() this.UseTLS = false; this.AllowInvalidBrokerCertificates = false; this.ClientCertificates = new X509CertificateCollection(); + this.ClientReceiveMaximum = 10; this.ConnectTimeoutInMs = 5000; + this.ResponseTimeoutInMs = 5000; } // Client Identifier to be used in the Client. Will be set automatically if not specified. @@ -105,7 +107,7 @@ public HiveMQClientOptions() /// value is absent then its value defaults to 65,535. /// /// - public int? ClientReceiveMaximum { get; set; } + public int ClientReceiveMaximum { get; set; } /// /// Gets or sets a value that indicates the maximum packet size that the MQTT client is willing @@ -178,6 +180,12 @@ public HiveMQClientOptions() /// public int ConnectTimeoutInMs { get; set; } + /// + /// Gets or sets the time in milliseconds to wait for a response in a transactional operation. + /// This could be a Publish, Subscribe, Unsubscribe, or Disconnect operation. + /// + public int ResponseTimeoutInMs { get; set; } + /// /// Generate a semi-random client identifier to be used in Client connections. /// hmqc#-pid-randomstring. @@ -222,11 +230,6 @@ public void Validate() this.UseTLS = true; } - if (this.ClientReceiveMaximum != null) - { - this.ClientReceiveMaximum = RangeValidateTwoByteInteger((int)this.ClientReceiveMaximum); - } - if (this.ClientMaximumPacketSize != null) { this.ClientMaximumPacketSize = RangeValidateFourByteInteger((long)this.ClientMaximumPacketSize); @@ -237,6 +240,12 @@ public void Validate() } } + this.ClientReceiveMaximum = RangeValidateTwoByteInteger(this.ClientReceiveMaximum); + if (this.ClientReceiveMaximum == 0) + { + this.ClientReceiveMaximum = 65535; + } + if (this.ClientTopicAliasMaximum != null) { this.ClientTopicAliasMaximum = RangeValidateTwoByteInteger((int)this.ClientTopicAliasMaximum); diff --git a/Source/HiveMQtt/MQTT5/Packets/ConnectPacket.cs b/Source/HiveMQtt/MQTT5/Packets/ConnectPacket.cs index 46387f5c..893b3084 100644 --- a/Source/HiveMQtt/MQTT5/Packets/ConnectPacket.cs +++ b/Source/HiveMQtt/MQTT5/Packets/ConnectPacket.cs @@ -160,11 +160,7 @@ internal void GatherConnectFlagsAndProperties() // Properties this.Properties.SessionExpiryInterval = (uint)this.clientOptions.SessionExpiryInterval; - - if (this.clientOptions.ClientReceiveMaximum != null) - { - this.Properties.ReceiveMaximum = (ushort)this.clientOptions.ClientReceiveMaximum; - } + this.Properties.ReceiveMaximum = (ushort)this.clientOptions.ClientReceiveMaximum; if (this.clientOptions.ClientMaximumPacketSize != null) { diff --git a/Source/HiveMQtt/MQTT5/Packets/PubAckPacket.cs b/Source/HiveMQtt/MQTT5/Packets/PubAckPacket.cs index 36d889e3..075c4a95 100644 --- a/Source/HiveMQtt/MQTT5/Packets/PubAckPacket.cs +++ b/Source/HiveMQtt/MQTT5/Packets/PubAckPacket.cs @@ -53,7 +53,7 @@ public byte[] Encode() using (var vhStream = new MemoryStream()) { // Variable Header - ControlPacket.EncodeTwoByteInteger(vhStream, this.PacketIdentifier); + EncodeTwoByteInteger(vhStream, this.PacketIdentifier); vhStream.WriteByte((byte)this.ReasonCode); this.EncodeProperties(vhStream); diff --git a/Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs b/Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs index a8bc3882..ffd58a7f 100644 --- a/Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs +++ b/Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs @@ -89,7 +89,7 @@ internal virtual void OnPublishQoS1CompleteEventLauncher(PubAckPacket packet) { if (t.IsFaulted) { - Logger.Error("OnPublishQoS1CompleteEventLauncher exception: " + t.Exception.Message); + Logger.Error("OnPublishQoS1CompleteEventLauncher exception: " + t.Exception?.Message); } }, TaskScheduler.Default); @@ -120,7 +120,7 @@ internal virtual void OnPublishQoS2CompleteEventLauncher(List pac { if (t.IsFaulted) { - Logger.Error("OnPublishQoS2CompleteEventLauncher exception: " + t.Exception.Message); + Logger.Error("OnPublishQoS2CompleteEventLauncher exception: " + t.Exception?.Message); } }, TaskScheduler.Default); diff --git a/Tests/HiveMQtt.Test/HiveMQClient/OptionsTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/OptionsTest.cs index bad98fc1..c72d14bb 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/OptionsTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/OptionsTest.cs @@ -48,7 +48,7 @@ public void WithBadClientReceiveMaximum() ClientReceiveMaximum = -300, }; options.Validate(); - Assert.Equal(0, options.ClientReceiveMaximum); + Assert.Equal(65535, options.ClientReceiveMaximum); options.ClientReceiveMaximum = int.MaxValue; options.Validate(); diff --git a/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs index 8efa7fac..36c097cf 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs @@ -197,7 +197,7 @@ public async Task ThreeNodeQoS0ChainedPublishesAsync() Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); // client 2 Subscribe to the topic - var subscribeResult = await client2.SubscribeAsync("HMQ/FirstTopic", QualityOfService.AtMostOnceDelivery).ConfigureAwait(false); + var subscribeResult = await client2.SubscribeAsync("HMQ/3NodeQoS0FirstTopic", QualityOfService.AtMostOnceDelivery).ConfigureAwait(false); var client2MessageCount = 0; // client 2 will receive the message and republish it to another topic @@ -207,7 +207,7 @@ async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs even Interlocked.Increment(ref client2MessageCount); if (sender is HiveMQClient client) { - var publishResult = await client.PublishAsync("HMQ/SecondTopic", eventArgs.PublishMessage.PayloadAsString, QualityOfService.AtMostOnceDelivery).ConfigureAwait(true); + var publishResult = await client.PublishAsync("HMQ/3NodeQoS0SecondTopic", eventArgs.PublishMessage.PayloadAsString, QualityOfService.AtMostOnceDelivery).ConfigureAwait(true); Assert.NotNull(publishResult); } } @@ -216,7 +216,7 @@ async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs even client2.OnMessageReceived += Client2MessageHandler; // client 3 Subscribe to the secondary topic - subscribeResult = await client3.SubscribeAsync("HMQ/SecondTopic", QualityOfService.AtMostOnceDelivery).ConfigureAwait(false); + subscribeResult = await client3.SubscribeAsync("HMQ/3NodeQoS0SecondTopic", QualityOfService.AtMostOnceDelivery).ConfigureAwait(false); var client3MessageCount = 0; // client 3 will receive the final message @@ -234,7 +234,7 @@ void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) // client 1 Publish 100 messages for (var i = 1; i <= 10; i++) { - var publishResult = await client1.PublishAsync("HMQ/FirstTopic", "Hello World", QualityOfService.AtMostOnceDelivery).ConfigureAwait(false); + var publishResult = await client1.PublishAsync("HMQ/3NodeQoS0FirstTopic", "Hello World", QualityOfService.AtMostOnceDelivery).ConfigureAwait(false); Assert.NotNull(publishResult); } @@ -255,9 +255,13 @@ void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) Assert.Equal(0, client2.SendQueue.Count); Assert.Equal(0, client3.SendQueue.Count); - Assert.Empty(client1.TransactionQueue); - Assert.Empty(client2.TransactionQueue); - Assert.Empty(client3.TransactionQueue); + Assert.Empty(client1.OPubTransactionQueue); + Assert.Empty(client2.OPubTransactionQueue); + Assert.Empty(client3.OPubTransactionQueue); + + Assert.Empty(client1.IPubTransactionQueue); + Assert.Empty(client2.IPubTransactionQueue); + Assert.Empty(client3.IPubTransactionQueue); // All done, disconnect all clients var disconnectResult = await client1.DisconnectAsync().ConfigureAwait(false); @@ -288,7 +292,7 @@ public async Task ThreeNodeQoS1ChainedPublishesAsync() Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); // client 2 Subscribe to the topic - var subscribeResult = await client2.SubscribeAsync("HMQ/FirstTopic", QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); + var subscribeResult = await client2.SubscribeAsync("HMQ/3NodeQoS1FirstTopic", QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); var client2MessageCount = 0; // client 2 will receive the message and republish it to another topic @@ -298,7 +302,7 @@ async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs even Interlocked.Increment(ref client2MessageCount); if (sender is HiveMQClient client) { - var publishResult = await client.PublishAsync("HMQ/SecondTopic", eventArgs.PublishMessage.PayloadAsString, QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); + var publishResult = await client.PublishAsync("HMQ/3NodeQoS1SecondTopic", eventArgs.PublishMessage.PayloadAsString, QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); Assert.NotNull(publishResult); Assert.Equal(publishResult.QoS1ReasonCode, PubAckReasonCode.Success); } @@ -308,7 +312,7 @@ async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs even client2.OnMessageReceived += Client2MessageHandler; // client 3 Subscribe to the secondary topic - subscribeResult = await client3.SubscribeAsync("HMQ/SecondTopic", QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); + subscribeResult = await client3.SubscribeAsync("HMQ/3NodeQoS1SecondTopic", QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); var client3MessageCount = 0; @@ -325,7 +329,7 @@ void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) // client 1 Publish 10 messages for (var i = 1; i <= 10; i++) { - var publishResult = await client1.PublishAsync("HMQ/FirstTopic", "Hello World", QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); + var publishResult = await client1.PublishAsync("HMQ/3NodeQoS1FirstTopic", "Hello World", QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); Assert.NotNull(publishResult); } @@ -346,9 +350,13 @@ void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) Assert.Equal(0, client2.SendQueue.Count); Assert.Equal(0, client3.SendQueue.Count); - Assert.Empty(client1.TransactionQueue); - Assert.Empty(client2.TransactionQueue); - Assert.Empty(client3.TransactionQueue); + Assert.Empty(client1.OPubTransactionQueue); + Assert.Empty(client2.OPubTransactionQueue); + Assert.Empty(client3.OPubTransactionQueue); + + Assert.Empty(client1.IPubTransactionQueue); + Assert.Empty(client2.IPubTransactionQueue); + Assert.Empty(client3.IPubTransactionQueue); // All done, disconnect all clients var disconnectResult = await client1.DisconnectAsync().ConfigureAwait(false); @@ -379,7 +387,7 @@ public async Task ThreeNodeQoS2ChainedPublishesAsync() Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); // client 2 Subscribe to the topic - var subscribeResult = await client2.SubscribeAsync("HMQ/FirstTopic", QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); + var subscribeResult = await client2.SubscribeAsync("HMQ/3NodeQoS2FirstTopic", QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); var client2MessageCount = 0; // client 2 will receive the message and republish it to another topic @@ -389,7 +397,7 @@ async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs even Interlocked.Increment(ref client2MessageCount); var client = sender as HiveMQClient; #pragma warning disable CS8602 // Dereference of a possibly null reference. - var publishResult = await client.PublishAsync("HMQ/SecondTopic", eventArgs.PublishMessage.PayloadAsString, QualityOfService.ExactlyOnceDelivery).ConfigureAwait(true); + var publishResult = await client.PublishAsync("HMQ/3NodeQoS2SecondTopic", eventArgs.PublishMessage.PayloadAsString, QualityOfService.ExactlyOnceDelivery).ConfigureAwait(true); #pragma warning restore CS8602 // Dereference of a possibly null reference. Assert.NotNull(publishResult); Assert.Equal(publishResult.QoS2ReasonCode, PubRecReasonCode.Success); @@ -399,7 +407,7 @@ async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs even client2.OnMessageReceived += Client2MessageHandler; // client 3 Subscribe to the secondary topic - subscribeResult = await client3.SubscribeAsync("HMQ/SecondTopic", QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); + subscribeResult = await client3.SubscribeAsync("HMQ/3NodeQoS2SecondTopic", QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); // client 3 will receive the final message var client3MessageCount = 0; @@ -415,7 +423,7 @@ void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) // client 1 Publish 10 messages for (var i = 1; i <= 10; i++) { - var publishResult = await client1.PublishAsync("HMQ/FirstTopic", "Hello World", QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); + var publishResult = await client1.PublishAsync("HMQ/3NodeQoS2FirstTopic", "Hello World", QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); Assert.NotNull(publishResult); } @@ -436,9 +444,13 @@ void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) Assert.Equal(0, client2.SendQueue.Count); Assert.Equal(0, client3.SendQueue.Count); - Assert.Empty(client1.TransactionQueue); - Assert.Empty(client2.TransactionQueue); - Assert.Empty(client3.TransactionQueue); + Assert.Empty(client1.OPubTransactionQueue); + Assert.Empty(client2.OPubTransactionQueue); + Assert.Empty(client3.OPubTransactionQueue); + + Assert.Empty(client1.IPubTransactionQueue); + Assert.Empty(client2.IPubTransactionQueue); + Assert.Empty(client3.IPubTransactionQueue); // All done, disconnect all clients var disconnectResult = await client1.DisconnectAsync().ConfigureAwait(false);