From 7e9680acf914b42fd9aa2c87841d392133908d45 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Tue, 19 Mar 2024 11:51:22 +0100 Subject: [PATCH] Performance Improvements (#133) --- .../ClientBenchmarkApp/ClientBenchmark.cs | 35 +- .../docs/how-to/configure-logging.md | 31 +- Source/HiveMQtt/Client/HiveMQClient.cs | 42 +- .../Client/HiveMQClientOptionsBuilder.cs | 2 +- Source/HiveMQtt/Client/HiveMQClientSocket.cs | 33 +- .../Client/HiveMQClientTrafficProcessor.cs | 450 +++++++++--------- Source/HiveMQtt/Client/HiveMQClientUtil.cs | 42 +- Source/HiveMQtt/MQTT5/PacketDecoder.cs | 2 +- .../HiveMQtt.Test/HiveMQClient/OptionsTest.cs | 2 +- 9 files changed, 358 insertions(+), 281 deletions(-) diff --git a/Benchmarks/ClientBenchmarkApp/ClientBenchmark.cs b/Benchmarks/ClientBenchmarkApp/ClientBenchmark.cs index a55af0f6..92b5c8e5 100644 --- a/Benchmarks/ClientBenchmarkApp/ClientBenchmark.cs +++ b/Benchmarks/ClientBenchmarkApp/ClientBenchmark.cs @@ -12,13 +12,19 @@ namespace ClientBenchmarkApp; using HiveMQtt.MQTT5.ReasonCodes; using HiveMQtt.MQTT5.Types; -[SimpleJob(RunStrategy.Monitoring, iterationCount: 10, id: "MonitoringJob")] +[SimpleJob(RunStrategy.Monitoring, iterationCount: 100, id: "MonitoringJob")] public class ClientBenchmarks : IDisposable { private readonly string smallPayload = new string(/*lang=json,strict*/ "{\"interference\": \"1029384\"}"); private HiveMQClient client; + public ClientBenchmarks() + { + Console.WriteLine("Starting HiveMQ client benchmarks..."); + this.client = null!; + } + [GlobalSetup] public async Task SetupAsync() { @@ -29,6 +35,7 @@ public async Task SetupAsync() }; this.client = new HiveMQClient(options); + Console.WriteLine($"Connecting to {options.Host} on port {options.Port}..."); await this.client.ConnectAsync().ConfigureAwait(false); @@ -50,22 +57,24 @@ public async Task CleanUpAsync() } [Benchmark(Description = "Publish a QoS 0 messages to the broker.")] - public async Task PublishQoS0MessageAsync() - { - await this.client.PublishAsync("benchmarks/PublishQoS0Messages", this.smallPayload).ConfigureAwait(false); - } + public async Task PublishQoS0MessageAsync() => + await this.client.PublishAsync( + "benchmarks/PublishQoS0Messages", + this.smallPayload).ConfigureAwait(false); [Benchmark(Description = "Publish a QoS 1 messages to the broker.")] - public async Task PublishQoS1MessageAsync() - { - await this.client.PublishAsync("benchmarks/PublishQoS1Messages", this.smallPayload, QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); - } + public async Task PublishQoS1MessageAsync() => + await this.client.PublishAsync( + "benchmarks/PublishQoS1Messages", + this.smallPayload, + QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); [Benchmark(Description = "Publish a QoS 2 messages to the broker.")] - public async Task PublishQoS2MessageAsync() - { - await this.client.PublishAsync("benchmarks/PublishQoS1Messages", this.smallPayload, QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); - } + public async Task PublishQoS2MessageAsync() => + await this.client.PublishAsync( + "benchmarks/PublishQoS2Messages", + this.smallPayload, + QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); public void Dispose() => GC.SuppressFinalize(this); } diff --git a/Documentation/docs/how-to/configure-logging.md b/Documentation/docs/how-to/configure-logging.md index 18796510..2f207bcb 100644 --- a/Documentation/docs/how-to/configure-logging.md +++ b/Documentation/docs/how-to/configure-logging.md @@ -23,14 +23,29 @@ The HiveMQtt package uses [NLog](https://github.com/NLog/NLog) and can be config Setting `minlevel` to `Trace` will output all activity in the HiveMQtt package down to packet and event handling. Using this level will produce a lot of output such as the following: ```log -2023-10-04 16:56:54.9373|TRACE|HiveMQtt.Client.HiveMQClient|BeforeConnectEventLauncher -2023-10-04 16:56:55.0081|TRACE|HiveMQtt.Client.HiveMQClient|7: TrafficInflowProcessor Starting...Connecting -2023-10-04 16:56:55.0081|TRACE|HiveMQtt.Client.HiveMQClient|9: TrafficOutflowProcessor Starting...Connecting -2023-10-04 16:56:55.0081|TRACE|HiveMQtt.Client.HiveMQClient|--> ConnectPacket -2023-10-04 16:56:55.0128|TRACE|HiveMQtt.Client.HiveMQClient|OnConnectSentEventLauncher -2023-10-04 16:56:55.0374|TRACE|HiveMQtt.Client.HiveMQClient|<-- ConnAck -2023-10-04 16:56:55.0374|TRACE|HiveMQtt.Client.HiveMQClient|OnConnAckReceivedEventLauncher -2023-10-04 16:56:55.0379|TRACE|HiveMQtt.Client.HiveMQClient|AfterConnectEventLauncher +2024-03-14 15:40:18.2252|TRACE|HiveMQtt.Client.HiveMQClient|Trace Level Logging Legend: +2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(W)- == ConnectionWriter +2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(R)- == ConnectionReader +2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(RPH)- == ReceivedPacketsHandler +2024-03-14 15:40:18.2320|INFO|HiveMQtt.Client.HiveMQClient|Connecting to broker at 127.0.0.1:1883 +2024-03-14 15:40:18.2343|TRACE|HiveMQtt.Client.HiveMQClient|BeforeConnectEventLauncher +2024-03-14 15:40:18.2460|TRACE|HiveMQtt.Client.HiveMQClient|Socket connected to 127.0.0.1:1883 +2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|Queuing packet for send: HiveMQtt.MQTT5.Packets.ConnectPacket +2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|-(RPH)- Starting...Connecting +2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|-(R)- ConnectionReader Starting...Connecting +2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|5: ConnectionMonitor Starting...Connecting +2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|-(RPH)- 0 received packets currently waiting to be processed. +2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|-(W)- ConnectionWriter Starting...Connecting +2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|-(W)- ConnectionWriter: 1 packets waiting to be sent. +2024-03-14 15:40:18.2476|TRACE|HiveMQtt.Client.HiveMQClient|-(W)- --> Sending ConnectPacket id=0 +2024-03-14 15:40:18.2529|TRACE|HiveMQtt.Client.HiveMQClient|OnConnectSentEventLauncher +2024-03-14 15:40:18.2529|TRACE|HiveMQtt.Client.HiveMQClient|-(W)- ConnectionWriter: 0 packets waiting to be sent. +2024-03-14 15:40:18.2732|TRACE|HiveMQtt.Client.HiveMQClient|ReadAsync: Read Buffer Length 11 +2024-03-14 15:40:18.2765|TRACE|HiveMQtt.MQTT5.PacketDecoder|PacketDecoder: Decoded Packet: consumed=11, packet=HiveMQtt.MQTT5.Packets.ConnAckPacket id=0 +2024-03-14 15:40:18.2765|TRACE|HiveMQtt.Client.HiveMQClient|-(R)- <-- Received ConnAckPacket. Adding to receivedQueue. +2024-03-14 15:40:18.2765|TRACE|HiveMQtt.Client.HiveMQClient|-(RPH)- <-- Received ConnAck id=0 +2024-03-14 15:40:18.2765|TRACE|HiveMQtt.Client.HiveMQClient|OnConnAckReceivedEventLauncher +2024-03-14 15:40:18.2775|TRACE|HiveMQtt.Client.HiveMQClient|AfterConnectEventLauncher ``` ## See Also diff --git a/Source/HiveMQtt/Client/HiveMQClient.cs b/Source/HiveMQtt/Client/HiveMQClient.cs index da6d18bb..e875bce2 100644 --- a/Source/HiveMQtt/Client/HiveMQClient.cs +++ b/Source/HiveMQtt/Client/HiveMQClient.cs @@ -44,8 +44,16 @@ public HiveMQClient(HiveMQClientOptions? options = null) options ??= new HiveMQClientOptions(); options.Validate(); + Logger.Trace($"New client created: Client ID: {options.ClientId}"); + + Logger.Trace("Trace Level Logging Legend:"); + Logger.Trace(" -(W)- == ConnectionWriter"); + Logger.Trace(" -(R)- == ConnectionReader"); + Logger.Trace(" -(CM)- == ConnectionMonitor"); + Logger.Trace(" -(RPH)- == ReceivedPacketsHandler"); + this.Options = options; - this.cancellationSource = new CancellationTokenSource(); + this.cancellationTokenSource = new CancellationTokenSource(); } /// @@ -80,7 +88,8 @@ public async Task ConnectAsync() // Construct the MQTT Connect packet and queue to send var connPacket = new ConnectPacket(this.Options); - this.sendQueue.Enqueue(connPacket); + Logger.Trace($"Queuing packet for send: {connPacket}"); + this.sendQueue.Add(connPacket); // FIXME: Cancellation token and better timeout value ConnAckPacket connAck; @@ -126,6 +135,7 @@ public async Task DisconnectAsync(DisconnectOptions? options = null) { if (this.connectState != ConnectState.Connected) { + Logger.Warn("DisconnectAsync: Client is not connected."); return false; } @@ -149,7 +159,8 @@ public async Task DisconnectAsync(DisconnectOptions? options = null) EventHandler eventHandler = TaskHandler; this.OnDisconnectSent += eventHandler; - this.sendQueue.Enqueue(disconnectPacket); + Logger.Trace($"Queuing packet for send: {disconnectPacket}"); + this.sendQueue.Add(disconnectPacket); try { @@ -173,8 +184,17 @@ public async Task DisconnectAsync(DisconnectOptions? options = null) this.connectState = ConnectState.Disconnected; - // Clear the send queue - this.sendQueue.Clear(); + // FIXME + if (this.sendQueue.Count > 0) + { + Logger.Warn("Disconnect: Send queue not empty. Packets pending but we are disconnecting."); + } + + // We only clear the send queue on explicit disconnect + while (this.sendQueue.TryTake(out _)) + { + } + return true; } @@ -189,7 +209,8 @@ public async Task PublishAsync(MQTT5PublishMessage message) // QoS 0: Fast Service if (message.QoS == QualityOfService.AtMostOnceDelivery) { - this.sendQueue.Enqueue(publishPacket); + Logger.Trace($"Queuing packet for send: {publishPacket}"); + this.sendQueue.Add(publishPacket); return new PublishResult(publishPacket.Message); } else if (message.QoS == QualityOfService.AtLeastOnceDelivery) @@ -201,7 +222,8 @@ public async Task PublishAsync(MQTT5PublishMessage message) publishPacket.OnPublishQoS1Complete += eventHandler; // Construct the MQTT Connect packet and queue to send - this.sendQueue.Enqueue(publishPacket); + Logger.Trace($"Queuing packet for send: {publishPacket}"); + this.sendQueue.Add(publishPacket); var pubAckPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false); @@ -217,7 +239,7 @@ public async Task PublishAsync(MQTT5PublishMessage message) publishPacket.OnPublishQoS2Complete += eventHandler; // Construct the MQTT Connect packet and queue to send - this.sendQueue.Enqueue(publishPacket); + this.sendQueue.Add(publishPacket); // Wait on the QoS 2 handshake var packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false); @@ -300,7 +322,7 @@ public async Task SubscribeAsync(SubscribeOptions options) this.OnSubAckReceived += eventHandler; // Queue the constructed packet to be sent on the wire - this.sendQueue.Enqueue(subscribePacket); + this.sendQueue.Add(subscribePacket); SubAckPacket subAck; SubscribeResult subscribeResult; @@ -404,7 +426,7 @@ public async Task UnsubscribeAsync(UnsubscribeOptions unsubOp EventHandler eventHandler = TaskHandler; this.OnUnsubAckReceived += eventHandler; - this.sendQueue.Enqueue(unsubscribePacket); + this.sendQueue.Add(unsubscribePacket); // FIXME: Cancellation token and better timeout value UnsubAckPacket unsubAck; diff --git a/Source/HiveMQtt/Client/HiveMQClientOptionsBuilder.cs b/Source/HiveMQtt/Client/HiveMQClientOptionsBuilder.cs index c9a83838..597fee84 100644 --- a/Source/HiveMQtt/Client/HiveMQClientOptionsBuilder.cs +++ b/Source/HiveMQtt/Client/HiveMQClientOptionsBuilder.cs @@ -192,7 +192,7 @@ public HiveMQClientOptionsBuilder WithClientCertificate(string clientCertificate } else { - Logger.Error("File does not exist."); + Logger.Error("WithClientCertificate: The specified client certificate file does not exist."); throw new FileNotFoundException(); } } diff --git a/Source/HiveMQtt/Client/HiveMQClientSocket.cs b/Source/HiveMQtt/Client/HiveMQClientSocket.cs index c79645ff..f25be2cf 100644 --- a/Source/HiveMQtt/Client/HiveMQClientSocket.cs +++ b/Source/HiveMQtt/Client/HiveMQClientSocket.cs @@ -34,15 +34,13 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient private Stream? stream; private PipeReader? reader; private PipeWriter? writer; - private CancellationTokenSource cancellationSource; - private CancellationToken outFlowCancellationToken; - private CancellationToken inFlowCancellationToken; - private CancellationToken receivedPacketsCancellationToken; + private CancellationTokenSource cancellationTokenSource; #pragma warning disable IDE0052 - private Task? trafficOutflowProcessorTask; - private Task? trafficInflowProcessorTask; - private Task? receivedPacketsProcessorAsync; + private Task? connectionWriterTask; + private Task? connectionReaderTask; + private Task? receivedPacketsHandlerAsync; + private Task? connectionMonitorTask; #pragma warning restore IDE0052 /// @@ -173,18 +171,14 @@ internal async Task ConnectSocketAsync() this.writer = PipeWriter.Create(this.stream); // Reset the CancellationTokenSource in case this is a reconnect - this.cancellationSource.Dispose(); - this.cancellationSource = new CancellationTokenSource(); - - // Setup the cancellation tokens - this.outFlowCancellationToken = this.cancellationSource.Token; - this.inFlowCancellationToken = this.cancellationSource.Token; - this.receivedPacketsCancellationToken = this.cancellationSource.Token; + this.cancellationTokenSource.Dispose(); + this.cancellationTokenSource = new CancellationTokenSource(); // Start the traffic processors - this.trafficOutflowProcessorTask = this.TrafficOutflowProcessorAsync(this.outFlowCancellationToken); - this.trafficInflowProcessorTask = this.TrafficInflowProcessorAsync(this.inFlowCancellationToken); - this.receivedPacketsProcessorAsync = this.ReceivedPacketsProcessorAsync(this.receivedPacketsCancellationToken); + this.connectionWriterTask = this.ConnectionWriterAsync(this.cancellationTokenSource.Token); + this.connectionReaderTask = this.ConnectionReaderAsync(this.cancellationTokenSource.Token); + this.receivedPacketsHandlerAsync = this.ReceivedPacketsHandlerAsync(this.cancellationTokenSource.Token); + this.connectionMonitorTask = this.ConnectionMonitorAsync(this.cancellationTokenSource.Token); Logger.Trace($"Socket connected to {this.socket.RemoteEndPoint}"); return socketConnected; @@ -254,6 +248,9 @@ private async Task CreateTLSConnectionAsync(Stream stream) internal bool CloseSocket(bool? shutdownPipeline = true) { + // Cancel the background traffic processing tasks + this.cancellationTokenSource.Cancel(); + if (shutdownPipeline == true) { // Shutdown the pipeline @@ -265,8 +262,6 @@ internal bool CloseSocket(bool? shutdownPipeline = true) this.socket?.Shutdown(SocketShutdown.Both); this.socket?.Close(); - this.cancellationSource.Cancel(); - return true; } } diff --git a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs index 233ec5dd..44fb677a 100644 --- a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs +++ b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs @@ -30,202 +30,237 @@ namespace HiveMQtt.Client; /// public partial class HiveMQClient : IDisposable, IHiveMQClient { - // The outgoing packet queue. Packets queued to be sent. - private readonly ConcurrentQueue sendQueue = new(); + private readonly BlockingCollection sendQueue = new(); - private readonly ConcurrentQueue receivedQueue = new(); + private readonly BlockingCollection receivedQueue = new(); // Transactional packets indexed by packet identifier private readonly ConcurrentDictionary> transactionQueue = new(); + private readonly Stopwatch lastCommunicationTimer = new Stopwatch(); + /// - /// Asynchronous background task that handles the outgoing traffic of packets queued in the sendQueue. + /// Asynchronous background task that monitors the connection state and sends PingReq packets when + /// necessary. /// - private Task TrafficOutflowProcessorAsync(CancellationToken cancellationToken) => Task.Run( + /// The cancellation token. + /// A boolean return indicating exit state. + private Task ConnectionMonitorAsync(CancellationToken cancellationToken) => Task.Run( async () => { - var stopWatch = new Stopwatch(); var keepAlivePeriod = this.Options.KeepAlive / 2; + Logger.Trace($"{this.Options.ClientId}-(CM)- Starting...{this.connectState}"); - stopWatch.Start(); - - Logger.Trace($"{Environment.CurrentManagedThreadId}: TrafficOutflowProcessor Starting...{this.connectState}"); - - while (this.connectState != ConnectState.Disconnected) + while (true) { - if (stopWatch.Elapsed > TimeSpan.FromSeconds(keepAlivePeriod)) + if (cancellationToken.IsCancellationRequested) { - // Send PingReq - Logger.Trace("--> PingReq"); - var writeResult = await this.WriteAsync(PingReqPacket.Encode()).ConfigureAwait(false); - this.OnPingReqSentEventLauncher(new PingReqPacket()); - stopWatch.Restart(); + Logger.Trace($"{this.Options.ClientId}-(CM)- Cancelled"); + break; } - if (this.sendQueue.IsEmpty) + // If connected and no recent traffic, send a ping + if (this.connectState == ConnectState.Connected) { - if (this.connectState == ConnectState.Disconnecting) - { - return true; - } - else + if (this.lastCommunicationTimer.Elapsed > TimeSpan.FromSeconds(keepAlivePeriod)) { - await Task.Delay(50).ConfigureAwait(false); - continue; + // Send PingReq + Logger.Trace($"{this.Options.ClientId}-(CM)- --> PingReq"); + this.sendQueue.Add(new PingReqPacket()); } } - Logger.Trace($"TrafficOutflowProcessor: {this.sendQueue.Count} packets waiting to be sent."); + try + { + await Task.Delay(2000, cancellationToken).ConfigureAwait(false); + } + catch (TaskCanceledException) + { + Logger.Trace($"{this.Options.ClientId}-(CM)- Cancelled"); + break; + } + } + + Logger.Trace($"{this.Options.ClientId}-(CM)- Exiting...{this.connectState}"); + + return true; + }, cancellationToken); - // Batch load up to 50 queued packets - List packetsToSend = new(); - while (this.sendQueue.TryDequeue(out var p)) + /// + /// Asynchronous background task that handles the outgoing traffic of packets queued in the sendQueue. + /// + private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task.Run( + async () => + { + this.lastCommunicationTimer.Start(); + Logger.Trace($"{this.Options.ClientId}-(W)- Starting...{this.connectState}"); + + while (true) + { + if (cancellationToken.IsCancellationRequested) { - packetsToSend.Add(p); - if (packetsToSend.Count >= 50) - { - break; - } + Logger.Trace($"{this.Options.ClientId}-(W)- Cancelled with {this.sendQueue.Count} packets remaining."); + break; } - Logger.Trace($"TrafficOutflowProcessor: Sending a batch of {packetsToSend.Count} packets."); - foreach (var packet in packetsToSend) + while (this.connectState == ConnectState.Disconnected) { - FlushResult writeResult = default; + Logger.Trace($"{this.Options.ClientId}-(W)- Not connected. Waiting for connect..."); + await Task.Delay(2000).ConfigureAwait(false); + continue; + } - switch (packet) - { - // FIXME: Only one connect, subscribe or unsubscribe packet can be sent at a time. - case ConnectPacket connectPacket: - Logger.Trace($"--> ConnectPacket id={connectPacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(connectPacket.Encode()).ConfigureAwait(false); - this.OnConnectSentEventLauncher(connectPacket); - break; - case DisconnectPacket disconnectPacket: - Logger.Trace($"--> DisconnectPacket id={disconnectPacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(disconnectPacket.Encode()).ConfigureAwait(false); - this.OnDisconnectSentEventLauncher(disconnectPacket); - break; - case SubscribePacket subscribePacket: - Logger.Trace($"--> SubscribePacket id={subscribePacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(subscribePacket.Encode()).ConfigureAwait(false); - this.OnSubscribeSentEventLauncher(subscribePacket); - break; - case UnsubscribePacket unsubscribePacket: - Logger.Trace($"--> UnsubscribePacket id={unsubscribePacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(unsubscribePacket.Encode()).ConfigureAwait(false); - this.OnUnsubscribeSentEventLauncher(unsubscribePacket); - break; - case PublishPacket publishPacket: - Logger.Trace($"--> PublishPacket id={publishPacket.PacketIdentifier}"); - if (publishPacket.Message.QoS is MQTT5.Types.QualityOfService.AtLeastOnceDelivery || - publishPacket.Message.QoS is MQTT5.Types.QualityOfService.ExactlyOnceDelivery) + Logger.Trace($"{this.Options.ClientId}-(W)- {this.sendQueue.Count} packets waiting to be sent."); + + var packet = this.sendQueue.Take(); + FlushResult writeResult = default; + + switch (packet) + { + // FIXME: Only one connect, subscribe or unsubscribe packet can be sent at a time. + case ConnectPacket connectPacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending ConnectPacket id={connectPacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(connectPacket.Encode()).ConfigureAwait(false); + this.OnConnectSentEventLauncher(connectPacket); + break; + case DisconnectPacket disconnectPacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending DisconnectPacket id={disconnectPacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(disconnectPacket.Encode()).ConfigureAwait(false); + this.OnDisconnectSentEventLauncher(disconnectPacket); + break; + case SubscribePacket subscribePacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending SubscribePacket id={subscribePacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(subscribePacket.Encode()).ConfigureAwait(false); + this.OnSubscribeSentEventLauncher(subscribePacket); + break; + case UnsubscribePacket unsubscribePacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending UnsubscribePacket id={unsubscribePacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(unsubscribePacket.Encode()).ConfigureAwait(false); + this.OnUnsubscribeSentEventLauncher(unsubscribePacket); + break; + case PublishPacket publishPacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PublishPacket id={publishPacket.PacketIdentifier}"); + if (publishPacket.Message.QoS is MQTT5.Types.QualityOfService.AtLeastOnceDelivery || + publishPacket.Message.QoS is MQTT5.Types.QualityOfService.ExactlyOnceDelivery) + { + // QoS > 0 - Add to transaction queue + if (this.transactionQueue.TryAdd(publishPacket.PacketIdentifier, new List { publishPacket }) == false) { - // QoS > 0 - Add to transaction queue - if (this.transactionQueue.TryAdd(publishPacket.PacketIdentifier, new List { publishPacket }) == false) - { - Logger.Warn("Duplicate packet ID detected."); - continue; - } + Logger.Warn("Duplicate packet ID detected."); + continue; } + } - writeResult = await this.WriteAsync(publishPacket.Encode()).ConfigureAwait(false); - this.OnPublishSentEventLauncher(publishPacket); - break; - case PubAckPacket pubAckPacket: - // This is in response to a received Publish packet. Communication chain management - // was done in the receiver code. Just send the response. - Logger.Trace($"--> PubAckPacket id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}"); - writeResult = await this.WriteAsync(pubAckPacket.Encode()).ConfigureAwait(false); - this.OnPubAckSentEventLauncher(pubAckPacket); - break; - case PubRecPacket pubRecPacket: - // This is in response to a received Publish packet. Communication chain management - // was done in the receiver code. Just send the response. - Logger.Trace($"--> PubRecPacket id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}"); - writeResult = await this.WriteAsync(pubRecPacket.Encode()).ConfigureAwait(false); - this.OnPubRecSentEventLauncher(pubRecPacket); - break; - case PubRelPacket pubRelPacket: - // This is in response to a received PubRec packet. Communication chain management - // was done in the receiver code. Just send the response. - Logger.Trace($"--> PubRelPacket id={pubRelPacket.PacketIdentifier} reason={pubRelPacket.ReasonCode}"); - writeResult = await this.WriteAsync(pubRelPacket.Encode()).ConfigureAwait(false); - this.OnPubRelSentEventLauncher(pubRelPacket); - break; - case PubCompPacket pubCompPacket: - // This is in response to a received PubRel packet. Communication chain management - // was done in the receiver code. Just send the response. - Logger.Trace($"--> PubCompPacket id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}"); - writeResult = await this.WriteAsync(pubCompPacket.Encode()).ConfigureAwait(false); - this.OnPubCompSentEventLauncher(pubCompPacket); - break; - /* case AuthPacket authPacket: - /* writeResult = await this.writer.WriteAsync(authPacket.Encode()).ConfigureAwait(false); - /* this.OnAuthSentEventLauncher(authPacket); - /* break; - */ - default: - Logger.Trace($"--> Unknown packet type {packet}"); - break; - - } // switch - - if (writeResult.IsCanceled) - { - Logger.Trace("TrafficOutflowProcessor Write Canceled"); + writeResult = await this.WriteAsync(publishPacket.Encode()).ConfigureAwait(false); + this.OnPublishSentEventLauncher(publishPacket); + break; + case PubAckPacket pubAckPacket: + // This is in response to a received Publish packet. Communication chain management + // was done in the receiver code. Just send the response. + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubAckPacket id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}"); + writeResult = await this.WriteAsync(pubAckPacket.Encode()).ConfigureAwait(false); + this.OnPubAckSentEventLauncher(pubAckPacket); + break; + case PubRecPacket pubRecPacket: + // This is in response to a received Publish packet. Communication chain management + // was done in the receiver code. Just send the response. + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRecPacket id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}"); + writeResult = await this.WriteAsync(pubRecPacket.Encode()).ConfigureAwait(false); + this.OnPubRecSentEventLauncher(pubRecPacket); + break; + case PubRelPacket pubRelPacket: + // This is in response to a received PubRec packet. Communication chain management + // was done in the receiver code. Just send the response. + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRelPacket id={pubRelPacket.PacketIdentifier} reason={pubRelPacket.ReasonCode}"); + writeResult = await this.WriteAsync(pubRelPacket.Encode()).ConfigureAwait(false); + this.OnPubRelSentEventLauncher(pubRelPacket); + break; + case PubCompPacket pubCompPacket: + // This is in response to a received PubRel packet. Communication chain management + // was done in the receiver code. Just send the response. + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubCompPacket id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}"); + writeResult = await this.WriteAsync(pubCompPacket.Encode()).ConfigureAwait(false); + this.OnPubCompSentEventLauncher(pubCompPacket); + break; + case PingReqPacket pingReqPacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PingReqPacket id={pingReqPacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(PingReqPacket.Encode()).ConfigureAwait(false); + this.OnPingReqSentEventLauncher(pingReqPacket); break; - } - if (writeResult.IsCompleted) - { - Logger.Trace("TrafficOutflowProcessor IsCompleted: end of the stream"); + /* case AuthPacket authPacket: + /* writeResult = await this.writer.WriteAsync(authPacket.Encode()).ConfigureAwait(false); + /* this.OnAuthSentEventLauncher(authPacket); + /* break; + */ + default: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Unknown packet type {packet}"); break; - } - stopWatch.Restart(); - } // foreach - } // while + } // switch + + // if (writeResult.IsCancelled) + // { + // Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Write Cancelled"); + // break; + // } - Logger.Trace($"{Environment.CurrentManagedThreadId}: TrafficOutflowProcessor Exiting...{this.connectState}"); + if (writeResult.IsCompleted) + { + Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter IsCompleted: end of the stream"); + break; + } + + this.lastCommunicationTimer.Restart(); + } // foreach + + Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Exiting...{this.connectState}"); return true; }, cancellationToken); /// /// Asynchronous background task that handles the incoming traffic of packets. Received packets - /// are queued into this.receivedQueue for processing by ReceivedPacketsProcessorAsync. + /// are queued into this.receivedQueue for processing by ReceivedPacketsHandlerAsync. /// - private Task TrafficInflowProcessorAsync(CancellationToken cancellationToken) => Task.Run( + private Task ConnectionReaderAsync(CancellationToken cancellationToken) => Task.Run( async () => { - Logger.Trace($"{Environment.CurrentManagedThreadId}: TrafficInflowProcessor Starting...{this.connectState}"); + Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Starting...{this.connectState}"); ReadResult readResult; while (this.connectState is ConnectState.Connecting or ConnectState.Connected) { - readResult = await this.ReadAsync().ConfigureAwait(false); - - if (cancellationToken.IsCancellationRequested || readResult.IsCanceled) + if (cancellationToken.IsCancellationRequested) { - Logger.Trace("TrafficInflowProcessor exiting due to cancellation: {cancellationToken.IsCancellationRequested} {readResult.IsCanceled}"); + Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled"); break; } + readResult = await this.ReadAsync().ConfigureAwait(false); + + // if (readResult.IsCancelled) + // { + // Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled read result."); + // break; + // } + if (readResult.IsCompleted) { - Logger.Trace("TrafficInflowProcessor IsCompleted: end of the stream"); + Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: end of the stream"); if (this.connectState == ConnectState.Connected) { // This is an unexpected exit and may be due to a network failure. - Logger.Trace("TrafficInflowProcessor IsCompleted: was unexpected"); + Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: was unexpected"); this.connectState = ConnectState.Disconnected; // Launch the AfterDisconnect event with a clean disconnect set to false. this.AfterDisconnectEventLauncher(false); - this.cancellationSource.Cancel(); + this.cancellationTokenSource.Cancel(); + + Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.connectState}"); return false; } @@ -241,7 +276,7 @@ private Task TrafficInflowProcessorAsync(CancellationToken cancellationTok if (decodedPacket is MalformedPacket) { Logger.Warn($"Malformed packet received. Disconnecting."); - Logger.Debug($"Malformed packet received: {decodedPacket}"); + Logger.Debug($"{this.Options.ClientId}-(R)- Malformed packet received: {decodedPacket}"); var opts = new DisconnectOptions { @@ -255,7 +290,7 @@ private Task TrafficInflowProcessorAsync(CancellationToken cancellationTok // Advance the reader to the end of the consumed data buffer = buffer.Slice(0, consumed); this.reader?.AdvanceTo(buffer.Start, readResult.Buffer.End); - Logger.Trace("TrafficInflowProcessor: PacketDecoder.TryDecode returned false. Waiting for more data..."); + Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader: PacketDecoder.TryDecode returned false. Waiting for more data..."); break; } @@ -264,14 +299,14 @@ private Task TrafficInflowProcessorAsync(CancellationToken cancellationTok this.reader?.AdvanceTo(buffer.Start); // Add the packet to the received queue for processing later - // by ReceivedPacketsProcessorAsync - this.receivedQueue.Enqueue(decodedPacket); + // by ReceivedPacketsHandlerAsync + Logger.Trace($"{this.Options.ClientId}-(R)- <-- Received {decodedPacket.GetType().Name}. Adding to receivedQueue."); + this.receivedQueue.Add(decodedPacket); } // while (buffer.Length > 0 } // while (this.connectState is ConnectState.Connecting or ConnectState.Connected) - Logger.Trace($"{Environment.CurrentManagedThreadId}: TrafficInflowProcessor Exiting...{this.connectState}"); - + Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.connectState}"); return true; }, cancellationToken); @@ -280,79 +315,68 @@ private Task TrafficInflowProcessorAsync(CancellationToken cancellationTok /// /// The cancellation token to stop the task. /// A fairly worthless boolean. - private Task ReceivedPacketsProcessorAsync(CancellationToken cancellationToken) => Task.Run( - async () => + private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => Task.Run( + () => { - Logger.Trace($"{Environment.CurrentManagedThreadId}: ReceivedPacketsProcessor Starting...{this.connectState}"); + Logger.Trace($"{this.Options.ClientId}-(RPH)- Starting...{this.connectState}"); while (true) { - if (this.receivedQueue.IsEmpty) - { - await Task.Delay(50).ConfigureAwait(false); - continue; - } - else - { - Logger.Trace($"ReceivedPacketsProcessor: {this.receivedQueue.Count} received packets waiting to be processed."); - } - - if (this.receivedQueue.TryDequeue(out var packet)) - { - switch (packet) - { - case ConnAckPacket connAckPacket: - Logger.Trace($"<-- ConnAck id={connAckPacket.PacketIdentifier}"); - this.OnConnAckReceivedEventLauncher(connAckPacket); - break; - case DisconnectPacket disconnectPacket: - Logger.Trace($"<-- Disconnect id={disconnectPacket.PacketIdentifier}"); - Logger.Warn($"Disconnect received: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); - this.OnDisconnectReceivedEventLauncher(disconnectPacket); - break; - case PingRespPacket pingRespPacket: - Logger.Trace($"<-- PingResp id={pingRespPacket.PacketIdentifier}"); - this.OnPingRespReceivedEventLauncher(pingRespPacket); - break; - case SubAckPacket subAckPacket: - Logger.Trace($"<-- SubAck id={subAckPacket.PacketIdentifier}"); - this.OnSubAckReceivedEventLauncher(subAckPacket); - break; - case UnsubAckPacket unsubAckPacket: - Logger.Trace($"<-- UnsubAck id={unsubAckPacket.PacketIdentifier}"); - this.OnUnsubAckReceivedEventLauncher(unsubAckPacket); - break; - case PublishPacket publishPacket: - this.HandleIncomingPublishPacket(publishPacket); - break; - case PubAckPacket pubAckPacket: - this.HandleIncomingPubAckPacket(pubAckPacket); - break; - case PubRecPacket pubRecPacket: - this.HandleIncomingPubRecPacket(pubRecPacket); - break; - case PubRelPacket pubRelPacket: - this.HandleIncomingPubRelPacket(pubRelPacket); - break; - case PubCompPacket pubCompPacket: - this.HandleIncomingPubCompPacket(pubCompPacket); - break; - default: - Logger.Trace("<-- Unknown"); - Logger.Error($"Unrecognized packet received. Will discard. {packet}"); - break; - } // switch (packet) - } - if (cancellationToken.IsCancellationRequested) { - Logger.Trace("ReceivedPacketsProcessor Canceled"); + Logger.Trace($"{this.Options.ClientId}-(RPH)- Cancelled with {this.receivedQueue.Count} received packets remaining."); break; } + Logger.Trace($"{this.Options.ClientId}-(RPH)- {this.receivedQueue.Count} received packets currently waiting to be processed."); + + var packet = this.receivedQueue.Take(); + switch (packet) + { + case ConnAckPacket connAckPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received ConnAck id={connAckPacket.PacketIdentifier}"); + this.OnConnAckReceivedEventLauncher(connAckPacket); + break; + case DisconnectPacket disconnectPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Disconnect id={disconnectPacket.PacketIdentifier}"); + Logger.Warn($"Disconnect received: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); + this.OnDisconnectReceivedEventLauncher(disconnectPacket); + break; + case PingRespPacket pingRespPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PingResp id={pingRespPacket.PacketIdentifier}"); + this.OnPingRespReceivedEventLauncher(pingRespPacket); + break; + case SubAckPacket subAckPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received SubAck id={subAckPacket.PacketIdentifier}"); + this.OnSubAckReceivedEventLauncher(subAckPacket); + break; + case UnsubAckPacket unsubAckPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received UnsubAck id={unsubAckPacket.PacketIdentifier}"); + this.OnUnsubAckReceivedEventLauncher(unsubAckPacket); + break; + case PublishPacket publishPacket: + this.HandleIncomingPublishPacket(publishPacket); + break; + case PubAckPacket pubAckPacket: + this.HandleIncomingPubAckPacket(pubAckPacket); + break; + case PubRecPacket pubRecPacket: + this.HandleIncomingPubRecPacket(pubRecPacket); + break; + case PubRelPacket pubRelPacket: + this.HandleIncomingPubRelPacket(pubRelPacket); + break; + case PubCompPacket pubCompPacket: + this.HandleIncomingPubCompPacket(pubCompPacket); + break; + default: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Unknown packet type. Will discard."); + Logger.Error($"Unrecognized packet received. Will discard. {packet}"); + break; + } // switch (packet) } // while - Logger.Trace($"{Environment.CurrentManagedThreadId}: ReceivedPacketsProcessor Exiting...{this.connectState}"); + Logger.Trace($"{this.Options.ClientId}-(RPH)- ReceivedPacketsHandler Exiting...{this.connectState}"); return true; }, cancellationToken); @@ -363,14 +387,14 @@ private Task ReceivedPacketsProcessorAsync(CancellationToken cancellationT /// The received publish packet. internal void HandleIncomingPublishPacket(PublishPacket publishPacket) { - Logger.Trace($"<-- Publish id={publishPacket.PacketIdentifier}"); + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Publish id={publishPacket.PacketIdentifier}"); this.OnMessageReceivedEventLauncher(publishPacket); if (publishPacket.Message.QoS is MQTT5.Types.QualityOfService.AtLeastOnceDelivery) { // We've received a QoS 1 publish. Send a PubAck. var pubAckResponse = new PubAckPacket(publishPacket.PacketIdentifier, PubAckReasonCode.Success); - this.sendQueue.Enqueue(pubAckResponse); + this.sendQueue.Add(pubAckResponse); } else if (publishPacket.Message.QoS is MQTT5.Types.QualityOfService.ExactlyOnceDelivery) { @@ -384,7 +408,7 @@ internal void HandleIncomingPublishPacket(PublishPacket publishPacket) pubRecResponse.ReasonCode = PubRecReasonCode.PacketIdentifierInUse; } - this.sendQueue.Enqueue(pubRecResponse); + this.sendQueue.Add(pubRecResponse); } } @@ -395,7 +419,7 @@ internal void HandleIncomingPublishPacket(PublishPacket publishPacket) /// Raised if the packet identifier is unknown. internal void HandleIncomingPubAckPacket(PubAckPacket pubAckPacket) { - Logger.Trace($"<-- PubAck id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}"); + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PubAck id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}"); this.OnPubAckReceivedEventLauncher(pubAckPacket); // Remove the transaction chain from the transaction queue @@ -419,7 +443,7 @@ internal void HandleIncomingPubAckPacket(PubAckPacket pubAckPacket) /// The received PubRec packet. internal void HandleIncomingPubRecPacket(PubRecPacket pubRecPacket) { - Logger.Trace($"<-- PubRec id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}"); + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PubRec id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}"); this.OnPubRecReceivedEventLauncher(pubRecPacket); // Find the QoS2 transaction chain for this packet identifier @@ -443,13 +467,13 @@ internal void HandleIncomingPubRecPacket(PubRecPacket pubRecPacket) } // Send the PUBREL response - this.sendQueue.Enqueue(pubRelResponsePacket); + this.sendQueue.Add(pubRelResponsePacket); } else { // Send a PUBREL with PacketIdentifierNotFound var pubRelResponsePacket = new PubRelPacket(pubRecPacket.PacketIdentifier, PubRelReasonCode.PacketIdentifierNotFound); - this.sendQueue.Enqueue(pubRelResponsePacket); + this.sendQueue.Add(pubRelResponsePacket); } } @@ -460,7 +484,7 @@ internal void HandleIncomingPubRecPacket(PubRecPacket pubRecPacket) /// The received PubRel packet. internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket) { - Logger.Trace($"<-- PubRel id={pubRelPacket.PacketIdentifier} reason={pubRelPacket.ReasonCode}"); + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PubRel id={pubRelPacket.PacketIdentifier} reason={pubRelPacket.ReasonCode}"); this.OnPubRelReceivedEventLauncher(pubRelPacket); if (this.transactionQueue.TryGetValue(pubRelPacket.PacketIdentifier, out var originalPublishQoS2Chain)) @@ -485,7 +509,7 @@ internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket) Logger.Warn($"QoS2: Couldn't remove PubRel --> PubComp QoS2 Chain for packet identifier {pubRelPacket.PacketIdentifier}."); } - this.sendQueue.Enqueue(pubCompResponsePacket); + this.sendQueue.Add(pubCompResponsePacket); } else { @@ -494,7 +518,7 @@ internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket) // Send a PUBCOMP with PacketIdentifierNotFound var pubCompResponsePacket = new PubCompPacket(pubRelPacket.PacketIdentifier, PubCompReasonCode.PacketIdentifierNotFound); - this.sendQueue.Enqueue(pubCompResponsePacket); + this.sendQueue.Add(pubCompResponsePacket); } } @@ -505,7 +529,7 @@ internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket) /// Raised if the packet identifier is unknown. internal void HandleIncomingPubCompPacket(PubCompPacket pubCompPacket) { - Logger.Trace($"<-- PubComp id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}"); + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PubComp id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}"); this.OnPubCompReceivedEventLauncher(pubCompPacket); // Remove the QoS 2 transaction chain from the queue diff --git a/Source/HiveMQtt/Client/HiveMQClientUtil.cs b/Source/HiveMQtt/Client/HiveMQClientUtil.cs index c69646c6..de6ac950 100644 --- a/Source/HiveMQtt/Client/HiveMQClientUtil.cs +++ b/Source/HiveMQtt/Client/HiveMQClientUtil.cs @@ -131,6 +131,20 @@ public static bool MatchTopic(string pattern, string candidate) return Regex.IsMatch(candidate, regexp); } + /// + /// Generate a packet identifier. + /// + /// A valid packet identifier. + protected int GeneratePacketIdentifier() + { + if (this.lastPacketId == ushort.MaxValue) + { + this.lastPacketId = 0; + } + + return Interlocked.Increment(ref this.lastPacketId); + } + /// /// https://learn.microsoft.com/en-us/dotnet/api/system.idisposable?view=net-6.0. /// @@ -147,20 +161,6 @@ from executing a second time. GC.SuppressFinalize(this); } - /// - /// Generate a packet identifier. - /// - /// A valid packet identifier. - protected int GeneratePacketIdentifier() - { - if (this.lastPacketId == ushort.MaxValue) - { - this.lastPacketId = 0; - } - - return Interlocked.Increment(ref this.lastPacketId); - } - /// /// https://learn.microsoft.com/en-us/dotnet/api/system.idisposable?view=net-6.0 /// Dispose(bool disposing) executes in two distinct scenarios. @@ -174,6 +174,8 @@ protected int GeneratePacketIdentifier() /// True if called from user code. protected virtual void Dispose(bool disposing) { + Logger.Trace("Disposing HiveMQClient"); + // Check to see if Dispose has already been called. if (!this.disposed) { @@ -181,8 +183,18 @@ protected virtual void Dispose(bool disposing) // and unmanaged resources. if (disposing) { + if (this.connectState == Internal.ConnectState.Connected) + { + Logger.Trace("HiveMQClient Dispose: Disconnecting connected client."); + _ = Task.Run(async () => await this.DisconnectAsync().ConfigureAwait(false)); + } + // Dispose managed resources. - // { } + this.sendQueue.CompleteAdding(); + this.receivedQueue.CompleteAdding(); + + this.cancellationTokenSource.Cancel(); + this.cancellationTokenSource.Dispose(); } // Call the appropriate methods to clean up diff --git a/Source/HiveMQtt/MQTT5/PacketDecoder.cs b/Source/HiveMQtt/MQTT5/PacketDecoder.cs index 2d21b00e..127de369 100644 --- a/Source/HiveMQtt/MQTT5/PacketDecoder.cs +++ b/Source/HiveMQtt/MQTT5/PacketDecoder.cs @@ -54,7 +54,7 @@ public static bool TryDecode(ReadOnlySequence buffer, out ControlPacket de if (buffer.Length < packetLength) { // Not all data for this packet has arrived yet. Try again... - Logger.Trace("PacketDecoder.Decode: Not all data for this packet has arrived yet. Returning PartialPacket."); + Logger.Trace($"PacketDecoder.TryDecode: Waiting on more data: {buffer.Length} < {packetLength} - Returning PartialPacket."); decodedPacket = new PartialPacket(); consumed = default; return false; diff --git a/Tests/HiveMQtt.Test/HiveMQClient/OptionsTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/OptionsTest.cs index f7a32d1f..b70d9b3b 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/OptionsTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/OptionsTest.cs @@ -34,7 +34,7 @@ public void WithBadSessionExpiryInterval() Assert.Equal(0, options.SessionExpiryInterval); options.SessionExpiryInterval = long.MaxValue; - options.ValidateOptions(); + options.Validate(); Assert.Equal(uint.MaxValue, options.SessionExpiryInterval); }